4.1.21 框架之分布式消息

消息中间件是在分布式开发中常见的一种技术手段,用于模块间的解耦、异步处理、数据最终一致等场景。

一、介绍

oinone对开源的RocketMQ进行了封装,是平台提供的一种较为简单的使用方式,并非是对RocketMQ进行的功能扩展。同时也伴随着两个非常至关重要的目的:

  1. 适配不同企业对RocketMQ的不同版本选择,不至于改上层业务代码。目前已经适配RocketMQ的开源版本和阿里云版本。 下个版本会对API进行升级支持不同类型MQ,以适配不同企业对MQ的不同要求,应对一些企业客户已经对MQ进行技术选择

  2. 对协议头进行扩展:如多租户的封装,saas模式中为了共用MQ基础资源,需要在消息头中加入必要租户信息。

二、使用准备

demo工程默认已经依赖消息,这里只是做介绍无需大家额外操作,大家可以用maven依赖树命令查看引用关系。

依赖包

增加对pamirs-connectors-event的依赖


<dependency>
    <groupId>pro.shushi.pamirs.framework</groupId>
    <artifactId>pamirs-connectors-event</artifactId>
</dependency>

图4-1-21-1 分布式消息的依赖包

相关功能引入

增加模型、触发器都依赖MQ

<!-- 增强模型 -->
<!-- 增强模型 -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-channel</artifactId>
</dependency>
<!-- 触发器 api -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-api</artifactId>
</dependency>
<!-- 触发器 core -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-core</artifactId>
</dependency>

图4-1-21-2 增加模型、触发器都依赖MQ

yml配置文件参考

详见4.1.1【模块之yml文件结构详解】的“pamirs.event”部分。

三、使用说明

发送消息(NotifyProducer)

概述

NotifyProducer是Pamirs Event中所有生产者的基本API,它仅仅定义了消息发送的基本行为,例如生产者自身的属性,启动和停止,当前状态,以及消息发送方法。它本身并不决定消息如何发送,而是根据具体的实现确定其功能。

目前仅实现了RocketMQProducer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

Notify注解方式

使用示例

@Component
public class DemoProducer {

    @Notify(topic = "test", tags = "model")
    public DemoModel sendModel() {
        return new DemoModel();
    }

    @Notify(topic = "test", tags = "dto")
    public DemoDTO sendDTO() {
        return new DemoDTO();
    }
}

图4-1-21-3 Notify注解方式使用示例

解释说明

  1. 使用Component注解方式注册Spring Bean。

  2. Notify注解指定topic和tags。

  3. topic和tags对应NotifyEvent中的topic和tags。

RocketMQProducer方法调用

使用示例

@Component
public class SendRocketMQMessage {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel()));
        rocketMQProducer.send(new NotifyEvent("test", "dto", new DemoDTO()));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage() {
        DemoModel data = new DemoModel();
        data.setAge(10);
        rocketMQProducer.send(new NotifyEvent("test", "model", data)
                .setQueueSelector((queueSize, event) -> {
                    DemoModel body = (DemoModel) event.getBody();
                    return body.getAge() % queueSize;
                }));
    }

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel())
                .setIsTransaction(true)
                .setGroup("demoTransactionListener"));
    }
}

图4-1-21-4 RocketMQProducer方法调用

解释说明

  1. 使用Component注解方式注册Spring Bean。
  2. 使用Autowired注解方式装配RocketMQProducer实例。
  3. 使用send方法发送指定消息。
  4. 在【发送普通消息】方法中,该实现效果与Notify注解方式所示完全一致。
  5. 在【发送有序消息】方法中,队列选择器是必须配置的,queueSize属性为MQ队列的总数量,在broker中配置。有序消息必须配合有序消费者才能达到有序消费的目的,否则还是无序的常规消息,消费者需要配置@NotifyListener(consumerType=ConsumerType.ORDERLY )注解。
  6. 在【发送事务消息】方法中指定的group为ProducerGroup,事务消息是通过不同的Producer发出的,事务消息监听请参考TransactionListener注解的相关使用方法。(示例中的group与下文介绍中的一致)

使用TransactionListener开启事务消息监听

使用示例

@Component
@TransactionListener
public class DemoTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        return NotifyTransactionState.COMMIT;
    }

}

图4-1-21-5 使用TransactionListener开启事务消息监听

