4.1.11 函数之异步执行

异步任务是非常常见的一种开发模式,它在分布式的开发模式中有很多应用场景如:

  1. 高并发场景中,我们一般采用把长流程切短,用异步方式去掉可以异步的非关键功能,缩小主流程响应时间,提升用户体验

  2. 异构系统的集成调用,通过异步任务完成解耦与自动重试

  3. 分布式系统最终一致性的可选方案

今天我们了解oinone是如何结合Spring+TbSchedule来完成异步任务

一、TbSchedule介绍

它是一个支持分布式的调度框架,让批量任务或者不断变化的任务能够被动态的分配到多个主机的JVM中,在不同的线程组中并行执行,所有的任务能够被不重复,不遗漏的快速处理。基于ZooKeeper的纯Java实现,由Alibaba开源。在互联网和电商领域TBSchedule的使用非常广泛,目前被应用于阿里巴巴、淘宝、支付宝、京东、聚美、汽车之家、国美等很多互联网企业的流程调度系统。也是笔者早期在阿里参与设计的一款产品。

oinone的异步任务执行原理(如下图4-1-11-1所示),先做一个大致了解:

4.1.11 函数之异步执行

图4-1-11-1 Oinone的异步任务执行原理图

基础管理工具

下载tbSchedule的控制台jar包去除文件后缀.txt(详见本书籍【附件一】)pamirs-middleware-schedule-console-3.0.1.jar.txt(31.2 MB)

启动控制台


java -jar pamirs-middleware-schedule-console-3.0.1.jar 

图4-1-11-2 控制台启动方式

访问地址


http://127.0.0.1:10014/schedule/index.jsp?manager=true

图4-1-11-3 访问地址

配置zk连接参数

image.png

图4-1-11-4 配置zk连接参数

oinone默认实现任务类型

image.png

图4-1-11-5 Oinone默认实现任务类型

  • baseScheduleNoTransactionTask
  • baseScheduleTask
  • remoteScheduleTask --- 适用于pamirs-middleware-schedule独立部署场景
  • serialBaseScheduleNoTransactionTask
  • serialBaseScheduleTask
  • serialRemoteScheduleTask --- 适用于pamirs-middleware-schedule独立部署场景
  • cycleScheduleNoTransactionTask
  • delayMsgTransferScheduleTask
  • deleteTransferScheduleTask

注:

a. 默认情况下:所有任务的任务项都只配置了一个任务项0,只有一台机器能分配任务。

1. 如果要修改配置可以在启动项目中放置schedule.json,来修改配置

2. 人工进入控制修改任务对应任务项的配置

b. 如果想为某一个核心任务配置的独立调度器,不受其他任务执行影响。那么见独立调度的异步任务

任务表相关说明

4.1.11 函数之异步执行

图4-1-11-6 任务表相关说明

二、构建第一个异步任务(举例)

Step1 新建PetShopService和PetShopServiceImpl

1 新建PetShopService定义updatePetShops方法

package pro.shushi.pamirs.demo.api.service;

import pro.shushi.pamirs.demo.api.model.PetShop;
import pro.shushi.pamirs.meta.annotation.Fun;
import pro.shushi.pamirs.meta.annotation.Function;

import java.util.List;

@Fun(PetShopService.FUN_NAMESPACE)
public interface PetShopService {
    String FUN_NAMESPACE = "demo.PetShop.PetShopService";
    @Function
    void updatePetShops(List<PetShop> petShops);

}

图4-1-11-7 新建PetShopService定义updatePetShops方法

  1. PetShopServiceImpl实现PetShopService接口并在updatePetShops增加@XAsync注解

    1. displayName = "异步批量更新宠物商店",定义异步任务展示名称

    2. limitRetryNumber = 3,定义任务失败重试次数,,默认:-1不断重试

    3. nextRetryTimeValue = 60,定义任务失败重试的时间数,默认:3

    4. nextRetryTimeUnit,定义任务失败重试的时间单位,默认:TimeUnitEnum.SECOND

    5. delayTime,定义任务延迟执行的时间数,默认:0

    6. delayTimeUnit,定义任务延迟执行的时间单位,默认:TimeUnitEnum.SECOND

