前言
RocketMQ是由Alibaba用Java开发、现已加入到Apache下的一个分布式消息中间件,具有高性能、低延迟,高可靠性,下面是与Springboot的整合使用过程
1、RocketMQ 安装过程请自行百度,本篇不涉及
2、个人觉得 RocketMQ 比 RabbitMQ 优势明显,如:RocketMQ 支持事务、消息过滤、消息查询,当队列较多、消息堆积时性能也很稳定(毕竟挺过这么多的双11),而 RabbitMQ 性能明显下降,并且 RocketMQ 有可视化的中文管理后台,非常友好,不过活跃度上 RabbitMQ 要高出不少,相关资料 RocketMQ 要相对少一点
更新:这里分享下个人之前用docker安装rocketmq的笔记,可供参考
链接:https://pan.baidu.com/s/1GC_d1UV7RAO0kHlGVoLYNg romq
第一步:引入maven依赖和配置
在 pom.xml 中加入以下依赖(这里用的 2.0.4 版本,不算太老,完全够用,新的 2.x 版本在事务消息那块代码有小小改动)
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency> <!-- 还有其它需要的jar包自由引入(注:fastjson不要使用低于1.2.60版本,会有安全漏洞) --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
在 application.yml 中添加以下配置
rocketmq: name-server: 192.168.1.224:9876 # 访问地址 producer: group: Pro_Group # 必须指定group send-message-timeout: 3000 # 消息发送超时时长,默认3s retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2 retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
第二步:编写生产者
新建 MQProducerService 类,如下(建议自己手动跟着敲一遍,熟悉rocketMQTemplate使用方式)
@Slf4j @Component public class MQProducerService { @Value("${rocketmq.producer.send-message-timeout}") private Integer messageTimeOut; // 建议正常规模项目统一用一个TOPIC private static final String topic = "RLT_TEST_TOPIC"; // 直接注入使用,用于发送消息到broker服务器 @Autowired private RocketMQTemplate rocketMQTemplate; /** * 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等) */ public void send(User user) { rocketMQTemplate.convertAndSend(topic + ":tag1", user); // rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(user).build()); // 等价于上面一行 } /** * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息) * (msgBody也可以是对象,sendResult为返回的发送结果) */ public SendResult sendMsg(String msgBody) { SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build()); log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult)); return sendResult; } /** * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) * (适合对响应时间敏感的业务场景) */ public void sendAsyncMsg(String msgBody) { rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 处理消息发送成功逻辑 } @Override public void onException(Throwable throwable) { // 处理消息发送异常逻辑 } }); } /** * 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时) * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h */ public void sendDelayMsg(String msgBody, int delayLevel) { rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel); } /** * 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志) */ public void sendOneWayMsg(String msgBody) { rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build()); } /** * 发送带tag的消息,直接在topic后面加上":tag" */ public SendResult sendTagMsg(String msgBody) { return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(msgBody).build()); } }
上面写的这几个消息发送方法,你应该注意到了: 第一个方法和最后一个方法的参数 topic 和其它的不一样
其实这是 rocketmq 和 springboot 整合后设置 Tag 的方式(Tag:用于区分过滤同一主题下的不同业务类型的消息,非常实用)
在项目里往mq写入消息时,最好每条消息都带上tag,用于消费时根据业务过滤
另外,对于延时消息的参数理解是这样:共 18 个等级,值在上面已经注明了,下标从 1 开始,举例:我要发送个延迟为 1 分钟的消息,那么参数 delayLevel 的值为 5
在 rocketmq-spring-boot-starter 中,Tag 的设置方式: 在 topic后面加上 “:tagName”
源码中是以 “:”进行分割的,前面的是 topic,后面的就是 tag,截图如下:
另外,从上面的截图中可以看到“key”的设置方式,发送消息时在header中设置:
MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.KEYS, "key1")
第三步:编写消费者
新建 MQConsumerService 类(本案例中我就以上面生产者中第一个和最后一个加了tag的消息进行消费)
@Slf4j @Component public class MQConsumerService { // topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意 // selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息 @Service @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1", consumerGroup = "Con_Group_One") public class ConsumerSend implements RocketMQListener<User> { // 监听到消息就会执行此方法 @Override public void onMessage(User user) { log.info("监听到消息:user={}", JSON.toJSONString(user)); } } // 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存, // 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行 @Service @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two") public class ConsumerSend2 implements RocketMQListener<String> { @Override public void onMessage(String str) { log.info("监听到消息:str={}", str); } } // MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便) @Service @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three") public class Consumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); String msg = new String(body); log.info("监听到消息:msg={}", msg); } } }
消费者这里要注意设置的参数要正确,我这里为了方便就写在一个类里
这里消费者会直接监听生产者发送的消息,一旦生产者那边发送消息,对应这里就会消费
另外可以对注解 @RocketMQMessageListener 点进去看看它的属性参数,都是非常熟悉的
还有,实际生产中,应避免自己的消费者代码出现非业务逻辑上的错误,比如消费时某个消费者报类型转换异常,建议多用TAG做区分
第四步:测试
这里就直接建个Controller吧
@RestController @RequestMapping("/rocketmq") public class RocketMQController { @Autowired private MQProducerService mqProducerService; @GetMapping("/send") public void send() { User user = User.getUser(); mqProducerService.send(user); } @GetMapping("/sendTag") public Result<SendResult> sendTag() { SendResult sendResult = mqProducerService.sendTagMsg("带有tag的字符消息"); return Result.success(sendResult); } }
用postman调用测试:
(1)http://localhost:8080/rocketmq/send
(2)http://localhost:8080/rocketmq/sendTag
你也可以试试消费消息时不指定tag,它会监听消费所有消息
开始提到,rocketmq有一个可视化的后台,你也可以在里面找到你发的消息和消费情况
总结:
- 实际运用中一些配置不要像我上面一样写在代码里,写在配置文件里或统一配置
- 消息发送成功与失败可以根据sendResult判断,消息消费成功与否其实源码内部已做了处理,只要不出现异常,就是消费成功,如果你业务代码逻辑有问题那另说
- 实际生产中还要注意重复消费问题,这里我提供一个方法:在数据库加一个去重表,给表里的一个字段如key添加唯一索引,消费前先入库,正常则往下执行你的业务逻辑,入库失败了表明该消息已消费过,不能往下走了
- 其实rocketmq还有一个很重要的特性:事务,其它mq可是不支持的,利用事务可以做很多事,如跟钱相关的业务、分布式事务,不过事务的实现过程要麻烦点
- 上面就是RocketMQ与Springboot的整合,整合了使用起来还是比较简单的
- 分布式事务已更新:Springboot中用RocketMQ解决分布式事务