4.1.21 框架之分布式消息

消息中间件是在分布式开发中常见的一种技术手段,用于模块间的解耦、异步处理、数据最终一致等场景。

一、介绍

oinone对开源的RocketMQ进行了封装,是平台提供的一种较为简单的使用方式,并非是对RocketMQ进行的功能扩展。同时也伴随着两个非常至关重要的目的:

  1. 适配不同企业对RocketMQ的不同版本选择,不至于改上层业务代码。目前已经适配RocketMQ的开源版本和阿里云版本。 下个版本会对API进行升级支持不同类型MQ,以适配不同企业对MQ的不同要求,应对一些企业客户已经对MQ进行技术选择

  2. 对协议头进行扩展:如多租户的封装,saas模式中为了共用MQ基础资源,需要在消息头中加入必要租户信息。

二、使用准备

demo工程默认已经依赖消息,这里只是做介绍无需大家额外操作,大家可以用maven依赖树命令查看引用关系。

依赖包

增加对pamirs-connectors-event的依赖


<dependency>
    <groupId>pro.shushi.pamirs.framework</groupId>
    <artifactId>pamirs-connectors-event</artifactId>
</dependency>

图4-1-21-1 分布式消息的依赖包

相关功能引入

增加模型、触发器都依赖MQ

<!-- 增强模型 -->
<!-- 增强模型 -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-channel</artifactId>
</dependency>
<!-- 触发器 api -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-api</artifactId>
</dependency>
<!-- 触发器 core -->
<dependency>
    <groupId>pro.shushi.pamirs.core</groupId>
    <artifactId>pamirs-trigger-core</artifactId>
</dependency>

图4-1-21-2 增加模型、触发器都依赖MQ

yml配置文件参考

详见4.1.1【模块之yml文件结构详解】的“pamirs.event”部分。

三、使用说明

发送消息(NotifyProducer)

概述

NotifyProducer是Pamirs Event中所有生产者的基本API,它仅仅定义了消息发送的基本行为,例如生产者自身的属性,启动和停止,当前状态,以及消息发送方法。它本身并不决定消息如何发送,而是根据具体的实现确定其功能。

目前仅实现了RocketMQProducer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

Notify注解方式

使用示例

@Component
public class DemoProducer {

    @Notify(topic = "test", tags = "model")
    public DemoModel sendModel() {
        return new DemoModel();
    }

    @Notify(topic = "test", tags = "dto")
    public DemoDTO sendDTO() {
        return new DemoDTO();
    }
}

图4-1-21-3 Notify注解方式使用示例

解释说明

  1. 使用Component注解方式注册Spring Bean。

  2. Notify注解指定topic和tags。

  3. topic和tags对应NotifyEvent中的topic和tags。

RocketMQProducer方法调用

使用示例

@Component
public class SendRocketMQMessage {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel()));
        rocketMQProducer.send(new NotifyEvent("test", "dto", new DemoDTO()));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage() {
        DemoModel data = new DemoModel();
        data.setAge(10);
        rocketMQProducer.send(new NotifyEvent("test", "model", data)
                .setQueueSelector((queueSize, event) -> {
                    DemoModel body = (DemoModel) event.getBody();
                    return body.getAge() % queueSize;
                }));
    }

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage() {
        rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel())
                .setIsTransaction(true)
                .setGroup("demoTransactionListener"));
    }
}

图4-1-21-4 RocketMQProducer方法调用

解释说明

  1. 使用Component注解方式注册Spring Bean。
  2. 使用Autowired注解方式装配RocketMQProducer实例。
  3. 使用send方法发送指定消息。
  4. 在【发送普通消息】方法中,该实现效果与Notify注解方式所示完全一致。
  5. 在【发送有序消息】方法中,队列选择器是必须配置的,queueSize属性为MQ队列的总数量,在broker中配置。有序消息必须配合有序消费者才能达到有序消费的目的,否则还是无序的常规消息,消费者需要配置@NotifyListener(consumerType=ConsumerType.ORDERLY )注解。
  6. 在【发送事务消息】方法中指定的group为ProducerGroup,事务消息是通过不同的Producer发出的,事务消息监听请参考TransactionListener注解的相关使用方法。(示例中的group与下文介绍中的一致)

