Oinone远程调用链路源码分析

前提

源码分析版本是 5.1.x版本

概要

在服务启动时,获取注解REMOTE的函数,通过dubbo的泛化调用发布。在调用函数时,通过dubbo泛化调用获取结果。

注册服务者

  1. 在spring 启动方法installOrLoad中初始化
  2. 寻找定义REMOTE的方法
  3. 组装dubbo的服务配置
  4. 组装服务对象实现引用,内容如下,用于注册
    • 调用前置处理
      • 放信息到SessionApi
      • 函数调用链追踪,放到本地TransmittableThreadLocal
      • 从redis中获取到的数据进行反序列化并存在到本地的线程里
      • Trace信息,放一份在sessionApi中 和ThreadLocal
    • 调用函数执行
    • 返回数据转成特定格式
  5. 通过线程组调用dubbo的ServiceConfig.export 服务发布

时序图

Oinone远程调用链路源码分析
注册

源码分析

根据条件判断,确定向dubbo进行服务发布
RemoteServiceLoader

public void publishService(List<FunctionDefinition> functionList,Map<String,Runnable> isPublished) {
        // 因为泛化接口只能控制到namespace,控制粒度不能到fun级别,这里进行去重处理
        Map<String, Function> genericNamespaceMap = new HashMap<>();
        for (FunctionDefinition functionDefinition : functionList) {
            Function function = new Function(functionDefinition)

            try {
               //定义REMOTE, 才给予远程调用
                if (FunctionOpenEnum.REMOTE.in(function.getOpen()) && !ClassUtils.isInterface(function.getClazz())) {
                    genericNamespaceMap.putIfAbsent(RegistryUtils.getRegistryInterface(function), function);
                }
            } catch (PamirsException e) {
            }
        }
        // 发布远程服务
        for (String namespace : genericNamespaceMap.keySet()) {
            Function function = genericNamespaceMap.get(namespace);
            if(isPublished.get(RegistryUtils.getRegistryInterface(function)) == null){
                // 发布,注册远程函数服务,底层使用dubbo的泛化调用
                Runnable registryTask = () -> remoteRegistry.registryService(function);
                isPublished.put(RegistryUtils.getRegistryInterface(function),registryTask);
            }else{

            }
        }
    }

构造ServiceConfig方法,设置成泛化调用,进行发布export()
DefaultRemoteRegistryComponent

     public void registryGenericService(String interfaceName, List<MethodConfig> methods,
                                       String group, String version, Integer timeout, Integer retries) {
        ....
        try {
            ServiceConfig<GenericService> service = new ServiceConfig<>();
            // 服务接口名
            service.setInterface(interfaceName);
            // 服务对象实现引用
            service.setRef(genericService(interfaceName));
            if (null != methods) {
                service.setMethods(methods);
            }
            // 声明为泛化接口
            service.setGeneric(Boolean.TRUE.toString());
            // 基础元数据
            constructService(group, version, timeout, retries, service);
            service.export();
        } catch (Exception e) {
           .....
        }
    }

// 服务对象实现引用
private GenericService genericService(String interfaceName) {
        return (method, parameterTypes, args) -> {
            PamirsSession.clear();
            Function function = Objects.requireNonNull(PamirsSession.getContext()).getFunction(RegistryUtils.getFunctionNamespace(method), RegistryUtils.getFunctionFun(method));
            if (log.isDebugEnabled()) {
                log.debug("interfaceName: " + interfaceName + ", isDataManage: " + function.isDataManager());
            }
            try {
                //前置处理:服务提供者,对请求参数进行对象化拆解,并对请求携带的上下文进行处理
                // 放信息到SessionApi
                // 函数调用链追踪,放到本地TransmittableThreadLocal
                // CommonMetaDataCacheApi.computeMetaData() 从redis中获取到的数据进行反序列化并存在到本地的TL中
                // DataAuditApi.computeDataAuditSession() Trace信息,放一份在sessionApi中 和ThreadLocal中
                Object[] args1 = Spider.getDefaultExtension(RemoteRequestArgApi.class).providerHandle(function.getNamespace(), function.getFun(), args, function.getArguments());

                Object result = FunEngine.get().exclude(ScriptType.REMOTE).run(function, args1);

                //后置处理:服务提供者,对结果进行对象化封装、携带请求上下文进行处理
                return Spider.getDefaultExtension(RemoteResponseApi.class).providerHandle(function, method, result);
            } catch (Throwable e) {
                return Spider.getDefaultExtension(RemoteResponseApi.class).providerExceptionHandle(function, method, e);
            } finally {
                PamirsSession.clear();
            }
        };
    }

