Kafka程序员文字欲

无镜--kafka之生产者(一)

2018-07-27  本文已影响19人  绍圣

学习kafka有一段时间了。关于它里面的知识还是需要总结一下,一来是能让自己对kafka能有一个比较成型的理解,二来是对知识的一种回顾。多总结加深理解。

前言

kafka作为应用最为广泛的消息中间件,其内部各个的组件是怎么来协调工作的?其内部的设计思想是怎么样的?这些都很值得我们去细细的分析和研究。此处一系列的分析以kafka-0.10.1.0版本为基础进行解读。

由于新版本的kafka的C端是采用java实现的(S端是由Scala ),所以对于java开发人员很好上手。在使用kafka的时候,最开始接触的就是它的生产者和消费者客户端。并且kafka客户端对外提供的接口非常简洁,使用起来简单方便。当然这些得益于kafka在幕后为我们做了很多工作。

生产者(KafkaProducer)

使用KafkaProducer的伪代码:

KafkaProducer producer = new KafkaProducer(Map);

ProducerRecordrecord = new ProducerRecord(Topic, Key, Value); 

Future fu = producer.send(record);

producer.flush();

RecordMetadata rm = fu.get(1, TimeUnit.MINUTES);

可以看出,生产者的API确实非常简单。以上的伪代码是同步发送消息,kafka的生产者还提供异步提交消息的方法(伪代码):

KafkaProducer producer = new KafkaProducer(Map);

ProducerRecord record = new ProducerRecord(Topic, Key, Value);

producer.send(record, new Callback(){

@Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { System.out.println(e.getMessage()); } if(metadata != null) { System.out.println("message send to partition " + metadata.partition() + ",topic:" + metadata.topic() + ", offset: " + metadata.offset()); } }

});

KafkaProducer对象代表一个客户端进程,KafkaProducer.send()方法发送到服务端的消息,并不是直接发送到服务端,而是KafkaProducer把消息存放到内存队列中。再由一个消息发送线程从队列中拉取出消息,以批量的方式发送给服务端。kafka中的记录收集器(RecordAccumulator)负责缓存生产者产生的消息。发送线程(Sender)负责读取记录收集器(RecordAccumulator)的批量消息,通过网络发送给服务端。

ProducerRecord

需要发送给服务器的消息,都会封装成ProducerRecord对象。ProducerRecord对象中定义了消息相关信息:

private final String topic; // 要发送的topic名称

private final Integer partition; // 要发送的分区ID

private final K key; // 消息的KEY值

private final V value; // 消息的value

private final Long timestamp;

生产者中的拦截器(interceptor)

在发送消息之前,kafka允许用户对消息进行操作,拦截器(interceptor)孕育而生。并且允许用户指定多个interceptor从而形成一个拦截器链路来对用户所要发送的消息进行操作。

在构造KafkaProducer对象的时候,可以指定自定义的拦截链:

// 构建拦截链

 List interceptors = new ArrayList();

interceptors.add("xx.xx....");

interceptors.add("xx.xx....");

HashMap config = new HashMap();

config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

KafkaProducer producer = new KafkaProducer(config);

在KafkaProducer构造函数中,会给KafkaProducer中的拦截器变量进行赋值(伪代码)。

private final ProducerInterceptors interceptors;

private KafkaProducer() {

List interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);

this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

}

在调用发送方法进行消息发送的时候,会首先使用在初始化中设置的拦截器,来对消息进行操作。

public Future send(ProducerRecord record, Callback callback) {

ProducerRecord interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);

return doSend(interceptedRecord, callback);

}

调用拦截器里面的方法,对消息进行处理:

public ProducerRecord onSend(ProducerRecord record) {

ProducerRecord interceptRecord = record;

for (ProducerInterceptor interceptor : this.interceptors) {

try {

interceptRecord = interceptor.onSend(interceptRecord);

} catch (Exception e) {

if (record != null)

log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);

else

log.warn("Error executing interceptor onSend callback", e);

}

}

return interceptRecord;

}

onSend()方法里面调用用户自定义的拦截器。用户自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口。

调用了用户自定义的拦截器后,就进入了KafkaProducer.doSend()。

KafkaProducer.doSend()

