4.1.21 框架之分布式消息

消息中间件是在分布式开发中常见的一种技术手段,用于模块间的解耦、异步处理、数据最终一致等场景。

一、介绍

oinone对开源的RocketMQ进行了封装,是平台提供的一种较为简单的使用方式,并非是对RocketMQ进行的功能扩展。同时也伴随着两个非常至关重要的目的:

  1. 适配不同企业对RocketMQ的不同版本选择,不至于改上层业务代码。目前已经适配RocketMQ的开源版本和阿里云版本。 下个版本会对API进行升级支持不同类型MQ,以适配不同企业对MQ的不同要求,应对一些企业客户已经对MQ进行技术选择

  2. 对协议头进行扩展:如多租户的封装,saas模式中为了共用MQ基础资源,需要在消息头中加入必要租户信息。

二、使用准备

demo工程默认已经依赖消息,这里只是做介绍无需大家额外操作,大家可以用maven依赖树命令查看引用关系。

依赖包

增加对pamirs-connectors-event的依赖


<dependency>
    <groupId>pro.shushi.pamirs.framework</groupId>
    <artifactId>pamirs-connectors-event</artifactId>
</dependency>

图4-1-21-1 分布式消息的依赖包

相关功能引入

增加模型、触发器都依赖MQ

<!-- 增强模型 -->
<!-- 增强模型 -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-channel</artifactId>
</dependency>
<!-- 触发器 api -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-api</artifactId>
</dependency>
<!-- 触发器 core -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-core</artifactId>
</dependency>

图4-1-21-2 增加模型、触发器都依赖MQ

yml配置文件参考

详见4.1.1【模块之yml文件结构详解】的“pamirs.event”部分。

三、使用说明

发送消息(NotifyProducer)

概述

NotifyProducer是Pamirs Event中所有生产者的基本API,它仅仅定义了消息发送的基本行为,例如生产者自身的属性,启动和停止,当前状态,以及消息发送方法。它本身并不决定消息如何发送,而是根据具体的实现确定其功能。

目前仅实现了RocketMQProducer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

Notify注解方式

使用示例

@Component
public class DemoProducer {

    @Notify(topic = "test", tags = "model")
    public DemoModel sendModel() {
        return new DemoModel();
    }

    @Notify(topic = "test", tags = "dto")
    public DemoDTO sendDTO() {
        return new DemoDTO();
    }
}

图4-1-21-3 Notify注解方式使用示例

解释说明

  1. 使用Component注解方式注册Spring Bean。

  2. Notify注解指定topic和tags。

  3. topic和tags对应NotifyEvent中的topic和tags。

RocketMQProducer方法调用

使用示例

@Component
public class SendRocketMQMessage {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel()));
        rocketMQProducer.send(new NotifyEvent("test", "dto", new DemoDTO()));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage() {
        DemoModel data = new DemoModel();
        data.setAge(10);
        rocketMQProducer.send(new NotifyEvent("test", "model", data)
                .setQueueSelector((queueSize, event) -> {
                    DemoModel body = (DemoModel) event.getBody();
                    return body.getAge() % queueSize;
                }));
    }

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel())
                .setIsTransaction(true)
                .setGroup("demoTransactionListener"));
    }
}

图4-1-21-4 RocketMQProducer方法调用

解释说明

  1. 使用Component注解方式注册Spring Bean。
  2. 使用Autowired注解方式装配RocketMQProducer实例。
  3. 使用send方法发送指定消息。
  4. 在【发送普通消息】方法中,该实现效果与Notify注解方式所示完全一致。
  5. 在【发送有序消息】方法中,队列选择器是必须配置的,queueSize属性为MQ队列的总数量,在broker中配置。有序消息必须配合有序消费者才能达到有序消费的目的,否则还是无序的常规消息,消费者需要配置@NotifyListener(consumerType=ConsumerType.ORDERLY )注解。
  6. 在【发送事务消息】方法中指定的group为ProducerGroup,事务消息是通过不同的Producer发出的,事务消息监听请参考TransactionListener注解的相关使用方法。(示例中的group与下文介绍中的一致)

