4.1.21 框架之分布式消息

消息中间件是在分布式开发中常见的一种技术手段,用于模块间的解耦、异步处理、数据最终一致等场景。

一、介绍

oinone对开源的RocketMQ进行了封装,是平台提供的一种较为简单的使用方式,并非是对RocketMQ进行的功能扩展。同时也伴随着两个非常至关重要的目的:

  1. 适配不同企业对RocketMQ的不同版本选择,不至于改上层业务代码。目前已经适配RocketMQ的开源版本和阿里云版本。 下个版本会对API进行升级支持不同类型MQ,以适配不同企业对MQ的不同要求,应对一些企业客户已经对MQ进行技术选择

  2. 对协议头进行扩展:如多租户的封装,saas模式中为了共用MQ基础资源,需要在消息头中加入必要租户信息。

二、使用准备

demo工程默认已经依赖消息,这里只是做介绍无需大家额外操作,大家可以用maven依赖树命令查看引用关系。

依赖包

增加对pamirs-connectors-event的依赖


<dependency>
    <groupId>pro.shushi.pamirs.framework</groupId>
    <artifactId>pamirs-connectors-event</artifactId>
</dependency>

图4-1-21-1 分布式消息的依赖包

相关功能引入

增加模型、触发器都依赖MQ

<!-- 增强模型 -->
<!-- 增强模型 -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-channel</artifactId>
</dependency>
<!-- 触发器 api -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-api</artifactId>
</dependency>
<!-- 触发器 core -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-core</artifactId>
</dependency>

图4-1-21-2 增加模型、触发器都依赖MQ

yml配置文件参考

详见4.1.1【模块之yml文件结构详解】的“pamirs.event”部分。

三、使用说明

发送消息(NotifyProducer)

概述

NotifyProducer是Pamirs Event中所有生产者的基本API,它仅仅定义了消息发送的基本行为,例如生产者自身的属性,启动和停止,当前状态,以及消息发送方法。它本身并不决定消息如何发送,而是根据具体的实现确定其功能。

目前仅实现了RocketMQProducer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

Notify注解方式

使用示例

@Component
public class DemoProducer {

    @Notify(topic = "test", tags = "model")
    public DemoModel sendModel() {
        return new DemoModel();
    }

    @Notify(topic = "test", tags = "dto")
    public DemoDTO sendDTO() {
        return new DemoDTO();
    }
}

图4-1-21-3 Notify注解方式使用示例

解释说明

  1. 使用Component注解方式注册Spring Bean。

  2. Notify注解指定topic和tags。

  3. topic和tags对应NotifyEvent中的topic和tags。

RocketMQProducer方法调用

使用示例

@Component
public class SendRocketMQMessage {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel()));
        rocketMQProducer.send(new NotifyEvent("test", "dto", new DemoDTO()));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage() {
        DemoModel data = new DemoModel();
        data.setAge(10);
        rocketMQProducer.send(new NotifyEvent("test", "model", data)
                .setQueueSelector((queueSize, event) -> {
                    DemoModel body = (DemoModel) event.getBody();
                    return body.getAge() % queueSize;
                }));
    }

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel())
                .setIsTransaction(true)
                .setGroup("demoTransactionListener"));
    }
}

图4-1-21-4 RocketMQProducer方法调用

解释说明

  1. 使用Component注解方式注册Spring Bean。
  2. 使用Autowired注解方式装配RocketMQProducer实例。
  3. 使用send方法发送指定消息。
  4. 在【发送普通消息】方法中,该实现效果与Notify注解方式所示完全一致。
  5. 在【发送有序消息】方法中,队列选择器是必须配置的,queueSize属性为MQ队列的总数量,在broker中配置。有序消息必须配合有序消费者才能达到有序消费的目的,否则还是无序的常规消息,消费者需要配置@NotifyListener(consumerType=ConsumerType.ORDERLY )注解。
  6. 在【发送事务消息】方法中指定的group为ProducerGroup,事务消息是通过不同的Producer发出的,事务消息监听请参考TransactionListener注解的相关使用方法。(示例中的group与下文介绍中的一致)

使用TransactionListener开启事务消息监听

使用示例

@Component
@TransactionListener
public class DemoTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        return NotifyTransactionState.COMMIT;
    }

}

图4-1-21-5 使用TransactionListener开启事务消息监听

解释说明

  1. 实现NotifyTransactionListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 添加TransactionListener注解注册事务监听,生成对应的生产者。

  4. 当前ProducerGroup将使用这个类的BeanName,即"demoTransactionListener"。如果你想自定义ProducerGroup,可以使用TransactionListener的value属性进行设置。

