4.1.21 框架之分布式消息

消息中间件是在分布式开发中常见的一种技术手段,用于模块间的解耦、异步处理、数据最终一致等场景。

一、介绍

oinone对开源的RocketMQ进行了封装,是平台提供的一种较为简单的使用方式,并非是对RocketMQ进行的功能扩展。同时也伴随着两个非常至关重要的目的:

  1. 适配不同企业对RocketMQ的不同版本选择,不至于改上层业务代码。目前已经适配RocketMQ的开源版本和阿里云版本。 下个版本会对API进行升级支持不同类型MQ,以适配不同企业对MQ的不同要求,应对一些企业客户已经对MQ进行技术选择

  2. 对协议头进行扩展:如多租户的封装,saas模式中为了共用MQ基础资源,需要在消息头中加入必要租户信息。

二、使用准备

demo工程默认已经依赖消息,这里只是做介绍无需大家额外操作,大家可以用maven依赖树命令查看引用关系。

依赖包

增加对pamirs-connectors-event的依赖


<dependency>
    <groupId>pro.shushi.pamirs.framework</groupId>
    <artifactId>pamirs-connectors-event</artifactId>
</dependency>

图4-1-21-1 分布式消息的依赖包

相关功能引入

增加模型、触发器都依赖MQ

<!-- 增强模型 -->
<!-- 增强模型 -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-channel</artifactId>
</dependency>
<!-- 触发器 api -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-api</artifactId>
</dependency>
<!-- 触发器 core -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-core</artifactId>
</dependency>

图4-1-21-2 增加模型、触发器都依赖MQ

yml配置文件参考

详见4.1.1【模块之yml文件结构详解】的“pamirs.event”部分。

三、使用说明

发送消息(NotifyProducer)

概述

NotifyProducer是Pamirs Event中所有生产者的基本API,它仅仅定义了消息发送的基本行为,例如生产者自身的属性,启动和停止,当前状态,以及消息发送方法。它本身并不决定消息如何发送,而是根据具体的实现确定其功能。

目前仅实现了RocketMQProducer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

Notify注解方式

使用示例

@Component
public class DemoProducer {

    @Notify(topic = "test", tags = "model")
    public DemoModel sendModel() {
        return new DemoModel();
    }

    @Notify(topic = "test", tags = "dto")
    public DemoDTO sendDTO() {
        return new DemoDTO();
    }
}

图4-1-21-3 Notify注解方式使用示例

解释说明

  1. 使用Component注解方式注册Spring Bean。

  2. Notify注解指定topic和tags。

  3. topic和tags对应NotifyEvent中的topic和tags。

RocketMQProducer方法调用

使用示例

@Component
public class SendRocketMQMessage {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel()));
        rocketMQProducer.send(new NotifyEvent("test", "dto", new DemoDTO()));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage() {
        DemoModel data = new DemoModel();
        data.setAge(10);
        rocketMQProducer.send(new NotifyEvent("test", "model", data)
                .setQueueSelector((queueSize, event) -> {
                    DemoModel body = (DemoModel) event.getBody();
                    return body.getAge() % queueSize;
                }));
    }

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel())
                .setIsTransaction(true)
                .setGroup("demoTransactionListener"));
    }
}

图4-1-21-4 RocketMQProducer方法调用

解释说明

  1. 使用Component注解方式注册Spring Bean。
  2. 使用Autowired注解方式装配RocketMQProducer实例。
  3. 使用send方法发送指定消息。
  4. 在【发送普通消息】方法中,该实现效果与Notify注解方式所示完全一致。
  5. 在【发送有序消息】方法中,队列选择器是必须配置的,queueSize属性为MQ队列的总数量,在broker中配置。有序消息必须配合有序消费者才能达到有序消费的目的,否则还是无序的常规消息,消费者需要配置@NotifyListener(consumerType=ConsumerType.ORDERLY )注解。
  6. 在【发送事务消息】方法中指定的group为ProducerGroup,事务消息是通过不同的Producer发出的,事务消息监听请参考TransactionListener注解的相关使用方法。(示例中的group与下文介绍中的一致)

使用TransactionListener开启事务消息监听

使用示例

@Component
@TransactionListener
public class DemoTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        return NotifyTransactionState.COMMIT;
    }

}

图4-1-21-5 使用TransactionListener开启事务消息监听