private Future<RecordMetadata> doSend(ProducerRecord record, Callback callback) {

        TopicPartition tp = null;

        try {

// 发送之前确认topic对应的metadata(元数据)可用(topic的partition的主副本可用),如果没有metadata就要获取相应的metadata,获取metadata是阻塞的。总之必须metadata可用才会发送生产信息

ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);

            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);

            Cluster cluster = clusterAndWaitTime.cluster; // 获取集群信息

            byte[] serializedKey;

// 序列化ProducerRecord中的KEY和VALUE

            try {

                serializedKey = keySerializer.serialize(record.topic(), record.key());

            } catch (ClassCastException cce) {

            }

            byte[] serializedValue;

            try {

                serializedValue = valueSerializer.serialize(record.topic(), record.value());

            } catch (ClassCastException cce) {

            }

// 根据ProducerRecord中的partition和KEY计算出要发送到的partition

            int partition = partition(record, serializedKey, serializedValue, cluster);

            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);

// 根据ProducerRecord中计算出字节如果超出限制,则会抛出异常

            ensureValidRecordSize(serializedSize);

            tp = new TopicPartition(record.topic(), partition);

            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();

            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);

            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

// 追加数据到RecordAccumulator中(缓存)

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

// 如果追加数据后,RecordAccumulator中的数据已经达到限制,或者空间不足,则会唤醒发送线程(sender) ,把消息批量的提交到服务端。

            if (result.batchIsFull || result.newBatchCreated) {

                this.sender.wakeup();

            }

            return result.future;

        } catch (Exception e) {

            if (this.interceptors != null)

                this.interceptors.onSendError(record, tp, e);

            throw e;

        }

    }

发送过程分解分析

获取topic对应的metadata(元数据)

发送过程中,通过waitOnMetadata()来获取topic对应的metadata。因为metadata涉及的内容比较多。所以后面单独来写。总之必须metadata可用才会发送生产信息。

序列化ProducerRecord的Key和Value

KafkaProducer在发送消息之前需要对ProducerRecord中的Key和Value进行序列化操作,在KafkaComsumer端将对消息中的Key和Value进行反序列化操作。在kafka内部提供了序列化和反序列化默认实现类:

我们也可以自定义序列化(实现Serializer接口)和反序列化(实现Deserializer接口)类。然后同加载拦截器实现一样,使用自定义序列化和反序列化类。

HashMap config = new HashMap();

config.put("key.serializer", "xx.xx....");

config.put("value.serializer", "xx.xx....");

KafkaProducer  producer = new KafkaProducer(new Map(config));

接下来就应该选择把消息发送到topic的哪个分区上了

计算partition值

int partition = partition(record, serializedKey, serializedValue, cluster);

private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {

Integer partition = record.partition();

return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);

}

KafkaProducer中默认使用org.apache.kafka.clients.producer.internals.DefaultPartitioner来实现计算partition的值。

上面介绍过ProducerRecord,它里面的变量会用来计算partition。具体如下:

1,如果指定了partition,那么消息会被发送到指定的分区(partition)中。

2,如果没有指定partition,但指定了key,那么会使用key进行hash计算,根据计算出来的值发送到对应的分区(partition)中。

3,如果没有指定partition,没有指定key,那么会使用round-robin模式(轮询模式)发送消息到分区(partition)中。

4,如果同时指定了partition和key。那么partition起作用(key无效),发送到partition指定的分区中。

代码如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

List partitions = cluster.partitionsForTopic(topic);

int numPartitions = partitions.size();

if (keyBytes == null) {

// 没有指定 key 的情况下

int nextValue = counter.getAndIncrement(); // counter(final类型的AtomicInteger)在计算的时候在此随机数的基础上自增;

// 获取topic中有效的分区信息(有效的分区代表着这个分区的leader可以正常的提供读写服务)

List availablePartitions = cluster.availablePartitionsForTopic(topic);

if (availablePartitions.size() > 0) { // 根据可用分区数量和随机数来计算partition

int part = Utils.toPositive(nextValue) % availablePartitions.size();

return availablePartitions.get(part).partition();

} else { // 没有可用的分区,就使用topic下所有的分区数量来计算partition

return Utils.toPositive(nextValue) % numPartitions;

}

} else { // 有key的情况下,使用key的hash值进行计算partition

return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

}

}

