4.1.21 框架之分布式消息

消息中间件是在分布式开发中常见的一种技术手段,用于模块间的解耦、异步处理、数据最终一致等场景。

一、介绍

oinone对开源的RocketMQ进行了封装,是平台提供的一种较为简单的使用方式,并非是对RocketMQ进行的功能扩展。同时也伴随着两个非常至关重要的目的:

  1. 适配不同企业对RocketMQ的不同版本选择,不至于改上层业务代码。目前已经适配RocketMQ的开源版本和阿里云版本。 下个版本会对API进行升级支持不同类型MQ,以适配不同企业对MQ的不同要求,应对一些企业客户已经对MQ进行技术选择

  2. 对协议头进行扩展:如多租户的封装,saas模式中为了共用MQ基础资源,需要在消息头中加入必要租户信息。

二、使用准备

demo工程默认已经依赖消息,这里只是做介绍无需大家额外操作,大家可以用maven依赖树命令查看引用关系。

依赖包

增加对pamirs-connectors-event的依赖


<dependency>
    <groupId>pro.shushi.pamirs.framework</groupId>
    <artifactId>pamirs-connectors-event</artifactId>
</dependency>

图4-1-21-1 分布式消息的依赖包

相关功能引入

增加模型、触发器都依赖MQ

<!-- 增强模型 -->
<!-- 增强模型 -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-channel</artifactId>
</dependency>
<!-- 触发器 api -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-api</artifactId>
</dependency>
<!-- 触发器 core -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-core</artifactId>
</dependency>

图4-1-21-2 增加模型、触发器都依赖MQ

yml配置文件参考

详见4.1.1【模块之yml文件结构详解】的“pamirs.event”部分。

三、使用说明

发送消息(NotifyProducer)

概述

NotifyProducer是Pamirs Event中所有生产者的基本API,它仅仅定义了消息发送的基本行为,例如生产者自身的属性,启动和停止,当前状态,以及消息发送方法。它本身并不决定消息如何发送,而是根据具体的实现确定其功能。

目前仅实现了RocketMQProducer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

Notify注解方式

使用示例

@Component
public class DemoProducer {

    @Notify(topic = "test", tags = "model")
    public DemoModel sendModel() {
        return new DemoModel();
    }

    @Notify(topic = "test", tags = "dto")
    public DemoDTO sendDTO() {
        return new DemoDTO();
    }
}

图4-1-21-3 Notify注解方式使用示例

解释说明

  1. 使用Component注解方式注册Spring Bean。

  2. Notify注解指定topic和tags。

  3. topic和tags对应NotifyEvent中的topic和tags。

RocketMQProducer方法调用

使用示例

@Component
public class SendRocketMQMessage {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel()));
        rocketMQProducer.send(new NotifyEvent("test", "dto", new DemoDTO()));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage() {
        DemoModel data = new DemoModel();
        data.setAge(10);
        rocketMQProducer.send(new NotifyEvent("test", "model", data)
                .setQueueSelector((queueSize, event) -> {
                    DemoModel body = (DemoModel) event.getBody();
                    return body.getAge() % queueSize;
                }));
    }

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel())
                .setIsTransaction(true)
                .setGroup("demoTransactionListener"));
    }
}

图4-1-21-4 RocketMQProducer方法调用

解释说明

  1. 使用Component注解方式注册Spring Bean。
  2. 使用Autowired注解方式装配RocketMQProducer实例。
  3. 使用send方法发送指定消息。
  4. 在【发送普通消息】方法中,该实现效果与Notify注解方式所示完全一致。
  5. 在【发送有序消息】方法中,队列选择器是必须配置的,queueSize属性为MQ队列的总数量,在broker中配置。有序消息必须配合有序消费者才能达到有序消费的目的,否则还是无序的常规消息,消费者需要配置@NotifyListener(consumerType=ConsumerType.ORDERLY )注解。
  6. 在【发送事务消息】方法中指定的group为ProducerGroup,事务消息是通过不同的Producer发出的,事务消息监听请参考TransactionListener注解的相关使用方法。(示例中的group与下文介绍中的一致)

