Kafka Producer详解(二)

2018-06-18  本文已影响0人  疯狂的哈丘

一、Producer介绍

在kakfa中,生产者采用push的方式想kafka集群提交数据。kakfa官方提供了一个producer的api,方便我们调用代码向集群发送消息。
Producer所需要的maven依赖如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

导入maven依赖后,我们就可以编写程序想kafka集群发送数据了:

        //定义producer的一些参数
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //构造producer对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(
                configs, new StringSerializer(), new StringSerializer());
        //构造一个ProducerRecord对象,然后将它发送出去
        producer.send(new ProducerRecord<String, String>("test", null, "hello meitu"));
        //发送完消息后记得关闭producer
        producer.close();

在上面的代码中,我们先定义了一个configs的键值对,用来设置一下参数。这里我们设置了两个参数,ProducerConfig.CLIENT_ID_CONFIG表示发送producer的一个辨识id,ProducerConfig.BOOTSTRAP_SERVERS_CONFIG表示kafka集群的一个地址和端口。其他的配置参数下面会做一个更详细的介绍。

之后,我们构造了一个KafkaProducer对象,上面的配置会作用于这个producer对象,同时我们还设置了key的序列化器和value的序列化器。

producer对象构造完后,我们就可以直接调用send()方法发送消息了。发送的时候会构造一个ProducerRecord,并传入消息所属的topic,key,value信息。发送完消息后,如果没有用producer对象了,记得关闭producer。

至此,一个完整的生产者发送流程代码就编写完成了。

同步和异步的方式调用send()

二、Producer工作原理介绍

其实当我们调用producer.send()方法时,并不一定马上就将消息发送出去,而是将消息暂时缓存起来,累积到一定大小或间隔一定时长后再批量发送,从而来提高整体的吞吐率。

下面是producer发送消息时一个大体的流程:

[图片上传失败...(image-ab70b8-1533047290680)]
kafka producer发送消息时,一个大体的流程就是经过序列化器、分区器后写入指定的批次,当批次满后,或者达到指定时间后将消息发送到kafka broker中。

  1. 获取元数据:producer在发送数据前,需要获取消息topic的相关数据,比如topic的分区数量,有哪些分区,以及获取kafka集群的一个整体的情况等。producer在获取完元信息后,会将这些信息缓存起来,并且每隔一段时间更新一次。
  2. 序列化器:由于数据在网络中只能以字节流的方式传输,所以用户必须指定序列化器来将消息的key和value序列化成字节。如果没有指定序列化器,程序就会在初始化producer的时候报错。序列化器可以在配置中指定key.serializervalue.serializer的值,也可以在Producer的构造方法中直接指定。注意,序列化器必须org.apache.kafka.common.serialization.Serializer的子类。kafka目前已经提供了许多序列化器,足够满足大多数使用场景。
  3. 分区器:如果用户在构造record的时候没有指定parition的值,那么这条消息的分区就需要通过分区器来指定。当用户没有指定分区器的时候,kafka提供了一个默认的分区器。
    当record的key不为null的时候,kafka使用自己实现的一个散列算法来对消息进行分区。因此,如果partiton的数量发生变成,可能会出现同一个key的消息却分配了不同partition的问题,这点需要注意。
    当record的key为null的时候,kafka采用round robin的算法,将消息随机的分布到各个可用的partiton上面。
  4. 消息批次管理:当消息确定分区后,相同topic相同分区的消息会慢慢累积起来等待发送,这些消息的集合就叫做批次。当批次的大小达到上限了,producer会开启一个新的批次,同时会通知后台的sender线程准备发送数据。批次的大小可以通过batch.size来指定。

关于Sender线程

在我们构造一个KafkaProducer对象时,kafka会在后台启动一个sender线程,这个线程用来更新元数据的信息以及发送消息批次。
所以,在kafka producer中,真正的执行者其实是sender线程。在我们执行send()时,如果需要元数据,就需要设置一下状态,同时通知sender线程去获取最新的元数据信息,然后执行send()的线程才能获取到元数据。
接着,执行的send()方法的线程将消息经过序列化、分区后,加入到指定的消息批次中,然后判断是否可以发送消息了,如果满足消息发送条件了,就通知sender线程发送消息。最后,由sender线程完成消息的发送。

三、Producer配置参数

下面列出一些比较重要的参数介绍:

参数名称 参数描述 默认值
key.serializer key的序列化器
value.serializer value的序列化器
acks producer发送消息的确认策略。只能填[-1,0,1,all]中其中的一个。如果值为0时表示消息发送出去就表示成功,这是retries就没有作用了,因为即使消息到broker那边失败了producer也感知不到,这样可以保证消息至多推送一次。值为1表示partition的leader必须成功接收并且持久化到磁盘才返回成功。值为all或者-1的时候,不仅要写入到leader中,要必须保证写入到所有follows。这样的设置保证了消息至少推送一次 1
bootstrap.servers 用于建立和kafka集群的连接,然后发现整个集群的各个节点。必须符合格式host1:port1,host2:port2,...
buffer.memory producer允许使用的缓存字节数。如果申请的缓存大小超过整个值了,在执行send()的时候就会阻塞,直到有一定的缓存空间被释放出来。如果阻塞的时间超过了max.block.ms,就会抛出异常 33554432
compression.type 压缩类型。目前支持none, gzip, snappy, lz4 none
retries 消息发送失败后重试的次数 0
batch.size 一个批次允许的最大字节数。kafka在消息累积到一个批次的时候才会发送消息出去。如果这个值设置的太小会导致消息的发送过于频繁,降低吞吐量。如果设置的比较大,会浪费内存空间。所以需要均衡考虑 16384
connections.max.idle.ms 空闲的连接允许保持多长时间,超过这个时间的空闲连接将会被关闭 540000
client.id producer的id
linger.ms 该参数指定了在发送消息之前等待更多消息加入批次的时间。producer会在批次填满或者达到这个时间后把消息发送出去。如果把这个值设置为比0更大的数的话,当某个批次只有一条消息时,会等待一定时间后才把消息发送出去。这样虽然会增加延迟,但是提高了吞吐率。 0
max.block.ms 最长阻塞时间。当producer执行send()方法和等待获取元数据的时候,或者当缓存满了的时候,线程会进入阻塞。这时候如果阻塞超过指定的时间就会抛出异常 60000
max.request.size 每个请求允许发送的最大字节数。这个值限定了一次允许发送的批次数量。需要注意的是,broker那边也有同样的设置,也会限制一个请求所能发送的批量数量限制。 1048576
partitioner.class 指定producer的分区器 org.apache.kafka.clients.producer.internals.DefaultPartitioner
max.in.flight.requests.per.connection 指定了在收到broker的确认答复之前,还可以发送几条请求。值得注意的是,如果设置的值大于1,那么可能会因为重试导致消息的顺序和发送的顺序不一致。 5
metadata.max.age.ms 元数据更新的频率 300000

上面列的这些参数都是一些比较比较常用的参数。更具体的参数列表可以去kafka的官网观看https://kafka.apache.org/documentation/#producerconfigs

四、源码解析

下面的源码分析均以0.9.0版本为准。
先看一下KafkaProducer的一些参数以及部分构造函数

    //producer的id
    private String clientId;
    //分区器
    private final Partitioner partitioner;
    //单个请求的最大字节数
    private final int maxRequestSize;
    //允许申请的缓存最大值
    private final long totalMemorySize;
    //元数据相关类
    private final Metadata metadata;
    //消息的累计器,用来维护批次队列并往批次添加消息
    private final RecordAccumulator accumulator;
    //一个Runable对象,启动后会自旋不断的检查是否需要发送消息或者获取元数据
    private final Sender sender;
    //在构造producer对象的时候会启动这个线程,实际上就是启动Sender
    private final Thread ioThread;
    //压缩类型
    private final CompressionType compressionType;
    //key和value的序列化器
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    //producer的一些配置
    private final ProducerConfig producerConfig;
    //最长阻塞时间
    private final long maxBlockTimeMs;
    //请求超时时间
    private final int requestTimeoutMs;

    //部分构造函数
    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        try {
            ...
            //构造一个消息累积器
            this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.totalMemorySize,
                    this.compressionType,
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    metrics,
                    time,
                    metricTags);
            //获取连接集群的节点,并更新meta的集群相关信息
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            //构造网路通信组件
            NetworkClient client = new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),
                    this.metadata,
                    clientId,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs, time);
            //构造sender线程
            this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            //构造守护线程,然后开始执行
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

            this.errors = this.metrics.sensor("errors");

            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

KafkaProducer的构造函数看起来也比较简单。主要就是根据配置参数构造一些组件出来使用。比如RecordAccumulator ,NetworkClient,Sender等,然后启动一个守护线程,在后台执行,不断自旋,根据各个设置的条件来判断是否要发送消息以及更新集群的元数据。

Producer的send()实现

//不带异步的send方法
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return send(record, null);
}

