Java程序员

Kafka——消息的发送流程

2020-08-02  本文已影响0人  挪威的senlin

前言

本文将介绍kafka的一条消息的发送流程,从消息的发送到服务端的存储。上文说到kafak分为客户端与服务端,要发送消息就设计到了网络通讯,kafka采用TCP协议进行客户端与服务端的通讯协议。

案例

消息发送

下面看一个简单的生产者案例

        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker1:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++){
            producer.send(
                    new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
  1. 发送并忘记(send and forget):producer.send(),默认为异步发送,并不关心消息是否达到服务端,会存在消息丢失的问题。
  2. 同步:producer.send()返回一个Future对象,调用get()方法变回进行同步等待,就知道消息是否发送成功。
  3. 异步发送:如果消息都进行同步发送,要发送这次的消息需要等到上次的消息成功发送到服务端,这样整个消息发送的效率就很低了。kafka支持producer.send()传入一个回调函数,消息不管成功或者失败都会调用这个回调函数,这样就算是异步发送,我们也知道消息的发送情况,然后再回调函数中选择记录日志还是重试都取决于调用方。Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

指定分区

kafka会将一个topic划分成n个分区,那么在生产者发送消息的时候是怎么知道要发给哪个分区的呢。上面说过生产者会拿到整个集群的信息,所以生产者知道每个topic下面有一个分区,基于此可以有些常见的消息分区策略:

客户端

上面介绍了怎么发送消息和一些分区策略,接下来介绍生产者是如何发送消息的。


消息发送

上图是生产者消息发送的一个流程图。

  1. 创建一个消息,topic、value必填,key、partition可以选填;
  2. 调用send()方法,根据指定的key.serializer、value.serializer将key与value进行序列化,将对象序列化为字节数组,才能在网络上进行传输。
  3. 如果ProducerRecord指定了分区,分区器不会做任何事情,否则会按照指定的分区策略选择一个分区。
  4. 之前提到过,生产者发送消息是按照批次来发,所以消息选择好了分区之后,生产者并不会立即发送消息到服务端,而是消息添加到本地的消息批次里,这个批次的所有消息的topic和分区都相同。
  5. 生产者有个后台的Sender线程,负责将消息发送到服务端。服务端在接收到消息之后会返回一个响应,它包含主题和分区信息以及消息在分区中的偏移量。如果消息写入失败会返回一个错误。生产者接收到错误之后会根据配置的retries,尝试重复发送消息,如果超过retries配置的重试次数,还是发送失败,就会返回错误。

以上就是生产者发送消息的一个主要流程,当然还有很多细节没有涉及到,这里只是一个简易的主流程。

服务端

请求处理

客户端发送消息之后,服务端需要请求作出相应,kafka使用的是TCP作为服务端与客户端通讯协议。Kafak的请求的处理使用的是Reactor模式,关于Reactor模式我前面有文章进行介绍,这里不详细展开。简单来说Reactor模式利用底层的多路IO复用,采用事件驱动的形式,从传统阻塞IO,转为非阻塞的IO。Reactor常见的模型是一个mainReactor接受网络连接,然后由subReactor处理网络请求。Kafka也是这样设计的,服务端在它监听的端口上运行一个Acceptor线程,该线程会创建一个连接然后转交给Processor(网络线程)线程处理。

极客时间《Kafka核心技术与实战》

上图是服务端处理请求的流程图,并非只是处理发送消息的请求,获取元数据、读取消息也是这样的流程。这里就拿消息发送的请求举例。
由Acceptor接受请求,然后以轮询的方式均匀的将请求分给网络线程(数量可以通过num.network.threads配置),然后网络线程将请求放到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是写请求,将会把消息写入本地磁盘。在Linux系统上,并不会立即写入磁盘,而是写入文件系统的缓存中,kafka并不会等待数据写入到磁盘中。
当 IO 线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将 Response 返还给客户端。
Purgarory它是用来缓存延时请求,所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比一个生产消息当acks=all时,这时候请求就需要等待其他参与复制的副本响应成功之后,才能回应客户端写入成功的消息。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。

副本

这里再介绍一下kafka的副本机制。备份日志文件是分布式数据系统最基础的要素之一,实现方法也有很多种。最常见的做法就是,一个leader副本和若干个follower副本,有leader接受写请求,然后将日志同步给follower。正常情况下leader正常工作,没有任何的问题,如果leader崩溃就需要从follower副本中选择一个当下一个leader副本。此时就有一个问题存在,因为网络传输存在延迟,在follower副本中的数据并没有和leader副本保持一致。所以 我们必须确保我们leader的候选者们是一个数据同步最新的follower节点。
如果选择写入时候需要保证一定数量的副本写入成功,读取时需要保证读取一定数量的副本,读取和写入之间有重叠。这样的读写机制称为 Quorum。
常见的方式是过半策略,在写入的时候保证集群中过半节点写入成功,才算一次写操作成功,zookeeper、raft采用的就是这种策略。这样能够保证follower中有能与leader数据保持一致的几点。缺点在于,写操作的成本过大,需要保证过半的节点写入成功,而且容错的数量有限,如果5个几点的集群只能支持2个节点挂掉。
Kafka 不是用大多数投票选择 leader 。Kafka动态维护了一个同步状态的备份的集合 (a set of in-sync replicas), 简称 ISR ,在这个集合中的节点都是和 leader 保持高度一致的,只有这个集合的成员才有资格被选举为 leader,一条消息必须被这个集合所有节点读取并追加到日志中了,这条消息才能视为提交。
ISR中的副本首先eader副本是在其中的,如何判断follower节点是否在ISR中呢?
通过Broker 端参数 replica.lag.time.max.ms,这个参数的含义是Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,将会将该副本踢出ISR,如果后续该副本后面追上Leader的进度,也会被加入到ISR中。采用ISR的方式能够提高系统的可用性,容错的节点也变多了,只有ISR中有一个副本也能保证正常的处理请求。
当然如果ISR一个副本都没有了就不能继续对外提供服务了,kafka提供unclean leader 选举机制,有以下两种方式,在一致性与可用性之间做出选择。

关于kafka副本机制,这里就不说太多了,可以看出kafka在解决分布式的可用性、一致性问题做出的思考和解决方案。

总结

本文介绍了生成者如何发送一个请求和服务端如何处理一个请求,之前写过关于netty的文章。可以看出在处理网络请求的方式上,Reactor模式依然是主流的选择。
还有就是kafka在日志复制上的设计通过ISR提高系统的可用性的同时保证数据的一致性,与unclean leader机制在一致性与可用性的选择,当然kafka还有很多很多精妙的设计,就不一一探讨了,本文分析到此结束。

参考

《Kafka权威指南》
kafka官网
极客时间《Kafka核心技术与实战》

上一篇下一篇

猜你喜欢

热点阅读