解释说明

  1. 实现NotifyTransactionListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 添加TransactionListener注解注册事务监听,生成对应的生产者。

  4. 当前ProducerGroup将使用这个类的BeanName,即"demoTransactionListener"。如果你想自定义ProducerGroup,可以使用TransactionListener的value属性进行设置。

消费消息(NotifyConsumer)

概述

NotifyConsumer是Pamirs Evnet中所有消费者的基本API,与NotifyProducer类似,它仅仅定义了消息消费的基本行为,例如消费者自身的属性,启动和停止,当前状态,以及消息的监听和订阅方法。它本身并不决定消息如何消费以及如何被订阅,而是根据具体的实现确定其功能。

目前仅实现了RocketMQEventPushConsumer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

在类上使用NotifyListener注解

使用示例

@Component
@NotifyListener(topic = "test", tags = "model")
public class DemoConsumerClass implements NotifyEventListener {

    @Override
    public void consumer(NotifyEvent event) {
        DemoModel data = (DemoModel) event.getBody();
        // do some things.
    }
}

图4-1-21-6 在类上使用NotifyListener注解

解释说明

  1. 实现NotifyEventListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 当前ConsumerGroup将使用这个类的BeanName,即"demoConsumerClass"。如果你想自定义ConsumerGroup,可以使用Component的value属性进行设置。

  4. 从body中将可以获取生产者发送的数据对象,并且已经做好了类型处理,可以直接使用。

  5. 使用原生的RocketMQ发送的消息,类型可能是无法识别的,你可以使用NotifyListener中提供的bodyClass来指定类型。

  6. topic和tags对应NotifyEvent中的topic和tags。

在方法上使用NotifyListener注解

使用示例

@Component
public class DemoConsumerMethod {

    @Bean
    @NotifyListener(topic = "test", tags = "model")
    public NotifyEventListener modelConsumer() {
        return event -> {
            DemoModel data = (DemoModel) event.getBody();
            // do some things.
        };
    }

    @Bean
    @NotifyListener(topic = "test", tags = "dto")
    public NotifyEventListener dtoConsumer() {
        return event -> {
            DemoDTO data = (DemoDTO) event.getBody();
            // do some things.
        };
    }
}

图4-1-21-7 在方法上使用NotifyListener注解

解释说明

  1. 使用Bean注解方式注册Spring Bean。

  2. 方法返回值为NotifyEventListener类型。

  3. 当前ConsumerGroup将使用对应方法生成的BeanName,即"modelConsumer"和"dtoConsumer"。如果你想自定义ConsumerGroup,可以使用Bean的value属性进行设置。

实战

约定:每个模块下的Topic和Tags必须定义常量池进行统一管理,主要是为了方便维护与管理,技术没有限制

常规消息(举例)

Step1 新建PetNotifyEnum

用PetNotifyEnum来管理模块的所有Topic和Tags的常量定义

package pro.shushi.pamirs.demo.api.mq;

public class PetNotifyEnum {

    public static class PetItemInventroy{

        public interface Topic{
            String PET_ITEM_INVENTROY_CHANGE ="petItemInventroyChange";
        }
        public interface Tag{
            String CREATE ="create";
            String UPDATE ="update";
            String DELETE ="delete";
        }

    }

}

图4-1-21-8 新建PetNotifyEnum

Step2 新建PetItemInventoryMqProducer

新建PetItemInventroy模型对应消息生产者,用于发送PetItemInventroy模型变动的相关消息

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

}

图4-1-21-9 新建PetItemInventoryMqProducer

Step3 新建PetItemInventoryMqConsumer

新建PetItemInventoryMqConsumer,订阅PetItemInventroy模型变动的相关消息,并进行相关处理

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-10 PetItemInventoryMqConsumer

Step4 修改PetItemInventroyAction

在修改PetItemInventroy完成之后,发送topic为【PetNotifyEnum.PetItemInventroyMq.Topic.PET_ITEM_INVENTROY_CHANGE】,tag为【PetNotifyEnum.PetItemInventroyMq.Tag.UPDATE】的消息出去

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-11 修改PetItemInventroyAction

Step5 重新看效果

  1. 编辑商品库存记录

image.png

图4-1-21-12 编辑商品库存记录

  1. 查看后端日志是否打印

image.png

图4-1-21-13 查看后端日志是否打印

顺序消息(举例)

Step1 修改PetItemInventoryMqProducer

增加一个顺序消息发送的方法,发送时根据商品库存的Id就分队列,相同队列顺序消费

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage(PetItemInventroy data,String tag) {

        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
                .setQueueSelector((queueSize, event) -> {
                    PetItemInventroy body = (PetItemInventroy) event.getBody();
                    Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                    return queue.intValue();
                }));

    }
}