有了partition后就要向RecordAccumulator中追加消息了

向RecordAccumulator(记录收集器)追加消息

首先了解RecordAccumulator相关的生态类:

生产者向RecordAccumulator添加记录,就像向一个车间中各个不同的传送带上的箱子添加物品一样(传送带的名称代表一个TopicPartition,传送带代表一个队列,传送带上的箱子代表RecordBatch)。生产者把属于同一个传送带的物品,放到传送带的箱子里面,当箱子存满,或者当前箱子无法装下物品时,就会放在新的一个箱子里面,那个装满的箱子里面的物品就会等待被一次性的处理。如果传送带上面的箱子已经超出了传送带所能承受的容量,生产者后面生产的物品就必须等待,等待传送带释放出新的空间能放上新的箱子来装物品,如果等待一段时间后还没有空间来装物品,就放弃此物品。

看看记录收集器周围的生态:

RecordBatch

对生产的批量记录的一个封装,表示正在或者将要被发送的的一批记录(传送带上的箱子)。会拥有一个MemoryRecords的引用

MemoryRecords

生产者发送的记录在内存中的一个记录集。记录最终存放的地方

Compressor

负责执行追加写操作

ProduceRequestResult

ProduceRequestResult是在初始化RecordBatch时建立。属于批次级的实例。ProduceRequestResult内置了CountdownLatch并且count为1。在外围调用FutureRecordMetadata的get方法中获取到记录的元数据时,会阻塞当前线程,必须等到ProduceRequestResult完成,也就是CountdownLatch变为0(在FutureRecordMetadata.get()中会调用ProduceRequestResult.CountdownLatch.await())。在Sender线程完成RecordBatch中的全部消息的发送并且收到服务端的响应后,会把CountdownLatch中的count变为0,这样外围阻塞的线程就会继续往下走获取到记录的元数据信息。

baseOffset变量:记录服务端响应客户端中的RecordBatch中第一个消息分配的的offset。

ReadyCheckResult

在发送线程(Sender)读取RecordAccumulator的消息进行发送的时候,首先会进行就绪检查:遍历所有的RecordAccumulator.ConcurrentMap>中各个tp的Deque中的批记录集(RecordBatch),把可以发送的RecordBatch对应的节点(leader)取出来,封装成了ReadyCheckResult对象。ReadyCheckResult里面就是保持的可以发送的RecordBatch对应的节点(leader)信息,下一次就绪检查点的时间,分区的leader未知的topic信息。

FutureRecordMetadata

保存生产记录的元数据,包括:ProduceRequestResult实例,RecordBatch中保存的记录个数,产生出记录时候的时间,key的大小,value的大小,checksum(暂时没有搞明白)等。每次追加完消息后返回FutureRecordMetadata实例,属于消息级的实例。提供get方法来让外围程序获取记录元数据信息,只是必须等到Sender线程完成RecordBatch的发送并且收到服务端的响应后,才返回元数据信息-RecordMetadata。

RecordMetadata

对批次元数据信息的封装。

RecordAppendResult

持有FutureRecordMetadata对象实例,RecordBatch是否满的标识(batchIsFull变量),需不需要重新创建新的RecordBatch(newBatchCreated变量)。batchIsFull和newBatchCreated在调用RecordAccumulator.append()方法后来判断是否需要唤醒Sender线程进行发送消息。如果batchIsFull为true:代表双向队列里面有RecordBatch满了,可以唤醒发送线程发送消息了。如果newBatchCreated为true:代表旧的RecordBatch满了或者是装不下新的消息了,可以唤醒发送线程发送消息了。

BufferPool

每实例化一个KafkaProducer,对应一个RecordAccumulator实例,每一个RecordAccumulator实例对应一个BufferPool实例,BufferPool提供分配内存存放消息空间的方法:allocate和释放消息空间的方法:deallocate。每个BufferPool实例中包括:

totalMemory(池总的内存的总量),

poolableSize(控制Deque队列中每个ByteBuffer的大小),

lock:ReentrantLock(保证每次分配和释放空间线程安全),

free<ByteBuffer>:Deque(已经申请未使用的空间),

waiters:Deque(记录申请不到足够空间而阻塞的线程,队列中记录的是阻塞线程对应的Condition对象),