使用TransactionListener开启事务消息监听

使用示例

@Component
@TransactionListener
public class DemoTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        return NotifyTransactionState.COMMIT;
    }

}

图4-1-21-5 使用TransactionListener开启事务消息监听

解释说明

  1. 实现NotifyTransactionListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 添加TransactionListener注解注册事务监听,生成对应的生产者。

  4. 当前ProducerGroup将使用这个类的BeanName,即"demoTransactionListener"。如果你想自定义ProducerGroup,可以使用TransactionListener的value属性进行设置。

消费消息(NotifyConsumer)

概述

NotifyConsumer是Pamirs Evnet中所有消费者的基本API,与NotifyProducer类似,它仅仅定义了消息消费的基本行为,例如消费者自身的属性,启动和停止,当前状态,以及消息的监听和订阅方法。它本身并不决定消息如何消费以及如何被订阅,而是根据具体的实现确定其功能。

目前仅实现了RocketMQEventPushConsumer,你可以使用下面介绍的方法轻松使用这些功能。

使用方法

在类上使用NotifyListener注解

使用示例

@Component
@NotifyListener(topic = "test", tags = "model")
public class DemoConsumerClass implements NotifyEventListener {

    @Override
    public void consumer(NotifyEvent event) {
        DemoModel data = (DemoModel) event.getBody();
        // do some things.
    }
}

图4-1-21-6 在类上使用NotifyListener注解

解释说明

  1. 实现NotifyEventListener接口。

  2. 使用Component注解方式注册Spring Bean。

  3. 当前ConsumerGroup将使用这个类的BeanName,即"demoConsumerClass"。如果你想自定义ConsumerGroup,可以使用Component的value属性进行设置。

  4. 从body中将可以获取生产者发送的数据对象,并且已经做好了类型处理,可以直接使用。

  5. 使用原生的RocketMQ发送的消息,类型可能是无法识别的,你可以使用NotifyListener中提供的bodyClass来指定类型。

  6. topic和tags对应NotifyEvent中的topic和tags。

在方法上使用NotifyListener注解

使用示例

@Component
public class DemoConsumerMethod {

    @Bean
    @NotifyListener(topic = "test", tags = "model")
    public NotifyEventListener modelConsumer() {
        return event -> {
            DemoModel data = (DemoModel) event.getBody();
            // do some things.
        };
    }

    @Bean
    @NotifyListener(topic = "test", tags = "dto")
    public NotifyEventListener dtoConsumer() {
        return event -> {
            DemoDTO data = (DemoDTO) event.getBody();
            // do some things.
        };
    }
}

图4-1-21-7 在方法上使用NotifyListener注解

解释说明

  1. 使用Bean注解方式注册Spring Bean。

  2. 方法返回值为NotifyEventListener类型。

  3. 当前ConsumerGroup将使用对应方法生成的BeanName,即"modelConsumer"和"dtoConsumer"。如果你想自定义ConsumerGroup,可以使用Bean的value属性进行设置。

实战

约定:每个模块下的Topic和Tags必须定义常量池进行统一管理,主要是为了方便维护与管理,技术没有限制

常规消息(举例)

Step1 新建PetNotifyEnum

用PetNotifyEnum来管理模块的所有Topic和Tags的常量定义

package pro.shushi.pamirs.demo.api.mq;

public class PetNotifyEnum {

    public static class PetItemInventroy{

        public interface Topic{
            String PET_ITEM_INVENTROY_CHANGE ="petItemInventroyChange";
        }
        public interface Tag{
            String CREATE ="create";
            String UPDATE ="update";
            String DELETE ="delete";
        }

    }

}

图4-1-21-8 新建PetNotifyEnum