//带异步回调的send方法
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        try {
            // 保证目标topic的元数据是可用的,如果不可用,就会去更新元数据,拉取该topic的最新数据然后缓存起来
            long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
            //减去在获取topic元数据上消耗的时间后,还剩下多少阻塞时长
            long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
            //下面通过序列化器序列化key和value
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }
            //对消息进行分区,获取分区的编号
            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
            //对消息的大小进行评估
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            //保证消息的大小不能太大。如果超过了maxRequestSize和totalMemorySize就会报错
            ensureValidRecordSize(serializedSize);
            TopicPartition tp = new TopicPartition(record.topic(), partition);
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            //将消息添加到批次中
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);
            //判断批次是否填充满了,或者是否有新的批次要产生,如果是的话就要准备通知sender线程发送数据了
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            throw e;
        }
}

send()函数主要包括了几个步骤:

  1. 保证topic的元数据是可用的。如果缓存中没有topic的元数据,就需要去获取该topic的元数据
  2. 序列化key和value,将消息转成字节,在网络上传输
  3. 对消息进行分区,获取消息所属的分区编号
  4. 将消息放入批次中,如果批次满了或者batch 的剩余空间不足以添加下一条 Record,就通知sender发送消息批次。

更新元数据

private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
        // 如果topic没有在metadata中,就需要将它加入进去
        if (!this.metadata.containsTopic(topic))
            this.metadata.add(topic);
        //如果从缓存中获取到该topic的信息了,就直接返回
        if (metadata.fetch().partitionsForTopic(topic) != null)
            return 0;

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        //自旋直到获取到topic的信息
        while (metadata.fetch().partitionsForTopic(topic) == null) {
            log.trace("Requesting metadata update for topic {}.", topic);
            //设置更新状态
            int version = metadata.requestUpdate();
            //通知sender线程更新元数据
            sender.wakeup();
            //阻塞等待元数据更新,最长只能阻塞remainingWaitMs毫秒
            metadata.awaitUpdate(version, remainingWaitMs);
            long elapsed = time.milliseconds() - begin;
            //判断等待的时间是否超过了最长允许阻塞的时间,如果是的话就要抛出超时异常
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            if (metadata.fetch().unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            remainingWaitMs = maxWaitMs - elapsed;
        }
        return time.milliseconds() - begin;
}

这个方法做的事情就是先判断该topic是的元数据是否能在缓存中找到,找不到的话就设置一下需要更新的标志位,然后通知sender线程去更新元数据,自己去不断检查该topic的数据是否已经拉回来了。最终的结果是要么阻塞超时,要么拉取到topic的元数据信息。

序列化器

序列化器负责将消息的key和value转成字节流,在网络中传输。kafka目前内置了一些序列化器,足以满足大多数使用场景。


序列化器列表

如果上面的这些序列化器不满足自己的需求,我们可以自己编写一个类实现org.apache.kafka.common.serialization.Serializer来作为序列化器。

消息分区

private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        //如果消息指定了分区号,就不用专门去计算了。只要检测分区号是否合法就行
        if (partition != null) {
            //获取该topic的分区情况
            List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
            int numPartitions = partitions.size();
            //如果分区号不合法,就会抛出异常
            if (partition < 0 || partition >= numPartitions)
                throw new IllegalArgumentException("Invalid partition given with record: " + partition
                                                   + " is not in the range [0..."
                                                   + numPartitions
                                                   + "].");
            return partition;
        }
        //使用分区器获取一个分区号
        return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
            cluster);
}

//DefaultPartitioner.java
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            //每次对消息分区就+1,保证消息均匀分散在各个分区
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            //如果可用分区数量大于0,就返回nextValue对应的partition
            if (availablePartitions.size() > 0) {
                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // 如果没有可用的partition,就返回一个不可用的分区号
                //DefaultPartitioner.toPositive(nextValue)会返回nextValue的绝对值
                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
            }
        } else {
            // 如果key部位null,采用散列的方式获取分区号
            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
}

从上面的代码可以看出,默认的分区器实现其实很简单。如果消息没有指定key,就采用round robin的方式获取分区,其实就是设定一个值,然后不断+1对分区数量取模的过程。如果指定了key,就直接对key计算散列值算出分区号。

消息批次管理