使用TransactionListener开启事务消息监听

使用示例

@Component
@TransactionListener
public class DemoTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        return NotifyTransactionState.COMMIT;
    }

}

图4-1-21-5 使用TransactionListener开启事务消息监听

解释说明

  1. 实现NotifyTransactionListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 添加TransactionListener注解注册事务监听,生成对应的生产者。

  4. 当前ProducerGroup将使用这个类的BeanName,即"demoTransactionListener"。如果你想自定义ProducerGroup,可以使用TransactionListener的value属性进行设置。

消费消息(NotifyConsumer)

概述

NotifyConsumer是Pamirs Evnet中所有消费者的基本API,与NotifyProducer类似,它仅仅定义了消息消费的基本行为,例如消费者自身的属性,启动和停止,当前状态,以及消息的监听和订阅方法。它本身并不决定消息如何消费以及如何被订阅,而是根据具体的实现确定其功能。

目前仅实现了RocketMQEventPushConsumer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

在类上使用NotifyListener注解

使用示例

@Component
@NotifyListener(topic = "test", tags = "model")
public class DemoConsumerClass implements NotifyEventListener {

    @Override
    public void consumer(NotifyEvent event) {
        DemoModel data = (DemoModel) event.getBody();
        // do some things.
    }
}

图4-1-21-6 在类上使用NotifyListener注解

解释说明

  1. 实现NotifyEventListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 当前ConsumerGroup将使用这个类的BeanName,即"demoConsumerClass"。如果你想自定义ConsumerGroup,可以使用Component的value属性进行设置。

  4. 从body中将可以获取生产者发送的数据对象,并且已经做好了类型处理,可以直接使用。

  5. 使用原生的RocketMQ发送的消息,类型可能是无法识别的,你可以使用NotifyListener中提供的bodyClass来指定类型。

  6. topic和tags对应NotifyEvent中的topic和tags。

在方法上使用NotifyListener注解

使用示例

@Component
public class DemoConsumerMethod {

    @Bean
    @NotifyListener(topic = "test", tags = "model")
    public NotifyEventListener modelConsumer() {
        return event -> {
            DemoModel data = (DemoModel) event.getBody();
            // do some things.
        };
    }

    @Bean
    @NotifyListener(topic = "test", tags = "dto")
    public NotifyEventListener dtoConsumer() {
        return event -> {
            DemoDTO data = (DemoDTO) event.getBody();
            // do some things.
        };
    }
}

图4-1-21-7 在方法上使用NotifyListener注解

解释说明

  1. 使用Bean注解方式注册Spring Bean。

  2. 方法返回值为NotifyEventListener类型。

  3. 当前ConsumerGroup将使用对应方法生成的BeanName,即"modelConsumer"和"dtoConsumer"。如果你想自定义ConsumerGroup,可以使用Bean的value属性进行设置。

实战

约定:每个模块下的Topic和Tags必须定义常量池进行统一管理,主要是为了方便维护与管理,技术没有限制

常规消息(举例)

Step1 新建PetNotifyEnum

用PetNotifyEnum来管理模块的所有Topic和Tags的常量定义

package pro.shushi.pamirs.demo.api.mq;

public class PetNotifyEnum {

    public static class PetItemInventroy{

        public interface Topic{
            String PET_ITEM_INVENTROY_CHANGE ="petItemInventroyChange";
        }
        public interface Tag{
            String CREATE ="create";
            String UPDATE ="update";
            String DELETE ="delete";
        }

    }

}

图4-1-21-8 新建PetNotifyEnum

Step2 新建PetItemInventoryMqProducer

新建PetItemInventroy模型对应消息生产者,用于发送PetItemInventroy模型变动的相关消息

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

}

图4-1-21-9 新建PetItemInventoryMqProducer

