函数之异步执行

总体介绍

异步任务是非常常见的一种开发模式,它在分布式的开发模式中有很多应用场景如:

  • 高并发场景中,我们一般采用把长流程切短,用异步方式去掉可以异步的非关键功能,缩小主流程响应时间,提升用户体验
  • 异构系统的集成调用,通过异步任务完成解耦与自动重试
  • 分布式系统最终一致性的可选方案

本文将介绍oinone是如何结合Spring+TbSchedule来完成异步任务

构建第一个异步任务

新建PetShopService和PetShopServiceImpl

1、 新建PetShopService定义updatePetShops方法

package pro.shushi.pamirs.demo.api.service;

import pro.shushi.pamirs.demo.api.model.PetShop;
import pro.shushi.pamirs.meta.annotation.Fun;
import pro.shushi.pamirs.meta.annotation.Function;

import java.util.List;

@Fun(PetShopService.FUN_NAMESPACE)
public interface PetShopService {
    String FUN_NAMESPACE = "demo.PetShop.PetShopService";

    @Function
    void updatePetShops(List<PetShop> petShops);
}

2、PetShopServiceImpl实现PetShopService接口并在updatePetShops增加@XAsync注解

package pro.shushi.pamirs.demo.core.service;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetShop;
import pro.shushi.pamirs.demo.api.service.PetShopService;
import pro.shushi.pamirs.meta.annotation.Fun;
import pro.shushi.pamirs.meta.annotation.Function;
import pro.shushi.pamirs.trigger.annotation.XAsync;
import java.util.List;

@Fun(PetShopService.FUN_NAMESPACE)
@Component
public class PetShopServiceImpl implements PetShopService {
    @Override
    @Function
    @XAsync(displayName = "异步批量更新宠物商店",limitRetryNumber = 3,nextRetryTimeValue = 60)
    public void updatePetShops(List<PetShop> petShops) {
        new PetShop().updateBatch(petShops);
    }
}

a. displayName = "异步批量更新宠物商店",定义异步任务展示名称
b. limitRetryNumber = 3,定义任务失败重试次数,,默认:-1不断重试
c. nextRetryTimeValue = 60,定义任务失败重试的时间数,默认:3
d. nextRetryTimeUnit,定义任务失败重试的时间单位,默认:TimeUnitEnum.SECOND
e. delayTime,定义任务延迟执行的时间数,默认:0
f. delayTimeUnit,定义任务延迟执行的时间单位,默认:TimeUnitEnum.SECOND

修改PetShopBatchUpdateAction调用异步任务

  1. 引入PetShopService
  2. 修改conform方法,调用petShopService.updatePetShops方法

    package pro.shushi.pamirs.demo.core.action;
    @Model.model(PetShopBatchUpdate.MODEL_MODEL)
    @Component
    public class PetShopBatchUpdateAction {
    @Autowired
    private PetShopService petShopService;
    
    @Action(displayName = "确定",bindingType = ViewTypeEnum.FORM,contextType = ActionContextTypeEnum.SINGLE)
    public PetShopBatchUpdate conform(PetShopBatchUpdate data){
            List<PetShop> shops = ArgUtils.convert(PetShopProxy.MODEL_MODEL, PetShop.MODEL_MODEL,proxyList);
           // 调用异步任务
            petShopService.updatePetShops(shops);
        });
        return data;
    }
    }

不同应用如何隔离执行单元

在schedule跟模块部署一起的时候,多模块独立boot的情况下,需要做必要的配置。如果schedule独立部署则没有必要,因为全部走远程,不存在类找不到的问题

  1. 通过配置pamirs.zookeeper.rootPath,确保两组机器都能覆盖所有任务分片,这样不会漏数据
  2. 通过pamirs.event.schedule.ownSign来隔离。确保两组机器只取各自产生的数据,这样不会重复执行数据
    pamirs:
    zookeeper:
    zkConnectString: 127.0.0.1:2181
    zkSessionTimeout: 60000
    rootPath: /demo
    event:
    enabled: true
    schedule:
      enabled: true
      ownSign: demo
    rocket-mq:
      namesrv-addr: 127.0.0.1:9876

