消息中间件是在分布式开发中常见的一种技术手段,用于模块间的解耦、异步处理、数据最终一致等场景。
一、介绍
oinone对开源的RocketMQ进行了封装,是平台提供的一种较为简单的使用方式,并非是对RocketMQ进行的功能扩展。同时也伴随着两个非常至关重要的目的:
-
适配不同企业对RocketMQ的不同版本选择,不至于改上层业务代码。目前已经适配RocketMQ的开源版本和阿里云版本。 下个版本会对API进行升级支持不同类型MQ,以适配不同企业对MQ的不同要求,应对一些企业客户已经对MQ进行技术选择
-
对协议头进行扩展:如多租户的封装,saas模式中为了共用MQ基础资源,需要在消息头中加入必要租户信息。
二、使用准备
demo工程默认已经依赖消息,这里只是做介绍无需大家额外操作,大家可以用maven依赖树命令查看引用关系。
依赖包
增加对pamirs-connectors-event的依赖
<dependency>
<groupId>pro.shushi.pamirs.framework</groupId>
<artifactId>pamirs-connectors-event</artifactId>
</dependency>
相关功能引入
增加模型、触发器都依赖MQ
<!-- 增强模型 -->
<!-- 增强模型 -->
<dependency>
<groupId>pro.shushi.pamirs.core</groupId>
<artifactId>pamirs-channel</artifactId>
</dependency>
<!-- 触发器 api -->
<dependency>
<groupId>pro.shushi.pamirs.core</groupId>
<artifactId>pamirs-trigger-api</artifactId>
</dependency>
<!-- 触发器 core -->
<dependency>
<groupId>pro.shushi.pamirs.core</groupId>
<artifactId>pamirs-trigger-core</artifactId>
</dependency>
yml配置文件参考
详见4.1.1【模块之yml文件结构详解】的“pamirs.event”部分。
三、使用说明
发送消息(NotifyProducer)
概述
NotifyProducer是Pamirs Event中所有生产者的基本API,它仅仅定义了消息发送的基本行为,例如生产者自身的属性,启动和停止,当前状态,以及消息发送方法。它本身并不决定消息如何发送,而是根据具体的实现确定其功能。
目前仅实现了RocketMQProducer,你可以使用下面介绍的方法轻松使用这些功能。
使用方法
Notify注解方式
使用示例
@Component
public class DemoProducer {
@Notify(topic = "test", tags = "model")
public DemoModel sendModel() {
return new DemoModel();
}
@Notify(topic = "test", tags = "dto")
public DemoDTO sendDTO() {
return new DemoDTO();
}
}
解释说明
-
使用Component注解方式注册Spring Bean。
-
Notify注解指定topic和tags。
-
topic和tags对应NotifyEvent中的topic和tags。
RocketMQProducer方法调用
使用示例
@Component
public class SendRocketMQMessage {
@Autowired
private RocketMQProducer rocketMQProducer;
/**
* 发送普通消息
*/
public void sendNormalMessage() {
rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel()));
rocketMQProducer.send(new NotifyEvent("test", "dto", new DemoDTO()));
}
/**
* 发送有序消息
*/
public void sendOrderlyMessage() {
DemoModel data = new DemoModel();
data.setAge(10);
rocketMQProducer.send(new NotifyEvent("test", "model", data)
.setQueueSelector((queueSize, event) -> {
DemoModel body = (DemoModel) event.getBody();
return body.getAge() % queueSize;
}));
}
/**
* 发送事务消息
*/
public void sendTransactionMessage() {
rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel())
.setIsTransaction(true)
.setGroup("demoTransactionListener"));
}
}
解释说明
- 使用Component注解方式注册Spring Bean。
- 使用Autowired注解方式装配RocketMQProducer实例。
- 使用send方法发送指定消息。
- 在【发送普通消息】方法中,该实现效果与Notify注解方式所示完全一致。
- 在【发送有序消息】方法中,队列选择器是必须配置的,queueSize属性为MQ队列的总数量,在broker中配置。有序消息必须配合有序消费者才能达到有序消费的目的,否则还是无序的常规消息,消费者需要配置@NotifyListener(consumerType=ConsumerType.ORDERLY )注解。
- 在【发送事务消息】方法中指定的group为ProducerGroup,事务消息是通过不同的Producer发出的,事务消息监听请参考TransactionListener注解的相关使用方法。(示例中的group与下文介绍中的一致)
使用TransactionListener开启事务消息监听
使用示例
@Component
@TransactionListener
public class DemoTransactionListener implements NotifyTransactionListener {
@Override
public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
return NotifyTransactionState.COMMIT;
}
}
解释说明
-
实现NotifyTransactionListener接口。
-
使用Component注解方式注册Spring Bean。
-
添加TransactionListener注解注册事务监听,生成对应的生产者。
-
当前ProducerGroup将使用这个类的BeanName,即"demoTransactionListener"。如果你想自定义ProducerGroup,可以使用TransactionListener的value属性进行设置。
消费消息(NotifyConsumer)
概述
NotifyConsumer是Pamirs Evnet中所有消费者的基本API,与NotifyProducer类似,它仅仅定义了消息消费的基本行为,例如消费者自身的属性,启动和停止,当前状态,以及消息的监听和订阅方法。它本身并不决定消息如何消费以及如何被订阅,而是根据具体的实现确定其功能。
目前仅实现了RocketMQEventPushConsumer,你可以使用下面介绍的方法轻松使用这些功能。
使用方法
在类上使用NotifyListener注解
使用示例
@Component
@NotifyListener(topic = "test", tags = "model")
public class DemoConsumerClass implements NotifyEventListener {
@Override
public void consumer(NotifyEvent event) {
DemoModel data = (DemoModel) event.getBody();
// do some things.
}
}
解释说明
-
实现NotifyEventListener接口。
-
使用Component注解方式注册Spring Bean。
-
当前ConsumerGroup将使用这个类的BeanName,即"demoConsumerClass"。如果你想自定义ConsumerGroup,可以使用Component的value属性进行设置。
-
从body中将可以获取生产者发送的数据对象,并且已经做好了类型处理,可以直接使用。
-
使用原生的RocketMQ发送的消息,类型可能是无法识别的,你可以使用NotifyListener中提供的bodyClass来指定类型。
-
topic和tags对应NotifyEvent中的topic和tags。
在方法上使用NotifyListener注解
使用示例
@Component
public class DemoConsumerMethod {
@Bean
@NotifyListener(topic = "test", tags = "model")
public NotifyEventListener modelConsumer() {
return event -> {
DemoModel data = (DemoModel) event.getBody();
// do some things.
};
}
@Bean
@NotifyListener(topic = "test", tags = "dto")
public NotifyEventListener dtoConsumer() {
return event -> {
DemoDTO data = (DemoDTO) event.getBody();
// do some things.
};
}
}
解释说明
-
使用Bean注解方式注册Spring Bean。
-
方法返回值为NotifyEventListener类型。
-
当前ConsumerGroup将使用对应方法生成的BeanName,即"modelConsumer"和"dtoConsumer"。如果你想自定义ConsumerGroup,可以使用Bean的value属性进行设置。
实战
约定:每个模块下的Topic和Tags必须定义常量池进行统一管理,主要是为了方便维护与管理,技术没有限制
常规消息(举例)
Step1 新建PetNotifyEnum
用PetNotifyEnum来管理模块的所有Topic和Tags的常量定义
package pro.shushi.pamirs.demo.api.mq;
public class PetNotifyEnum {
public static class PetItemInventroy{
public interface Topic{
String PET_ITEM_INVENTROY_CHANGE ="petItemInventroyChange";
}
public interface Tag{
String CREATE ="create";
String UPDATE ="update";
String DELETE ="delete";
}
}
}
Step2 新建PetItemInventoryMqProducer
新建PetItemInventroy模型对应消息生产者,用于发送PetItemInventroy模型变动的相关消息
package pro.shushi.pamirs.demo.core.mq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;
@Component
public class PetItemInventoryMqProducer {
@Autowired
private RocketMQProducer rocketMQProducer;
/**
* 发送普通消息
*/
public void sendNormalMessage(PetItemInventroy data,String tag) {
rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
}
}
Step3 新建PetItemInventoryMqConsumer
新建PetItemInventoryMqConsumer,订阅PetItemInventroy模型变动的相关消息,并进行相关处理
package pro.shushi.pamirs.demo.core.mq;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;
@Component
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
@Override
public void consumer(NotifyEvent event) {
PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
}
}
Step4 修改PetItemInventroyAction
在修改PetItemInventroy完成之后,发送topic为【PetNotifyEnum.PetItemInventroyMq.Topic.PET_ITEM_INVENTROY_CHANGE】,tag为【PetNotifyEnum.PetItemInventroyMq.Tag.UPDATE】的消息出去
package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {
@Autowired
private PetItemInventoryMqProducer petItemInventoryMqProducer;
@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
……其他代码
petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
return data;
}
}
Step5 重新看效果
- 编辑商品库存记录
图4-1-21-12 编辑商品库存记录
- 查看后端日志是否打印
顺序消息(举例)
Step1 修改PetItemInventoryMqProducer
增加一个顺序消息发送的方法,发送时根据商品库存的Id就分队列,相同队列顺序消费
package pro.shushi.pamirs.demo.core.mq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.rocketmq.RocketMQProducer;
@Component
public class PetItemInventoryMqProducer {
@Autowired
private RocketMQProducer rocketMQProducer;
/**
* 发送普通消息
*/
public void sendNormalMessage(PetItemInventroy data,String tag) {
rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data));
}
/**
* 发送有序消息
*/
public void sendOrderlyMessage(PetItemInventroy data,String tag) {
rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
.setQueueSelector((queueSize, event) -> {
PetItemInventroy body = (PetItemInventroy) event.getBody();
Long queue = body.getId().longValue() % Long.valueOf(queueSize);
return queue.intValue();
}));
}
}
Step2 修改PetItemInventoryMqConsumer
增加@NotifyListener(consumerType= ConsumerType.ORDERLY)的注解
package pro.shushi.pamirs.demo.core.mq;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.demo.api.model.PetItemInventroy;
import pro.shushi.pamirs.demo.api.mq.PetNotifyEnum;
import pro.shushi.pamirs.framework.connectors.event.annotation.NotifyListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyEventListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.ConsumerType;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;
@Component
//@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*")
@NotifyListener(topic = PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tags = "*",consumerType= ConsumerType.ORDERLY)
@Slf4j
public class PetItemInventoryMqConsumer implements NotifyEventListener {
@Override
public void consumer(NotifyEvent event) {
PetItemInventroy petItemInventroy = (PetItemInventroy)event.getBody();
log.info("petItemInventroy的消息, 库存数量为:" + petItemInventroy.getQuantity());
}
}
Step3 修改PetItemInventroyAction
调用petItemInventoryMqProducer的顺序消息发送接口
package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {
@Autowired
private PetItemInventoryMqProducer petItemInventoryMqProducer;
@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
……其他代码
// petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
return data;
}
}
Step4 重启看效果
同常规消息
事务消息(举例)
注:这种写法默认第一次是UNKNOWN,然后通过MQ回调二次确认,在消息时效性要求高的场景下是不符合要求的。对实效性要求高的,请见下面事务消息-优化(举例)章节。
Step1 新建NotifyTransactionListener
package pro.shushi.pamirs.demo.core.mq;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;
@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {
@Override
public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
log.info("消息回滚");
return NotifyTransactionState.ROLLBACK;
}
}
Step2 修改PetItemInventoryMqProducer
增加发送事务性消息的方法,通过.setIsTransaction(true)来显示设置该消息是事务消息,通过setGroup("petItemInventoryMqTransactionListener")来匹配事务监听处理器
/**
* 发送事务消息
*/
public void sendTransactionMessage(PetItemInventroy data,String tag) {
rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
.setQueueSelector((queueSize, event) -> {
PetItemInventroy body = (PetItemInventroy) event.getBody();
Long queue = body.getId().longValue() % Long.valueOf(queueSize);
return queue.intValue();
})
.setIsTransaction(true)
.setGroup("petItemInventoryMqTransactionListener")
);
}
Step3 修改PetItemInventroyAction
调用petItemInventoryMqProducer的事务性消息发送接口
package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {
@Autowired
private PetItemInventoryMqProducer petItemInventoryMqProducer;
@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
……其他代码
//petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
return data;
}
}
Step4 重启看效果
- 编辑商品库存记录
- 查看后端日志是否打印
消息回滚,没有消息消费的日志,从日志打印时间上看MQ回调会有延迟
Step5 修改NotifyTransactionListener
package pro.shushi.pamirs.demo.core.mq;
import org.springframework.stereotype.Component;
import pro.shushi.pamirs.framework.connectors.event.annotation.TransactionListener;
import pro.shushi.pamirs.framework.connectors.event.api.NotifyTransactionListener;
import pro.shushi.pamirs.framework.connectors.event.engine.NotifyEvent;
import pro.shushi.pamirs.framework.connectors.event.enumeration.NotifyTransactionState;
import pro.shushi.pamirs.meta.annotation.fun.extern.Slf4j;
@Component
@Slf4j
@TransactionListener
public class PetItemInventoryMqTransactionListener implements NotifyTransactionListener {
@Override
public NotifyTransactionState checkLocalTransaction(NotifyEvent event) {
log.info("消息提交");
//正常业务情况下,这里要增加自己的逻辑判断,来确定返回状态值
return NotifyTransactionState.COMMIT;
}
}
Step6 重启看效果
消息提交,并看到消息消费的日志,从日志打印时间上看MQ回调会有延迟
事务消息-优化(举例)
Step1 修改PetItemInventoryMqProducer
修改PetItemInventoryMqProducer事务性消息发送方法sendTransactionMessage,增加NotifyExecuteLocalTransactionCallback入参。
/**
* 发送事务消息
*/
public void sendTransactionMessage(PetItemInventroy data, String tag, NotifyExecuteLocalTransactionCallback callback) {
rocketMQProducer.send(new NotifyEvent(PetNotifyEnum.PetItemInventroy.Topic.PET_ITEM_INVENTROY_CHANGE, tag, data)
.setQueueSelector((queueSize, event) -> {
PetItemInventroy body = (PetItemInventroy) event.getBody();
Long queue = body.getId().longValue() % Long.valueOf(queueSize);
return queue.intValue();
})
.setIsTransaction(true)
.setGroup("petItemInventoryMqTransactionListener")
.setExecuteLocalTransactionCallback(callback)
);
}
Step2 修改PetItemInventroyAction
修改PetItemInventroyAction的update方法
package pro.shushi.pamirs.demo.core.action;
……引入依赖类
@Model.model(PetItemInventroy.MODEL_MODEL)
@Component
public class PetItemInventroyAction {
@Autowired
private PetItemInventoryMqProducer petItemInventoryMqProducer;
@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
//petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
// petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){
@Override
public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {
List<PetItemInventroy> inventroys = new ArrayList<>();
inventroys.add(data);
PamirsSession.directive().disableOptimisticLocker();
try{
int i = data.updateBatch(inventroys);
} finally {
PamirsSession.directive().enableOptimisticLocker();
}
//业务代码执行完毕,提交消息
return NotifyTransactionState.COMMIT;
}
});
return data;
}
}
Step3 重启看效果
编辑商品库存记录,发现消息的处理几乎没有延迟
Step4 模拟业务成功,消息发送失败
修改PetItemInventroyAction的update方法
@Function.Advanced(type= FunctionTypeEnum.UPDATE)
@Function.fun(FunctionConstants.update)
@Function(openLevel = {FunctionOpenEnum.API})
public PetItemInventroy update(PetItemInventroy data){
//petItemInventoryMqProducer.sendNormalMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
//petItemInventoryMqProducer.sendOrderlyMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
// petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE);
petItemInventoryMqProducer.sendTransactionMessage(data, PetNotifyEnum.PetItemInventroy.Tag.UPDATE,new NotifyExecuteLocalTransactionCallback(){
@Override
public NotifyTransactionState callback(NotifyTransactionState executeState, NotifyEvent event) {
List<PetItemInventroy> inventroys = new ArrayList<>();
inventroys.add(data);
PamirsSession.directive().disableOptimisticLocker();
try{
int i = data.updateBatch(inventroys);
} finally {
PamirsSession.directive().enableOptimisticLocker();
}
//模拟业务成功,消息发送失败
throw new RuntimeException();
//业务代码执行完毕,提交消息
//return NotifyTransactionState.COMMIT;
}
});
return data;
}
Step5 重启看效果
这个效果跟第一种事务消息一样,消息提交延迟严重。
Step6 总结
优化后的事务消息,在正常情况下时效性非常高,异常情况下也能通过NotifyTransactionListener做保障
Oinone社区 作者:史, 昂原创文章,如若转载,请注明出处:https://doc.oinone.top/oio4/9296.html
访问Oinone官网:https://www.oinone.top获取数式Oinone低代码应用平台体验