Oinone远程调用链路源码分析

前提

源码分析版本是 5.1.x版本

概要

在服务启动时,获取注解REMOTE的函数,通过dubbo的泛化调用发布。在调用函数时,通过dubbo泛化调用获取结果。

注册服务者

  1. 在spring 启动方法installOrLoad中初始化
  2. 寻找定义REMOTE的方法
  3. 组装dubbo的服务配置
  4. 组装服务对象实现引用,内容如下,用于注册
    • 调用前置处理
      • 放信息到SessionApi
      • 函数调用链追踪,放到本地TransmittableThreadLocal
      • 从redis中获取到的数据进行反序列化并存在到本地的线程里
      • Trace信息,放一份在sessionApi中 和ThreadLocal
    • 调用函数执行
    • 返回数据转成特定格式
  5. 通过线程组调用dubbo的ServiceConfig.export 服务发布

时序图

Oinone远程调用链路源码分析
注册

源码分析

根据条件判断,确定向dubbo进行服务发布
RemoteServiceLoader

public void publishService(List<FunctionDefinition> functionList,Map<String,Runnable> isPublished) {
        // 因为泛化接口只能控制到namespace,控制粒度不能到fun级别,这里进行去重处理
        Map<String, Function> genericNamespaceMap = new HashMap<>();
        for (FunctionDefinition functionDefinition : functionList) {
            Function function = new Function(functionDefinition)

            try {
               //定义REMOTE, 才给予远程调用
                if (FunctionOpenEnum.REMOTE.in(function.getOpen()) && !ClassUtils.isInterface(function.getClazz())) {
                    genericNamespaceMap.putIfAbsent(RegistryUtils.getRegistryInterface(function), function);
                }
            } catch (PamirsException e) {
            }
        }
        // 发布远程服务
        for (String namespace : genericNamespaceMap.keySet()) {
            Function function = genericNamespaceMap.get(namespace);
            if(isPublished.get(RegistryUtils.getRegistryInterface(function)) == null){
                // 发布,注册远程函数服务,底层使用dubbo的泛化调用
                Runnable registryTask = () -> remoteRegistry.registryService(function);
                isPublished.put(RegistryUtils.getRegistryInterface(function),registryTask);
            }else{

            }
        }
    }

构造ServiceConfig方法,设置成泛化调用,进行发布export()
DefaultRemoteRegistryComponent

     public void registryGenericService(String interfaceName, List<MethodConfig> methods,
                                       String group, String version, Integer timeout, Integer retries) {
        ....
        try {
            ServiceConfig<GenericService> service = new ServiceConfig<>();
            // 服务接口名
            service.setInterface(interfaceName);
            // 服务对象实现引用
            service.setRef(genericService(interfaceName));
            if (null != methods) {
                service.setMethods(methods);
            }
            // 声明为泛化接口
            service.setGeneric(Boolean.TRUE.toString());
            // 基础元数据
            constructService(group, version, timeout, retries, service);
            service.export();
        } catch (Exception e) {
           .....
        }
    }

// 服务对象实现引用
private GenericService genericService(String interfaceName) {
        return (method, parameterTypes, args) -> {
            PamirsSession.clear();
            Function function = Objects.requireNonNull(PamirsSession.getContext()).getFunction(RegistryUtils.getFunctionNamespace(method), RegistryUtils.getFunctionFun(method));
            if (log.isDebugEnabled()) {
                log.debug("interfaceName: " + interfaceName + ", isDataManage: " + function.isDataManager());
            }
            try {
                //前置处理:服务提供者,对请求参数进行对象化拆解,并对请求携带的上下文进行处理
                // 放信息到SessionApi
                // 函数调用链追踪,放到本地TransmittableThreadLocal
                // CommonMetaDataCacheApi.computeMetaData() 从redis中获取到的数据进行反序列化并存在到本地的TL中
                // DataAuditApi.computeDataAuditSession() Trace信息,放一份在sessionApi中 和ThreadLocal中
                Object[] args1 = Spider.getDefaultExtension(RemoteRequestArgApi.class).providerHandle(function.getNamespace(), function.getFun(), args, function.getArguments());

                Object result = FunEngine.get().exclude(ScriptType.REMOTE).run(function, args1);

                //后置处理:服务提供者,对结果进行对象化封装、携带请求上下文进行处理
                return Spider.getDefaultExtension(RemoteResponseApi.class).providerHandle(function, method, result);
            } catch (Throwable e) {
                return Spider.getDefaultExtension(RemoteResponseApi.class).providerExceptionHandle(function, method, e);
            } finally {
                PamirsSession.clear();
            }
        };
    }