解释说明

  1. 实现NotifyTransactionListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 添加TransactionListener注解注册事务监听,生成对应的生产者。

  4. 当前ProducerGroup将使用这个类的BeanName,即"demoTransactionListener"。如果你想自定义ProducerGroup,可以使用TransactionListener的value属性进行设置。

消费消息(NotifyConsumer)

概述

NotifyConsumer是Pamirs Evnet中所有消费者的基本API,与NotifyProducer类似,它仅仅定义了消息消费的基本行为,例如消费者自身的属性,启动和停止,当前状态,以及消息的监听和订阅方法。它本身并不决定消息如何消费以及如何被订阅,而是根据具体的实现确定其功能。

目前仅实现了RocketMQEventPushConsumer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

在类上使用NotifyListener注解

使用示例

@Component
@NotifyListener(topic = "test", tags = "model")
public class DemoConsumerClass implements NotifyEventListener {

    @Override
    public void consumer(NotifyEvent event) {
        DemoModel data = (DemoModel) event.getBody();
        // do some things.
    }
}

图4-1-21-6 在类上使用NotifyListener注解

解释说明

  1. 实现NotifyEventListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 当前ConsumerGroup将使用这个类的BeanName,即"demoConsumerClass"。如果你想自定义ConsumerGroup,可以使用Component的value属性进行设置。

  4. 从body中将可以获取生产者发送的数据对象,并且已经做好了类型处理,可以直接使用。

  5. 使用原生的RocketMQ发送的消息,类型可能是无法识别的,你可以使用NotifyListener中提供的bodyClass来指定类型。

  6. topic和tags对应NotifyEvent中的topic和tags。

在方法上使用NotifyListener注解

使用示例

@Component
public class DemoConsumerMethod {

    @Bean
    @NotifyListener(topic = "test", tags = "model")
    public NotifyEventListener modelConsumer() {
        return event -> {
            DemoModel data = (DemoModel) event.getBody();
            // do some things.
        };
    }

    @Bean
    @NotifyListener(topic = "test", tags = "dto")
    public NotifyEventListener dtoConsumer() {
        return event -> {
            DemoDTO data = (DemoDTO) event.getBody();
            // do some things.
        };
    }
}

图4-1-21-7 在方法上使用NotifyListener注解

解释说明

  1. 使用Bean注解方式注册Spring Bean。

  2. 方法返回值为NotifyEventListener类型。

  3. 当前ConsumerGroup将使用对应方法生成的BeanName,即"modelConsumer"和"dtoConsumer"。如果你想自定义ConsumerGroup,可以使用Bean的value属性进行设置。

实战

约定:每个模块下的Topic和Tags必须定义常量池进行统一管理,主要是为了方便维护与管理,技术没有限制

常规消息(举例)

Step1 新建PetNotifyEnum

用PetNotifyEnum来管理模块的所有Topic和Tags的常量定义

package pro.shushi.pamirs.demo.api.mq;

public class PetNotifyEnum {

    public static class PetItemInventroy{

        public interface Topic{
            String PET_ITEM_INVENTROY_CHANGE ="petItemInventroyChange";
        }
        public interface Tag{
            String CREATE ="create";
            String UPDATE ="update";
            String DELETE ="delete";
        }

    }

}

图4-1-21-8 新建PetNotifyEnum

Step2 新建PetItemInventoryMqProducer

新建PetItemInventroy模型对应消息生产者,用于发送PetItemInventroy模型变动的相关消息

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

}

图4-1-21-9 新建PetItemInventoryMqProducer

Step3 新建PetItemInventoryMqConsumer

新建PetItemInventoryMqConsumer,订阅PetItemInventroy模型变动的相关消息,并进行相关处理

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-10 PetItemInventoryMqConsumer

Step4 修改PetItemInventroyAction

在修改PetItemInventroy完成之后,发送topic为【PetNotifyEnum.PetItemInventroyMq.Topic.PET_ITEM_INVENTROY_CHANGE】,tag为【PetNotifyEnum.PetItemInventroyMq.Tag.UPDATE】的消息出去

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-11 修改PetItemInventroyAction

Step5 重新看效果

  1. 编辑商品库存记录

image.png

图4-1-21-12 编辑商品库存记录

  1. 查看后端日志是否打印

image.png

图4-1-21-13 查看后端日志是否打印

顺序消息(举例)

Step1 修改PetItemInventoryMqProducer

