kafka

kafka Producer(一)

2019-02-21  本文已影响0人  扎瓦叔叔

原创辛苦,转载就注明出处

前言:关于kafka的介绍一直在筹划中,但是也是各种事情和时间的原因迟迟没有进行,这次也是痛定思痛,为自己定了计划,希望也能给大家有点帮助,以下系列文章都是围绕kafka0.10.1.0

kafka produce流程

Kafka Producer (1).jpg

从kafkaProducer.send()开始,消息经由ProducerInterceptors进行拦截处理,然后对消息的key和value进行序列化,通过Partitioner选择合适的分区,由RecordAccumulator进行收集后唤醒sender线程,最后执行网络IO,写入kafka。(具体细节在图中已经标注)

接下来我们看看具体的细节:

Produce.send()

/**
 * Send the given record asynchronously and return a future which will eventually contain the response information.
 * 
 * @param record The record to send
 * @return A future which will eventually contain the response information
 */
public Future<RecordMetadata> send(ProducerRecord<K, V> record);

/**
 * Send a record and invoke the given callback when the record has been acknowledged by the server
 */
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

send方法最终都调用ProducerInterceptor.onSend()

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

由ProducerInterceptors对象在消息发送前对消息进行拦截和修改,优先于用户的callback,对ACK响应进行预处理,读者可以自行实现ProducerInterceptor类的onSend函数,举个例子:

//过滤掉key为奇数的消息
public class ProducerInterceptorDemo implements ProducerInterceptor<Integer,String>{
    @override
    public ProducerRecord<Integer,String> onSend(ProducerRecord<Integer,String> record){
        if(record.key() % 2 == 0) return record;
        return null;
    }
}

Produce.doSend()

Produce.Send()之后真正执行操作的是doSend()
首先要确认发送到该topic的metedata是可用的,metedata结构如下:

public final class Metadata {
    private static final Logger log = LoggerFactory.getLogger(Metadata.class);

    private final long refreshBackoffMs;//metadata 更新失败时,为避免频繁更新 meta,最小的间隔时间,默认 100ms
    private final long metadataExpireMs;//metadata 的过期时间, 默认 60*60*1000ms
    private int version;//每更新成功1次,version自增1,主要是用于判断 metadata 是否更新
    private long lastRefreshMs;//最近一次更新时的时间(包含更新失败的情况)
    private long lastSuccessfulRefreshMs;//最近一次成功更新的时间(如果每次都成功的话,与前面的值相等, 否则,lastSuccessulRefreshMs < lastRefreshMs)
    private Cluster cluster;// 集群中一些 topic 的信息
    private boolean needUpdate;// 是否需要更新 metadata
    private final Set<String> topics;
    private final List<Listener> listeners;// 事件监控者
    private boolean needMetadataForAllTopics;// 是否强制更新所有的 metadata
}

其中Cluster信息如下:

public final class Cluster {
    private final boolean isBootstrapConfigured;
    private final List<Node> nodes;//node列表
    private final Set<String> unauthorizedTopics;//未认证的topic列表
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;// TopicPartition与PartitionInfo的映射关系
    private final Map<String, List<PartitionInfo>> partitionsByTopic;// topic 与 partition 的对应关系
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;//  可用(leader 不为 null)的 topic 与 partition 的对应关系
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;// node 与 partition 的对应关系
    private final Map<Integer, Node> nodesById;// node 与brokerid 的对应关系
}

确认metadata可用,调用waitOnMetadata方法,具体如下:

/**
 * Wait for cluster metadata including partitions for the given topic to be available.
 * @param topic The topic we want metadata for
 * @param maxWaitMs The maximum time in ms for waiting on the metadata
 * @return The amount of time we waited in ms
 */
private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already.
    if (!this.metadata.containsTopic(topic))
        this.metadata.add(topic);
    if (metadata.fetch().partitionsForTopic(topic) != null)
        return 0;
    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    while (metadata.fetch().partitionsForTopic(topic) == null) {
        log.trace("Requesting metadata update for topic {}.", topic);
        int version = metadata.requestUpdate();
        sender.wakeup();
        metadata.awaitUpdate(version, remainingWaitMs);
        long elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        if (metadata.fetch().unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        remainingWaitMs = maxWaitMs - elapsed;
    }
    return time.milliseconds() - begin;
}