注册消费者

  1. 函数处理调用
  2. 注册服务消费者
    • 从ReferenceConfigCache获取泛化
  3. 调用dubbo泛化调用接口
  4. 获取返回信息
    • 获取用户id,放入PamirsSession
    • 如果开启debug模式
      • 存入DEBUG_THREAD_LOCAL本地线程
    • 返回格式
      • IWrapper
      • Pagination
      • Result

时序图

Oinone远程调用链路源码分析

源码分析

泛化调用dobbo接口,并解析返回对象
RemoteComputer

public Object compute(Function function, Object... args) {

        .....
        List<Arg> functionArguments = function.getArguments();
        String methodName = RegistryUtils.getGenericServiceMethodName(function);
        String[] argTypes = FunctionUtils.fetchArgTypes(functionArguments);
        //前置处理:服务消费者,对请求参数进行对象化封装、携带请求上下文进行处理
        Object[] arguments = getRemoteRequestApi().consumerHandle(function.getNamespace(),function.getFun(), args, functionArguments);
        // 泛化调用
        Object result = invoke(function, methodName, argTypes, arguments);
        .....
        //后置处理:服务消费者,对返回结果进行对象化拆解,并对结果携带的上下文进行处理
        // 数据转成IWrapper/Pagination/Result
        return getRemoteResponseApi().consumerHandle(function, result);
}

// 配置请求信息,通过$invoke 实际调用
private Object invoke(Function function, String methodName, String[] argTypes, Object[] arguments) {
        Object result;
        String configCdOwnSign = getSessionFillOwnSignApi().getConfigCdOwnSign();
        if (StringUtils.isBlank(configCdOwnSign)) {
.....
        } else {
            try {
            // 获取服务,由于是泛化调用,所以获取的一定是GenericService类型
                GenericService remoteClient = CommonApiFactory.getApi(RemoteRegistry.class).registryOriginConsumer(function);
         // 第一个参数是需要调用的方法名
         // 第二个参数是需要调用的方法的参数类型数组,为String数组,里面存入参数的全类名。
         // 第三个参数是需要调用的方法的参数数组,为Object数组,里面存入需要的参数
                result = remoteClient.$invoke(methodName, argTypes, arguments);
            } catch (RpcException e) {
                ....            }
        }
        return result;
    }

从缓存中获取泛化
DefaultRemoteRegistryComponent

    public GenericService registryGenericConsumer(String interfaceName, List<MethodConfig> methods,
                                                  String group, String version, Integer timeout, Integer retries) {
        ....
        // 创建服务引用配置
        ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
        reference.setInterface(interfaceName);
        // 设置为泛化调用
        reference.setGeneric(Boolean.TRUE.toString());
        if (null != methods) {
            reference.setMethods(methods);
        }
        constructReference(group, version, timeout, retries, reference);
        return ReferenceConfigCache.getCache().get(reference);
    }

名词解释

泛化调用是指在调用方没有服务方提供的API(SDK)的情况下,对服务方进行调用,并且可以正常拿到调用结果
泛化调用(客户端泛化)
实现泛化实现(服务端泛化)

Oinone社区 作者:oinone原创文章,如若转载,请注明出处:https://doc.oinone.top/backend/17027.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
oinone的头像oinone
上一篇 2024年9月3日 pm12:53
下一篇 2024年9月5日 pm8:13

