4.1.21 框架之分布式消息

消息中间件是在分布式开发中常见的一种技术手段,用于模块间的解耦、异步处理、数据最终一致等场景。

一、介绍

oinone对开源的RocketMQ进行了封装,是平台提供的一种较为简单的使用方式,并非是对RocketMQ进行的功能扩展。同时也伴随着两个非常至关重要的目的:

  1. 适配不同企业对RocketMQ的不同版本选择,不至于改上层业务代码。目前已经适配RocketMQ的开源版本和阿里云版本。 下个版本会对API进行升级支持不同类型MQ,以适配不同企业对MQ的不同要求,应对一些企业客户已经对MQ进行技术选择

  2. 对协议头进行扩展:如多租户的封装,saas模式中为了共用MQ基础资源,需要在消息头中加入必要租户信息。

二、使用准备

demo工程默认已经依赖消息,这里只是做介绍无需大家额外操作,大家可以用maven依赖树命令查看引用关系。

依赖包

增加对pamirs-connectors-event的依赖


<dependency>
    <groupId>pro.shushi.pamirs.framework</groupId>
    <artifactId>pamirs-connectors-event</artifactId>
</dependency>

图4-1-21-1 分布式消息的依赖包

相关功能引入

增加模型、触发器都依赖MQ

<!-- 增强模型 -->
<!-- 增强模型 -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-channel</artifactId>
</dependency>
<!-- 触发器 api -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-api</artifactId>
</dependency>
<!-- 触发器 core -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-core</artifactId>
</dependency>

图4-1-21-2 增加模型、触发器都依赖MQ

yml配置文件参考

详见4.1.1【模块之yml文件结构详解】的“pamirs.event”部分。

三、使用说明

发送消息(NotifyProducer)

概述

NotifyProducer是Pamirs Event中所有生产者的基本API,它仅仅定义了消息发送的基本行为,例如生产者自身的属性,启动和停止,当前状态,以及消息发送方法。它本身并不决定消息如何发送,而是根据具体的实现确定其功能。

目前仅实现了RocketMQProducer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

Notify注解方式

使用示例

@Component
public class DemoProducer {

    @Notify(topic = "test", tags = "model")
    public DemoModel sendModel() {
        return new DemoModel();
    }

    @Notify(topic = "test", tags = "dto")
    public DemoDTO sendDTO() {
        return new DemoDTO();
    }
}

图4-1-21-3 Notify注解方式使用示例

解释说明

  1. 使用Component注解方式注册Spring Bean。

  2. Notify注解指定topic和tags。

  3. topic和tags对应NotifyEvent中的topic和tags。

RocketMQProducer方法调用

使用示例

@Component
public class SendRocketMQMessage {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel()));
        rocketMQProducer.send(new NotifyEvent("test", "dto", new DemoDTO()));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage() {
        DemoModel data = new DemoModel();
        data.setAge(10);
        rocketMQProducer.send(new NotifyEvent("test", "model", data)
                .setQueueSelector((queueSize, event) -> {
                    DemoModel body = (DemoModel) event.getBody();
                    return body.getAge() % queueSize;
                }));
    }

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel())
                .setIsTransaction(true)
                .setGroup("demoTransactionListener"));
    }
}

图4-1-21-4 RocketMQProducer方法调用

解释说明

  1. 使用Component注解方式注册Spring Bean。
  2. 使用Autowired注解方式装配RocketMQProducer实例。
  3. 使用send方法发送指定消息。
  4. 在【发送普通消息】方法中,该实现效果与Notify注解方式所示完全一致。
  5. 在【发送有序消息】方法中,队列选择器是必须配置的,queueSize属性为MQ队列的总数量,在broker中配置。有序消息必须配合有序消费者才能达到有序消费的目的,否则还是无序的常规消息,消费者需要配置@NotifyListener(consumerType=ConsumerType.ORDERLY )注解。
  6. 在【发送事务消息】方法中指定的group为ProducerGroup,事务消息是通过不同的Producer发出的,事务消息监听请参考TransactionListener注解的相关使用方法。(示例中的group与下文介绍中的一致)

使用TransactionListener开启事务消息监听

使用示例

@Component
@TransactionListener
public class DemoTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        return NotifyTransactionState.COMMIT;
    }

}

图4-1-21-5 使用TransactionListener开启事务消息监听

