Kafka Producer Source Code Analysis: Study Notes

2020-04-30  后来丶_a24d

Simple usage example

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Test {
    private static String topicName;
    private static int msgNum;

    public static void main(String[] args) {
        // Producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*props.put("interceptor.classes", "kafka.productor.ProducerInterceptorDemo,kafka.productor.ProducerInterceptorDemo");*/

        topicName = "test1";
        // Number of messages to send
        msgNum = 10;
        // KafkaProducer is thread-safe
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Send messages with a per-record callback
        for (int i = 0; i < msgNum; i++) {
            String msg = i + " This is seeger's msg." + System.currentTimeMillis();
            producer.send(new ProducerRecord<>(topicName, msg), (metadata, exception) -> {
                if (exception != null) {
                    System.out.println("handle the exception: " + exception.getMessage());
                } else {
                    System.out.printf("topic=%s, partition=%d, offset=%s \n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        }
        producer.close();
    }
}
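
For comparison, a synchronous send simply blocks on the Future that send() returns (the enclosing method has to handle InterruptedException and ExecutionException). A minimal sketch using the same producer and topicName as above:

// Synchronous send: block until the broker has acknowledged (or rejected) the record.
RecordMetadata metadata = producer.send(new ProducerRecord<>(topicName, "sync msg")).get();
System.out.printf("topic=%s, partition=%d, offset=%d%n",
        metadata.topic(), metadata.partition(), metadata.offset());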

Kafka producer overall architecture

  1. A KafkaProducer.send() call does not transmit anything over the network; it only appends the message to the RecordAccumulator cache.
  2. The messages are actually pushed to the Kafka brokers by the Sender thread, which drives an NIO Selector underneath.
  3. The RecordAccumulator's cached queues plus the Sender thread decouple the main thread from the real network sends and smooth out the mismatch between the rate at which the application produces requests and the rate at which they can be written to the brokers, which improves throughput. The same idea appears in the article "Migrating 20 million rows of data: from days to hours".

The end-to-end flow is as follows (depending on the configuration read in step 1, steps 1-11 can behave differently); a code sketch of steps 2-5 follows this list:
  1. Read the configuration.
  2. Run the producer interceptors.
  3. Serialize the key and value.
  4. Choose a suitable partition.
  5. The RecordAccumulator collects the messages so they can be sent in batches.
  6. The Sender thread drains messages from the RecordAccumulator.
  7. A ClientRequest is built.
  8. The ClientRequest is handed to the NetworkClient.
  9. The NetworkClient puts the request into a KafkaChannel.
  10. The actual network I/O is performed.
  11. When the response arrives, the ClientRequest callback is invoked.
  12. That callback invokes the RecordBatch callback, which finally calls the callback registered on each individual message.
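
Steps 2 through 5 happen on the calling thread inside KafkaProducer.doSend(). The sketch below is hand-written and simplified, not copied from the source; the field names (interceptors, keySerializer, valueSerializer, partitioner, accumulator, sender, cluster, maxBlockTimeMs) mirror the real ones but are assumptions of this sketch:

// A simplified, hand-written sketch of the main-thread half of the pipeline (steps 2-5 above).
private Future<RecordMetadata> doSendSketch(ProducerRecord<String, String> record, Callback callback)
        throws InterruptedException {
    // Step 2: interceptors may inspect or modify the record before serialization
    ProducerRecord<String, String> intercepted = interceptors.onSend(record);
    // Step 3: serialize key and value into byte arrays
    byte[] keyBytes = keySerializer.serialize(intercepted.topic(), intercepted.key());
    byte[] valueBytes = valueSerializer.serialize(intercepted.topic(), intercepted.value());
    // Step 4: choose a partition (explicit partition, hash of the key, or round-robin)
    int partition = partitioner.partition(intercepted.topic(), intercepted.key(), keyBytes,
            intercepted.value(), valueBytes, cluster);
    TopicPartition tp = new TopicPartition(intercepted.topic(), partition);
    // Step 5: hand the record to the RecordAccumulator; the Sender thread takes over from here
    RecordAccumulator.RecordAppendResult result =
            accumulator.append(tp, System.currentTimeMillis(), keyBytes, valueBytes, callback, maxBlockTimeMs);
    if (result.batchIsFull || result.newBatchCreated)
        sender.wakeup();   // a batch is ready (or a new one was started): nudge the Sender (steps 6+)
    return result.future;
}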

Configuration module

public class Test {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(MockProducerConfig.RETRIES_CONFIG, "2");
        props.put(MockProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9091");
        props.put("key.serializer", "java.util.HashMap");

        MockKafkaProducer mockKafkaProducer = new MockKafkaProducer(props);
    }
}

// For demonstration purposes only, so kept deliberately simple
public class MockKafkaProducer {
    private final Integer retries;
    private final List<String> addresses;
    // A HashMap stands in for a real serializer class in this demo
    private final HashMap keySerializer;

    public MockKafkaProducer(Properties properties) {
        this(new MockProducerConfig(properties));
    }

    @SuppressWarnings("unchecked")
    private MockKafkaProducer(MockProducerConfig config) {
        this.retries = config.getInt(MockProducerConfig.RETRIES_CONFIG);
        System.out.println("retries took effect: " + retries);
        addresses = config.getList(MockProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
        addresses.forEach(s -> System.out.println("bootstrap address took effect: " + s));
        this.keySerializer = config.getConfiguredInstance(MockProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                HashMap.class);
        System.out.println("keySerializer took effect: " + keySerializer.toString());
    }
}

import java.util.Map;
import kafka.productor.config.learn.MockConfigDef.MockType;

/**
 * Mock of the Kafka producer configuration
 */
public class MockProducerConfig extends MockAbstractConfig {
    private static final MockConfigDef CONFIG;
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String RETRIES_CONFIG = "retries";
    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";

    static {
        CONFIG = new MockConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, MockType.LIST, null)
                                    .define(RETRIES_CONFIG, MockType.INT, 0)
                                    .define(KEY_SERIALIZER_CLASS_CONFIG, MockType.CLASS, null);
    }

    public MockProducerConfig(Map<?, ?> props) {
        super(CONFIG, props);
    }

}

public class MockAbstractConfig {

    /**
     * The user's original configuration
     */
    private final Map<String, ?> originals;

    /**
     * The parsed configuration
     */
    private final Map<String, Object> values;

    @SuppressWarnings("unchecked")
    public MockAbstractConfig(MockConfigDef definition, Map<?, ?> originals) {
        // Verify that every key is a String
        Optional<?> keyString = originals.keySet().stream().filter(key -> !(key instanceof String)).findFirst();
        if (keyString.isPresent()) {
            throw new MockConfigException(keyString.get().toString(), originals.get(keyString.get()), "Key must be a string.");
        }

        this.originals = (Map<String, ?>) originals;
        this.values = definition.parse(this.originals);
    }

    public Integer getInt(String key) {
        return (Integer) get(key);
    }

    @SuppressWarnings("unchecked")
    public List<String> getList(String key) {
        return (List<String>) get(key);
    }

    protected Object get(String key) {
        if (!values.containsKey(key)) {
            throw new MockConfigException(String.format("Unknown configuration '%s'", key));
        }
        return values.get(key);
    }

    /**
     * Instantiates the configured class via reflection. A design worth borrowing:
     * key.serializer and value.serializer both reuse this one method.
     */
    public <T> T getConfiguredInstance(String key, Class<T> t) {
        Class<?> c = getClass(key);
        if (c == null) {
            return null;
        }
        Object o = MockUtils.newInstance(c);
        if (!t.isInstance(o)) {
            throw new MockKafkaException(c.getName() + " is not an instance of " + t.getName());
        }
        return t.cast(o);
    }

    public Class<?> getClass(String key) {
        return (Class<?>) get(key);
    }
}

public class MockConfigDef {
    public static final String NO_DEFAULT_VALUE = "";
    private final Map<String, MockConfigKey> configKeys = new HashMap<>();

    public static class MockConfigKey {
        public final String name;
        public final MockType type;
        public final Object defaultValue;

        public MockConfigKey(String name, MockType type, Object defaultValue) {
            this.name = name;
            this.type = type;
            this.defaultValue = defaultValue;
        }

        public boolean hasDefault() {
            return this.defaultValue != NO_DEFAULT_VALUE;
        }
    }

    /**
     * Configuration value types
     */
    public enum MockType {
        BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD
    }

    public MockConfigDef define(String name, MockType type, Object defaultValue) {
        if (configKeys.containsKey(name)) {
            throw new MockConfigException("Configuration " + name + " is defined twice.");
        }
        Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
        configKeys.put(name, new MockConfigKey(name, type, parsedDefault));
        return this;
    }


    public Map<String, Object> parse(Map<?, ?> props) {
        Map<String, Object> values = new HashMap<>();
        for (MockConfigKey key : configKeys.values()) {
            Object value;
            // props map contains setting - assign ConfigKey value
            if (props.containsKey(key.name)) {
                value = parseType(key.name, props.get(key.name), key.type);
                // props map doesn't contain setting, the key is required because no default value specified - it's an error
            } else if (key.defaultValue == NO_DEFAULT_VALUE) {
                throw new MockConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
            } else {
                // otherwise assign setting its default value
                value = key.defaultValue;
            }

            // Real Kafka also validates the supplied value here; omitted in this mock

            values.put(key.name, value);
        }
        return values;
    }

    private Object parseType(String name, Object value, MockType type) {
        try {
            if (value == null) {
                return null;
            }

            String trimmed = null;
            if (value instanceof String) {
                trimmed = ((String) value).trim();
            }

            switch (type) {
                case STRING:
                    if (value instanceof String) {
                        return trimmed;
                    } else {
                        throw new MockConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
                    }
                case INT:
                    if (value instanceof Integer) {
                        return (Integer) value;
                    } else if (value instanceof String) {
                        return Integer.parseInt(trimmed);
                    } else {
                        throw new MockConfigException(name, value, "Expected value to be an number.");
                    }
                case LIST:
                    if (value instanceof List) {
                        return (List<?>) value;
                    } else if (value instanceof String) {
                        if (trimmed.isEmpty()) {
                            return Collections.emptyList();
                        } else {
                            return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
                        }
                    } else {
                        throw new MockConfigException(name, value, "Expected a comma separated list.");
                    }
                case CLASS:
                    if (value instanceof Class) {
                        return (Class<?>) value;
                    } else if (value instanceof String) {
                        return Class.forName(trimmed, true, MockUtils.getContextOrMockKafkaClassLoader());
                    } else {
                        throw new MockConfigException(name, value, "Expected a Class instance or class name.");
                    }
                default:
                    throw new IllegalStateException("Unknown type.");
            }
        } catch (NumberFormatException e) {
            throw new MockConfigException(name, value, "Not a number of type " + type);
        } catch (ClassNotFoundException e) {
            throw new MockConfigException(name, value, "Class " + value + " could not be found.");
        }
    }
}
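
A quick, hedged usage sketch of the mock classes above (it assumes java.util.Collections and java.util.Map are imported along with the MockType import shown earlier):

MockConfigDef def = new MockConfigDef()
        .define("retries", MockType.INT, 0)                 // Builder-style chaining
        .define("bootstrap.servers", MockType.LIST, null);

Map<String, Object> parsed = def.parse(Collections.singletonMap("retries", " 3 "));
System.out.println(parsed.get("retries"));            // Integer 3: the String is trimmed and parsed
System.out.println(parsed.get("bootstrap.servers"));  // null: not supplied, falls back to the null default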

public class MockConfigException extends MockKafkaException {
    private static final long serialVersionUID = 1L;

    public MockConfigException(String message) {
        super(message);
    }

    public MockConfigException(String name, Object value) {
        this(name, value, null);
    }

    public MockConfigException(String name, Object value, String message) {
        super("Invalid value " + value + " for configuration " + name + (message == null ? "" : ": " + message));
    }
}

public class MockKafkaException extends RuntimeException {
    private final static long serialVersionUID = 1L;

    public MockKafkaException(String message, Throwable cause) {
        super(message, cause);
    }

    public MockKafkaException(String message) {
        super(message);
    }

    public MockKafkaException(Throwable cause) {
        super(cause);
    }

    public MockKafkaException() {
        super();
    }
}

public class MockUtils {

    public static ClassLoader getContextOrMockKafkaClassLoader() {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        return cl == null ? getMockKafkaClassLoader() : cl;
    }

    public static ClassLoader getMockKafkaClassLoader() {
        return MockUtils.class.getClassLoader();
    }

    public static <T> T newInstance(Class<T> c) {
        try {
            return c.newInstance();
        } catch (IllegalAccessException e) {
            throw new MockKafkaException("Could not instantiate class " + c.getName(), e);
        } catch (InstantiationException e) {
            throw new MockKafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e);
        } catch (NullPointerException e) {
            throw new MockKafkaException("Requested class was null", e);
        }
    }
}

Interceptor module

Serialization module

Partitioner module

RecordAccumulator module

Overview: concurrency-safety design
  1. The queue map uses a copy-on-write map because it is read-heavy and write-light: the key is a TopicPartition, and new partitions appear rarely, so writes are rare.
  2. How synchronous sending works: simply call get() on the Future returned by send(). Under the hood this is FutureRecordMetadata.get(), which is implemented with a CountDownLatch (see the demo after the RecordBatch section below).
  3. NIO needs ByteBuffers, and creating and destroying them is expensive, so Kafka maintains a BufferPool; its thread safety is implemented in the spirit of AQS (a ReentrantLock plus a queue of Conditions).
  4. Some classes are designed to be immutable, which makes them inherently thread-safe.
  5. Outline of append() (key points only; the details follow later in this section):
    5.1 Step 2: take the deque lock so records can be inserted into the non-thread-safe Deque.
    5.2 Step 5: if the append fails, request a buffer from the BufferPool, which is itself thread-safe.
    5.3 After the buffer is obtained, take the lock again and retry the append. The retry is mandatory; otherwise memory fragmentation could result.
Data structures
[Figure: data structure diagram (数据结构.png)]
Prerequisite: the queue map (batches)
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
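
CopyOnWriteMap is Kafka's own map implementation: reads go lock-free against a volatile snapshot, and the rare writes copy the whole map under a lock. A minimal sketch of the idea (not the real class, which implements the full ConcurrentMap interface; assumes java.util imports):

class SimpleCopyOnWriteMap<K, V> {
    // volatile so readers always see the most recently published snapshot without locking
    private volatile Map<K, V> map = Collections.emptyMap();

    public V get(K key) {                                  // read path: no lock at all
        return map.get(key);
    }

    public synchronized V putIfAbsent(K key, V value) {    // write path: rare, so copying is acceptable
        V existing = map.get(key);
        if (existing != null)
            return existing;
        Map<K, V> copy = new HashMap<>(map);
        copy.put(key, value);
        map = copy;                                        // publish the new snapshot atomically
        return null;
    }
}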

public final class TopicPartition implements Serializable {

    private int hash = 0;
    private final int partition;
    private final String topic;

    public TopicPartition(String topic, int partition) {
        this.partition = partition;
        this.topic = topic;
    }

    public int partition() {
        return partition;
    }

    public String topic() {
        return topic;
    }

    @Override
    public int hashCode() {
        if (hash != 0)
            return hash;
        final int prime = 31;
        int result = 1;
        result = prime * result + partition;
        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
        this.hash = result;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        TopicPartition other = (TopicPartition) obj;
        if (partition != other.partition)
            return false;
        if (topic == null) {
            if (other.topic != null)
                return false;
        } else if (!topic.equals(other.topic))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }

}
Prerequisite: MemoryRecords
MemoryRecords:
private final Compressor compressor;
private ByteBuffer buffer;

Compressor: 
private final CompressionType type;
private final DataOutputStream appendStream;
private final ByteBufferOutputStream bufferStream;
[Figure: a typical application of the decorator pattern (装饰器模式典型应用.png)]
public Compressor(ByteBuffer buffer, CompressionType type) {
    // Compression type taken from the producer configuration
    this.type = type;
    // (elided in the excerpt) bufferStream wraps the batch's ByteBuffer
    bufferStream = new ByteBufferOutputStream(buffer);
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}

static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
    try {
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer);
            // Decorator pattern: GZIPOutputStream ships with the JDK
            case GZIP:
                return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
            // Snappy: the stream is obtained via reflection, so the snappy jar is only
            // required when snappy compression is actually configured
            case SNAPPY:
                try {
                    OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            case LZ4:
                try {
                    OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            default:
                throw new IllegalArgumentException("Unknown compression type: " + type);
        }
    } catch (IOException e) {
        throw new KafkaException(e);
    }
}
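
The same decorator idea can be shown with nothing but JDK streams (illustration only, not Kafka code; IOException handling omitted):

ByteArrayOutputStream sink = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(new GZIPOutputStream(sink, 8192));
out.writeLong(System.currentTimeMillis());  // record fields pass through the compressing layer first
out.close();                                // closing finishes the GZIP stream and flushes into the sink
System.out.println("compressed bytes: " + sink.size());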

Prerequisite: RecordBatch
FutureRecordMetadata:
private final ProduceRequestResult result;
public RecordMetadata get() throws InterruptedException, ExecutionException {
    this.result.await();
    return valueOrError();
}

ProduceRequestResult: 
private final CountDownLatch latch = new CountDownLatch(1);
public void await() throws InterruptedException {
    latch.await();
}
public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
    this.topicPartition = topicPartition;
    this.baseOffset = baseOffset;
    this.error = error;
    // countDown() here is what unblocks get() and ends the synchronous wait
    this.latch.countDown();
}
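
A hedged demo of that hand-off (it assumes the no-argument ProduceRequestResult constructor this version of the class exposes; simplified, not production code): one thread plays the role of the Sender and completes the batch, while the caller blocks in await() until countDown() runs.

ProduceRequestResult result = new ProduceRequestResult();
new Thread(() ->
        // the Sender / network thread finishes the batch and reports the assigned offset
        result.done(new TopicPartition("test1", 0), 42L, null)
).start();
result.await();   // the user thread parks here until done() calls countDown()
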
Prerequisite: BufferPool
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {

    // Lock for synchronization
    this.lock.lock();
    try {
        // If the request is exactly poolableSize and the free list is non-empty,
        // hand out a buffer from the free list directly
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // Every buffer in the free list has the fixed poolableSize
        int freeListSize = this.free.size() * this.poolableSize;
        if (this.availableMemory + freeListSize >= size) {
            // freeUp() keeps releasing ByteBuffers from the free list until availableMemory >= size
            freeUp(size);
            this.availableMemory -= size;
            lock.unlock();
            // Allocate a new heap ByteBuffer
            return ByteBuffer.allocate(size);
        } else {
            // Not enough space: the caller has to block
            int accumulated = 0;
            ByteBuffer buffer = null;
            Condition moreMemory = this.lock.newCondition();
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            // Join the wait queue
            this.waiters.addLast(moreMemory);
            // Wait in a loop until enough memory has been accumulated
            while (accumulated < size) {
                long startWaitNs = time.nanoseconds();
                long timeNs;
                boolean waitingTimeElapsed;
                try {
                    // Block on the condition; await() releases the lock while waiting
                    waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // On interruption, leave the wait queue
                    this.waiters.remove(moreMemory);
                    throw e;
                } finally {
                    // Record how long we waited
                    long endWaitNs = time.nanoseconds();
                    timeNs = Math.max(0L, endWaitNs - startWaitNs);
                    this.waitTime.record(timeNs, time.milliseconds());
                }
                // Fail on timeout
                if (waitingTimeElapsed) {
                    this.waiters.remove(moreMemory);
                    throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                }

                remainingTimeToBlockNs -= timeNs;
                // A poolableSize buffer became available in the meantime
                if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    // just grab a buffer from the free list
                    buffer = this.free.pollFirst();
                    accumulated = size;
                } else {
                    // Otherwise take whatever memory is available now and keep waiting for the rest
                    freeUp(size - accumulated);
                    int got = (int) Math.min(size - accumulated, this.availableMemory);
                    this.availableMemory -= got;
                    accumulated += got;
                }
            }

            // Remove ourselves from the wait queue
            Condition removed = this.waiters.removeFirst();
            if (removed != moreMemory)
                throw new IllegalStateException("Wrong condition: this shouldn't happen.");

            // If there is still memory or a free buffer left, wake up the next waiting thread
            if (this.availableMemory > 0 || !this.free.isEmpty()) {
                if (!this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            }

            // Unlock
            lock.unlock();
            if (buffer == null)
                return ByteBuffer.allocate(size);
            else
                return buffer;
        }
    } finally {
        if (lock.isHeldByCurrentThread())
            lock.unlock();
    }
}
public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        // A poolableSize buffer goes back onto the free list for reuse
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            // Otherwise just give the memory back by bumping availableMemory
            this.availableMemory += size;
        }
        // Wake up one thread that is blocked waiting for memory
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
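
A hedged usage sketch of the pair of methods above (bufferPool here is assumed to have been created by the RecordAccumulator with buffer.memory as the total size and batch.size as poolableSize; in the real code allocation and deallocation are not paired this tightly):

ByteBuffer buffer = bufferPool.allocate(16 * 1024, 60_000L);  // may block for up to max.block.ms
try {
    // ... fill the buffer with a batch of serialized records ...
} finally {
    bufferPool.deallocate(buffer, 16 * 1024);  // poolableSize buffers go back onto the free list
}
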
append() implementation
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // Track how many threads are currently appending into the RecordAccumulator
    appendsInProgress.incrementAndGet();
    try {
        // Step 1: find (or create) the Deque for this TopicPartition
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        // Step 2: lock the deque. free.allocate is kept outside this synchronized block to
        // shorten the time the lock is held, because free.allocate can block. Suppose thread 1
        // has a large message that does not fit the current RecordBatch and must allocate a new
        // buffer, while thread 2 has a small message that would fit; if allocation were inside
        // the synchronized block and there were many threads like thread 2, they would all block.
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordBatch last = dq.peekLast();
            if (last != null) {
                // Step 3: try to append the record to the last batch
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                if (future != null)
                    // Step 4: success, return immediately
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
            }
        }

        // Step 5: the append failed, so request a buffer from the BufferPool
        // (size is computed earlier in the real code, roughly max(batchSize, record size))
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        // Locking again here avoids memory fragmentation: without the lock, several threads could
        // each create a RecordBatch, later appends would only go to the last one, and the other
        // freshly created batches would never be filled.
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordBatch last = dq.peekLast();
            if (last != null) {
                // Step 7: retry the append. The retry matters because several threads may have been
                // blocked in free.allocate; once one of them succeeds and creates a new RecordBatch,
                // the other threads can simply append into it.
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                if (future != null) {
                    // Step 8: the retry succeeded, so release the buffer we allocated and return;
                    // only a newly created RecordBatch actually needs a fresh buffer
                    free.deallocate(buffer);
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            // Step 9: append the record into the newly created RecordBatch
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            dq.addLast(batch);
            // Step 10: register the batch in the incomplete set
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}

Sender module

Send sequence diagram
[Figure: send sequence diagram (发送时序图.png)]
Main flow of send() and poll()
[Figure: main flow of send() and poll() (send和poll主体流程.png)]
poll() flow
public void poll(long timeout) throws IOException {

  /* check ready keys */
  int numReadyKeys = select(timeout);

  if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
      Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

      pollSelectionKeys(readyKeys, false, endSelect);
      // Clear all SelectionKeys so they are not processed again on the next poll
      readyKeys.clear();

      // Handle connections that completed immediately at connect() time;
      // this normally only happens when broker and client run on the same machine
      pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
      immediatelyConnectedKeys.clear();
  }
  // Move the staged network responses into the completed-receives collection
  addToCompletedReceives();
}

void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                       boolean isImmediatelyConnected,
                       long currentTimeNanos) {
  for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
    KafkaChannel channel = channel(key);

    boolean sendFailed = false;
    // READ event
    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
        && !explicitlyMutedChannels.contains(channel)) {

      NetworkReceive networkReceive;

      // read() pulls data from the channel into a buffer
      // (again via the transportLayer inside KafkaChannel)
      while ((networkReceive = channel.read()) != null) {
        if (!stagedReceives.containsKey(channel))
          stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

        // Stage the receive for later processing
        Deque<NetworkReceive> deque = stagedReceives.get(channel);
        deque.add(networkReceive);
      }
    }

    // WRITE event
    if (channel.ready() && key.isWritable()) {
      // Write data from the buffer into the channel (the transportLayer inside KafkaChannel)
      Send send = channel.write();
    }
  }
}


Once a connection has finished being established, the Sender switches the key's interest set from OP_CONNECT to OP_READ:
 key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
  1. On the server side, to preserve message ordering the Selector provides two methods, mute(String id) and unmute(String id), which mark a KafkaChannel so that only one request from that channel is processed at a time (conceptually an exclusive lock). When the server receives a request it first puts it into stagedReceives; at that point the channel is not yet muted, so the receive is moved into completedReceives. When the server starts processing a request from completedReceives it mutes the channel, and only after the response has been fully sent does it unmute the channel and move on to that channel's next request.
  2. On the client side, the Selector's mute() and unmute() are not used. Ordering of client sends is guaranteed instead by InFlightRequests (bounded by max.in.flight.requests.per.connection) together with the RecordAccumulator's mutePartition mechanism, so on the client every Receive read here simply goes into completedReceives to await further processing.
  3. During transmission, TCP itself guarantees that bytes arrive in order.

How the Sender bootstraps its metadata:
  1. The first time the Sender thread calls poll(), it initializes the connection to the node.
  2. The second time it calls poll(), it sends the Metadata request.
  3. The third time it calls poll(), it receives the MetadataResponse and updates the metadata.
Producer configuration parameters and where they show up in the source

acks — Controls when a send is considered successful. acks=0: the message counts as sent as soon as the producer has written it out. acks=1: the message counts as sent once the leader has received it and returned an ack, without waiting for it to be replicated to other replicas. acks=all (or -1): the leader only acks after it has received acks from the entire ISR; this is the strongest reliability guarantee. Default: 1
buffer.memory — Maximum memory the producer may use; once exceeded, the producer blocks for up to max.block.ms and then throws an exception. Default: 32 MB
compression.type — Compression codec for producer data: none, gzip, snappy, or lz4 (implemented with the strategy pattern). Default: none
retries — Number of retries after a failed send. With retries enabled and max.in.flight.requests.per.connection greater than 1, reordering is possible. Internally, a retriable failure simply puts the batch back into the queue. Default: 0
batch.size — The producer sends data to a partition in batches; a batch is sent once it exceeds batch.size or linger.ms elapses. Default: 16 KB
linger.ms — If a batch has not reached batch.size, it waits at most linger.ms before being sent anyway. Default: 0
max.in.flight.requests.per.connection — Maximum number of in-flight requests per connection; values above 1 cannot guarantee ordering. Default: 5
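
To make the table concrete, here is an illustrative set of producer properties that touches each of these parameters (the values are examples, not recommendations):

Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");                                  // wait for the whole ISR before acking
props.put("retries", "3");                                 // retriable errors are re-queued
props.put("max.in.flight.requests.per.connection", "1");   // keep ordering even with retries
props.put("batch.size", "16384");                          // 16 KB per partition batch
props.put("linger.ms", "5");                               // wait up to 5 ms to fill a batch
props.put("compression.type", "lz4");
props.put("buffer.memory", String.valueOf(32 * 1024 * 1024)); // 32 MB accumulator
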
Design patterns worth noting
  1. The TransportLayer implementation is named in the configuration, and the concrete implementation class is chosen at runtime from the configured parameters.
  2. Builder pattern: used when parsing the configuration, giving a fluent chain such as define(...).define(...) (the same effect as chained .append().append()).
  3. Decorator pattern: new DataOutputStream(new ByteBufferOutputStream(...)), as in Compressor.wrapForOutput() above.