availableMemory(未申请未使用的空间)

分配空间的流程:

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {

if (size > this.totalMemory) // 申请空间的大小已经超过了BufferPool总的字节大小抛出异常 throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations.");

this.lock.lock();

try {

// 申请的大小和Deque队列中ByteBuffer的大小一样,并且队列又存在已经申请未使用的内存,就直接使用队列中的ByteBuffer实例

if (size == poolableSize && !this.free.isEmpty())

return this.free.pollFirst();

// 计算出已经申请未使用的内存大小

int freeListSize = this.free.size() * this.poolableSize;

if (this.availableMemory + freeListSize >= size) { // 可用空间大于或等于要申请的空间 freeUp(size);

this.availableMemory -= size; // 减少size大小的未申请未使用的空间

lock.unlock();

return ByteBuffer.allocate(size); // 返回size大小的ByteBuffer

else { // 没有空间来分配申请的空间

int accumulated = 0;

ByteBuffer buffer = null;

Condition moreMemory = this.lock.newCondition();

long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory);

while (accumulated < size) {

long startWaitNs = time.nanoseconds();

long timeNs;

boolean waitingTimeElapsed;

try { // 如果累计的空间大小小于申请的空间大小,释放当前线程占有的锁,阻塞当前线程 // 阻塞remainingTimeToBlockNs时间后,没有被唤醒(unlock或者是await),返回false, waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);

} catch (InterruptedException e) {

this.waiters.remove(moreMemory); throw e;

} finally {

long endWaitNs = time.nanoseconds();

timeNs = Math.max(0L, endWaitNs - startWaitNs); t

his.waitTime.record(timeNs, time.milliseconds());

}

if (waitingTimeElapsed) {

this.waiters.remove(moreMemory);

throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");

}

remainingTimeToBlockNs -= timeNs;

// 累计空间大小是0并且申请大小等于Deque队列中ByteBuffer的大小并且已经申请未使用的空间队列不为空

if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {

buffer = this.free.pollFirst(); // 使用Deque队列中的第一个ByteBuffer

accumulated = size; // accumulated=size 会跳出循环

} else {

freeUp(size - accumulated); // 释放空间

int got = (int) Math.min(size - accumulated, this.availableMemory);

this.availableMemory -= got; // 减少未申请未使用的空间的值

accumulated += got; // 增加累计大小的值,直到累计的大小大于申请的大小

}

}

Condition removed = this.waiters.removeFirst();

if (removed != moreMemory)

throw new IllegalStateException("Wrong condition: this shouldn't happen.");

if (this.availableMemory > 0 || !this.free.isEmpty()) {

if (!this.waiters.isEmpty())

this.waiters.peekFirst().signal(); // 修改Condition队列中节点的状态,让其中的节点可以被唤醒

}

lock.unlock();

if (buffer == null)

return ByteBuffer.allocate(size);

else

return buffer;

}

} finally {

if (lock.isHeldByCurrentThread())

lock.unlock();

}

}

IncompleteRecordBatches

在记录收集器添加发送数据的方法中(RecordAccumulator.append()),把那些发送尚未完成的RecordBatch保存到此集合中,作用是对所有的RecordBatch的监控和管理。在Sender线程把生产请求提交到服务端后(实际就是提交RecordBatch中的数据),服务端正常响应给客户端,客户端在处理响应的回调函数中,如果是成功提交到服务端,就从IncompleteRecordBatches集合中删除。删除还有以下情况:Sender强制退出时,超时。此类操作的所有RecordBatch都加锁,保证线程安全。

以上分析了记录收集器的生态圈,在KafkaProducer.doSend方法中把信息追加到记录收集器后,根据RecordAccumulator.append方法返回的RecordAccumulator.RecordAppendResult实例的batchIsFull和newBatchCreated变量来判断是否唤醒发送线程,进行信息发送。客户端可以从doSend方法返回的FutureRecordMetadata实例中获取到请求完成(收到服务器响应)后的元数据信息。

小结:以上就是客户端使用KafkaProducer发送消息,最后保存在记录收集器的过程。接下来将重点看看消息怎么进入记录收集器的。

上一篇下一篇

猜你喜欢

热点阅读