Oinone远程调用链路源码分析

前提

源码分析版本是 5.1.x版本

概要

在服务启动时,获取注解REMOTE的函数,通过dubbo的泛化调用发布。在调用函数时,通过dubbo泛化调用获取结果。

注册服务者

  1. 在spring 启动方法installOrLoad中初始化
  2. 寻找定义REMOTE的方法
  3. 组装dubbo的服务配置
  4. 组装服务对象实现引用,内容如下,用于注册
    • 调用前置处理
      • 放信息到SessionApi
      • 函数调用链追踪,放到本地TransmittableThreadLocal
      • 从redis中获取到的数据进行反序列化并存在到本地的线程里
      • Trace信息,放一份在sessionApi中 和ThreadLocal
    • 调用函数执行
    • 返回数据转成特定格式
  5. 通过线程组调用dubbo的ServiceConfig.export 服务发布

时序图

Oinone远程调用链路源码分析
注册

源码分析

根据条件判断,确定向dubbo进行服务发布
RemoteServiceLoader

public void publishService(List<FunctionDefinition> functionList,Map<String,Runnable> isPublished) {
        // 因为泛化接口只能控制到namespace,控制粒度不能到fun级别,这里进行去重处理
        Map<String, Function> genericNamespaceMap = new HashMap<>();
        for (FunctionDefinition functionDefinition : functionList) {
            Function function = new Function(functionDefinition)

            try {
               //定义REMOTE, 才给予远程调用
                if (FunctionOpenEnum.REMOTE.in(function.getOpen()) && !ClassUtils.isInterface(function.getClazz())) {
                    genericNamespaceMap.putIfAbsent(RegistryUtils.getRegistryInterface(function), function);
                }
            } catch (PamirsException e) {
            }
        }
        // 发布远程服务
        for (String namespace : genericNamespaceMap.keySet()) {
            Function function = genericNamespaceMap.get(namespace);
            if(isPublished.get(RegistryUtils.getRegistryInterface(function)) == null){
                // 发布,注册远程函数服务,底层使用dubbo的泛化调用
                Runnable registryTask = () -> remoteRegistry.registryService(function);
                isPublished.put(RegistryUtils.getRegistryInterface(function),registryTask);
            }else{

            }
        }
    }

构造ServiceConfig方法,设置成泛化调用,进行发布export()
DefaultRemoteRegistryComponent

     public void registryGenericService(String interfaceName, List<MethodConfig> methods,
                                       String group, String version, Integer timeout, Integer retries) {
        ....
        try {
            ServiceConfig<GenericService> service = new ServiceConfig<>();
            // 服务接口名
            service.setInterface(interfaceName);
            // 服务对象实现引用
            service.setRef(genericService(interfaceName));
            if (null != methods) {
                service.setMethods(methods);
            }
            // 声明为泛化接口
            service.setGeneric(Boolean.TRUE.toString());
            // 基础元数据
            constructService(group, version, timeout, retries, service);
            service.export();
        } catch (Exception e) {
           .....
        }
    }

// 服务对象实现引用
private GenericService genericService(String interfaceName) {
        return (method, parameterTypes, args) -> {
            PamirsSession.clear();
            Function function = Objects.requireNonNull(PamirsSession.getContext()).getFunction(RegistryUtils.getFunctionNamespace(method), RegistryUtils.getFunctionFun(method));
            if (log.isDebugEnabled()) {
                log.debug("interfaceName: " + interfaceName + ", isDataManage: " + function.isDataManager());
            }
            try {
                //前置处理:服务提供者,对请求参数进行对象化拆解,并对请求携带的上下文进行处理
                // 放信息到SessionApi
                // 函数调用链追踪,放到本地TransmittableThreadLocal
                // CommonMetaDataCacheApi.computeMetaData() 从redis中获取到的数据进行反序列化并存在到本地的TL中
                // DataAuditApi.computeDataAuditSession() Trace信息,放一份在sessionApi中 和ThreadLocal中
                Object[] args1 = Spider.getDefaultExtension(RemoteRequestArgApi.class).providerHandle(function.getNamespace(), function.getFun(), args, function.getArguments());

                Object result = FunEngine.get().exclude(ScriptType.REMOTE).run(function, args1);

                //后置处理:服务提供者,对结果进行对象化封装、携带请求上下文进行处理
                return Spider.getDefaultExtension(RemoteResponseApi.class).providerHandle(function, method, result);
            } catch (Throwable e) {
                return Spider.getDefaultExtension(RemoteResponseApi.class).providerExceptionHandle(function, method, e);
            } finally {
                PamirsSession.clear();
            }
        };
    }