package pro.shushi.pamirs.demo.core.service;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetShop;
import pro.shushi.pamirs.demo.api.service.PetShopService;
import pro.shushi.pamirs.meta.annotation.Fun;
import pro.shushi.pamirs.meta.annotation.Function;
import pro.shushi.pamirs.trigger.annotation.XAsync;

import java.util.List;

@Fun(PetShopService.FUN_NAMESPACE)
@Component
public class PetShopServiceImpl implements PetShopService {

    @Override
    @Function
    @XAsync(displayName = "异步批量更新宠物商店",limitRetryNumber = 3,nextRetryTimeValue = 60)
    public void updatePetShops(List<PetShop> petShops) {
        new PetShop().updateBatch(petShops);
    }
}

图4-1-11-8 实现PetShopService接口并在updatePetShops增加@XAsync注解

Step2 修改PetShopBatchUpdateAction的conform方法

  1. 引入PetShopService

  2. 修改conform方法

    1. 利用ArgUtils进行参数转化,ArgUtils会经常用到。

    2. 调用petShopService.updatePetShops方法

package pro.shushi.pamirs.demo.core.action;
…… 引依赖类
@Model.model(PetShopBatchUpdate.MODEL_MODEL)
@Component
public class PetShopBatchUpdateAction {

    @Autowired
    private PetShopService petShopService;
    ……其他代码
    @Action(displayName = "确定",bindingType = ViewTypeEnum.FORM,contextType = ActionContextTypeEnum.SINGLE)
    public PetShopBatchUpdate conform(PetShopBatchUpdate data){
        if(data.getPetShopList() == null || data.getPetShopList().size()==0){
            throw  PamirsException.construct(DemoExpEnumerate.PET_SHOP_BATCH_UPDATE_SHOPLIST_IS_NULL).errThrow();
        }
        List<PetShopProxy> proxyList = data.getPetShopList();
        for(PetShopProxy petShopProxy:proxyList){
            petShopProxy.setDataStatus(data.getDataStatus());
        }
        Tx.build(new TxConfig().setPropagation(Propagation.REQUIRED.value())).executeWithoutResult(status -> {
            new PetShopProxy().updateBatch(proxyList);
            //利用ArgUtils进行参数转化
            List<PetShop> shops = ArgUtils.convert(PetShopProxy.MODEL_MODEL, PetShop.MODEL_MODEL,proxyList);
            petShopService.updatePetShops(shops);
//            throw PamirsException.construct(DemoExpEnumerate.SYSTEM_ERROR).errThrow();
        });
        return data;
    }

}

图4-1-11-9 修改PetShopBatchUpdateAction的conform方法

Step3 重启看效果

异步会有一定的延迟,我们按以下步骤测试下异步执行效果

  1. 进入商店管理列表页,选择中一行数据点击批量更新数据状态按钮进入批量修改宠物商店数据状态页面:

image.png

图4-1-11-10 进入批量修改宠物商店数据状态页面

  1. 在批量修改宠物商店数据状态页面,数据状态设置为未启用,点击组合动作按钮回到商店管理列表页:

image.png

图4-1-11-11 点击组合动作按钮回到商店管理列表页

  1. 查看商店管理列表页的数据记录的数据状态字段是否修改成功,此时可能未修改成功,也可能已经修改成功,因为本身就是毫秒级的速度,点击搜索刷新数据,发现数据记录的数据状态字段修改成功:

image.png

图4-1-11-12 发现数据记录的数据状态字段修改成功

  1. 查看任务表,根据任务表与日期的对照关系查询指定表

image.png

图4-1-11-13 根据任务表与日期的对照关系查询指定表

三、异步任务高级玩法

顺序异步任务(举例)

这里的顺序任务是指把任务按一定规则分组以后按时间顺序串行执行,不同分组间的任务不相互影响。有点类似mq的顺序消息

eg:订单的状态变更的异步任务需要根据任务产生时间顺序执行。那么分组规则是按订单id分组,执行顺序按任务产生顺序执行

Step1 PetShopService和PetShopServiceImpl

  1. 修改PetShopService新增定义asyncSerialUpdatePetShops方法
