转载部分

Kafka内部原理(二)

2021-03-10  本文已影响0人  zzj0990

单点内部机制

Kafka内部运行机制.png

ACK机制

Kafka的ack机制,指的是producer的消息发送确认机制,这直接影响到Kafka集群的吞吐量和消息可靠性。而吞吐量和可靠性就像硬币的两面,两者不可兼得,只能平衡。


ACKS.png

分区可靠性机制

当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:

如果要提高数据的可靠性,在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数(可以在broker或者topic层面进行设置)的配合,这样才能发挥最大的功效。min.insync.replicas这个参数设定ISR中的最小副本数是多少,默认值为1,当且仅当request.required.acks参数设置为-1时,此参数才生效。如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

  1. ack=-1的场景


    ack=-1的场景_1.png
ack=-1的场景演示_2.png

同步(Kafka默认为同步,即producer.type=sync)的发送模式,replication.factor>=2且min.insync.replicas>=2的情况下,不会丢失数据。
有两种典型情况。acks=-1的情况下(如无特殊说明,以下acks都表示为参数request.required.acks),数据发送到leader, ISR的follower全部完成数据同步后,leader此时挂掉,那么会选举出新的leader,数据不会丢失。

ack=-1场景场景演示_3.png

acks=-1的情况下,数据发送到leader后 ,部分ISR的副本同步,leader此时挂掉。比如follower1和follower2都有可能变成新的leader, producer端会得到返回异常,producer端会重新发送数据,数据可能会重复。这需要HW协同处理,详见HW小节
当然上图中如果在leader crash的时候,follower2还没有同步到任何数据,而且follower2被选举为新的leader的话,这样消息就不会重复。

  1. ack=1的场景


    ack=1的场景_1.png

注:Kafka只处理fail/recover问题,不处理Byzantine(拜占庭)问题。

HW探讨

考虑上图ack=-1场景场景演示_3.png(即acks=-1,部分ISR副本同步)中的另一种情况,如果在Leader挂掉的时候,follower1同步了消息4,5,follower2同步了消息4,与此同时follower2被选举为leader,那么此时follower1中的多出的消息5该做如何处理呢?

这里就需要HW的协同配合了。如前所述,一个partition中的ISR列表中,leader的HW是所有ISR列表里副本中最小的那个的LEO。类似于木桶原理,水位取决于最低那块短板。


HW.png

如上图,某个topic的某partition有三个副本,分别为A、B、C。A作为leader肯定是LEO最高,B紧随其后,C机器由于配置比较低,网络比较差,故而同步最慢。这个时候A机器宕机,这时候如果B成为leader,假如没有HW,在A重新恢复之后会做同步(makeFollower)操作,在宕机时log文件之后直接做追加操作,而假如B的LEO已经达到了A的LEO,会产生数据不一致的情况,所以使用HW来避免这种情况。A在做同步操作的时候,先将log文件截断到之前自己的HW的位置,即3,之后再从B中拉取消息进行同步。

如果失败的follower恢复过来,它首先将自己的log文件截断到上次checkpointed时刻的HW的位置,之后再从leader中同步消息。leader挂掉会重新选举,新的leader会发送“指令”让其余的follower截断至自身的HW的位置然后再拉取新的消息。
当ISR中的个副本的LEO不一致时,如果此时leader挂掉,选举新的leader时并不是按照LEO的高低进行选举,而是按照ISR中的顺序选举。至于选举就自行度娘吧

消息传输机制

接下来讨论的是Kafka如何确保消息在producer和consumer之间传输。有以下三种可能的传输保障(delivery guarantee):

Kafka的消息传输保障机制非常直观。当producer向broker发送消息时,一旦这条消息被commit,由于副本机制(replication)的存在,它就不会丢失。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经提交(commit)。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以retry多次,确保消息已经正确传输到broker中,所以目前Kafka实现的是at least once。

consumer从broker中读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然也可以将consumer设置为autocommit,即consumer一旦读取到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了exactly once, 但是如果由于前面producer与broker之间的某种原因导致消息的重复,那么这里就是at least once。

考虑这样一种情况,当consumer读完消息之后先commit再处理消息,在这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于at most once了。

读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了,这就对应于at least once。

要做到exactly once就需要引入消息去重机制。

消息去重

如上一节所述,Kafka在producer端和consumer端都会出现消息的重复,这就需要去重处理。

Kafka文档中提及GUID(Globally Unique Identifier)的概念,通过客户端生成算法得到每个消息的unique id,同时可映射至broker上存储的地址,即通过GUID便可查询提取消息内容,也便于发送方的幂等性保证,需要在broker上提供此去重处理模块,目前版本尚不支持。

针对GUID, 如果从客户端的角度去重,那么需要引入集中式缓存,必然会增加依赖复杂度,另外缓存的大小难以界定。

不只是Kafka, 类似RabbitMQ以及RocketMQ这类商业级中间件也只保障at least once, 且也无法从自身去进行消息去重。所以我们建议业务方根据自身的业务特点进行去重,比如业务消息本身具备幂等性,或者借助Redis等其他产品进行去重处理。

一致性
1,强一致性,所有节点必须全部存活,一致性破坏可用性
2-1,最终一致性,[网络到分布式],过半通过,最常用的分布一致性解决方案
2-2,
ISR(in-sync replicas),连通性&活跃性
OSR(outof-sync replicas),超过阈值时间(10秒),没有"心跳"
AR(Assigned replicas),面向分区的副本集合,创建topic的时候给出分区的副本数,那么controller在创建的时候就已经分配了broker和分区的对应关系,并得到了该分区的broker集合
AR=ISR+OSR

总结

———————————————————
坐标帝都,白天上班族,晚上是知识的分享者
如果读完觉得有收获的话,欢迎点赞加关注

上一篇 下一篇

猜你喜欢

热点阅读