注册消费者

  1. 函数处理调用
  2. 注册服务消费者
    • 从ReferenceConfigCache获取泛化
  3. 调用dubbo泛化调用接口
  4. 获取返回信息
    • 获取用户id,放入PamirsSession
    • 如果开启debug模式
      • 存入DEBUG_THREAD_LOCAL本地线程
    • 返回格式
      • IWrapper
      • Pagination
      • Result

时序图

Oinone远程调用链路源码分析

源码分析

泛化调用dobbo接口,并解析返回对象
RemoteComputer

public Object compute(Function function, Object... args) {

        .....
        List<Arg> functionArguments = function.getArguments();
        String methodName = RegistryUtils.getGenericServiceMethodName(function);
        String[] argTypes = FunctionUtils.fetchArgTypes(functionArguments);
        //前置处理:服务消费者,对请求参数进行对象化封装、携带请求上下文进行处理
        Object[] arguments = getRemoteRequestApi().consumerHandle(function.getNamespace(),function.getFun(), args, functionArguments);
        // 泛化调用
        Object result = invoke(function, methodName, argTypes, arguments);
        .....
        //后置处理:服务消费者,对返回结果进行对象化拆解,并对结果携带的上下文进行处理
        // 数据转成IWrapper/Pagination/Result
        return getRemoteResponseApi().consumerHandle(function, result);
}

// 配置请求信息,通过$invoke 实际调用
private Object invoke(Function function, String methodName, String[] argTypes, Object[] arguments) {
        Object result;
        String configCdOwnSign = getSessionFillOwnSignApi().getConfigCdOwnSign();
        if (StringUtils.isBlank(configCdOwnSign)) {
.....
        } else {
            try {
            // 获取服务,由于是泛化调用,所以获取的一定是GenericService类型
                GenericService remoteClient = CommonApiFactory.getApi(RemoteRegistry.class).registryOriginConsumer(function);
         // 第一个参数是需要调用的方法名
         // 第二个参数是需要调用的方法的参数类型数组,为String数组,里面存入参数的全类名。
         // 第三个参数是需要调用的方法的参数数组,为Object数组,里面存入需要的参数
                result = remoteClient.$invoke(methodName, argTypes, arguments);
            } catch (RpcException e) {
                ....            }
        }
        return result;
    }

从缓存中获取泛化
DefaultRemoteRegistryComponent

    public GenericService registryGenericConsumer(String interfaceName, List<MethodConfig> methods,
                                                  String group, String version, Integer timeout, Integer retries) {
        ....
        // 创建服务引用配置
        ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
        reference.setInterface(interfaceName);
        // 设置为泛化调用
        reference.setGeneric(Boolean.TRUE.toString());
        if (null != methods) {
            reference.setMethods(methods);
        }
        constructReference(group, version, timeout, retries, reference);
        return ReferenceConfigCache.getCache().get(reference);
    }

名词解释

泛化调用是指在调用方没有服务方提供的API(SDK)的情况下,对服务方进行调用,并且可以正常拿到调用结果
泛化调用(客户端泛化)
实现泛化实现(服务端泛化)

Oinone社区 作者:oinone原创文章,如若转载,请注明出处:https://doc.oinone.top/backend/17027.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
oinone的头像oinone
上一篇 2024年9月3日 pm12:53
下一篇 2024年9月5日 pm8:13

