在开始介绍Rebalance操作的实现细节之前,我们需要明确在哪几种情况下会触发Rebalance操作:
- 有新的消费者加入Consumer Group。
- 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向GroupCoordinator发送HeartbeatRequest时,GroupCoordinator会认为消费者下线。
- 有消费者主动退出Consumer Group。
- Consumer Group订阅的任一Topic出现分区数量的变化。
- 消费者调用unsubscrible()取消对某Topic的订阅。
第一阶段
Rebalance操作的第一步就是查找GroupCoordinator,这个阶段消费者会向Kafka集群中的任意一个Broker发送GroupCoordinatorRequest请求,并处理返回的GroupCoordinatorResponse响应。
GroupCoordinatorRequest消息体的格式比较简单,只包含了Consumer Group的id。GroupCoordinatorResponse消息体包含了错误码(short类型)、coordinator的节点Id(int类型)、GroupCoordinator的host(String类型)、GroupCoordinator的端口号(int类型)。
发送GroupCoordinatorRequest请求的入口是ConsumerCoordinator的ensureCoordinatorReady方法,其流程如图所示。
-
首先检测是否需要重新查找GroupCoordinator,主要是检查coordinator字段是否为空以及与GroupCoordinator之间的连接是否正常。
-
查找集群负载最低的Node节点,并创建GroupCoordinatorRequest请求。调用client.send方法将请求放入unsent队列中等待发送,并返回RequestFuture对象。返回的RequestFuture对象经过了compose方法适配,原理同HeartbeatCompletionHandler。
-
调用ConsumerNetworkClient.poll(future)方法,将GroupCoordinatorRequest请求发送出去。此处使用阻塞的方式发送,直到收到GroupCoordinatorResponse响应或异常完成,才从此方法返回。
-
检测检查RequestFuture对象的状态。如果出现RetriableException异常,则调用ConsumerNetworkClient.awaitMetadataUpdate()方法阻塞更新Metadata中记录的集群元数据后跳转到步骤1继续执行。如果不是RetriableException异常则直接报错。
-
如果成功找到GroupCoordinator节点,但是网络连接失败,则将其unsent中对应的请求清空,并将coordinator字段置为null,准备重新查找GroupCoordinator,退避一段时间后跳转到步骤1继续执行。
下面介绍处理GroupCoordinatorResponse的相关操作。通过对sendGroupCoordinatorRequest方法的分析我们知道,handleGroupMetadataResponse)方法是处理GroupCoordinatorResponse的入口,其步骤如下:
- 调用coordinatorUnknown()检测是否已经找到GroupCoordinator且成功连接。如果是则忽略此GroupCoordinatorResponse,因为在发送GroupCoordinatorRequest时并没有防止重发的机制,可能有多个GroupCoordinatorResponse;否则,继续下面的步骤。
- 解析GroupCoordinatorResponse得到服务端GroupCoordinator的信息。
- 构建Node对象赋值给coordinator字段,并尝试与GroupCoordinator建立连接。
- 启动HeartbeatTask定时任务。
- 最后,调用RequestFuture.complete()方法将正常收到GroupCoordinatorResponse的事件传播出去。
- 如果GroupCoordinatorResponse中的错误码不为NONE,则调用RequestFuture.raise方法将异常传播出去。最终由ensureCoordinatorReady方法中的步骤4处理。
第二阶段
在成功查找到对应的GroupCoordinator之后进入Join Group阶段。在此阶段,消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。先来了解JoinGroupRequest和JoinGroupResponse的消息体格式,如图所示。
了解了JoinGroupRequest和JoinGroupResponse的格式之后,再来分析第二阶段的相关处理流程,其入口函数是ensurePartitionAssignment方法。
ensurePartitionAssignment方法的流程如图所示。
-
调用SubscriptionState.partitionsAutoAssigned方法,检测Consumer的订阅是否是AUTO_TOPICS或AUTO_PATTERN。因为USER_ASSIGNED不需要进行Rebalance操作,而是由用户手动指定分区。
-
如果订阅模式是AUTO_PATTERN,则检查Metadata是否需要更新。
在前面提到过,在ConsumerCoordinator的构造函数中为Metadata添加了监听器。当Metadata更新时就会使用SubscriptionState中的正则表达式过滤Topic,并更改SubscriptionState中的订阅信息。同时,也会使用metadataSnapshot字段记录当前的Metadata的快照。这里要更新Metadata的原因,是为了防止因使用过期的Metadata进行Rebalance操作而导致多次连续的Rebalance操作。
-
调用ConsumerCoordinator.needRejoin()方法判断是要发送JoinGroupRequest加入ConsumerGroup,其实现是检测是否使用了AUTO_TOPICS或AUTO_PATTERN模式,检测rejoinNeeded和needsPartitionAssignment两个字段的值。
-
调用onJoinPrepare方法进行发送JoinGroupRequest请求之前的准备,做了三件事:一是如果开启了自动提交offset则进行同步提交offset,提交offset的内容后面会详细介绍,此步骤可能会阻塞线程;二是调用注册在SubscriptionState中的ConsumerRebalanceListener上的 回调方法;三是将SubscriptionState的needsPartitionAssignment字段设置为true并收缩groupSubscription集合。
-
再次调用needRejoin方法检测,之后调用ensureCoordinatorReady方法检测已经找到GroupCoordinator且与之建立了连接。
-
如果还有发往GroupCoordinator所在Node的请求,则阻塞等待这些请求全部发送完成并收到响应(即等待unsent及InFlightRequests的对应队列为空),然后返回步骤5继续执行,主要是为了避免重复发送JoinGroupRequest请求。
-
调用sendJoinGroupRequest方法创建JoinGroupRequest请求,并调用ConsumerNetworkClient.send方法将请求放入unsent中缓存,等待发送。
-
在步骤7返回的RequestFuture对象上添加RequestFutureListener。
-
调用ConsumerNetworkClient.poll方法发送JoinGroupRequest,这里会阻塞等待,直到收到JoinGroupResponse或出现异常。
-
检测RequestFuture.fail。如果出现RetriableException异常则进行重试,其他异常则报错。如果无异常,则整个第二阶段操作完成。
通过前面对JoinGroupRequest发送流程的分析,我们了解到JoinGrouResponse处理流程的入口是JoinGroupResponseHandler:handle()方法,其中还包括了SyncGroupRequest发送的操作。
JoinGrouResponse的处理流程如图所示。
- 解析JoinGroupResponse,获取GroupCoordinator分配的memberld、generation等信息,更新到本地。
- 消费者根据leaderld检测自己是不是Leader。如果是Leader则进入onJoinLeader方法,如果不是Leader则进入onJoinFollower方法。从上面的流程图也可以看出,onJoinFollower()方法的逻辑是onJoinLeader()方法的子集,下面主要分析onJoinLeader方法。
- Leader根据JoinGroupResponse的group_protocol字段指定的Parition分配策略,查找相应的PartitionAssignor对象。
- Leader将JoinGroupResponse的members字段进行反序列化,得到ConsumerGroup中全部消费者订阅的Topic。Leader会将这些Topic信息添加到其SubscriptionState.groupSubscription集合中。而Follower则只关心自己订阅的Topic信息。
- 第4步可能有新的Topic添加进来,所以更新Metadata信息。
- 待Metadata更新完成后,会在assignmentSnapshot字段中存储一个Metadata快照(即通过Metadata的Listener创建的快照)。
- 调用PartitionAssignor.assign()方法进行分区分配。
- 将分配结果序列化,保存到Map中返回,其中key是消费者的memberld,value是分配结果序列化后的ByteBuffer。
第三阶段
完成分区分配之后就进入了Synchronizing Group State 阶段,主要逻辑是向GroupCoordinator 发送 SyncGroupRequest 请求并处理 SyncGroupResponse 响应。
先来了解SyncGroupRequest 和 SyncGroupResponse 的消息体格式。
SyncGroupRequest 中各个字段的含义如表
SyncGroupResponse 中各个字段的含义如表
通过前面对onJoinLeader方法分析,我们知道发送 SyncGroupRequest 请求的逻辑紧接在分区分配操作之后,也是在 onJoinLeader方法中完成的。下面是其流程:
- 得到序列化后的分区分配结果后,Leader将其封装成 SyncGroupRequest,而Follower形成的SyncGroupRequest中这部分为空集合。
- 调用ConsumerNetworkClient.send方法将请求放入unsent集合中等待发送。
对SyncGroupResponse处理的入口是SyncGroupResponseHandler.handle方法。对于正常完成的情况,解析SyncGroupResponse,从中拿到分区分配结果并将其传递出去;对于出现异常情况,将rejoinNeeded设置为true,并针对不用的错误码进行不同的处理。
从SyncGroupResponse中得到的分区分配结果最终由ConsumerCoordinator.onJoinComplete()方法处理,调用此方法的是在第二阶段ensureActiveGroup方法的步骤8中添加的RequestFutureListener中调用。onJoinComplete()方法的流程如图所示。
- 在第二阶段Leader开始分配分区之前,Leader使用assignmentSnapshot字段记录了Metadata快照。此时在Leader中,将此快照与最新的Metadata快照进行对比。如果快照不一致则表示分区分配过程中出现了Topic增删或分区数量的变化,则将needsPartitionAssignment置为true,需重新进行分区分配。
- 反序列化拿到分配给当前消费者的分区,并添加到SubscriptionStata.assignment集合中,之后消费者会按照此集合指定的分区进行消费,将needsPartitionAssignment置为false。
- 调用PartitionAssignor的onAssignment()回调函数,默认是空实现。当用户自定义PartitionAssignor时,可以自定义此方法。
- 如果开启了自动提交offset的功能,则重新启动AutoCommitTask定时任务。
- 调用SubscriptionState中注册的ConsumerRebalanceListener。
- 将needsJoinPrepare重置为true,为下次Rebalance操作做准备。
- 重启HeartbeatTask定时任务,定时发送心跳。