图4-1-21-14 修改PetItemInventoryMqProducer

Step2 修改PetItemInventoryMqConsumer

增加@NotifyListener(consumerType= ConsumerType.ORDERLY)的注解

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.ConsumerType;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
//@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*",consumerType= ConsumerType.ORDERLY)
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-15 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的顺序消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
//        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-16 修改PetItemInventroyAction

Step4 重启看效果

同常规消息

事务消息(举例)

注:这种写法默认第一次是UNKNOWN,然后通过MQ回调二次确认,在消息时效性要求高的场景下是不符合要求的。对实效性要求高的,请见下面事务消息-优化(举例)章节。

Step1 新建NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息回滚");
        return NotifyTransactionState.ROLLBACK;
    }
}

图4-1-21-17 新建NotifyTransactionListener

Step2 修改PetItemInventoryMqProducer

增加发送事务性消息的方法,通过.setIsTransaction(true)来显示设置该消息是事务消息,通过setGroup("petItemInventoryMqTransactionListener")来匹配事务监听处理器

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data,String tag) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
    );
}

图4-1-21-18 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的事务性消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-19 修改PetItemInventroyAction

Step4 重启看效果

  1. 编辑商品库存记录

image.png

图4-1-21-20 编辑商品库存记录

  1. 查看后端日志是否打印

消息回滚,没有消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-21 MQ日志打印一

image.png

图4-1-21-22 MQ日志打印二

Step5 修改NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息提交");
        //正常业务情况下,这里要增加自己的逻辑判断,来确定返回状态值
        return NotifyTransactionState.COMMIT;
    }
}

图4-1-21-23 修改NotifyTransactionListener

Step6 重启看效果

消息提交,并看到消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-24 MQ日志打印三

image.png

图4-1-21-25 MQ日志打印三

事务消息-优化(举例)

Step1 修改PetItemInventoryMqProducer

修改PetItemInventoryMqProducer事务性消息发送方法sendTransactionMessage,增加NotifyExecuteLocalTransactionCallback入参。

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data, String tag, NotifyExecuteLocalTransactionCallback callback) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
            .setExecuteLocalTransactionCallback(callback)
    );
}

图4-1-21-26 修改PetItemInventoryMqProducer

Step2 修改PetItemInventroyAction

修改PetItemInventroyAction的update方法

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){
            @Override
            public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

                List<PetItemInventroy> inventroys = new ArrayList<>();
                inventroys.add(data);
                PamirsSession.directive().disableOptimisticLocker();
                try{
                    int i = data.updateBatch(inventroys);
                } finally {
                    PamirsSession.directive().enableOptimisticLocker();
                }
                //业务代码执行完毕,提交消息
                return NotifyTransactionState.COMMIT;
            }
        });
        return data;
    }
}

图4-1-21-27 修改PetItemInventroyAction

Step3 重启看效果

编辑商品库存记录,发现消息的处理几乎没有延迟

image.png

图4-1-21-28 示例效果几乎没有延迟

Step4 模拟业务成功,消息发送失败

修改PetItemInventroyAction的update方法

@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
    //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){

        @Override
        public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

            List<PetItemInventroy> inventroys = new ArrayList<>();
            inventroys.add(data);
            PamirsSession.directive().disableOptimisticLocker();
            try{
                int i = data.updateBatch(inventroys);
            } finally {
                PamirsSession.directive().enableOptimisticLocker();
            }
            //模拟业务成功,消息发送失败
            throw new RuntimeException();
            //业务代码执行完毕,提交消息
            //return NotifyTransactionState.COMMIT;
        }
    });
    return data;
}

图4-1-21-29 修改PetItemInventroyAction的update方法

Step5 重启看效果

这个效果跟第一种事务消息一样,消息提交延迟严重。

image.png

图4-1-21-30 MQ日志打印

image.png

图4-1-21-31 MQ日志打印

Step6 总结

优化后的事务消息,在正常情况下时效性非常高,异常情况下也能通过NotifyTransactionListener做保障

Oinone社区 作者:史, 昂原创文章,如若转载,请注明出处:https://doc.oinone.top/oio4/9296.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
史, 昂的头像史, 昂数式管理员
上一篇 2024年5月23日
下一篇 2024年5月23日

