并发(个人收集)

Kafka 生产者

2019-11-06  本文已影响0人  一生逍遥一生

Kafka的版本为2.0.0、jdk 1.8.0_231、scala 2.11.8。

生产者客户端开发

必要的参数配置

  • bootstrap.servers:指定生产者客户端连接Kafka集群所需的broker地址,多个使用地址使用逗号分割。
  • key.serializer和value.serializer:broker端接收的消息必须以字节数组的形式存在。

在编写代码的过程中,参数的名称,可以使用ProducerConfig中的变量(在java代码中),如果是scala代码,需要自己编写参数。

KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。

消息的发送

Kafka生产者发送消息的模式为:发后即忘和异步(Kafka0.8.2之后,全部都是使用异步调用)。

发后即忘

只管发送消息,不管消息是否正确到达。会出现消息丢失,这种发送方式,性能最高,可靠性最差。

异步调用

Kafka发送消息都是异步,都会发送一个Future<RecordMetadata>对象,如果想要使用同步的方式,可以在后面调用get方法,
get方法会阻塞Kafka的响应,直到消息发送成功,或者发送异常。RecordMetaData中含有一些元数据:主题、分区号、分区中的偏移量、hash值。

try{
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get();
    System.out.println(metadata.topic()+"-"+metadata.partition()+"-"+metadata.offset());
}catch (ExecutionException | InterruptedException e){ //jdk版本要合适,本人使用的jdk1.8,否则会报错。
    e.printStackTrace();
}

Kafka中一般发生两种类型的异常:可重试异常和不可重试异常。常见的可重试异常:NetworkException(网络故障)、LeaderNotAvailableException(分区leader副本不可用)、
UnknownTopicOrPartitionException、NotEnoughReplicasExceotion、NotCoordinatorException等。不可重试异常:RecordTooLargeException等。为了保证
消息可以顺利达到,可以设置重试次数,参数为retries。

在调用send方法来发送消息时,可以指定或者不指定调用函数,如果想用阻塞请求,可以调用get方法。

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("topic-demo", "key".getBytes(), "value".getBytes());
producer.send(record,
    new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
    if(e != null)
        e.printStackTrace();
    System.out.println("The offset of the record we just sent is: " + metadata.offset());
    }
});

同一分区发送的数据,每条记录的callback发送都是按照顺序执行的。
KafkaProducer的close方法会阻塞等待之前所有发送的请求完成之后再关闭KafkaProducer。
KafkaProducer使用total.memory.bytes来控制Producer缓存数据的最大字节数(要保证内存充足)。
异步调用,需要设置block.on.buffer.full=false。

序列化

  • 1.默认序列化实现
    KafkaProducer默认的编码格式为UTF-8。在编写自动代码之前,需要添加相应的依赖:
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.10</version>
    <scope>provided</scope>
</dependency>

下面是自定义的序列化代码:

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanySerializer implements Serializer<Company> {
    private String encoding = "UTF8";
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    public byte[] serialize(String topic, Company data) {
        try {
            if (null == data){
                return null;
            }
            byte[] name,address;
            if (null != data.getName()){
                name = data.getName().getBytes(encoding);
            }else {
                name = new byte[0];
            }
            if (null != data.getAddress()){
                address = data.getAddress().getBytes(encoding);
            }else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length+address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        }catch (UnsupportedEncodingException e){
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
        return new byte[0];
    }
    public void close() {

    }
}

反序列化:

package com.edu.kafka;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanyDeserializer implements Deserializer<Company> {

    private String encoding = "UTF8";

    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    public Company deserialize(String topic, byte[] data) {
        if (null == data){
            return null;
        }
        if (data.length < 8){
            throw new SerializationException("Size of data received by deserializer is shorter than expected!");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen,addressLen;
        String name,address;
        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressBytes);
        try {
            name = new String(nameBytes,encoding);
            address = new String(addressBytes,encoding);
        }catch (UnsupportedEncodingException e){
            throw new SerializationException("Error occur when deserializing!");
        }
        return new Company(name,address);
    }
    public void close() {

    }
}
  • 2.使用Protostuff实现序列化

添加相应依赖:

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.6.2</version>
</dependency>

序列化:

package com.edu.kafka;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtobufIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanySerializer implements Serializer<Company> {
    private String encoding = "UTF8";

    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    public byte[] serialize(String topic, Company data) {
        if (null == data){
            return null;
        }
        Schema schema = RuntimeSchema.getSchema(data.getClass());
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        byte[] protostuff = null;
        try {
            protostuff = ProtobufIOUtil.toByteArray(data,schema,buffer);
        }catch (Exception e){
            throw new IllegalStateException(e.getMessage(),e);
        }finally {
            buffer.clear();
        }
        return protostuff;
    }

    public void close() {

    }
}

反序列化:

package com.edu.kafka;

import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanyDeserializer implements Deserializer<Company> {

    private String encoding = "UTF8";

    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    public Company deserialize(String topic, byte[] data) {
        if (null == data){
            return null;
        }
        Schema schema = RuntimeSchema.getSchema(Company.class);
        Company company = new Company();
        ProtostuffIOUtil.mergeFrom(data,company,schema);
        return company;
    }

    public void close() {

    }
}

使用序列化:

package com.edu.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerAnalysis {

    public static final String topic = "topic-demo";

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());
        KafkaProducer<String, Company> producer = new KafkaProducer<String, Company>(prop);
        Company company = Company.builder().name("xiaoyao").address("Beijing").build();
        ProducerRecord<String, Company> record = new ProducerRecord<String, Company>("topic-demo", company);
        System.out.println("===start===");
        try {
            producer.send(record);
        } catch (Exception e) {
            System.out.println("===exception===");
            e.printStackTrace();
        }

        producer.close();
        System.out.println("===finish===");
    }
}