消费消息(NotifyConsumer)

概述

NotifyConsumer是Pamirs Evnet中所有消费者的基本API,与NotifyProducer类似,它仅仅定义了消息消费的基本行为,例如消费者自身的属性,启动和停止,当前状态,以及消息的监听和订阅方法。它本身并不决定消息如何消费以及如何被订阅,而是根据具体的实现确定其功能。

目前仅实现了RocketMQEventPushConsumer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

在类上使用NotifyListener注解

使用示例

@Component
@NotifyListener(topic = "test", tags = "model")
public class DemoConsumerClass implements NotifyEventListener {

    @Override
    public void consumer(NotifyEvent event) {
        DemoModel data = (DemoModel) event.getBody();
        // do some things.
    }
}

图4-1-21-6 在类上使用NotifyListener注解

解释说明

  1. 实现NotifyEventListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 当前ConsumerGroup将使用这个类的BeanName,即"demoConsumerClass"。如果你想自定义ConsumerGroup,可以使用Component的value属性进行设置。

  4. 从body中将可以获取生产者发送的数据对象,并且已经做好了类型处理,可以直接使用。

  5. 使用原生的RocketMQ发送的消息,类型可能是无法识别的,你可以使用NotifyListener中提供的bodyClass来指定类型。

  6. topic和tags对应NotifyEvent中的topic和tags。

在方法上使用NotifyListener注解

使用示例

@Component
public class DemoConsumerMethod {

    @Bean
    @NotifyListener(topic = "test", tags = "model")
    public NotifyEventListener modelConsumer() {
        return event -> {
            DemoModel data = (DemoModel) event.getBody();
            // do some things.
        };
    }

    @Bean
    @NotifyListener(topic = "test", tags = "dto")
    public NotifyEventListener dtoConsumer() {
        return event -> {
            DemoDTO data = (DemoDTO) event.getBody();
            // do some things.
        };
    }
}

图4-1-21-7 在方法上使用NotifyListener注解

解释说明

  1. 使用Bean注解方式注册Spring Bean。

  2. 方法返回值为NotifyEventListener类型。

  3. 当前ConsumerGroup将使用对应方法生成的BeanName,即"modelConsumer"和"dtoConsumer"。如果你想自定义ConsumerGroup,可以使用Bean的value属性进行设置。

实战

约定:每个模块下的Topic和Tags必须定义常量池进行统一管理,主要是为了方便维护与管理,技术没有限制

常规消息(举例)

Step1 新建PetNotifyEnum

用PetNotifyEnum来管理模块的所有Topic和Tags的常量定义

package pro.shushi.pamirs.demo.api.mq;

public class PetNotifyEnum {

    public static class PetItemInventroy{

        public interface Topic{
            String PET_ITEM_INVENTROY_CHANGE ="petItemInventroyChange";
        }
        public interface Tag{
            String CREATE ="create";
            String UPDATE ="update";
            String DELETE ="delete";
        }

    }

}

图4-1-21-8 新建PetNotifyEnum

Step2 新建PetItemInventoryMqProducer

新建PetItemInventroy模型对应消息生产者,用于发送PetItemInventroy模型变动的相关消息

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

}

图4-1-21-9 新建PetItemInventoryMqProducer

Step3 新建PetItemInventoryMqConsumer

新建PetItemInventoryMqConsumer,订阅PetItemInventroy模型变动的相关消息,并进行相关处理

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-10 PetItemInventoryMqConsumer

Step4 修改PetItemInventroyAction

在修改PetItemInventroy完成之后,发送topic为【PetNotifyEnum.PetItemInventroyMq.Topic.PET_ITEM_INVENTROY_CHANGE】,tag为【PetNotifyEnum.PetItemInventroyMq.Tag.UPDATE】的消息出去

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-11 修改PetItemInventroyAction

Step5 重新看效果

  1. 编辑商品库存记录

image.png

图4-1-21-12 编辑商品库存记录

  1. 查看后端日志是否打印

image.png

图4-1-21-13 查看后端日志是否打印

顺序消息(举例)

Step1 修改PetItemInventoryMqProducer

增加一个顺序消息发送的方法,发送时根据商品库存的Id就分队列,相同队列顺序消费

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage(PetItemInventroy data,String tag) {

        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
                .setQueueSelector((queueSize, event) -> {
                    PetItemInventroy body = (PetItemInventroy) event.getBody();
                    Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                    return queue.intValue();
                }));

    }
}

图4-1-21-14 修改PetItemInventoryMqProducer

Step2 修改PetItemInventoryMqConsumer

