目录
一、消费消息的规则
二、消费消息的具体实现方法
?? 1、编写消费者类(ConsumerEnv)
?? 2、编写Consumer函数式接口(回调函数)
?? 3、编写ConsumeerManager类
??定义成员变量
??notifyConsume()方法
??添加构造方法
?? addConsumer()方法
?? 完善consumeMessage()方法
?? 4、完成VirtualHost类编写
?? basicConsume()方法编写
?? 编写basicAck类(手动应答)
三、测试VirtualHost
?? 1、准备工作和收尾工作
?? 2、测试交换机的创建和删除
??3、测试队列的创建和删除
?? 4、测试绑定的创建和删除
?? 5、测试发布消息
??6、测试消费消息
?? 先订阅队列,再发送消息
?? 先发送消息,再订阅队列
?? 测试basicAck
一、消费消息的规则
前面主要讲了basicPublish,发布消息这一块,同时写了Router类,实现了bindingKey和routingKey的命名规则和匹配规则,主要就是讲的是生产消息。
那么接下来就实现消费者消费消息。
?? 推送给消费者消息的基本思路:
1、broker server管理者哪些消费者
2、收到了对应的消息,把消息推送给消费者
已知,一个broker server中是包含了很多个队列的:
?? 消费者调用basicConsume,就是订阅某个队列的消息:
1、消费者是以队列的维度订阅消息
2、一个队列可以有多个消费者
此处,只需要约定消费者如何消费即可。
这里使用“轮询”的方式消费消息:轮询,举例子,如上图,有123三个消费者,让他们分别轮流消费一条消息,依次轮流来,一次消费一个。
具体实现:
1、定义一个类,描述一个消费者
2、然后给每个队列对象(MSGQueue对象)加上属性,相当于一个List,包含若干个消费者对象。
二、消费消息的具体实现方法
在VirtualHost类中实现一个订阅消息的方法basicConsume()
添加一个队列的订阅者,当队列收到消息以后,就要把消息推送给对应的订阅者。
consumerTag:消费者的身份标识
aotoAck:消息被消费完成后,应答的方式,为true自动应答,为false就手动应答。
Consumer:一个回调函数,也就是一个函数式接口(lambda函数底层实现),这样在后面调用basicConsume的时候,并且传实参的时候,就可以写作lambda样子
?? 1、编写消费者类(ConsumerEnv)
/* * 表示一个消费者 * */ @Data public class ConsumerEnv { private String consumerTag; //消费者身份标识 private String queueName; private boolean autoAck; // 通过回调处理收到的消息 private Consumer consumer; }
然后再MSGQueue.java类中,进行相应的扩充。
private List<ConsumerEnv> consumerEnvList = new ArrayList<>(); // 记录取到了第几个消费者,方便实现轮询策略 // AtomicInteger是一个原子性类型,因为consumerSeq再消费信息的时候会被修改, // 如果使用int可能造成线程不安全,于是这里就使用AtomicInteger public AtomicInteger consumerSeq = new AtomicInteger(); // 添加一个新的订阅者 public void addConsumerEnv(ConsumerEnv consumerEnv){ synchronized (this){ consumerEnvList.add(consumerEnv); } } // 挑选一个订阅者,处理当前的消息(轮询) public ConsumerEnv chooseConsumer(){ if (consumerEnvList.size() == 0){ // 该队列没有人订阅 return null; } // 计算当前要取的元素的下标 int index = consumerSeq.get() % consumerEnvList.size(); // getAndIncrement()先获取当前值,再加1。相当于 getAndAdd(1). consumerSeq.getAndIncrement(); //进行自增 return consumerEnvList.get(index); }
?? 2、编写Consumer函数式接口(回调函数)
创建一个Consumer接口。
/* * 只是一个函数式接口 * 收到消息之后要处理消息时调用的方法 * */ @FunctionalInterface public interface Consumer { // 处理投递 // 每次服务器收到消息之后,调用消息,通过这个方法把消息推送给对应的消费者 void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException; }
?? 3、编写ConsumeerManager类
这个类主要就是用来实现消费者消费消息的核心逻辑。主要有以下几块。
消费消息:就是让线程池,执行对应消费者中的回调函数。在调用回调函数的时候,就把消息的内容通过参数,传进去。消费者在最初订阅消息的时候,就把回调注册给broker server。回调函数的内容时消费者确定的,取决于消费者的业务逻辑。
扫描线程:能够感知到哪个队列里面收到了新的消息,扫描线程会取出该消息,找出对应的消费者,将该内容打包成一个任务,丢给线程池去调用
为什么需要线程池?
一些消费者给出的回调函数,处理起来可能会比较耗时,如果只有一个扫描线程,那么可能会导致处理不及时,导致队列中消息越来越多。所以这里引入的扫描线程就轻量的取消息和获取回调,而线程池就用来执行处理的回调函数。
扫描线程如何明白哪个队列中有了新消息?
引入一个阻塞队列。该队列中的元素是有消息的队列的名字,哪一个队列有消息了,就把队列名放到该阻塞队列中。扫描线程就可以从阻塞队列中获取到新增消息的队列的名字。
如何保证消息不被丢失?
使用消息确认(ACK)。在消息确认就是为了避免,消费者的回调方法在执行过程中出错,导致消息丢失这种情况。
为了保证消息不丢失:
(1)在真正执行回调之前,把该消息放到“待确认集合”中,也就是前面MemoryDataCenter中的queueMessageWaitAckMap集合中;
(2)执行回调
(3)当前消费者采取的是autoAck == true,也就是回调执行完毕不抛异常,就算消费成功;消费成功以后,删除消息(硬盘,内存哈希表,待确认集合)
(4)当前消息采取的是autoAck == false,手动应答。也就是消费者这边,在回调方法内部,显示调用basicAck这个核心API。
??定义成员变量
也就是上面提到过的,阻塞队列,扫描线程,线程池。
public class ConsumerManager { // 持有上层VirtualHost private VirtualHost parent; // 指定一个线程池,负责去执行具体的回调任务 private ExecutorService workerPool = Executors.newFixedThreadPool(4); // 引入一个阻塞队列,存放队列名的 private BlockingQueue<String > tokenQueue = new LinkedBlockingDeque<>(); // 扫描线程 private Thread scannerThread = null; }
??notifyConsume()方法
这个方法主要就是为了通知什么时候消费,这里主要就是在发送消息的时候,通知消费,将含有该消息的队列名放在阻塞队列中:
// 通知消费 // 调用时机:发送消息的时候,就调用(sendMessage) public void notifyConsume(String queueName) throws InterruptedException { tokenQueue.put(queueName); }
所以,我们就需要在前面VirtualHost类中的sendMessage方法中再调用一个通知消费的方法:
异常大家自己向上抛一下。
// 通知消费者进行消费 consumerManager.notifyConsume(queue.getName());
??添加构造方法
添加构造方法,构造一个线程,编写从队列中取出消息的过程,
其中的consumeMessage(queue)是消费消息的具体实现方法,先列在这里,不实现
public ConsumerManager(VirtualHost p){ parent = p; scannerThread = new Thread(()->{ // 持续运行 while (true){ try { // 1、从阻塞队列中拿到队列名 String queueName = tokenQueue.take(); // 2、根据队列名找到队列 MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName); if (queue == null){ throw new MqException("[ConsumerManager]取出令牌后发现,该队列名不存在!queuName = " + queueName); } // 3、从队列中消费一个消息 synchronized (queue){ consumeMessage(queue); } } catch (InterruptedException | MqException e) { e.printStackTrace(); } } }); // 把线程设为后台线程 scannerThread.setDaemon(true); scannerThread.start(); } private void consumeMessage(MSGQueue queue) { //TODO }
?? addConsumer()方法
该方法主要是为了新增一个Consumer对象到指定的队列中。
// 新增一个Consumer对象到指定的队列中 public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException { // 找到对应的队列 MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName); if (queue == null){ throw new MqException("[ConsumerManager]队列不存在!queueName = " + queueName); } ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer); synchronized (queue){ queue.addConsumerEnv(consumerEnv); // 如果当前队列中已经有了一些消息,需要立即消费掉 int n = parent.getMemoryDataCenter().getMessageCount(queueName); for (int i = 0; i < n; i++) { // 调用一次就消费一条消息 consumeMessage(queue); } } }
?? 完善consumeMessage()方法
这个方法前面只列了一下,没有实现,这里具体实现一下。
主要有以下几步:
(1)按照轮询的方式,找出一个消费者
(2)从队列中取出一个消息
(3)把消息丢给回调函数,给线程池处理。
a. 把消息放到待确认集合中
b. 真正的执行回调操作
c. 如果是自动应答,直接删除消息;手动应答,先不处理,交给后续消费者调用 basicAck()。
private void consumeMessage(MSGQueue queue) { // 1、按照轮询的方式,找出一个消费者来 ConsumerEnv luckyDog = queue.chooseConsumer(); if (luckyDog == null){ // 当前没有消费者,暂时不消费 return; } // 2、从队列中取出一个消息 // pollMessage是为了从队列中取出消息 Message message = parent.getMemoryDataCenter().pollMessage(queue.getName()); if (message == null) { // 当前队列没有消息 return; } // 3、把消息丢给回调函数中,给线程池处理 workerPool.submit(() -> { try { // 1、把消息放到待确认集合中 parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message); // 2、真正执行回调操作 luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody()); // 3、如果当前是自动应答,就可以直接删除消息 // 如果是手动应答,就需要调用basicAck() if (luckyDog.isAutoAck()){ // 1).删除硬盘,先看是不是持久化消息 if (message.getDeliverMode() == 2){ parent.getDiskDataCenter().deleteMessage(queue,message); } // 2)、待确认集合中的消息 parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(),message.getMessageId()); // 3)、删除内存中消息中心的消息 parent.getMemoryDataCenter().removeMessage(message.getMessageId()); System.out.println("[ConsumerManager]消息被成功消费!queueName = " + queue.getName()); } } catch (Exception e) { e.printStackTrace(); } }); }
?? 4、完成VirtualHost类编写
?? basicConsume()方法编写
该方法主要作用是订阅消息(消费消息)。在VirtualHost中实现。其中调用了ConsumerManager中的方法。
首先在VirtualHost添加consumerManager的实例。
private ConsumerManager consumerManager = new ConsumerManager(this);
然后写订阅消的方法。
// 订阅消息 public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer){ // 构造一个ConsumerEnv对象,也就是消费者对象,把对应的队列找到,然后将Consumer对象添加到该队列中。 queueName = virtualHostName + queueName; try { consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer); System.out.println("[VirtualHost]basicConsume成功! queueName = " + queueName); return true; } catch (Exception e){ System.out.println("[VirtualHost]basicConsume失败! queueName = " + queueName); e.printStackTrace(); return false; } }
?? 编写basicAck类(手动应答)
public boolean basicAck(String queueName,String messageId){ queueName = virtualHostName + queueName; try{ // 1、获取到消息和队列 Message message = memoryDataCenter.getMessage(messageId); if (message == null){ throw new MqException("[VirtualHost] 消息不存在!messgeId = " + messageId); } MSGQueue queue = memoryDataCenter.getQueue(queueName); if (queue == null){ throw new MqException("[VirtualHost] 要确认的队列不存在!queueName = " + queueName); } // 2、删除硬盘上的数据 if (message.getDeliverMode() == 2){ diskDataCenter.deleteMessage(queue,message); } // 3、、删除内存中的数据 memoryDataCenter.removeMessage(messageId); // 4、删除待确认集合中的数据 memoryDataCenter.removeMessageWaitAck(queueName,messageId); System.out.println("[VirtualHost]basicAck成功!消息被成功确认!queueName = " + queueName); return true; // }catch (Exception e){ System.out.println("[VirtualHost]basicAck失败!消息被成功失败!queueName = " + queueName); e.printStackTrace(); return false; } }
到这里,我们的虚拟主机VirtualHost类,就算全部写完了。
三、测试VirtualHost
?? 1、准备工作和收尾工作
@SpringBootTest public class VirtualHostTests { private VirtualHost virtualHost = null; @BeforeEach public void setUp(){ TigerMqApplication.context = SpringApplication.run(TigerMqApplication.class); virtualHost = new VirtualHost("default"); } public void tearDown() throws IOException { TigerMqApplication.context.close(); virtualHost = null; // 把硬盘目录删除 File dataDir = new File("./data"); FileUtils.deleteDirectory(dataDir); } }
?? 2、测试交换机的创建和删除
// 测试创建和删除交换机 @Test public void testExchange(){ boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true); Assertions.assertTrue(ok); ok = virtualHost.exchangeDelete("testExchange"); Assertions.assertTrue(ok); }
??3、测试队列的创建和删除
//测试创建队列和删除队列 @Test public void testQueue(){ boolean ok = virtualHost.queueDeclare("testQueue", true); Assertions.assertTrue(ok); ok = virtualHost.queueDelete("testQueue"); Assertions.assertTrue(ok); }
?? 4、测试绑定的创建和删除
// 测试创建绑定和删除绑定 @Test public void testQueueBind(){ boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true); Assertions.assertTrue(ok); ok = virtualHost.queueDeclare("testQueue", true); Assertions.assertTrue(ok); ok = virtualHost.queueBind("testQueue","testExchange","testBindingKey"); Assertions.assertTrue(ok); ok = virtualHost.queueUnbind("testQueue","testExchange"); Assertions.assertTrue(ok); }
?? 5、测试发布消息
// 测试发布消息 @Test public void testBasicPublish() { boolean ok = virtualHost.queueDeclare("testQueue", true); Assertions.assertTrue(ok); ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true); Assertions.assertTrue(ok); ok = virtualHost.basicPublish("testExchange", "testQueue", null, "hello".getBytes()); Assertions.assertTrue(ok); }
??6、测试消费消息
?? 先订阅队列,再发送消息
// 消费消息 // 先订阅队列, 后发送消息 @Test public void testBasicConsume1() throws InterruptedException { boolean ok = virtualHost.queueDeclare("testQueue", true); Assertions.assertTrue(ok); ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true); Assertions.assertTrue(ok); // 先订阅队列 ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) { // 消费者自身设定的回调方法. System.out.println("messageId=" + basicProperties.getMessageId()); System.out.println("body=" + new String(body, 0, body.length)); Assertions.assertEquals("testQueue", basicProperties.getRoutingKey()); Assertions.assertEquals(1, basicProperties.getDeliverMode()); Assertions.assertArrayEquals("hello".getBytes(), body); } }); Assertions.assertTrue(ok); Thread.sleep(500); // 再发送消息 ok = virtualHost.basicPublish("testExchange", "testQueue", null, "hello".getBytes()); Assertions.assertTrue(ok); }
打印的日志如下:
[DataBaseManger]创建表完成 [DataBaseManger]创建初始数据已经完成 [DataBaseManger]数据库初始化完成 [MemoryDataCenter]队列删除成功!queueName = defaulttestQueue [VirtualHost]队列创建成功!queueName = defaulttestQueue [MemoryDataCenter]新交换机添加成功!exchangeName = defaulttestExchange [VirtualHost] 交换机创建完成!exchangeName = defaulttestExchange [VirtualHost]basicConsume成功! queueName = defaulttestQueue [MemoryDataCenter]新消息添加成功!messageId = M-a500879e-5461-4550-8d56-5bef00571ab3 [MemoryDataCenter]消息被投递到到队列中! messageId = M-a500879e-5461-4550-8d56-5bef00571ab3 [MemoryDataCenter]消息从队列中取出!messageId = M-a500879e-5461-4550-8d56-5bef00571ab3 [MemoryDataCenter]消息进入待确认队列!messageId = M-a500879e-5461-4550-8d56-5bef00571ab3 messageId=M-a500879e-5461-4550-8d56-5bef00571ab3 body=hello [MemoryDataCenter]消息从待确认队列删除!messageId = M-a500879e-5461-4550-8d56-5bef00571ab3 [MemoryDataCenter]消息被移除!messageId = M-a500879e-5461-4550-8d56-5bef00571ab3 [ConsumerManager]消费被成功消费!queueName = defaulttestQueue
?? 先发送消息,再订阅队列
@Test public void testBasicConsume2() throws InterruptedException { boolean ok = virtualHost.queueDeclare("testQueue", true); Assertions.assertTrue(ok); ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true); Assertions.assertTrue(ok); // 先发送消息 ok = virtualHost.basicPublish("testExchange", "testQueue", null, "hello".getBytes()); Assertions.assertTrue(ok); Thread.sleep(500); // 再订阅队列 ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) { // 消费者自身设定的回调方法. System.out.println("messageId=" + basicProperties.getMessageId()); System.out.println("body=" + new String(body, 0, body.length)); Assertions.assertEquals("testQueue", basicProperties.getRoutingKey()); Assertions.assertEquals(1, basicProperties.getDeliverMode()); Assertions.assertArrayEquals("hello".getBytes(), body); } }); Assertions.assertTrue(ok); }
[MessageFileManager]恢复Message数据完成 [VirtualHost]队列已经存在!queueName = defaulttestQueue [VirtualHost]交换机已经存在!exchangeName = defaulttestExchange [MemoryDataCenter]新消息添加成功!messageId = M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8 [MemoryDataCenter]消息被投递到到队列中! messageId = M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8 [MemoryDataCenter]消息从队列中取出!messageId = M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8 [VirtualHost]basicConsume成功! queueName = defaulttestQueue [MemoryDataCenter]消息进入待确认队列!messageId = M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8 messageId=M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8 body=hello [MemoryDataCenter]消息从待确认队列删除!messageId = M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8 [MemoryDataCenter]消息被移除!messageId = M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8 [ConsumerManager]消费被成功消费!queueName = defaulttestQueue
?? 测试basicAck
@Test public void testBasicAck() throws InterruptedException { boolean ok = virtualHost.queueDeclare("testQueue", true); Assertions.assertTrue(ok); ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true); Assertions.assertTrue(ok); // 先发送消息 ok = virtualHost.basicPublish("testExchange", "testQueue", null, "hello".getBytes()); Assertions.assertTrue(ok); // 再订阅队列 [把 autoAck 改成 false] ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) { // 消费者自身设定的回调方法. System.out.println("messageId=" + basicProperties.getMessageId()); System.out.println("body=" + new String(body, 0, body.length)); Assertions.assertEquals("testQueue", basicProperties.getRoutingKey()); Assertions.assertEquals(1, basicProperties.getDeliverMode()); Assertions.assertArrayEquals("hello".getBytes(), body); // [新增手动调用 basicAck] boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId()); Assertions.assertTrue(ok); } }); Assertions.assertTrue(ok); Thread.sleep(500); }
[DataBaseManger]创建表完成 [DataBaseManger]创建初始数据已经完成 [DataBaseManger]数据库初始化完成 [MemoryDataCenter]队列删除成功!queueName = defaulttestQueue [VirtualHost]队列创建成功!queueName = defaulttestQueue [MemoryDataCenter]新交换机添加成功!exchangeName = defaulttestExchange [VirtualHost] 交换机创建完成!exchangeName = defaulttestExchange [MemoryDataCenter]新消息添加成功!messageId = M-72d857bf-fea8-4cf3-a94b-2c87c5226107 [MemoryDataCenter]消息被投递到到队列中! messageId = M-72d857bf-fea8-4cf3-a94b-2c87c5226107 [MemoryDataCenter]消息从队列中取出!messageId = M-72d857bf-fea8-4cf3-a94b-2c87c5226107 [VirtualHost]basicConsume成功! queueName = defaulttestQueue [MemoryDataCenter]消息进入待确认队列!messageId = M-72d857bf-fea8-4cf3-a94b-2c87c5226107 messageId=M-72d857bf-fea8-4cf3-a94b-2c87c5226107 body=hello [MemoryDataCenter]消息被移除!messageId = M-72d857bf-fea8-4cf3-a94b-2c87c5226107 [MemoryDataCenter]消息从待确认队列删除!messageId = M-72d857bf-fea8-4cf3-a94b-2c87c5226107 [VirtualHost]basicAck成功!消息被成功确认!queueName = defaulttestQueue