增加一个顺序消息发送的方法,发送时根据商品库存的Id就分队列,相同队列顺序消费

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage(PetItemInventroy data,String tag) {

        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
                .setQueueSelector((queueSize, event) -> {
                    PetItemInventroy body = (PetItemInventroy) event.getBody();
                    Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                    return queue.intValue();
                }));

    }
}

图4-1-21-14 修改PetItemInventoryMqProducer

Step2 修改PetItemInventoryMqConsumer

增加@NotifyListener(consumerType= ConsumerType.ORDERLY)的注解

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.ConsumerType;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
//@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*",consumerType= ConsumerType.ORDERLY)
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-15 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的顺序消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
//        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-16 修改PetItemInventroyAction

Step4 重启看效果

同常规消息

事务消息(举例)

注:这种写法默认第一次是UNKNOWN,然后通过MQ回调二次确认,在消息时效性要求高的场景下是不符合要求的。对实效性要求高的,请见下面事务消息-优化(举例)章节。

Step1 新建NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息回滚");
        return NotifyTransactionState.ROLLBACK;
    }
}

图4-1-21-17 新建NotifyTransactionListener

Step2 修改PetItemInventoryMqProducer

增加发送事务性消息的方法,通过.setIsTransaction(true)来显示设置该消息是事务消息,通过setGroup("petItemInventoryMqTransactionListener")来匹配事务监听处理器

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data,String tag) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
    );
}

图4-1-21-18 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的事务性消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-19 修改PetItemInventroyAction

Step4 重启看效果

  1. 编辑商品库存记录

image.png

图4-1-21-20 编辑商品库存记录

  1. 查看后端日志是否打印

消息回滚,没有消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-21 MQ日志打印一

image.png

图4-1-21-22 MQ日志打印二

Step5 修改NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息提交");
        //正常业务情况下,这里要增加自己的逻辑判断,来确定返回状态值
        return NotifyTransactionState.COMMIT;
    }
}

图4-1-21-23 修改NotifyTransactionListener

Step6 重启看效果

消息提交,并看到消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-24 MQ日志打印三

image.png

图4-1-21-25 MQ日志打印三

事务消息-优化(举例)

Step1 修改PetItemInventoryMqProducer

修改PetItemInventoryMqProducer事务性消息发送方法sendTransactionMessage,增加NotifyExecuteLocalTransactionCallback入参。

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data, String tag, NotifyExecuteLocalTransactionCallback callback) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
            .setExecuteLocalTransactionCallback(callback)
    );
}

图4-1-21-26 修改PetItemInventoryMqProducer

Step2 修改PetItemInventroyAction

修改PetItemInventroyAction的update方法

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){
            @Override
            public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

                List<PetItemInventroy> inventroys = new ArrayList<>();
                inventroys.add(data);
                PamirsSession.directive().disableOptimisticLocker();
                try{
                    int i = data.updateBatch(inventroys);
                } finally {
                    PamirsSession.directive().enableOptimisticLocker();
                }
                //业务代码执行完毕,提交消息
                return NotifyTransactionState.COMMIT;
            }
        });
        return data;
    }
}

图4-1-21-27 修改PetItemInventroyAction

Step3 重启看效果

编辑商品库存记录,发现消息的处理几乎没有延迟

image.png

图4-1-21-28 示例效果几乎没有延迟

Step4 模拟业务成功,消息发送失败

修改PetItemInventroyAction的update方法

@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
    //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){

        @Override
        public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

            List<PetItemInventroy> inventroys = new ArrayList<>();
            inventroys.add(data);
            PamirsSession.directive().disableOptimisticLocker();
            try{
                int i = data.updateBatch(inventroys);
            } finally {
                PamirsSession.directive().enableOptimisticLocker();
            }
            //模拟业务成功,消息发送失败
            throw new RuntimeException();
            //业务代码执行完毕,提交消息
            //return NotifyTransactionState.COMMIT;
        }
    });
    return data;
}

图4-1-21-29 修改PetItemInventroyAction的update方法

Step5 重启看效果

这个效果跟第一种事务消息一样,消息提交延迟严重。

image.png

图4-1-21-30 MQ日志打印

image.png

图4-1-21-31 MQ日志打印

Step6 总结

优化后的事务消息,在正常情况下时效性非常高,异常情况下也能通过NotifyTransactionListener做保障

Oinone社区 作者:史, 昂原创文章,如若转载,请注明出处:https://doc.oinone.top/oio4/9296.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

