Oinone远程调用链路源码分析

前提

源码分析版本是 5.1.x版本

概要

在服务启动时,获取注解REMOTE的函数,通过dubbo的泛化调用发布。在调用函数时,通过dubbo泛化调用获取结果。

注册服务者

  1. 在spring 启动方法installOrLoad中初始化
  2. 寻找定义REMOTE的方法
  3. 组装dubbo的服务配置
  4. 组装服务对象实现引用,内容如下,用于注册
    • 调用前置处理
      • 放信息到SessionApi
      • 函数调用链追踪,放到本地TransmittableThreadLocal
      • 从redis中获取到的数据进行反序列化并存在到本地的线程里
      • Trace信息,放一份在sessionApi中 和ThreadLocal
    • 调用函数执行
    • 返回数据转成特定格式
  5. 通过线程组调用dubbo的ServiceConfig.export 服务发布

时序图

Oinone远程调用链路源码分析
注册

源码分析

根据条件判断,确定向dubbo进行服务发布
RemoteServiceLoader

public void publishService(List<FunctionDefinition> functionList,Map<String,Runnable> isPublished) {
        // 因为泛化接口只能控制到namespace,控制粒度不能到fun级别,这里进行去重处理
        Map<String, Function> genericNamespaceMap = new HashMap<>();
        for (FunctionDefinition functionDefinition : functionList) {
            Function function = new Function(functionDefinition)

            try {
               //定义REMOTE, 才给予远程调用
                if (FunctionOpenEnum.REMOTE.in(function.getOpen()) && !ClassUtils.isInterface(function.getClazz())) {
                    genericNamespaceMap.putIfAbsent(RegistryUtils.getRegistryInterface(function), function);
                }
            } catch (PamirsException e) {
            }
        }
        // 发布远程服务
        for (String namespace : genericNamespaceMap.keySet()) {
            Function function = genericNamespaceMap.get(namespace);
            if(isPublished.get(RegistryUtils.getRegistryInterface(function)) == null){
                // 发布,注册远程函数服务,底层使用dubbo的泛化调用
                Runnable registryTask = () -> remoteRegistry.registryService(function);
                isPublished.put(RegistryUtils.getRegistryInterface(function),registryTask);
            }else{

            }
        }
    }

构造ServiceConfig方法,设置成泛化调用,进行发布export()
DefaultRemoteRegistryComponent

     public void registryGenericService(String interfaceName, List<MethodConfig> methods,
                                       String group, String version, Integer timeout, Integer retries) {
        ....
        try {
            ServiceConfig<GenericService> service = new ServiceConfig<>();
            // 服务接口名
            service.setInterface(interfaceName);
            // 服务对象实现引用
            service.setRef(genericService(interfaceName));
            if (null != methods) {
                service.setMethods(methods);
            }
            // 声明为泛化接口
            service.setGeneric(Boolean.TRUE.toString());
            // 基础元数据
            constructService(group, version, timeout, retries, service);
            service.export();
        } catch (Exception e) {
           .....
        }
    }

// 服务对象实现引用
private GenericService genericService(String interfaceName) {
        return (method, parameterTypes, args) -> {
            PamirsSession.clear();
            Function function = Objects.requireNonNull(PamirsSession.getContext()).getFunction(RegistryUtils.getFunctionNamespace(method), RegistryUtils.getFunctionFun(method));
            if (log.isDebugEnabled()) {
                log.debug("interfaceName: " + interfaceName + ", isDataManage: " + function.isDataManager());
            }
            try {
                //前置处理:服务提供者,对请求参数进行对象化拆解,并对请求携带的上下文进行处理
                // 放信息到SessionApi
                // 函数调用链追踪,放到本地TransmittableThreadLocal
                // CommonMetaDataCacheApi.computeMetaData() 从redis中获取到的数据进行反序列化并存在到本地的TL中
                // DataAuditApi.computeDataAuditSession() Trace信息,放一份在sessionApi中 和ThreadLocal中
                Object[] args1 = Spider.getDefaultExtension(RemoteRequestArgApi.class).providerHandle(function.getNamespace(), function.getFun(), args, function.getArguments());

                Object result = FunEngine.get().exclude(ScriptType.REMOTE).run(function, args1);

                //后置处理:服务提供者,对结果进行对象化封装、携带请求上下文进行处理
                return Spider.getDefaultExtension(RemoteResponseApi.class).providerHandle(function, method, result);
            } catch (Throwable e) {
                return Spider.getDefaultExtension(RemoteResponseApi.class).providerExceptionHandle(function, method, e);
            } finally {
                PamirsSession.clear();
            }
        };
    }

