技术专题-中间件源码收藏-开发篇

Kafka源码之客户端内存缓冲池

2020-04-26  本文已影响0人  雪飘千里

Kafka是一个高吞吐的消息队列,为了实现高吞吐,kafka在实现方面用到了以下技术:

注:kafka的零拷贝机制见零拷贝

今天学习下kafka客户端批量压缩和发送消息机制。

客户端消息发送,首先消息肯定是放在内存中的,大数据场景下,内存中不断存在大量的消息需要发送,消息发送完成后,内存会被回收,这很容易引起GC,而频繁的GC特别是full GC会造成“stop the world”,即其他线程停止工作等待垃圾回收线程执行,从而影响发送的速度影响吞吐量,那么Kafka是如何解决这个问题呢?

1、Kafka客户端发送消息过程

首先介绍下Kafka客户端发送消息的大致过程,Kafka的kafkaProducer对象是线程安全的(send方法中调用的accumulator的append方法是线程安全的,有同步机制),多个发送线程在发送消息时候可以共用一个kafkaProducer对象来调用发送方法,最后发送的数据根据Topic和分区的不同被组装进某一个RecordBatch中。如下图所示

image.png

发送的数据组装成RecordBatch放入双向队列Deque<RecordBatch>后,会被发送线程批量取出组装成ProduceRequest对象发送给Kafka服务端。

注:1. RecordBatch是存储数据的对象
2. 每个topic都有一个双向队列Deque<RecordBatch>;
3. 一个kafkaProducer有一个独立的Sender线程;

2、缓冲池

缓冲池:BufferPool,一个KafkaProducer实例中只有一个BufferPool对象,

2.1 缓冲池内存结构

image.png

缓冲池(BufferPool)中最重要的就是RecordBatch数据发送时申请内存和RecordBatch数据发送完成以后释放内存的过程。

2.2 消息的发送到队列过程

先从当前topic的队列Deque<RecordBatch>中取出最后一个RecordBatch,如果RecordBatch为null,或者把数据装入RecordBatch失败时,则从缓冲池申请内存,构建新的RecordBatch,然后再加入当前topic的队列;

当我们调用客户端的发送消息的时候,底层会调用Send,然后里面使用一个记录累计器RecordAccumulator把消息append进来,

源码如下,删除了不必要的一些代码
org.apache.kafka.clients.producer.internals.RecordAccumulator#append