Like (0)
史, 昂's avatar史, 昂数式管理员
Previous 2024年5月23日
Next 2024年5月23日

相关推荐

  • 3.3.8 字段类型之基础与复合

    模型字段是描述实体的特征属性,本文介绍重点介绍字段的基础类型与复合类型 使用@Field注解来描述模型的字段。如果未配置字段类型,系统会根据Java代码的字段声明类型来自动获取业务类型。建议配置displayName属性来描述字段在前端的显示名称。可以使用defaultValue配置字段的默认值。 一、安装与更新 使用@Field.field来配置字段的不可变更编码。字段一旦安装,无法在对该字段编码值进行修改,之后的字段配置更新会依据该编码进行查找并更新;如果仍然修改该注解的配置值,则系统会将该字段识别为新字段,存储模型会创建新的数据库表字段,而原字段将会rename为废弃字段。 二、字段类型 类型系统由基本类型、复合(组件)类型、引用类型和关系类型四种类型系统构成。通过类型系统描述应用程序、数据库和前端视觉视图如何进行交互,数据及数据间关系如何处理的协议。其中引用类型和关系类型介绍详见3.3.9【字段类型之关系与引用】一文,字段命名规范参见3.3.1【构建第一个Model】一文,这里不再赘述。 基本类型 业务类型 Java类型 数据库类型 规则说明 BINARY ByteByte[] TINYINTBLOB 二进制类型,不推荐使用 INTEGER ShortIntegerLongBigInteger smallintintbigintdecimal(size,0) 整数, 包括整数(10-11位有效数字)、长整数(19-20位有效数字)和大整数(超过19位)。【数据库规则】:默认使用int;如果size小于6则使用smallint;如果size超过6则使用int;如果size超过10位数字,即大于11(包含符号位),则使用长整数bigint;如果size超过19位数字,即大于20(包含符号位),则使用大数decimal。若未配置size,则按Java类型推测。【前端交互规则】:整数使用Number类型,长整数和大整数前后端协议使用字符串类型。 FLOAT FloatDoubleBigDecimal float(M,D)double(M,D)decimal(M,D) 浮点数,?包括单精度浮点数(7-8位有效数字)、双精度浮点数(15-16位有效数字)和大数(超过15位)。【数据库规则】:默认使用单精度浮点数float;如果size超过7位数字,即大于等于8,则使用双精度浮点数double;如果size超过15位数字,即大于等于16,则使用大数decimal。若未配置size,则按Java类型推测。【前端交互规则】:单精度浮点数float和双精度浮点数double使用Number类型(因为都使用IEEE754协议64位进行存储),大数前后端协议使用字符串类型。 BOOLEAN Boolean tinyint(1) 布尔类型,值为1,true(真)或0,false(假) ENUM Enum 与数据字典指定基本类型一致 【前端交互规则】:可选项从ModelField的options字段获取,options字段值为字段指定数据字典子集的JSON序列化字符串。前后端传递的是可选项的name,数据库存储使用可选项的value。multi属性为true,则使用多选控件;multi属性为false,则使用单元控件 STRING String varchar(size) 字符串,size为长度限制默认值参考,前端可以view中覆盖该配置 TEXT String text 多行文本,编辑态组件为多行文本框,长度限制为配置项size值 HTML String text 富文本编辑器 DATETIME java.util.Datejava.sql.Timestamp datetime(fraction)timestamp(fraction) 日期时间类型【数据库规则】:日期和时间的组合,时间格式为?YYYY-MM-DD HH:MM:SS[.fraction],默认精确到秒,在默认的秒精确度上,可以带小数,最多带6位小数,即可以精确到?microseconds (6 digits) precision。可以通过设置fraction来设置精确小数位数,最终存储在字段的decimal属性上。【前端交互规则】:前端默认使用日期时间控件,根据日期时间类型格式化格式format格式化日期时间 YEAR java.util.Date year 年份类型日期类型【数据库规则】:默认“YYYY”格式表示的日期值【前端交互规则】:前端默认使用年份控件,根据日期类型格式化格式format格式化日期 DATE java.util.Datejava.sql.Date datedate 日期类型【数据库规则】:默认“YYYY-MM-DD”格式表示的日期值【前端交互规则】:前端默认使用日期控件,根据日期类型格式化格式format格式化日期 TIME java.util.Datejava.sql.Time time(fraction)time(fraction) 时间类型【数据库规则】:默认“HH:MM:SS”格式表示的时间值【前端交互规则】:前端默认使用时间控件,根据日期类型格式化格式format格式化日期 表3-3-8-1 字段基本类型 复合类型 业务类型 Java类型 数据库类型 规则说明 MONEY BigDecimal decimal(M,D) 金额,前端使用金额控件,可以使用currency设置币种字段 表3-3-8-2 字段复合类型 不可变更字段 使用immutable属性来描述该字段前后端都无法进行更新操作,系统会忽略不可变更字段的更新操作。 自动生成编码的字段 详见3.3.5【模型编码生成器】一文。 字段的序列化与反序列化 使用@Field注解的serialize属性来配置非字符串类型属性的序列化与反序列化方式,最终会以序列化后的字符串持久化到存储中。 详见3.3.7【字段之序列化方式】一文 前端默认配置 可以使用@Field注解中的以下属性来配置前端的默认视觉与交互规则,也可以在前端设置覆盖以下配置。 @Field(required),是否必填 @Field(invisible),是否不可见 @Field(priority),字段优先级,列表的列使用该属性进行排序 更多前端默认视图配置详见:3.5.4【Ux注解详解】一文,如:readonly是否只读等。 举例 回顾我们前面学习例子 现有PetShop代码如下 package pro.shushi.pamirs.demo.api.model; import pro.shushi.pamirs.core.common.enmu.DataStatusEnum; import pro.shushi.pamirs.demo.api.enumeration.PetShopOptionEnum; import pro.shushi.pamirs.meta.annotation.Field; import pro.shushi.pamirs.meta.annotation.Model; import java.sql.Time; import java.util.List; @Model.model(PetShop.MODEL_MODEL) @Model(displayName = 宠物店铺,summary=宠物店铺,labelFields ={shopName} ) @Model.Code(sequence = DATE_ORDERLY_SEQ,prefix = P,size=6,step=1,initial = 10000,format = yyyyMMdd) public class PetShop extends AbstractDemoIdModel { public static final String MODEL_MODEL=demo.PetShop; @Field(displayName = 店铺编码) private String code; @Field(displayName = 店铺编码2) @Field.Sequence(sequence = DATE_ORDERLY_SEQ,prefix = C,size=6,step=1,initial = 10000,format = yyyyMMdd) private String codeTwo; @Field(displayName = 店铺名称,required = true) private String shopName; @Field(displayName = 开店时间,required = true) private Time openTime; @Field(displayName = 闭店时间,required = true) private Time closeTime; @Field(displayName =…

    2024年5月23日
    1.6K00
  • 3.5.5.1 设计器数据导出

    简介 通过调用导出接口,将设计器的设计数据与运动数据打包导出到文件中。 提供了download/export两类接口。 export 导出到OSS。导出的文件会上传到文件服务,通过返回的url下载导出文件。 请求示例: mutation { uiDesignerExportReqMutation { export( data: { module: "gemini_core", fileName: "meta", moduleBasics: true } ) { jsonUrl } } } 响应示例: { "data": { "uiDesignerExportReqMutation": { "export": { "jsonUrl": "https://xxx/meta.json" } } }, "errors": [], "extensions": {} } download 直接返回导出数据。适用于通过浏览器直接下载文件。 请求示例: mutation { uiDesignerExportReqMutation { download( data: { module: "gemini_core", fileName: "meta", moduleBasics: true } ) { jsonUrl } } } 如何构造url protocol :// hostname[:port] / path ? query=URLEncode(GraphQL) 例: http://127.0.0.1:8080/pamirs/base?query=mutation%20%7B%0A%09uiDesignerExportReqMutation%20%7B%0A%09%09download(%0A%09%09%09data%3A%20%7B%20module%3A%20%22gemini_core%22%2C%20fileName%3A%20%22meta%22%2C%20moduleBasics%3A%20true%20%7D%0A%09%09)%20%7B%0A%09%09%09jsonUrl%0A%09%09%7D%0A%09%7D%0A%7D 在浏览器中访问构造后的url,可直接下载文件 接口列表 模型设计器 指定模块导出 query { modelMetaDataExporterQuery { export/download(query: { module: "模块编码" }) { module url } } } module参数:指定导出的模块编码 url返回结果:export方式导出的文件url 页面设计器 导出页面 指定模块导出 mmutation { uiDesignerExportReqMutation { download/export( data: { module: "gemini_core", fileName: "meta", moduleBasics: false } ) { jsonUrl } } } module参数:模块编码 fileName参数:指定生成的json文件名称 moduleBasics参数:指定是否只导出模块基础数据,如果为true,只导出内置布局、模块菜单、菜单关联的动作。 如果为false,还会导出模块内的所有页面,以及页面关联的动作元数据、页面设计数据 等等。 默认值为false。 指定菜单导出 mutation { uiDesignerExportReqMutation { download/export( data: { menu: { name: "uiMenu0000000000048001" } fileName: "meta" relationViews: true } ) { jsonUrl } } } menu参数:菜单对象,指定菜单的name。只会导出该菜单及其绑定页面,不会递归查询子菜单 fileName参数:指定生成的json文件名称 relationViews参数:指定是否导出关联页面,默认为false,只导出菜单关联的页面。如果为true,还会导出该页面通过跳转动作关联的自定义页面。 指定页面导出 mutation { uiDesignerExportReqMutation { download/export( data: { view: { name: "xx_TABLE_0000000000119001" model: "ui.designer.TestUiDesigner" } fileName: "meta" relationViews: true } ) { jsonUrl } } }…

    Oinone 7天入门到精通 2024年5月23日
    1.9K00
  • 3.5.6.1 字段的配置

    字段组件类型 ttype可以配置哪些widget?本文这里把oinone平台默认支持的所有widget都进行了罗列,方便大家查阅。 字段组件匹配规则 字段组件没有严格的按组件名(widget)、字段类型(ttype)、视图组件类型(viewType)限定,而是一个匹配规则 按widget 最优先匹配 按最大匹配原则 ttype、viewType。每个属性权重一分 按后注册优先原则 通用属性 属性 属性描述 属性name 默认值 类型 标题 字段的标题名称 label – string 占位提示 一个字段的描述信息,常用于说明当前字典的范围、注意事项等 placeholder – string 描述说明 组件描述信息 hint – string 数据校验 与表单其他数据联动校验 validator – 表达式 数据校验不通过提示 数据校验不通过提示 validatorMessage 校验失败 string 只读 字段的状态,可见,不可编辑 readonly false boolean或者表达式 隐藏 字段的状态,不,不可编辑 invisible false boolean或者表达式 必填 字段是否必填 required false boolean或者表达式 禁用 字段是否禁用 disabled false boolean或者表达式 宽度 属性在页面中的宽度 colSpan 01/02 标题排列方式 标题排列方式: 水平 , 横向 layout vertical vertical | horizontal 默认值 默认值,多值逗号分割 defaultValue – 根据不同ttype有不同的默认值类型 表3-5-6-1 字段通用属性 字段组件大全 widget为"-"表明不需要指定,是该ttype默认widget 组件名称 widget 对应ttype 属性 属性描述 属性name 默认值 类型 单行文本 – STRING 通用属性 文本类型 密码:password; 文本: text type text string 最小长度 输入框填写数据时最少输入的长度值 minLength – number 最大长度 输入框填写数据时最多输入的长度值 maxLength – number 输入格式 单行文本组件特有的属性,通过规则校验内容,提供一些常用的,也支持自定义校验正则 pattern – 正则表达式 输入格式不通过 输入格式不通过提示语 tips 校验失败 string 显示计数器 设置输入框是否显示字数计数器 showCount FALSE boolean 显示清除按钮 设置输入框是否有一键清除的按钮及功能 allowClear TRUE TRUE 支持前缀 开启前缀 showPrefix FALSE boolean 支持后缀 开启后缀 showSuffix FALSE boolean 前缀类型 ICON: 图标; TEXT:文本 prefixType – string 后缀类型 ICON: 图标; TEXT:文本 suffixType – string 前缀内容 文本内容或者图标引用名 prefix – string 后缀内容 文本内容或者图标引用名 suffix – string 前缀存储 前缀存储, 仅前缀类型为文本可用 prefixStore – boolean 后缀存储 后缀存储, 仅后缀类型为文本可用 suffixStore – boolean 多行文本 -…

    2024年5月23日
    1.5K00
  • 致Oinone读者

    欢迎来到Oinone生态,我们为您提供了一站式低代码商业支撑平台,数式科技已经用其服务了如中烟、得力、上海电气、中航金网、雾芯科技等多个知名企业,我们的技术实力在商业场景得到了很好的验证。我们希望通过开源Oinone项目,为中国软件行业带来变革,提升整体工程化水平,与广大软件工程师一起为客户创造价值!《Oinone 7天从入门到精通》一书是Oinone开源项目的配套书籍,系统化地介绍了如何基于Oinone开源项目,快速开发高质量的软件系统。此外,如果您开发的项目是用于商业化而非企业自用,我们还为您提供了一种可选的商业化变现途径:申请成为Oinone的合作伙伴,并提交相关产品,详情请访问 www.oinone.top 网站。 书籍纲领 本书的章节安排如下: 第一章至第二章:【揭开面纱,理解Oinone】和【Oinone的技术独特性】。这两个章节可以帮助您更好地理解我们设计Oinone的初衷以及特性的由来。 第三章:面向研发人员的【Oinone的基础入门】。如果您是专业的研发人员,本章可以帮助您快速上手并做出业务系统。只要按着里面的case一步步操作下来就可以。 第四章至第六章:面向研发人员的【Oinone的高级特性】、【Oinone的CDM】、【Oinone的通用能力】。这三篇章节重点介绍了Oinone的高级技术特性、提供的通用数据模型和通用基础能力。它们能够帮助我们更快地进行业务开发,从容应对业务的特殊场景要求,比较适合进阶的研发人员。 第七章:面向非研发人员的【Oinone的设计器们】和【Oinone的低无一体】。如果您并不是专业的研发人员,本章可以帮助您通过使用Oinone的无代码可视化设计器轻松自主解决业务需求,并且当可视化设计器满足不了的时候,您还可以在【Oinone的低无一体】中找到方式,并寻求研发帮助

    Oinone 7天入门到精通 2024年5月23日
    1.9K00
  • 4.2.5 框架之网络请求-Request

    在中后台业务场景中,大部分的请求时候是可以被枚举的,比如创建、删除、更新、查询。在上文中,我们讲了httpClient如何自定义请求,来实现自己的业务诉求。本文中讲到的Request是离业务更近一步的封装,他提供了开箱即用的API,比如insertOne、updateOne,它是基于HttpClient做的二次封装,当你熟悉Request时,在中后台的业务场景中,所有的业务接口自定义将事半功倍。 一、Request详细介绍 元数据-model 获取模型实例 import { getModel } from '@kunlun/dependencies' getModel('modelName'); 图4-2-5-1 获取模型实例 清除所有缓存的模型 import { cleanModelCache } from '@kunlun/dependencies' cleanModelCache(); 图4-2-5-2 清除所有缓存的模型 元数据-module 获取应用实例,包含应用入口和菜单 import { queryModuleByName } from '@kunlun/dependencies' queryModuleByName('moduleName') 图4-2-5-3 获取应用实例 查询当前用户所有的应用 import { loadModules } from '@kunlun/dependencies' loadModules() 图4-2-5-4 查询当前用户所有的应用 query 分页查询 import { queryPage } from '@kunlun/dependencies' queryPage(modelName, { pageSize: 15, // 一次查询几条 currentPage, 1, // 当前页码 condition?: '' // 查询条件 maxDepth?: 1, // 查几层模型出来,如果有2,会把所有查询字段的关系字段都查出来 sort?: []; // 排序规则 }, fields, variables, context) 图4-2-5-5 分页查询 自定义分页查询-可自定义后端接口查询数据 import { customQueryPage } from '@kunlun/dependencies' customQueryPage(modelName, methodName, { pageSize: 15, // 一次查询几条 currentPage, 1, // 当前页码 condition?: '' // 查询条件 maxDepth?: 1, // 查几层模型出来,如果有2,会把所有查询字段的关系字段都查出来 sort?: []; // 排序规则 }, fields, variables, context) 图4-2-5-6 自定义分页查询 查询一条-根据params匹配出一条数据 import { queryOne } from '@kunlun/dependencies' customQueryPage(modelName, params, fields, variables, context) 图4-2-5-7 根据params匹配出一条数据 自定义查询 import { customQuery } from '@kunlun/dependencies' customQuery(methodName, modelName, record, fields, variables, context) 图4-2-5-8 自定义查询 update import { updateOne } from '@kunlun/dependencies' updateOne(modelName, record, fields, variables, context) 图4-2-5-9 update insert import { insertOne } from '@kunlun/dependencies' insertOne(modelName, record, fields, variables, context) 图4-2-5-10 insert delete import { deleteOne } from '@kunlun/dependencies'…

    2024年5月23日
    1.6K00

Leave a Reply

Please Login to Comment