解释说明

  1. 实现NotifyTransactionListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 添加TransactionListener注解注册事务监听,生成对应的生产者。

  4. 当前ProducerGroup将使用这个类的BeanName,即"demoTransactionListener"。如果你想自定义ProducerGroup,可以使用TransactionListener的value属性进行设置。

消费消息(NotifyConsumer)

概述

NotifyConsumer是Pamirs Evnet中所有消费者的基本API,与NotifyProducer类似,它仅仅定义了消息消费的基本行为,例如消费者自身的属性,启动和停止,当前状态,以及消息的监听和订阅方法。它本身并不决定消息如何消费以及如何被订阅,而是根据具体的实现确定其功能。

目前仅实现了RocketMQEventPushConsumer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

在类上使用NotifyListener注解

使用示例

@Component
@NotifyListener(topic = "test", tags = "model")
public class DemoConsumerClass implements NotifyEventListener {

    @Override
    public void consumer(NotifyEvent event) {
        DemoModel data = (DemoModel) event.getBody();
        // do some things.
    }
}

图4-1-21-6 在类上使用NotifyListener注解

解释说明

  1. 实现NotifyEventListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 当前ConsumerGroup将使用这个类的BeanName,即"demoConsumerClass"。如果你想自定义ConsumerGroup,可以使用Component的value属性进行设置。

  4. 从body中将可以获取生产者发送的数据对象,并且已经做好了类型处理,可以直接使用。

  5. 使用原生的RocketMQ发送的消息,类型可能是无法识别的,你可以使用NotifyListener中提供的bodyClass来指定类型。

  6. topic和tags对应NotifyEvent中的topic和tags。

在方法上使用NotifyListener注解

使用示例

@Component
public class DemoConsumerMethod {

    @Bean
    @NotifyListener(topic = "test", tags = "model")
    public NotifyEventListener modelConsumer() {
        return event -> {
            DemoModel data = (DemoModel) event.getBody();
            // do some things.
        };
    }

    @Bean
    @NotifyListener(topic = "test", tags = "dto")
    public NotifyEventListener dtoConsumer() {
        return event -> {
            DemoDTO data = (DemoDTO) event.getBody();
            // do some things.
        };
    }
}

图4-1-21-7 在方法上使用NotifyListener注解

解释说明

  1. 使用Bean注解方式注册Spring Bean。

  2. 方法返回值为NotifyEventListener类型。

  3. 当前ConsumerGroup将使用对应方法生成的BeanName,即"modelConsumer"和"dtoConsumer"。如果你想自定义ConsumerGroup,可以使用Bean的value属性进行设置。

实战

约定:每个模块下的Topic和Tags必须定义常量池进行统一管理,主要是为了方便维护与管理,技术没有限制

常规消息(举例)

Step1 新建PetNotifyEnum

用PetNotifyEnum来管理模块的所有Topic和Tags的常量定义

package pro.shushi.pamirs.demo.api.mq;

public class PetNotifyEnum {

    public static class PetItemInventroy{

        public interface Topic{
            String PET_ITEM_INVENTROY_CHANGE ="petItemInventroyChange";
        }
        public interface Tag{
            String CREATE ="create";
            String UPDATE ="update";
            String DELETE ="delete";
        }

    }

}

图4-1-21-8 新建PetNotifyEnum

Step2 新建PetItemInventoryMqProducer

新建PetItemInventroy模型对应消息生产者,用于发送PetItemInventroy模型变动的相关消息

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

}

图4-1-21-9 新建PetItemInventoryMqProducer

Step3 新建PetItemInventoryMqConsumer

新建PetItemInventoryMqConsumer,订阅PetItemInventroy模型变动的相关消息,并进行相关处理

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-10 PetItemInventoryMqConsumer

Step4 修改PetItemInventroyAction

在修改PetItemInventroy完成之后,发送topic为【PetNotifyEnum.PetItemInventroyMq.Topic.PET_ITEM_INVENTROY_CHANGE】,tag为【PetNotifyEnum.PetItemInventroyMq.Tag.UPDATE】的消息出去

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-11 修改PetItemInventroyAction

Step5 重新看效果

  1. 编辑商品库存记录

image.png

图4-1-21-12 编辑商品库存记录

  1. 查看后端日志是否打印

image.png

图4-1-21-13 查看后端日志是否打印

顺序消息(举例)

Step1 修改PetItemInventoryMqProducer