使用TransactionListener开启事务消息监听

使用示例

@Component
@TransactionListener
public class DemoTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        return NotifyTransactionState.COMMIT;
    }

}

图4-1-21-5 使用TransactionListener开启事务消息监听

解释说明

  1. 实现NotifyTransactionListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 添加TransactionListener注解注册事务监听,生成对应的生产者。

  4. 当前ProducerGroup将使用这个类的BeanName,即"demoTransactionListener"。如果你想自定义ProducerGroup,可以使用TransactionListener的value属性进行设置。

消费消息(NotifyConsumer)

概述

NotifyConsumer是Pamirs Evnet中所有消费者的基本API,与NotifyProducer类似,它仅仅定义了消息消费的基本行为,例如消费者自身的属性,启动和停止,当前状态,以及消息的监听和订阅方法。它本身并不决定消息如何消费以及如何被订阅,而是根据具体的实现确定其功能。

目前仅实现了RocketMQEventPushConsumer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

在类上使用NotifyListener注解

使用示例

@Component
@NotifyListener(topic = "test", tags = "model")
public class DemoConsumerClass implements NotifyEventListener {

    @Override
    public void consumer(NotifyEvent event) {
        DemoModel data = (DemoModel) event.getBody();
        // do some things.
    }
}

图4-1-21-6 在类上使用NotifyListener注解

解释说明

  1. 实现NotifyEventListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 当前ConsumerGroup将使用这个类的BeanName,即"demoConsumerClass"。如果你想自定义ConsumerGroup,可以使用Component的value属性进行设置。

  4. 从body中将可以获取生产者发送的数据对象,并且已经做好了类型处理,可以直接使用。

  5. 使用原生的RocketMQ发送的消息,类型可能是无法识别的,你可以使用NotifyListener中提供的bodyClass来指定类型。

  6. topic和tags对应NotifyEvent中的topic和tags。

在方法上使用NotifyListener注解

使用示例

@Component
public class DemoConsumerMethod {

    @Bean
    @NotifyListener(topic = "test", tags = "model")
    public NotifyEventListener modelConsumer() {
        return event -> {
            DemoModel data = (DemoModel) event.getBody();
            // do some things.
        };
    }

    @Bean
    @NotifyListener(topic = "test", tags = "dto")
    public NotifyEventListener dtoConsumer() {
        return event -> {
            DemoDTO data = (DemoDTO) event.getBody();
            // do some things.
        };
    }
}

图4-1-21-7 在方法上使用NotifyListener注解

解释说明

  1. 使用Bean注解方式注册Spring Bean。

  2. 方法返回值为NotifyEventListener类型。

  3. 当前ConsumerGroup将使用对应方法生成的BeanName,即"modelConsumer"和"dtoConsumer"。如果你想自定义ConsumerGroup,可以使用Bean的value属性进行设置。

实战

约定:每个模块下的Topic和Tags必须定义常量池进行统一管理,主要是为了方便维护与管理,技术没有限制

常规消息(举例)

Step1 新建PetNotifyEnum

用PetNotifyEnum来管理模块的所有Topic和Tags的常量定义

package pro.shushi.pamirs.demo.api.mq;

public class PetNotifyEnum {

    public static class PetItemInventroy{

        public interface Topic{
            String PET_ITEM_INVENTROY_CHANGE ="petItemInventroyChange";
        }
        public interface Tag{
            String CREATE ="create";
            String UPDATE ="update";
            String DELETE ="delete";
        }

    }

}

图4-1-21-8 新建PetNotifyEnum

Step2 新建PetItemInventoryMqProducer

新建PetItemInventroy模型对应消息生产者,用于发送PetItemInventroy模型变动的相关消息

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

}

图4-1-21-9 新建PetItemInventoryMqProducer

Step3 新建PetItemInventoryMqConsumer

新建PetItemInventoryMqConsumer,订阅PetItemInventroy模型变动的相关消息,并进行相关处理

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-10 PetItemInventoryMqConsumer

