函数之异步执行

总体介绍

异步任务是非常常见的一种开发模式,它在分布式的开发模式中有很多应用场景如:

  • 高并发场景中,我们一般采用把长流程切短,用异步方式去掉可以异步的非关键功能,缩小主流程响应时间,提升用户体验
  • 异构系统的集成调用,通过异步任务完成解耦与自动重试
  • 分布式系统最终一致性的可选方案

本文将介绍oinone是如何结合Spring+TbSchedule来完成异步任务

构建第一个异步任务

新建PetShopService和PetShopServiceImpl

1、 新建PetShopService定义updatePetShops方法

package pro.shushi.pamirs.demo.api.service;

import pro.shushi.pamirs.demo.api.model.PetShop;
import pro.shushi.pamirs.meta.annotation.Fun;
import pro.shushi.pamirs.meta.annotation.Function;

import java.util.List;

@Fun(PetShopService.FUN_NAMESPACE)
public interface PetShopService {
    String FUN_NAMESPACE = "demo.PetShop.PetShopService";

    @Function
    void updatePetShops(List<PetShop> petShops);
}

2、PetShopServiceImpl实现PetShopService接口并在updatePetShops增加@XAsync注解

package pro.shushi.pamirs.demo.core.service;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetShop;
import pro.shushi.pamirs.demo.api.service.PetShopService;
import pro.shushi.pamirs.meta.annotation.Fun;
import pro.shushi.pamirs.meta.annotation.Function;
import pro.shushi.pamirs.trigger.annotation.XAsync;
import java.util.List;

@Fun(PetShopService.FUN_NAMESPACE)
@Component
public class PetShopServiceImpl implements PetShopService {
    @Override
    @Function
    @XAsync(displayName = "异步批量更新宠物商店",limitRetryNumber = 3,nextRetryTimeValue = 60)
    public void updatePetShops(List<PetShop> petShops) {
        new PetShop().updateBatch(petShops);
    }
}

a. displayName = "异步批量更新宠物商店",定义异步任务展示名称
b. limitRetryNumber = 3,定义任务失败重试次数,,默认:-1不断重试
c. nextRetryTimeValue = 60,定义任务失败重试的时间数,默认:3
d. nextRetryTimeUnit,定义任务失败重试的时间单位,默认:TimeUnitEnum.SECOND
e. delayTime,定义任务延迟执行的时间数,默认:0
f. delayTimeUnit,定义任务延迟执行的时间单位,默认:TimeUnitEnum.SECOND

修改PetShopBatchUpdateAction调用异步任务

  1. 引入PetShopService
  2. 修改conform方法,调用petShopService.updatePetShops方法

    package pro.shushi.pamirs.demo.core.action;
    @Model.model(PetShopBatchUpdate.MODEL_MODEL)
    @Component
    public class PetShopBatchUpdateAction {
    @Autowired
    private PetShopService petShopService;
    
    @Action(displayName = "确定",bindingType = ViewTypeEnum.FORM,contextType = ActionContextTypeEnum.SINGLE)
    public PetShopBatchUpdate conform(PetShopBatchUpdate data){
            List<PetShop> shops = ArgUtils.convert(PetShopProxy.MODEL_MODEL, PetShop.MODEL_MODEL,proxyList);
           // 调用异步任务
            petShopService.updatePetShops(shops);
        });
        return data;
    }
    }

不同应用如何隔离执行单元

在schedule跟模块部署一起的时候,多模块独立boot的情况下,需要做必要的配置。如果schedule独立部署则没有必要,因为全部走远程,不存在类找不到的问题

  1. 通过配置pamirs.zookeeper.rootPath,确保两组机器都能覆盖所有任务分片,这样不会漏数据
  2. 通过pamirs.event.schedule.ownSign来隔离。确保两组机器只取各自产生的数据,这样不会重复执行数据
    pamirs:
    zookeeper:
    zkConnectString: 127.0.0.1:2181
    zkSessionTimeout: 60000
    rootPath: /demo
    event:
    enabled: true
    schedule:
      enabled: true
      ownSign: demo
    rocket-mq:
      namesrv-addr: 127.0.0.1:9876

