Kafka源码之客户端内存缓冲池
Kafka是一个高吞吐的消息队列,为了实现高吞吐,kafka在实现方面用到了以下技术:
- Zero Copy机制,内核copy数据直接copy到网络设备,不必经过内核到用户再到内核的copy,减小了copy次数和上下文切换次数,大大提高了效率;
- 磁盘顺序读写,减少了寻道等待的时间;
- 批量处理机制,服务端批量存储,客户端主动批量生产数据/消费数据,消息处理效率高;
- 存储具有O(1)的复杂度,读取因为分区和segment,是O(log(n))的复杂度;
- 分区机制,有助于提高吞吐量;
注:kafka的零拷贝机制见零拷贝
今天学习下kafka客户端批量压缩和发送消息机制。
客户端消息发送,首先消息肯定是放在内存中的,大数据场景下,内存中不断存在大量的消息需要发送,消息发送完成后,内存会被回收,这很容易引起GC,而频繁的GC特别是full GC会造成“stop the world”,即其他线程停止工作等待垃圾回收线程执行,从而影响发送的速度影响吞吐量,那么Kafka是如何解决这个问题呢?
1、Kafka客户端发送消息过程
首先介绍下Kafka客户端发送消息的大致过程,Kafka的kafkaProducer对象是线程安全的(send方法中调用的accumulator的append方法是线程安全的,有同步机制),多个发送线程在发送消息时候可以共用一个kafkaProducer对象来调用发送方法,最后发送的数据根据Topic和分区的不同被组装进某一个RecordBatch中。如下图所示
image.png发送的数据组装成RecordBatch放入双向队列Deque<RecordBatch>后,会被发送线程批量取出组装成ProduceRequest对象发送给Kafka服务端。
注:1. RecordBatch是存储数据的对象
2. 每个topic都有一个双向队列Deque<RecordBatch>;
3. 一个kafkaProducer有一个独立的Sender线程;
2、缓冲池
缓冲池:BufferPool,一个KafkaProducer实例中只有一个BufferPool对象,
2.1 缓冲池内存结构
image.png-
内存池总大小:它是已使用空间和可使用空间的总和,用totalMemory表示(由buffer.memory配置,默认32M)。
-
可使用的空间:它包含包括两个部分,绿色部分代表未申请未使用的部分,用availableMemory表示;
黄色部分代表已经申请但当前没有使用的部分(之前使用过但是释放了数据),用一个ByteBuffer双端队列(Deque)表示,在BufferPool中这个队列叫free,队列中的每个ByteBuffer的大小用poolableSize表示(由batch.size配置,默认16k),因为每次free申请内存都是以poolableSize为单位申请的,申请poolableSize大小的bytebuffer后用RecordBatch来包装起来。 -
已使用空间:代表缓冲池中已经装了数据的部分,即第一张图中每个topic对应的双向队列Deque<RecordBatch>。
缓冲池(BufferPool)中最重要的就是RecordBatch数据发送时申请内存和RecordBatch数据发送完成以后释放内存的过程。
2.2 消息的发送到队列过程
先从当前topic的队列Deque<RecordBatch>中取出最后一个RecordBatch,如果RecordBatch为null,或者把数据装入RecordBatch失败时,则从缓冲池申请内存,构建新的RecordBatch,然后再加入当前topic的队列;
当我们调用客户端的发送消息的时候,底层会调用Send,然后里面使用一个记录累计器RecordAccumulator把消息append进来,
源码如下,删除了不必要的一些代码
org.apache.kafka.clients.producer.internals.RecordAccumulator#append
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
// 根据topic获取队列
Deque<RecordBatch> dq = dequeFor(tp);
synchronized (dq) {
RecordBatch last = dq.peekLast();
//如果队列不为空,即RecordBatch不为null;如果为空,则从内存池申请内存
if (last != null) {
//尝试把新的消息数据装入RecordBatch
FutureRecordMetadata future = last.tryAppend(key, value, callback);
//装入成功,直接返回,装入失败,则从内存池申请内存
if (future != null) {
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
}
}
//判断需要申请空间大小,如果需要申请空间大小比batchSize小,那么申请大小就是batchsize,
//如果比batchSize大,那么大小以实际申请大小为准
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
//申请新的内存
ByteBuffer buffer = free.allocate(size);
synchronized (dq) {
//往内存中装入数据,构建RecordBatch
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
//把RecordBatch加入队列
dq.addLast(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
}
2.3 数据的内存申请过程
消息发送时内存缓冲区的申请和释放才是重点。
org.apache.kafka.clients.producer.internals.BufferPool#allocate
public ByteBuffer allocate(int size) throws InterruptedException {
//已用空间中是否有
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
//申请内存是加锁的,所以是线程安全的
this.lock.lock();
try {
// 如果已申请未使用空间free队列不为空,且申请的内存大小正好等于free队列中buffer大小,则对队列中获取ByteBuffer
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = this.free.size() * this.poolableSize;
if (this.availableMemory + freeListSize >= size) {
// 不断让free释放空间直到availableMemory空间比申请空间大
freeUp(size);
this.availableMemory -= size;
lock.unlock();
// 构造HeapByteBuffer存储
return ByteBuffer.allocate(size);
} else if (!blockOnExhaustion) {
throw new BufferExhaustedException("You have exhausted the " + this.totalMemory
+ " bytes of memory you configured for the client and the client is configured to error"
+ " rather than block when memory is exhausted.");
} else {
//缓冲池没有足够的内存可以申请,则阻塞;
//直到有空间释放,并且空间足够为止
int accumulated = 0;
ByteBuffer buffer = null;
//通过Condition实现线程等待和唤醒
Condition moreMemory = this.lock.newCondition();
this.waiters.addLast(moreMemory);
// loop over and over until we have a buffer or have reserved
// enough memory to allocate one
while (accumulated < size) {
long startWait = time.nanoseconds();
moreMemory.await();
long endWait = time.nanoseconds();
this.waitTime.record(endWait - startWait, time.milliseconds());
// check if we can satisfy this request from the free list,
// otherwise allocate memory
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list
buffer = this.free.pollFirst();
accumulated = size;
} else {
// we'll need to allocate memory, but we may only get
// part of what we need on this iteration
freeUp(size - accumulated);
int got = (int) Math.min(size - accumulated, this.availableMemory);
this.availableMemory -= got;
accumulated += got;
}
}
// remove the condition for this thread to let the next thread
// in line start getting memory
Condition removed = this.waiters.removeFirst();
if (removed != moreMemory)
throw new IllegalStateException("Wrong condition: this shouldn't happen.");
// signal any additional waiters if there is more memory left
// over for them
if (this.availableMemory > 0 || !this.free.isEmpty()) {
if (!this.waiters.isEmpty())
this.waiters.peekFirst().signal();
}
// unlock and return the buffer
lock.unlock();
if (buffer == null)
return ByteBuffer.allocate(size);
else
return buffer;
}
} finally {
if (lock.isHeldByCurrentThread())
lock.unlock();
}
}
消息发送时,从缓冲池中申请内存全过程如下图,
image.png2.4 数据的内存释放过程
public void deallocate(ByteBuffer buffer, int size) {
lock.lock();
try {
//只有标准规格(bytebuffer空间大小和poolableSize大小一致)的才放入free
if (size == this.poolableSize && size == buffer.capacity()) {
//注意这里的buffer是直接reset了,重新reset后可以重复利用,没有gc问题
buffer.clear();
//添加进free循环利用
this.free.add(buffer);
} else {
this.availableMemory += size;
}
//唤醒排在前面的等待线程
Condition moreMem = this.waiters.peekFirst();
if (moreMem != null)
moreMem.signal();
} finally {
lock.unlock();
}
}
image.png
通过申过申请和释放过程流程图以及释放空间代码,我们可以得到一个结论:
如果需要申请空间大小(发送的消息)比batchSize(poolableSize,由batch.size配置,默认16k)小,那么申请大小就是batchsize,同时数据发送完成、内存释放以后,这个byteBuffer会直接添加到free(见上面deallocate代码)进行循环利用;
如果比batchSize大,那么大小以实际申请大小为准,同时数据发送完成、内存释放以后,等待gc回收这块空间;所以果批量消息里面单个消息都是超过16k,可以考虑调整batchSize大小
注:为啥消息大于poolableSize后,申请的内存不放入free队列中,循环使用呢?是为了避免一些特别大的对象占用大内存以后,如果放入free队列中,那么就没法再切割分配内存了,太浪费空间了。举一个极端的例子,加入有个消息是32M,那么整个free队列中就只有一个ByteBuffer,即缓冲区每次只能缓冲一个消息,只有这个消息发送完成以后,才能发送缓冲别的消息。
2.5 总结
Kafka通过使用内存缓冲池的设计,让整个发送过程中的存储空间循环利用,有效减少JVM GC造成的影响,从而提高发送性能,提升吞吐量。
image.png假如我们业务中也需要设计一个内存缓冲区,那么可以参考BufferPool,
首先,我们需要有两个双端队列,一个用来存储已使用的Deque<RecordBatch>空间,一个用来存储已申请未使用的空间Deque<ByteBuffer>,以及一个记录未申请未使用的空间availableMemory(long);
其次,我们要实现申请内存(从队列中取出byteBuffer组装业务数据),和释放内存(业务处理完成后把byteBuffer放入队列);
最后,当内存不够用时,我们要实现阻塞方法,阻塞可以通过Lock和Condition实现。
2.6 扩展——kafka producer实例池实现生产者多线程写入
从前面的介绍,我们知道一个kafkaProducer有一个独立的Sender线程向Kafka服务端发送消息,如果我们使用的是kafkaProducer单例,那么写入就相当于是单线程发送消息,但是当我们有大量写入数据到kafka的需求时,单线程的producer往往就难以满足需求,这时可以使用池化技术,即kafka producer实例池
public class TestKafkaProceserThread {
//Kafka配置文件
public static final String TOPIC_NAME = "test";
public static final String KAFKA_PRODUCER = "kafka-producer.properties";
public static final int producerNum=50;//实例池大小
//阻塞队列实现生产者实例池,获取连接作出队操作,归还连接作入队操作
public static BlockingQueue<KafkaProducer<String, String>> queue=new LinkedBlockingQueue<>(producerNum);
//初始化producer实例池
static {
for (int i = 0; i <producerNum ; i++) {
Properties props = PropertiesUtil.getProperties(KAFKA_PRODUCER);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
queue.add(kafkaProducer);
}
}
//生产者发送线程
static class SendThread extends Thread {
String msg;
public SendThread(String msg){
this.msg=msg;
}
public void run(){
ProducerRecord record = new ProducerRecord(TOPIC_NAME, msg);
try {
KafkaProducer<String, String> kafkaProducer =queue.take();//从实例池获取连接,没有空闲连接则阻塞等待
kafkaProducer.send(record);
queue.put(kafkaProducer);//归还kafka连接到连接池队列
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//测试
public static void main(String[]args){
for (int i = 0; i <100 ; i++) {
SendThread sendThread=new SendThread("test multi-thread producer!");
sendThread.start();
}
}
}
经过测试对比,发现比在每个线程中new一个新的producer这种方式的发送速度提升8倍以上
参考:https://blog.csdn.net/u013716179/article/details/96109501