Step2 新建PetItemInventoryMqProducer

新建PetItemInventroy模型对应消息生产者,用于发送PetItemInventroy模型变动的相关消息

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

}

图4-1-21-9 新建PetItemInventoryMqProducer

Step3 新建PetItemInventoryMqConsumer

新建PetItemInventoryMqConsumer,订阅PetItemInventroy模型变动的相关消息,并进行相关处理

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-10 PetItemInventoryMqConsumer

Step4 修改PetItemInventroyAction

在修改PetItemInventroy完成之后,发送topic为【PetNotifyEnum.PetItemInventroyMq.Topic.PET_ITEM_INVENTROY_CHANGE】,tag为【PetNotifyEnum.PetItemInventroyMq.Tag.UPDATE】的消息出去

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-11 修改PetItemInventroyAction

Step5 重新看效果

  1. 编辑商品库存记录

image.png

图4-1-21-12 编辑商品库存记录

  1. 查看后端日志是否打印

image.png

图4-1-21-13 查看后端日志是否打印

顺序消息(举例)

Step1 修改PetItemInventoryMqProducer

增加一个顺序消息发送的方法,发送时根据商品库存的Id就分队列,相同队列顺序消费

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;

@Component
public class PetItemInventoryMqProducer {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    /**
     * 发送普通消息
     */
    public void sendNormalMessage(PetItemInventroy data,String tag) {
        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
    }

    /**
     * 发送有序消息
     */
    public void sendOrderlyMessage(PetItemInventroy data,String tag) {

        rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
                .setQueueSelector((queueSize, event) -> {
                    PetItemInventroy body = (PetItemInventroy) event.getBody();
                    Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                    return queue.intValue();
                }));

    }
}

图4-1-21-14 修改PetItemInventoryMqProducer

Step2 修改PetItemInventoryMqConsumer

增加@NotifyListener(consumerType= ConsumerType.ORDERLY)的注解

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.ConsumerType;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
//@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*",consumerType= ConsumerType.ORDERLY)
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
    @Override
    public void consumer(NotifyEvent event) {
        PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
        log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
    }
}

图4-1-21-15 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的顺序消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
//        petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-16 修改PetItemInventroyAction

Step4 重启看效果

同常规消息

事务消息(举例)

注:这种写法默认第一次是UNKNOWN,然后通过MQ回调二次确认,在消息时效性要求高的场景下是不符合要求的。对实效性要求高的,请见下面事务消息-优化(举例)章节。

Step1 新建NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息回滚");
        return NotifyTransactionState.ROLLBACK;
    }
}

图4-1-21-17 新建NotifyTransactionListener

Step2 修改PetItemInventoryMqProducer

增加发送事务性消息的方法,通过.setIsTransaction(true)来显示设置该消息是事务消息,通过setGroup("petItemInventoryMqTransactionListener")来匹配事务监听处理器

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data,String tag) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
    );
}

图4-1-21-18 修改PetItemInventoryMqProducer

Step3 修改PetItemInventroyAction

调用petItemInventoryMqProducer的事务性消息发送接口

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        ……其他代码
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        return data;
    }
}

图4-1-21-19 修改PetItemInventroyAction

Step4 重启看效果

  1. 编辑商品库存记录

image.png

图4-1-21-20 编辑商品库存记录

  1. 查看后端日志是否打印

消息回滚,没有消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-21 MQ日志打印一

image.png

图4-1-21-22 MQ日志打印二

Step5 修改NotifyTransactionListener

package pro.shushi.pamirs.demo.core.mq;

import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;

@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {

    @Override
    public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
        log.info("消息提交");
        //正常业务情况下,这里要增加自己的逻辑判断,来确定返回状态值
        return NotifyTransactionState.COMMIT;
    }
}

图4-1-21-23 修改NotifyTransactionListener

Step6 重启看效果

消息提交,并看到消息消费的日志,从日志打印时间上看MQ回调会有延迟

image.png

图4-1-21-24 MQ日志打印三