//RecordAccumulator.java
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
        appendsInProgress.incrementAndGet();
        try {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            //寻找改topic-partition的双向队列,如果没找到,就创建一个
            Deque<RecordBatch> dq = dequeFor(tp);
            //需要加锁操作
            synchronized (dq) {
                //拿到队尾的那个消息批次
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    //尝试往批次添加消息
                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                    //如果future不为null,说明消息已经插入到批次中,就可以直接返回了,这时候如果双向队列中的元素超过一个或者批次满了,就要通知sender线程推送数据了
                    if (future != null)
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }

            // 如果前面没有获取到批次或者消息没办法加到批次中了,就要开始尝试申请新的消息批次
            //
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            //为消息申请内存空间,空间大小以消息和批次配置大小最大的那个为准
            //调用free.allocate()方法时,会涉及到buffer.size这个配置,如果目前剩余的空间不足,线程就会阻塞,等待内存空间被释放出来。或者等待超过maxTimeToBlock后抛出异常
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                //需要判断producer是否关闭
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                //再次尝试从双向队列获取批次,然后往批次添加消息。这是读取最新的双向队列中的状态,因为其他线程可能也会生成新的批次然后加入到队列中
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                    if (future != null) {
                        //如果添加到批次成功了,刚才申请到的内存就没用了,记得释放
                        free.deallocate(buffer);
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                    }
                }
                //如果前面还是没添加消息到批次中,就用刚才申请到的空间构造一个新的消息批次
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                //这时必须保证可以添加成功
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));
                //把新生成的批次添加到队列尾部
                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
}

当要把消息添加到批次中时,会经过下面这些流程

  1. 先找出目标topic对应partition的双向队列,没有的话就创建一个。所以每一个partition都有一个属于自己的批次队列Deque<RecordBatch>
  2. 从队列尾部取出一个批次,如果取到了,就尝试把消息添加到这个批次中,如果添加成功了,就立即返回。返回的时候会将目前批次的状态也携带着返回,调用者就可以知道批次是否满了或者队列中已经不止一个批次了,就可以准备发送数据了。
  3. 如果队列中没有一个批次,或者消息添加到批次失败了。就要去重新申请内存,这个内存的空间取决于当前消息大小和batch.size的最大值。如果要申请的空间+已经使用的空间超过了buffer.size的值,线程就会阻塞,等到空间被释放或者达到maxTimeToBlock时间后抛出异常。
  4. 由于在前面申请空间的时候可能有其他线程改变了双向队列的状态(比如有线程新生成了一个批次呢),所以要再去检查一下是否可以从队列中获取到批次并将消息添加进去。如果这时候添加成功了就立即返回
  5. 用申请到的内存空间新建一个批次出来,然后把消息添加到批次。再把批次加到队列尾部,然后返回当前的状态。

sender发送消息

当sender线程启动后,就开始执行run方法。

public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // 不断自旋执行下面的run方法
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

void run(long now) {
        Cluster cluster = metadata.fetch();
        //获取所有可以被发送出去的消息批次
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        //如果要发送的消息批次中有partition的leader是未知的(也就是不在缓存中)
        //就需要请求去获取最新的元数据缓存起来。更新元数据时会往负载最低的那个节点发送更新请求
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        //遍历所有准备发送的节点,如果有节点不是存活的,也就是没有连接上该节点,就暂时移除,此次不发送消息批次给这个节点
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

       //获取所有已连接节点的待准备发送的批次数据
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
        //将在消息批次队列中滞留太久的消息批次移除出去。这个过期时间和requestTimeout有关
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);

        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0;
        }
        //准备发送这些消息批次
        //这里并不是真的发送,而是设置一下标志位,然后在后面的poll()方法中才是真正的发送出去
        sendProduceRequests(batches, now);
        //调用selector.select()检查nio的事件,可能做的事情有
        //读取服务端返回的响应信息,发送前面待准备发送的消息批次信息,更新元数据(看标志位是否被设置)等
        this.client.poll(pollTimeout, now);
    }

在sender线程的run方法里面,大概会做这些事情

  1. 先找出哪些消息批次可以被发送出去了,要发送往哪些节点
  2. 之后判断在要送的这些消息批次中,是否有消息的partition的leader获取不到的情况,如果有,就需要更新元数据来获取该partition的leader
  3. 判断要发送的那些节点是否都保持着连接,没连接上的那些节点要先移除掉
  4. 从accumulator中把那些可以发送的节点的消息批次都拿出来准备发送
  5. 遍历所有还在队列中待着的消息批次,判断他们是否过期了。过期的判断和request.timeout.ms配置有关系。
  6. 之后就是将要发送的消息批次封装成请求对象,然后设置一下发送的标志位。这里不会真正的发送,其实只是用相关nio的channel在selector上注册OP_WRITE事件,然后设置一下要发送的数据作为attach,之后poll()的时候就会把这些数据拿出来发送出去。
  7. 最后的poll()方法中,会先尝试更新元数据,是否要更新元数据是根据元数据更新频率和更新标志位是否被设置来决定的。然后发送请求,将步骤6中的那些封装的消息批次发送出去。这里也有可能读取服务端返回的响应,然后进行相应的处理,比如失败重试或者回调函数等等。
上一篇下一篇

猜你喜欢

热点阅读