增加一个顺序消息发送的方法,发送时根据商品库存的Id就分队列,相同队列顺序消费

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage(PetItemInventroy data,String tag) {

        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
                .setQueueSelector((queueSize, event) -> {
                    PetItemInventroy body = (PetItemInventroy) event.getBody();
                    Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                    return queue.intValue();
                }));

    }
}

图4-1-21-14 修改PetItemInventoryMqProducer

Step2 修改PetItemInventoryMqConsumer

增加@NotifyListener(consumerType= ConsumerType.ORDERLY)的注解

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.ConsumerType;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
//@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*",consumerType= ConsumerType.ORDERLY)
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-15 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的顺序消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
//        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-16 修改PetItemInventroyAction

Step4 重启看效果

同常规消息

事务消息(举例)

注:这种写法默认第一次是UNKNOWN,然后通过MQ回调二次确认,在消息时效性要求高的场景下是不符合要求的。对实效性要求高的,请见下面事务消息-优化(举例)章节。

Step1 新建NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息回滚");
        return NotifyTransactionState.ROLLBACK;
    }
}

图4-1-21-17 新建NotifyTransactionListener

Step2 修改PetItemInventoryMqProducer

增加发送事务性消息的方法,通过.setIsTransaction(true)来显示设置该消息是事务消息,通过setGroup("petItemInventoryMqTransactionListener")来匹配事务监听处理器

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data,String tag) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
    );
}

图4-1-21-18 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的事务性消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-19 修改PetItemInventroyAction

Step4 重启看效果

  1. 编辑商品库存记录

image.png

图4-1-21-20 编辑商品库存记录

  1. 查看后端日志是否打印

消息回滚,没有消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-21 MQ日志打印一

image.png

图4-1-21-22 MQ日志打印二

Step5 修改NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息提交");
        //正常业务情况下,这里要增加自己的逻辑判断,来确定返回状态值
        return NotifyTransactionState.COMMIT;
    }
}

图4-1-21-23 修改NotifyTransactionListener

Step6 重启看效果

消息提交,并看到消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-24 MQ日志打印三

image.png

图4-1-21-25 MQ日志打印三

事务消息-优化(举例)

Step1 修改PetItemInventoryMqProducer

修改PetItemInventoryMqProducer事务性消息发送方法sendTransactionMessage,增加NotifyExecuteLocalTransactionCallback入参。

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data, String tag, NotifyExecuteLocalTransactionCallback callback) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
            .setExecuteLocalTransactionCallback(callback)
    );
}

图4-1-21-26 修改PetItemInventoryMqProducer

Step2 修改PetItemInventroyAction

修改PetItemInventroyAction的update方法

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){
            @Override
            public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

                List<PetItemInventroy> inventroys = new ArrayList<>();
                inventroys.add(data);
                PamirsSession.directive().disableOptimisticLocker();
                try{
                    int i = data.updateBatch(inventroys);
                } finally {
                    PamirsSession.directive().enableOptimisticLocker();
                }
                //业务代码执行完毕,提交消息
                return NotifyTransactionState.COMMIT;
            }
        });
        return data;
    }
}

图4-1-21-27 修改PetItemInventroyAction

Step3 重启看效果

编辑商品库存记录,发现消息的处理几乎没有延迟

image.png

图4-1-21-28 示例效果几乎没有延迟

Step4 模拟业务成功,消息发送失败

修改PetItemInventroyAction的update方法

@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
    //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){

        @Override
        public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

            List<PetItemInventroy> inventroys = new ArrayList<>();
            inventroys.add(data);
            PamirsSession.directive().disableOptimisticLocker();
            try{
                int i = data.updateBatch(inventroys);
            } finally {
                PamirsSession.directive().enableOptimisticLocker();
            }
            //模拟业务成功,消息发送失败
            throw new RuntimeException();
            //业务代码执行完毕,提交消息
            //return NotifyTransactionState.COMMIT;
        }
    });
    return data;
}

图4-1-21-29 修改PetItemInventroyAction的update方法

Step5 重启看效果

这个效果跟第一种事务消息一样,消息提交延迟严重。

image.png

图4-1-21-30 MQ日志打印

image.png

图4-1-21-31 MQ日志打印

Step6 总结

优化后的事务消息,在正常情况下时效性非常高,异常情况下也能通过NotifyTransactionListener做保障

Oinone社区 作者:史, 昂原创文章,如若转载,请注明出处:https://doc.oinone.top/oio4/9296.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
史, 昂的头像史, 昂数式管理员
上一篇 2024年5月23日
下一篇 2024年5月23日