注册消费者

  1. 函数处理调用
  2. 注册服务消费者
    • 从ReferenceConfigCache获取泛化
  3. 调用dubbo泛化调用接口
  4. 获取返回信息
    • 获取用户id,放入PamirsSession
    • 如果开启debug模式
      • 存入DEBUG_THREAD_LOCAL本地线程
    • 返回格式
      • IWrapper
      • Pagination
      • Result

时序图

Oinone远程调用链路源码分析

源码分析

泛化调用dobbo接口,并解析返回对象
RemoteComputer

public Object compute(Function function, Object... args) {

        .....
        List<Arg> functionArguments = function.getArguments();
        String methodName = RegistryUtils.getGenericServiceMethodName(function);
        String[] argTypes = FunctionUtils.fetchArgTypes(functionArguments);
        //前置处理:服务消费者,对请求参数进行对象化封装、携带请求上下文进行处理
        Object[] arguments = getRemoteRequestApi().consumerHandle(function.getNamespace(),function.getFun(), args, functionArguments);
        // 泛化调用
        Object result = invoke(function, methodName, argTypes, arguments);
        .....
        //后置处理:服务消费者,对返回结果进行对象化拆解,并对结果携带的上下文进行处理
        // 数据转成IWrapper/Pagination/Result
        return getRemoteResponseApi().consumerHandle(function, result);
}

// 配置请求信息,通过$invoke 实际调用
private Object invoke(Function function, String methodName, String[] argTypes, Object[] arguments) {
        Object result;
        String configCdOwnSign = getSessionFillOwnSignApi().getConfigCdOwnSign();
        if (StringUtils.isBlank(configCdOwnSign)) {
.....
        } else {
            try {
            // 获取服务,由于是泛化调用,所以获取的一定是GenericService类型
                GenericService remoteClient = CommonApiFactory.getApi(RemoteRegistry.class).registryOriginConsumer(function);
         // 第一个参数是需要调用的方法名
         // 第二个参数是需要调用的方法的参数类型数组,为String数组,里面存入参数的全类名。
         // 第三个参数是需要调用的方法的参数数组,为Object数组,里面存入需要的参数
                result = remoteClient.$invoke(methodName, argTypes, arguments);
            } catch (RpcException e) {
                ....            }
        }
        return result;
    }

从缓存中获取泛化
DefaultRemoteRegistryComponent

    public GenericService registryGenericConsumer(String interfaceName, List<MethodConfig> methods,
                                                  String group, String version, Integer timeout, Integer retries) {
        ....
        // 创建服务引用配置
        ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
        reference.setInterface(interfaceName);
        // 设置为泛化调用
        reference.setGeneric(Boolean.TRUE.toString());
        if (null != methods) {
            reference.setMethods(methods);
        }
        constructReference(group, version, timeout, retries, reference);
        return ReferenceConfigCache.getCache().get(reference);
    }

名词解释

泛化调用是指在调用方没有服务方提供的API(SDK)的情况下,对服务方进行调用,并且可以正常拿到调用结果
泛化调用(客户端泛化)
实现泛化实现(服务端泛化)

Oinone社区 作者:oinone原创文章,如若转载,请注明出处:https://doc.oinone.top/backend/17027.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
oinone的头像oinone
上一篇 2024年9月3日 pm12:53
下一篇 2024年9月5日 pm8:13