package pro.shushi.pamirs.demo.api.service;

import pro.shushi.pamirs.demo.api.model.PetShop;
import pro.shushi.pamirs.meta.annotation.Fun;
import pro.shushi.pamirs.meta.annotation.Function;

import java.util.List;

@Fun(PetShopService.FUN_NAMESPACE)
public interface PetShopService {
    String FUN_NAMESPACE = "demo.PetShop.PetShopService";
    @Function
    void updatePetShops(List<PetShop> petShops);
    @Function
    void asyncSerialUpdatePetShops(List<PetShop> petShops);

}

图4-1-11-14 修改PetShopService新增定义asyncSerialUpdatePetShops方法

  1. 修改PetShopServiceImpl实现ScheduleAction接口,并增加asyncSerialUpdatePetShops方法

    1. 引入executeTaskActionService用于提交异步串行任务ExecuteTaskAction

      1.setExecuteNamespace(getInterfaceName()),确保跟getInterfaceName()一致

      2.setExecuteFun("execute");跟执行函数名“execute”一致

      3.setTaskType(TaskType.SERIAL_BASE_SCHEDULE_NO_TRANSACTION_TASK.getValue()),必须用SERIAL_BASE_SCHEDULE_NO_TRANSACTION_TASK,其为顺序执行任务类型

      4.setBizId(petShop.getCreateUid())//根据创建人Id分组,根据实际业务情况决定

    2. getInterfaceName()跟函数的命名空间保持一致

package pro.shushi.pamirs.demo.core.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.core.common.enmu.TimeUnitEnum;
import pro.shushi.pamirs.demo.api.model.PetShop;
import pro.shushi.pamirs.demo.api.service.PetShopService;
import pro.shushi.pamirs.framework.connectors.data.tx.interceptor.PamirsTransactional;
import pro.shushi.pamirs.meta.annotation.Fun;
import pro.shushi.pamirs.meta.annotation.Function;
import pro.shushi.pamirs.meta.util.JsonUtils;
import pro.shushi.pamirs.middleware.schedule.api.ScheduleAction;
import pro.shushi.pamirs.middleware.schedule.common.Result;
import pro.shushi.pamirs.middleware.schedule.domain.ScheduleItem;
import pro.shushi.pamirs.middleware.schedule.eunmeration.TaskType;
import pro.shushi.pamirs.trigger.annotation.XAsync;
import pro.shushi.pamirs.trigger.model.ExecuteTaskAction;
import pro.shushi.pamirs.trigger.service.ExecuteTaskActionService;

import java.util.List;

@Fun(PetShopService.FUN_NAMESPACE)
@Component
public class PetShopServiceImpl implements PetShopService, ScheduleAction {

    @Autowired
    private ExecuteTaskActionService executeTaskActionService;

    @Override
    @Function
    @XAsync(displayName = "异步批量更新宠物商店",limitRetryNumber = 3,nextRetryTimeValue = 60)
    public void updatePetShops(List<PetShop> petShops) {
        new PetShop().updateBatch(petShops);
    }

    @PamirsTransactional
    @Override
    @Function
    public void asyncSerialUpdatePetShops(List<PetShop> petShops){
        for(PetShop petShop:petShops) {
            executeTaskActionService.submit((ExecuteTaskAction) new ExecuteTaskAction()
                    .setBizId(petShop.getCreateUid())//根据创建人Id分组,根据实际业务情况决定
                    .setTaskType(TaskType.SERIAL_BASE_SCHEDULE_NO_TRANSACTION_TASK.getValue())
                    .setNextRetryTimeUnit(TimeUnitEnum.SECOND)//失败重试时间单位
                    .setNextRetryTimeValue(10)//失败重试时间数
                    .setLimitRetryNumber(6)//最多重试次数
                    .setDisplayName("异步顺序任务-更新宠物商店,以createUid分组")
                    .setExecuteNamespace(getInterfaceName())
                    .setExecuteFun("execute")
                    .setContext(JsonUtils.toJSONString(petShop)));
        }
    }