Step3 新建PetItemInventoryMqConsumer

新建PetItemInventoryMqConsumer,订阅PetItemInventroy模型变动的相关消息,并进行相关处理

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-10 PetItemInventoryMqConsumer

Step4 修改PetItemInventroyAction

在修改PetItemInventroy完成之后,发送topic为【PetNotifyEnum.PetItemInventroyMq.Topic.PET_ITEM_INVENTROY_CHANGE】,tag为【PetNotifyEnum.PetItemInventroyMq.Tag.UPDATE】的消息出去

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-11 修改PetItemInventroyAction

Step5 重新看效果

  1. 编辑商品库存记录

image.png

图4-1-21-12 编辑商品库存记录

  1. 查看后端日志是否打印

image.png

图4-1-21-13 查看后端日志是否打印

顺序消息(举例)

Step1 修改PetItemInventoryMqProducer

增加一个顺序消息发送的方法,发送时根据商品库存的Id就分队列,相同队列顺序消费

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage(PetItemInventroy data,String tag) {

        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
                .setQueueSelector((queueSize, event) -> {
                    PetItemInventroy body = (PetItemInventroy) event.getBody();
                    Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                    return queue.intValue();
                }));

    }
}

图4-1-21-14 修改PetItemInventoryMqProducer

Step2 修改PetItemInventoryMqConsumer

增加@NotifyListener(consumerType= ConsumerType.ORDERLY)的注解

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.ConsumerType;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
//@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*",consumerType= ConsumerType.ORDERLY)
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-15 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的顺序消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
//        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-16 修改PetItemInventroyAction

Step4 重启看效果

同常规消息

事务消息(举例)

注:这种写法默认第一次是UNKNOWN,然后通过MQ回调二次确认,在消息时效性要求高的场景下是不符合要求的。对实效性要求高的,请见下面事务消息-优化(举例)章节。

Step1 新建NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息回滚");
        return NotifyTransactionState.ROLLBACK;
    }
}

图4-1-21-17 新建NotifyTransactionListener

Step2 修改PetItemInventoryMqProducer

增加发送事务性消息的方法,通过.setIsTransaction(true)来显示设置该消息是事务消息,通过setGroup("petItemInventoryMqTransactionListener")来匹配事务监听处理器

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data,String tag) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
    );
}

图4-1-21-18 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的事务性消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-19 修改PetItemInventroyAction

Step4 重启看效果

  1. 编辑商品库存记录

image.png

图4-1-21-20 编辑商品库存记录

  1. 查看后端日志是否打印

消息回滚,没有消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-21 MQ日志打印一

image.png

图4-1-21-22 MQ日志打印二

Step5 修改NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息提交");
        //正常业务情况下,这里要增加自己的逻辑判断,来确定返回状态值
        return NotifyTransactionState.COMMIT;
    }
}

图4-1-21-23 修改NotifyTransactionListener

Step6 重启看效果

消息提交,并看到消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-24 MQ日志打印三

image.png

图4-1-21-25 MQ日志打印三

事务消息-优化(举例)

Step1 修改PetItemInventoryMqProducer

修改PetItemInventoryMqProducer事务性消息发送方法sendTransactionMessage,增加NotifyExecuteLocalTransactionCallback入参。

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data, String tag, NotifyExecuteLocalTransactionCallback callback) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
            .setExecuteLocalTransactionCallback(callback)
    );
}

图4-1-21-26 修改PetItemInventoryMqProducer

Step2 修改PetItemInventroyAction

修改PetItemInventroyAction的update方法

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){
            @Override
            public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

                List<PetItemInventroy> inventroys = new ArrayList<>();
                inventroys.add(data);
                PamirsSession.directive().disableOptimisticLocker();
                try{
                    int i = data.updateBatch(inventroys);
                } finally {
                    PamirsSession.directive().enableOptimisticLocker();
                }
                //业务代码执行完毕,提交消息
                return NotifyTransactionState.COMMIT;
            }
        });
        return data;
    }
}