image.png

图4-1-21-25 MQ日志打印三

事务消息-优化(举例)

Step1 修改PetItemInventoryMqProducer

修改PetItemInventoryMqProducer事务性消息发送方法sendTransactionMessage,增加NotifyExecuteLocalTransactionCallback入参。

/**
 * 发送事务消息
 */
public void sendTransactionMessage(PetItemInventroy data, String tag, NotifyExecuteLocalTransactionCallback callback) {

    rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
            .setQueueSelector((queueSize, event) -> {
                PetItemInventroy body = (PetItemInventroy) event.getBody();
                Long queue = body.getId().longValue() % Long.valueOf(queueSize);
                return queue.intValue();
            })
            .setIsTransaction(true)
            .setGroup("petItemInventoryMqTransactionListener")
            .setExecuteLocalTransactionCallback(callback)
    );
}

图4-1-21-26 修改PetItemInventoryMqProducer

Step2 修改PetItemInventroyAction

修改PetItemInventroyAction的update方法

package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {

    @Autowired
    private PetItemInventoryMqProducer petItemInventoryMqProducer;

    @Function.Advanced(type= FunctionTypeEnum.UPDATE)
    @Function.fun(FunctionConstants.update)
    @Function(openLevel = {FunctionOpenEnum.API})
    public PetItemInventroy update(PetItemInventroy data){
        //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){
            @Override
            public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

                List<PetItemInventroy> inventroys = new ArrayList<>();
                inventroys.add(data);
                PamirsSession.directive().disableOptimisticLocker();
                try{
                    int i = data.updateBatch(inventroys);
                } finally {
                    PamirsSession.directive().enableOptimisticLocker();
                }
                //业务代码执行完毕,提交消息
                return NotifyTransactionState.COMMIT;
            }
        });
        return data;
    }
}

图4-1-21-27 修改PetItemInventroyAction

Step3 重启看效果

编辑商品库存记录,发现消息的处理几乎没有延迟

image.png

图4-1-21-28 示例效果几乎没有延迟

Step4 模拟业务成功,消息发送失败

修改PetItemInventroyAction的update方法

@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
    //petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    //petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//        petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
    petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){

        @Override
        public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {

            List<PetItemInventroy> inventroys = new ArrayList<>();
            inventroys.add(data);
            PamirsSession.directive().disableOptimisticLocker();
            try{
                int i = data.updateBatch(inventroys);
            } finally {
                PamirsSession.directive().enableOptimisticLocker();
            }
            //模拟业务成功,消息发送失败
            throw new RuntimeException();
            //业务代码执行完毕,提交消息
            //return NotifyTransactionState.COMMIT;
        }
    });
    return data;
}

图4-1-21-29 修改PetItemInventroyAction的update方法

Step5 重启看效果

这个效果跟第一种事务消息一样,消息提交延迟严重。

image.png

图4-1-21-30 MQ日志打印

image.png

图4-1-21-31 MQ日志打印

Step6 总结

优化后的事务消息,在正常情况下时效性非常高,异常情况下也能通过NotifyTransactionListener做保障

Oinone社区 作者:史, 昂原创文章,如若转载,请注明出处:https://doc.oinone.top/oio4/9296.html

访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验

(0)
史, 昂的头像史, 昂数式管理员
上一篇 2024年5月23日
下一篇 2024年5月23日