先把topic添加到metadata topic list下如果不存在的话,然后去fetch metadata下的cluster信息,获取该topic对应的partition信息,如果能获取到就认为一切都是ready状态。
将needUpdate置成true并返回版本号,该版本号是判断metadata是否更新的标志,然后去唤醒sender线程,然后awaitUpdate()阻塞主线程等待更新,如下

//更新metadata信息
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0) {
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
    }
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    while (this.version <= lastVersion) {//版本号判断是否更新
        if (remainingWaitMs != 0)
            wait(remainingWaitMs);//阻塞线程,等待 metadata 的更新
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)//timeout
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}

awaitUpdate中,线程会阻塞在while循环中,直到更新成功或者timeout

等待元数据更新完成后,keySerializer.serialize()和valueSerializer.serialize,producer端对record的key和value的值进行序列化。

之后获取partition值,调用partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster),如果record指定了partition,就直接返回指定值,如果没有指定,如下:

if (availablePartitions.size() > 0) {
if (keyBytes == null) {
  int nextValue = counter.getAndIncrement();
  List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
  if (availablePartitions.size() > 0) {
      int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
      return availablePartitions.get(part).partition();
  } else {
      // no partitions are available, give a non-available partition
      return DefaultPartitioner.toPositive(nextValue) % numPartitions;
  }
} else {
  // hash the keyBytes to choose a partition
  return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

看record有没有指定key,如果有key,则用murmur2方法哈希选一个part,如有没有,counter自增一,对part取余

取到partition值,调用RecordAccumulator.append()方法,将消息追加到RecordAccumulator中

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    try {
        // check if we have an in-progress batch
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordBatch last = dq.peekLast();
            if (last != null) {
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                if (future != null)
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
            }
        }

        // we don't have an in-progress record batch try to allocate a new batch
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordBatch last = dq.peekLast();
            if (last != null) {
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                if (future != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    free.deallocate(buffer);
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}

每个topicPartition对应一个dq,去除dq中最近创建的一个RecordBatch,尝试将record添加到其中,如果有足够的空间并添加成功,则直接返回,若没有RecordBatch或者剩余空间不足,则新建一个RecordBatch来追加record

RecordBatch

MemoryRecords

RecordAccumulator里比较主要的结构:MemoryRecords,表示多个消息的集合

public class MemoryRecords implements Records {
    //压缩器,对消息进行压缩后输出到buffer
    private final Compressor compressor;

    // 记录buffer最多可以写入多少字节
    private final int writeLimit;

    // buffer字节总容量
    private final int initialCapacity;

    // 用于保存消息数据的Java NIO ByteBuffer
    private ByteBuffer buffer;

    // indicate if the memory records is writable or not 
    // 发送前设置成只读模式
    private boolean writable;
}

关于Compressor,KafkaProducer支持GZIP,SNAPPY,LZ4三种压缩方式.(这边GZIP是JDK自带的,所以用new的方式,SNAPPY用的是反射创建的,减少用户不用此压缩方法时包的依赖,MARK)

ProduceRequestResult

再回到RecordBatch上来,有一个ProduceRequestResult字段,标识当前RecordBatch状态的Future对象,当RecordBatch中全部消息被正常响应、或超时、或关闭生产者时,会调用ProduceRequestResult.done(),通过error字段来区分异常完成还是正常完成,之后调用countDown方法,唤醒await的线程。

ProduceRequestResult上还有一个baseOffset字段,表示服务端为这个RecordBatch的第一条消息分配的offset,之后每一条消息可以根据这个baseoffset和它自身在RecordBatch里的offset来计算自己在服务端分区中的绝对偏移量

Thunk

同样是在RecordBatch类中,Producer在send的时候有一个callback的回调参数,Thunk可以理解为消息的回调对象队列,Thunk中callback就只想对应消息的callback对象,FutureRecordMetadata则实现的Future接口,result指向RecordBatch中的ProduceRequestResult,relativeOffset则标识对应消息在RecordBatch中的偏移量

tryAppend()
/**
 * Append the record to the current record set and return the relative offset within that record set
 * 
 * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    //估算剩余空间,不是一个准确值
    if (!this.records.hasRoomFor(key, value)) {
        return null;
    } else {
        //向MemoryRecords中添加数据,offsetCounter是在RecordBatch中的偏移量
        long checksum = this.records.append(offsetCounter++, timestamp, key, value);
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        //将用户自定义的callback和future封装成Thunk
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
BufferPool

再往RecordBatch追加消息的时候,如果RecordBatch空间不够或者在dq中peek不到RecordBatch对象,会通过BufferPool来分配一定size的ByteBuffer

public final class BufferPool {
    //整个Pool的大小
    private final long totalMemory;
    //多线程分配和回收的锁
    private final ReentrantLock lock;
    //缓存指定大小的ByteBuffer队列
    private final Deque<ByteBuffer> free;
    //记录因申请不到足够空间而阻塞的线程对应的Condition对象
    private final Deque<Condition> waiters;
    //可用的空间大小,totalMemory - free中所有ByteBuffer的大小
    private long availableMemory;
}
BufferPool.allocate()

通过BufferPool.allocate()来申请空间
如果有足够空间,但是availableMemory不够,则会从free列表中不断释放空间,直到availableMemory大于所要求的size,然后直接返回size大小的HeapByteBuffer而不是用free队列中的buffer了
如果没有足够空间了,只能阻塞。将Condition添加到waiters中,然后循环等待

while (accumulated < size) {
    long startWaitNs = time.nanoseconds();
    long timeNs;
    boolean waitingTimeElapsed;
    try {
        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        this.waiters.remove(moreMemory);
        throw e;
    } finally {
        long endWaitNs = time.nanoseconds();
        timeNs = Math.max(0L, endWaitNs - startWaitNs);
        //统计阻塞的时间
        this.waitTime.record(timeNs, time.milliseconds());
    }

    //超时,直接报异常
    if (waitingTimeElapsed) {
        this.waiters.remove(moreMemory);
        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
    }

    remainingTimeToBlockNs -= timeNs;
    // check if we can satisfy this request from the free list,
    // otherwise allocate memory
    //请求的是poolableSize大小的ByteBuffer,且free中有空闲的ByteBuffer
    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
        // just grab a buffer from the free list
        buffer = this.free.pollFirst();
        accumulated = size;
    } else {//先分配一部分的空间,并继续等待空闲空间
        // we'll need to allocate memory, but we may only get
        // part of what we need on this iteration
        freeUp(size - accumulated);
        int got = (int) Math.min(size - accumulated, this.availableMemory);
        this.availableMemory -= got;
        accumulated += got;
    }
}

最后返回的还是跟之前一样,符合size要求的分配free队列中的ByteBuffer对象,超过size的就返回HeapByteBuffer

BufferPool.deallocate()

看过了allocate()方法之后再来看deallocate(),相比就好理解了很多,当释放的ByteBuffer大小等于poolAbleSize的时候,放入free队列进行管理,如果不等于,则直接修改availaMemory就可以了。最后去唤醒一个因空间不足而阻塞的线程

RecordAccumulator

最后再回头看RecordAccumulator,结构就比较一目了然了

  public final class RecordAccumulator {
      private volatile boolean closed;
      private final AtomicInteger flushesInProgress;
      private final AtomicInteger appendsInProgress;
      //每个RecordBAtch下ByteBuffer的大小
      private final int batchSize;
      //压缩类型
      private final CompressionType compression;
      private final long lingerMs;
      private final long retryBackoffMs;
      //BufferPool对象
      private final BufferPool free;
      private final Time time;
      //topicPartition和RecordBatch集合的映射关系,map是CopyOnWriteMap(安全的),Deque是线程不安全的,操作RecordBatch时需要加锁
      private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
      //未发送完成的RecordBatch集合
      private final IncompleteRecordBatches incomplete;
      // The following variables are only accessed by the sender thread, so we don't need to protect them.
      private final Set<TopicPartition> muted;
      //使用drain方法批量导出RecordBatch时,防止饥饿,用drainIndex记录上次发送停止的位置,下次继续
      private int drainIndex;
  }

最后再回到doSend()上来,最后一步,判断此次追加消息所在队列最后一个RecordBatch满了或者不止一个RecordBatch,则唤醒sender线程

就先看到这里,下次继续Kafka Producer,讲讲Sender线程等等

上一篇 下一篇

猜你喜欢

热点阅读