消息中间件是在分布式开发中常见的一种技术手段,用于模块间的解耦、异步处理、数据最终一致等场景。 一、介绍 oinone对开源的RocketMQ进行了封装,是平台提供的一种较为简单的使用方式,并非是对RocketMQ进行的功能扩展。同时也伴随着两个非常至关重要的目的: 适配不同企业对RocketMQ的不同版本选择,不至于改上层业务代码。目前已经适配RocketMQ的开源版本和阿里云版本。 下个版本会对API进行升级支持不同类型MQ,以适配不同企业对MQ的不同要求,应对一些企业客户已经对MQ进行技术选择 对协议头进行扩展:如多租户的封装,saas模式中为了共用MQ基础资源,需要在消息头中加入必要租户信息。 二、使用准备 demo工程默认已经依赖消息,这里只是做介绍无需大家额外操作,大家可以用maven依赖树命令查看引用关系。 依赖包 增加对pamirs-connectors-event的依赖 <dependency> <groupId>pro.shushi.pamirs.framework</groupId> <artifactId>pamirs-connectors-event</artifactId> </dependency> 图4-1-21-1 分布式消息的依赖包 相关功能引入 增加模型、触发器都依赖MQ <!– 增强模型 –> <!– 增强模型 –> <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-channel</artifactId> </dependency> <!– 触发器 api –> <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-trigger-api</artifactId> </dependency> <!– 触发器 core –> <dependency> <groupId>pro.shushi.pamirs.core</groupId> <artifactId>pamirs-trigger-core</artifactId> </dependency> 图4-1-21-2 增加模型、触发器都依赖MQ yml配置文件参考 详见4.1.1【模块之yml文件结构详解】的“pamirs.event”部分。 三、使用说明 发送消息(NotifyProducer) 概述 NotifyProducer是Pamirs Event中所有生产者的基本API,它仅仅定义了消息发送的基本行为,例如生产者自身的属性,启动和停止,当前状态,以及消息发送方法。它本身并不决定消息如何发送,而是根据具体的实现确定其功能。 目前仅实现了RocketMQProducer,你可以使用下面介绍的方法轻松使用这些功能。 使用方法 Notify注解方式 使用示例 @Component public class DemoProducer { @Notify(topic = "test", tags = "model") public DemoModel sendModel() { return new DemoModel(); } @Notify(topic = "test", tags = "dto") public DemoDTO sendDTO() { return new DemoDTO(); } } 图4-1-21-3 Notify注解方式使用示例 解释说明 使用Component注解方式注册Spring Bean。 Notify注解指定topic和tags。 topic和tags对应NotifyEvent中的topic和tags。 RocketMQProducer方法调用 使用示例 @Component public class SendRocketMQMessage { @Autowired private RocketMQProducer rocketMQProducer; /** * 发送普通消息 */ public void sendNormalMessage() { rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel())); rocketMQProducer.send(new NotifyEvent("test", "dto", new DemoDTO())); } /** * 发送有序消息 */ public void sendOrderlyMessage() { DemoModel data = new DemoModel(); data.setAge(10); rocketMQProducer.send(new NotifyEvent("test", "model", data) .setQueueSelector((queueSize, event) -> { DemoModel body = (DemoModel) event.getBody(); return body.getAge() % queueSize; })); } /** * 发送事务消息 */ public void sendTransactionMessage() { rocketMQProducer.send(new NotifyEvent("test", "model", new DemoModel()) .setIsTransaction(true) .setGroup("demoTransactionListener")); } } 图4-1-21-4 RocketMQProducer方法调用…