相关推荐

  • 平台配置日志输出和推送到APM与LogStash

    场景描述 目前设计器镜像启动后日志文件为out.log,是启动脚本中定向输出了(>>)out.log文件。实际项目可能: 日志输出到特定目录的特定文件名中 指定以日志保留策略(单个文件大小和文件保留个数) 日志输出到APM工具中(如skywalking) 日志推送到LogStash 日志自定义输出 不定向输出,采用自己配置的方式,与标准的SpringBoot工程配置日志一样。两种方式(都是Spring提供的方式): 方式一 bootstrap.yml 里面可以按profiles指定logback的配置文件,具体文件名和文件输入在logback里面进行配置,跟通用的logback配置一致. 例如: logging: config: classpath:logback-pre.xml 方式二 resources的根目录,直接配置 logback-spring.xml, 启动会自动加载。 日志自定义场景 配置日志推送到LogStash <!–配置日志推送到LogStash–> <contextListener class="pro.shushi.pamirs.demo.core.config.DemoLogbackFiledConfig"/> <appender name="LogStash" class="net.logstash.logback.appender.LogstashTcpSocketAppender"> <destination>127.0.0.1:4560</destination> <!– encoder必须配置,有多种可选 –> <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"> <!– SkyWalking插件, log加tid–> <provider class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.logstash.TraceIdJsonProvider" /> <!–在生成的json中会加这些字段–> <customFields> {"app.name":"pamirs-demo", "app.type":"Microservice", "platform":"pamirs", "env":"dev"} </customFields> <timeZone>Asia/Shanghai</timeZone> <writeVersionAsInteger>true</writeVersionAsInteger> <providers> <pattern> <pattern> <!–动态的变量–> { "ip": "%{ip}", "server.name": "%{server.name}", "logger_name": "%logger" } </pattern> </pattern> </providers> </encoder> </appender> skywalking的日志rpc上传 <!– skywalking的日志rpc上传 –> <appender name="SkyWalkingLogs" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender"> <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder"> <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout"> <Pattern>${CONSOLE_LOG_PATTERN}</Pattern> </layout> </encoder> </appender> 完整的代码示例 Logback自定义字段 package pro.shushi.pamirs.demo.core.config; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.spi.LoggerContextListener; import ch.qos.logback.core.Context; import ch.qos.logback.core.spi.ContextAwareBase; import ch.qos.logback.core.spi.LifeCycle; import java.net.InetAddress; import java.net.UnknownHostException; /** * Logback自定义字段 * * @author wx@shushi.pro * @date 2024/4/17 */ public class DemoLogbackFiledConfig extends ContextAwareBase implements LoggerContextListener, LifeCycle { private boolean started = false; @Override public boolean isResetResistant() { return false; } @Override public void onStart(LoggerContext loggerContext) { } @Override public void onReset(LoggerContext loggerContext) { } @Override public void onStop(LoggerContext loggerContext) { } @Override public void onLevelChange(Logger logger, Level level) { } @Override public void start() { if (started) { return; } Context context = getContext();…

    2024年5月18日
    1.5K00
  • 工作流审核撤回/回退/拒绝/同意/反悔钩子使用

    目录 1. 流程撤回、拒绝和回退调用自定义函数1.1 工作流【撤销】回调钩子1.2 撤销【回退】回调钩子1.3 工作流【拒绝】回调钩子1.4 工作流【同意】回调钩子1.4 工作流【反悔】回调钩子1.4 回调钩子在业务系统中的调用示例2. 自定义审批方式、自定义审批节点名称 1.流程撤回、拒绝和回退调用自定义函数 1.1工作流【撤销】回调钩子 使用方式:把该方法放置到XXX模型的Action下面,或@Fun(XXX.MODEL_MODEL)触发方式:当流程实例被撤销时调用入口:pro.shushi.pamirs.workflow.app.core.service.impl.WorkflowInstanceServiceImpl#undoInstance /** * XXX为当前流程触发方式为模型触发时对应的触发模型、 * 对应返回不影响流程上下文 * @param data 入参为触发时的业务数据,数据的JsonString * @return */ @Function public XXX recall(String data) { // TODO: 根据实际的业务逻辑把data转换为对象 WorkRecord workRecord = JsonUtils.parseObject(data, new TypeReference<WorkRecord>(){}); // TODO: 增加自定义业务逻辑 return new XXX(); } 1.2撤销【回退】回调钩子 使用方式:把该方法放置到XXX模型的Action下面,或@Fun(XXX.MODEL_MODEL)触发方式:流程待办进行回退操作时调用入口:pro.shushi.pamirs.workflow.app.core.service.operator.ApprovalFallbackOperatorService /** * XXX为当前流程触发方式为模型触发时对应的触发模型 * 对应返回不影响流程上下文 * @param data 入参为触发时的业务数据,数据的JsonString * @return */ @Function public XXX fallBack(String data) { // TODO: 根据实际的业务逻辑把data转换为对象 WorkRecord workRecord = JsonUtils.parseObject(data, new TypeReference<WorkRecord>(){}); // TODO: 增加自定义业务逻辑 return new XXX(); } 1.3工作流【拒绝】回调钩子 使用方式:把该方法放置到XXX模型的Action下面,或@Fun(XXX.MODEL_MODEL)触发方式:流程待办进行拒绝操作时调用入口:pro.shushi.pamirs.workflow.app.core.service.operator.ApprovalFallbackOperatorService /** * XXX为当前流程触发方式为模型触发时对应的触发模型 * 对应返回不影响流程上下文 * @param data 入参为触发时的业务数据,数据的JsonString * @return */ @Function public XXX reject(String data) { // TODO: 根据实际的业务逻辑把data转换为对象 WorkRecord workRecord = JsonUtils.parseObject(data, new TypeReference<WorkRecord>(){}); // TODO: 增加自定义业务逻辑 return new XXX(); } 1.4 工作流【同意】回调钩子 使用方式:把该方法放置到XXX模型的Action下面,或@Fun(XXX.MODEL_MODEL)触发方式:流程待办进行同意操作时调用入口:pro.shushi.pamirs.workflow.app.core.util.ArtificialTaskUtils @Function(summary = "发起的审批同意时会自动调用此方法") @Function.Advanced(displayName = "审批同意") public Teacher agree(String data) { // TODO: 根据实际的业务逻辑把data转换为对象 // WorkRecord workRecord = JsonUtils.parseObject(data, new TypeReference<WorkRecord>(){}); // TODO: 增加自定义业务逻辑 return new Teacher(); } 1.4 工作流【反悔】回调钩子 使用方式:把该方法放置到XXX模型的Action下面,或@Fun(XXX.MODEL_MODEL)触发方式:流程待办进行反悔操作时使用场景:流程待办进行反悔操作时,需要额外更改其他的业务数据逻辑时可用该回调钩子。 注意:该函数的namespace需要设置为流程触发模型。 调用入口:pro.shushi.pamirs.workflow.app.core.service.operator.ArtificialRetractOperatorService @Function @Function.fun(WorkflowBizCallConstants.retract) public void retract(WorkflowUserTask workflowUserTask) { // 获取流程实例 workflowUserTask.fieldQuery(WorkflowUserTask::getInstance); WorkflowInstance instance = workflowUserTask.getInstance(); // 获取用户任务实例 WorkflowUserInstance userInstance = new WorkflowUserInstance() .setId(workflowUserTask.getWorkflowUserInstanceId()) .queryById(); // 反悔的用户id…

    2023年11月15日
    1.3K00
  • 自定义RSQL占位符(placeholder)及在权限中使用

    1 自定义RSQL占位符常用场景 统一的数据权限配置 查询表达式的上下文变量扩展 2 自定义RSQL的模板 /** * 演示Placeholder占位符基本定义 * * @author Adamancy Zhang at 13:53 on 2024-03-24 */ @Component public class DemoPlaceHolder extends AbstractPlaceHolderParser { private static final String PLACEHOLDER_KEY = "${thisPlaceholder}"; /** * 占位符 * * @return placeholder */ @Override public String namespace() { return PLACEHOLDER_KEY; } /** * 占位符替换值 * * @return the placeholder replace to the value */ @Override protected String value() { return PamirsSession.getUserId().toString(); } /** * 优先级 * * @return execution order of placeholders, ascending order. */ @Override public Integer priority() { return 0; } /** * 是否激活 * * @return the placeholder is activated */ @Override public Boolean active() { return true; } } 注意事项 在一些旧版本中,priority和active可能不起作用,为保证升级时不受影响,请保证该属性配置正确。 PLACEHOLDER_KEY变量表示自定义占位符使用的关键字,需按照所需业务场景的具体功能并根据上下文语义正确定义。 为保证占位符可以被正确替换并执行,所有占位符都不应该出现重复,尤其是不能与系统内置的重复。 3 占位符使用时的优先级问题 多个占位符在进行替换时,会根据优先级按升序顺序执行,如需要指定替换顺序,可使用Spring的Order注解对其进行排序。 import org.springframework.core.annotation.Order; @Order(0) 4 Oinone平台内置的占位符 占位符 数据类型 含义 备注 ${currentUser} String 当前用户ID 未登录时无法使用 ${currentRoles} Set<String> 当前用户的角色ID集合 未登录时无法使用 5 如何覆盖平台内置的占位符? 通过指定占位符的优先级,并定义相同的namespace可优先替换。 6 如何定义会话级别的上下文变量? 在上述模板中,我们使用的是Oinone平台内置的上下文变量进行演示,通常情况下,我们需要根据实际业务场景增加上下文变量,以此来实现所需功能。 下面,我们将根据当前用户获取当前员工ID定义该上下文变量进行演示。 /** * 员工Session * * @author Adamancy Zhang at 14:33 on 2024-03-24 */ @Component public class EmployeeSession implements HookBefore { private static final String SESSION_KEY = "CUSTOM_EMPLOYEE_ID"; @Autowired private DemoEmployeeService demoEmployeeService; public static String getEmployeeId() { return PamirsSession.getTransmittableExtend().get(SESSION_KEY);…

    2024年3月24日
    1.5K00
  • 扩展操作日志字段,实现操作日志界面显示自定义字段

    注:该功能在pamirs-core 4.3.27 / 4.7.8.12以上版本可用 在模块依赖里新增DataAuditModule.MODULE_MODULE模块依赖。 @Module( name = DemoModule.MODULE_NAME, dependencies = { CommonModule.MODULE_MODULE, DataAuditModule.MODULE_MODULE }, displayName = “****”, version = “1.0.0” ) 继承OperationBody模型,设置需要在操作日志中显示的字段,并重写clone方法,设置自定义字段值。用于在计入日志处传递参数。 public class MyOperationBody extends OperationBody { public MyOperationBody(String operationModel, String operationName) { super(operationModel, operationName); } private String itemNames; public String getItemNames() { return itemNames; } public void setItemNames(String itemNames) { this.itemNames = itemNames; } @Override public OperationBody clone() { //设置自定义字段值 MyOperationBody body = OperationBody.transfer(this, new MyOperationBody(this.getOperationModel(), this.getOperationName())); body.setItemNames(this.getItemNames()); return body; } } 继承OperationLog模型,新增需要在操作日志中显示的字段。用于界面展示该自定义字段。 @Model.model(MyOperationLog.MODEL_MODEL) @Model(displayName = “自定义操作日志”, labelFields = {“itemNames”}) public class MyOperationLog extends OperationLog { public static final String MODEL_MODEL = “operation.MyOperationLog”; @Field(displayName = “新增日志字段”) @Field.String private String itemNames; } 定义一个常量 public interface OperationLogConstants { String MY_SCOPE = “MY_SCOPE”; } 在计入日志处,构造出MyOperationBody对象,向该对象中设置自定义日志字段。构造OperationLogBuilder对象并设置scope的值,用于跳转自定义服务实现。 MyOperationBody body = new MyOperationBody(CustomerCompanyUserProxy.MODEL_MODEL, CustomerCompanyUserProxyDataAudit.UPDATE); body.setItemNames(“新增日志字段”); OperationLogBuilder builder = OperationLogBuilder.newInstance(body); //设置一个scope,用于跳转自定义服务实现.OperationLogConstants.MY_SCOPE是常量,请自行定义 builder.setScope(OperationLogConstants.MY_SCOPE); //记录日志 builder.record(data.queryByPk(), data); 实现OperationLogService接口,加上@SPI.Service()注解,并设置常量,一般为类名。定义scope(注意:保持和计入日志处传入的scope值一致),用于计入日志处找到该自定义服务实现。根据逻辑重写父类中方法,便可以扩展操作日志,实现自定义记录了。 @Slf4j @Service @SPI.Service(“myOperationLogServiceImpl”) public class MyOperationLogServiceImpl< T extends D > extends OperationLogServiceImpl< T > implements OperationLogService< T >{ //定义scope,用于计入日志处找到该自定义服务实现 private static final String[] MY_SCOPE = new String[]{OperationLogConstants.MY_SCOPE}; @Override public String[] scopes() { return MY_SCOPE; } //此方法用于创建操作日志 @Override protected OperationLog createOperationLog(OperationBody body, OperationLogConfig config) { MyOperationBody body1 = (MyOperationBody)…

    2024年6月27日 后端
    1.2K00
  • Oinone请求路由源码分析

    通过源码分析,从页面发起请求,如果通过graphQL传输到具体action的链路,并且在这之间做了哪些隐式处理分析源码版本5.1.x 请求流程大致如下: 拦截所有指定的请求 组装成graphQL请求信息 调用graphQL执行 通过hook拦截先执行 RsqlDecodeHook:rsql解密 UserHook: 获取用户信息, 通过cookies获取用户ID,再查表获取用户信息,放到本地Local线程里 RoleHook: 角色Hook FunctionPermissionHook: 函数权限Hook ,跳过权限拦截的实现放在这一层,对应的配置 pamirs: auth: fun-filter: – namespace: user.PamirsUserTransient fun: login #登录 – namespace: top.PetShop fun: action DataPermissionHook: 数据权限hook PlaceHolderHook:占位符转化替换hook RsqlParseHook: 解释Rsql hook SingletonModelUpdateHookBefore 执行post具体内容 通过hook拦截后执行 QueryPageHook4TreeAfter: 树形Parent查询优化 FieldPermissionHook: 字段权限Hook UserQueryPageHookAfter UserQueryOneHookAfter 封装执行结果信息返回 时序图 核心源码解析 拦截所有指定的请求 /pamirs/模块名RequestController @RequestMapping( value = "/pamirs/{moduleName:^[a-zA-Z][a-zA-Z0-9_]+[a-zA-Z0-9]$}", method = RequestMethod.POST ) public String pamirsPost(@PathVariable("moduleName") String moduleName, @RequestBody PamirsClientRequestParam gql, HttpServletRequest request, HttpServletResponse response) { } DefaultRequestExecutor 构建graph请求信息,并调用graph请求 () -> execute(GraphQL::execute, param), param private <T> T execute(BiFunction<GraphQL, ExecutionInput, T> executor, PamirsRequestParam param) { // 获取GraphQL请求信息,包含grapsh schema GraphQL graphQL = buildGraphQL(param); … ExecutionInput executionInput = ExecutionInput.newExecutionInput() .query(param.getQuery()) .variables(param.getVariables().getVariables()) .dataLoaderRegistry(Spider.getDefaultExtension(DataLoaderRegistryApi.class).dataLoader()) .build(); … // 调用 GraphQL的方法execute 执行 T result = executor.apply(graphQL, executionInput); … return result; } QueryAndMutationBinder 绑定graphQL读取写入操作 public static DataFetcher<?> dataFetcher(Function function, ModelConfig modelConfig) { if (isAsync()) { if (FunctionTypeEnum.QUERY.in(function.getType())) { return AsyncDataFetcher.async(dataFetchingEnvironment -> dataFetcherAction(function, modelConfig, dataFetchingEnvironment), ExecutorServiceApi.getExecutorService()); } else { return dataFetchingEnvironment -> dataFetcherAction(function, modelConfig, dataFetchingEnvironment); } } else { return dataFetchingEnvironment -> dataFetcherAction(function, modelConfig, dataFetchingEnvironment); } } private static Object dataFetcherAction(Function function, ModelConfig modelConfig, DataFetchingEnvironment environment) { try { SessionExtendUtils.tagMainRequest(); // 使用共享的请求和响应对象 return Spider.getDefaultExtension(ActionBinderApi.class) .action(modelConfig,…

    2024年8月21日
    5.6K02

Leave a Reply

登录后才能评论