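Kafka Producer (Part 1)
Original work; please credit the source when reposting.
Preface: An introduction to Kafka has been on my list for a long time, but work and other commitments kept pushing it back. This time I finally set myself a plan, and I hope the series is useful to readers as well. All articles in this series are based on Kafka 0.10.1.0.
Kafka produce flow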
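Figure: Kafka Producer (1)
Starting from KafkaProducer.send(), a message is first intercepted by ProducerInterceptors. Its key and value are then serialized, and a Partitioner picks the target partition. After the RecordAccumulator has collected the message into a batch, the Sender thread is woken up and finally performs the network I/O that writes the data to Kafka. (The details are annotated in the figure.)
Now let's look at the details:
KafkaProducer.send()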
/**
* Send the given record asynchronously and return a future which will eventually contain the response information.
*
* @param record The record to send
* @return A future which will eventually contain the response information
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
/**
* Send a record and invoke the given callback when the record has been acknowledged by the server
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
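Both send() overloads end up in send(record, callback) (the one-argument version passes a null callback), which first runs the record through ProducerInterceptors.onSend() and then calls doSend():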
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
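ProducerInterceptors intercepts a message and can modify it before it is sent; it also pre-processes the ack response (onAcknowledgement()) before the user's callback runs. You can implement the ProducerInterceptor interface yourself, including its onSend() method. For example:
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
// Intended to keep only records with an even key (null keys pass through unchanged).
// Caveat: ProducerInterceptors hands a null result on to doSend(), so odd-keyed records are not silently dropped; send() fails for them instead.
public class ProducerInterceptorDemo implements ProducerInterceptor<Integer, String> {
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
if (record.key() == null || record.key() % 2 == 0) return record;
return null;
}
@Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }
@Override public void close() { }
@Override public void configure(Map<String, ?> configs) { }
}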
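To see how several interceptors combine, here is a simplified standalone model of what ProducerInterceptors.onSend() does: each interceptor receives the previous one's output, and an exception thrown by one interceptor is recorded and skipped instead of failing the send. InterceptorChain and its use of UnaryOperator are hypothetical stand-ins for illustration, not Kafka's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for ProducerInterceptors; not Kafka's API.
class InterceptorChain<R> {
    private final List<UnaryOperator<R>> interceptors = new ArrayList<>();
    final List<String> warnings = new ArrayList<>();

    InterceptorChain<R> add(UnaryOperator<R> interceptor) {
        interceptors.add(interceptor);
        return this;
    }

    // Each interceptor sees the output of the previous one. If an interceptor throws,
    // the failure is recorded and the last good record is passed on unchanged.
    R onSend(R record) {
        R intercepted = record;
        for (UnaryOperator<R> interceptor : interceptors) {
            try {
                intercepted = interceptor.apply(intercepted);
            } catch (RuntimeException e) {
                warnings.add("interceptor failed: " + e.getMessage());
            }
        }
        return intercepted;
    }
}
```

Because the chain only ever replaces the record it hands on, this model also shows why returning null is not a way to drop a message: the null is simply passed along to whatever comes next.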
KafkaProducer.doSend()
send() delegates the actual work to doSend().
The first step is to make sure the metadata for the target topic is available. The Metadata class looks like this:
public final class Metadata {
private static final Logger log = LoggerFactory.getLogger(Metadata.class);
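private final long refreshBackoffMs;// minimum interval between refreshes after a failed metadata update, to avoid refreshing too often; default 100 ms
private final long metadataExpireMs;// metadata expiry time; 60*60*1000 ms in the no-arg constructor, while KafkaProducer passes metadata.max.age.ms (default 300000 ms)
private int version;// incremented by 1 on every successful update; used to tell whether the metadata has been updated
private long lastRefreshMs;// time of the most recent refresh attempt (including failed ones)
private long lastSuccessfulRefreshMs;// time of the most recent successful refresh (equal to lastRefreshMs if every refresh succeeded, otherwise lastSuccessfulRefreshMs < lastRefreshMs)
private Cluster cluster;// cluster information for the tracked topics
private boolean needUpdate;// whether a metadata update is needed
private final Set<String> topics;
private final List<Listener> listeners;// listeners notified of metadata updates
private boolean needMetadataForAllTopics;// whether to fetch metadata for all topics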
}
Its Cluster field looks like this:
public final class Cluster {
private final boolean isBootstrapConfigured;
private final List<Node> nodes;// list of nodes (brokers)
private final Set<String> unauthorizedTopics;// topics the client is not authorized to access
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;// TopicPartition -> PartitionInfo
private final Map<String, List<PartitionInfo>> partitionsByTopic;// topic -> all of its partitions
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;// topic -> available partitions (those whose leader is not null)
private final Map<Integer, List<PartitionInfo>> partitionsByNode;// node id -> partitions whose leader is that node
private final Map<Integer, Node> nodesById;// broker id -> Node
}
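availablePartitionsByTopic is the map the default partitioner later uses for keyless records. A minimal sketch of how such indexes can be derived from a flat partition list, assuming a simplified partition description with a nullable leader id; Part and ClusterIndex are hypothetical names, not Kafka classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for PartitionInfo: leader == null means no leader is currently known.
final class Part {
    final String topic;
    final int partition;
    final Integer leader;

    Part(String topic, int partition, Integer leader) {
        this.topic = topic;
        this.partition = partition;
        this.leader = leader;
    }
}

// Hypothetical index builder mirroring three of Cluster's maps.
final class ClusterIndex {
    final Map<String, List<Part>> partitionsByTopic = new HashMap<>();
    final Map<String, List<Part>> availablePartitionsByTopic = new HashMap<>();
    final Map<Integer, List<Part>> partitionsByNode = new HashMap<>();

    ClusterIndex(List<Part> parts) {
        for (Part p : parts) {
            partitionsByTopic.computeIfAbsent(p.topic, t -> new ArrayList<>()).add(p);
            if (p.leader != null) {
                // "Available" means the partition currently has a leader;
                // partitionsByNode is keyed by that leader's id.
                availablePartitionsByTopic.computeIfAbsent(p.topic, t -> new ArrayList<>()).add(p);
                partitionsByNode.computeIfAbsent(p.leader, n -> new ArrayList<>()).add(p);
            }
        }
    }

    List<Part> availablePartitionsForTopic(String topic) {
        return availablePartitionsByTopic.getOrDefault(topic, Collections.emptyList());
    }
}
```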
To make sure the metadata is available, doSend() calls waitOnMetadata():
/**
* Wait for cluster metadata including partitions for the given topic to be available.
* @param topic The topic we want metadata for
* @param maxWaitMs The maximum time in ms for waiting on the metadata
* @return The amount of time we waited in ms
*/
private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already.
if (!this.metadata.containsTopic(topic))
this.metadata.add(topic);
if (metadata.fetch().partitionsForTopic(topic) != null)
return 0;
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
while (metadata.fetch().partitionsForTopic(topic) == null) {
log.trace("Requesting metadata update for topic {}.", topic);
int version = metadata.requestUpdate();
sender.wakeup();
metadata.awaitUpdate(version, remainingWaitMs);
long elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (metadata.fetch().unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
}
return time.milliseconds() - begin;
}
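It first adds the topic to the metadata's topic list if it is not there yet, then fetches the Cluster from metadata and looks up the topic's partition information. If that information is present, the metadata is considered ready and the method returns immediately.
Otherwise it loops: metadata.requestUpdate() sets needUpdate to true and returns the current version number, which is later used to tell whether the metadata has been updated. The Sender thread is then woken up to perform the update, and awaitUpdate() blocks the calling thread until the update is done:
// Metadata.awaitUpdate(): block until the version moves past lastVersion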
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0) {
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
}
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
while (this.version <= lastVersion) {// the version number shows whether an update has happened
if (remainingWaitMs != 0)
wait(remainingWaitMs);// block the thread and wait for the metadata update
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)// timeout
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
}
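In awaitUpdate(), the thread stays in the while loop until the version number moves past lastVersion (the update succeeded) or maxWaitMs elapses, in which case a TimeoutException is thrown.
Once the metadata is available, the producer serializes the record's key and value with keySerializer.serialize() and valueSerializer.serialize().
Next it determines the partition by calling partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster). If the record specifies a partition, that value is returned directly; otherwise the configured Partitioner decides. The default, DefaultPartitioner.partition(), does the following: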
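The requestUpdate()/awaitUpdate() handshake is a version-counter wait/notify pattern: the caller remembers the version it saw, wakes the updater, and waits until the version moves past it. A minimal standalone sketch of that pattern; VersionedMetadata and MetadataTimeoutException are hypothetical names, and the background thread in the usage stands in for the Sender thread.

```java
// Hypothetical model of Metadata's version handshake; not Kafka's class.
class VersionedMetadata {
    static class MetadataTimeoutException extends RuntimeException {
        MetadataTimeoutException(String msg) { super(msg); }
    }

    private int version = 0;
    private boolean needUpdate = false;

    // Caller side: flag that an update is wanted and remember the current version.
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    // Updater side (the Sender thread in Kafka): apply new metadata, bump the version, wake waiters.
    synchronized void update() {
        needUpdate = false;
        version++;
        notifyAll();
    }

    synchronized int version() { return version; }

    synchronized boolean needUpdate() { return needUpdate; }

    // Block until version > lastVersion, or throw once maxWaitMs has elapsed.
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        if (maxWaitMs < 0) throw new IllegalArgumentException("maxWaitMs must be >= 0");
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (version <= lastVersion) {
            if (remainingWaitMs != 0) wait(remainingWaitMs); // wait(0) would block forever
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new MetadataTimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }
}
```

The loop re-checks the version after every wakeup, so spurious wakeups and notifications meant for other waiters are harmless.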
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return DefaultPartitioner.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
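In other words: if the record has a key, the key bytes are hashed with murmur2 and the hash modulo the topic's total number of partitions picks the partition. If it has no key, the counter is incremented and taken modulo the number of available partitions, which spreads keyless records round-robin (falling back to all partitions when none is available).
With the partition chosen, RecordAccumulator.append() is called to add the message to the RecordAccumulator: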
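The same decision logic as a standalone sketch that runs without a cluster. murmur2 below is a port of the 32-bit MurmurHash2 variant used by Kafka's Utils.murmur2, written from memory, so treat it as illustrative rather than guaranteed to match byte for byte. PartitionChooser is a hypothetical name, partitions are plain ints, and the counter starts at 0 here only to make the output repeatable. toPositive clears the sign bit instead of calling Math.abs, because Math.abs(Integer.MIN_VALUE) is still negative.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical standalone model of DefaultPartitioner's choice.
class PartitionChooser {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Clear the sign bit: maps Integer.MIN_VALUE to 0 rather than to a negative number.
    static int toPositive(int n) { return n & 0x7fffffff; }

    // 32-bit MurmurHash2 with seed 0x9747b28c (ported from memory of Kafka's Utils.murmur2).
    static int murmur2(byte[] data) {
        int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        switch (length % 4) { // deliberate fall-through over the trailing bytes
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // numPartitions: all partitions of the topic; available: those that currently have a leader.
    int partition(byte[] keyBytes, int numPartitions, List<Integer> available) {
        if (keyBytes == null) {
            int next = counter.getAndIncrement();
            if (!available.isEmpty()) return available.get(toPositive(next) % available.size());
            return toPositive(next) % numPartitions; // nothing available: fall back to any partition
        }
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }
}
```

Keyed records always use numPartitions rather than the available list, so a key keeps mapping to the same partition even while that partition's leader is temporarily unknown.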
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
try {
// check if we have an in-progress batch
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null)
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
}
// we don't have an in-progress record batch try to allocate a new batch
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer);
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
}
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
} finally {
appendsInProgress.decrementAndGet();
}
}
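Each TopicPartition has its own deque (dq). append() peeks at the most recently created RecordBatch at the tail of dq and tries to append the record to it; if there is enough room and the append succeeds, it returns immediately. If there is no RecordBatch, or the last one lacks room, a new RecordBatch is created for the record. The buffer for it is allocated outside the synchronized block because allocation may block, which is why the deque is checked a second time after the lock is re-acquired.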
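The locking pattern in append() can be sketched in isolation: try the last batch under the deque's lock, allocate outside the lock, then lock again and re-check. In this simplified model a batch only counts bytes, allocation is a plain new, and ConcurrentHashMap stands in for Kafka's CopyOnWriteMap; MiniAccumulator, Batch and AppendResult are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplified accumulator: batches only count bytes.
class MiniAccumulator {
    static final class Batch {
        final int capacity;
        int used;

        Batch(int capacity) { this.capacity = capacity; }

        boolean tryAppend(int recordSize) {
            if (used + recordSize > capacity) return false;
            used += recordSize;
            return true;
        }

        boolean isFull() { return used >= capacity; }
    }

    static final class AppendResult {
        final boolean batchIsFull, newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchSize;
    final Map<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    MiniAccumulator(int batchSize) { this.batchSize = batchSize; }

    AppendResult append(String partition, int recordSize) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) { // fast path: append to the newest batch
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize))
                return new AppendResult(dq.size() > 1 || last.isFull(), false);
        }
        // Allocate outside the lock (in Kafka this is BufferPool.allocate, which may block).
        Batch fresh = new Batch(Math.max(batchSize, recordSize));
        synchronized (dq) { // re-check: another thread may have added a batch meanwhile
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize))
                return new AppendResult(dq.size() > 1 || last.isFull(), false); // Kafka deallocates the unused buffer here
            fresh.tryAppend(recordSize);
            dq.addLast(fresh);
            return new AppendResult(dq.size() > 1 || fresh.isFull(), true);
        }
    }
}
```

batchIsFull becomes true as soon as the deque holds more than one batch: only the newest batch accepts appends, so every older one is ready to send.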
RecordBatch
MemoryRecords
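A key structure inside the RecordAccumulator is MemoryRecords, which represents a collection of messages (each RecordBatch stores its messages in one):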
public class MemoryRecords implements Records {
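// compressor: compresses the messages and writes the output into buffer
private final Compressor compressor;
// the maximum number of bytes that may be written into buffer
private final int writeLimit;
// total capacity of buffer in bytes
private final int initialCapacity;
// Java NIO ByteBuffer that holds the message data
private ByteBuffer buffer;
// indicate if the memory records is writable or not
// set to read-only before the records are sent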
private boolean writable;
}
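The writeLimit / writable pair can be modeled with a plain ByteBuffer. In this sketch each entry is written as an 8-byte offset, a 4-byte size and the raw payload; that layout is an assumption for illustration (it mirrors the offset-plus-size log overhead but skips the real message header and compression), so hasRoomFor is exact here, whereas with compression Kafka can only estimate. MiniRecords is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Hypothetical simplified MemoryRecords: uncompressed, entry = offset(8) + size(4) + payload.
class MiniRecords {
    static final int LOG_OVERHEAD = 8 + 4;
    private final ByteBuffer buffer;
    private final int writeLimit;
    private boolean writable = true;
    private int count = 0;

    MiniRecords(int capacity, int writeLimit) {
        this.buffer = ByteBuffer.allocate(capacity);
        this.writeLimit = Math.min(writeLimit, capacity);
    }

    boolean hasRoomFor(byte[] payload) {
        return writable && buffer.position() + LOG_OVERHEAD + payload.length <= writeLimit;
    }

    void append(long offset, byte[] payload) {
        if (!hasRoomFor(payload)) throw new IllegalStateException("no room or not writable");
        buffer.putLong(offset).putInt(payload.length).put(payload);
        count++;
    }

    boolean isFull() { return !writable || buffer.position() >= writeLimit; }

    // Seal before sending: no more appends; return a read-only view of what was written.
    ByteBuffer close() {
        writable = false;
        ByteBuffer view = buffer.duplicate();
        view.flip();
        return view.asReadOnlyBuffer();
    }

    int count() { return count; }
}
```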
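As for Compressor, KafkaProducer supports three compression types: GZIP, SNAPPY and LZ4. GZIP ships with the JDK, so it is created with new; SNAPPY is created via reflection, so users who do not use that compression type do not need its library as a dependency.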
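Loading an optional codec through reflection means its class is only resolved when that codec is first used, so its jar need not be on the classpath otherwise. A minimal sketch of the idea, assuming a memoized constructor lookup; LazyStreamFactory is a hypothetical name and this is not Kafka's Compressor code. The usage exercises it with the JDK's BufferedOutputStream, which has an (OutputStream, int) constructor, as a stand-in codec, and with a deliberately missing class name.

```java
import java.io.OutputStream;
import java.lang.reflect.Constructor;

// Hypothetical lazy, reflective factory for an optional OutputStream wrapper class.
class LazyStreamFactory {
    private final String className;
    private volatile Constructor<?> ctor; // looked up once, on first use

    LazyStreamFactory(String className) { this.className = className; }

    OutputStream wrap(OutputStream out, int bufferSize) {
        try {
            Constructor<?> c = ctor;
            if (c == null) {
                // Class.forName only runs here, so a missing optional jar fails only when the codec is used.
                c = Class.forName(className).getConstructor(OutputStream.class, Integer.TYPE);
                ctor = c;
            }
            return (OutputStream) c.newInstance(out, bufferSize);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("codec class unavailable: " + className, e);
        }
    }
}
```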
ProduceRequestResult
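Back to RecordBatch: it has a ProduceRequestResult field (produceFuture), a future-like object that tracks the state of the whole RecordBatch. When all messages in the RecordBatch have been acknowledged, or the batch has timed out, or the producer is closed, ProduceRequestResult.done() is called; its error field distinguishes an exceptional completion from a normal one. done() then calls countDown() to wake the threads blocked in await().
ProduceRequestResult also has a baseOffset field: the offset the broker assigned to the first message of this RecordBatch. Each message's absolute offset in the partition is then baseOffset plus its relative offset within the RecordBatch.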
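The ProduceRequestResult contract (complete once, let any number of threads wait, carry either a baseOffset or an error) maps naturally onto a CountDownLatch(1). A minimal sketch, assuming the absolute offset is baseOffset + relativeOffset as described above; BatchResult is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ProduceRequestResult.
class BatchResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile long baseOffset = -1L;
    private volatile RuntimeException error;

    // Called once when the batch is acknowledged, expires, or the producer is closed.
    void done(long baseOffset, RuntimeException error) {
        this.baseOffset = baseOffset;
        this.error = error;
        latch.countDown(); // releases every thread blocked in await()
    }

    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    boolean completed() { return latch.getCount() == 0L; }

    // Absolute offset in the partition of the record at relativeOffset within the batch.
    long offsetOf(int relativeOffset) {
        if (!completed()) throw new IllegalStateException("batch not completed yet");
        RuntimeException e = error;
        if (e != null) throw e;
        return baseOffset + relativeOffset;
    }
}
```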
Thunk
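Thunk also lives in the RecordBatch class. send() takes a callback argument, and Thunk can be thought of as the queue of per-message callback objects: a Thunk's callback points to the corresponding message's callback, and its future is a FutureRecordMetadata, which implements the Future interface. Inside FutureRecordMetadata, result points to the RecordBatch's ProduceRequestResult and relativeOffset is the message's offset within the RecordBatch.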
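When the batch completes, each Thunk's callback is invoked with either that record's result or the exception. A simplified sketch of that fan-out; MiniBatch and OffsetCallback are hypothetical names, and a callback here receives only the absolute offset instead of a full RecordMetadata.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified RecordBatch showing only the Thunk bookkeeping.
class MiniBatch {
    interface OffsetCallback { void onCompletion(long offset, Exception error); }

    private static final class Thunk {
        final OffsetCallback callback;
        final int relativeOffset;

        Thunk(OffsetCallback callback, int relativeOffset) {
            this.callback = callback;
            this.relativeOffset = relativeOffset;
        }
    }

    private final List<Thunk> thunks = new ArrayList<>();
    private int recordCount = 0;

    // Returns the record's relative offset; a Thunk is kept only when a callback was supplied.
    int append(OffsetCallback callback) {
        int relative = recordCount++;
        if (callback != null) thunks.add(new Thunk(callback, relative));
        return relative;
    }

    // Fan the batch outcome out to every callback; one failing callback does not stop the others.
    void done(long baseOffset, Exception error) {
        for (Thunk t : thunks) {
            try {
                if (error == null) t.callback.onCompletion(baseOffset + t.relativeOffset, null);
                else t.callback.onCompletion(-1L, error);
            } catch (RuntimeException e) {
                System.err.println("callback failed: " + e);
            }
        }
    }
}
```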
tryAppend()
/**
* Append the record to the current record set and return the relative offset within that record set
*
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
// estimates the remaining space; not an exact value
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
// append the data to MemoryRecords; offsetCounter is the offset within the RecordBatch
long checksum = this.records.append(offsetCounter++, timestamp, key, value);
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length);
// wrap the user's callback and the future into a Thunk
if (callback != null)
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}
BufferPool
When a message is appended and the RecordBatch lacks room, or no RecordBatch can be peeked from dq, a ByteBuffer of the required size is allocated from BufferPool:
public final class BufferPool {
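// total size of the pool
private final long totalMemory;
// lock guarding allocation and release across threads
private final ReentrantLock lock;
// queue of cached ByteBuffers of the poolable size (batch.size)
private final Deque<ByteBuffer> free;
// Condition objects of threads blocked because not enough memory was available
private final Deque<Condition> waiters;
// memory neither cached in free nor handed out: totalMemory minus the free-list buffers minus the buffers in use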
private long availableMemory;
}
BufferPool.allocate()
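Memory is requested through BufferPool.allocate():
If size equals poolableSize and the free queue is not empty, a cached ByteBuffer is taken from free and returned directly.
Otherwise, if there is enough memory in total (availableMemory plus the buffers cached in free) but availableMemory alone is not enough, buffers are released from the free list until availableMemory covers size, and a newly allocated HeapByteBuffer of that size is returned instead of a buffer from the free queue.
If there is not enough memory in total, the thread has to block: it adds its Condition to waiters and waits in a loop: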
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
this.waiters.remove(moreMemory);
throw e;
} finally {
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs);
// record how long we were blocked
this.waitTime.record(timeNs, time.milliseconds());
}
// timed out: throw immediately
if (waitingTimeElapsed) {
this.waiters.remove(moreMemory);
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}
remainingTimeToBlockNs -= timeNs;
// check if we can satisfy this request from the free list,
// otherwise allocate memory
// the request is for a poolableSize buffer and free holds an idle one
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list
buffer = this.free.pollFirst();
accumulated = size;
} else {// take part of the memory now and keep waiting for the rest
// we'll need to allocate memory, but we may only get
// part of what we need on this iteration
freeUp(size - accumulated);
int got = (int) Math.min(size - accumulated, this.availableMemory);
this.availableMemory -= got;
accumulated += got;
}
}
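The result is the same as on the non-blocking path: a request of exactly poolableSize may be served with a ByteBuffer from the free queue, and any other size gets a newly allocated HeapByteBuffer.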
BufferPool.deallocate()
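With allocate() understood, deallocate() is much easier to follow: if the released ByteBuffer's size equals poolableSize, it is put back into the free queue; otherwise its size is simply added back to availableMemory. Finally, one thread that is blocked waiting for memory is woken up.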
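The allocate()/deallocate() logic can be condensed into a standalone sketch. It keeps the same structure (pooled fast path, freeUp(), a FIFO of Conditions for blocked callers, memory collected piece by piece while waiting) but drops metrics. SimpleBufferPool is a hypothetical name; it throws IllegalStateException where Kafka throws its own TimeoutException, and on timeout or interrupt it returns any partially collected memory to the pool, which the waiting loop shown above does not do.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical condensed model of BufferPool (no metrics).
class SimpleBufferPool {
    private final long totalMemory;
    private final int poolableSize;
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long availableMemory; // memory neither cached in free nor handed out

    SimpleBufferPool(long totalMemory, int poolableSize) {
        this.totalMemory = totalMemory;
        this.poolableSize = poolableSize;
        this.availableMemory = totalMemory;
    }

    ByteBuffer allocate(int size, long maxBlockMs) throws InterruptedException {
        if (size > totalMemory) throw new IllegalArgumentException("request larger than the whole pool");
        lock.lock();
        try {
            if (size == poolableSize && !free.isEmpty()) return free.pollFirst(); // reuse a pooled buffer
            if (availableMemory + (long) free.size() * poolableSize >= size) {
                freeUp(size); // enough memory in total: release pooled buffers, then allocate fresh
                availableMemory -= size;
                return ByteBuffer.allocate(size);
            }
            Condition moreMemory = lock.newCondition();
            waiters.addLast(moreMemory);
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxBlockMs);
            int accumulated = 0;
            ByteBuffer pooled = null;
            boolean success = false;
            try {
                while (accumulated < size) {
                    long start = System.nanoTime();
                    boolean timedOut = !moreMemory.await(remainingNs, TimeUnit.NANOSECONDS);
                    remainingNs -= System.nanoTime() - start;
                    if (timedOut) throw new IllegalStateException("no memory within " + maxBlockMs + " ms");
                    if (accumulated == 0 && size == poolableSize && !free.isEmpty()) {
                        pooled = free.pollFirst();
                        accumulated = size;
                    } else { // take what is available now and keep waiting for the rest
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, availableMemory);
                        availableMemory -= got;
                        accumulated += got;
                    }
                }
                success = true;
            } finally {
                if (!success) availableMemory += accumulated; // give back a partial grab
                waiters.remove(moreMemory);
                // If memory is left over, let the next waiter try.
                if (!waiters.isEmpty() && (availableMemory > 0 || !free.isEmpty())) waiters.peekFirst().signal();
            }
            return pooled != null ? pooled : ByteBuffer.allocate(size);
        } finally {
            lock.unlock();
        }
    }

    void deallocate(ByteBuffer buffer) {
        lock.lock();
        try {
            if (buffer.capacity() == poolableSize) {
                buffer.clear();
                free.addLast(buffer); // keep batch-sized buffers for reuse
            } else {
                availableMemory += buffer.capacity(); // other sizes are left to the GC
            }
            Condition first = waiters.peekFirst();
            if (first != null) first.signal();
        } finally {
            lock.unlock();
        }
    }

    // Move cached buffers back into availableMemory until it covers size (or free is empty).
    private void freeUp(int size) {
        while (!free.isEmpty() && availableMemory < size) availableMemory += free.pollLast().capacity();
    }

    long availableMemory() {
        lock.lock();
        try { return availableMemory; } finally { lock.unlock(); }
    }
}
```

Only batch-sized buffers are pooled, since almost every allocation is exactly batch.size; oversized buffers for unusually large records are just returned to the GC.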
RecordAccumulator
Finally, looking back at RecordAccumulator, its structure is now easy to follow:
public final class RecordAccumulator {
private volatile boolean closed;
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
// size of the ByteBuffer behind each RecordBatch (batch.size)
private final int batchSize;
// compression type
private final CompressionType compression;
private final long lingerMs;
private final long retryBackoffMs;
// the BufferPool
private final BufferPool free;
private final Time time;
// TopicPartition -> deque of RecordBatch; the map is a CopyOnWriteMap (thread-safe), but the Deque is not thread-safe, so operations on a RecordBatch deque must hold its lock
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
// RecordBatches that have not finished sending
private final IncompleteRecordBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
private final Set<TopicPartition> muted;
// when drain() exports RecordBatches in bulk, drainIndex records where the previous drain stopped so the next one resumes there, which prevents starvation
private int drainIndex;
}
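drainIndex acts as a round-robin cursor: each drain starts where the previous one stopped, so partitions near the front of a node's list cannot keep filling every request. A simplified sketch of that loop, assuming each partition's queue just holds batch sizes in bytes and ignoring muting and retry backoff; RoundRobinDrainer is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the drainIndex round robin for the partitions of one node.
class RoundRobinDrainer {
    private final List<String> partitions;     // partitions led by this node
    private final List<Deque<Integer>> queues; // per-partition batch sizes in bytes
    private int drainIndex = 0;

    RoundRobinDrainer(List<String> partitions) {
        this.partitions = partitions;
        this.queues = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) queues.add(new ArrayDeque<>());
    }

    void addBatch(int partitionIdx, int bytes) { queues.get(partitionIdx).addLast(bytes); }

    // Take at most one batch per partition, stopping once maxSize would be exceeded.
    List<String> drain(int maxSize) {
        List<String> ready = new ArrayList<>();
        if (partitions.isEmpty()) return ready;
        int size = 0;
        int start = drainIndex = drainIndex % partitions.size();
        do {
            Deque<Integer> q = queues.get(drainIndex);
            Integer first = q.peekFirst();
            if (first != null) {
                // An oversized batch is still taken when it would be the only one in the request.
                if (size + first > maxSize && !ready.isEmpty()) break; // resume at this partition next time
                size += q.pollFirst();
                ready.add(partitions.get(drainIndex));
            }
            drainIndex = (drainIndex + 1) % partitions.size();
        } while (drainIndex != start);
        return ready;
    }
}
```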
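Finally, back to doSend(). Its last step checks the RecordAppendResult: if the batch the message went into is full (the last RecordBatch in the deque is full, or the deque holds more than one RecordBatch) or a new batch was created, the Sender thread is woken up.
That's it for this part. Next time we will continue with the Kafka Producer and look at the Sender thread.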