Step4 修改PetItemInventroyAction

在修改PetItemInventroy完成之后,发送topic为【PetNotifyEnum.PetItemInventroyMq.Topic.PET_ITEM_INVENTROY_CHANGE】,tag为【PetNotifyEnum.PetItemInventroyMq.Tag.UPDATE】的消息出去

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-11 修改PetItemInventroyAction

Step5 重新看效果

  1. 编辑商品库存记录

image.png

图4-1-21-12 编辑商品库存记录

  1. 查看后端日志是否打印

image.png

图4-1-21-13 查看后端日志是否打印

顺序消息(举例)

Step1 修改PetItemInventoryMqProducer

增加一个顺序消息发送的方法,发送时根据商品库存的Id就分队列,相同队列顺序消费

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage(PetItemInventroy data,String tag) {

        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
                .setQueueSelector((queueSize, event) -> {
                    PetItemInventroy body = (PetItemInventroy) event.getBody();
                    Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                    return queue.intValue();
                }));

    }
}

图4-1-21-14 修改PetItemInventoryMqProducer

Step2 修改PetItemInventoryMqConsumer

增加@NotifyListener(consumerType= ConsumerType.ORDERLY)的注解

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.ConsumerType;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
//@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*",consumerType= ConsumerType.ORDERLY)
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-15 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的顺序消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
//        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-16 修改PetItemInventroyAction

Step4 重启看效果

同常规消息

事务消息(举例)

注:这种写法默认第一次是UNKNOWN,然后通过MQ回调二次确认,在消息时效性要求高的场景下是不符合要求的。对实效性要求高的,请见下面事务消息-优化(举例)章节。

Step1 新建NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息回滚");
        return NotifyTransactionState.ROLLBACK;
    }
}

图4-1-21-17 新建NotifyTransactionListener

Step2 修改PetItemInventoryMqProducer

增加发送事务性消息的方法,通过.setIsTransaction(true)来显示设置该消息是事务消息,通过setGroup("petItemInventoryMqTransactionListener")来匹配事务监听处理器

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data,String tag) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
    );
}

图4-1-21-18 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的事务性消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-19 修改PetItemInventroyAction

Step4 重启看效果

  1. 编辑商品库存记录

image.png

图4-1-21-20 编辑商品库存记录

  1. 查看后端日志是否打印

消息回滚,没有消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-21 MQ日志打印一

image.png

图4-1-21-22 MQ日志打印二

Step5 修改NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息提交");
        //正常业务情况下,这里要增加自己的逻辑判断,来确定返回状态值
        return NotifyTransactionState.COMMIT;
    }
}

图4-1-21-23 修改NotifyTransactionListener

Step6 重启看效果

消息提交,并看到消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-24 MQ日志打印三

image.png

图4-1-21-25 MQ日志打印三

事务消息-优化(举例)

Step1 修改PetItemInventoryMqProducer

修改PetItemInventoryMqProducer事务性消息发送方法sendTransactionMessage,增加NotifyExecuteLocalTransactionCallback入参。

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data, String tag, NotifyExecuteLocalTransactionCallback callback) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
            .setExecuteLocalTransactionCallback(callback)
    );
}

图4-1-21-26 修改PetItemInventoryMqProducer

Step2 修改PetItemInventroyAction

修改PetItemInventroyAction的update方法

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){
            @Override
            public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

                List<PetItemInventroy> inventroys = new ArrayList<>();
                inventroys.add(data);
                PamirsSession.directive().disableOptimisticLocker();
                try{
                    int i = data.updateBatch(inventroys);
                } finally {
                    PamirsSession.directive().enableOptimisticLocker();
                }
                //业务代码执行完毕,提交消息
                return NotifyTransactionState.COMMIT;
            }
        });
        return data;
    }
}

图4-1-21-27 修改PetItemInventroyAction

Step3 重启看效果

编辑商品库存记录,发现消息的处理几乎没有延迟

image.png

图4-1-21-28 示例效果几乎没有延迟

Step4 模拟业务成功,消息发送失败