增加@NotifyListener(consumerType= ConsumerType.ORDERLY)的注解

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.ConsumerType;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
//@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*",consumerType= ConsumerType.ORDERLY)
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-15 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的顺序消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
//        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-16 修改PetItemInventroyAction

Step4 重启看效果

同常规消息

事务消息(举例)

注:这种写法默认第一次是UNKNOWN,然后通过MQ回调二次确认,在消息时效性要求高的场景下是不符合要求的。对实效性要求高的,请见下面事务消息-优化(举例)章节。

Step1 新建NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息回滚");
        return NotifyTransactionState.ROLLBACK;
    }
}

图4-1-21-17 新建NotifyTransactionListener

Step2 修改PetItemInventoryMqProducer

增加发送事务性消息的方法,通过.setIsTransaction(true)来显示设置该消息是事务消息,通过setGroup("petItemInventoryMqTransactionListener")来匹配事务监听处理器

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data,String tag) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
    );
}

图4-1-21-18 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的事务性消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-19 修改PetItemInventroyAction

Step4 重启看效果

  1. 编辑商品库存记录

image.png

图4-1-21-20 编辑商品库存记录

  1. 查看后端日志是否打印

消息回滚,没有消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-21 MQ日志打印一

image.png

图4-1-21-22 MQ日志打印二

Step5 修改NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息提交");
        //正常业务情况下,这里要增加自己的逻辑判断,来确定返回状态值
        return NotifyTransactionState.COMMIT;
    }
}

图4-1-21-23 修改NotifyTransactionListener

Step6 重启看效果

消息提交,并看到消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-24 MQ日志打印三

image.png

图4-1-21-25 MQ日志打印三

事务消息-优化(举例)

Step1 修改PetItemInventoryMqProducer

修改PetItemInventoryMqProducer事务性消息发送方法sendTransactionMessage,增加NotifyExecuteLocalTransactionCallback入参。

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data, String tag, NotifyExecuteLocalTransactionCallback callback) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
            .setExecuteLocalTransactionCallback(callback)
    );
}

图4-1-21-26 修改PetItemInventoryMqProducer

Step2 修改PetItemInventroyAction

修改PetItemInventroyAction的update方法

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){
            @Override
            public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

                List<PetItemInventroy> inventroys = new ArrayList<>();
                inventroys.add(data);
                PamirsSession.directive().disableOptimisticLocker();
                try{
                    int i = data.updateBatch(inventroys);
                } finally {
                    PamirsSession.directive().enableOptimisticLocker();
                }
                //业务代码执行完毕,提交消息
                return NotifyTransactionState.COMMIT;
            }
        });
        return data;
    }
}

图4-1-21-27 修改PetItemInventroyAction

Step3 重启看效果

编辑商品库存记录,发现消息的处理几乎没有延迟

image.png

图4-1-21-28 示例效果几乎没有延迟

Step4 模拟业务成功,消息发送失败

修改PetItemInventroyAction的update方法

@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
    //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){

        @Override
        public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

            List<PetItemInventroy> inventroys = new ArrayList<>();
            inventroys.add(data);
            PamirsSession.directive().disableOptimisticLocker();
            try{
                int i = data.updateBatch(inventroys);
            } finally {
                PamirsSession.directive().enableOptimisticLocker();
            }
            //模拟业务成功,消息发送失败
            throw new RuntimeException();
            //业务代码执行完毕,提交消息
            //return NotifyTransactionState.COMMIT;
        }
    });
    return data;
}

图4-1-21-29 修改PetItemInventroyAction的update方法

Step5 重启看效果

这个效果跟第一种事务消息一样,消息提交延迟严重。

image.png

图4-1-21-30 MQ日志打印

image.png

图4-1-21-31 MQ日志打印

Step6 总结

优化后的事务消息,在正常情况下时效性非常高,异常情况下也能通过NotifyTransactionListener做保障

Oinone社区 作者:史, 昂原创文章,如若转载,请注明出处:https://doc.oinone.top/oio4/9296.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
史, 昂的头像史, 昂数式管理员
上一篇 2024年5月23日
下一篇 2024年5月23日

