卡夫卡数据消费

消费者负责从订阅的话题中拉取消息,消费者群体是逻辑概念。一个消费者只属于一个消费者组,一个消费者组包括一个或多个消费者。当消息发布到主题时,它将被传递到每个消费者组,但每个消费者组中只有一个消费者可以消费该消息。

消费者如何知道消费哪个分区?当消费群体中的消费者数量发生变化时,分区分配如何变化?

用消费者总数和分区总数相除得到一个跨度,然后按照跨度平均分配分区,保证分区尽可能平均地分配给所有消费者。对于每个主题,该策略将根据名称的字典顺序对消费者组中的所有消费者进行排序,然后为每个消费者划分一个固定的分区范围。如果它不是均匀分布的,具有最高字典顺序的消费者将被分配一个额外的分区。

假设n=分区数/消费者数,m=分区数%消费者数,那么前m个消费者每个分配n+1个分区,后面的每个消费者分配n个分区。

如图,主题中有7个分区。此时,消费者组中只有一个消费者C0,C0订阅了7个分区。

随着消费者在消费群体中的参与度越来越高,分区逐渐从C0分配到C1~C6。当最后一个消费者C7加入时,总是有8个消费者但只有7个分区,所以C7不能消费任何消息,因为它不能分配分区。

消费者越多越好。消费者的数量应该小于或等于分区的数量,否则会浪费资源。

缺点:

当一个消费群订阅两个主题,有四个分区时,分区分配结果如下,比较统一。

但是,当两个题目各有三个分区时,就会出现以下问题。如果类似情况扩大,可能会出现消费者超载的问题。

消费者组中的所有消费者和消费者订阅的所有主题的分区按照字典顺序排序,然后通过轮询将分区依次分配给每个消费者。如果消费群内消费者的订阅信息相同,分区分配会更加均匀。比如一个消费群中的两个消费者用3订阅了两个分区的主题,如图。

但当消费群体中消费者的订阅信息不同时,就会出现分布不均的问题。如图,假设消费群有三个消费者,主题1/2/3分别有1/2/3个分区。C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,分区结果如下。

这个策略后来介绍了,主要目的:

假设三个消费者订阅了四个主题,每个主题有两个分区,那么初始分区分配结果如下:

乍一看,结果与RoundRobin分配策略相同,但此时,如果C1脱机,消费者组将执行重新平衡操作,重新分配消息分区。如果是循环策略,分配结果如下:

如果是粘性分配策略,结果如下:

StickyAssignor保留C0和C2最后的分配结果,将C1的分区分配给C0和C2,使其平衡。

如果发生分区重分配,对于同一个分区,有可能前一个消费者和新指定的消费者并不相同,前一个消费者的处理会在新指定的消费者中再次重复,造成重复消费。StickyAssignor分配策略正如其名“Sticky”一样,使分配策略具有“粘性”,使前后分配尽可能相同,从而减少系统资源的损失和其他异常情况。

我们来看看消费者的订阅信息不一样的情况,以RoundRobinAssignor中的例子为例。

假设消费群有三个消费者,主题1/2/3分别有1/2/3个分区。C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,RoundRobinAssignor的划分结果将为

当采用StickyAssignor时,分区分配结果如下:

如果C0此时脱机,则RoundRobinAssignor重新分配的结果如下:

StickyAssignor重新分配的结果如下:

总而言之:

StickyAssignor分配策略的优点是,它可以使分区重新分配变得“粘性”,减少不必要的分区移动(一个分区被剥离后再分配给另一个新的消费者之前的一个消费者)。

卡夫卡的消息消费是基于拉模式的。

卡夫卡一次拉一组消息,每条消息的格式如下:

每次提取方法时,都会返回一个尚未使用的消息集。要实现这个功能,需要知道上次消费时的消费位移,消费者在消费完消息后要提交消费位移,消费位移要持久化,消费位移保存在__consumer_offsets主题中。

当前pull消息的最大偏移量为x,消费者的消费完成提交位移的偏移量实际为x+1,表示下一条pull消息的起始位置。

自动提交

默认采用自动提交,默认每5s提交一次每个分区的最大消息位移。真正的提交动作是在拉消息的逻辑中完成的。每次拉消息之前,都会判断位移是否可以提交,如果可以,就提交最后一次位移。这里会出现两个问题,如下图所示。

