kafkaTemplate对kafkaProducer的封装

2020-05-26  本文已影响0人  陆阳226

kafkatemplate发送方法

public ListenableFuture<SendResult<K, V>> sendDefault(@Nullable V data)
public ListenableFuture<SendResult<K, V>> sendDefault(K key, @Nullable V data)
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data)
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data)
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data)
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data)
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data)
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,@Nullable V data)
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record)
public ListenableFuture<SendResult<K, V>> send(Message<?> message)

这些发送方法内部生成一个ProducerRecord对象,传递给dosend方法,该对象包含了发送给kafka的所有信息

public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

doSend方法:通过getTheProducer获得KafkaProducer对象,KafkaProducer对象发送数据ProducerRecord

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {

    // 获取KafkaPruducer对象
    final Producer<K, V> producer = getTheProducer(producerRecord.topic());
    this.logger.trace(() -> "Sending: " + producerRecord);
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    Object sample = null;
    if (this.micrometerEnabled && this.micrometerHolder == null) {
        this.micrometerHolder = obtainMicrometerHolder();
    }
    if (this.micrometerHolder != null) {
        sample = this.micrometerHolder.start();
    }

    // 执行kafkaProducer的send方法,发送数据
    Future<RecordMetadata> sendFuture =
            producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
    // May be an immediate failure
    if (sendFuture.isDone()) {
        try {
            sendFuture.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KafkaException("Interrupted", e);
        }
        catch (ExecutionException e) {
            throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace
        }
    }
    if (this.autoFlush) {
        flush();
    }
    this.logger.trace(() -> "Sent: " + producerRecord);
    return future;
}

getTheProducer方法,前面的是开启事务的Producer创建,暂不了解;对于非事务的Producer创建主要在最后两个if判断中,都是获取当前KafkaTemplateproducerFactoryproducerFactory用来创建KafkaProducer。getProducerFactory(topic)方法通过传入的topic来确定producerFactory,默认是返回this.producerFactory,可以在子类中重写具体的策略。剩下的就转到了producerFactorycreateProducer方法。

protected Producer<K, V> getTheProducer(@SuppressWarnings("unused") @Nullable String topic) {
    // 开启事务时创建事务Producer
    boolean transactionalProducer = this.transactional;
    if (transactionalProducer) {
        boolean inTransaction = inTransaction();
        Assert.state(this.allowNonTransactional || inTransaction,
                "No transaction is in process; "
                    + "possible solutions: run the template operation within the scope of a "
                    + "template.executeInTransaction() operation, start a transaction with @Transactional "
                    + "before invoking the template method, "
                    + "run in a transaction started by a listener container when consuming a record");
        if (!inTransaction) {
            transactionalProducer = false;
        }
    }
    if (transactionalProducer) {
        Producer<K, V> producer = this.producers.get();
        if (producer != null) {
            return producer;
        }
        KafkaResourceHolder<K, V> holder = ProducerFactoryUtils
                .getTransactionalResourceHolder(this.producerFactory, this.transactionIdPrefix, this.closeTimeout);
        return holder.getProducer();
    }
    else if (this.allowNonTransactional) {
        return this.producerFactory.createNonTransactionalProducer();
    }
    // 创建非事务Producer
    else if (topic == null) {
        return this.producerFactory.createProducer();
    }
    else {
        return getProducerFactory(topic).createProducer();
    }
}

createProducer方法:ProducerFactory接口只有一个默认实现类DefaultKafkaProducerFactory,又调用了doCreateProducer方法

public Producer<K, V> createProducer() {
    return createProducer(this.transactionIdPrefix);
}
public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
    String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;
    return doCreateProducer(txIdPrefix);
}

doCreateProducer方法:

  1. 第一个if中是创建事务Producer
  2. 第二个if中为每个线程创建一个Producer,Kafka文档中建议多线程共享一个Producer,当然也可以每个线程创建一个
  3. synchronized同步块中是创建非事务的Producer,保证多线程也只会创建一个Producer实例,这个Producer实例通过CloseSafeProducer类代理
private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
    if (txIdPrefix != null) {
        if (this.producerPerConsumerPartition) {
            return createTransactionalProducerForPartition(txIdPrefix);
        }
        else {
            return createTransactionalProducer(txIdPrefix);
        }
    }
    if (this.producerPerThread) {
        CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
        if (this.threadBoundProducerEpochs.get() == null) {
            this.threadBoundProducerEpochs.set(this.epoch.get());
        }
        if (tlProducer != null && this.epoch.get() != this.threadBoundProducerEpochs.get()) {
            closeThreadBoundProducer();
            tlProducer = null;
        }
        if (tlProducer == null) {
            tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
                    this.physicalCloseTimeout, this.beanName);
            for (Listener<K, V> listener : this.listeners) {
                listener.producerAdded(tlProducer.clientId, tlProducer);
            }
            this.threadBoundProducers.set(tlProducer);
            this.threadBoundProducerEpochs.set(this.epoch.get());
        }
        return tlProducer;
    }
    synchronized (this) {
        // 创建一个CloseSafeProducer对象
        if (this.producer == null) {
            this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
                    this.physicalCloseTimeout, this.beanName);
            this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));
        }
        return this.producer;
    }
}

createKafkaProducer方法:根据clientIdPrefix处理配置问题,clientId就是创建的Producer的名字标识

protected Producer<K, V> createKafkaProducer() {
    Map<String, Object> newConfigs;
    if (this.clientIdPrefix == null) {
        newConfigs = new HashMap<>(this.configs);
    }
    else {
        newConfigs = new HashMap<>(this.configs);
        newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
                this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
    }
    checkBootstrap(newConfigs);
    return createRawProducer(newConfigs);
}
protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
    return new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
}

CloseSafeProducer代理了KafkaProducer实例,在创建对象时主要传入了KafkaProducer实例、关闭KafkaProducer的行为:this::removeProducer,其send方法就是调用调用代理的KafkaProducer的send方法。Callback回调中处理的时事务Producer的异常问题。

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    LOGGER.trace(() -> toString() + " send(" + record + ")");
    return this.delegate.send(record, new Callback() {

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception instanceof OutOfOrderSequenceException) {
                CloseSafeProducer.this.producerFailed = exception;
                close(CloseSafeProducer.this.closeTimeout);
            }
            callback.onCompletion(metadata, exception);
        }

    });
}

CloseSafeProducer,对于非事务Producer来说只有send方法是有用的,其中的close等相关方法都是针对事务Producer。

非事务Producer是通过closeDelegate来关闭的,该方法在destroy方法中调用。在spring应用结束运行时会调用destroy方法,然后Producer的所有资源都会被释放。

上一篇下一篇

猜你喜欢

热点阅读