Oinone社区 作者:望闲原创文章,如若转载,请注明出处:https://doc.oinone.top/backend/11480.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
望闲的头像望闲数式管理员
上一篇 2024年5月25日 pm8:10
下一篇 2024年5月27日 pm8:19

相关推荐

  • 工作流审核撤回/回退/拒绝/同意/反悔钩子使用

    目录 1. 流程撤回、拒绝和回退调用自定义函数1.1 工作流【撤销】回调钩子1.2 撤销【回退】回调钩子1.3 工作流【拒绝】回调钩子1.4 工作流【同意】回调钩子1.4 工作流【反悔】回调钩子1.4 回调钩子在业务系统中的调用示例2. 自定义审批方式、自定义审批节点名称 1.流程撤回、拒绝和回退调用自定义函数 1.1工作流【撤销】回调钩子 使用方式:把该方法放置到XXX模型的Action下面,或@Fun(XXX.MODEL_MODEL)触发方式:当流程实例被撤销时调用入口:pro.shushi.pamirs.workflow.app.core.service.impl.WorkflowInstanceServiceImpl#undoInstance /** * XXX为当前流程触发方式为模型触发时对应的触发模型、 * 对应返回不影响流程上下文 * @param data 入参为触发时的业务数据,数据的JsonString * @return */ @Function public XXX recall(String data) { // TODO: 根据实际的业务逻辑把data转换为对象 WorkRecord workRecord = JsonUtils.parseObject(data, new TypeReference<WorkRecord>(){}); // TODO: 增加自定义业务逻辑 return new XXX(); } 1.2撤销【回退】回调钩子 使用方式:把该方法放置到XXX模型的Action下面,或@Fun(XXX.MODEL_MODEL)触发方式:流程待办进行回退操作时调用入口:pro.shushi.pamirs.workflow.app.core.service.operator.ApprovalFallbackOperatorService /** * XXX为当前流程触发方式为模型触发时对应的触发模型 * 对应返回不影响流程上下文 * @param data 入参为触发时的业务数据,数据的JsonString * @return */ @Function public XXX fallBack(String data) { // TODO: 根据实际的业务逻辑把data转换为对象 WorkRecord workRecord = JsonUtils.parseObject(data, new TypeReference<WorkRecord>(){}); // TODO: 增加自定义业务逻辑 return new XXX(); } 1.3工作流【拒绝】回调钩子 使用方式:把该方法放置到XXX模型的Action下面,或@Fun(XXX.MODEL_MODEL)触发方式:流程待办进行拒绝操作时调用入口:pro.shushi.pamirs.workflow.app.core.service.operator.ApprovalFallbackOperatorService /** * XXX为当前流程触发方式为模型触发时对应的触发模型 * 对应返回不影响流程上下文 * @param data 入参为触发时的业务数据,数据的JsonString * @return */ @Function public XXX reject(String data) { // TODO: 根据实际的业务逻辑把data转换为对象 WorkRecord workRecord = JsonUtils.parseObject(data, new TypeReference<WorkRecord>(){}); // TODO: 增加自定义业务逻辑 return new XXX(); } 1.4 工作流【同意】回调钩子 使用方式:把该方法放置到XXX模型的Action下面,或@Fun(XXX.MODEL_MODEL)触发方式:流程待办进行同意操作时调用入口:pro.shushi.pamirs.workflow.app.core.util.ArtificialTaskUtils @Function(summary = "发起的审批同意时会自动调用此方法") @Function.Advanced(displayName = "审批同意") public Teacher agree(String data) { // TODO: 根据实际的业务逻辑把data转换为对象 // WorkRecord workRecord = JsonUtils.parseObject(data, new TypeReference<WorkRecord>(){}); // TODO: 增加自定义业务逻辑 return new Teacher(); } 1.4 工作流【反悔】回调钩子 使用方式:把该方法放置到XXX模型的Action下面,或@Fun(XXX.MODEL_MODEL)触发方式:流程待办进行反悔操作时使用场景:流程待办进行反悔操作时,需要额外更改其他的业务数据逻辑时可用该回调钩子。 注意:该函数的namespace需要设置为流程触发模型。 调用入口:pro.shushi.pamirs.workflow.app.core.service.operator.ArtificialRetractOperatorService @Function @Function.fun(WorkflowBizCallConstants.retract) public void retract(WorkflowUserTask workflowUserTask) { // 获取流程实例 workflowUserTask.fieldQuery(WorkflowUserTask::getInstance); WorkflowInstance instance = workflowUserTask.getInstance(); // 获取用户任务实例 WorkflowUserInstance userInstance = new WorkflowUserInstance() .setId(workflowUserTask.getWorkflowUserInstanceId()) .queryById(); // 反悔的用户id…

    2023年11月15日
    1.2K00
  • Excel导入扩展点-整体导入(批量导入)

    1、【导入】在有些场景,需要获取Excel导入的整体数据,进行批量的操作或者校验 可以通过实现导入扩展点的方式实现,入参data是导入Excel的数据列表;业务可以根据实际情况进行数据校验 1)Excel模板定义,需要设置setEachImport(false) 2)导入扩展点API定义 pro.shushi.pamirs.file.api.extpoint.ExcelImportDataExtPoint#importData 3)示例代码参考: pro.shushi.pamirs.translate.extpoint.ResourceTranslationImportExtPoint#importData @Slf4j @Component @Ext(ExcelImportTask.class) public class ResourceTranslationImportExtPoint extends AbstractExcelImportDataExtPointImpl<List<ResourceTranslationItem>> { @Override //TODO 表达式,可以自定义,比如可以支持1个模型的多个【导入名称】的不同模板 @ExtPoint.Implement(expression = "importContext.definitionContext.model==\"" + ResourceTranslation.MODEL_MODEL + "\"") public Boolean importData(ExcelImportContext importContext, List<ResourceTranslationItem> dataList) { //TODO dataList就是excel导入那个sheet的所有内容 return true; } } 2、【导入】逐行导入的时候做事务控制 在模板中定义中增加事务的定义,并设置异常后回滚。参加示例代码: excel模板定义 @Component public class DemoItemImportTemplate implements ExcelTemplateInit { public static final String TEMPLATE_NAME = "商品导入模板"; @Override public List<ExcelWorkbookDefinition> generator() { //定义事务(导入处理中,只操作单个表的不需要事务定义。) //是否定义事务根据实际业务逻辑确定。比如:有些场景在导入前需要删除数据后在进行导入就需要定义事务 InitializationUtil.addTxConfig(DemoItem.MODEL_MODEL, ExcelDefinitionContext.EXCEL_TX_CONFIG_PREFIX + TEMPLATE_NAME); return Collections.singletonList( ExcelHelper.fixedHeader(DemoItem.MODEL_MODEL, TEMPLATE_NAME) .setType(ExcelTemplateTypeEnum.IMPORT) .createSheet("商品导入-sheet1") .createBlock(DemoItem.MODEL_MODEL) .addUnique(DemoItem.MODEL_MODEL,"name") .addColumn("name","名称") .addColumn("description","描述") .addColumn("itemPrice","单价") .addColumn("inventoryQuantity","库存") .build().setEachImport(true) //TODO 设置异常后回滚的标识,这个地方会回滚事务 .setHasErrorRollback(true) .setExcelImportMode(ExcelImportModeEnum.SINGLE_MODEL) ); } } 导入逻辑处理 @Slf4j @Component @Ext(ExcelImportTask.class) public class DemoItemImportExtPoint extends AbstractExcelImportDataExtPointImpl<DemoItem> implements ExcelImportDataExtPoint<DemoItem> { @Autowired private DemoItemService demoItemService; @Override @ExtPoint.Implement(expression = "importContext.definitionContext.model == \"" + DemoItem.MODEL_MODEL + "\"") public Boolean importData(ExcelImportContext importContext, DemoItem data) { ExcelImportTask importTask = importContext.getImportTask(); try { DemoItemImportTask hrExcelImportTask = new DemoItemImportTask().queryById(importTask.getId()); String publishUserName = Optional.ofNullable(hrExcelImportTask).map(DemoItemImportTask::getPublishUserName).orElse(null); data.setPublishUserName(publishUserName); demoItemService.create(data); } catch(PamirsException e) { log.error("导入异常", e); } catch (Exception e) { log.error("导入异常", e); } return Boolean.TRUE; } }

    2023年12月7日
    1.4K00
  • 如何使用GQL工具正确发起请求

    简介 本文将讲解一下如何正确发起GQL请求和GQL工具使用过程中的常见问题。 参数介绍 请求url和请求方法在浏览器的请求标头里面可以查到,保持一致就可以向服务正常发送请求,但还缺少请求体确定请求哪个接口。 每个请求都包括两部分内容:1. Query 2. Variables右键Query和Variables复制值直接拷贝到工具中就可以正常请求了。 注意:如果使用admin账号登录,请求的时候可以不携带Variables参数,因为admin没有权限控制,如果使用其他用户,就必须携带Variables参数,否则会被权限拦截。

    2024年10月25日
    1.1K00
  • 低无一体使用 (后端)

    低无一体使用 (后端) 低无一体应用 打开低无一体应用。 选择应用模块 在选择模块选择框中,下拉选择需要使用低无一体的应用模块。 生成SDK 点击生成SDK, 生成当前选择应用模块的低无一体SDK。 点击之后的系统消息 提示"生成SDK成功",表示操作完成。 生成扩展工程 点击下载扩展工程模板, 生成当前选择应用模块的低无一体SDK。 点击之后的系统消息 提示"下载扩展工程模板成功",表示操作完成。 之后刷新页面 下载扩展工程 使用系统消息中的链接或者详情页中的下载地址下载扩展工程 扩展工程结构概览 自定义Action示例 import org.springframework.stereotype.Component; import pro.shushi.oinone.stand.testExt.model.Model0000000001; import pro.shushi.pamirs.meta.annotation.Action; import pro.shushi.pamirs.meta.annotation.Function; import pro.shushi.pamirs.meta.annotation.Model; import pro.shushi.pamirs.meta.api.dto.condition.Pagination; import pro.shushi.pamirs.meta.api.dto.wrapper.IWrapper; import pro.shushi.pamirs.meta.constant.FunctionConstants; import pro.shushi.pamirs.meta.enmu.FunctionOpenEnum; import pro.shushi.pamirs.meta.enmu.FunctionTypeEnum; /** * Model0000000001Action * * @author yakir on 2025/01/20 14:59. */ @Component @Model.model(Model0000000001.MODEL_MODEL) public class Model0000000001Action { @Function.Advanced(type = FunctionTypeEnum.QUERY) @Function.fun(FunctionConstants.queryPage) @Function(openLevel = {FunctionOpenEnum.API}) public Pagination<Model0000000001> queryPage(Pagination<Model0000000001> page, IWrapper<Model0000000001> queryWrapper) { return new Model0000000001().queryPage(page, queryWrapper); } @Action(displayName = "sayHello") @Action.Advanced(type = FunctionTypeEnum.QUERY) public Model0000000001 sayHello(Model0000000001 query) { query.setName(query.getName() + System.currentTimeMillis()); return query; } } 注意事项 ⚠️⚠️⚠️ Oinone底层依赖版本与设计器和业务应用一致 (参考 版本更新日志 ) 扩展工程如需独立启动, 手动修改application.yml中安装模块和pom.xml中模块jar的依赖配置

    2025年2月17日
    87700
  • Oinone协同开发源码分析

    前提 源码分析版本是 5.1.x版本 什么是协同开发模式 协同开发模式解决的是不同开发,在开发同一个模型时,不会相互影响,也不会影响到测试环境详见:Oinone协同开发使用手册 协同开发原理 在协同模式下,本地开发的元数据,配置pamirs.data.distribution.session.ownSign参数后,元数据前缀加ownSign值,然后只存在redis缓存,不落库。其它环境无法直接访问到该数据。测试环境,或其它环境访问,需要在url上加ownSign等于设置的,则读redis数据时,除了加载通用数据,也会合并ownSign前缀的redis数据,显示出来 注意事项 协同开发仅支持界面设计器,其他设计器均不支持 不支持权限配置 不支持工作流触发 版本支持 完整支持5.1.0及以上 功能详解 启动时操作 做元数据保护检查 配置ownSign,则key拼接为 ownSign + ‘:’ + key 清除掉ownSign的redis缓存数据;非ownSign不用清理 计算差量数据 有差量数据,放入ownSign标识数据,并清理本地标识 dubbo注册服务,group拼接group + ownSign 后进行注册 读取时操作 读本地 组装key: ownSign + ‘:’ + key 本地缓存有数据,更新缓存本地数据,返回 本地没有数据,读redis,并插入本地缓存 读远程 dubbo注册消费者,group拼接group + ownSign 后进行泛化调用 元数据保护检查 开启数据保护模式,在启动参数里加-PmetaProtected=pamirs 会在启动时,往redis里写入数据 private static final String META_PROTECTED_KEY = “pamirs:check:meta-protected”; private void writeMetaProtected(String metaProtected) { stringRedisTemplate.opsForValue().set(META_PROTECTED_KEY, metaProtected); } 如果同时又设置 pamirs.data.distribution.session.ownSign则会报错 在使用元数据保护模式下,不允许设置 [pamirs.distribution.session.ownSign] 处理逻辑如下 看redis是否启用保护标识的值 获取pamirs.distribution.session.ownSign配置 没有启动参数 且redis没有值,则retrun 如果有启动参数且配置了ownSign,报错 在使用元数据保护模式下,不允许设置 [pamirs.distribution.session.ownSign] 如果有启动参数且 redis没有值或启动参数设置 -P metaForceProtected,则写入redis 如果有启动参数, 且启动参数跟redis值不同,则报错[公共环境开启了元数据保护模式,本地开发环境需配置[pamirs.distribution.session.ownSign]] 如果没有启动参数且redis有值,但没有配置ownSign 报错[公共环境开启了元数据保护模式,本地开发环境需配置[pamirs.distribution.session.ownSign]] 核心代码如下MetadataProtectedChecker public void process(AppLifecycleCommand command, Set<String> runModules, List<ModuleDefinition> installModules, List<ModuleDefinition> upgradeModules, List<ModuleDefinition> reloadModules) { String currentMetaProtected = stringRedisTemplate.opsForValue().get(META_PROTECTED_KEY); String metaProtected = getMetaProtected(); boolean hasCurrentMetaProtected = StringUtils.isNotBlank(currentMetaProtected); boolean hasMetaProtected = StringUtils.isNotBlank(metaProtected); if (!hasCurrentMetaProtected && !hasMetaProtected) { return; } if (hasMetaProtected) { if (Spider.getDefaultExtension(SessionFillOwnSignApi.class).handleOwnSign()) { // 如果有启动参数且配置了ownSign throw new UnsupportedOperationException(“在使用元数据保护模式下,不允许设置 [pamirs.distribution.session.ownSign]”); } if (!hasCurrentMetaProtected || isForceProtected()) { writeMetaProtected(metaProtected); } else if (!metaProtected.equals(currentMetaProtected)) { // 如果有启动参数, 且启动参数跟redis值不同 throw unsupportedLocalOperation(); } } else { if (Spider.getDefaultExtension(SessionFillOwnSignApi.class).handleOwnSign()) { return; } // 没有启动参数且redis有值,但没有配置ownSign 报错 throw unsupportedLocalOperation(); } } 取ownSign方式 看header是否有ownSign这个标识 header没有,则从配置里取,并放到header里 ownSign的获取核心代码 CdDistributionSessionFillOwnSignApi @Override public String getCdOwnSign() { String cdOwnSign = null; // 看header是否有ownSign这个标识…

    2024年9月12日
    1.5K00

Leave a Reply

登录后才能评论