图4-1-21-27 修改PetItemInventroyAction

Step3 重启看效果

编辑商品库存记录,发现消息的处理几乎没有延迟

image.png

图4-1-21-28 示例效果几乎没有延迟

Step4 模拟业务成功,消息发送失败

修改PetItemInventroyAction的update方法

@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
    //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){

        @Override
        public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

            List<PetItemInventroy> inventroys = new ArrayList<>();
            inventroys.add(data);
            PamirsSession.directive().disableOptimisticLocker();
            try{
                int i = data.updateBatch(inventroys);
            } finally {
                PamirsSession.directive().enableOptimisticLocker();
            }
            //模拟业务成功,消息发送失败
            throw new RuntimeException();
            //业务代码执行完毕,提交消息
            //return NotifyTransactionState.COMMIT;
        }
    });
    return data;
}

图4-1-21-29 修改PetItemInventroyAction的update方法

Step5 重启看效果

这个效果跟第一种事务消息一样,消息提交延迟严重。

image.png

图4-1-21-30 MQ日志打印

image.png

图4-1-21-31 MQ日志打印

Step6 总结

优化后的事务消息,在正常情况下时效性非常高,异常情况下也能通过NotifyTransactionListener做保障

Oinone社区 作者:史, 昂原创文章,如若转载,请注明出处:https://doc.oinone.top/oio4/9296.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
史, 昂的头像史, 昂数式管理员
上一篇 2024年5月23日
下一篇 2024年5月23日