相关推荐

  • 3.2 Oinone以模块为组织

    模块(module):是按业务领域划分和管理的最小单元,是一组功能、界面的集合。 带大家快速认识下如何构建一个oinone的模块并启动它。我会从以下几个维度去介绍模块的构建与启动方式、模块详解。让大家直观且全方位地了解oinone的模块所包含的内容 构建第一个Module 启动前端工程 应用中心

    Oinone 7天入门到精通 2024年5月23日
    1.4K00
  • 7.3.2 原业务加审批流程

    场景说明 场景描述:全员营销标准产品的功能并未有任务发放的审批流,在实际执行中,当营销专员配置好任务后,需部门领导对整个活动如该任务内容、形式、参与人员进行审批。 业务需求:在发布任务这个流程中增加审批节点。 实战训练 Step1 原业务分析 点击菜单【任务中心】通过URL上的model参数找到对应模型编码为【gemini.biz.GeminiTaskProxy】 进入模型设计器主页面,应用选择【全员营销】、选择【系统模型】、通过搜索关键字【任务】选择【Gemini任务代理】,展示方式从图模式切换到表单模式,对比【模型编码】 但目前模型为代理模型,代理模型是用于代理存储模型的数据管理器能力,同时又可以扩展出非存储数据信息的交互功能的模型。因为在代理模型中新增的字段都是非存储字段,所以如果要增加【审核状态】的字段一定以要在存储模型增加。其父模型的查看有两种方式 表单模式下可以直接看父模型 在图模式和表单模式下点击继承关系 点击【Gemini任务】,进入【Gemini任务】的模型设计界面,可以看出该模型所在模块为【全员营销核心业务】,从【系统字段】中找到【任务状态】字段,点击查看字段详情,我们可以看到【业务类型】为数据字典,字典类型为【任务状态】。 在模型设计器的管理页面上方点击【数字字典】选项卡,模块选择为【全员营销核心业务】,选择【系统字典】就可以查看到【任务状态】数字字典 总结如下: 给【Gemini任务】模型增加一个【任务审批状态】,记录审批状态 在任务创建的时候,修改【任务状态】为【关闭】确保任务未审批通过的时,用户无法操作该任务。 审批通过后,恢复【任务状态】为【初始化】 我们先来整理下核心流程即:任务审批流程。 Step2 利用模型设计器设计模型 在模型设计器的管理页面上方点击【数字字典】选项卡,模块选择为【全员营销核心业务】,点击添加【数据字典】按钮,设置对应数据项 设置【字典名称】为【审批状态】 设置【字典项类型】为【文本】 通过【添加数据字典项】按钮增加对应数据字典项,如审核中、审核失败、审核成功 在模型设计器的管理页面上方点击【模型】选项卡,模块选择为【全员营销核心业务】,选择【系统模型】、搜索任务选择【Gemini任务】,点击添加字段 为模型【Gemini任务】添加字段 设置【字段名称】为【任务审批状态】 设置【字段业务类型】为数据字典,并选择关联数据字典为【任务审批状态】 最后点击【创建】按钮完成操作 回到【Gemini任务】设计区,我们可以看到在模型的【自定义字段】选项卡下方多了一个【任务审批状态】字段 Step3 利用界面设计器,设计出必要的审核页面 进入界面设计器,应用选择全员营销,模型选择【Gemini任务】,点击添加页面下的直接创建 设置页面标题、模型(自动带上可切换)、业务类型(运营管理后续会扩展其他类型)、视图类型(表单)后点击确认按钮进入【Gemini任务】表单设计页面 进入页面设计器,对【Gemini任务】表单页面进行设计(更多细节介绍,请参考界面设计产品使用手册) 左侧为物料区:分为组件、模型。 【组件】选项卡下为通用物料区,我们可以为页面增加对应布局、字段(如同在模型设计器增加字段)、动作、数据、多媒体等等 【模型】选项卡下为页面对应模型的自定义字段、系统字段、以及模型已有动作 中间是设计区域 右侧为属性面板,在设计区域选择中组件会显示对应组件的可配置参数 在左侧【组件】选项卡下,拖入布局组件【分组】,并设置组件【标题属性】为基础信息 在左侧【模型】选项卡下,分别系统字段中的【任务标题】、【任务开始时间】、【任务结束时间】、【视频标题】、【视频风格】、【任务描述】拖入【基础信息】分组,并点击【任务描述】,在右侧属性面板的【交互】分组中设置宽度为1。最后别忘了点击【发布】按钮完成页面的发布 Step4 通过流程设计器,设计对应业务流程 进入流程设计器,点击【创建】按钮 注意:流程中需要获取【关系字段】的除关联字段(一般为ID)以外的字段需要通过【数据获取】节点单独获取【关系字段】的对象数据。所以在流程设计中经常会用到【数据获取】节点 左上角编辑流程名称为【任务审批流程】,点击第一个【触发】节点,触发方式选择模型触发,模型选择【Gemini任务】,触发场景选择【新增或更新数据时】,【筛选条件】设置为【任务审批状态】为空或【任务审批状态】等于【审核中】,点击该节点的【保存】按钮 点击流程图节点间的【+】图标选择增加【获取数据】节点,或者拖动左侧物料区【获取数据】到特定的【+】图标 点击【获取数据】,在右侧属性面板中设置【获取数据条数】为多条,选择模型为【Gemini用户任务实例】,点击【筛选条件】的【{X}】图标,进行数据获取的条件设置 选择条件字段为【任务ID】条件操作符为【等于】,条件为变量的导购字段的ID。当上下文只有一个变量时默认不需要选择,这里默认的是【模型触发:[Gemini任务]】,设置好以后点击确认,回到属性面板设置【未获取到数据时执行方式】为【终止流程】,并点击节点【保持】按钮 增加【更新数据】节点,在右侧属性面板中 【更新模型】选择【模型触发:[Gemini任务]】 【字段列表】点击【创建】按钮 字段选择 更新【任务状态】字段 表达式设置为:【已关闭】。 【字段列表】点击【创建】按钮 字段选择 更新【任务审核状态】字段 表达式设置为:【审核中】。 最终完成的【模型触发:[Gemini任务]】更新设置 a. 【模型触发:[Gemini任务]】的【任务状态】字段等于数字字典的【已关闭】,任务审核状态为【审核中】 b. 最后点击节点【保持】按钮。 再增加【更新数据】节点,在右侧属性面板中 【更新模型】选择【获取数据[Gemini用户任务实例]】 【字段列表】点击【创建】按钮 字段选择 更新【任务状态】字段 表达式设置为:【已关闭】。 最终完成的【获取数据[Gemini用户任务实例]】更新设置 a. 【获取数据[Gemini用户任务实例]】的【任务状态】字段等于数字字典的【已关闭】 b. 最后点击节点【保持】按钮。 增加【审批】节点,在右侧属性面板中 【审批模型】选择模型为【模型触发:[Gemini任务]】 【选择视图】选择前面新建的页面【流程中的任务编辑页】 【审批人】选择角色为【超级管理员】 【数据】权限全部设置为【查看】 其他配置项默认,需要了解更多请查看产品使用手册 最后点击节点【保持】按钮。 新增【审核分支】,在【通过】分支中增加两个数据更新节点,跟审核前的两个数据更新节点对应 【模型触发:[Gemini任务]】的【任务状态】字段等于数字字典的【初始化】,任务审核状态为【审核通过】 【获取数据[Gemini用户任务实例]】的【任务状态】字段等于数字字典的【初始化】 流程确保保持并发布过,点击右上角【发布流程】完成流程的保存与发布 Step5 检验效果 创建任务后,任务状态为【关闭】状态,任务列表中的任务状态为多个状态的计算值 审核通过后,任务状态为【进行中】状态,任务列表中的任务状态为多个状态的计算值

    2024年5月23日
    89800
  • 2.4.2 Oinone独特性之每一个需求都可以是一个模块

    我们的Oinone平台采用模型驱动的方式,并符合面向对象设计原则,每个需求都可以是一个独立模块,可以独立安装、升级和卸载。这让系统真正像乐高积木一样搭建,具有高度的灵活性和可维护性。 与大部分低代码或无代码平台不同的是,它们的应用市场上的应用往往是模板式的,也就是说,这是一个拷贝,个性化只能在应用上直接修改,而且一旦修改就不能升级。这对于软件公司和客户来说都非常痛苦。客户无法享受到软件公司产品的升级功能,而软件公司在服务大量客户时,也会面临不同版本的维护问题,成本也非常高。而我们的Oinone平台完全避免了这些问题,让客户和软件公司都可以从中受益(如下图2-9、2-10所示)。 图2-9软件公司与客户项目的关系-让标准与个性化共存 图2-10 软件公司与客户项目的关系-让升级无忧 实现原理 在满足客户个性化定制需求时,传统的方法通常是直接修改标准产品源码,但这样做会带来一个问题:标准产品无法持续升级。相反,无论是在OP模式还是SaaS模式下,Oinone都采用全新的模块为客户进行个性化开发,保持标准产品和个性化模块的独立维护和升级。这是因为在元数据设计时,Oinone采用了面向对象的设计原则,实现了元数据设计与面向对象设计思想的完美融合。 面向对象设计的核心特征包括封装、继承、多态,而Oinone的元数据设计完全融入了这些思想。下面是几个例子,说明Oinone的元数据设计如何体现面向对象设计的核心特征,并带来了什么好处: 继承:在继承原有模型的字段、逻辑、展示的情况下,增加一段代码来扩展模型的字段、逻辑、展示。 多态:在继承原有模型的字段、逻辑、展示的情况下,增加一段代码来覆盖模型的原有字段、逻辑、展示。 封装:外部无需关心模型内部如何实现,只需按照不同场景调用模型对应开放级别的字段、逻辑、展示。 这些特征和优势使得Oinone在满足客户个性化需求时更加灵活和可持续,同时使得标准产品的维护和升级变得更加容易和高效。 在Java语言设计中,万物皆对象,一切都以对象为基础。而Oinone的元数据设计则是以模型为出发点,作为数据和行为的承载体。如下图2-11清晰地描述了Java面向对象编程中封装、继承、多态在Oinone元数据中的对应关系。Oinone元数据描述了B对象继承A对象并拥有其所有属性和方法,并覆盖了A对象的属性1和方法1,同时新增了属性3和方法3。 此外,Oinone的面向对象特性是用元数据来描述的。一方面,我们基于Java编码规范收集相关元数据,以保持不改变Java编程习惯。另一方面,方法和对象的挂载是松耦合的,只要按照元数据规范进行挂载,就能轻松地将其附加到模型上。在不改变原有A对象的情况下,我们可以直接增加方法和属性(如下图2-12所示)。 图2-11 java面向对象在Oinone元数据中对应 图2-12 java对象的修改 VS Oinone元数据模型的修改 Oinone函数不仅支持面向对象的继承和多态特性,还提供了面向切面的拦截器和SPI机制的扩展点,以应对方法逻辑的覆盖和扩展,以及系统层面的逻辑扩展(如下图2-13所示)。这些扩展功能可以独立地在模块中维护。 其中,拦截器可以在不侵入函数逻辑的情况下,根据优先级为满足条件的函数添加执行前和执行后的逻辑。 扩展点是一种类似于SPI机制的逻辑扩展机制,用于扩展函数的逻辑。通过这一机制,可以对函数逻辑进行灵活的扩展,以满足不同的业务需求。 图2-13 Oinone函数拦截与扩展机制 不管是对象、属性还是方法,都可以以独立的模块方式来扩展,这就使得每一个需求都可以成为一个独立的模块,方便我们在研发标准产品时进行模块化的划分,同时也让我们在以低代码模式为客户进行二次开发时,能够更好地支持“标准产品迭代与个性化保持独立”的需求。在2.4.3【oinone独特性之低无一体】一文中,我们也提到了这个特性,但那是在低无一体的情况下,通过元数据融合来实现的。让我们看看基于低代码开发模式下,典型的Oinone二次开发工程结构(如下图2-14所示),就可以更好地理解这个特性啦! 图2-14 Oinone典型的二开工程结构

    2024年5月23日
    1.1K00
  • 5.3 基础支撑之用户与客户域

    一、三户概念 三户由来 介绍下经典的三户模型,它是电信运营支持系统的基础。三户模型即客户、用户和帐户,来源于etom的模型。这三者之间的关系应该是一个相互关联但又是独立的三个实体,这种关联只是一个归属和映射的关系,而三个实体本身是相互独立的,分别是体现完全不同的几个域的信息,客户是体现了社会域的信息,用户体现了业务域的信息,帐户体现的是资金域的信息。 客户:它是个社会化的概念,一个自然人或一个法人 用户:它是客户使用运营商开发的一个产品以及基于该产品之上的增值业务时,产生的一个实体。如果说一个客户使用了多个产品,那么一个客户就会对应好几个用户(即产品) 账户:它的概念起源于金融业,只是一个客户在运营商存放资金的实体,目的是为选择的产品付费 Oinone的三户 在原三户模型中【用户】是购买关系产生的产品与客户关系的服务实例,在互联网发展中用户的概念发生了非常大的变化,【用户】概念变成了:使用者,是指使用电脑或网络服务的人,通常拥有一个用户账号,并以用户名识别。而且新概念在互联网强调用户数的大背景下已经被普遍介绍,再去强调电信行业的用户概念就会吃力不讨好。而且不管是企业应用领域和互联网领域,原用户概念都显得过于复杂和没有必要。也就有了特色的oinone的三户模型: 客户:它是个社会化的概念,一个自然人或一个法人 用户:使用者,是指使用电脑或网络服务的人,通常拥有一个用户账号,并以用户名识别 账户:它的概念起源于金融业,只是一个客户在运营商存放资金的实体,目的是为选择的产品付费 二、Oinone的客户与用户 三户模型是构建上层应用的基础支撑能力,任何业务行为都跟这里两个实体脱不了干系。以客户为中心建立商业关系与商业行为主体,以用户为中心构建一致体验与操作行为主体。在底层设计上二者相互独立并无关联,由上层应用自行作关联绑定,往往在登陆时在Session的处理逻辑中会根据【用户】去找到对应一个或多个【商业(主体)客户】,Session的实现可以参考4.1.20【框架之Session】一文。 图5-3-1 Oinone的客户与用户 客户设计说明 PamirsPartner作为商业关系与商业行为的主体,派生了两个子类PamirsCompany与PamirsPerson分别对应:公司(法人)客户、自然人客户 公司(法人)客户PamirsCompany对应多个组织部门PamirsDepartment,公司(法人)客户PamirsCompany对应多个员工PamirsEmployee 部门PamirsDepartment对应一个公司(法人)客户PamirsCompany,对应多个员工PamirsEmployee 员工PamirsEmployee对应多个部门PamirsDepartment,对应一个或多个公司(法人)客户PamirsCompany,其中有一个主的 用户设计说明 PamirsUser作为一致体验与操作行为主体,本身绑定登陆账号,并且可以关联多个三方登陆账户PamirsUserThirdParty 客户与用户如何关联(举例) 例子设计: 新建demo系统的PetComany和PetEmployee,用PetEmployee去关联用户。 当用户登陆时,根据用户Id找到PetEmployee,在根据PetEmployee找到PetComany,把PetComany放到Session中去 修改PetShop模型关联一个PamirsPartner,PamirsPartner的信息从Session取。 Step1 pamirs-demo-api工程增加依赖,并且DemoModule增加对BusinessModule的依赖 <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-business-api</artifactId> </dependency> 图5-3-2 pamirs-demo-api工程增加依赖 在DemoModule类中通过@Module.dependencies中增加BusinessModule.MODULE_MODULE @Module( dependencies = { BusinessModule.MODULE_MODULE} ) 图5-3-3 声明对BusinessModule的依赖 Step2 新建PetComany和PetEmployee,以及对应的服务 package pro.shushi.pamirs.demo.api.model; import pro.shushi.pamirs.business.api.model.PamirsEmployee; import pro.shushi.pamirs.meta.annotation.Field; import pro.shushi.pamirs.meta.annotation.Model; import pro.shushi.pamirs.user.api.model.PamirsUser; @Model.model(PetEmployee.MODEL_MODEL) @Model(displayName = "宠物公司员工",labelFields = "name") public class PetEmployee extends PamirsEmployee { public static final String MODEL_MODEL="demo.PetEmployee"; @Field(displayName = "用户") private PamirsUser user; } 图5-3-4 新建PetEmployee package pro.shushi.pamirs.demo.api.model; import pro.shushi.pamirs.business.api.entity.PamirsCompany; import pro.shushi.pamirs.meta.annotation.Field; import pro.shushi.pamirs.meta.annotation.Model; @Model.model(PetCompany.MODEL_MODEL) @Model(displayName = "宠物公司",labelFields = "name") public class PetCompany extends PamirsCompany { public static final String MODEL_MODEL="demo.PetCompany"; @Field.Text @Field(displayName = "简介") private String introductoin; } 图5-3-5 新建PetComany package pro.shushi.pamirs.demo.api.service; import pro.shushi.pamirs.demo.api.model.PetEmployee; import pro.shushi.pamirs.meta.annotation.Fun; import pro.shushi.pamirs.meta.annotation.Function; @Fun(PetEmployeeQueryService.FUN_NAMESPACE) public interface PetEmployeeQueryService { String FUN_NAMESPACE ="demo.PetEmployeeQueryService"; @Function PetEmployee queryByUserId(Long userId); } 图5-3-6 新建PetEmployee对应服务 package pro.shushi.pamirs.demo.core.service; import org.springframework.stereotype.Component; import pro.shushi.pamirs.demo.api.model.PetEmployee; import pro.shushi.pamirs.demo.api.service.PetEmployeeQueryService; import pro.shushi.pamirs.framework.connectors.data.sql.query.QueryWrapper; import pro.shushi.pamirs.meta.annotation.Fun; import pro.shushi.pamirs.meta.annotation.Function; @Fun(PetEmployeeQueryService.FUN_NAMESPACE) @Component public class PetEmployeeQueryServiceImpl implements PetEmployeeQueryService { @Override @Function public PetEmployee queryByUserId(Long userId) { if(userId==null){ return null; } QueryWrapper<PetEmployee> queryWrapper = new QueryWrapper<PetEmployee>().from(PetEmployee.MODEL_MODEL).eq("user_id", userId); return…

    2024年5月23日
    70300
  • 3.5.6.3 布局的配置

    布局是将页面拆分成一个一个的小单元,按照上下中左右进行排列。 前沿 在前端领域中,布局可以分为三大块「Float、Flex、Grid 」,Float可以说的上是上古时期的布局了,如今市面还是很少见的,除了一些古老的网站。 目前,平台主要支持通过配置XML上面的cols和span来进行布局。平台也同样支持自由布局,合理的使用row、col、containers和container四个布局容器相关组件,将可以实现各种类型的布局样式,换句话说,平台实现的自由布局功能是Flex和Grid的结合体。 这里主要是讲解Flex和Grid布局,以及目前新的模板布局实现的思路。 Flex布局 Flex布局采用的是一维布局,那么什么是一维布局呢,所谓的一维布局就是只有一个方向、没有体积、面积,比如一条直线。它适合做局部布局,就像我们原来的顶部菜单、面包屑导航,以及现在的主视图字段配置。 图3-5-6-19 Flex布局示意 图3-5-6-20 Flex布局示意 图3-5-6-21 Flex布局示意 从上图可以看看出,Flex布局只能在X、Y轴进行转换,它无法对上下左右四个方向同时处理,因为它没“面积”的概念。所以它最适合做局部布局。 优点 图3-5-6-22 Flex兼容性 Flex的兼容性,可以看得出来,目前主流的浏览器都支持该属性。所以Flex兼容性强,如果你想对局部做布局处理,Flex是最好选择。 缺陷 刚刚也提到了,用户想要的布局是千奇百怪的,如果他想要的布局在现有的功能中无法实现怎么办?让用户放弃?还是说服他使用现在的布局。 Grid布局 Grid布局系统采用的是二维布局,二维布局有四个方向:上、下、左、右,它只有面积没有体积,比如一张纸、网格。 Grid布局 <div id="grid-container-one"> <div class="one-1">Grid Item 1</div> <div>Grid Item 2</div> <div>Grid Item 3</div> <div>Grid Item 4</div> <div>Grid Item 5</div> <div class="one-6">Grid Item 6</div> </div> <div id="grid-container-two"> <div class="tow-1">Grid Item 1</div> <div class="tow-2">Grid Item 2</div> <div>Grid Item 3</div> <div>Grid Item 4</div> <div>Grid Item 5</div> <div>Grid Item 6</div> </div> <div id="grid-container-three"> <div>Grid Item 1</div> <div>Grid Item 2</div> <div class="grid">Grid Item 3</div> <div class="grid-column">Grid Item 4</div> <div>Grid Item 5</div> <div>Grid Item 6</div> <div>Grid Item 7</div> <div class="grid-column">Grid Item 8</div> </div> HTML CSSResult Skip Results Iframe EDIT ON * { box-sizing: border-box; padding: 0; margin: 0; line-height: 1.5; font-weight: bold; text-align: center; } #grid-container-one{ background-color: black; display: grid; grid-template-columns: repeat(3, 1fr); grid-template-rows: repeat(2, 50px); gap: 10px; border: solid black 2px; margin-bottom: 20px; color: salmon; } #grid-container-one div { border: solid white 2px; padding: 10px; } #grid-container-one .one-1 { grid-area: span 1/span 3; text-aligin: center } #grid-container-one .one-6 { grid-column: 3 /4; } #grid-container-two{ background-color: CADETBLUE; display: grid; grid-template-columns: 15% repeat(2, 1fr);…

    2024年5月23日
    1.0K00

Leave a Reply

登录后才能评论