相关推荐

  • 如何在代码中使用自增ID和获取序列

    在使用继承IDModel或CodeModel时,id和code是系统默认自动生成, 默认值规则:ID–>分布式ID; CODE–>根据定义的SequenceConfig规则自动生成。 在特定情况下需要落库前先生成ID或者Code,这些场景下可参照如下代码示例 一、使用自增ID 单个字段设置方式 // 主键字段,可以使用mysql的自增能力 @Field.Integer @Field.PrimaryKey(keyGenerator = KeyGeneratorEnum.AUTO_INCREMENT) @Field.Advanced(batchStrategy = FieldStrategyEnum.NEVER) @Field(displayName = "id", summary = "Id字段,⾃增") private Long id; @Field.Integer @Field(displayName = "自增版本") @Field.Sequence(sequence = "SEQ", initial = 1) private Long version; 全局设置方式 该方式会作用到每一个存储模型的id字段,在application.yml配置文件中修改id的生成规则,查找配置项关键字key-generator,默认为DISTRIBUTION(分布式id),可修改为 AUTO_INCREMENT(自增id) 二、手动方式获取序列 获取方式示例1 /** * 在特定场景下需要手动生成Id或者code时,可参照这个示例 */ public void manualSetIdCode(){ DemoItem demoItem = new DemoItem(); //手动生成ID和code Object idObj = Spider.getDefaultExtension(IdGenerator.class).generate(PamirsTableInfo.fetchKeyGenerator(DemoItem.MODEL_MODEL)); demoItem.setId(TypeUtils.createLong(idObj)); Object codeObj = CommonApiFactory.getSequenceGenerator().generate("SEQ",DemoItem.MODEL_MODEL); String code = TypeUtils.stringValueOf(codeObj); demoItem.setCode(code); //…… } 获取方式示例2 1、在系统启动的时候初始化SequenceConfig package pro.shushi.pamirs.demo.core.init; import org.springframework.stereotype.Component; import pro.shushi.pamirs.boot.common.api.command.AppLifecycleCommand; import pro.shushi.pamirs.boot.common.extend.MetaDataEditor; import pro.shushi.pamirs.core.common.InitializationUtil; import pro.shushi.pamirs.demo.api.DemoModule; import pro.shushi.pamirs.demo.core.constant.SeqConstants; import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j; import pro.shushi.pamirs.meta.api.dto.meta.Meta; import pro.shushi.pamirs.meta.enmu.SequenceEnum; import java.util.Map; /** * DemoMetadataEditor */ @Slf4j @Component public class DemoMetadataEditor implements MetaDataEditor { @Override public void edit(AppLifecycleCommand command, Map<String, Meta> metaMap) { InitializationUtil util = InitializationUtil.get(metaMap, DemoModule.MODULE_MODULE, DemoModule.MODULE_NAME); if (util == null) { log.error("获取初始化序列失败"); return; } bizSequence(util); } private void bizSequence(InitializationUtil util) { util.createSequenceConfig("申请单编码生成", SeqConstants.NABEL_SAMPLE_APPLY_SEQ, SequenceEnum.ORDERLY_SEQ, 8) .setStep(1) .setInitial(80000000L) .setIsRandomStep(false); util.createSequenceConfig("订单编码生成", SeqConstants.NABEL_SAMPLE_ORDER_SEQ_YP, SequenceEnum.ORDERLY_SEQ, 8) .setPrefix("YP") .setStep(1) .setInitial(80000000L) .setIsRandomStep(false); } } 2、在代码中使用序列 public static String getSaleOrderCode() { Object sequence = CommonApiFactory.getSequenceGenerator().generate(SequenceEnum.ORDERLY_SEQ.value(), SeqConstants.NABEL_SAMPLE_STRUCTURE_SEQ); return TypeUtils.stringValueOf(sequence); } public static String getApplyOrderCode(String prefix) { Object sequence = CommonApiFactory.getSequenceGenerator().generate(SequenceEnum.ORDERLY_SEQ.value(), SeqConstants.NABEL_SAMPLE_APPLY_SEQ); return…

    2024年5月25日
    1.7K00
  • 【后端】项目开发后端知识要点地图

    目录 工程结构篇 协议篇 GraphQL请求:后端接口实现逻辑解析 基本功能及配置篇 Dubbo Dubbo配置详解 Nacos Oinone项目引入Nacos作为注册中心 Oinone项目引入Nacos作为配置中心 Nacos做为注册中心调用其他系统的SpringCloud服务 OSS OSS(CDN)配置和文件系统的一些操作 MINIO无公网访问地址下OSS的配置 Trigger/Async/Schedule 函数之触发与定时配置和示例 函数之异步执行 Excel导入/导出(file) Excel批量导入 【Excel导入/导出】多Sheet导入导出示例 如何自定义Excel导入功能 如何自定义Excel导出功能 Excel导入导出模板翻译 Expression(表达式) 扩展内置函数表达式 ShardingJDBC(分库分表) 分库分表与自定义分表规则 Elasticsearch(ES) Oinone引入搜索引擎(增强模型) 引入搜索(增强模型Channel)常见问题解决办法 数据库方言配置(Dialect) 【DM】后端部署使用Dameng数据库(达梦) 【PostgreSQL】后端部署使用PostgreSQL数据库(PGSQL) 【OpenGauss】后端部署使用OpenGauss数据库(高斯) 【MSSQL】后端部署使用MSSQL数据库(SQLServer) 【KDB】后端部署使用Kingbase数据库(人大金仓/电科金仓) 【Oracle】后端部署使用Oracle数据库 【OceanBase】后端部署使用 OceanBase 数据库(海扬/OB) 其他功能使用文档 框架之MessageHub(信息提示) DsHint(指定数据源)和BatchSizeHint(指定批次数量) IWrapper、QueryWrapper和LambdaQueryWrapper使用 查询时自定义排序字段和排序规则 如何在代码中使用自增ID和获取序列 非存储字段搜索,适应灵活的搜索场景 如何使用位运算的数据字典 全局首页及应用首页配置方法(homepage) 如何增加用户中心的菜单 自定义RSQL占位符(placeholder)及在权限中使用 Function、Action函数使用规范 特定场景解决方案 Oinone连接外部数据源方案 如何自定义SQL(Mapper)语句 工程部署 后端部署 Oinone平台部署及依赖说明 v4.7 v5.0 v5.1 v5.3 v6.2 Oinone License 许可证使用常见问题 后端无代码设计器Jar包启动方法 Oinone环境保护(v5.2.3以上) 设计器部署 Oinone设计器部署参数说明 Oinone离线部署设计器镜像 Oinone离线部署设计器JAR包 Docker部署常见问题 其他环境部署 东方通Web和Tomcat部署Oinone项目 可视化调试工具 Oinone平台可视化调试工具 协同开发 Oinone协同开发使用手册 工作流 项目中工作流引入和流程触发 【工作流】流程扩展自定义函数示例代码汇总 工作流-流程代办等页面自定义 工作流审核撤回/回退/拒绝钩子使用 如何添加工作流运行时依赖(前后端) 数据可视化运行时 如何添加数据可视化运行时依赖 界面设计器 如何实现页面间的跳转 界面设计器的导入导出 流程设计器 流程设计器的导入导出 数据可视化 数据可视化-项目中如何引用图表、报表、大屏 数据可视化中图表的低无一体 数据可视化-如何自定义查询数据方法 数据可视化的导入导出 其他 EIP开放接口使用MD5验签发起请求(v5.x) 缓存连接由Jedis切换为Lettuce Oinone登录扩展:对接SSO QA 导入设计数据时dubbo超时导入失败

    2024年10月23日
    3.0K00
  • Oinone环境保护(v5.2.3以上)

    概述 Oinone平台为合作伙伴提供了环境保护功能,以确保在一套环境可以在较为安全前提下修改配置文件,启动多个JVM等部署操作。 本章内容主要介绍与环境保护功能相关的启动参数。 名词解释 本地开发环境:开发人员在本地启动业务工程的环境 公共环境:包含设计器镜像和业务工程的环境 环境保护参数介绍 【注意】参数是加在程序实参 (Program arguments)上,通常可能错误的加在Active Profiles上了 -PenvProtected=${value} 是否启用环境保护,默认为true。 环境保护是通过与最近一次保存在数据库的base_platform_environment表中数据进行比对,并根据每个参数的配置特性进行判断,在启动时将有错误的内容打印在启动日志中,以便于开发者进行问题排查。 除此之外,环境保护功能还提供了一些生产配置的优化建议,开发者可以在启动时关注这些日志,从而对生产环境的配置进行调优。 -PsaveEnvironments=${value} 是否将此次启动的环境参数保存到数据库,默认为true。 在某些特殊情况下,为了避免公共环境中的保护参数发生不必要的变化,我们可以选择不保存此次启动时的配置参数到数据库中,这样就不会影响其他JVM启动时发生校验失败而无法启动的问题。 -PstrictProtected=${value} 是否使用严格校验模式,默认为false 通常我们建议在公共环境启用严格校验模式,这样可以最大程度的保护公共环境的元数据不受其他环境干扰。 PS:在启用严格校验模式时,需避免内外网使用不同连接地址的场景。如无法避免,则无法启用严格校验模式。 常见问题 需要迁移数据库,并更换了数据库连接地址该如何操作? 将原有数据库迁移到新数据库。 修改配置文件中数据库的连接地址。 在启动脚本中增加-PenvProtected=false关闭环境保护。 启动JVM服务可以看到有错误的日志提示,但不会中断本次启动。 移除启动脚本中的-PenvProtected=false或将值改为true,下次启动时将继续进行环境保护检查。 可查看数据库中base_platform_environment表中对应数据库连接配置已发生修改,此时若其他JVM在启动前未正确修改,则无法启动。 本地开发时需要修改Redis连接地址到本地,但希望不影响公共环境的使用该如何操作? PS:由于Redis中的元数据缓存是根据数据库差量进行同步的,此操作会导致公共环境在启动时无法正确刷新Redis中的元数据缓存,需要配合pamirs.distribution.session.allMetaRefresh参数进行操作。如无特殊必要,我们不建议使用该形式进行协同开发,多次修改配置会导致出错的概率增加。 本地环境首次启动时,除了修改Redis相关配置外,还需要配置pamirs.distribution.session.allMetaRefresh=true,将本地新连接的Redis进行初始化。 在本地启动时,增加-PenvProtected=false -PsaveEnvironments=false启动参数,以确保本地启动不会修改公共环境的配置,并且可以正常通过环境保护检测。 本地环境成功启动并正常开发功能后,需要发布到公共环境进行测试时,需要先修改公共环境中业务工程配置pamirs.distribution.session.allMetaRefresh=true后,再启动业务工程。 启动一次业务工程后,将配置还原为pamirs.distribution.session.allMetaRefresh=false。

    2024年10月21日
    1.1K00
  • 如何扩展自有的文件存储系统

    介绍 数式Oinone默认提供了阿里云、腾讯云、华为云、又拍云、Minio和本地文件存储这几种文件存储系统,如果我们有其他的文件存储系统需要对接,或者是扩展现有的文件系统,可以通过SPI继承AbstractFileClient注册新的文件存储系统。 代码示例 这里以扩展自有的本地文件系统为例 继承了内置的本地文件存储LocalFileClient,将其中上传文件的方法重写 package pro.shushi.pamirs.demo.core.file; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.support.StandardMultipartHttpServletRequest; import pro.shushi.pamirs.framework.connectors.cdn.client.LocalFileClient; import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j; import pro.shushi.pamirs.meta.common.spi.SPI; import javax.servlet.http.HttpServletRequest; @Slf4j @Component // 注册新的文件存储系统类型 @SPI.Service(DemoLocalFileClient.TYPE) @RestController @RequestMapping("/demo_file") public class DemoLocalFileClient extends LocalFileClient { public static final String TYPE = "DEMO_LOCAL"; @Override public CdnFileForm getFormData(String fileName) { CdnConfig cdnConfig = getCdnConfig(); CdnFileForm fileForm = new CdnFileForm(); String uniqueFileName = Spider.getDefaultExtension(CdnFileNameApi.class).getNewFilename(fileName); String fileKey = getFileKey(cdnConfig.getMainDir(), uniqueFileName); //前端获取uploadUrl,上传文件到该地址 fileForm.setUploadUrl(cdnConfig.getUploadUrl() + "/demo_file/upload"); //上传后,前端将downloadUrl返回给后端 fileForm.setDownloadUrl(getDownloadUrl(fileKey)); fileForm.setFileName(uniqueFileName); Map<String, Object> formDataJson = new HashMap<>(); formDataJson.put("uniqueFileName", uniqueFileName); formDataJson.put("key", fileKey); fileForm.setFormDataJson(JSON.toJSONString(formDataJson)); return fileForm; } @ResponseBody @RequestMapping(value = "/upload", produces = "multipart/form-data;charset=UTF-8",method = RequestMethod.POST) public String uploadFileToLocal(HttpServletRequest request) { MultipartFile file = ((StandardMultipartHttpServletRequest) request).getFile("file"); // 例如可以根据file文件类型判断哪些文件是否可以上传 return super.uploadFileToLocal(request); } } 在application.yml内配置 cdn: oss: name: 本地文件系统 # 这里的type与代码中定义的文件存储系统类型对应 type: DEMO_LOCAL bucket: pamirs uploadUrl: http://127.0.0.1:8190 downloadUrl: http://127.0.0.1:6800 validTime: 3600000 timeout: 600000 active: true referer: localFolderUrl: /Users/demo/workspace/static

    2024年10月24日
    70100

Leave a Reply

登录后才能评论