    @Override
    public String getInterfaceName() {
        return PetShopService.FUN_NAMESPACE;
    }

    @Override
    @Function
    public Result<Void> execute(ScheduleItem scheduleItem) {
        Result<Void> result = new Result<>();
        PetShop petShop = JsonUtils.parseObject(scheduleItem.getContext(),PetShop.class);
        petShop.updateById();
        return result;
    }
}

图4-1-11-15 代码示例

Step2 修改PetShopBatchUpdateAction的conform方法

改调用异步顺序方法,petShopService.asyncSerialUpdatePetShops(shops)

package pro.shushi.pamirs.demo.core.action;
…… 引依赖类
@Model.model(PetShopBatchUpdate.MODEL_MODEL)
@Component
public class PetShopBatchUpdateAction {

    @Autowired
    private PetShopService petShopService;
    ……其他代码
    @Action(displayName = "确定",bindingType = ViewTypeEnum.FORM,contextType = ActionContextTypeEnum.SINGLE)
    public PetShopBatchUpdate conform(PetShopBatchUpdate data){
        if(data.getDataStatus() == null){
            throw  PamirsException.construct(DemoExpEnumerate.PET_SHOP_BATCH_UPDATE_DATASTATUS_IS_NULL).errThrow();
        }
        List<PetShopProxy> proxyList = data.getPetShopList();
        for(PetShopProxy petShopProxy:proxyList){
            petShopProxy.setDataStatus(data.getDataStatus());
        }
        Tx.build(new TxConfig().setPropagation(Propagation.REQUIRED.value())).executeWithoutResult(status -> {
            //利用ArgUtils进行参数转化
            List<PetShop> shops = ArgUtils.convert(PetShopProxy.MODEL_MODEL, PetShop.MODEL_MODEL,proxyList);
//            petShopService.updatePetShops(shops);
            petShopService.asyncSerialUpdatePetShops(shops);
//            throw PamirsException.construct(DemoExpEnumerate.SYSTEM_ERROR).errThrow();
        });
        return data;
    }

}

图4-1-11-16 修改PetShopBatchUpdateAction的conform方法

Step3 重启看效果

页面效果跟构建第一个异步任务一样,但任务生产和执行逻辑不一样。会根据biz_id分配任务项与分组确保执行顺序

  1. 分配任务项,相同任务项一定会分配给同一个schedule的执行者

  2. 分组,任务在同一个schedule的执行者,相同分组Id一定会分配给同一个线程执行

页面操作完以后查看数据任务表

image.png

图4-1-11-17 根据biz_id分配任务项与分组确保执行顺序

独立调度的异步任务(举例)

如果把所有任务都放在同一个任务类型下,复用同一套任务策略、任务配置、任务执行器。那么当某些不重要的异步任务大量失败会影响其他任务的执行,所以我们在一些高并发大任务量的场景下会独立给一些核心异步任务配置独立调度策略。

Step1 修改pamirs-demo-core的pom

增加对pamirs-middleware-schedule-core依赖,为了复用oinone默认实现任务类型的基础逻辑,在例子中我们自定义的异步任务继承SerialBaseScheduleNoTransactionTask的基础逻辑

        <dependency>
            <groupId>pro.shushi.pamirs.middleware.schedule</groupId>
            <artifactId>pamirs-middleware-schedule-core</artifactId>
        </dependency>

图4-1-11-18 增加pamirs-middleware-schedule-core依赖

Step2 新建PetShopUpdateCustomAsyncTask

package pro.shushi.pamirs.demo.core.task;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.middleware.schedule.core.tasks.SerialBaseScheduleNoTransactionTask;

@Component
public class PetShopUpdateCustomAsyncTask extends SerialBaseScheduleNoTransactionTask {

    public static final String TASK_TYPE = PetShopUpdateCustomAsyncTask.class.getSimpleName();

    @Override
    public String getTaskType() {
        return TASK_TYPE;
    }

}

图4-1-11-19 新建PetShopUpdateCustomAsyncTask

Step3 修改PetShopServiceImpl的asyncSerialUpdatePetShops方法

