函数之异步执行

总体介绍

异步任务是非常常见的一种开发模式,它在分布式的开发模式中有很多应用场景如:

  • 高并发场景中,我们一般采用把长流程切短,用异步方式去掉可以异步的非关键功能,缩小主流程响应时间,提升用户体验
  • 异构系统的集成调用,通过异步任务完成解耦与自动重试
  • 分布式系统最终一致性的可选方案

本文将介绍oinone是如何结合Spring+TbSchedule来完成异步任务

构建第一个异步任务

新建PetShopService和PetShopServiceImpl

1、 新建PetShopService定义updatePetShops方法

package pro.shushi.pamirs.demo.api.service;

import pro.shushi.pamirs.demo.api.model.PetShop;
import pro.shushi.pamirs.meta.annotation.Fun;
import pro.shushi.pamirs.meta.annotation.Function;

import java.util.List;

@Fun(PetShopService.FUN_NAMESPACE)
public interface PetShopService {
    String FUN_NAMESPACE = "demo.PetShop.PetShopService";

    @Function
    void updatePetShops(List<PetShop> petShops);
}

2、PetShopServiceImpl实现PetShopService接口并在updatePetShops增加@XAsync注解

package pro.shushi.pamirs.demo.core.service;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetShop;
import pro.shushi.pamirs.demo.api.service.PetShopService;
import pro.shushi.pamirs.meta.annotation.Fun;
import pro.shushi.pamirs.meta.annotation.Function;
import pro.shushi.pamirs.trigger.annotation.XAsync;
import java.util.List;

@Fun(PetShopService.FUN_NAMESPACE)
@Component
public class PetShopServiceImpl implements PetShopService {
    @Override
    @Function
    @XAsync(displayName = "异步批量更新宠物商店",limitRetryNumber = 3,nextRetryTimeValue = 60)
    public void updatePetShops(List<PetShop> petShops) {
        new PetShop().updateBatch(petShops);
    }
}

a. displayName = "异步批量更新宠物商店",定义异步任务展示名称
b. limitRetryNumber = 3,定义任务失败重试次数,,默认:-1不断重试
c. nextRetryTimeValue = 60,定义任务失败重试的时间数,默认:3
d. nextRetryTimeUnit,定义任务失败重试的时间单位,默认:TimeUnitEnum.SECOND
e. delayTime,定义任务延迟执行的时间数,默认:0
f. delayTimeUnit,定义任务延迟执行的时间单位,默认:TimeUnitEnum.SECOND

修改PetShopBatchUpdateAction调用异步任务

  1. 引入PetShopService
  2. 修改conform方法,调用petShopService.updatePetShops方法

    package pro.shushi.pamirs.demo.core.action;
    @Model.model(PetShopBatchUpdate.MODEL_MODEL)
    @Component
    public class PetShopBatchUpdateAction {
    @Autowired
    private PetShopService petShopService;
    
    @Action(displayName = "确定",bindingType = ViewTypeEnum.FORM,contextType = ActionContextTypeEnum.SINGLE)
    public PetShopBatchUpdate conform(PetShopBatchUpdate data){
            List<PetShop> shops = ArgUtils.convert(PetShopProxy.MODEL_MODEL, PetShop.MODEL_MODEL,proxyList);
           // 调用异步任务
            petShopService.updatePetShops(shops);
        });
        return data;
    }
    }

不同应用如何隔离执行单元

在schedule跟模块部署一起的时候,多模块独立boot的情况下,需要做必要的配置。如果schedule独立部署则没有必要,因为全部走远程,不存在类找不到的问题

  1. 通过配置pamirs.zookeeper.rootPath,确保两组机器都能覆盖所有任务分片,这样不会漏数据
  2. 通过pamirs.event.schedule.ownSign来隔离。确保两组机器只取各自产生的数据,这样不会重复执行数据
    pamirs:
    zookeeper:
    zkConnectString: 127.0.0.1:2181
    zkSessionTimeout: 60000
    rootPath: /demo
    event:
    enabled: true
    schedule:
      enabled: true
      ownSign: demo
    rocket-mq:
      namesrv-addr: 127.0.0.1:9876

Oinone社区 作者:望闲原创文章,如若转载,请注明出处:https://doc.oinone.top/backend/11480.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

