
4. kafka生产者&消费者

// Build the configuration for a KafkaProducer with String keys and String values.
Properties props = new Properties();
// Kafka cluster address (left as an empty string in this example)
props.put("bootstrap.servers", "");
// Serializer class used for the message key
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Serializer class used for the message value
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);



public class ProducerRecord<K, V> {

     * 所有构造方法最后都是调用这个构造方法, 所以弄明白这个构造方法所有参数含义就可以了
     * Creates a record with a specified timestamp to be sent to a specified topic and partition
     * @param topic - The topic the record will be appended to, topic名称
     * @param partition - The partition to which the record should be sent, 消息发送的目标分区编号(非负整数), 如果不指定(null), kafka会根据Partitioner计算目标分区
     * @param timestamp - The timestamp of the record, in milliseconds since epoch. If null, the producer will assign
     *                  the timestamp using System.currentTimeMillis(). 消息发送的指定时间戳, 默认为当前时间
     * @param key - The key that will be included in the record, 消息的key, kafka根据这个key计算分区
     * @param value - The record contents 消息的内容
     * @param headers - the headers that will be included in the record
    // Full constructor: validates the arguments, then stores them in the record's fields.
    // Throws IllegalArgumentException when topic is null, or when timestamp/partition is negative.
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        // topic is the one mandatory argument when building a ProducerRecord
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        // a supplied timestamp must not be negative (null is accepted)
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        // a supplied partition number must not be negative (null is accepted)
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        // headers are wrapped in a RecordHeaders instance rather than stored directly
        this.headers = new RecordHeaders(headers);

    // Convenience overloads: each one delegates to the six-argument constructor,
    // passing null for every argument it does not take.

    // No headers: headers = null
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, null);

    // No timestamp: timestamp = null
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    // No timestamp and no headers
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    // Topic, key and value only: partition, timestamp and headers are null
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    // Topic and value only: partition, timestamp, key and headers are null
    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    ... ...
// A ProducerRecord is the message object that gets sent. It carries the topic name, an optional key, and the value (the payload).
// The key is extra information attached to the message and is used to decide which partition the message is written to;
// messages with the same key are written to the same partition.
// This example uses the (topic, value) constructor, so no key is set; the value is an Order serialized to a JSON string.
ProducerRecord<String, String> record = new ProducerRecord<>("ORDER-DETAIL",
    JSON.toJSONString(new Order(201806260001L, new Date(), 98000, "desc", "165120001")));