修改TaskType为PetShopUpdateCustomAsyncTask.TASK_TYPE

    @PamirsTransactional
    @Override
    @Function
    public void asyncSerialUpdatePetShops(List<PetShop> petShops){
        for(PetShop petShop:petShops) {
            executeTaskActionService.submit((ExecuteTaskAction) new ExecuteTaskAction()
                    .setBizId(petShop.getCreateUid())//根据创建人Id分组,根据实际业务情况决定
//                    .setTaskType(TaskType.SERIAL_BASE_SCHEDULE_NO_TRANSACTION_TASK.getValue())
                    .setTaskType(PetShopUpdateCustomAsyncTask.TASK_TYPE)
                    .setNextRetryTimeUnit(TimeUnitEnum.SECOND)//失败重试时间单位
                    .setNextRetryTimeValue(10)//失败重试时间数
                    .setLimitRetryNumber(6)//最多重试次数
                    .setDisplayName("异步顺序任务-更新宠物商店,以createUid分组")
                    .setExecuteNamespace(getInterfaceName())
                    .setExecuteFun("execute")
                    .setContext(JsonUtils.toJSONString(petShop)));
        }
    }

图4-1-11-20 修改TaskType为PetShopUpdateCustomAsyncTask.TASK_TYPE

Step4 初始化数据

下载以下文件放在pamirs-demo-boot的src/main/resources/init目录下

schedule.json.txt(8 KB)

我们在系统原有提供的schedule.json,中增入任务类型为petShopUpdateCustomAsyncTask的配置,配置项"taskType": "CUSTOM",标志为客户自定义。实际注册到TbSchedule会按beanNames转化为taskType,其他参数含义见TbSchedule的管理控制台有对应中文说明


  {
    "taskType": "CUSTOM",
    "beanNames": "petShopUpdateCustomAsyncTask",
    "values": {
      "heartBeatRate": 1000, 
      "judgeDeadInterval": 10000,
      "sleepTimeNoData": 500,
      "sleepTimeInterval": 500,
      "fetchDataNumber": 500,
      "executeNumber": 1,
      "threadNumber": 8,
      "processorType": "SLEEP",
      "expireOwnSignInterval": 1.0,
      "taskParameter": "",
      "taskKind": "static",
      "taskItems": [
        0,1,2,3,4,5,6,7
      ],
      "maxTaskItemsOfOneThreadGroup": 0,
      "version": 0,
      "sts": "resume",
      "fetchDataCountEachSchedule": -1
    },
    "strategy": {
      "IPList": [
        "127.0.0.1"
      ],
      "numOfSingleServer": 1,
      "assignNum": 4,
      "kind": "Schedule",
      "taskParameter": "",
      "sts": "resume"
    }
  }

图4-1-11-21 增加任务类型为petShopUpdateCustomAsyncTask的配置

Step5 重启看效果

页面效果跟构建第一个异步任务一样,但任务生产和执行逻辑不一样。会根据biz_id分配任务项与分组确保执行顺序,同时会有独立的调度器以及规则配置

  1. 在tbSchedule的管理控制台,可以看见多了一个“petShopUpdateCustomAsyncTask”的任务类型,点编辑就可以看到我们配置任务类型对应的参数

image.png

图4-1-11-22 tbSchedule管理控制台

  1. 页面操作完以后查看对应数据任务表

image.png

图4-1-11-23 查看对应数据任务表

四、不同应用如何隔离执行单元

在schedule跟模块部署一起的时候,多模块独立boot的情况下,需要做必要的配置。如果schedule独立部署则没有必要,因为全部走远程,不存在类找不到的问题

  1. 通过配置pamirs.zookeeper.rootPath,确保两组机器都能覆盖所有任务分片,这样不会漏数据

  2. 通过pamirs.event.schedule.ownSign来隔离。确保两组机器只取各自产生的数据,这样不会重复执行数据

pamirs:
    zookeeper:
    zkConnectString: 127.0.0.1:2181
    zkSessionTimeout: 60000
    rootPath: /demo
    event:
    enabled: true
    schedule:
      enabled: true
      ownSign: demo
    rocket-mq:
      namesrv-addr: 127.0.0.1:9876