相关推荐

  • 4.1.5 模型之持久层配置

    一、批量操作 批量操作包括批量创建与批量更新。批量操作的提交类型系统默认值为batchCommit。 批量提交类型: useAffectRows,循环单次单条脚本提交,返回实际影响行数 useAndJudgeAffectRows,循环单次单条脚本提交,返回实际影响行数,若实际影响行数与输入不一致,抛出异常 collectionCommit,将多个单条更新脚本拼接成一个脚本提交,不能返回实际影响行数 batchCommit,使用单条更新脚本批量提交,不能返回实际影响行数。 全局配置 pamirs: mapper: batch: batchCommit 图4-1-5-1 全局配置 运行时配置 非乐观锁模型系统默认采用batchCommit提交更新操作;乐观锁模型默认采用useAndJudgeAffectRows提交更新操作。也可以使用以下方式在运行时改变批量提交方式。 Spider.getDefaultExtension(BatchApi.class).run(() -> { 更新逻辑 }, 批量提交类型枚举); 图4-1-5-2 运行时配置 运行时校正 如果模型配置了数据库自增主键,而批量新增的批量提交类型为batchCommit,则系统将批量提交类型变更为collectionCommit(如果使用batchCommit,则需要单条提交以获得正确的主键返回值,性能有所损失)。 如果模型配置了乐观锁,而批量更新的批量提交类型为collectionCommit或者batchCommit,则系统将批量提交类型变更为useAndJudgeAffectRows。也可以失效乐观锁,让系统不做批量提交类型变更处理。 二、乐观锁(举例) 在一些会碰到并发修改的数据,往往需要进行并发控制,一般数据库层面有两种一种是悲观锁、一种是乐观锁。oinone对乐观锁进行了良好支持 定义方式 乐观锁的两种定义方式: 通过快捷继承VersionModel,构建带有乐观锁,唯一编码code且主键为id的模型。 可以在字段上使用@Field.Version注解来标识该模型更新数据时使用乐观锁 如果更新的实际影响行数与入参数量不一致,则会抛出异常,错误码为10150024。如果是批量更新数据,为了返回准确的实际影响行数,批量更新由批量提交改为循环单条数据提交更新,性能有所损失。 失效乐观锁 一个模型在某些场景下需要使用乐观锁来更新数据,而另一些场景不需要使用乐观锁来更新数据,则可以使用以下方式在一些场景下失效乐观锁。更多元位指令用法详见4.1.9【函数之元位指令】一文。 PamirsSession.directive().disableOptimisticLocker(); try{ 更新逻辑 } finally { PamirsSession.directive().enableOptimisticLocker(); } 图4-1-5-3 失效乐观锁 不抛乐观锁异常 将批量提交类型设置为useAffectRows即可,这样可改由外层逻辑对返回的实际影响行数进行自主判断。 Spider.getDefaultExtension(BatchApi.class).run(() -> { 更新逻辑,返回实际影响行数 }, BatchCommitTypeEnum.useAffectRows); 图4-1-5-4 将批量提交类型设置为useAffectRows 构建第一个VersionModel Step1 新建PetItemInventroy模型,继承快捷模型VersionModel package pro.shushi.pamirs.demo.api.model; import pro.shushi.pamirs.meta.annotation.Field; import pro.shushi.pamirs.meta.annotation.Model; import pro.shushi.pamirs.meta.base.common.VersionModel; import java.math.BigDecimal; @Model.model(PetItemInventroy.MODEL_MODEL) @Model(displayName = "宠物商品库存",summary="宠物商品库存",labelFields = {"itemName"}) public class PetItemInventroy extends VersionModel { public static final String MODEL_MODEL="demo.PetItemInventroy"; @Field(displayName = "商品名称") private String itemName; @Field(displayName = "库存数量") private BigDecimal quantity; } 图4-1-5-5 新建PetItemInventroy模型 Step2 修改DemoMenu,增加访问入口 @UxMenu("商品库存")@UxRoute(PetItemInventroy.MODEL_MODEL) class PetItemInventroyMenu{} 图4-1-5-6 修改DemoMenu Step3 重启看效果 体验一:页面上新增、修改数据库字段中的opt_version会自动加一 图4-1-5-7 示例效果一 图4-1-5-8 示例效果二 图4-1-5-9 示例效果三 图4-1-5-10 示例效果四 体验二:同时打两个页面,依次点击,您会发现一个改成功,一个没有改成功。但页面都没有报错,只是update返回影响行数一个为1,另一个为0而已。 图4-1-5-11 编辑宠物商品库存 图4-1-5-12 宠物商品库存列表 注:增加了乐观锁,我们在写代码的时候一定要注意,单记录更新操作的时候要去判断返回结果(影响行数),不然没改成功,程序是不会抛错的。不像batch接口默认会报错 Step4 预留任务:重写PetItemInventroy的update函数 留个任务,请各位小伙伴自行测试玩玩,这样会更有体感 package pro.shushi.pamirs.demo.core.action; import org.springframework.stereotype.Component; import pro.shushi.pamirs.demo.api.enumeration.DemoExpEnumerate; import pro.shushi.pamirs.demo.api.model.PetItemInventroy; import pro.shushi.pamirs.demo.api.model.PetTalent; import pro.shushi.pamirs.meta.annotation.Function; import pro.shushi.pamirs.meta.annotation.Model; import pro.shushi.pamirs.meta.common.exception.PamirsException; import pro.shushi.pamirs.meta.constant.FunctionConstants; import pro.shushi.pamirs.meta.enmu.FunctionOpenEnum; import pro.shushi.pamirs.meta.enmu.FunctionTypeEnum; import java.util.ArrayList; import java.util.List; @Model.model(PetItemInventroy.MODEL_MODEL) @Component public class PetItemInventroyAction { @Function.Advanced(type= FunctionTypeEnum.UPDATE) @Function.fun(FunctionConstants.update) @Function(openLevel = {FunctionOpenEnum.API}) public PetItemInventroy update(PetItemInventroy data){ List<PetItemInventroy> inventroys = new ArrayList<>(); inventroys.add(data); //批量更新会,自动抛错 int i =…

    2024年5月23日
    66500
  • 5.5 基础支撑之结算域

    一、基础介绍 随着企业的业务不断进行数字化改造、业务越来越在线化,给企业财务工作带来几个明显的变化和挑战: 变化: 业务在线后,不同类收费、预售、授信模式的创新层出不穷,需要财务不仅只从事单一传统的会计核算工作,还需要积极地参与到业务中去。 从事后算账事后报账,变成财务业务一体化信息的实时处理 挑战: 业务系统与财务系统明显割裂,业务部门与财务部门各自采用一套软件处理其数据,不能及时沟通信息和协同更正信息。 财务系统往往都是单体的传统架构,凭证处理能力无法适应今天企业的不断爆棚的业务发展。 财务的严谨性与业务的灵活性中间有巨大的鸿沟,导致业务要做一种创新的模式,财务可能是最大阻碍。 不论是传统软件公司喜欢说的业财一体化还是互联网平台公司喜欢说的结算平台,都是为了解决以上变化和挑战的。业财一体化主要是从财务部门角度出发进行,在业务支撑上化被动为主动。结算中心往往是结合财务部门和业务运营部门的需求。如果拿我们下面介绍的,计费、账务、会计三个领域来说,业财一体化项目往往只包括账务和会计,结算中心往往包括:计费、账务、会计。或者说业财一体化弱化了计费,没有纳入企业统一管理,把如何计价给到了业务系统自行决定或者简单处理只要产生应收应付单据(计费详单)就好了。 结算域的是一个相对比较专业的领域,没有一定背景知识甚至连一些专业名词都很难理解,更不用说模型设计了,这里我尽快地简单去描述定位而不是描述细节。而且2.1.9版本的结算领域相对还是没有那么完善,这里介绍的是下个版本的内容,所以大家看当前版本的时候会有一些对不上。 二、子领域职责 图5-5-1 子领域职责 计费 计费的价值 随着企业多业务发展以及融合计费需求,我们需要引入计费模型,对灵活计价模式进行支持,快速支撑未来可能的计费方式等 计费的核心设计理念 所有的计算器都继承自虚函数计算器y=f(x) 平滑兼容-默认斜率计算器y=a+bxY – 求值结果(用下标描述结果是什么)A – 偏移量(计算固定值)B – 斜率(费率值)X – 变量(数量)任何计算都是通过一组斜率组合出来的 利用区间限定定义各种斜率组合出各种算法交易额0-100w:y=0.03x >100w:y=0.02x;时间0:00-6:00:y=0.02x 6:00-24:00:y=0.03xX- 变量,数量 图5-5-2 计费的核心设计理念 更灵活多维区间组合,时间维度、计数器维度、其它属性维度计数器区间斜率限定,比如交易额、空间、使用月份数… 计费的核心功能 通过产品定义运营方案 通过订购产品完成商务合同的签订来决定客户计费策略,或者通过系统产品定义通用计费策略 支撑各类产品的模拟计费 以事件驱动,根据事件、产品、订购关系完成产品路由,并实时产生计费详单 根据计费科目与账务科目,打通账务进行核销 账务 账务的价值 以账户账本为中心,提供记账、账户管理,以及账务的实时监控与持续对账。如果计费是对接业务,那么账务的价值是对接财务系统 账务的核心设计理念 不依赖计费,可独立对接,所有业务最终都需要反馈到帐户账本的操作上,并通过账本明细记录所有操作 账务的核心功能 记账:充值、转账、提现,冻结、解冻,差错处理 账务管理:开户、科目维护 账务查询:对账 会计(暂不在计划内) 会计的价值 结算平台的会计模块不是严格意义上的会计系统,它主要是衔接其他的财务系统,做凭证前置处理。在于汇总凭证,产出业务帐,对接到财务总帐系统,缓解财务系统压力。 三、模型介绍 图5-5-3 模型介绍 四、结算基础流程 图5-5-4 结算基础流程

    2024年5月23日
    57400
  • 4.1.15 框架之网关协议

    一、多端协议 协议内容格式 请求头 头信息 headerMap "sec-fetch-mode" -> "cors" "content-length" -> "482" "sec-fetch-site" -> "none" "accept-language" -> "zh-CN,zh;q=0.9" "cookie" -> "pamirs_uc_session_id=241af6a1dbba41a4b35afc96ddf15915" "origin" -> "chrome-extension://flnheeellpciglgpaodhkhmapeljopja" "accept" -> "application/json" "host" -> "127.0.0.1:8090" "connection" -> "keep-alive" "content-type" -> "application/json" "accept-encoding" -> "gzip, deflate, br" "user-agent" -> "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36" "sec-fetch-dest" -> "empty" 图4-1-15-1 头信息 headerMap 请求地址 requestUrl 例如 http://127.0.0.1:8090/pamirs/DemoCore?scene=redirectListPage HTTP参数键值对 parameterMap url中queryString在服务端最终会转化为参数键值对。 请求体格式 请求体格式采用GraphQL协议。请求体格式分为API请求和上下文变量。以商品的test接口为例,请求格式如下。 API请求格式 query{ petShopProxyQuery { queryPage(page: {currentPage: 1, size: 1}, queryWrapper: {rsql: "(1==1)"}) { content { income id code creater { id nickname } relatedShopName shopName petTalents { id name } items { id itemName } } size totalPages totalElements } } } 图4-1-15-2 API请求格式 上下文变量 variables 请求策略requestStrategy 名称 类型 说明 checkStrategy CheckStrategyEnum 校验策略:RETURN_WHEN_COMPLETED -?全部校验完成再返回结果RETURN_WHEN_ERROR -?校验错误即返回结果 msgLevel InformationLevelEnum 消息级别:DEBUG("debug", "调试", "调试"),INFO("info", "信息", "信息"),WARN("warn", "警告", "警告"),SUCCESS("success", "成功", "成功"),ERROR("error", "错误", "错误")不设置,则只返回错误消息;上方消息级别清单,越往下级别越高。只有消息的级别高于或等于该设定级别才返回,否则会被过滤。 onlyValidate Boolean 只校验不提交数据 表4-1-15-1 请求策略requestStrategy 上下文变量式例如下。 { "requestStrategy": { "checkStrategy": "RETURN_WHEN_COMPLETED", "msgLevel":"INFO" } } 图4-1-15-3 上下文变量式例 响应体格式 协议响应内容包括data、extensions和errors三部分,extensions和errors是可缺省的。data部分为业务数据返回值。应用业务层可以在extensions中添加API返回值之外的扩展信息。extensions中包含success、messages和extra三部分,success标识请求是否成功。如果业务正确处理并返回,则errors部分为空;如果业务处理返回失败,则将错误信息添加到errors中。 正确响应格式示例如下。 { "data": { "petShopProxyQuery": { "queryPage": { "content": [ { "id": "246675081504233477", "creater": { "id": "10001" }, "relatedShopName": "oinone宠物店铺001", "shopName": "oinone宠物店铺001", "petTalents": […

  • 页面设计

    1. 功能说明 页面设计时界面设计器中「页面」模块的设计入口,在这个界面,进行页面的设置、搭建、设计、排版。 主要分为顶部操作栏、左侧工具栏、中部设计画布区、右侧属性面板。 2. 操作栏 进入页面设计,顶部显示了页面的标题,以及返回、发布等操作。 2.1 发布 页面设计完成后,点击「发布」运行页面生效。若不点击发布,页面也有自动保存的功能,但在发布前,自动保存也等同于草稿,不会正式生效。 发布时如果有属性不符合校验规则(必填的属性未填、输入的内容校验不通过),会发布失败,相应的字段会特殊标记,需要查看并修改属性。 2.2 显示/隐藏母版 进入页面设计时默认不展示母版,可以手动操作显示母版 3. 工具栏 左侧的工具栏中包括组件库、页面设置等模块。 3.1 组件库 组件库中包含组件和模型,组件是当前设计器支持的所有组件,模型是页面所在模型下的所有字段和动作。 3.1.1 组件 组件中展示了系统支持的所有组件。包含 1)布局类组件,如分组、选项卡等,使用布局组件可以将页面进字段分类、分页; 2)字段类组件,如单行文本、整数、日期等等,使用字段类组件时都会在模型下对应创建一个字段; 3)动作类组件,如跳转动作、提交动作、链接动作等。 3.1.2 模型 组件库顶部,由组件可切换为模型:模型选项下,会展示当前模型的所有字段,以及系统默认动作。可以直接拖拽字段至设计画布中,会应用形成某个组件,可对组件进行多样的属性设置,优化交互。 3.1.2 组件和模型有什么区别 1展示内容维度不同 组件中展示的内容是组件信息,如分组、选项卡、单行文本、文件上传等;模型中展示的是模型下已有的所有字段。 2使用功能不同 组件中的组件使用前需要在模型中创建一个字段,当然,创建好的字段也会存在于模型中;模型中的字段可直接使用,并且使用时会在设计画布中对应生成个默认组件。 3使用场景不同 如果模型中已经存在目标字段,应直接选择从模型中拖拽字段;如果模型中没有需要的字段,可以在页面中增加一个组件,实际上也是在新增一个字段。 3.2 页面设置 页面设置中可以修改当前页面的标题、分组、页面描述,同时也是给页面上传缩略图的唯一入口。 4. 设计画布 将组件或字段拖拽至设计画布区,会生成样式。点击组件,右侧可对其进行设置,大部分属性可实时在画布中展示效果。 5. 属性面板 右侧属性面板抽屉中可以设置属性或查看字段信息,通过不同的属性配置化实现组件的多样化。 5.1 属性 属性中包括基本信息(如标题、占位提示、描述说明等)、校验信息(如是否必填、长度校验等)、交互信息(如排序方式、是否展示计数器等) 5.2 字段 首次从组件中拖拽,会先展示字段信息,并且是需要先创建一个字段,字段创建成功后,再次切换字段选项,只读展示当前组件所对应的字段基本信息。同理,如果是直接从字段中拖拽,字段选项中,只读展示当前字段基本信息。

    2024年6月20日
    2.2K00

Leave a Reply

登录后才能评论