// Build the configuration for a KafkaConsumer with String keys and String values.
Properties props = new Properties();
// Kafka cluster address (left as an empty string in this example)
props.put("bootstrap.servers", "");
// Name of the ConsumerGroup this consumer belongs to
props.put("group.id", "afei");
// Deserializer class used for the message key
props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
// Deserializer class used for the message value
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
// Topic name(s) to subscribe to (the subscribe call itself is not present in this excerpt)
try {
    while (true) {
        // The consumer has to keep polling Kafka; otherwise it is considered dead and the partitions
        // it handles are handed over to other consumers in the same ConsumerGroup
        ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
        // To avoid being considered dead, message processing should finish as quickly as possible
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("message content: "+GSON.toJson(record));
            System.out.println("message value  : "+record.value());
        // Commit asynchronously after each round of consumption (the commit call is not present in this excerpt)
}finally {
    // Commit synchronously before the consumer is closed (the commit/close calls are not present in this excerpt)





<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"

    <!-- Producer parameters, collected in a map -->
    <bean id="producerProperties" class="java.util.HashMap">
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
                <!-- NOTE(review): the placeholder is spelled "kafka.reties"; confirm it matches the key in the properties file -->
                <entry key="retries" value="${kafka.reties}" />
                <entry key="retry.backoff.ms" value="${kafka.retry.backoff.ms}" />
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />

    <!-- The ProducerFactory bean required to create the KafkaTemplate; built from producerProperties -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg ref="producerProperties"/>

    <!-- The KafkaTemplate bean: inject this bean wherever messages need to be sent and call the template's send methods -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="defaultTopic" />

 * @author afei
 * @version 1.0.0
 * @since 2018年06月11日
// Thin wrapper around KafkaTemplate: serializes a message to a JSON string, sends it,
// waits for the result, and reports success or failure as a boolean.
public class MyKafkaProducer {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    // Gson instance; in this class it is used to serialize the SendResult for logging
    // (the outgoing payload itself is produced by toJsonString below)
    private static final Gson GSON = new Gson();
    // The KafkaTemplate bean defined in the XML configuration
    // NOTE(review): how this field gets injected is not visible in this excerpt
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sends msg to the given topic with the given key.
    // Returns true when a non-null SendResult comes back, false when anything is thrown.
    public boolean send(String topic, String key, Object msg){
        // Serialize the outgoing message to JSON
        String json = toJsonString(msg);
        try {
            ListenableFuture<SendResult<String, String>> futureResult = kafkaTemplate.send(
                    topic, key, json);
            logger.info("Kafka send json: {}, topicName: {}", json, topic);
            // Wait for the send to complete and fetch its result
            SendResult<String, String> result = futureResult.get();
            // fastjson cannot be used for this log line: by default fastjson relies on bean setter/getter methods,
            // and the properties of the RecordMetadata inside SendResult have no setters/getters
            logger.info("Kafka send result: {}", GSON.toJson(result));
            return result!=null;
        } catch (Throwable e) {
            // Any failure is logged and reported to the caller as false rather than rethrown
            logger.error("Kafka send failed.", e);
        return false;

    // Sends msg to the given topic without a key (key = null).
    public boolean send(String topic, Object msg){
        return send(topic, null, msg);

    // Returns o unchanged when it is already a String; otherwise serializes it with JSON.toJSONString.
    private String toJsonString(Object o) {
        String value;
        if (o instanceof String) {
            value = (String) o;
        } else {
            value = JSON.toJSONString(o);
        return value;

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"

    <!-- Consumer parameters, collected in a map -->
    <bean id="consumerProperties" class="java.util.HashMap">
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
                <!-- NOTE(review): "retries" appears to be carried over from the producer settings (same misspelled placeholder "kafka.reties"); confirm it is intended for the consumer -->
                <entry key="retries" value="${kafka.reties}" />
                <entry key="retry.backoff.ms" value="${kafka.retry.backoff.ms}" />
                <!-- Whether offsets are committed automatically -->
                <entry key="enable.auto.commit" value="${kafka.enable.auto.commit}"/>
                <!-- Interval between automatic commits -->
                <entry key="auto.commit.interval.ms" value="${kafka.auto.commit.interval.ms}"/>
                <!-- Offset reset policy, i.e. whether consumption starts from the latest offset after a restart -->
                <entry key="auto.offset.reset" value="${kafka.auto.offset.reset}"/>
                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />

    <!-- ConsumerFactory built from consumerProperties -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg ref="consumerProperties"/>

    <!-- Container settings for the account-opening listener. The id matches the
         ref used by the messageListenerContainer_openAccount bean. -->
    <bean id="containerProperties_openAccount" class="org.springframework.kafka.listener.config.ContainerProperties">
        <!-- Name of the topic this consumer reads from -->
        <constructor-arg value="${kafka.topic.name}"/>
        <!-- Using account opening as the example: messages are handled by OpenAccountKafkaListener -->
        <property name="messageListener" ref="openAccountKafkaListener"/>
        <!-- The groupId of the ConsumerGroup -->
        <property name="groupId" value="${kafka.group.id}"/>
    </bean>

    <!-- Listener container wired from consumerFactory and a ContainerProperties bean; doStart is declared as its init-method.
         NOTE(review): the second constructor-arg refers to "containerProperties_openAccount"; confirm a bean with exactly that id is defined. -->
    <bean id="messageListenerContainer_openAccount" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties_openAccount"/>
 * 钱包开户后送积分
 * @author afei
 * @version 1.0.0
 * @since 2018年06月26日
// MessageListener implementation that receives account-opening messages as JSON strings.
public class OpenAccountKafkaListener implements MessageListener<String, String> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    // Called for each consumed record; the record value is expected to be the JSON form of an OpenAccount.
    public void onMessage(ConsumerRecord<String, String> record) {
        logger.debug("msg: {}", record.value());
        // Deserialize the received message into an OpenAccount
        OpenAccount mqInput = JSON.parseObject(record.value(), OpenAccount.class);
        //TODO with the account-opening info available, award points, issue coupons, etc.



// Listener variant whose callback also receives the Consumer that fetched the record.
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {

    // This single-argument method is declared "default", so a business class that implements this interface
    // does not have to implement it. Only the two-argument method below must be implemented; because that
    // method receives the Consumer, the listener can trigger a synchronous or asynchronous commit itself.
    default void onMessage(ConsumerRecord<K, V> data) {
        throw new UnsupportedOperationException("Container should never call this");

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

// Consumer-aware version of the account-opening listener: it commits offsets itself after handling each record.
public class OpenAccountKafkaListener  implements ConsumerAwareMessageListener<String, String> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    // Called for each consumed record together with the Consumer that fetched it.
    public void onMessage(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
        logger.info("Receive msg with consumer: {}", record.value());
        // Deserialize the received message into an OpenAccount
        OpenAccount mqInput = JSON.parseObject(record.value(), OpenAccount.class);

        try {
            //TODO with the account-opening info available, award points, issue coupons, etc.
        } finally {
            // Commit asynchronously once the message has been handled; placed in finally so it runs even if handling throws
            consumer.commitAsync((offsets, exception) -> {
                if (exception==null){
                    // offsets has to be serialized with gson for logging
                    logger.info("The offset info of commit async: {}", GsonUtils.format(offsets));
                    // NOTE(review): the else branch separating the success log from this error log is missing in this excerpt
                    logger.error("Commit async failed. ", exception);


前面已经介绍了如何使用kafka生产者发送消息,以及如何用消费者接收消息,包括原生方式和spring集成方式,接下来我们跟踪源码看看消息在调用KafkaProducer中的send()后发送到kafka broker之前需要经过哪些处理。

// Entry point for sending: passes the record through the interceptors first, then hands the result to doSend.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);



// Performs the actual send steps: wait for topic metadata, serialize key and value, pick the partition,
// then append the record to the accumulator. Several statements and closing braces are missing from this excerpt.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        // Obtain the cluster info together with the time already spent waiting for it
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        // Remaining time = max.block.ms minus the time already consumed, never below 0
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        // Keep the cluster info in a local variable
        Cluster cluster = clusterAndWaitTime.cluster;
        // Serialize the key
        byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        // Serialize the value
        byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        // Choose the partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        Header[] headers = record.headers().toArray();
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        // The size of the message being sent must not exceed the max.request.size and buffer.memory settings
        // (the validation call itself is not present in this excerpt)
        // Determine the send timestamp: the current time unless a timestamp was given when the ProducerRecord was built
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        // NOTE(review): the statement guarded by this condition is missing in this excerpt
        if (transactionManager != null && transactionManager.isTransactional())

        // Append the serialized record to the accumulator, waiting at most remainingWaitMs
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        // When sending fails with an exception, onAcknowledgement() is invoked on every interceptor. An interceptor can
        // therefore check whether the exception passed to onAcknowledgement() is null to tell whether the send
        // succeeded, which makes it possible to track the send success rate.
        this.interceptors.onSendError(record, tp, e);
        throw e;
// Waits until the partition count of the given topic is known, for at most maxWaitMs, and returns the
// cluster info together with the time spent waiting. Throws TimeoutException when the wait exceeds maxWaitMs,
// TopicAuthorizationException when the topic is unauthorized, and KafkaException when the requested
// partition is outside the topic's partition range.
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already and reset expiry
    // First fetch of the cluster info; initially it is the cluster described by the bootstrap.servers setting
    Cluster cluster = metadata.fetch();
    // Look up the partition count in the cached topic/partition map (Map<String, List<PartitionInfo>>)
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        // Already cached: return right away. No time was spent fetching cluster info, so the second constructor argument is 0
        return new ClusterAndWaitTime(cluster, 0);
    // Record the start time (used to compute how long fetching the cluster info takes)
    long begin = time.milliseconds();
    // The remaining time starts at maxWaitMs, i.e. the max.block.ms setting: the maximum time allowed to block
    // while fetching cluster info (the official docs describe it as bounding how long KafkaProducer.send() and
    // KafkaProducer.partitionsFor() may block)
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    do {
        log.trace("Requesting metadata update for topic {}.", topic);
        // Flag that an update is needed and get the current version back
        int version = metadata.requestUpdate();
        try {
            // Wait for the metadata to be updated, i.e. until its version is newer than the version obtained above.
            // The wait must not take longer than remainingWaitMs
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        // After the update, fetch the cluster info again
        cluster = metadata.fetch();
        // Total time spent so far, measured from begin
        elapsed = time.milliseconds() - begin;
        // Throw if the time spent has reached the limit given by max.block.ms
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        if (cluster.unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        // Derive the remaining time from the elapsed time just computed
        remainingWaitMs = maxWaitMs - elapsed;
        // Get the partition count of this topic
        partitionsCount = cluster.partitionCountForTopic(topic);
        // If the partition count is still unavailable, keep trying until the max.block.ms budget is used up
    } while (partitionsCount == null);

    // If a partition was specified when building the ProducerRecord and it is greater than or equal to the
    // partition count, throw. Example: topic "ORDER-DETAIL" has 7 partitions; a record built with partition 7
    // or higher fails with: Invalid partition given with record: 7 is not in the range [0...7).
    if (partition != null && partition >= partitionsCount) {
        throw new KafkaException(
                String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));

    // Return the cluster info together with the elapsed (already consumed) time, not the remaining time
    return new ClusterAndWaitTime(cluster, elapsed);


// The three steps from doSend repeated for discussion: serialize the key, serialize the value, choose the partition.
// serializedKey, serializedValue and cluster are the locals declared in doSend.
keySerializer.serialize(record.topic(), record.headers(), record.key());
valueSerializer.serialize(record.topic(), record.headers(), record.value());
int partition = partition(record, serializedKey, serializedValue, cluster);