相关推荐

  • 翻译

    翻译应用是管理翻译规则的应用,以模型为基础、维护字段的翻译值,支持导入、导出 1. 操作步骤 Step1:导出所有翻译项; Step2:线下翻译; Step3:导入翻译项; Step4:刷新远程资源; Step5:页面右上角可切换语言,查看翻译效果。 2. 新增翻译 翻译是具体到模型字段,其中需要区分出是否字典; 源语言、目标语言,是在资源中维护的语言,可在资源中维护需要翻译的语言; 翻译项则是模型字段,默认翻译项为激活状态,关闭后维护的翻译项无效。 3. 导出、导入 不勾选导出:导出所有需要翻译的翻译项,包括模块、字段,源术语、翻译值等,其中如果已经翻译过的内容,会体现在翻译值中; 勾选导出:导出勾选模型的翻译项。 导入:导入翻译项,平台会根据模型拆分为多条数据。 4. 刷新远程资源 导入翻译项后,点击“刷新远程资源”按钮。 5. 查看翻译内容 页面右上角切换语言,查看翻译效果。

    2024年6月20日
    90700
  • 流程类

    1.流程类 1.1 审批 审批节点配置步骤: 添加审批节点 选择审批的模型和视图 设置审批人和通过方式 设置审批人在审批时的操作权限和数据权限 1.1.1 审批节点 审批节点只能放置在有数据可审批的流程链路上,审批分支只能放置在审批节点后。 1.1.2 审批模型和视图 可选的审批模型包含添加的审批节点之前的所有能获取到数据的模型。可选视图为该选择的数据模型关联的界面设计器中视图类型为表单的页面。 1.1.3 审批人和通过方式 审批人可在个人、部门、角色和模型中的字段里复选。当某人在不同类型人员选择中被重复选中,只会收到一次审批的代办。若为多人审批,审批是同步进行的。 单人审批: 通过方式:唯一通过方式,同意通过,拒绝否决 多人审批: 通过方式:或签/会签(默认或签) a. 或签(一名审批人同意或拒绝即可) 任意一位审批人操作通过或否决后流程就结束,其他审批人无法进入审批操作,但是会弹出消息提示审批结果。 场景:紧急且影响不大的审批可以由任意一位领导层或签。 b. 会签(需所有审批人同意才为同意,一名审批人拒绝即为拒绝) 场景:影响比较重大的审批,一票否决的形式决定是否通过。 c. 会签(一名审批人同意即为同意,需所有审批人拒绝才为拒绝) 场景:需要评估项目可操作性时,若有领导觉得有意义就通过,进入下一步评估,全员否决就否决项目。 1.1.4 操作&数据权限 操作权限 可设置是否必填拒绝原因、是否允许转交、是否允许加签、是否允许退回。 选择允许转交或允许加签之后,可选择添加人员的候选名单,不填默认所有人都可选。 选择允许退回后,可以选择退回到该审批节点之前的任意审批节点。ps:需所有审批人拒绝才为拒绝的会签不允许退回。 数据权限 选择视图后自动显示该视图下的数据字段,可选择的权限为查看、编辑、隐藏数据字段,默认可查看全部字段。 1.1.5 参与人重复 勾选参与人重复的场景时,满足场景的审批流程会由系统自动审批通过。 1.2 填写 当流程需要某些人提交数据才能继续时,可以使用填写这个动作。区别于数据类中的操作,填写这个动作只能修改当前触发模型中关联的视图表单,而数据类中的更新数据可以修改其他模型中的数据。 和审批动作相似,填写动作需要选择填写的模型和视图表单,需要选择填写人,可以选择添加转交权限。另外,填写动作必须包含一个及以上的可编辑的数据权限供操作人填写。

    2024年5月23日
    1.2K00
  • 7.3.2 原业务加审批流程

    场景说明 场景描述:全员营销标准产品的功能并未有任务发放的审批流,在实际执行中,当营销专员配置好任务后,需部门领导对整个活动如该任务内容、形式、参与人员进行审批。 业务需求:在发布任务这个流程中增加审批节点。 实战训练 Step1 原业务分析 点击菜单【任务中心】通过URL上的model参数找到对应模型编码为【gemini.biz.GeminiTaskProxy】 进入模型设计器主页面,应用选择【全员营销】、选择【系统模型】、通过搜索关键字【任务】选择【Gemini任务代理】,展示方式从图模式切换到表单模式,对比【模型编码】 但目前模型为代理模型,代理模型是用于代理存储模型的数据管理器能力,同时又可以扩展出非存储数据信息的交互功能的模型。因为在代理模型中新增的字段都是非存储字段,所以如果要增加【审核状态】的字段一定以要在存储模型增加。其父模型的查看有两种方式 表单模式下可以直接看父模型 在图模式和表单模式下点击继承关系 点击【Gemini任务】,进入【Gemini任务】的模型设计界面,可以看出该模型所在模块为【全员营销核心业务】,从【系统字段】中找到【任务状态】字段,点击查看字段详情,我们可以看到【业务类型】为数据字典,字典类型为【任务状态】。 在模型设计器的管理页面上方点击【数字字典】选项卡,模块选择为【全员营销核心业务】,选择【系统字典】就可以查看到【任务状态】数字字典 总结如下: 给【Gemini任务】模型增加一个【任务审批状态】,记录审批状态 在任务创建的时候,修改【任务状态】为【关闭】确保任务未审批通过的时,用户无法操作该任务。 审批通过后,恢复【任务状态】为【初始化】 我们先来整理下核心流程即:任务审批流程。 Step2 利用模型设计器设计模型 在模型设计器的管理页面上方点击【数字字典】选项卡,模块选择为【全员营销核心业务】,点击添加【数据字典】按钮,设置对应数据项 设置【字典名称】为【审批状态】 设置【字典项类型】为【文本】 通过【添加数据字典项】按钮增加对应数据字典项,如审核中、审核失败、审核成功 在模型设计器的管理页面上方点击【模型】选项卡,模块选择为【全员营销核心业务】,选择【系统模型】、搜索任务选择【Gemini任务】,点击添加字段 为模型【Gemini任务】添加字段 设置【字段名称】为【任务审批状态】 设置【字段业务类型】为数据字典,并选择关联数据字典为【任务审批状态】 最后点击【创建】按钮完成操作 回到【Gemini任务】设计区,我们可以看到在模型的【自定义字段】选项卡下方多了一个【任务审批状态】字段 Step3 利用界面设计器,设计出必要的审核页面 进入界面设计器,应用选择全员营销,模型选择【Gemini任务】,点击添加页面下的直接创建 设置页面标题、模型(自动带上可切换)、业务类型(运营管理后续会扩展其他类型)、视图类型(表单)后点击确认按钮进入【Gemini任务】表单设计页面 进入页面设计器,对【Gemini任务】表单页面进行设计(更多细节介绍,请参考界面设计产品使用手册) 左侧为物料区:分为组件、模型。 【组件】选项卡下为通用物料区,我们可以为页面增加对应布局、字段(如同在模型设计器增加字段)、动作、数据、多媒体等等 【模型】选项卡下为页面对应模型的自定义字段、系统字段、以及模型已有动作 中间是设计区域 右侧为属性面板,在设计区域选择中组件会显示对应组件的可配置参数 在左侧【组件】选项卡下,拖入布局组件【分组】,并设置组件【标题属性】为基础信息 在左侧【模型】选项卡下,分别系统字段中的【任务标题】、【任务开始时间】、【任务结束时间】、【视频标题】、【视频风格】、【任务描述】拖入【基础信息】分组,并点击【任务描述】,在右侧属性面板的【交互】分组中设置宽度为1。最后别忘了点击【发布】按钮完成页面的发布 Step4 通过流程设计器,设计对应业务流程 进入流程设计器,点击【创建】按钮 注意:流程中需要获取【关系字段】的除关联字段(一般为ID)以外的字段需要通过【数据获取】节点单独获取【关系字段】的对象数据。所以在流程设计中经常会用到【数据获取】节点 左上角编辑流程名称为【任务审批流程】,点击第一个【触发】节点,触发方式选择模型触发,模型选择【Gemini任务】,触发场景选择【新增或更新数据时】,【筛选条件】设置为【任务审批状态】为空或【任务审批状态】等于【审核中】,点击该节点的【保存】按钮 点击流程图节点间的【+】图标选择增加【获取数据】节点,或者拖动左侧物料区【获取数据】到特定的【+】图标 点击【获取数据】,在右侧属性面板中设置【获取数据条数】为多条,选择模型为【Gemini用户任务实例】,点击【筛选条件】的【{X}】图标,进行数据获取的条件设置 选择条件字段为【任务ID】条件操作符为【等于】,条件为变量的导购字段的ID。当上下文只有一个变量时默认不需要选择,这里默认的是【模型触发:[Gemini任务]】,设置好以后点击确认,回到属性面板设置【未获取到数据时执行方式】为【终止流程】,并点击节点【保持】按钮 增加【更新数据】节点,在右侧属性面板中 【更新模型】选择【模型触发:[Gemini任务]】 【字段列表】点击【创建】按钮 字段选择 更新【任务状态】字段 表达式设置为:【已关闭】。 【字段列表】点击【创建】按钮 字段选择 更新【任务审核状态】字段 表达式设置为:【审核中】。 最终完成的【模型触发:[Gemini任务]】更新设置 a. 【模型触发:[Gemini任务]】的【任务状态】字段等于数字字典的【已关闭】,任务审核状态为【审核中】 b. 最后点击节点【保持】按钮。 再增加【更新数据】节点,在右侧属性面板中 【更新模型】选择【获取数据[Gemini用户任务实例]】 【字段列表】点击【创建】按钮 字段选择 更新【任务状态】字段 表达式设置为:【已关闭】。 最终完成的【获取数据[Gemini用户任务实例]】更新设置 a. 【获取数据[Gemini用户任务实例]】的【任务状态】字段等于数字字典的【已关闭】 b. 最后点击节点【保持】按钮。 增加【审批】节点,在右侧属性面板中 【审批模型】选择模型为【模型触发:[Gemini任务]】 【选择视图】选择前面新建的页面【流程中的任务编辑页】 【审批人】选择角色为【超级管理员】 【数据】权限全部设置为【查看】 其他配置项默认,需要了解更多请查看产品使用手册 最后点击节点【保持】按钮。 新增【审核分支】,在【通过】分支中增加两个数据更新节点,跟审核前的两个数据更新节点对应 【模型触发:[Gemini任务]】的【任务状态】字段等于数字字典的【初始化】,任务审核状态为【审核通过】 【获取数据[Gemini用户任务实例]】的【任务状态】字段等于数字字典的【初始化】 流程确保保持并发布过,点击右上角【发布流程】完成流程的保存与发布 Step5 检验效果 创建任务后,任务状态为【关闭】状态,任务列表中的任务状态为多个状态的计算值 审核通过后,任务状态为【进行中】状态,任务列表中的任务状态为多个状态的计算值

    2024年5月23日
    99900
  • 梅丛银

    认识陈鹏程及数式核心团队同学已经有一段时间了,在我们多次的交流讨论中时常会谈及:未来中国哪家软件企业能在互联网云原生时代走出来超越传统软件企业?史昂说这是他的梦想,也是他们团队这么多年坚持技术和产品研发与应用优先思考之路。史昂及数式核心团队面向企业应用市场历经三年的潜心研发和实战交付,推出Oinone产品及配套的低代码平台工具:对比国内外应用软件平台在开放生态和云原生均有它的继承性和独特性,特别是将技术平台赋予企业各种业务领域属性,便于企业客户和开发伙伴的二次开发并能快速搭建各类企业核心应用场景是Oinone的最大亮点。Oinone的内在特点之一是参考了全球最大开源ERP Odoo的元数据模型设计,同时基于业务中台架构和云原生技术,形成了自己一套国际化的快速开发平台、建模规范和应用产品,通过自己进场落地很多品牌企业的应用中台化不断迭代升级,走出了一条具有显著特色的新应用软件之路。史昂及团队特点谦卑、善于思考,善于吸收他山之精华,这是创业团队难能可贵之点,由此能善于与生态伙伴合作也是能够走的更远更长的基础基因。最后希望和祝愿Oinone能为中国企业在云时代数字化实践做出更多的贡献,为软件产业构建强大的应用生态和开发社区,真正树立起Oinone自己的软件品牌形象。 资深IT咨询专家&浩鲸云智能专家学院院长:梅丛银

    Oinone 7天入门到精通 2024年5月23日
    1.1K00
  • 4.5.1 研发辅助之插件-结构性代码

    研发辅助意在 消灭研发过程中的重复性工作提升研发效率,如结构性代码 提供生产示例性代码,如果根据模型生成导入导出、view自定义配置等经常性开发 一、插件安装 根据自身Idea版本下载插件并安装: 版本 插件 2023.1 pamirs-source-maker-1.0.0-2023.1.zip(2.4 MB) 2021.1 pamirs-source-maker-1.0.0-2021.1.zip(2.4 MB) 2021.2 pamirs-source-maker-1.0.0-2021.2.zip(2.4 MB) 2021.3 pamirs-source-maker-1.0.0-2021.3.zip(2.4 MB) 2022.1 pamirs-source-maker-1.0.0-2022.1.zip(2.4 MB) pamirs-source-maker-1.0.0-223-EAP-SNAPSHOT(2.4 MB) 表4-5-1-1 插件列表 二、研发辅助之配置式结构性代码生成器 我们在开发过程中为了日后代码易于维护和修改,往往会做工程性的职责划分。 除去模型外会有 代理模型和代理模型Action来负责前端交互 以面向接口的形式来定义函数,就会有api和实现类之分 如果项目有多端,那么如代理模型和代理模型Action又要为每一个端构建一份 在大型项目的初始阶段,我们需要手工重复做很多事情,特别麻烦。现在用oinone的研发辅助插件的结构性代码生成器,就可以避免前面的重复工作 插件执行的配置文件 <?xml version="1.0" encoding="utf-8" ?> <oinone> <makers> <!– 根据模型生成代理类、代理类的Action、Service、ServiceImpl –> <maker> <!– 选择模型所在位置 –> <modelPath>/Users/oinone/Documents/oinone/demo/pamirs-second/pamirs-second-api/src/main/java/pro/shushi/pamirs/second/api/model</modelPath> <!– 代理模型、代理模型Action生成相关配置信息 –> <proxyModules> <module> <!– 代理模型和代理模型Action的生成位置信息 –> <generatePath>/Users/oinone/Documents/oinone/demo/pamirs-second/pamirs-second-api/src/main/java/pro/shushi/pamirs/second/api</generatePath> <!– 代理模型和代理模型Action的模块前缀 –> <modulePrefix>second</modulePrefix> <!– 代理模型和代理模型Action的模块名,代理模型和代理模型Action类名为moduleName+模型名+"Proxy"+"Action" –> <moduleName>second</moduleName> <!– 代理模型和代理模型Action的包名,实际包名为 packageName+".proxy"或packageName+".action"–> <packageName>pro.shushi.pamirs.second.api</packageName> </module> </proxyModules> <!– 根据模型生成api,包括service(写方法)和queryService(读方法) –> <apiModule> <!– service和queryService的生成位置信息 –> <generatePath>/Users/oinone/Documents/oinone/demo/pamirs-second/pamirs-second-api/src/main/java/pro/shushi/pamirs/second/api</generatePath> <!– service和queryService的模块前缀 –> <modulePrefix>second</modulePrefix> <!– service和queryService的模块名 –> <moduleName>second</moduleName> <!– service和queryService的包名,实际包名为 packageName+".service" –> <packageName>pro.shushi.pamirs.second.api</packageName> </apiModule> <!– 根据模型生成api实现类,包括serviceImpl(写方法)和queryServiceImpl(读方法) –> <coreModule> <!– serviceImpl和queryServiceImpl的生成位置信息 –> <generatePath>/Users/oinone/Documents/oinone/demo/pamirs-second/pamirs-second-core/src/main/java/pro/shushi/pamirs/second/core</generatePath> <!– serviceImpl和queryServiceImpl的模块前缀 –> <modulePrefix>second</modulePrefix> <!– serviceImpl和queryServiceImpl的模块名 –> <moduleName>second</moduleName> <!– serviceImpl和queryServiceImpl的包名,实际包名为 packageName+".service" –> <packageName>pro.shushi.pamirs.second.core</packageName> </coreModule> </maker> </makers> </oinone> 图4-5-1-1 插件执行的配置文件 三、研发辅助之多模型结构性代码生成器 是配置式结构性代码生成器的补充,应对开发后期维护中新增模型的场景。它的不同点在于只要选择模型文件就可以,不需要专门编写xml文件。生成的文件默认就在模型所在路径下 Step1 菜单栏上找到oinone,并点击子菜单【多模型结构性代码生成器】 图4-5-1-2 多模型结构性代码生成操作步骤一 Step2 设置必要的信息 模型前缀 模型的所属模块 代理模型的模块 这三个信息分别用于构建 代理模型的MODEL_MODEL = 模型前缀.代理模型的模块.代理模型类名 服务的FUN_NAMESPACE = 模型前缀.代理模型的模块.服务类名 图4-5-1-3 多模型结构性代码生成操作步骤二 Step3 选择为哪些模型生成对应的结构性代码 图4-5-1-4 多模型结构性代码生成操作步骤三 Step4 代码在模型所在目录 生成的文件默认就在模型所在路径下,您可以手动拖动到对应的包路径当中去 图4-5-1-5 多模型结构性代码生成操作步骤四

    2024年5月23日
    97200

Leave a Reply

登录后才能评论