Oinone社区 作者:望闲原创文章,如若转载,请注明出处:https://doc.oinone.top/backend/11480.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
望闲的头像望闲数式管理员
上一篇 2024年5月25日 pm8:10
下一篇 2024年5月27日 pm8:19

相关推荐

  • 引入搜索(增强模型Channel)常见问题解决办法

    总体描述 引入Oinone的搜索(即Channel模块)后,因错误的配置、缺少配置或者少引入一些Jar包,会出现一些报错。 问题1:启动报类JCTree找不到 具体现象 启动过程可能会出现报错:java.lang.NoClassDefFoundError: com/sun/tools/javac/tree/JCTree$JCExpression 产生原因 引入Channel模块后,启动过程中会扫描Class包找寻Enhance的注解,Pamirs底层有使用到jdk的tools中的类, com/sun/tools/javac/tree/JCTree$JCExpression 特定版本的jdk可能会缺少tools.jar导致启动失败 具体报错 at org.springframework.boot.loader.Launcher.launch(Launcher.java:107) [pamirs-venus-boot.jar:na] at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) [pamirs-venus-boot.jar:na] at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) [pamirs-venus-boot.jar:na] Caused by: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: com/sun/tools/javac/tree/JCTree$JCExpression at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[na:1.8.0_381] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[na:1.8.0_381] at pro.shushi.pamirs.boot.common.initial.PamirsBootMainInitial.init(PamirsBootMainInitial.java:66) ~[pamirs-boot-api-4.6.10.jar!/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_381] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_381] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_381] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_381] at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:305) ~[spring-context-5.2.12.RELEASE.jar!/:5.2.12.RELEASE] … 20 common frames omitted Caused by: java.lang.NoClassDefFoundError: com/sun/tools/javac/tree/JCTree$JCExpression at java.lang.Class.forName0(Native Method) ~[na:1.8.0_381] at java.lang.Class.forName(Class.java:264) ~[na:1.8.0_381] at pro.shushi.pamirs.meta.util.ClassUtils.getClasses(ClassUtils.java:157) ~[pamirs-meta-model-4.6.8.jar!/:na] at pro.shushi.pamirs.meta.util.ClassUtils.getClassesByPacks(ClassUtils.java:73) ~[pamirs-meta-model-4.6.8.jar!/:na] at pro.shushi.pamirs.channel.core.manager.EnhanceModelScanner.enhanceModel(EnhanceModelScanner.java:51) ~[pamirs-channel-core-4.6.15.jar!/:na] at pro.shushi.pamirs.channel.core.init.ChannelSystemBootAfterInit.init(ChannelSystemBootAfterInit.java:31) 解决办法 方式一【推荐】、配置channel的扫描路径 pamirs: channel: packages: – com.pamirs.ic 方式二、使用Oracle版本的jdk,确保jdk的lib目录,tools.jar有com/sun/tools/javac/tree/JCTree对应的类 问题2:启动报类JsonProvider找不到 具体报错 如果启动报错信息如下: Caused by: java.lang.NoClassDefFoundError: jakarta/json/spi/JsonProvider at java.lang.ClassLoader.defineClass1(Native Method) ~[na:1.8.0_181] at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[na:1.8.0_181] at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[na:1.8.0_181] at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[na:1.8.0_181] 产生原因 项目中只引入了pamirs-channel-core,但未引入elasticsearch相关的包 解决办法 <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>8.4.1</version> </dependency> <dependency> <groupId>jakarta.json</groupId> <artifactId>jakarta.json-api</artifactId> <version>2.1.1</version> </dependency> 其他参考: Oinone引入搜索引擎步骤:https://doc.oinone.top/backend/7235.html

    2024年5月17日
    1.5K00
  • 模型字段之序列化方式

    本文核心是带大家全面了解oinone的序列方式,包括支持的序列化类型、注意点、如果新增客户化序列化方式以及字段默认值的反序列化。 字段序列化方式说明 序列化方式 说明 备注 JSON JSON序列化 主要用于模型相关类型字段的序列化,是@Field.serialize默认选项 DOT 点拼接集合元素 COMMA 逗号拼接集合元素 BIT 按位与,2次幂数求和 非@Field.serialize可选项列表,用于二进制枚举序列化不需要配置,由oinone自动推断 字段序列化方式举例 1、给模型PetItemDetail 增加两个字段:petItemDetails类型为List 和 tags类型为List,并设置为不同的序列化方式,petItemDetails为JSON(缺省就是JSON,可不配),tags为COMMA。2、同时设置 @Field.Advanced(columnDefinition = "varchar(1024)"),防止序列化后存储过长。 @Model.model(PetItem.MODEL_MODEL) @Model(displayName = "宠物商品",summary="宠物商品",labelFields = {"itemName"}) public class PetItem extends AbstractDemoCodeModel{ public static final String MODEL_MODEL="demo.PetItem"; @Field(displayName = "品种") @Field.many2one @Field.Relation(relationFields = {"typeId"},referenceFields = {"id"}) private PetType type; @Field(displayName = "品种类型",invisible = true) private Long typeId; @Field(displayName = "详情", serialize = Field.serialize.JSON, store = NullableBoolEnum.TRUE) @Field.Advanced(columnDefinition = "varchar(1024)") private List<PetItemDetail> petItemDetails; @Field(displayName = "商品标签",serialize = Field.serialize.COMMA,store = NullableBoolEnum.TRUE,multi = true) @Field.Advanced(columnDefinition = "varchar(1024)") private List<String> tags; } 字段序列化注意点 必须使用Field#store属性将字段存储设置为NullableBoolEnum.TRUE。 使用Field#serialize属性指定序列化方式,默认为JSON。 如把PetItemDetail设置为存储模型,须在PetItem的petItemDetails字段上使用Field.Relation#store属性将关联关系存储设置为false。不然会同时存储petItemDetails字段和对应的PetItemDetail表记录 注册自己的序列化器 注册自己的序列化器(实现pro.shushi.pamirs.meta.api.core.orm.serialize.Serializer接口), 如oinone的DOT的序列化方式,用type()方法返回值做匹配,serialize和deserialize分别对应序列化和反序列化方法。 package pro.shushi.pamirs.framework.compute.serialize; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j; import pro.shushi.pamirs.meta.api.core.orm.serialize.Serializer; import pro.shushi.pamirs.meta.common.constants.CharacterConstants; import pro.shushi.pamirs.meta.enmu.SerializeEnum; import pro.shushi.pamirs.meta.util.TypeUtils; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * 点表达式序列生成处理器实现 * @author shushi@shushi.pro * @version 1.0.0 */ @SuppressWarnings("rawtypes") @Slf4j @Component public class DotSerializeProcessor implements Serializer<Object, String> { @Override public String serialize(String ltype, Object value) { if (null == value) { return null; } if (List.class.isAssignableFrom(value.getClass())) { return StringUtils.join((List) value, CharacterConstants.SEPARATOR_DOT); } else { return StringUtils.join(Collections.singletonList(value), CharacterConstants.SEPARATOR_DOT); } } @SuppressWarnings("unchecked") @Override public Object deserialize(String ltype, String ltypeT, String value,…

    2024年5月24日
    1.8K00
  • IWrapper、QueryWrapper和LambdaQueryWrapper使用

    条件更新updateByWrapper 通常我们在更新的时候new一个对象出来在去更新,减少更新的字段 Integer update = new DemoUser().updateByWrapper(new DemoUser().setFirstLogin(Boolean.FALSE), Pops.<DemoUser>lambdaUpdate().from(DemoUser.MODEL_MODEL).eq(IdModel::getId, userId) 使用基础模型的updateById方法更新指定字段的方法: new 一下update对象出来,更新这个对象。 WorkflowUserTask userTaskUp = new WorkflowUserTask(); userTaskUp.setId(userTask.getId()); userTaskUp.setNodeContext(json); userTaskUp.updateById(); 条件删除updateByWrapper public List<T> delete(List<T> data) { List<Long> petTypeIdList = new ArrayList(); for(T item:data){ petTypeIdList.add(item.getId()); } Models.data().deleteByWrapper(Pops.<PetType>lambdaQuery().from(PetType.MODEL_MODEL).in(PetType::getId,petTypeIdList)); return data; } 构造条件查询数据 示例1: LambdaQueryWrapper拼接查询条件 private void queryPetShops() { LambdaQueryWrapper<PetShop> query = Pops.<PetShop>lambdaQuery(); query.from(PetShop.MODEL_MODEL); query.setSortable(Boolean.FALSE); query.orderBy(true, true, PetShop::getId); List<PetShop> petShops2 = new PetShop().queryList(query); System.out.printf(petShops2.size() + ""); } 示例2: IWrapper拼接查询条件 private void queryPetShops() { IWrapper<PetShop> wrapper = Pops.<PetShop>lambdaQuery() .from(PetShop.MODEL_MODEL).eq(PetShop::getId,1L); List<PetShop> petShops4 = new PetShop().queryList(wrapper); System.out.printf(petShops4.size() + ""); } 示例3: QueryWrapper拼接查询条件 private void queryPetShops() { //使用Lambda获取字段名,防止后面改字段名漏改 String nameField = LambdaUtil.fetchFieldName(PetTalent::getName); //使用Lambda获取Clumon名,防止后面改字段名漏改 String nameColumn = PStringUtils.fieldName2Column(nameField); QueryWrapper<PetShop> wrapper2 = new QueryWrapper<PetShop>().from(PetShop.MODEL_MODEL) .eq(nameColumn, "test"); List<PetShop> petShops5 = new PetShop().queryList(wrapper2); System.out.printf(petShops5.size() + ""); } IWrapper转为LambdaQueryWrapper @Function.Advanced(type= FunctionTypeEnum.QUERY) @Function.fun(FunctionConstants.queryPage) @Function(openLevel = {FunctionOpenEnum.API}) public Pagination<PetShopProxy> queryPage(Pagination<PetShopProxy> page, IWrapper<PetShopProxy> queryWrapper) { LambdaQueryWrapper<PetShopProxy> wrapper = ((QueryWrapper<PetShopProxy>) queryWrapper).lambda(); // 非存储字段从QueryData中获取 Map<String, Object> queryData = queryWrapper.getQueryData(); if (null != queryData && !queryData.isEmpty()) { String codes = (String) queryData.get("codes"); if (org.apache.commons.lang3.StringUtils.isNotBlank(codes)) { wrapper.in(PetShopProxy::getCode, codes.split(",")); } } return new PetShopProxy().queryPage(page, wrapper); }

    2024年5月25日
    2.1K00
  • Excel导入扩展点-整体导入(批量导入)

    1、【导入】在有些场景,需要获取Excel导入的整体数据,进行批量的操作或者校验 可以通过实现导入扩展点的方式实现,入参data是导入Excel的数据列表;业务可以根据实际情况进行数据校验 1)Excel模板定义,需要设置setEachImport(false) 2)导入扩展点API定义 pro.shushi.pamirs.file.api.extpoint.ExcelImportDataExtPoint#importData 3)示例代码参考: pro.shushi.pamirs.translate.extpoint.ResourceTranslationImportExtPoint#importData @Slf4j @Component @Ext(ExcelImportTask.class) public class ResourceTranslationImportExtPoint extends AbstractExcelImportDataExtPointImpl<List<ResourceTranslationItem>> { @Override //TODO 表达式,可以自定义,比如可以支持1个模型的多个【导入名称】的不同模板 @ExtPoint.Implement(expression = "importContext.definitionContext.model==\"" + ResourceTranslation.MODEL_MODEL + "\"") public Boolean importData(ExcelImportContext importContext, List<ResourceTranslationItem> dataList) { //TODO dataList就是excel导入那个sheet的所有内容 return true; } } 2、【导入】逐行导入的时候做事务控制 在模板中定义中增加事务的定义,并设置异常后回滚。参加示例代码: excel模板定义 @Component public class DemoItemImportTemplate implements ExcelTemplateInit { public static final String TEMPLATE_NAME = "商品导入模板"; @Override public List<ExcelWorkbookDefinition> generator() { //定义事务(导入处理中,只操作单个表的不需要事务定义。) //是否定义事务根据实际业务逻辑确定。比如:有些场景在导入前需要删除数据后在进行导入就需要定义事务 InitializationUtil.addTxConfig(DemoItem.MODEL_MODEL, ExcelDefinitionContext.EXCEL_TX_CONFIG_PREFIX + TEMPLATE_NAME); return Collections.singletonList( ExcelHelper.fixedHeader(DemoItem.MODEL_MODEL, TEMPLATE_NAME) .setType(ExcelTemplateTypeEnum.IMPORT) .createSheet("商品导入-sheet1") .createBlock(DemoItem.MODEL_MODEL) .addUnique(DemoItem.MODEL_MODEL,"name") .addColumn("name","名称") .addColumn("description","描述") .addColumn("itemPrice","单价") .addColumn("inventoryQuantity","库存") .build().setEachImport(true) //TODO 设置异常后回滚的标识,这个地方会回滚事务 .setHasErrorRollback(true) .setExcelImportMode(ExcelImportModeEnum.SINGLE_MODEL) ); } } 导入逻辑处理 @Slf4j @Component @Ext(ExcelImportTask.class) public class DemoItemImportExtPoint extends AbstractExcelImportDataExtPointImpl<DemoItem> implements ExcelImportDataExtPoint<DemoItem> { @Autowired private DemoItemService demoItemService; @Override @ExtPoint.Implement(expression = "importContext.definitionContext.model == \"" + DemoItem.MODEL_MODEL + "\"") public Boolean importData(ExcelImportContext importContext, DemoItem data) { ExcelImportTask importTask = importContext.getImportTask(); try { DemoItemImportTask hrExcelImportTask = new DemoItemImportTask().queryById(importTask.getId()); String publishUserName = Optional.ofNullable(hrExcelImportTask).map(DemoItemImportTask::getPublishUserName).orElse(null); data.setPublishUserName(publishUserName); demoItemService.create(data); } catch(PamirsException e) { log.error("导入异常", e); } catch (Exception e) { log.error("导入异常", e); } return Boolean.TRUE; } }

    2023年12月7日
    1.6K00
  • Oinone构建分布式项目一些注意点

    1. Oinone如何支持构建分布式项目 参考文档:https://doc.oinone.top/kai-fa-shi-jian/5572.html 2. Oinone远程服务发布范围 泛化服务范围,可选值:module、namespacemodule:按模块维度发布远程服务namespace:按Fun的namespace维度发布远程服务默认按module维度发布服务 pamirs: distribution: service: #serviceScope: 可选值namespace、module serviceScope: module 3.关闭Dubbo服务注册元数据上报日志 logging: level: root: info pro.shushi.pamirs.framework.connectors.data.mapper.PamirsMapper: error pro.shushi.pamirs.framework.connectors.data.mapper.GenericMapper: error # mybatis sql日志 RocketmqClient: error org.apache.dubbo.registry.zookeeper.ZookeeperRegistry: error org.apache.dubbo.registry.integration.RegistryDirectory: error org.apache.dubbo.config.ServiceConfig: error com.alibaba.nacos.client.naming: error org.apache.dubbo.registry.nacos.NacosRegistry: error org.apache.dubbo.registry.support.AbstractRegistryFactory: error org.apache.dubbo.registry.integration.RegistryProtocol: error org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl: off org.apache.dubbo.metadata.store.zookeeper.ZookeeperMetadataReport: off org.apache.dubbo.metadata.store.nacos.NacosMetadataReport: off 4.Naocs配置列表出现多余配置 dubbo 集成 nacos注册中心,会出现多余的配置,详细参考:配置列表会自动创建很多无关的配置: https://github.com/apache/dubbo/issues/6645配置列表出现多余的配置:https://github.com/alibaba/nacos/issues/8843 按照下面的配置可以将其关闭(📢主要是这三项配置use-as-config-center, use-as-metadata-center,metadata-report.failfast),已生成的配置需要手动删除掉。 dubbo: application: name: pamirs-demo version: 1.0.0 metadata-type: local registry: id: pamirs-demo-registry address: nacos://192.168.0.129:8848 username: nacos password: nacos # dubbo使用nacos的注册中心往配置中心写入配置关闭配置 use-as-metadata-center: false use-as-config-center: false config-center: address: nacos://192.168.0.129:8848 username: nacos password: nacos metadata-report: failfast: false # 关闭错误上报的功能 address: nacos://192.168.0.129:8848 username: nacos password: nacos protocol: name: dubbo port: -1 serialization: pamirs scan: base-packages: pro.shushi cloud: subscribed-services:

    2024年2月1日
    1.5K10

Leave a Reply

登录后才能评论