注册消费者

  1. 函数处理调用
  2. 注册服务消费者
    • 从ReferenceConfigCache获取泛化
  3. 调用dubbo泛化调用接口
  4. 获取返回信息
    • 获取用户id,放入PamirsSession
    • 如果开启debug模式
      • 存入DEBUG_THREAD_LOCAL本地线程
    • 返回格式
      • IWrapper
      • Pagination
      • Result

时序图

Oinone远程调用链路源码分析

源码分析

泛化调用dobbo接口,并解析返回对象
RemoteComputer

public Object compute(Function function, Object... args) {

        .....
        List<Arg> functionArguments = function.getArguments();
        String methodName = RegistryUtils.getGenericServiceMethodName(function);
        String[] argTypes = FunctionUtils.fetchArgTypes(functionArguments);
        //前置处理:服务消费者,对请求参数进行对象化封装、携带请求上下文进行处理
        Object[] arguments = getRemoteRequestApi().consumerHandle(function.getNamespace(),function.getFun(), args, functionArguments);
        // 泛化调用
        Object result = invoke(function, methodName, argTypes, arguments);
        .....
        //后置处理:服务消费者,对返回结果进行对象化拆解,并对结果携带的上下文进行处理
        // 数据转成IWrapper/Pagination/Result
        return getRemoteResponseApi().consumerHandle(function, result);
}

// 配置请求信息,通过$invoke 实际调用
private Object invoke(Function function, String methodName, String[] argTypes, Object[] arguments) {
        Object result;
        String configCdOwnSign = getSessionFillOwnSignApi().getConfigCdOwnSign();
        if (StringUtils.isBlank(configCdOwnSign)) {
.....
        } else {
            try {
            // 获取服务,由于是泛化调用,所以获取的一定是GenericService类型
                GenericService remoteClient = CommonApiFactory.getApi(RemoteRegistry.class).registryOriginConsumer(function);
         // 第一个参数是需要调用的方法名
         // 第二个参数是需要调用的方法的参数类型数组,为String数组,里面存入参数的全类名。
         // 第三个参数是需要调用的方法的参数数组,为Object数组,里面存入需要的参数
                result = remoteClient.$invoke(methodName, argTypes, arguments);
            } catch (RpcException e) {
                ....            }
        }
        return result;
    }

从缓存中获取泛化
DefaultRemoteRegistryComponent

    public GenericService registryGenericConsumer(String interfaceName, List<MethodConfig> methods,
                                                  String group, String version, Integer timeout, Integer retries) {
        ....
        // 创建服务引用配置
        ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
        reference.setInterface(interfaceName);
        // 设置为泛化调用
        reference.setGeneric(Boolean.TRUE.toString());
        if (null != methods) {
            reference.setMethods(methods);
        }
        constructReference(group, version, timeout, retries, reference);
        return ReferenceConfigCache.getCache().get(reference);
    }

名词解释

泛化调用是指在调用方没有服务方提供的API(SDK)的情况下,对服务方进行调用,并且可以正常拿到调用结果
泛化调用(客户端泛化)
实现泛化实现(服务端泛化)

Oinone社区 作者:oinone原创文章,如若转载,请注明出处:https://doc.oinone.top/backend/17027.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
oinone的头像oinone
上一篇 2024年9月3日 pm12:53
下一篇 2024年9月5日 pm8:13