相关推荐

  • 4.2.3 框架之SPI机制

    SPI(Service Provider Interface)服务提供接口,是一套用来被第三方实现或者扩展的API,它可以用来启用框架扩展和替换组件,简单来说就是用来解耦,实现组件的自由插拔,这样我们就能在平台提供的基础组件外扩展新组件或覆盖平台组件。 目前定义SPI组件 ViewWidget 视图组件 FieldWidget 字段组件 ActionWidget 动作组件 表4-2-3-1 目前定义SPI组件 前提知识点 使用 TypeScript 装饰器(注解)装饰你的代码 1. 通过注解定义一种SPI接口(Interface) @SPI.Base<IViewFilterOptions, IView>('View', ['id', 'name', 'type', 'model', 'widget']) export abstract class ViewWidget<ViewData = any> extends DslNodeWidget { } 图4-2-3-1 代码示意 2. 通过注解注册提供View类型接口的一个或多个实现 @SPI.Base<IViewFilterOptions, IView>('View', ['id', 'name', 'type', 'model', 'widget']) export abstract class ViewWidget<ViewData = any> extends DslNodeWidget { } 图4-2-3-2 代码示意 3. 视图的xml内通过配置来调用已定义的一种SPI组件 <view widget="form" model="demo.shop"> <field name="id" /> </view> 图4-2-3-3 代码示意 图4-2-3-4 组件继承示意图 当有多个服务提供方时,按以下规则匹配出最符合条件的服务提供方 SPI匹配规则 SPI组件没有严格的按匹配选项属性限定,而是一个匹配规则 按widget最优先匹配,配置了widget等于是指定了需要调用哪个widget,此时其他属性直接忽略 按最大匹配原则(匹配到的属性越多优先级越高) 按后注册优先原则

    2024年5月23日
    1.2K00
  • 4.1.13 Action之校验

    在3.5.3【Action的类型】一文中有涉及到“ServerAction之校验”部分,本文介绍一个特殊的写法,当内置函数和表达式不够用的时候,怎么扩展。还是拿PetShopProxyAction举例,修改如下: package pro.shushi.pamirs.demo.core.action; ……引依赖类 @Model.model(PetShopProxy.MODEL_MODEL) @Component public class PetShopProxyAction extends DataStatusBehavior<PetShopProxy> { ……其他代码 // @Validation(ruleWithTips = { // @Validation.Rule(value = "!IS_BLANK(data.code)", error = "编码为必填项"), // @Validation.Rule(value = "LEN(data.shopName) < 128", error = "名称过长,不能超过128位"), // }) @Validation(check = "checkName") @Action(displayName = "启用") @Action.Advanced(rule="activeRecord.code !== undefined && !IS_BLANK(activeRecord.code)") public PetShopProxy dataStatusEnable(PetShopProxy data){ data = super.dataStatusEnable(data); data.updateById(); return data; } @Function public Boolean checkName(PetShopProxy data) { String field = "name"; String name = data.getShopName(); boolean success = true; if (StringUtils.isBlank(name)) { PamirsSession.getMessageHub() .msg(Message.init() .setLevel(InformationLevelEnum.ERROR) .setField(field) .setMessage("名称为必填项")); success = false; } if (name.length() > 128) { PamirsSession.getMessageHub() .msg(Message.init() .setLevel(InformationLevelEnum.ERROR) .setField(field) .setMessage("名称过长,不能超过128位")); success = false; } return success; } ……其他代码 } 图4-1-13-1 PetShopProxyAction扩展配置 注: check属性指定了校验函数名称,命名空间必须与服务器动作一致。 校验函数的入参必须与服务器动作一致 使用PamirsSession#getMessageHub方法可通知前端错误的属性及需要展示的提示信息,允许多个。

    Oinone 7天入门到精通 2024年5月23日
    1.1K00
  • 7.4 Oinone的低无一体

    基础介绍 前面我们学习了基于低代码开发平台进行快速开发,以及通过oinone的设计器进行零代码开发两种模式。当然低无一体不是简单地说两种模式还指:低无两种模式可以融合。 在做核心产品的时候以低代码开发为主,以无代码为辅助。见低代码开发的基础入门篇中设计器的结合一文 在做实施或临时性需求则是以无代码为主,以低代码为辅助 本文主要介绍第二种模式,它是www.oinone.top官网在SaaS模式下的专有特性。满足客户安装标品后通过设计器进行适应性修改后,但对于一些特殊场景还是需要通过代码进行完善或开发 在该模式下,我们提供了jar模式和代码托管两种模式,客户只要选择需要进行代码开发的模块,点击生产SDK,下载扩展工程模版,按Oinone低代码开发平台规范进行研发,后上传扩展工程即可。 操作手册 低无一体这个模块是连接无代码设计器的桥梁,可以为一个模块或应用设计低代码的逻辑,可以在界面设计器或流程设计器中使用低代码的逻辑。 1.选择模块 首先需要在下拉单选中选择需要低代码的模块或应用。 下拉选中只展示在「应用中心」中已安装的模块或应用,可前往「应用中心」安装后继续低代码操作。 选择模块中不展示系统的基础模块或应用,因为这些模块或应用无法自定义模型。 2.模块信息 模块信息展示的是选择模块的基础信息:模块名称、模块编码、模块作者、模块版本、包的前缀、工程模板下载地址,下载地址仅在上传jar包模式时候用到。 3.低无一体操作 低无一体支持了两种使用模式:上传jar包模式、源码托管模式。 上传控制工程或创建研发分支动作完成会生成一条数据,可以对单条数据进行部署、卸载、修改、删除。 3.1 上传jar包模式 在这个模式下,需要做四步动作。 生成SDK,点击按钮之后,会把模块的当前模型状态打成一个SDK包,SDK最新生成时间更新。当模型变更但未生成SDK时,使用低无一体就会出错,请重新生成SDK并修改扩展工程。生成SDK通常需要1分钟左右,若第一次使用低无一体模块,可能需要更长时间,请耐心等待。 下载扩展工程模板,点击按钮之后,会将SDK包和工程模板生成一个下载链接,复制模块信息中的卸载地址打开即可下载。 技术人员在工程模板的基础上写低代码逻辑。 上传扩展工程,点击按钮展开弹窗,在弹窗中设置标签、备注,并将最终的jar包上传,完成上传之后表格中就会新增一条数据。 上传jar包模式下,模板工程中代码需要注意的点参考下图: 3.2 源码托管模式 在这个模式下,需要做三步动作。 生成SDK,点击按钮之后,会把模块的当前模型状态打成一个SDK包,SDK最新生成时间更新。当模型变更但未生成SDK时,使用低无一体就会出错,请重新生成SDK并修改扩展工程。生成SDK通常需要1分钟左右,若第一次使用低无一体模块,可能需要更长时间,请耐心等待。 创建研发分支,点击按钮展开弹窗。首次创建时需要设置git账号名称、git账号邮箱来创建一个账号,另外在弹窗中设置分支名称、标签、备注,完成创建后表格中就会新增一条数据。 通过表格中的Gitlab地址,技术人员写低代码逻辑。 3.3 行内操作 部署:工程状态为未部署、部署失败、已卸载时展示行内的部署按钮,点击之后进行部署,工程状态变为部署中。部署过程大致需要5-10分钟,请耐心等待。部署完成之后,会生成一个新的模块:“原模块名称”扩展工程。 卸载:工程状态为已部署时展示行内的卸载按钮,点击之后会卸载这个已部署的工程,工程状态变为已卸载。同一模块只能有一个已部署的工程(与选择的模式无关),若需要使用新的工程请先卸载已部署的工程。 修改:行内操作修改按钮始终展示,只允许修改标签、备注。 删除:工程状态为未部署、部署失败、已卸载时展示行内的删除按钮,点击之后删除这一条工程记录。 3.4 部署效果 低无一体部署成功之后,可以进入对应模块的模型页面中使用提交动作来使用低代码逻辑,也可以在流程设计器中的引用逻辑节点中使用低代码逻辑。

    2024年5月23日
    2.1K00
  • 4.1.2 模块之启动指令

    针对不同启动指令的组合可以满足不同场景需求,下面列举了几个常规组合方式,小伙伴们务必把这几种模式都尝试一遍,会更有体感 本节为小伙伴讲解oinone模块的几种启动方式,它是为能灵活地应对企业市场的不同场景需求,为op(本地化部署)、saas和研发提供个性化支撑。也为oinone独特性之单体与分布式的灵活切换提供基础支撑 一、部署参数 参数 名称 默认值 说明 -Plifecycle 生命周期部署指令 RELOAD 可选项:无/INSTALL/PACKAGE/RELOAD/DDL安装-install为AUTO;upgrade为FORCE打包-install为AUTO;upgrade为FORCE;profile为PACKAGE重启-install、upgrade、profile为READONLY打印变更DDL-install为AUTO;upgrade为FORCE;profile为DDL 表4-1-2-1 部署参数 如果在启动命令中配置了部署参数,可不再设置服务参数和可选项参数。下图为在启动命令中添加部署参数的示例。 图4-1-2-1 在启动命令中添加部署参数的示例 二、使用场景 针对不同启动指令的组合可以满足不同场景需求,下面列举了几个常规组合方式,小伙伴们务必把这几种模式都尝试一遍,会更有体感。 场景一:DDL(1)+RELOAD(N)应对专有DBA 因为很多公司数据库是由专门的DBA来管理的,不允许应用直接变更数据库相关配置、表结构、初始化数据。而oinone是基于元数据驱动的,任何模型、行为的变化都会自动转化成对物理存储的改变与元数据变化。 oinone为了适用企业op场景,特别增加了DDL模式。把发布上线分为两个步骤。 一:用DDL模式把涉及到数据库的变更与元数据初始化的脚本进行输出,交由客户公司DBA审批,并执行 二:用RELOAD模式,进行正常的应用重启工作,不进行安装、升级、以及数据库物理变革等操作。 #应用启动关闭自动DDL配置 pamirs.boot.profile: CUSTOMIZE pamirs.boot.options.rebuildTable: false pamirs.persistence.global.auto-create-database: false pamirs.persistence.global.auto-create-table: false 图4-1-2-2 应用启动关闭自动DDL模式 场景二:PACKAGE(1)+RELOAD(N)应对提升多机器实例效率 在机器规模相对大的场景中我们会碰到以下问题: 元数据差量计算、数据库变更、元数据变化保存都非常费时,如果每台机器都来一遍是非常费时费力的 分布式下多机器如果并发进行INSTALL,会导致数据库修改表结构、元数据变化保存锁死 所以我们可以选择一台机器用PACKAGE,其他机器采用RELOAD模式,做到合理规避问题,提升应用发布效率 场景三:INSTALL应对开发模式 研发在本地开发模式下INSTALL是最有效率的,把所需依赖模块一把启动和调试。 上线如果要用INSTALL需要注意,要逐台进行。当然也可以改进成INSTALL(1)+RELOAD(N)模式 三、启动命令解读 查看启动命令 可以在启动日志中查看当前所用启动命令。 图4-1-2-3 在启动日志中查看当前所用启动命令 生命周期管理-Plifecycle 除了通过启动YAML中pamirs.boot属性来设置启动参数,你还可以在应用启动命令中使用-Plifecycle参数来快捷控制模块生命周期的管理方式。该参数的可选项为RELOAD、INSTALL、CUSTOM_INSTALL、PACKAGE、DDL。 java -jar <your jar name>.jar -Plifecycle=RELOAD 启动命令优先级高于YAML中pamirs.boot属性中的install、upgrade和profile属性。如果不使用-Plifecycle参数,则使用YAML中pamirs.boot属性中的install、upgrade和profile属性配置。若YAML中未配置,则采用默认值。 启动配置项 默认值 RELOAD INSTALL CUSTOM_INSTALL PACKAGE DDL install AUTO READONLY AUTO AUTO AUTO AUTO upgrade AUTO READONLY FORCE FORCE FORCE FORCE profile CUSTOMIZE READONLY AUTO CUSTOMIZE PACKAGE DDL 表4-1-2-2 Plifecycle可选项与启动项对应表 profile属性请参考4.1.1【服务启动可选项】一文。只有pamirs.boot.profile=CUSTOMIZE时,在pamirs.boot.options中自定义的可选项才生效。 自动建表-PbuildTable java -jar <your jar name>.jar -PbuildTable=NEVER PbuildTable参数用于设置自动构建表结构的方式。如果不使用该参数,则options属性的默认值请参考4.1.1【服务启动可选项】一文。-PbuildTable参数可选项为: NEVER – 不自动构建表结构,会将pamirs.boot.options中的diffTable和rebuildTable属性设置为false EXTEND – 增量构建表结构,会将pamirs.boot.options中的diffTable属性设置为false,rebuildTable属性设置为true DIFF – 差量构建表结构,会将pamirs.boot.options中的diffTable和rebuildTable属性设置为true 模块在线 -PmoduleOnline java -jar <your jar name>.jar -PmoduleOnline=CHECK PmoduleOnline参数用于设置模块在线的方式。如果不使用该参数,则profile属性的默认值请参考4.1.1【服务启动可选项】一文。-PmoduleOnline参数可选项为: NEVER – 不读取存储在数据库中的模块信息,会将pamirs.boot.options中的reloadModule和checkModule属性设置为false READ – 读取存储在数据库中的模块信息,会将pamirs.boot.options中的checkModule属性设置为false,reloadModule属性设置为true CHECK – 读取存储在数据库中的模块信息并校验依赖模块是否已安装,会将pamirs.boot.options中的reloadModule和checkModule属性设置为true 元数据在线-PmetaOnline java -jar <your jar name>.jar -PmetaOnline=MODULE PmetaOnline参数用于设置元数据在线的方式,如果不使用该参数,则profile属性的默认值请参考4.1.1【服务启动可选项】一文。-PmetaOnline参数可选项为: NEVER – 不持久化元数据,会将pamirs.boot.options中的updateModule、reloadMeta和updateMeta属性设置为false MODULE – 只注册模块信息,会将pamirs.boot.options中的updateModule属性设置为true,reloadMeta和updateMeta属性设置为false ALL – 注册持久化所有元数据,会将pamirs.boot.options中的updateModule、reloadMeta和updateMeta属性设置为true 开放远程服务-PenableRpc PenableRpc参数用于设置是否开启远程服务。如果不使用该参数,则profile属性的默认值请参考4.1.1【服务启动可选项】一文。-PenableRpc参数可选项为true和false。该参数会将参数值设置到pamirs.boot.options中的publishService属性。 开启API服务-PopenApi PopenApi参数用于设置是否开启HTTP API服务。如果不使用该参数,则profile属性的默认值请参考4.1.1【服务启动可选项】一文。-PopenApi参数可选项为true和false。该参数会将参数值设置到pamirs.boot.options中的rebuildHttpApi属性。 开启字段校验-PcheckField PcheckField参数用于设置是否开启字段校验。-PcheckField参数可选项为true和false。由于通常应用的字段数量非常多,会延长系统启动时长,所以默认不会开启字段校验。 启用数据初始化服务-PinitData PinitData参数用于设置是否开启数据初始化服务。如果不使用该参数,则profile属性的默认值请参考4.1.1【服务启动可选项】一文。-PinitData参数可选项为true和false。该参数会将参数值设置到pamirs.boot.options中的updateData属性。 四、不使用自动构建数据库表功能 Oinone LCDP默认提供框架的所有服务,所以会自动构建数据库表。如果不需要使用Oinone的存储构建服务,可以设置YAML文件中关于自动建表的配置。这样就不会动态构建数据库表,你可以手动搭建数据库表。 通过配置启动YAML中pamirs.boot.options.rebuildTable为false彻底关闭自动建表功能。 pamirs: boot: options: rebuildTable: false 图4-1-2-4 不使用自动构建数据库表功能 也可以按需配置启动YAML中pamirs.persistence配置来关闭部分数据源的自动建表功能。persistence配置既可以针对全局也可以分数据源进行配置。 pamirs: persistence: global: # 是否自动创建数据库的全局配置,默认为true autoCreateDatabase: true # 是否自动创建数据表的全局配置,默认为true autoCreateTable: true <your ds key>: # 是否自动创建数据库的数据源配置,默认为true autoCreateDatabase: true # 是否自动创建数据表的数据源配置,默认为true…

    2024年5月23日
    92900
  • 4.1.3 模块之生命周期

    了解oinone的启动生命周期过程,对于理解oinone或者开发高级功能都有非常大的帮助 一、生命周期大图 图4-1-3-1 生命周期大图 二、平台扩展说明 平台节点通过SPI机制进行扩展,本书籍暂不展开,更多详情请见可关注数式Oinone公众号中的Oinone内核揭秘系列文章。 三、业务扩展说明 接口 说明 使用场景 LifecycleBeginAllInit 系统进入生命周期前置逻辑注:不能有任何数据库操作 系统级别的信息收集上报 LifecycleCompletedAllInit 系统生命周期完结后置逻辑 系统级别的信息收集上报、生命周期过程中的数据或上下文清理 LifecycleBeginInit 模块进入生命周期前置逻辑注:不能有任何数据库操作 预留,能做的事情比较少 LifecycleCompletedInit 模块生命周期完结后置逻辑 本模块需等待其他模块初始化完毕以后进行初始化的逻辑。比如:1.集成模块的初始化2.权限缓存的初始化…… MetaDataEditor 元数据编辑注:不能有任何数据库操作 这个在第3章Oinone的基础入门中已经多次提及,核心场景是向系统主动注册如Action、Menu、View等元数据 ExtendBuildInit 系统构建前置处理逻辑 预留,能做的事情比较少,做一些跟模块无关的事情 ExtendAfterBuilderInit 系统构建后置处理逻辑 预留,能做的事情比较少,做一些跟模块无关的事情 InstallDataInit 模块在初次安装时的初始化逻辑 根据模块启动指令来进行选择执行逻辑,一般用于初始化业务数据。应用启动参数与指令转化逻辑详见4.1.2【模块之启动指令】一文 UpgradeDataInit 模块在升级时的初始化逻辑注:根据启动指令来执行,是否执行一次业务自己控制 ReloadDataInit 模块在重启时的初始化逻辑注:根据启动指令来执行,是否执行一次业务自己控制 表4-1-3-1 业务拓展说明 四、常用生命周期举例 Install\Upgrade\Reload的业务初始化(举例) Step1 新建DemoModuleBizInit DemoModuleBizInit实现InstallDataInit, UpgradeDataInit, ReloadDataInit a. InstallDataInit 对应 init b. UpgradeDataInit 对应 upgrade c. ReloadDataInit 对应 reload modules方法代表改初始化类与哪些模块匹配,以模块编码为准 priority 执行优先级 package pro.shushi.pamirs.demo.core.init; import org.springframework.stereotype.Component; import pro.shushi.pamirs.boot.common.api.command.AppLifecycleCommand; import pro.shushi.pamirs.boot.common.api.init.InstallDataInit; import pro.shushi.pamirs.boot.common.api.init.ReloadDataInit; import pro.shushi.pamirs.boot.common.api.init.UpgradeDataInit; import pro.shushi.pamirs.demo.api.DemoModule; import pro.shushi.pamirs.demo.api.enumeration.DemoExpEnumerate; import pro.shushi.pamirs.meta.common.exception.PamirsException; import java.util.Collections; import java.util.List; @Component public class DemoModuleBizInit implements InstallDataInit, UpgradeDataInit, ReloadDataInit { @Override public boolean init(AppLifecycleCommand command, String version) { throw PamirsException.construct(DemoExpEnumerate.SYSTEM_ERROR).appendMsg("DemoModuleBizInit: install").errThrow(); //安装指令执行逻辑 // return Boolean.TRUE; } @Override public boolean reload(AppLifecycleCommand command, String version) { throw PamirsException.construct(DemoExpEnumerate.SYSTEM_ERROR).appendMsg("DemoModuleBizInit: reload").errThrow(); //重启指令执行逻辑 // return Boolean.TRUE; } @Override public boolean upgrade(AppLifecycleCommand command, String version, String existVersion) { throw PamirsException.construct(DemoExpEnumerate.SYSTEM_ERROR).appendMsg("DemoModuleBizInit: upgrade").errThrow(); //升级指令执行逻辑 // return Boolean.TRUE; } @Override public List<String> modules() { return Collections.singletonList(DemoModule.MODULE_MODULE); } @Override public int priority() { return 0; } } 图4-1-3-2 新建DemoModuleBizInit Step2 重启看效果 启动指令为-Plifecycle=INSTALL,转化指令为 install为AUTO;upgrade为FORCE 因为DemoModule我们已经执行过好多次了,所以会进入upgrade逻辑。系统重启的效果跟我们预期的结果一致,确实执行了DemoModuleBizInit的upgrade方法 图4-1-3-3 系统重启执行DemoModuleBizInit的upgrade方法 MetaDataEditor 回顾使用情况 最早在3.3.2【模型的类型】一文中介绍“传输模型”时,初始化ViewAction窗口动作时使用到,这里不过多介绍。下面主要介绍下InitializationUtil的工具类包含方法。 注:模块上报元数据只能通过注解或者实现MetaDataEditor接口并使用InitializationUtil工具来进行,更建议用注解方式

    2024年5月23日
    1.2K00

Leave a Reply

登录后才能评论