相关推荐

  • 如何自定义SQL(Mapper)语句

    场景描述 在实际业务场景中,存在复杂SQL的情况,具体表现为: 单表单SQL满足不了的情况下 有复杂的Join关系或者子查询 复杂SQL的逻辑通过程序逻辑难以实现或实现代价较大 在此情况下,通过原生的mybatis/mybatis-plus, 自定义Mapper的方式实现业务功能 1、编写所需的Mapper SQL Mapper写法无限制,与使用原生的mybaits/mybaits-plus用法一样; Mapper(DAO)和SQL可以写在一个文件中,也分开写在两个文件中。 package pro.shushi.pamirs.demo.core.map; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import java.util.List; import java.util.Map; @Mapper public interface DemoItemMapper { @Select("<script>select sum(item_price) as itemPrice,sum(inventory_quantity) as inventoryQuantity,categoryId from ${demoItemTable} as core_demo_item ${where} group by category_id</script>") List<Map<String, Object>> groupByCategoryId(@Param("demoItemTable") String pamirsUserTable, @Param("where") String where); } 2.调用mapper 调用Mapper代码示例 package pro.shushi.pamirs.demo.core.map; import com.google.api.client.util.Lists; import org.springframework.stereotype.Component; import pro.shushi.pamirs.demo.api.model.DemoItem; import pro.shushi.pamirs.framework.connectors.data.api.datasource.DsHintApi; import pro.shushi.pamirs.meta.api.core.orm.convert.DataConverter; import pro.shushi.pamirs.meta.api.session.PamirsSession; import pro.shushi.pamirs.meta.common.spring.BeanDefinitionUtils; import java.util.List; import java.util.Map; @Component public class DemoItemDAO { public List<DemoItem> customSqlDemoItem(){ try (DsHintApi dsHint = DsHintApi.model(DemoItem.MODEL_MODEL)) { String demoItemTable = PamirsSession.getContext().getModelCache().get(DemoItem.MODEL_MODEL).getTable(); DemoItemMapper demoItemMapper = BeanDefinitionUtils.getBean(DemoItemMapper.class); String where = " where status = 'ACTIVE'"; List<Map<String, Object>> dataList = demoItemMapper.groupByCategoryId(demoItemTable,where); DataConverter persistenceDataConverter = BeanDefinitionUtils.getBean(DataConverter.class); return persistenceDataConverter.out(DemoItem.MODEL_MODEL, dataList); } return Lists.newArrayList(); } } 调用Mapper一些说明 启动类需要配置扫描包MapperScan @MapperScan(value = "pro.shushi", annotationClass = Mapper.class) @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, FreeMarkerAutoConfiguration.class}) public class DemoApplication { 调用Mapper接口的时候,需要指定数据源;即上述示例代码中的 DsHintApi dsHint = DsHintApi.model(DemoItem.MODEL_MODEL), 实际代码中使用 try-with-resources语法。 从Mapper返回的结果中获取数据 如果SQL Mapper中已定义了resultMap,调用Mapper(DAO)返回的就是Java对象 如果Mapper返回的是Map<String, Object>,则通过 DataConverter.out进行转化,参考上面的示例 其他参考:Oinone连接外部数据源方案:https://doc.oinone.top/backend/4562.html

    2023年11月27日
    1.6K00
  • 如何自定义Excel导入功能

    介绍 在平台提供的默认导入功能无法满足业务需求的时候,我们可以自定义导入功能,以满足业务中个性化的需求。 功能示例 下面以导入文件的时候加入发布人的字段作为示例讲解。 继承平台的导入任务模型,加上需要在导入的弹窗视图需要展示的字段 package pro.shushi.pamirs.demo.api.model; import pro.shushi.pamirs.file.api.model.ExcelImportTask; import pro.shushi.pamirs.meta.annotation.Field; import pro.shushi.pamirs.meta.annotation.Model; @Model.model(DemoItemImportTask.MODEL_MODEL) @Model(displayName = "商品-Excel导入任务") public class DemoItemImportTask extends ExcelImportTask { public static final String MODEL_MODEL = "demo.DemoItemImportTask"; // 自定义显示的字段 @Field.String @Field(displayName = "发布人") private String publishUserName; } 编写自定义导入弹窗视图的数据初始化方法和导入提交的action package pro.shushi.pamirs.demo.core.action; import org.springframework.stereotype.Component; import pro.shushi.pamirs.boot.base.resource.PamirsFile; import pro.shushi.pamirs.demo.api.model.DemoItemImportTask; import pro.shushi.pamirs.file.api.action.ExcelImportTaskAction; import pro.shushi.pamirs.file.api.config.FileProperties; import pro.shushi.pamirs.file.api.model.ExcelWorkbookDefinition; import pro.shushi.pamirs.file.api.service.ExcelFileService; import pro.shushi.pamirs.meta.annotation.Action; import pro.shushi.pamirs.meta.annotation.Function; import pro.shushi.pamirs.meta.annotation.Model; import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j; import pro.shushi.pamirs.meta.enmu.ActionContextTypeEnum; import pro.shushi.pamirs.meta.enmu.FunctionOpenEnum; import pro.shushi.pamirs.meta.enmu.FunctionTypeEnum; import pro.shushi.pamirs.meta.enmu.ViewTypeEnum; @Slf4j @Component @Model.model(DemoItemImportTask.MODEL_MODEL) public class DemoItemExcelImportTaskAction extends ExcelImportTaskAction { public DemoItemExcelImportTaskAction(FileProperties fileProperties, ExcelFileService excelFileService) { super(fileProperties, excelFileService); } @Action(displayName = "导入", contextType = ActionContextTypeEnum.CONTEXT_FREE, bindingType = {ViewTypeEnum.TABLE}) public DemoItemImportTask createImportTask(DemoItemImportTask data) { if (data.getWorkbookDefinitionId() != null) { ExcelWorkbookDefinition workbookDefinition = new ExcelWorkbookDefinition(); workbookDefinition.setId(data.getWorkbookDefinitionId()); data.setWorkbookDefinition(workbookDefinition); } Object fileId = data.get_d().get("fileId"); if (fileId != null) { PamirsFile pamirsFile = new PamirsFile().queryById(Long.valueOf(fileId.toString())); data.setFile(pamirsFile); } super.createImportTask(data); return data; } /** * @param data * @return */ @Function(openLevel = FunctionOpenEnum.API) @Function.Advanced(type = FunctionTypeEnum.QUERY) public DemoItemImportTask construct(DemoItemImportTask data) { data.construct(); return data; } } 编写导入的单行数据处理逻辑,此处可以拿到导入弹窗内自定义的字段提交的值,然后根据这些值处理自定义逻辑,此处演示代码就是将导入后的商品的发布人都设置为自定义导入视图填的发布人信息 package pro.shushi.pamirs.demo.core.excel.extPoint; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import pro.shushi.pamirs.demo.api.model.DemoItem; import pro.shushi.pamirs.demo.api.model.DemoItemImportTask; import pro.shushi.pamirs.demo.api.service.DemoItemService; import pro.shushi.pamirs.file.api.context.ExcelImportContext; import pro.shushi.pamirs.file.api.extpoint.AbstractExcelImportDataExtPointImpl; import pro.shushi.pamirs.file.api.extpoint.ExcelImportDataExtPoint;…

    2023年11月22日
    1.4K00
  • 自定义审批方式、自定义审批节点名称

    @Model.model(审批模型.MODEL_MODEL) @Component public class 审批模型Action { @Function @Function.Advanced(category = FunctionCategoryEnum.CUSTOM_DESIGNER, displayName = "测试自定义审批类型") public WorkflowSignTypeEnum signType(String json) { // json为业务数据,可用JsonUtils转换 return WorkflowSignTypeEnum.COUNTERSIGN_ONEAGREE_ONEREJUST; } @Function @Function.Advanced(category = FunctionCategoryEnum.CUSTOM_DESIGNER, displayName = "测试自定义审批名称") public String customApprovalName() { return UUID.randomUUID().toString(); } }

    2023年12月5日
    1.8K00
  • 【DM】后端部署使用Dameng数据库(达梦)

    达梦数据库配置 驱动配置 达梦数据库的服务端版本和驱动版本需要匹配,建议使用服务端安装时提供的jdbc驱动,不要使用官方maven仓库中的驱动。 报错 表 xx 中不能同时包含聚集 KEY 和大字段,建表的时候就指定非聚集主键。SELECT * FROM V$DM_INI WHERE PARA_NAME = ‘PK_WITH_CLUSTER’;SP_SET_PARA_VALUE(1,’PK_WITH_CLUSTER’,0) Maven配置 DM8(目前maven仓库最新版本) <dm.version>8.1.2.192</dm.version> <dependency> <groupId>com.dameng</groupId> <artifactId>DmJdbcDriver18</artifactId> <version>${dm.version}</version> </dependency> PS: 8.1.3.12版本驱动需要手动上传到nexus仓库使用,本文包含该版本相关内容。 Maven配置 DM7 <dm7.version>7.6.1.120</dm7.version> <dependency> <groupId>com.dameng</groupId> <artifactId>Dm7JdbcDriver18</artifactId> <version>${dm7.version}</version> </dependency> PS: 7.6.1.120版本驱动需要手动上传到nexus仓库使用,本文包含该版本相关内容。 离线驱动下载 Dm7JdbcDriver18-7.6.1.120.jarDmJdbcDriver18-8.1.3.12.jar JDBC连接配置 pamirs: datasource: base: type: com.alibaba.druid.pool.DruidDataSource driverClassName: dm.jdbc.driver.DmDriver # url: jdbc:dm://127.0.0.1:5236/BASE?clobAsString=true&useUnicode=true&characterEncoding=utf8&compatibleMode=mysql url: jdbc:dm://127.0.0.1:5236?schema=BASE&clobAsString=true&columnNameUpperCase=false&useUnicode=true&characterEncoding=utf8&compatibleMode=mysql username: xxxxxx password: xxxxxx initialSize: 5 maxActive: 200 minIdle: 5 maxWait: 60000 timeBetweenEvictionRunsMillis: 60000 testWhileIdle: true testOnBorrow: false testOnReturn: false poolPreparedStatements: true asyncInit: true validConnectionCheckerClassName: com.alibaba.druid.pool.vendor.OracleValidConnectionChecker validationQuery: SELECT 1 FROM DUAL 连接url配置 点击查看官方文档:DM JDBC 编程指南 连接串1 jdbc:dm://127.0.0.1:5236?schema=BASE&clobAsString=true&columnNameUpperCase=false&useUnicode=true&characterEncoding=utf8&compatibleMode=mysql PS:schema参数在低版本驱动区分大小写,高版本驱动不再区分大小写,为了避免错误,统一使用全大写。columnNameUpperCase参数与官方介绍不一致,为了避免错误,需要显式指定。 连接串2 jdbc:dm://127.0.0.1:5236/BASE?clobAsString=true&useUnicode=true&characterEncoding=utf8&compatibleMode=mysql PS:可能是未来更高版本中使用的连接串形式。 达梦数据库在不同驱动版本下需要使用不同的连接串进行处理,具体可参考下表:(使用错误的连接串将无法正常启动) Dm7JdbcDriver18版本 Build-Time 使用的连接串类型 是否支持指定schema schema是否区分大小写 是否可用 不可用原因 7.6.0.165 2019.06.04 1 否 是 否 不支持LocalDateTime类型 7.6.1.120(建议) 2022.09.14 1 是 是 是 – DmJdbcDriver18版本 Build-Time 使用的连接串类型 是否支持指定schema schema是否区分大小写 是否可用 不可用原因 8.1.2.192 2023.01.12 1 是 否 是 – 8.1.3.12(建议) 2023.04.17 2 是 否 是 – 方言配置 pamirs方言配置 pamirs: dialect: ds: base: type: DM version: 8 majorVersion: 8 pamirs: type: DM version: 8 majorVersion: 8 数据库版本 type version majorVersion 7-20220916 DM 7 20220916 8-20230418 DM 8 8 schedule方言配置 pamirs: event: schedule: dialect: type: DM version: 8 majorVersion: 8 type version majorVersion…

    2023年11月1日
    14.0K00
  • IWrapper、QueryWrapper和LambdaQueryWrapper使用

    条件更新updateByWrapper 通常我们在更新的时候new一个对象出来在去更新,减少更新的字段 Integer update = new DemoUser().updateByWrapper(new DemoUser().setFirstLogin(Boolean.FALSE), Pops.<DemoUser>lambdaUpdate().from(DemoUser.MODEL_MODEL).eq(IdModel::getId, userId) 使用基础模型的updateById方法更新指定字段的方法: new 一下update对象出来,更新这个对象。 WorkflowUserTask userTaskUp = new WorkflowUserTask(); userTaskUp.setId(userTask.getId()); userTaskUp.setNodeContext(json); userTaskUp.updateById(); 条件删除updateByWrapper public List<T> delete(List<T> data) { List<Long> petTypeIdList = new ArrayList(); for(T item:data){ petTypeIdList.add(item.getId()); } Models.data().deleteByWrapper(Pops.<PetType>lambdaQuery().from(PetType.MODEL_MODEL).in(PetType::getId,petTypeIdList)); return data; } 构造条件查询数据 示例1: LambdaQueryWrapper拼接查询条件 private void queryPetShops() { LambdaQueryWrapper<PetShop> query = Pops.<PetShop>lambdaQuery(); query.from(PetShop.MODEL_MODEL); query.setSortable(Boolean.FALSE); query.orderBy(true, true, PetShop::getId); List<PetShop> petShops2 = new PetShop().queryList(query); System.out.printf(petShops2.size() + ""); } 示例2: IWrapper拼接查询条件 private void queryPetShops() { IWrapper<PetShop> wrapper = Pops.<PetShop>lambdaQuery() .from(PetShop.MODEL_MODEL).eq(PetShop::getId,1L); List<PetShop> petShops4 = new PetShop().queryList(wrapper); System.out.printf(petShops4.size() + ""); } 示例3: QueryWrapper拼接查询条件 private void queryPetShops() { //使用Lambda获取字段名,防止后面改字段名漏改 String nameField = LambdaUtil.fetchFieldName(PetTalent::getName); //使用Lambda获取Clumon名,防止后面改字段名漏改 String nameColumn = PStringUtils.fieldName2Column(nameField); QueryWrapper<PetShop> wrapper2 = new QueryWrapper<PetShop>().from(PetShop.MODEL_MODEL) .eq(nameColumn, "test"); List<PetShop> petShops5 = new PetShop().queryList(wrapper2); System.out.printf(petShops5.size() + ""); } IWrapper转为LambdaQueryWrapper @Function.Advanced(type= FunctionTypeEnum.QUERY) @Function.fun(FunctionConstants.queryPage) @Function(openLevel = {FunctionOpenEnum.API}) public Pagination<PetShopProxy> queryPage(Pagination<PetShopProxy> page, IWrapper<PetShopProxy> queryWrapper) { LambdaQueryWrapper<PetShopProxy> wrapper = ((QueryWrapper<PetShopProxy>) queryWrapper).lambda(); // 非存储字段从QueryData中获取 Map<String, Object> queryData = queryWrapper.getQueryData(); if (null != queryData && !queryData.isEmpty()) { String codes = (String) queryData.get("codes"); if (org.apache.commons.lang3.StringUtils.isNotBlank(codes)) { wrapper.in(PetShopProxy::getCode, codes.split(",")); } } return new PetShopProxy().queryPage(page, wrapper); }

    2024年5月25日
    2.1K00

Leave a Reply

登录后才能评论