public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
        // 根据topic获取队列
        Deque<RecordBatch> dq = dequeFor(tp);
        synchronized (dq) {
            RecordBatch last = dq.peekLast();
            //如果队列不为空,即RecordBatch不为null;如果为空,则从内存池申请内存
            if (last != null) {
                //尝试把新的消息数据装入RecordBatch
                FutureRecordMetadata future = last.tryAppend(key, value, callback);
                //装入成功,直接返回,装入失败,则从内存池申请内存
                if (future != null) {
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }
        }

        //判断需要申请空间大小,如果需要申请空间大小比batchSize小,那么申请大小就是batchsize,
        //如果比batchSize大,那么大小以实际申请大小为准
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        //申请新的内存
        ByteBuffer buffer = free.allocate(size);
        synchronized (dq) {
            //往内存中装入数据,构建RecordBatch 
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
            //把RecordBatch加入队列
            dq.addLast(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    }

2.3 数据的内存申请过程

消息发送时内存缓冲区的申请和释放才是重点。

org.apache.kafka.clients.producer.internals.BufferPool#allocate

public ByteBuffer allocate(int size) throws InterruptedException {
//已用空间中是否有
 if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");
        //申请内存是加锁的,所以是线程安全的
        this.lock.lock();
        try {
            // 如果已申请未使用空间free队列不为空,且申请的内存大小正好等于free队列中buffer大小,则对队列中获取ByteBuffer 
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            // now check if the request is immediately satisfiable with the
            // memory on hand or if we need to block
            int freeListSize = this.free.size() * this.poolableSize;
            if (this.availableMemory + freeListSize >= size) {
                // 不断让free释放空间直到availableMemory空间比申请空间大
                freeUp(size);
                this.availableMemory -= size;
                lock.unlock();
                // 构造HeapByteBuffer存储
                return ByteBuffer.allocate(size);
            } else if (!blockOnExhaustion) {
                throw new BufferExhaustedException("You have exhausted the " + this.totalMemory
                                                   + " bytes of memory you configured for the client and the client is configured to error"
                                                   + " rather than block when memory is exhausted.");
            } else {
                //缓冲池没有足够的内存可以申请,则阻塞;
                //直到有空间释放,并且空间足够为止
                int accumulated = 0;
                ByteBuffer buffer = null;
                //通过Condition实现线程等待和唤醒
                Condition moreMemory = this.lock.newCondition();
                this.waiters.addLast(moreMemory);
                // loop over and over until we have a buffer or have reserved
                // enough memory to allocate one
                while (accumulated < size) {
                    long startWait = time.nanoseconds();
                    moreMemory.await();
                    long endWait = time.nanoseconds();
                    this.waitTime.record(endWait - startWait, time.milliseconds());

                    // check if we can satisfy this request from the free list,
                    // otherwise allocate memory
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // just grab a buffer from the free list
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        // we'll need to allocate memory, but we may only get
                        // part of what we need on this iteration
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }

                // remove the condition for this thread to let the next thread
                // in line start getting memory
                Condition removed = this.waiters.removeFirst();
                if (removed != moreMemory)
                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");

                // signal any additional waiters if there is more memory left
                // over for them
                if (this.availableMemory > 0 || !this.free.isEmpty()) {
                    if (!this.waiters.isEmpty())
                        this.waiters.peekFirst().signal();
                }

                // unlock and return the buffer
                lock.unlock();
                if (buffer == null)
                    return ByteBuffer.allocate(size);
                else
                    return buffer;
            }
        } finally {
            if (lock.isHeldByCurrentThread())
                lock.unlock();
        }
    }

消息发送时,从缓冲池中申请内存全过程如下图,

image.png

2.4 数据的内存释放过程

public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            //只有标准规格(bytebuffer空间大小和poolableSize大小一致)的才放入free
            if (size == this.poolableSize && size == buffer.capacity()) {
                //注意这里的buffer是直接reset了,重新reset后可以重复利用,没有gc问题
                buffer.clear();
                //添加进free循环利用
                this.free.add(buffer);
            } else {
                this.availableMemory += size;
            }
            //唤醒排在前面的等待线程
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }
image.png

通过申过申请和释放过程流程图以及释放空间代码,我们可以得到一个结论:

如果需要申请空间大小(发送的消息)比batchSize(poolableSize,由batch.size配置,默认16k)小,那么申请大小就是batchsize,同时数据发送完成、内存释放以后,这个byteBuffer会直接添加到free(见上面deallocate代码)进行循环利用;

如果比batchSize大,那么大小以实际申请大小为准,同时数据发送完成、内存释放以后,等待gc回收这块空间;所以果批量消息里面单个消息都是超过16k,可以考虑调整batchSize大小

注:为啥消息大于poolableSize后,申请的内存不放入free队列中,循环使用呢?是为了避免一些特别大的对象占用大内存以后,如果放入free队列中,那么就没法再切割分配内存了,太浪费空间了。举一个极端的例子,加入有个消息是32M,那么整个free队列中就只有一个ByteBuffer,即缓冲区每次只能缓冲一个消息,只有这个消息发送完成以后,才能发送缓冲别的消息。

2.5 总结

Kafka通过使用内存缓冲池的设计,让整个发送过程中的存储空间循环利用,有效减少JVM GC造成的影响,从而提高发送性能,提升吞吐量。

image.png

假如我们业务中也需要设计一个内存缓冲区,那么可以参考BufferPool,
首先,我们需要有两个双端队列,一个用来存储已使用的Deque<RecordBatch>空间,一个用来存储已申请未使用的空间Deque<ByteBuffer>,以及一个记录未申请未使用的空间availableMemory(long);

其次,我们要实现申请内存(从队列中取出byteBuffer组装业务数据),和释放内存(业务处理完成后把byteBuffer放入队列);

最后,当内存不够用时,我们要实现阻塞方法,阻塞可以通过Lock和Condition实现。

2.6 扩展——kafka producer实例池实现生产者多线程写入

从前面的介绍,我们知道一个kafkaProducer有一个独立的Sender线程向Kafka服务端发送消息,如果我们使用的是kafkaProducer单例,那么写入就相当于是单线程发送消息,但是当我们有大量写入数据到kafka的需求时,单线程的producer往往就难以满足需求,这时可以使用池化技术,即kafka producer实例池

public class TestKafkaProceserThread {
    //Kafka配置文件
    public static final String TOPIC_NAME = "test";
    public static final String KAFKA_PRODUCER = "kafka-producer.properties";
    public static final int producerNum=50;//实例池大小
    //阻塞队列实现生产者实例池,获取连接作出队操作,归还连接作入队操作
    public static BlockingQueue<KafkaProducer<String, String>> queue=new LinkedBlockingQueue<>(producerNum);
    //初始化producer实例池
    static {
        for (int i = 0; i <producerNum ; i++) {
            Properties props = PropertiesUtil.getProperties(KAFKA_PRODUCER);
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
            queue.add(kafkaProducer);
        }
    }
    //生产者发送线程
    static class SendThread extends Thread {
        String msg;
        public SendThread(String msg){
            this.msg=msg;
        }
        public void run(){
            ProducerRecord record = new ProducerRecord(TOPIC_NAME, msg);
            try {
                KafkaProducer<String, String> kafkaProducer =queue.take();//从实例池获取连接,没有空闲连接则阻塞等待
                kafkaProducer.send(record);
                queue.put(kafkaProducer);//归还kafka连接到连接池队列
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    //测试
    public static void main(String[]args){
        for (int i = 0; i <100 ; i++) {
            SendThread sendThread=new SendThread("test multi-thread producer!");
            sendThread.start();
        } 
    }
}

经过测试对比,发现比在每个线程中new一个新的producer这种方式的发送速度提升8倍以上

参考:https://blog.csdn.net/u013716179/article/details/96109501

上一篇下一篇

猜你喜欢

热点阅读