区分器

消息经过序列化之后就需要确定他们发往的分区。如果ProducerRecord指定分区,就会发往指定的分区,
如果没有指定,就需要依赖分区器,根据key字段的哈希值选择一个分区,如果分区和key都没有指定,使用轮训的方式。
在Kafka0.8.2.2中,有一个默认的区分器:Partitioner。
在Kafka2.0.0中有一个默认分区器:DefaultPartitioner,并且提供了Partitioner接口,用户可以自定义实现分区器。

自定义分区器:

package com.edu.kafka;


import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int numPartitions = partitionInfos.size();
        if (null == keyBytes){
            return counter.getAndIncrement() % numPartitions;
        }else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void close() {

    }

    public void configure(Map<String, ?> map) {

    }
}

在Kafka 0.8.2.2版本中,也可以使用上面的代码,需要将implements Partitioner去掉,然后修改下面的key移动到相应分区的代码。

生产者拦截器

拦截器是在Kafka0.10.0.0之后才引入的功能。
KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。
KafkaProducer会在消息被应答之前或者消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于
用户设定的Callback之前执行。
close()方法主要用于在关闭拦截器时执行一些资源的清理工作。
自定义的拦截器:

package com.edu.kafka;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CustomProducerInterceptor implements ProducerInterceptor<String,String> {
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String modifiedValue = "prefix1-"+record.value();
        return new ProducerRecord<String, String>(record.topic(),record.partition(),
                record.timestamp(),record.key(),modifiedValue,record.headers());
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (null == exception){
            sendFailure++;
        }else{
            sendFailure++;
        }
    }

    public void close() {
        double successRatio = (double)sendSuccess/(sendSuccess + sendFailure);
        System.out.println("[INFO] send ratio="+String.format("%f",successRatio*100)+"%");
    }

    public void configure(Map<String, ?> configs) {

    }
}

KafkaProducer可以指定多个拦截器,形成拦截链,按照指定的顺序执行。

原理分析

整体架构

20190620170845181.png

执行流程:
1.在启动main方法之后,KafkaProducer发送的数据到ProducerInterceptor拦截器。
2.ProducerInterceptor拦截器对数据进行过滤,然后将数据发送到序列化。
3.将过滤之后的数据发送到序列化,然后发送到不同的分区。
4.分区中数据,会发送到RecordAccumulator(消息累加器),RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络的资源消耗以提升性能,默认大小为 buffer.memory=32MB,max.block.ms = 60000(超时时间)。
RecordAccumulator维护多个双端队列,数据会堆积到一个ProducerBatch(多个ProducerRecord),然后批量发送。消息在网络上都是以字节的形式传输,RecordAccumulator的内部有一个BufferPool,用来实现ByteBuffer的复用,以实现缓存的高效利用。
5.Sender从RecordAccumulator中获取缓存数据,转换为<Node,List<ProducerBatch>>,Node表示broker节点。
6.将<Node,List<ProducerBatch>>转换为<NodeRequest>的形式。
7.将信息缓存起来,保存到InFlightRequests中,InFlightRequest保存对象的具体形式为Map<NodeId,Deque<Request>>,它的主要作用是缓存已经发出去但还没有收到响应的请求。
8.将信息提交给selector准备发送。
9.selector将信息发送到KafkaCluster,KafkaCluster信息接收到信息之后,返回数据。
10.selector将KafkaCluster发送过来的数据,传递给InFlightRequests。
11.InFlightRequests接收到信息之后,会将信息发挥给主进程。

元数据的更新

InFlightRequests中可以获取LeastLoadedNode,即所有Node中负载最小。元数据操作是在客户端内部进行的。更新元数据时,
会挑选出LeastLoadedNode,然后这个Node发送MetadataRequest请求来获取具体的元数据信息。这里的数据同步是通过
synchronized和final来保证。

生产者参数

参数 默认值 描述
acks 1 指定分区中必须要有多少个副本收到这条信息
acks = 1,如果失败,会重发,只要有一个接收,就是成功;acks = 0,不需要等待服务端的响应;acks = -1,需要将所有的副本都返回成功才可以。
max.request.sizes 1MB 发送消息的最大值
retries 0 重试次数
retries.backoff.ms 100 重试之间时间间隔
compress.type none 可以对消息进行压缩
connections.max.idle.ms 540000ms 闲置多长时间之后关闭连接
receive.buffer.bytes 32KB 接收消息缓存区大小
send.buffer.bytes 128KB 发送消息缓存区大小
request.timeout.ms 3000ms 等待请求响应的最大时间

其他的参数请参考官网

参考文献

kafka消息发送模式
深入理解Kafka:核心设计与实践原理
Kafka Documentation

后续

本文是《深入理解Kafka:核心设计与实践原理》的读书笔记。

上一篇 下一篇

猜你喜欢

热点阅读