重复消耗:目前拉消息X+2和X+7,目前消耗达到X+5。在提交消费位移之前,消费者是向下的;新消费者还是会从X+2拉消息,导致重复消费。

消息丢失:目前拉消息X+2和X+7,目前消耗X+5。到下一次拉取时,消耗位移已经提交为X+8。如果此时消费者宕机,新的消费者会从X+8开始消费,导致X+5到X+7的消息没有被消费,造成消息丢失。

手动提交

同步提交和异步提交。

同步提交默认为这个拉分区消息的最大偏移量,比如X+2和X+7的这个拉消息,同步提交默认提交位置X+8;当时同步提交还可以指定提交的偏移量,比如一个提交被消耗1次,因为提交本身就是同步操作,所以会消耗一定的性能。

同步提交还会导致重复消费的问题,比如消费完成后提交前消费者停机。

异步提交消费线程不会被阻塞,会增强性能,但是异步提交失败的重试可能会导致提交位移被覆盖的问题,比如这次异步提交offset=X失败,下一次异步提交offset=X+y成功;此时,将重试之前的提交,并将再次提交offset=x。如果业务中没有重试检查,冲抵会被覆盖,最终导致重复消费。

当建立了新的消费群体,消费者订阅了新的主题,或者之前提交的位移信息因过期被删除,此时无法找到记录的消费位移。Kafka可以配置为从最晚或最早开始消费。

卡夫卡也从特定的位移支持消费,可以实现回溯消费。卡夫卡内部提供Seek()方法重置消费位移。

当需要回溯指定时间后的消息时,可以先用offsetsForTimes方法找到指定时间后第一条消息的位移,然后用seek重新设置位移。

分区的所有权从一个消费者转移到另一个消费者,保证了消费者群体的高可用性和可扩展性,使我们能够方便、安全地删除或添加消费者。

Kfaka提供了一个GroupCoordinator和一个ConsumerCoordinator。前者负责管理消费群体,后者负责与前者互动。两者最重要的职责是负责重新平衡运营。

比如消费者加入一个消费群,消费者、消费群、群协调员一般要经历以下几个阶段。

第一阶段(找到协调人)

消费者需要确定他们所属的消费者组对应的GroupCoordinator所在的代理,并创建一个网络连接与代理进行通信。

消费者将向集群中的一个节点发送FindCoordinatorRequest,以找到相应的组协调器。

Kafka根据请求中coordinator_key(即groupld)的哈希值计算__consumer_offsets中的分区号,如下图所示。找到对应的分区后,寻找该分区的leader副本所在的broker节点,也就是当前消费组所在的group coordinator节点。

消费者组的最终分区分配方案和组内消费者提交的消费置换信息将被发送到代理节点。broker节点不仅起到GroupCoordinato的作用,还起到保存分区分配方案和组内消费者置换的作用,可以节省大量中间轮换带来的不必要开销。

第二阶段(加入团体)

成功找到消费群对应的GroupCoordinator后,消费者将进入加入消费群的阶段。在这个阶段,消费者将向GroupCoordinator发送JoinGroupRequest请求,并处理响应。

以下事情主要在小组协调员内部完成:

* * * *选举消费组组长。

如果当前组没有组长,第一个加入消费组的就是组长。如果leader挂断,组协调器将从内部维护的HashMap(消费者信息,键是member_id)中选择第一个键作为新的leader。

选举分区分配策略

如上所述,每个消费者可以报告多个分区分配策略,选择过程如下:

第三阶段(同步组)

领导者消费者根据第二阶段获得的分区分配策略实施分区分配,然后将分配结果同步到组协调器。每个消费者将向组协调器发送SyncGroupRequest请求,以同步分配方案。

请求结构如图,领导发的请求会有group_assignment。

包含了每个消费者对应的具体分配方案,其中member_id表示消费者的唯一标识,member_assignment为消费者对应的分配方案,如图所示。

消费者收到具体的分区分配方案后,会启动心跳任务,定期向组协调器发送心跳请求,确保自己在线。

第四阶段(心跳)

在正式消费之前,消费者还需要确定拉消息的起始位置。假设之前已经成功提交了最后一次消费位移,消费者会请求获取最后一次提交的消费位移,从这里继续消费。

心跳线程是一个独立的线程,可以在轮询消息的间隙发送。如果消费者停止发送心跳足够长的时间,组协调器将认为消费者已经死亡,并触发重新平衡行为。