相关推荐

  • 7.2 实战训练(积分发放)

    前言 当我们碰到一个全新的场景,除了写代码以外也可以通过设计器来完成,大致步骤如下: 分析业务场景,规划对应的模型并通过模型设计器进行配置 通过界面设计器,设计出必要管理页面 通过流程设计器,设计对应业务流程 通过数据可视化,设计相应的数据看板 场景说明 本节通过例子完成一个积分成本分摊的业务场景 积分支出:谁受益谁支出原则+导购手动确认原则,通过门店应用的积分规则,来实现自动化+手动积分形式实现 案例背景 某家具企业经营多种家具类型,不同系列不同品类分事业部经营。 注: 独立门店:只能售卖本事业部下的商品; 融合门店:可以售卖多事业部下的商品; 需求:需要建立一套积分规则,遵循谁受益谁支出原则+导购手动确认原则;通过门店应用的积分规则,来实现自动化+手动积分。 场景一 导购a邀请老客户c通过裂变分享新客户x在独立门店1下单。系统根据独立门店1的积分规则,自动发放积分给老客户c和新客户x,积分由国一事业部承担。 场景二 融合门店1导购b邀请老客户d裂变分享新客户y在融合门店下单。该订单可能涉及多个事业部,由导购b手动选择最大量值的积分规则进行发放 场景三 独立门店1的导购a邀请老客户c裂变分享新客户z在独立门店2下单。系统根据独立门店2的积分规则,来计算积分值,自动发放老客户c和新客户z的积分,积分由国二事业部 实战训练 Step1 分析业务场景规划对应模型 本场景下涉及基础对象模型包括:事业部、门店、导购、会员等 事业部:它是积分成本的载体 门店:类型分为独立门店和融合门店,独立门店必须隶属于一个事业部,同时配置默认积分发放规则,融合门店可能属于多个事业部当发生积分发放时,需要店员手工选择成本事业部和积分发放规则 导购员:导购员必须隶属于一个门店等 会员:消费者需要记录它隶属导购员,以及是由哪个会员推荐过来的等 涉及业务对象模型包括:积分发放规则,积分发放记录,邀请下单记录 积分发放规则:会员积分发放规则,对应邀请老客的积分发放规则等 积分发放记录:本次发放积分、本次发放积分规则、发放对象(会员)、成本事业部、关联门店、关联导购、关联老客(可空)、关联下单记录编码等 邀请下单记录:导购、下单会员、下单门店、商品信息、下单金额等 对应模型如下: 模型 设计器呈现 自定义字段列表 关系字段说明 说明 事业部 事业部负责人(文本) 门店列表(o2m) 与门店建立o2m绑定关系,绑定时选择双向绑定,双向绑定意思是在事业部这边建立o2m到门店的关系字段,在门店那边建立m2o的关系字段 关系字段需要在有对方模型的情况下再建,比如事业部中的门店列表则是再后面追加新增的 门店 导购列表(o2m) 事业部(m2o) 默认积分发规则(m2o) 门店类型(枚举) 分别与事业部、导购、积分发放规则建立m2o、o2m、m2o关系 门店类型:需要先建对应的数据字典 导购 绑定用户(m2o) 门店(m2o) 是否离职(布尔型) 与门店建立m2o关系 1. 绑定用户,用于后续业务流程设计中的填写规则 2. 打马赛克的忽略,其他场景测试用 会员 会员累计积分(浮点数) 推荐客户(m2o) 所属于导购员(m2o) 是否为新客(布尔型) 与导购、会员建立m2o关系 1. 会员m2o的字段是自关联用于存储推荐会员 2. 打马赛克的忽略,其他场景测试用 积分发放规则 推荐客户发放比例(浮点数) 发放倍数(整数) 积分发放记录 最终发放积分(浮点数) 关联积分规则(m2o) 事件编码(文本) 推荐导购员(m2o) 推荐会员(m2o) 关系门店(m2o) 成本事业部(m2o) 会员(m2o) 成本事业部名称(文本) 会员名称(文本) 与积分发放规则、导购员、会员、门店、事业部建立m2o关系 1. 会员有两个m2o,分别用户记录发放会员和发放会员的推荐会员也就是老客 2. 事件编码用户维护触发本次积分发放记录产生的源头单据编码如:邀请下单记录的编码 邀请下单记录 成本事业部(m2o) 选择积分发放规则(m2o) 下单门店(m2o) 购买商品(文本) 下单金额(整数) 会员(m2o) 导购(m2o) 与成本事业部、积分发放规则、下单门店、会员、导购等建立m2o的关系 1. 会员、下单门店、导购属于必要信息 2. 成本事业部、积分发放规则是业务流程中自动计算回填的数据 Step3 利用界面设计器,设计出必要的管理页面 以事业部为例,构建管理页面。其他模型依次按例子建立管理页面 进入界面设计器,应用选择全员营销,模型选择事业部,点击添加页面下的直接创建 设置页面标题、模型(自动带上可切换)、业务类型(运营管理后续会扩展其他类型)、视图类型(表单)后点击确认按钮进入事业部表单设计页面 进入页面设计器,对事业部表单页面进行设计(更多细节介绍,请参考界面设计产品使用手册) a. 左侧为物料区:分为组件、模型。 ⅰ. 【组件】选项卡下为通用物料区,我们可以为页面增加对应布局、字段(如同在模型设计器增加字段)、动作、数据、多媒体等等 ⅱ. 【模型】选项卡下为页面对应模型的自定义字段、系统字段、以及模型已有动作 b. 中间是设计区域 c. 右侧为属性面板,在设计区域选择中组件会显示对应组件的可配置参数 在左侧【组件】选项卡下,拖入布局组件【分组】,并设置组件【标题属性】为基础信息 在左侧【组件】选项卡下,再次拖入布局组件【分组】,并设置组件【标题属性】为门店列表 在左侧【模型】选项卡下,分别把自定义字段中的【事业部负责人】和系统字段中的【名称】拖入【基础信息】分组,把自定义字段中的【门店列表】字段拖入门店列表分组 在设计区域切换【门店列表】展示组件为【表格】 此时【门店列表】展示形式变成了表格形式,选中【门店列表】组件,会发现左侧【模型】选项卡下的当前模型切换成了【门店】,同时我们在右属性面板区置空其【标题属性】 设计区选中【门店列表】的表格组件,分别把自定义字段中的【默认积分发规则】、【门店类型】、【导购列表】和系统字段中的【名称】拖入【门店列表】表格组件的表格字段设计区 设计区选中【门店列表】的表格组件的【创建】按钮,点击【打开弹窗】设计关系字段【门店】的新增页面 11.分别把自定义字段中的【默认积分发规则】、【门店类型】和系统字段中的【名称】拖入门店的新增页面设计区 选中弹出框中【取消】取消按钮,设置其【按钮样式】属性为【次要按钮】 关闭弹出框,回到主设计区 设计区选中【门店列表】的表格组件的【删除】按钮,设置其【按钮样式】属性为【次要按钮】,【二次确认】属性打开 设计区选中【门店列表】的表格组件中操作列的【编辑】按钮,点击【打开弹窗】设计关系字段【门店】的编辑页面, a. 分别把自定义字段中的【默认积分发规则】、【门店类型】和系统字段中的【名称】拖入门店的新增页面设计区。 b. 选中弹出框中【取消】取消按钮,设置其【按钮样式】属性为【次要按钮】 c. 把门店类型展示组件切换为【单选框】 d. 关闭弹出框 设计区选择非【门店列表】组件如基础信息,模型切换为主模型【事业部】,在左侧【模型】选项卡下,把动作分类下的提交类型【创建】动作拖入中间设计区的动作区 选择中展示名为【确定】的创建动作按钮,在右侧属性面板中设置:是否隐藏属性为【条件隐藏】,隐藏条件为【id非空】 在左侧【模型】选项卡下,把动作分类下的提交类型【更新】动作拖入中间设计区的动作区 选择中展示名为【确定】的更新动作按钮,在右侧属性面板中设置:是否隐藏属性为【条件隐藏】,隐藏条件为【id为空】。之所以同时拖入【创建】和【更新】动作并都命名为确认,是期望这个页面同时可以做为新增页面也可以做为编辑页面,只不过要通过条件隐藏来设置按钮的出现规则 在左侧【组件】选项卡下,把动作分类下的【客户端动作】拖入中间设计区的动作区 选择设计区【客户端动作】,在右侧属性面板中设置:动作名称为【返回】,客户端行为为【返回上一个页面】并点击保存 选择设计区【返回】动作,在右侧属性面板中设置:按钮样式为【返回】,【二次确认】属性打开并设置提示文字为【返回页面操作将不被保存】,可以点击预览二次确认看效果 点击【发布】按钮,页面成功发布,每发布一次会有一个例子版本,可以通过历史版本进行恢复 点击右上角历史版本图标,进入历史版本查看页面 在历史版本页面可以选择对应历史版本记录,并通过【恢复此版本】来完成页面的历史版本切换 接下来我们为事业部模型创建表格管理页面,入口同编辑页面。设置页面标题、模型(自动带上可切换)、业务类型(运营管理后续会扩展其他类型)、视图类型(表格)后点击确认按钮进入事业部表格设计页面 进入页面设计器,对事业部表格页面进行设计(更多细节介绍,请参考界面设计产品使用手册) 在左侧【模型】选项卡下,分别把自定义字段中的【事业部负责人】和系统字段中的【名称】拖入表格组件的表格字段设计区 在左侧【组件】选项卡下,把动作分类下的【跳转动作】拖入中间设计区的动作区,并在右侧属性面板中设置动作名称为【新增】,数据控制类型为【不进行数据处理】,打开方式为【当前窗口打开】,动作跳转页面为【事业部编辑】页面,并点击保存 在左侧【组件】选项卡下,把动作分类下的【跳转动作】拖入中间设计区的行内动作区,并在右侧属性面板中设置动作名称为【编辑】,数据控制类型为【处理单条数据】,打开方式为【当前窗口打开】,动作跳转页面为【事业部编辑】页面,并点击保存 在左侧【模型】选项卡下,把动作分类下的【删除】拖入中间设计区的动作区,并在右侧属性面板中设置动作名称为【删除】,按钮样式为【次要按钮】,【二次确认】属性打开 点击右上角【显示母版】进入页面最终展示形式,点击添加菜单项,并在输入框中输入【事业部管理】 点击菜单右侧设置图标,选择【绑定已有页面】,进行菜单与页面的绑定操作 在绑定页面中,模型选择【事业部】,视图选择【事业部管理】,点击确认按钮提交 拖动【事业部管理】菜单到【积分管理】父菜单下 最后别忘了点击右上角【发布】按钮对【事业部管理】表格页面进行发布,回到界面设计器首页查看刚刚建好的两个页面 以事业部为例分别对门店、导购、会员、积分发放规则、积分发放记录、邀请下单记录等模型进行页面设计,这里不再累赘,请按照自身学习需要,尝试进行界面设计 Step4 通过流程设计器,设计对应业务流程 我们先来整理下核心流程即:邀请下单流程 Step4.1 创建导购邀请下单记录触发流程 进入流程设计器,点击【创建】按钮 注意:流程中需要获取关系类型的字段(如:O2O、O2M、M2O或M2M),这种类型都是复杂的对象字段,除关联字段(一般为ID)以外的字段,需要通过【数据获取】节点单独获取【关系字段】的对象数据。所以在流程设计中经常会用到【数据获取】节点 左上角编辑流程名称为【导购邀请下单触发流程】,点击第一个【触发】节点,触发方式选择模型触发,模型选择【导购邀请下单】,触发场景选择【新增数据时】,点击该节点的【保存】按钮 点击流程图节点间的【+】图标选择增加【获取数据】节点,或者拖动左侧物料区【获取数据】到特定的【+】图标 点击【获取数据】,在右侧属性面板中设置【获取数据条数】为单条,选择模型为【导购】,点击【筛选条件】的【{X}】图标,进行数据获取的条件设置 选择条件字段为【ID】条件操作符为【等于】,条件为变量的导购字段的ID。当上下文只有一个变量时默认不需要选择,这里默认的是【触发[导购邀请下单记录]】,设置好以后点击确认,回到属性面板设置【未获取到数据时执行方式】为【终止流程】,并点击节点【保持】按钮 再增加一个【获取数据】节点,在右侧属性面板中设置 a. 【获取数据条数】为单条,选择模型为【会员】 b. 点击【筛选条件】的【{X}】图标,进行数据获取的条件设置,选择条件字段为【ID】条件操作符为【等于】,条件为变量【导购邀请下单记录】的会员关系字段的ID字段。因为上下文中存在多个变量时需要选择对应变量,跟获取导购数据有点区别,在获取导购数据时,上下文中只有此次【导购邀请下单记录】所以不需要选择对应变量,设置好以后点击【确认】按钮 c. 回到属性面板设置【未获取到数据时执行方式】为【终止流程】 d. 最后点击节点【保持】按钮。…

    2024年5月23日
    1.7K00
  • 2.4.2 Oinone独特性之每一个需求都可以是一个模块

    我们的Oinone平台采用模型驱动的方式,并符合面向对象设计原则,每个需求都可以是一个独立模块,可以独立安装、升级和卸载。这让系统真正像乐高积木一样搭建,具有高度的灵活性和可维护性。 与大部分低代码或无代码平台不同的是,它们的应用市场上的应用往往是模板式的,也就是说,这是一个拷贝,个性化只能在应用上直接修改,而且一旦修改就不能升级。这对于软件公司和客户来说都非常痛苦。客户无法享受到软件公司产品的升级功能,而软件公司在服务大量客户时,也会面临不同版本的维护问题,成本也非常高。而我们的Oinone平台完全避免了这些问题,让客户和软件公司都可以从中受益(如下图2-9、2-10所示)。 图2-9软件公司与客户项目的关系-让标准与个性化共存 图2-10 软件公司与客户项目的关系-让升级无忧 实现原理 在满足客户个性化定制需求时,传统的方法通常是直接修改标准产品源码,但这样做会带来一个问题:标准产品无法持续升级。相反,无论是在OP模式还是SaaS模式下,Oinone都采用全新的模块为客户进行个性化开发,保持标准产品和个性化模块的独立维护和升级。这是因为在元数据设计时,Oinone采用了面向对象的设计原则,实现了元数据设计与面向对象设计思想的完美融合。 面向对象设计的核心特征包括封装、继承、多态,而Oinone的元数据设计完全融入了这些思想。下面是几个例子,说明Oinone的元数据设计如何体现面向对象设计的核心特征,并带来了什么好处: 继承:在继承原有模型的字段、逻辑、展示的情况下,增加一段代码来扩展模型的字段、逻辑、展示。 多态:在继承原有模型的字段、逻辑、展示的情况下,增加一段代码来覆盖模型的原有字段、逻辑、展示。 封装:外部无需关心模型内部如何实现,只需按照不同场景调用模型对应开放级别的字段、逻辑、展示。 这些特征和优势使得Oinone在满足客户个性化需求时更加灵活和可持续,同时使得标准产品的维护和升级变得更加容易和高效。 在Java语言设计中,万物皆对象,一切都以对象为基础。而Oinone的元数据设计则是以模型为出发点,作为数据和行为的承载体。如下图2-11清晰地描述了Java面向对象编程中封装、继承、多态在Oinone元数据中的对应关系。Oinone元数据描述了B对象继承A对象并拥有其所有属性和方法,并覆盖了A对象的属性1和方法1,同时新增了属性3和方法3。 此外,Oinone的面向对象特性是用元数据来描述的。一方面,我们基于Java编码规范收集相关元数据,以保持不改变Java编程习惯。另一方面,方法和对象的挂载是松耦合的,只要按照元数据规范进行挂载,就能轻松地将其附加到模型上。在不改变原有A对象的情况下,我们可以直接增加方法和属性(如下图2-12所示)。 图2-11 java面向对象在Oinone元数据中对应 图2-12 java对象的修改 VS Oinone元数据模型的修改 Oinone函数不仅支持面向对象的继承和多态特性,还提供了面向切面的拦截器和SPI机制的扩展点,以应对方法逻辑的覆盖和扩展,以及系统层面的逻辑扩展(如下图2-13所示)。这些扩展功能可以独立地在模块中维护。 其中,拦截器可以在不侵入函数逻辑的情况下,根据优先级为满足条件的函数添加执行前和执行后的逻辑。 扩展点是一种类似于SPI机制的逻辑扩展机制,用于扩展函数的逻辑。通过这一机制,可以对函数逻辑进行灵活的扩展,以满足不同的业务需求。 图2-13 Oinone函数拦截与扩展机制 不管是对象、属性还是方法,都可以以独立的模块方式来扩展,这就使得每一个需求都可以成为一个独立的模块,方便我们在研发标准产品时进行模块化的划分,同时也让我们在以低代码模式为客户进行二次开发时,能够更好地支持“标准产品迭代与个性化保持独立”的需求。在2.4.3【oinone独特性之低无一体】一文中,我们也提到了这个特性,但那是在低无一体的情况下,通过元数据融合来实现的。让我们看看基于低代码开发模式下,典型的Oinone二次开发工程结构(如下图2-14所示),就可以更好地理解这个特性啦! 图2-14 Oinone典型的二开工程结构

    2024年5月23日
    1.3K00
  • 4.2.4 框架之网络请求-HttpClient

    oinone提供统一的网络请求底座,基于graphql二次封装 一、初始化 import { HttpClient } from '@kunlun/dependencies'; const http = HttpClient.getInstance(); http.setMiddleware() // 必须设置,请求回调。具体查看文章https://shushi.yuque.com/yqitvf/oinone/vwo80g http.setBaseURL() // 必须设置,后端请求的路径 图4-2-4-1 初始化代码示例 二、HttpClient详细介绍 获取实例 import { HttpClient } from '@kunlun/dependencies'; const http = HttpClient.getInstance(); 图4-2-4-2 获取实例 接口地址 import { HttpClient } from '@kunlun/dependencies'; const http = HttpClient.getInstance(); http.setBaseURL('接口地址'); http.getBaseURL(); // 获取接口地址 图4-2-4-3 接口地址 请求头 import { HttpClient } from '@kunlun/dependencies'; const http = HttpClient.getInstance(); http.setHeader({key: value}); 图4-2-4-4 请求头 variables import { HttpClient } from '@kunlun/dependencies'; const http = HttpClient.getInstance(); http.setExtendVariables((moduleName: string) => { return customFuntion(); }); 图4-2-4-5 variables 回调 import { HttpClient } from '@kunlun/dependencies'; const http = HttpClient.getInstance(); http.setMiddleware([middleware]); 图4-2-4-6 回调 业务使用-query private http = HttpClient.getInstance(); private getTestQuery = async () => { const query = `gql str`; const result = await this.http.query('module name', query); console.log(result) return result.data[`xx`]['xx']; // 返回的接口,打印出result对象层次返回 }; 图4-2-4-7 业务使用-query 业务使用-mutate private http = HttpClient.getInstance(); private getTestMutate = async () => { const mutation = `gql str`; const result = await this.http.mutate('module name', mutation); console.log(result) return result.data[`xx`]['xx']; // 返回的接口,打印出result对象层次返回 }; 图4-2-4-8 业务使用-mutate 三、如何使用HttpClient 初始化 在项目目录src/main.ts下初始化httpClient 初始化必须要做的事: 设置服务接口链接 设置接口请求回调 业务实战 前文说到自定义新增宠物表单,让我们在这个基础上加入我们的httpClient; 第一步新增service.ts 图4-2-4-8 新增service.ts service.ts import { HttpClient }…

    2024年5月23日
    1.4K00
  • 3.5.7.8 自定义菜单栏

    在业务中,可能会遇到需要对菜单栏的交互或UI做全新设计的需求,这个时候可以自定义菜单栏组件支持。 首先继承平台的CustomMenuWidget 组件,将自己vue文件设置到component处 import { NavMenu, SPI, ViewWidget } from '@kunlun/dependencies'; import Component from './CustomMenu.vue'; @SPI.ClassFactory( ViewWidget.Token({ // 这里的widget跟平台的组件一样,这样才能覆盖平台的组件 widget: 'nav-menu' }) ) export class CustomMenuWidget extends NavMenu { public initialize(props) { super.initialize(props); this.setComponent(Component); return this; } } vue文件中继承平台的props,编写自定义页面代码 export const NavMenuProps = { /** * 当前模块 */ module: { type: Object as PropType<IModule | null> }, /** * 树结构的菜单 */ menus: { type: Array as PropType<IResolvedMenu[]>, default: () => [] }, /** * 菜单类型,现在支持垂直、水平、和内嵌模式三种 */ mode: { type: String as PropType<'vertical' | 'horizontal' | 'inline'>, default: 'inline' }, /** * 菜单栏是否折叠收起 */ collapsed: { type: Boolean, default: false }, /** * 当前展开的 SubMenu 菜单项 key 数组 */ openKeys: { type: Array as PropType<string[]>, default: () => [] }, /** * 当前选中的菜单项 key 数组 */ selectKeys: { type: Array as PropType<string[]>, default: () => [] }, /** * 菜单搜索下拉选中菜单项 */ selectMenuBySearch: { type: Function as PropType<(menuName: String) => void> }, /** * 选中菜单项 */ selectMenu: { type: Function as PropType<(menuName: String) => Promise<void>> }, /** * SubMenu 展开/关闭的回调 */ openChange: { type: Function as PropType<(openKeys: string[]) => void> }, /**…

    2024年5月23日
    1.2K00
  • 梅丛银

    认识陈鹏程及数式核心团队同学已经有一段时间了,在我们多次的交流讨论中时常会谈及:未来中国哪家软件企业能在互联网云原生时代走出来超越传统软件企业?史昂说这是他的梦想,也是他们团队这么多年坚持技术和产品研发与应用优先思考之路。史昂及数式核心团队面向企业应用市场历经三年的潜心研发和实战交付,推出Oinone产品及配套的低代码平台工具:对比国内外应用软件平台在开放生态和云原生均有它的继承性和独特性,特别是将技术平台赋予企业各种业务领域属性,便于企业客户和开发伙伴的二次开发并能快速搭建各类企业核心应用场景是Oinone的最大亮点。Oinone的内在特点之一是参考了全球最大开源ERP Odoo的元数据模型设计,同时基于业务中台架构和云原生技术,形成了自己一套国际化的快速开发平台、建模规范和应用产品,通过自己进场落地很多品牌企业的应用中台化不断迭代升级,走出了一条具有显著特色的新应用软件之路。史昂及团队特点谦卑、善于思考,善于吸收他山之精华,这是创业团队难能可贵之点,由此能善于与生态伙伴合作也是能够走的更远更长的基础基因。最后希望和祝愿Oinone能为中国企业在云时代数字化实践做出更多的贡献,为软件产业构建强大的应用生态和开发社区,真正树立起Oinone自己的软件品牌形象。 资深IT咨询专家&浩鲸云智能专家学院院长:梅丛银

    Oinone 7天入门到精通 2024年5月23日
    1.3K00

Leave a Reply

登录后才能评论