图4-1-11-24 配置pamirs.zookeeper.rootPath

Oinone社区 作者:史, 昂原创文章,如若转载,请注明出处:https://doc.oinone.top/oio4/9286.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
史, 昂的头像史, 昂数式管理员
上一篇 2024年5月23日
下一篇 2024年5月23日

相关推荐

  • 节点动作

    节点动作为触发和结束结点中间的动作,点击节点名称可以进行修改。

  • 企业个性化配置

    平台除了帮助企业快速解决业务需求外,还提供了企业文化、风格个性化能力,本章节详细介绍个性化配置。包括:登陆页配置、企业形象配置、系统风格配置。 系统配置 配置入口 配置入口:右上角登陆账号——系统配置——全局配置——登陆页配置、企业形象配置、系统风格配置。 登陆页配置 用户自定义登陆页的主题,一共有六个主题可选择,分别为: 图一:左侧是图片,右侧是登录输入框,Logo在左上角; 图二:右左侧是图片,右侧是登录输入框,Logo在左上角; 图三:底图是图片,登录输入框浮在页面左侧,Logo在左上角; 图四:底图是图片,登录输入框浮在页面中间,Logo在左上角; 图五:底图是图片,登录输入框浮在页面右侧,Logo在左上角; 图六:底图是图片,登录输入框浮在页面中间,Logo在登录框上方。 页面背景 上传登录页背景,支持上传jpg、png格式的图片或mp4、mov、avi格式视频。尺寸建议为1920*1080px,文件大小不超过50MB。 登录页logo:指配置登录页左上角的logo图标。 预览:配置后可在右侧电脑中点击全屏预览效果。 配置示例 企业形象配置 企业形象配置:可设置企业信息、业务应用导航栏logo、浏览器logo,设置完成之后,点击发布之后设置生效。 系统风格配置 系统风格配置,可设置主题(浅色、深色)、尺寸(大、中、小),配置后可全屏预览,发布后即可更换成对应的风格。 下载代码可进行自定义风格。

    2024年6月20日
    1.1K00
  • 4.1.19 框架之网关协议-后端占位符

    在我们日常开发中会有碰到一些特殊场景,需要由前端来传一些如“当前用户Id”、“当前用户code”诸如此类只有后端才知道值的参数,那么后端占位符就是来解决类似问题的。如前端传${currentUserId},后端会自动替换为当前用户Id。 Step1 后端定义占位符 我们新建一个UserPlaceHolder继承AbstractPlaceHolderParser,用namespace来定义一个“currentUserId”的占位符,其对应值由value()决定为“PamirsSession.getUserId().toString()”,active要为真才有效,priority为优先级 package pro.shushi.pamirs.demo.core.placeholder; import org.springframework.stereotype.Component; import pro.shushi.pamirs.meta.api.session.PamirsSession; import pro.shushi.pamirs.user.api.AbstractPlaceHolderParser; @Component public class UserPlaceHolder extends AbstractPlaceHolderParser { @Override protected String value() { return PamirsSession.getUserId().toString(); } @Override public Integer priority() { return 10; } @Override public Boolean active() { return Boolean.TRUE; } @Override public String namespace() { return "currentUserId"; } } 图4-1-19-1 后端定义占位符 Step2 前端使用后端占位符 我们经常在o2m和m2m中会设置domain来过滤数据,这里案例就是在field中设置来过滤条件,domain="createUid == $#{currentUserId}",注意这里用的是$#{currentUserId} 而不是${currentUserId},这是前端为了区分真正变量和后端占位符,提交的时候会把#过滤掉提交。修改宠物达人表格视图的Template中search部分 <template slot="search" cols="4"> <field data="name" label="达人"/> <field data="petTalentSex" multi="true" label="达人性别"/> <field data="creater" /> <!– <field data="petShops" label="宠物商店" domain="createUid == ${activeRecord.creater.id}"/>–> <field data="petShops" label="宠物商店" domain="createUid == $#{currentUserId}"/> <field data="dataStatus" label="数据状态" multi="true"> <options> <option name="DRAFT" displayName="草稿" value="DRAFT" state="ACTIVE"/> <option name="NOT_ENABLED" displayName="未启用" value="NOT_ENABLED" state="ACTIVE"/> <option name="ENABLED" displayName="已启用" value="ENABLED" state="ACTIVE"/> <option name="DISABLED" displayName="已禁用" value="DISABLED" state="ACTIVE"/> </options> </field> <field data="createDate" label="创建时间"/> <field data="unStore" /> </template> 图4-1-19-2 前端使用后端占位符 Step3 重启看效果 请求上都带上了createUid==${currentUserId} 图4-1-19-3 示例效果

    2024年5月23日
    1.2K00
  • 4.1.10 函数之触发与定时(改)

    函数的触发和定时在很多场景中会用到,也是一个oinone的基础能力。比如我们的流程产品中在定义流程触发时就会让用户选择模型触发还是时间触发,就是用到了函数的触发与定时能力。 整体链路示意图(如下图4-1-10-1 所示),本文只讲trigger里的两类任务,一个是触发任务,一个是定时任务,异步任务放在4.1.11【函数之异步执行】一文中单独去介绍。 图4-1-10-1 整体链路示意图 一、触发任务TriggerTaskAction(举例) 触发任务的创建,使用pamirs-middleware-canal监听mysql的binlog事件,通过rocketmq发送变更数据消息,收到MQ消息后,创建TriggerAutoTask。 触发任务的执行,使用TBSchedule拉取触发任务后,执行相应函数。 注意:pamirs-middleware-canal监听的数据库表必须包含触发模型的数据库表。 Step1 下载canal中间件 下载pamirs-middleware-canal-deployer-3.0.1.zip,去.txt后缀为pamirs-middleware-canal-deployer-3.0.1.zip,解压文件如下: 图4-1-10-2 下载canal中间件 Step2 引入依赖pamirs-core-trigger模块 pamirs-demo-api增加pamirs-trigger-api <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-trigger-api</artifactId> </dependency> 图4-1-10-3 pamirs-trigger-api依赖包 DemoModule在模块依赖定义中增加@Module(dependencies={TriggerModule.MODULE_MODULE}) @Component @Module( name = DemoModule.MODULE_NAME, displayName = "oinoneDemo工程", version = "1.0.0", dependencies = {ModuleConstants.MODULE_BASE, CommonModule.MODULE_MODULE, UserModule.MODULE_MODULE, TriggerModule.MODULE_MODULE} ) @Module.module(DemoModule.MODULE_MODULE) @Module.Advanced(selfBuilt = true, application = true) @UxHomepage(PetShopProxy.MODEL_MODEL) public class DemoModule implements PamirsModule { ……其他代码 } 图4-1-10-4 模块依赖中增加Trigger模块 pamirs-demo-boot 增加pamirs-trigger-core和pamirs-trigger-bridge-tbschedule的依赖 <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-trigger-core</artifactId> </dependency> <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-trigger-bridge-tbschedule</artifactId> </dependency> 图4-1-10-5 增加pamirs-trigger-core和pamirs-trigger-bridge-tbschedule的依赖 修改pamirs-demo-boot的applcation-dev.yml 修改pamris.event.enabled和pamris.event.schedule.enabled为true pamirs_boot_modules增加启动模块:trigger pamirs: event: enabled: true schedule: enabled: true rocket-mq: namesrv-addr: 127.0.0.1:9876 boot: init: true sync: true modules: – base – common – sequence – resource – user – auth – message – international – business – trigger – demo_core 图4-1-10-6 启动模块中增加trigger模块 Step3 启动canal中间件 canal的库表需要手工建 create schema canal_tsdb collate utf8mb4_bin 图4-1-10-7 canal的建库语句 CREATE TABLE IF NOT EXISTS `meta_snapshot` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `gmt_create` datetime NOT NULL COMMENT '创建时间', `gmt_modified` datetime NOT NULL COMMENT '修改时间', `destination` varchar(128) DEFAULT NULL COMMENT '通道名称', `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog文件名', `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog偏移量', `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog节点id', `binlog_timestamp` bigint(20) DEFAULT NULL…

    2024年5月23日
    1.2K00

Leave a Reply

登录后才能评论