消息队列RocketMQ 并发消费和顺序消费

在 RocketMQ 中,消息的消费模式包括并发消费和顺序消费,它们分别适用于不同的业务场景。下面是对这两种消费模式的介绍:

1. 并发消费:

  • 特点:

    • 并发消费是指多个消费者实例同时处理消息,每个实例独立地处理一部分消息。
    • 多个消费者实例可以并发地处理来自同一个主题和标签的消息,提高了消息处理的并发度。
  • 适用场景:

    并发消费适用于业务场景不要求消息的处理顺序,可以并行处理的情况。 适用于需要提高消息处理吞吐量的场景。

示例代码:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YourConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");

// 设置消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 处理消息的业务逻辑...
            System.out.println("Received message: " + new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

// 订阅主题和标签
consumer.subscribe("YourTopic", "YourTag");

// 启动消费者
consumer.start();

2. 顺序消费:

在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。

  • 特点:

    • 顺序消费是指消息按照一定的顺序进行处理,同一个消息队列上的消息按照发送顺序和消费顺序进行处理。
    • 顺序消费通常需要保证同一消息队列上只有一个消费者实例处理消息,以确保消息的顺序性。
  • 适用场景:

    顺序消费适用于业务场景要求消息按照一定顺序处理的情况,如订单处理、流程审批等。

  • 因此RocketMQ提供了顺序消费的方式,顺序消费设置与并发消费API层面只有一处不同,在注册消费回调接口时传入 MessageListenerOrderly接口的实现。 示例代码:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YourConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");

// 设置消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // 保证同一队列上的消息按照顺序处理
        for (MessageExt msg : msgs) {
            // 处理消息的业务逻辑...
            System.out.println("Received message: " + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

// 订阅主题和标签
consumer.subscribe("YourTopic", "YourTag");

// 启动消费者
consumer.start();

注意:在顺序消费的情况下,确保同一消息队列上只有一个消费者实例处理消息是非常重要的,否则可能会导致消息处理的混乱。同时,消息的发送方也需要保证消息发送到同一个队列上,以确保顺序性。

持续更新ing,动动小手,点点关注,后续更精彩O