相关推荐

  • Excel导入导出模板翻译

    导出翻译项 与翻译的导出全部翻译项类似,只是该操作目前没有加入到页面交互中,需要通过工具发起后端服务请求,拿到导入导出翻译Excel模版,添加模版翻译项。(查看路径:文件–导出任务) mutation { excelExportTaskMutation { createExportTask( data: { workbookDefinition: { model: "file.ExcelWorkbookDefinition" name: "excelLocationTemplate" } } ) { name } } } variables: { "path": "/file", "lang": "en-US" } 参数说明:(不在以下说明范围内的参数无需修改) variables.lang参数:用于指定翻译项的目标语言编码,与【资源】-【语言】中的编码一致。 导入翻译项 mutation { excelImportTaskMutation { createImportTask( data: { workbookDefinition: { model: "file.ExcelWorkbookDefinition" name: "excelLocationTemplate" } file: { url: "翻译项URL链接" } } ) { name } } } variables: { "path": "/file" } 参数说明: 将翻译项URL链接改为实际可访问的文件链接即可,可通过页面中任意文件上传的组件获取。

    2024年12月5日
    1.2K00
  • 后端:如何自定义表达式实现特殊需求?扩展内置函数表达式

    平台提供了很多的表达式,如果这些表达式不满足场景?那我们应该如何新增表达式去满足项目的需求?目前平台支持的表达式内置函数,参考 1. 扩展表达式的场景 注解@Validation的rule字段支持配置表达式校验如果需要判断入参List类型字段中的某一个参数进行NULL校验,发现平台的内置函数不支持该场景的配置,这里就可以通过平台的机制,对内置函数进行扩展。 常见的一些代码场景,如下: package pro.shushi.pamirs.demo.core.action; ……引用类 @Model.model(PetShopProxy.MODEL_MODEL) @Component public class PetShopProxyAction extends DataStatusBehavior<PetShopProxy> { @Override protected PetShopProxy fetchData(PetShopProxy data) { return data.queryById(); } @Validation(ruleWithTips = { @Validation.Rule(value = "!IS_BLANK(data.code)", error = "编码为必填项"), @Validation.Rule(value = "LEN(data.name) < 128", error = "名称过长,不能超过128位"), }) @Action(displayName = "启用") @Action.Advanced(invisible="!(activeRecord.code !== undefined && !IS_BLANK(activeRecord.code))") public PetShopProxy dataStatusEnable(PetShopProxy data){ data = super.dataStatusEnable(data); data.updateById(); return data; } ……其他代码 } 2. 新建一个自定义表达式的函数 校验入参如果是个集合对象的情况下,单个对象的某个字段如果为空,返回false的函数。 例子:新建一个CustomCollectionFunctions类 package xxx.xxx.xxx; import org.apache.commons.collections4.CollectionUtils; import org.springframework.stereotype.Component; import pro.shushi.pamirs.meta.annotation.Fun; import pro.shushi.pamirs.meta.annotation.Function; import pro.shushi.pamirs.meta.common.constants.NamespaceConstants; import pro.shushi.pamirs.meta.util.FieldUtils; import java.util.List; import static pro.shushi.pamirs.meta.enmu.FunctionCategoryEnum.COLLECTION; import static pro.shushi.pamirs.meta.enmu.FunctionLanguageEnum.JAVA; import static pro.shushi.pamirs.meta.enmu.FunctionOpenEnum.LOCAL; import static pro.shushi.pamirs.meta.enmu.FunctionSceneEnum.EXPRESSION; /** * 自定义内置函数 */ @Fun(NamespaceConstants.expression) @Component public class CustomCollectionFunctions { /** * LIST_FIELD_NULL 就是我们自定义的表达式,不能与已经存在的表达式重复!!! * * @param list * @param field * @return */ @Function.Advanced( displayName = "校验集成的参数是否为null", language = JAVA, builtin = true, category = COLLECTION ) @Function.fun("LIST_FIELD_NULL") @Function(name = "LIST_FIELD_NULL", scene = {EXPRESSION}, openLevel = LOCAL, summary = "函数示例: LIST_FIELD_NULL(list,field),函数说明: 传入一个对象集合,校验集合的字段是否为空" ) public Boolean listFieldNull(List list, String field) { if (null == list) { return false; } if (CollectionUtils.isEmpty(list)) { return false; } for (Object data : list) { Object value =…

    2024年5月30日
    2.5K00
  • 项目中工作流引入和流程触发

    目录 1. 使用工作流需要依赖的包和设置2. 触发方式2.1 自动触发方式2.2 触发方式 1.使用工作流需要依赖的包和设置 1.1 工作流需要依赖的模块 需在pom.xml中增加workflow、sql-record和trigger相关模块的依赖 workflow:工作流运行核心模块 sql-record:监听流程发布以后对应模型的增删改监听 trigger:异步任务调度模块 <dependency> <groupId>pro.shushi.pamirs.workflow</groupId> <artifactId>pamirs-workflow-api</artifactId> </dependency> <dependency> <groupId>pro.shushi.pamirs.workflow</groupId> <artifactId>pamirs-workflow-core</artifactId> </dependency> <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-sql-record-core</artifactId> </dependency> <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-trigger-core</artifactId> </dependency> <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-trigger-bridge-tbschedule</artifactId> </dependency> 在application.yml中增加对应模块的依赖以及sql-record路径以及其他相关设置 pamirs: … record: sql: #改成自己路径 store: /opt/pamirs/logs … boot: init: true sync: true modules: … – sql_record – trigger – workflow … sharding: define: data-sources: ds: pamirs models: "[trigger.PamirsSchedule]": tables: 0..13 event: enabled: true schedule: enabled: true # ownSign区分不同应用 ownSign: demo rocket-mq: # enabled 为 false情况不用配置 namesrv-addr: 192.168.6.2:19876 trigger: auto-trigger: true 2.触发方式 2.1自动触发方式 在流程设计器中设置触发方式,如果设置了代码触发方式则不会自动触发 2.2代码调用方式触发 2.2.1.再流程设计器中触发设置中,设置为是否人工触发设置为是 2.2.2.查询数据库获取该流程的编码 2.2.3.在代码中调用 /** * 触发⼯作流实例 */ private Boolean startWorkflow(WorkflowD workflowD, IdModel modelData) { WorkflowDefinition workflowDefinition = new WorkflowDefinition().queryOneByWrapper( Pops.<WorkflowDefinition>lambdaQuery() .from(WorkflowDefinition.MODEL_MODEL) .eq(WorkflowDefinition::getWorkflowCode, workflowD.getCode()) .eq(WorkflowDefinition::getActive, 1) ); if (null == workflowDefinition) { // 流程没有运⾏实例 return Boolean.FALSE; } String model = Models.api().getModel(modelData); //⼯作流上下⽂ WorkflowDataContext wdc = new WorkflowDataContext(); wdc.setDataType(WorkflowVariationTypeEnum.ADD); wdc.setModel(model); wdc.setWorkflowDefinitionDefinition(workflowDefinition.parseContent()); wdc.setWorkflowDefinition(workflowDefinition); wdc.setWorkflowDefinitionId(workflowDefinition.getId()); IdModel copyData = KryoUtils.get().copy(modelData); // ⼿动触发创建的动作流,将操作⼈设置为当前⽤户,作为流程的发起⼈ copyData.setCreateUid(PamirsSession.getUserId()); copyData.setWriteUid(PamirsSession.getUserId()); String jsonData = JsonUtils.toJSONString(copyData.get_d()); //触发⼯作流 新增时触发-onCreateManual 更新时触发-onUpdateManual Fun.run(WorkflowModelTriggerFunction.FUN_NAMESPACE, "onCreateManual", wdc, msgId, jsonData); return Boolean.TRUE; }

    2023年11月7日
    1.6K00
  • 国际化-语言和时区设置

    国际化-翻译 1、引入翻译的包 <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-translate</artifactId> </dependency> 2、默认逻辑:在系统的右上角,切换【系统语言后】,用户所选择的语言会保存到对应的用户信息中,后续所有的请求都会拿这个「语言」的值,并将其放入到PamirsSession#Lang中。3、实际项目可以通过自定义Session逻辑,根据实际业务覆盖掉默认方式,并将其设置在PamirsSession中: PamirsSession.setLang(langCode); 构建自定义Session参考:https://shushi.yuque.com/yoxz76/oio3/kg2sgr 构建自定义Session的逻辑中,根据业务逻辑把获取到的langCode设置都PamirsSession 4、目标语言编码说明语言编码必须符合ISO标准,即语言ISO代码。国际化-语言代码表-Language Codes参考下面的链接:https://blog.csdn.net/qq827245563/article/details/131552695 国际化-时区 1、引入时区的包 <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-timezone</artifactId> </dependency> 2、时区设置类似语言(langCode)3、在自定义Session(与设置语言共同的Session自定义)中,根据实际业务覆盖掉默认方式,并将其设置在TimezoneSession中: pro.shushi.pamirs.timezone.session.TimezoneSession#setTimezone(TimeZone timezone) 其他说明 PamirsSession 和 TimezoneSession 都是请求级别的;即每次请求都会自动销毁; 因此在自定义Session中覆盖这两个属性的默认值的时候特别注意一下性能。

    2023年12月4日
    1.2K00
  • 【PostgreSQL】后端部署使用PostgreSQL数据库

    PostgreSQL数据库配置 驱动配置 Maven配置(14.3版本可用) <postgresql.version>42.6.0</postgresql.version> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>${postgresql.version}</version> </dependency> 离线驱动下载 postgresql-42.2.18.jarpostgresql-42.6.0.jarpostgresql-42.7.3.jar JDBC连接配置 pamirs: datasource: base: type: com.alibaba.druid.pool.DruidDataSource driverClassName: org.postgresql.Driver url: jdbc:postgresql://127.0.0.1:5432/pamirs?currentSchema=base username: xxxxxx password: xxxxxx 连接url配置 暂无官方资料 url格式 jdbc:postgresql://${host}:${port}/${database}?currentSchema=${schema} 在jdbc连接配置时,${database}和${schema}必须完整配置,不可缺省。 其他连接参数如需配置,可自行查阅相关资料进行调优。 方言配置 pamirs方言配置 pamirs: dialect: ds: base: type: PostgreSQL version: 14 major-version: 14.3 pamirs: type: PostgreSQL version: 14 major-version: 14.3 数据库版本 type version majorVersion 14.x PostgreSQL 14 14.3 PS:由于方言开发环境为14.3版本,其他类似版本(14.x)原则上不会出现太大差异,如出现其他版本无法正常支持的,可在文档下方留言。 schedule方言配置 pamirs: event: enabled: true schedule: enabled: true dialect: type: PostgreSQL version: 14 major-version: 14.3 type version majorVersion PostgreSQL 14 14.3 PS:由于schedule的方言在多个版本中并无明显差异,目前仅提供一种方言配置。 其他配置 逻辑删除的值配置 pamirs: mapper: global: table-info: logic-delete-value: (EXTRACT(epoch FROM CURRENT_TIMESTAMP) * 1000000 + EXTRACT(MICROSECONDS FROM CURRENT_TIMESTAMP))::bigint PostgreSQL数据库用户初始化及授权 — init root user (user name can be modified by oneself) CREATE USER root WITH PASSWORD 'password'; — if using automatic database and schema creation, this is very important. ALTER USER root CREATEDB; SELECT * FROM pg_roles; — if using postgres database, this authorization is required. GRANT CREATE ON DATABASE postgres TO root;

    2023年11月1日
    1.3K00

Leave a Reply

登录后才能评论