Like (0)
望闲's avatar望闲数式管理员
Previous 2024年5月25日 pm8:10
Next 2024年5月27日 pm8:19

相关推荐

  • 如何通过自定义支持excel导出的动态表头

    介绍 本文需要阅读过前置文档如何自定义Excel导出功能,动态表头的功能在前置文档的基础上做的进一步扩展,本文未提到的部分都参考这个前置文档。 在日常的业务开发中,我们在导出的场景会遇到需要设置动态表头的场景,比如统计商品在最近1个月的销量,固定表头列为商品的名称等基础信息,动态表头列为最近一个月的日期,在导出的时候设置每个日期的销量,本文将通过此业务场景提供示例代码。 1.自定义导出任务模型 package pro.shushi.pamirs.demo.api.model; import pro.shushi.pamirs.file.api.model.ExcelExportTask; import pro.shushi.pamirs.meta.annotation.Model; @Model.model(DemoItemDynamicExcelExportTask.MODEL_MODEL) @Model(displayName = "商品-Excel动态表头导出任务") public class DemoItemDynamicExcelExportTask extends ExcelExportTask { public static final String MODEL_MODEL = "demo.DemoItemDynamicExcelExportTask"; } 2.自定义导出任务处理数据的扩展点 package pro.shushi.pamirs.demo.core.excel.exportdemo.extPoint; import org.springframework.stereotype.Component; import pro.shushi.pamirs.core.common.FetchUtil; import pro.shushi.pamirs.core.common.cache.MemoryIterableSearchCache; import pro.shushi.pamirs.demo.api.model.DemoItem; import pro.shushi.pamirs.file.api.config.FileConstant; import pro.shushi.pamirs.file.api.context.ExcelDefinitionContext; import pro.shushi.pamirs.file.api.enmu.ExcelTemplateTypeEnum; import pro.shushi.pamirs.file.api.entity.EasyExcelCellDefinition; import pro.shushi.pamirs.file.api.extpoint.impl.ExcelExportSameQueryPageTemplate; import pro.shushi.pamirs.file.api.model.ExcelExportTask; import pro.shushi.pamirs.file.api.model.ExcelWorkbookDefinition; import pro.shushi.pamirs.file.api.util.ExcelFixedHeadHelper; import pro.shushi.pamirs.file.api.util.ExcelHelper; import pro.shushi.pamirs.file.api.util.ExcelTemplateInit; import pro.shushi.pamirs.framework.common.entry.TreeNode; import pro.shushi.pamirs.meta.annotation.ExtPoint; import pro.shushi.pamirs.meta.api.CommonApiFactory; import pro.shushi.pamirs.meta.api.core.orm.ReadApi; import pro.shushi.pamirs.meta.api.core.orm.systems.relation.RelationReadApi; import pro.shushi.pamirs.meta.api.dto.config.ModelConfig; import pro.shushi.pamirs.meta.api.dto.config.ModelFieldConfig; import pro.shushi.pamirs.meta.api.session.PamirsSession; import pro.shushi.pamirs.meta.enmu.TtypeEnum; import pro.shushi.pamirs.meta.util.FieldUtils; import java.util.*; @Component public class DemoItemDynamicExportExtPoint extends ExcelExportSameQueryPageTemplate<DemoItem> implements ExcelTemplateInit { public static final String TEMPLATE_NAME ="商品动态导出"; @Override public List<ExcelWorkbookDefinition> generator() { ExcelFixedHeadHelper excelFixedHeadHelper = ExcelHelper.fixedHeader(DemoItem.MODEL_MODEL,TEMPLATE_NAME) .createBlock(TEMPLATE_NAME, DemoItem.MODEL_MODEL) .setType(ExcelTemplateTypeEnum.EXPORT); return Collections.singletonList(excelFixedHeadHelper.build()); } public static void buildHeader(ExcelFixedHeadHelper excelFixedHeadHelper) { excelFixedHeadHelper.addColumn("name","名称") .addColumn("cateName","类目") .addColumn("searchFrom","搜索来源") .addColumn("description","描述") .addColumn("itemPrice","单价") .addColumn("inventoryQuantity","库存"); } @Override @ExtPoint.Implement(expression = "context.model == \"" + DemoItem.MODEL_MODEL+"\" && context.name == \"" +TEMPLATE_NAME+"\"" ) public List<Object> fetchExportData(ExcelExportTask exportTask, ExcelDefinitionContext context) { List<Object> result = super.fetchExportData(exportTask,context); Object block = result.get(0); if (block instanceof ArrayList) { ((List<Object>) block).forEach(o -> { if (o instanceof DemoItem) { DemoItem item = (DemoItem) o; // TODO 设置动态表头部分字段的值 item.get_d().put("2024-09-10", "1111"); item.get_d().put("2024-09-11", "2222"); }…

    2024年9月11日
    4.0K00
  • Oinone请求路由源码分析

    通过源码分析,从页面发起请求,如果通过graphQL传输到具体action的链路,并且在这之间做了哪些隐式处理分析源码版本5.1.x 请求流程大致如下: 拦截所有指定的请求 组装成graphQL请求信息 调用graphQL执行 通过hook拦截先执行 RsqlDecodeHook:rsql解密 UserHook: 获取用户信息, 通过cookies获取用户ID,再查表获取用户信息,放到本地Local线程里 RoleHook: 角色Hook FunctionPermissionHook: 函数权限Hook ,跳过权限拦截的实现放在这一层,对应的配置 pamirs: auth: fun-filter: – namespace: user.PamirsUserTransient fun: login #登录 – namespace: top.PetShop fun: action DataPermissionHook: 数据权限hook PlaceHolderHook:占位符转化替换hook RsqlParseHook: 解释Rsql hook SingletonModelUpdateHookBefore 执行post具体内容 通过hook拦截后执行 QueryPageHook4TreeAfter: 树形Parent查询优化 FieldPermissionHook: 字段权限Hook UserQueryPageHookAfter UserQueryOneHookAfter 封装执行结果信息返回 时序图 核心源码解析 拦截所有指定的请求 /pamirs/模块名RequestController @RequestMapping( value = "/pamirs/{moduleName:^[a-zA-Z][a-zA-Z0-9_]+[a-zA-Z0-9]$}", method = RequestMethod.POST ) public String pamirsPost(@PathVariable("moduleName") String moduleName, @RequestBody PamirsClientRequestParam gql, HttpServletRequest request, HttpServletResponse response) { } DefaultRequestExecutor 构建graph请求信息,并调用graph请求 () -> execute(GraphQL::execute, param), param private <T> T execute(BiFunction<GraphQL, ExecutionInput, T> executor, PamirsRequestParam param) { // 获取GraphQL请求信息,包含grapsh schema GraphQL graphQL = buildGraphQL(param); … ExecutionInput executionInput = ExecutionInput.newExecutionInput() .query(param.getQuery()) .variables(param.getVariables().getVariables()) .dataLoaderRegistry(Spider.getDefaultExtension(DataLoaderRegistryApi.class).dataLoader()) .build(); … // 调用 GraphQL的方法execute 执行 T result = executor.apply(graphQL, executionInput); … return result; } QueryAndMutationBinder 绑定graphQL读取写入操作 public static DataFetcher<?> dataFetcher(Function function, ModelConfig modelConfig) { if (isAsync()) { if (FunctionTypeEnum.QUERY.in(function.getType())) { return AsyncDataFetcher.async(dataFetchingEnvironment -> dataFetcherAction(function, modelConfig, dataFetchingEnvironment), ExecutorServiceApi.getExecutorService()); } else { return dataFetchingEnvironment -> dataFetcherAction(function, modelConfig, dataFetchingEnvironment); } } else { return dataFetchingEnvironment -> dataFetcherAction(function, modelConfig, dataFetchingEnvironment); } } private static Object dataFetcherAction(Function function, ModelConfig modelConfig, DataFetchingEnvironment environment) { try { SessionExtendUtils.tagMainRequest(); // 使用共享的请求和响应对象 return Spider.getDefaultExtension(ActionBinderApi.class) .action(modelConfig,…

    2024年8月21日
    6.0K02
  • 分库分表与自定义分表规则

    总体介绍 Oinone的分库分表方案是基于Sharding-JDBC的整合方案,要先具备一些Sharding-JDBC的知识。[Sharding-JDBC]https://shardingsphere.apache.org/document/current/cn/overview/ 做分库分表前,大家要有一个明确注意的点就是分表字段(也叫均衡字段)的选择,它是非常重要的,与业务场景非常相关。在明确了分库分表字段以后,甚至在功能上都要做一些妥协。比如分库分表字段在查询管理中做为查询条件是必须带上的,不然效率只会更低。 分表字段不允许更新,所以代码里更新策略设置类永不更新,并在设置了在页面修改的时候为readonly 配置分表策略 配置ShardingModel模型走分库分表的数据源pamirsSharding 为pamirsSharding配置数据源以及sharding规则 a. pamirs.sharding.define用于oinone的数据库表创建用 b. pamirs.sharding.rule用于分表规则配置 为pamirsSharding配置数据源以及sharding规则 1)指定模型对应数据源 pamirs: framework: system: system-ds-key: base system-models: – base.WorkerNode data: default-ds-key: pamirs ds-map: base: base modelDsMap: "[demo.ShardingModel]": pamirsSharding #配置模型对应的库 2)分库分表规则配置 pamirs: sharding: define: data-sources: ds: pamirs pamirsSharding: pamirs #申明pamirsSharding库对应的pamirs数据源 models: "[trigger.PamirsSchedule]": tables: 0..13 "[demo.ShardingModel]": tables: 0..7 table-separator: _ rule: pamirsSharding: #配置pamirsSharding库的分库分表规则 actual-ds: – pamirs #申明pamirsSharding库对应的pamirs数据源 sharding-rules: # Configure sharding rule ,以下配置跟sharding-jdbc配置一致 – tables: demo_core_sharding_model: #demo_core_sharding_model表规则配置 actualDataNodes: pamirs.demo_core_sharding_model_${0..7} tableStrategy: standard: shardingColumn: user_id shardingAlgorithmName: table_inline shardingAlgorithms: table_inline: type: INLINE props: algorithm-expression: demo_core_sharding_model_${(Long.valueOf(user_id) % 8)} props: sql.show: true 自定义规则 默认规则即通用的分库分表策略,如按照数据量、哈希等方式进行分库分表;通常默认规则是可以的。 但在一些复杂的业务场景下,使用默认规则可能无法满足需求,需要根据实际情况进行自定义。例如,某些业务可能有特定的数据分布模式或者查询特点,需要定制化的分库分表规则来优化数据访问性能或者满足业务需求。在这种情况下,使用自定义规则可以更好地适应业务的需求。 自定义分表规则示例 示例1:按月份分表(DATE_MONTH ) package pro.shushi.pamirs.demo.core.sharding; import cn.hutool.core.date.DateUtil; import com.google.common.collect.Range; import org.apache.shardingsphere.sharding.api.sharding.standard.PreciseShardingValue; import org.apache.shardingsphere.sharding.api.sharding.standard.RangeShardingValue; import org.apache.shardingsphere.sharding.api.sharding.standard.StandardShardingAlgorithm; import org.springframework.stereotype.Component; import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j; import java.util.*; /** * @author wangxian * @version 1.0 * @description */ @Component @Slf4j public class DateMonthShardingAlgorithm implements StandardShardingAlgorithm<Date> { private Properties props; @Override public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Date> preciseShardingValue) { Date date = preciseShardingValue.getValue(); String suffix = "_" + (DateUtil.month(date) + 1); for (String tableName : availableTargetNames) { if (tableName.endsWith(suffix)) { return tableName; } } throw new IllegalArgumentException("未找到匹配的数据表"); } @Override public Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<Date> rangeShardingValue) { List<String> list =…

    2024年5月11日
    1.7K00
  • 自定义RSQL占位符(placeholder)及在权限中使用

    1 自定义RSQL占位符常用场景 统一的数据权限配置 查询表达式的上下文变量扩展 2 自定义RSQL的模板 /** * 演示Placeholder占位符基本定义 * * @author Adamancy Zhang at 13:53 on 2024-03-24 */ @Component public class DemoPlaceHolder extends AbstractPlaceHolderParser { private static final String PLACEHOLDER_KEY = "${thisPlaceholder}"; /** * 占位符 * * @return placeholder */ @Override public String namespace() { return PLACEHOLDER_KEY; } /** * 占位符替换值 * * @return the placeholder replace to the value */ @Override protected String value() { return PamirsSession.getUserId().toString(); } /** * 优先级 * * @return execution order of placeholders, ascending order. */ @Override public Integer priority() { return 0; } /** * 是否激活 * * @return the placeholder is activated */ @Override public Boolean active() { return true; } } 注意事项 在一些旧版本中,priority和active可能不起作用,为保证升级时不受影响,请保证该属性配置正确。 PLACEHOLDER_KEY变量表示自定义占位符使用的关键字,需按照所需业务场景的具体功能并根据上下文语义正确定义。 为保证占位符可以被正确替换并执行,所有占位符都不应该出现重复,尤其是不能与系统内置的重复。 3 占位符使用时的优先级问题 多个占位符在进行替换时,会根据优先级按升序顺序执行,如需要指定替换顺序,可使用Spring的Order注解对其进行排序。 import org.springframework.core.annotation.Order; @Order(0) 4 Oinone平台内置的占位符 占位符 数据类型 含义 备注 ${currentUser} String 当前用户ID 未登录时无法使用 ${currentRoles} Set<String> 当前用户的角色ID集合 未登录时无法使用 5 如何覆盖平台内置的占位符? 通过指定占位符的优先级,并定义相同的namespace可优先替换。 6 如何定义会话级别的上下文变量? 在上述模板中,我们使用的是Oinone平台内置的上下文变量进行演示,通常情况下,我们需要根据实际业务场景增加上下文变量,以此来实现所需功能。 下面,我们将根据当前用户获取当前员工ID定义该上下文变量进行演示。 /** * 员工Session * * @author Adamancy Zhang at 14:33 on 2024-03-24 */ @Component public class EmployeeSession implements HookBefore { private static final String SESSION_KEY = "CUSTOM_EMPLOYEE_ID"; @Autowired private DemoEmployeeService demoEmployeeService; public static String getEmployeeId() { return PamirsSession.getTransmittableExtend().get(SESSION_KEY);…

    2024年3月24日
    1.9K00
  • DsHint(指定数据源)和BatchSizeHint(指定批次数量)

    概述和使用场景 DsHintApi ,强制指定数据源, BatchSizeHintApi ,强制指定查询批量数量 API定义 DsHintApi public static DsHintApi model(String model/**模型编码*/) { // 具体实现 } public DsHintApi(Object dsKey/***数据源名称*/) { // 具体实现 } BatchSizeHintApi public static BatchSizeHintApi use(Integer batchSize) { // 具体实现 } 使用示例 1、【注意】代码中使用 try-with-resources语法; 否则可能会出现数据源错乱 2、DsHintApi使用示例包裹在try里面的所有查询都会强制使用指定的数据源 // 使用方式1: try (DsHintApi dsHintApi = DsHintApi.model(PetItem.MODEL_MODEL)) { List<PetItem> items = demoItemDAO.customSqlDemoItem(); PetShopProxy data2 = data.queryById(); data2.fieldQuery(PetShopProxy::getPetTalents); } // 使用方式2: try (DsHintApi dsHintApi = DsHintApi.use("数据源名称")) { List<PetItem> items = demoItemDAO.customSqlDemoItem(); PetShopProxy data2 = data.queryById(); data2.fieldQuery(PetShopProxy::getPetTalents); } 3、BatchSizeHintApi使用示例包裹在try里面的所有查询都会按照指定的batchSize进行查询 // 查询指定每次查询500跳 try (BatchSizeHintApi batchSizeHintApi = BatchSizeHintApi.use(500)) { PetShopProxy data2 = data.queryById(); data2.fieldQuery(PetShopProxy::getPetTalents); } // 查询指定不分页(batchSize=-1)查询。 请注意,你必须在明确不需要分页查询的情况下使用;如果数据量超大不分页可能会卡死。默认不指定分页数的情况下下平台会进行分页查询 try (BatchSizeHintApi batchSizeHintApi = BatchSizeHintApi.use(-1)) { PetShopProxy data2 = data.queryById(); data2.fieldQuery(PetShopProxy::getPetTalents); }

    2024年5月18日
    1.9K00

Leave a Reply

Please Login to Comment