修改PetItemInventroyAction的update方法

@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
    //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){

        @Override
        public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

            List<PetItemInventroy> inventroys = new ArrayList<>();
            inventroys.add(data);
            PamirsSession.directive().disableOptimisticLocker();
            try{
                int i = data.updateBatch(inventroys);
            } finally {
                PamirsSession.directive().enableOptimisticLocker();
            }
            //模拟业务成功,消息发送失败
            throw new RuntimeException();
            //业务代码执行完毕,提交消息
            //return NotifyTransactionState.COMMIT;
        }
    });
    return data;
}

图4-1-21-29 修改PetItemInventroyAction的update方法

Step5 重启看效果

这个效果跟第一种事务消息一样,消息提交延迟严重。

image.png

图4-1-21-30 MQ日志打印

image.png

图4-1-21-31 MQ日志打印

Step6 总结

优化后的事务消息,在正常情况下时效性非常高,异常情况下也能通过NotifyTransactionListener做保障

Oinone社区 作者:史, 昂原创文章,如若转载,请注明出处:https://doc.oinone.top/oio4/9296.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
史, 昂的头像史, 昂数式管理员
上一篇 2024年5月23日
下一篇 2024年5月23日

相关推荐

  • 5.4 基础支撑之商业关系域

    PamirsPartner作为商业关系与商业行为的主体,那么PamirsPartner间的关系如何描述,本文将介绍两种常见的设计思路,从思维和实现两方面进行对比,给出oinone为啥选择关系设计模式的原因。 一、两种设计模式对比 设计模式思路介绍 角色设计模式思路介绍 从产品角度枚举所有商业角色,每个商业角色对应一个派生的商业主体,并把主体间的关系类型进行整理。 图5-4-1 角色设计模式 关系设计模式思路介绍 从产品角度枚举所有商业角色,每个商业角色对应一个派生的主体间商业关系 图5-4-2 关系设计模式 设计模式对应实现介绍 角色设计模式实现介绍 不单商业主体需要扩展,关系也要额外维护,可以是字段或是关系表。一般M2O和O2M字段维护,M2M关系表维护。 创建合同场景中甲方选择【商业主体A】,乙方必须是【商业主体A】有关联的经销商、分销商、零售商、供应商等,则在角色设计模式下就非常麻烦,因为关系都是独立维护的 图5-4-3 角色设计模式实现介绍 关系设计模式实现介绍 只需维护商业关系扩展 同时在设计上收敛了商业关系,统一管理应对不同场景都比较从容 图5-4-4 关系设计模式实现介绍 二、oinone商业关系的默认实现 首先oinone的商业关系选择关系设计模式 其次模型上采用多表继承模式,父模型上维护核心字段,子模型维护个性化字段。

    2024年5月23日
    83000
  • 3.2.3 应用中心

    在App Finder 中点击应用中心可以进入oinone的应用中心,可以看到oinone平台所有应用列表、应用大屏、以及技术可视化。 一、应用列表 标准版本不支持在线安装,只能通过boot工程的yml文件来配置安装模块。在 www.oinone.top 官方SaaS平台客户可以在线管理应用生命周期如:安装、升级、卸载。同时针对已安装应用可以进行无代码设计(前提安装了设计器),针对应用类的模块则可进行收藏,收藏后会在App Finder中的我收藏的应用中出现。在应用列表中可以看到我们已经安装的应用以及模块,我们oinoneDemo工程也在其中。 图3-2-35 Oinone的应用列表 图3-2-36 应用收藏后会在App Finder的【我收藏的应用】中出现 二、应用大屏 但我们的测试应用没有设置应用类目,则无法在应用大屏中呈现。 图3-2-37 未设置应用类目则无法在应用大屏中呈现 三、技术可视化 在技术可视化页面,出展示已经安装模块的元数据,并进行分类呈现 图3-2-38 云数据分类呈现

    2024年5月23日
    1.3K00
  • 企业个性化配置

    平台除了帮助企业快速解决业务需求外,还提供了企业文化、风格个性化能力,本章节详细介绍个性化配置。包括:登陆页配置、企业形象配置、系统风格配置。 系统配置 配置入口 配置入口:右上角登陆账号——系统配置——全局配置——登陆页配置、企业形象配置、系统风格配置。 登陆页配置 用户自定义登陆页的主题,一共有六个主题可选择,分别为: 图一:左侧是图片,右侧是登录输入框,Logo在左上角; 图二:右左侧是图片,右侧是登录输入框,Logo在左上角; 图三:底图是图片,登录输入框浮在页面左侧,Logo在左上角; 图四:底图是图片,登录输入框浮在页面中间,Logo在左上角; 图五:底图是图片,登录输入框浮在页面右侧,Logo在左上角; 图六:底图是图片,登录输入框浮在页面中间,Logo在登录框上方。 页面背景 上传登录页背景,支持上传jpg、png格式的图片或mp4、mov、avi格式视频。尺寸建议为1920*1080px,文件大小不超过50MB。 登录页logo:指配置登录页左上角的logo图标。 预览:配置后可在右侧电脑中点击全屏预览效果。 配置示例 企业形象配置 企业形象配置:可设置企业信息、业务应用导航栏logo、浏览器logo,设置完成之后,点击发布之后设置生效。 系统风格配置 系统风格配置,可设置主题(浅色、深色)、尺寸(大、中、小),配置后可全屏预览,发布后即可更换成对应的风格。 下载代码可进行自定义风格。

    2024年6月20日
    89800
  • 报表发布主流程

    1. 主业务流程图 2. 主业务流程操作图解 创建图表—创建报表/数据大屏—发布图表/报表—前端业务系统可引用 2.1 创建图表分组 1)操作流程:创建图表分组 2)操作路径:数据可视化-图表-创建图表分组 3)点击搜索框后的「+」创建一级分组,输入一级分组名称后,点击一级分组后的「+」创建二级分组,输入二级分组名称后,此时分组创建完成,可以在二级分组下创建图表 2.2 创建图表 1)操作流程:选择图表二级分组-创建图表 2)操作路径:数据可视化-图表-二级分组-创建图表 3)鼠标移动至需要创建图表的二级分组上,出现「+」,点击图标后弹出“创建图表”弹窗,需要填写图表标题、模型、方法; a. 图表标题:最大支持20个字,支持汉字、数字、大小写字母、-;同个一级分组下不允许重复; b. 模型:需要选择来源数据对应的模型; c. 方法:选择模型后需要选择方法,方法是用来提取模型数据的逻辑; 4)选择成功后进图表设计页面(详见图表-图表设计页面),设计完成后点击保存,图表创建成功 2.3 创建报表分组 1)操作流程:创建报表分组 2)操作路径:数据可视化-报表-创建报表分组 3)点击搜索框后的「+」创建一级分组,输入一级分组名称后,点击一级分组后的「+」创建二级分组,输入二级分组名称后,此时分组创建完成,可以在二级分组下创建报表 2.4 创建数据大屏分组 1)操作流程:创建报表分组 2)操作路径:数据可视化-报表-创建报表分组 3)点击搜索框后的「+」创建一级分组,输入一级分组名称后,点击一级分组后的「+」创建二级分组,输入二级分组名称后,此时分组创建完成,可以在二级分组下创建报表 2.5 创建报表 1)操作流程:选择报表二级分组-创建报表 2)操作路径:数据可视化-报表-二级分组-创建报表 3)鼠标移动至需要创建报表的二级分组上,出现「+」,点击图标后=需要填写报表标题; a报表标题:最大支持20个字,支持汉字、数字、大小写字母、-;同个一级分组下不允许重复; 4)创建后可以选择报表需要展示的图表 2.6 为报表选择图表 1)操作流程:选择报表-为报表选择图表 2)操作路径:数据可视化-图表-二级分组-报表-选择图表 3)选择单个未发布或者已发布但没有被隐藏的报表,点击【选择图表】,弹出“选择图表”弹窗,对该报表需要展示的图表进行选择 a需要选择图表的一级分组后才能选择图表; b可以多选图表,选择的图表只能是已选一级分组下的未隐藏的未被选择的图表;选择一个二级分组时,默认该二级分组下的图表会全部被选中,图表会按照选中的顺序展示在报表列表; 4)选择图表后,报表信息保持展示图表的最新效果;如果图表更新了,但是报表没有发布最新,则报表在前端展示的仍为最近发布的版本; 5)如果图表中存在超过一行的图内筛选项,则在报表处原始的图表尺寸只能查看一行图内筛选项,需要根据图表在报表处的等比拖动效果展示更多的图内筛选项 2.7 创建数据大屏 1)操作流程:选择数据大屏二级分组-创建数据大屏 2)操作路径:数据可视化-数据大屏-二级分组-创建 3)鼠标移动至需要创建数据大屏的二级分组上,出现「+」,点击图标后进入数据大屏设计页面; a. 选择图表组件组合成数据大屏,还有其他诸如时间、图片等组件可供选择; b. 数据大屏标题:最大支持20个字,支持汉字、数字、大小写字母、-;同个一级分组下不允许重复; 4)选择完成后可以保存,则创建数据大屏成功 2.8 发布图表/报表/数据大屏 1)操作流程:选择图表/报表-发布图表/报表 2)操作路径:数据可视化-图表/报表/数据大屏-二级分组-图表名称/报表名称/数据大屏名称-发布 3)选择单个未发布且没有被隐藏的图表/报表/数据大屏,点击【发布】按钮,图表/报表发布后可以被前端引用,数据大屏可被屏幕演示,图表/报表/数据大屏状态变为已发布,展示最近发布时间; a. 如果图表发布后有更新内容,会展示的更新类型:更新图表信息/更新图表内容; b. 如果报表发布后有更新内容,会展示的更新类型:更新报表信息/更新图表内容/选择图表/移除图表; c. 如果数据大屏发布后有更新内容,会展示的更新类型:更新数据大屏信息/更新数据大屏内容; 4)发布后可以修改 2.9更新发布图表/报表/数据大屏 1)操作流程:选择图表/报表-更新发布图表/报表 2)操作路径:数据可视化-图表/报表/数据大屏-二级分组-图表名称/报表名称/数据大屏名称-更新发布图表/报表/数据大屏 3)选择单个已发布且没有被隐藏的图表/报表/数据大屏,并且该图表/报表/数据大屏在上次发布后有所更新,可以点击【更新发布】按钮,将最新的图表/报表内容发布至业务系统,业务系统引用的图表/报表为最新内容,屏幕展示的数据大屏是最新的大屏内容; 4)如果更新了内容,但未点击更新发布,则前端业务系统查看的图表/报表仍为最近发布的内容,屏幕展示的数据大屏仍是最近发布的内容

    2024年6月20日
    94100
  • 4.5.1 研发辅助之插件-结构性代码

    研发辅助意在 消灭研发过程中的重复性工作提升研发效率,如结构性代码 提供生产示例性代码,如果根据模型生成导入导出、view自定义配置等经常性开发 一、插件安装 根据自身Idea版本下载插件并安装: 版本 插件 2023.1 pamirs-source-maker-1.0.0-2023.1.zip(2.4 MB) 2021.1 pamirs-source-maker-1.0.0-2021.1.zip(2.4 MB) 2021.2 pamirs-source-maker-1.0.0-2021.2.zip(2.4 MB) 2021.3 pamirs-source-maker-1.0.0-2021.3.zip(2.4 MB) 2022.1 pamirs-source-maker-1.0.0-2022.1.zip(2.4 MB) pamirs-source-maker-1.0.0-223-EAP-SNAPSHOT(2.4 MB) 表4-5-1-1 插件列表 二、研发辅助之配置式结构性代码生成器 我们在开发过程中为了日后代码易于维护和修改,往往会做工程性的职责划分。 除去模型外会有 代理模型和代理模型Action来负责前端交互 以面向接口的形式来定义函数,就会有api和实现类之分 如果项目有多端,那么如代理模型和代理模型Action又要为每一个端构建一份 在大型项目的初始阶段,我们需要手工重复做很多事情,特别麻烦。现在用oinone的研发辅助插件的结构性代码生成器,就可以避免前面的重复工作 插件执行的配置文件 <?xml version="1.0" encoding="utf-8" ?> <oinone> <makers> <!– 根据模型生成代理类、代理类的Action、Service、ServiceImpl –> <maker> <!– 选择模型所在位置 –> <modelPath>/Users/oinone/Documents/oinone/demo/pamirs-second/pamirs-second-api/src/main/java/pro/shushi/pamirs/second/api/model</modelPath> <!– 代理模型、代理模型Action生成相关配置信息 –> <proxyModules> <module> <!– 代理模型和代理模型Action的生成位置信息 –> <generatePath>/Users/oinone/Documents/oinone/demo/pamirs-second/pamirs-second-api/src/main/java/pro/shushi/pamirs/second/api</generatePath> <!– 代理模型和代理模型Action的模块前缀 –> <modulePrefix>second</modulePrefix> <!– 代理模型和代理模型Action的模块名,代理模型和代理模型Action类名为moduleName+模型名+"Proxy"+"Action" –> <moduleName>second</moduleName> <!– 代理模型和代理模型Action的包名,实际包名为 packageName+".proxy"或packageName+".action"–> <packageName>pro.shushi.pamirs.second.api</packageName> </module> </proxyModules> <!– 根据模型生成api,包括service(写方法)和queryService(读方法) –> <apiModule> <!– service和queryService的生成位置信息 –> <generatePath>/Users/oinone/Documents/oinone/demo/pamirs-second/pamirs-second-api/src/main/java/pro/shushi/pamirs/second/api</generatePath> <!– service和queryService的模块前缀 –> <modulePrefix>second</modulePrefix> <!– service和queryService的模块名 –> <moduleName>second</moduleName> <!– service和queryService的包名,实际包名为 packageName+".service" –> <packageName>pro.shushi.pamirs.second.api</packageName> </apiModule> <!– 根据模型生成api实现类,包括serviceImpl(写方法)和queryServiceImpl(读方法) –> <coreModule> <!– serviceImpl和queryServiceImpl的生成位置信息 –> <generatePath>/Users/oinone/Documents/oinone/demo/pamirs-second/pamirs-second-core/src/main/java/pro/shushi/pamirs/second/core</generatePath> <!– serviceImpl和queryServiceImpl的模块前缀 –> <modulePrefix>second</modulePrefix> <!– serviceImpl和queryServiceImpl的模块名 –> <moduleName>second</moduleName> <!– serviceImpl和queryServiceImpl的包名,实际包名为 packageName+".service" –> <packageName>pro.shushi.pamirs.second.core</packageName> </coreModule> </maker> </makers> </oinone> 图4-5-1-1 插件执行的配置文件 三、研发辅助之多模型结构性代码生成器 是配置式结构性代码生成器的补充,应对开发后期维护中新增模型的场景。它的不同点在于只要选择模型文件就可以,不需要专门编写xml文件。生成的文件默认就在模型所在路径下 Step1 菜单栏上找到oinone,并点击子菜单【多模型结构性代码生成器】 图4-5-1-2 多模型结构性代码生成操作步骤一 Step2 设置必要的信息 模型前缀 模型的所属模块 代理模型的模块 这三个信息分别用于构建 代理模型的MODEL_MODEL = 模型前缀.代理模型的模块.代理模型类名 服务的FUN_NAMESPACE = 模型前缀.代理模型的模块.服务类名 图4-5-1-3 多模型结构性代码生成操作步骤二 Step3 选择为哪些模型生成对应的结构性代码 图4-5-1-4 多模型结构性代码生成操作步骤三 Step4 代码在模型所在目录 生成的文件默认就在模型所在路径下,您可以手动拖动到对应的包路径当中去 图4-5-1-5 多模型结构性代码生成操作步骤四

    2024年5月23日
    87000

Leave a Reply

登录后才能评论