kafka生产者和消费者api的简单使用
2021-01-09 本文已影响0人
huan1993
一、背景
此处主要是简单记录一下,使用 kafka api 发送消息和接收消息。
二、需要实现的功能
1、生产者实现功能
1、KafkaProducer线程安全的,可以在多线程中使用。
2、消息发送的key和value的序列化
3、自定义分区的使用
4、自定义拦截器的使用
5、消息发送完成后的回调使用
2、消费者实现功能
1、消息接收的key和value的序列化
2、指定消费者组
3、自动提交 offset (生产环境可以使用手动提交offset)
4、重置消费者的偏移量,此配置生效的条件
5、自定义消息消费拦截器
6、每次从服务器获取多少数据
3、详细实现
详细实现,请参考代码中的注释。
三、代码实现
1、jar包的引入
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.0</version>
</dependency>
</dependencies>
2、生产者代码实现
1、自定义分区器
主要目的是为了能自己控制自己的消息需要发送到那个分区中。
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
/**
* @author huan.fu 2021/1/5 - 上午9:57
*/
public class CustomPartitioner extends DefaultPartitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int partition = super.partition(topic, key, keyBytes, value, valueBytes, cluster);
System.err.println("消息被分配到分区: " + partition);
return partition;
}
@Override
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
System.err.println("topic: " + topic + " 产生了一个新的批次");
super.onNewBatch(topic, cluster, prevPartition);
}
}
2、自定义消息生产拦截器
主要是消息在发送到kafka服务器之前,最消息进行一个拦截。
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* 自定义生产者消息拦截器,此类不是线程安全的,需要自己控制线程安全问题
*
* @author huan.fu 2021/1/5 - 上午10:04
*/
public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
System.out.println("此方法是在消息发送的业务线程中,可以在消息发送到kafka服务器时做一些处理");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("此方法是在kafka 生产者的 I/O 线程中执行,需要立即返回,否则影响消息发送的速度");
if (exception != null) {
System.out.println("消息发送到服务器时发生了异常");
return;
}
System.out.println("消息发送到了服务器 - offset:" + metadata.offset() + " partition:" + metadata.partition());
}
@Override
public void close() {
System.out.println("释放资源");
}
}
3、生产者代码
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
/**
* 测试kafka发送者
* 1、KafkaProducer线程安全的,可以在多线程中使用。
* 2、消息发送的key和value的序列化
* 3、自定义分区的使用
* 4、自定义拦截器的使用
* 5、消息发送完成后的回调使用
*
* @author huan.fu 2021/1/4 - 上午10:11
*/
public class KafkaProducerDemo {
public static void main(String[] args) {
Properties properties = new Properties();
// kafka服务器的地址,端口,可以不用全写,写几个既可
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// kafka 消息key的序列化方式
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// kafka 消息value的序列化方式
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 0: 消息只要发送出去了,就认为是发送成功
// 1: 消息被 leader 分区保存后,就认为发送成功
// all: 消息被 leader + isr 都保存成功后,才认为发送成功
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 设置消息重试的次数
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
// 消息发送失败后,重试的间隔,即等待多少毫秒后在重试
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
// kafka 的消息默认是批量发送,即等批次消息达到一定大小(单位字节)的时候发送,此参数控制批次消息的大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// batch.size 设置kafka消息批次达到一定的字节时才发送,那万一一直没有达到,此时就由linger.ms控制,即等待多少毫秒之后发送,等于0表示立即发送
properties.put(ProducerConfig.LINGER_MS_CONFIG, 100);
// 表示生产者可以用来缓存等待发送到服务器上消息的可用内存大小。
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 控制一次请求的大小,防止出现一个非常大的请求,默认是一兆
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
// 控制客户端等待服务端响应的最大时间,此配置的值应该要大于 replica.lag.time.max(服务端移除isr的一个超时配置) 的值。
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// 配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()会阻塞多长时间。这些方法可能因为缓冲区已满或元数据不可用而被阻塞。
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
// 配置kafka自定义的分区器,我们自己实现的分区器,需要实现 org.apache.kafka.clients.producer.Partitioner 接口
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.huan.kafka.api.CustomPartitioner");
// 配置kafka自定义的拦截器,可以配置多个,多个以英文的逗号分割开
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.huan.kafka.api.CustomProducerInterceptor");
// KafkaProducer 是线程安全的,可以多个线程使用用一个 KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("topic-a", "value - (" + i + 1 + ")");
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("发送数据到kafka中,发生了异常.");
exception.printStackTrace();
return;
}
System.out.println("topic: " + metadata.topic() + " offset: " + metadata.offset() + " partition: "
+ metadata.partition());
}
});
}
System.out.println("消息发送完成");
kafkaProducer.close();
}
}
3、消息消费者实现
1、自定义消息消费拦截器
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
/**
* @author huan.fu 2021/1/5 - 下午2:48
*/
public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
System.out.println("从kafka拉取到了数据,可以在此修改记录数据");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
System.out.println("提交偏移量成功时(即已经提交到了broker上),调用此函数.");
}
@Override
public void close() {
System.out.println("close");
}
}
2、消息消费者实现
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
* 测试kafka消费者
* 1、消息接收的key和value的序列化
* 2、指定消费者组
* 3、自动提交 offset (生产环境可以使用手动提交offset)
* 4、重置消费者的偏移量,此配置生效的条件
* 5、自定义消息消费拦截器
* 6、每次从服务器获取多少数据
*
* @author huan.fu 2021/1/5 - 上午10:29
*/
public class KafkaConsumerDemo {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
// kafka服务器的地址,端口,可以不用全写,写几个既可
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// kafka 消息key的序列化方式
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// kafka 消息value的序列化方式
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 设置消费者组,多个消费者只要组一样,就认为在一个组中,那个topic中的某一个分区,只能被这个组中的某个消费组消费,消费者组和消费者组之间互不影响
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-test-001");
// 自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交 offset 的间隔,即隔多长时间提交offset
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
// 重置消费者的偏移量,此配置生效的条件:
// 1、当前消费者的偏移量不在kafka服务器上(比如是一个新的消费者组)
// 2、当前消费者的offset在kafka服务器上不存在(比如: 当前消费者消费的偏移量到了10,但是此时消费者宕机了很长一段时间,而服务器上数据默认保存7天,那么此时10之后的某些偏移量可能被删除了)
// earliest: 从最早的偏移量开始消费
// latest: 从最新的偏移量开始消费
// none: 如果没有在消费者组中找到先前的偏移量,则向消费者抛出异常
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 配置kafka自定义的拦截器,可以配置多个,多个以英文的逗号分割开
properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.huan.kafka.api.CustomConsumerInterceptor");
// 设置两次 poll 方法之间的最大的延时,如果超过了最大的延时,则kafka认为该consumer消费能力弱,会将该分区给别的消费者消费
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// 设置一次 pool 最多获取多少条记录
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 拉取消息时,每个分区返回的最大的消息的字节数
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,1048576);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 指定consumer消费的主题
consumer.subscribe(Collections.singletonList("topic-a"));
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("接收到了一个数据. partition:" + consumerRecord.partition() + " offset:"
+ consumerRecord.offset() + " 消息的值:" + consumerRecord.value());
TimeUnit.SECONDS.sleep(3);
}
}
}
}