RocketMQ与Springboot整合

前言
RocketMQ是由Alibaba用Java开发、现已加入到Apache下的一个分布式消息中间件,具有高性能、低延迟,高可靠性,下面是与Springboot的整合使用过程

1、RocketMQ 安装过程请自行百度,本篇不涉及
2、个人觉得 RocketMQ 比 RabbitMQ 优势明显,如:RocketMQ 支持事务、消息过滤、消息查询,当队列较多、消息堆积时性能也很稳定(毕竟挺过这么多的双11),而 RabbitMQ 性能明显下降,并且 RocketMQ 有可视化的中文管理后台,非常友好,不过活跃度上 RabbitMQ 要高出不少,相关资料 RocketMQ 要相对少一点

更新:这里分享下个人之前用docker安装rocketmq的笔记,可供参考
链接:https://pan.baidu.com/s/1GC_d1UV7RAO0kHlGVoLYNg romq

第一步:引入maven依赖和配置

在 pom.xml 中加入以下依赖(这里用的 2.0.4 版本,不算太老,完全够用,新的 2.x 版本在事务消息那块代码有小小改动)

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
 </dependency>

<!-- 还有其它需要的jar包自由引入(注:fastjson不要使用低于1.2.60版本,会有安全漏洞) -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

在 application.yml 中添加以下配置

 rocketmq:
   name-server: 192.168.1.224:9876 # 访问地址
   producer:
     group: Pro_Group # 必须指定group
     send-message-timeout: 3000 # 消息发送超时时长,默认3s
     retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
     retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

第二步:编写生产者

新建 MQProducerService 类,如下(建议自己手动跟着敲一遍,熟悉rocketMQTemplate使用方式)

  @Slf4j
  @Component
  public class MQProducerService {

  	@Value("${rocketmq.producer.send-message-timeout}")
      private Integer messageTimeOut;

  	// 建议正常规模项目统一用一个TOPIC
      private static final String topic = "RLT_TEST_TOPIC";

  	// 直接注入使用,用于发送消息到broker服务器
      @Autowired
      private RocketMQTemplate rocketMQTemplate;

  	/**
       * 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等)
       */
      public void send(User user) {
          rocketMQTemplate.convertAndSend(topic + ":tag1", user);
  //        rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(user).build()); // 等价于上面一行
      }

      /**
       * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
       * (msgBody也可以是对象,sendResult为返回的发送结果)
       */
      public SendResult sendMsg(String msgBody) {
          SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
          log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
          return sendResult;
      }

  	/**
       * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
       * (适合对响应时间敏感的业务场景)
       */
      public void sendAsyncMsg(String msgBody) {
          rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
              @Override
              public void onSuccess(SendResult sendResult) {
                  // 处理消息发送成功逻辑
              }
              @Override
              public void onException(Throwable throwable) {
                  // 处理消息发送异常逻辑
              }
          });
      }

  	/**
       * 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)
       * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
       */
      public void sendDelayMsg(String msgBody, int delayLevel) {
          rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
      }

      /**
       * 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)
       */
      public void sendOneWayMsg(String msgBody) {
          rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());
      }

  	/**
       * 发送带tag的消息,直接在topic后面加上":tag"
       */
      public SendResult sendTagMsg(String msgBody) {
          return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(msgBody).build());
      }

  }

上面写的这几个消息发送方法,你应该注意到了: 第一个方法和最后一个方法的参数 topic 和其它的不一样
其实这是 rocketmq 和 springboot 整合后设置 Tag 的方式(Tag:用于区分过滤同一主题下的不同业务类型的消息,非常实用)
在项目里往mq写入消息时,最好每条消息都带上tag,用于消费时根据业务过滤

另外,对于延时消息的参数理解是这样:共 18 个等级,值在上面已经注明了,下标从 1 开始,举例:我要发送个延迟为 1 分钟的消息,那么参数 delayLevel 的值为 5

在 rocketmq-spring-boot-starter 中,Tag 的设置方式: 在 topic后面加上 “:tagName”

源码中是以 “:”进行分割的,前面的是 topic,后面的就是 tag,截图如下:
tag源码
另外,从上面的截图中可以看到“key”的设置方式,发送消息时在header中设置:

MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.KEYS, "key1")

第三步:编写消费者

新建 MQConsumerService 类(本案例中我就以上面生产者中第一个和最后一个加了tag的消息进行消费)

 @Slf4j
 @Component
 public class MQConsumerService {

     // topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
     // selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
     @Service
     @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1", consumerGroup = "Con_Group_One")
     public class ConsumerSend implements RocketMQListener<User> {
         // 监听到消息就会执行此方法
         @Override
         public void onMessage(User user) {
             log.info("监听到消息:user={}", JSON.toJSONString(user));
         }
     }

     // 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,
     // 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行
     @Service
     @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two")
     public class ConsumerSend2 implements RocketMQListener<String> {
         @Override
         public void onMessage(String str) {
             log.info("监听到消息:str={}", str);
         }
     }

 	// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
     @Service
     @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three")
     public class Consumer implements RocketMQListener<MessageExt> {
         @Override
         public void onMessage(MessageExt messageExt) {
             byte[] body = messageExt.getBody();
             String msg = new String(body);
             log.info("监听到消息:msg={}", msg);
         }
     }

 }

消费者这里要注意设置的参数要正确,我这里为了方便就写在一个类里
这里消费者会直接监听生产者发送的消息,一旦生产者那边发送消息,对应这里就会消费
另外可以对注解 @RocketMQMessageListener 点进去看看它的属性参数,都是非常熟悉的
还有,实际生产中,应避免自己的消费者代码出现非业务逻辑上的错误,比如消费时某个消费者报类型转换异常,建议多用TAG做区分

第四步:测试

这里就直接建个Controller吧

 @RestController
 @RequestMapping("/rocketmq")
 public class RocketMQController {

     @Autowired
     private MQProducerService mqProducerService;

     @GetMapping("/send")
     public void send() {
         User user = User.getUser();
         mqProducerService.send(user);
     }

     @GetMapping("/sendTag")
     public Result<SendResult> sendTag() {
         SendResult sendResult = mqProducerService.sendTagMsg("带有tag的字符消息");
         return Result.success(sendResult);
     }

 }

用postman调用测试:
(1)http://localhost:8080/rocketmq/send
send
(2)http://localhost:8080/rocketmq/sendTag
sendTag
sendResult

你也可以试试消费消息时不指定tag,它会监听消费所有消息
开始提到,rocketmq有一个可视化的后台,你也可以在里面找到你发的消息和消费情况

总结:

  1. 实际运用中一些配置不要像我上面一样写在代码里,写在配置文件里或统一配置
  2. 消息发送成功与失败可以根据sendResult判断,消息消费成功与否其实源码内部已做了处理,只要不出现异常,就是消费成功,如果你业务代码逻辑有问题那另说
  3. 实际生产中还要注意重复消费问题,这里我提供一个方法:在数据库加一个去重表,给表里的一个字段如key添加唯一索引,消费前先入库,正常则往下执行你的业务逻辑,入库失败了表明该消息已消费过,不能往下走了
  4. 其实rocketmq还有一个很重要的特性:事务,其它mq可是不支持的,利用事务可以做很多事,如跟钱相关的业务、分布式事务,不过事务的实现过程要麻烦点
  5. 上面就是RocketMQ与Springboot的整合,整合了使用起来还是比较简单的
  6. 分布式事务已更新:Springboot中用RocketMQ解决分布式事务