Kafka Consumer API
1. Kafka Core APIs
The diagram below, taken from the official documentation, illustrates the kinds of clients that can integrate with Kafka.
Kafka exposes five categories of client APIs:
- AdminClient API: manage and inspect topics, brokers, and other Kafka objects; similar in purpose to the scripts that ship with Kafka.
- Producer API: publish messages to one or more topics; the API used by producers (publishers).
- Consumer API: subscribe to one or more topics and process the messages produced to them; the API used by consumers (subscribers).
- Streams API: transform input streams into output streams efficiently; typically used in stream-processing scenarios.
- Connector API: pull data from source systems or applications into Kafka, such as the DB shown in the diagram above.
This article focuses on the Consumer API.
2. Consumer API
Reliability on the consumer side is fairly easy to guarantee, because the data is persisted in Kafka, so there is no need to worry about losing it.
However, a consumer may fail mid-consumption (a power loss or crash, for example). After it recovers it has to resume from where it left off, so the consumer must keep track of the offset it has consumed up to in order to continue after recovery.
Maintaining the offset is therefore something every consumer application has to think about.
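Kafka itself records the last committed offset of each consumer group, and that value can be read back from the broker. Below is a minimal sketch of such a check, assuming the broker address, group id and topic used in the examples that follow (the class name is illustrative):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.128:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition p0 = new TopicPartition("yibo_topic", 0);
            // Ask the broker which offset the group "test" last committed for partition 0 (null if never committed)
            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Collections.singleton(p0));
            System.out.println("committed offset for " + p0 + ": " + committed.get(p0));
        }
    }
}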
2.1 Add the dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
Classes we will use:
- KafkaConsumer: the consumer object used to consume data.
- ConsumerConfig: constants for the required configuration parameters.
- ConsumerRecord: each record is wrapped in a ConsumerRecord object.
Parameters related to automatic offset commits:
- enable.auto.commit: whether automatic offset commits are enabled
- auto.commit.interval.ms: the interval between automatic offset commits
2.2 Automatic offset commits
private static final String TOPIC_NAME = "yibo_topic";
/**
* This usage appears in real projects, but it is not recommended
*/
public static void helloWorld(){
Properties properties = new Properties();
// Kafka cluster
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
// Consumer group: consumers with the same group.id belong to the same group
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// Enable automatic offset commits
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// Auto-commit interval
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
// Deserializer classes for keys and values
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// Subscribe to one or more topics
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while(true){
// Poll for messages at a fixed interval
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
record.partition(),record.offset(),record.key(),record.value());
}
}
}
Although automatic offset commits are simple and convenient, they are time-based, which makes it hard for developers to control exactly when an offset is committed. Kafka therefore also provides APIs for committing offsets manually.
There are two ways to commit offsets manually: commitSync (synchronous, blocking) and commitAsync (asynchronous). Both commit the highest offset of the batch returned by the current poll. The difference is that commitSync blocks the current thread until the commit succeeds and retries failures automatically (commits can still fail for reasons outside the client's control), whereas commitAsync has no retry mechanism, so its commits may fail.
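Because commitAsync does not retry, it is common to pass an OffsetCommitCallback so that failed commits are at least logged. A minimal sketch of the two calls, intended to sit where the examples below invoke commitAsync() (the consumer and log variables are assumed to exist as in those examples):

// commitSync: blocks until the commit succeeds or a non-retriable error occurs
consumer.commitSync();
// commitAsync: returns immediately; the callback reports whether the commit succeeded
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            log.error("async offset commit failed, offsets=" + offsets, exception);
        }
    }
});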
2.3 Manual offset commits
- Although synchronous commits are more reliable, they block the current thread until the commit succeeds, which hurts throughput considerably. In most situations, asynchronous commits are therefore preferred.
private static final String TOPIC_NAME = "yibo_topic";
/**
* Manual offset commits
*/
public static void commitedOffset(){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// Subscribe to one or more topics
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while(true){
// Poll for messages at a fixed interval
try {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
// Consume the message
System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
record.partition(),record.offset(),record.key(),record.value());
// If business processing fails, do not commit
//throw new RuntimeException("business processing failed");
}
// Commit the offset manually
consumer.commitAsync();
} catch (Exception e) {
log.error("consumer offset error",e);
}
}
}
2.4 Manual offset commits with an offset reset policy
auto.offset.reset controls where the consumer starts when its group has no committed offset (or the committed offset is out of range); switching the consumer to a new group is a simple way to see it take effect.
private static final String TOPIC_NAME = "yibo_topic";
/**
* Manual offset commits, with the offset reset policy set
*/
public static void restCommitedOffset(){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
// Offset reset policy; it only applies when the group has no committed offset, e.g. after switching to a new consumer group
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// Subscribe to one or more topics
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while(true){
// Poll for messages at a fixed interval
try {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
// Consume the message
System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
record.partition(),record.offset(),record.key(),record.value());
// If business processing fails, do not commit
//throw new RuntimeException("business processing failed");
}
// Commit the offset manually
consumer.commitAsync();
} catch (Exception e) {
log.error("consumer offset error",e);
}
}
}
2.5 Manual offset commits, handling each partition separately
- Because synchronous commits retry on failure, they are more reliable.
private static final String TOPIC_NAME = "yibo_topic";
/**
* Manual offset commits, handling each partition separately
*/
public static void commitedOffsetWithPartition(){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// Subscribe to one or more topics
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while(true){
// Poll for messages at a fixed interval
try {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
// Handle each partition separately
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> recordList = records.records(partition);
for (ConsumerRecord<String, String> record : recordList) {
System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
record.partition(),record.offset(),record.key(),record.value());
// If business processing fails, do not commit
//throw new RuntimeException("business processing failed");
}
long lastOffset = recordList.get(recordList.size() - 1).offset();
// The offset within this single partition, to be committed
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(partition,new OffsetAndMetadata(lastOffset+1));
// Commit the offset for each partition individually
consumer.commitSync(offset);
System.out.println("--------------------- partition - " + partition + "end---------------------");
}
} catch (Exception e) {
log.error("consumer offset error",e);
}
}
}
2.6 Manual offset commits with manually assigned partitions
- Manually assign one or more specific partitions and commit offsets manually.
private static final String TOPIC_NAME = "yibo_topic";
/**
* Manual offset commits with manually assigned partitions, a more advanced variant
*/
public static void commitedOffsetWithPartition2(){
Properties properties = new Properties();
// Kafka cluster
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
// Consumer group: consumers with the same group.id belong to the same group
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// Disable automatic offset commits
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
// Auto-commit interval
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
// Deserializer classes for keys and values
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
TopicPartition p1 = new TopicPartition(TOPIC_NAME,1);
// Assign a specific partition of a topic
consumer.assign(Arrays.asList(p0));
// Subscribe to one or more topics
// consumer.subscribe(Arrays.asList(TOPIC_NAME));
while(true){
// Poll for messages at a fixed interval
try {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
// Handle each partition separately
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> recordList = records.records(partition);
for (ConsumerRecord<String, String> record : recordList) {
System.out.printf("patition=%d, offset=%d, key=%s, value=%s%n",
record.partition(),record.offset(),record.key(),record.value());
//业务处理异常,则不提交
//throw new RuntimeException("业务处理异常");
}
long lastOffset = recordList.get(recordList.size() - 1).offset();
//单个partition中的offset,并且进行提交
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(partition,new OffsetAndMetadata(lastOffset+1));
//对每个Partition做单独的offset提交
consumer.commitSync(offset);
System.out.println("--------------------- partition - " + partition + "end---------------------");
}
} catch (Exception e) {
log.error("consumer offset error",e);
}
}
}
2.7 Manually specifying the starting offset, with manual commits
private static final String TOPIC_NAME = "yibo_topic";
/**
* Manually specify the starting offset and commit offsets manually
*/
public static void controlOffset(){
Properties properties = new Properties();
// Kafka cluster
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
// Consumer group: consumers with the same group.id belong to the same group
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// Disable automatic offset commits
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
// Auto-commit interval
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
// Deserializer classes for keys and values
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
// Assign a specific partition of a topic
consumer.assign(Arrays.asList(p0));
while(true){
try {
/**
* 1. Control the starting offset ourselves
* 2. If the program fails, the records are consumed once more
*/
/**
* A typical pattern:
* 1. The first time, consume from offset 0
* 2. After consuming, say, 100 records, set the offset to 101 and store it in redis
* 3. Before each poll, read the latest offset from redis
* 4. Start consuming from that position
*/
// Manually specify the starting offset
consumer.seek(p0,30);
// Poll for messages at a fixed interval
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
// Handle each partition separately
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> recordList = records.records(partition);
for (ConsumerRecord<String, String> record : recordList) {
System.out.printf("patition=%d, offset=%d, key=%s, value=%s%n",
record.partition(),record.offset(),record.key(),record.value());
//业务处理异常,则不提交
//throw new RuntimeException("业务处理异常");
}
long lastOffset = recordList.get(recordList.size() - 1).offset();
//单个partition中的offset,并且进行提交
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(partition,new OffsetAndMetadata(lastOffset+1));
//对每个Partition做单独的offset提交
consumer.commitSync(offset);
System.out.println("--------------------- partition - " + partition + "end---------------------");
}
} catch (Exception e) {
log.error("consumer offset error",e);
}
}
}
2.8 Flow control (rate limiting)
Add the Guava dependency:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
private static final String TOPIC_NAME = "yibo_topic";
/** Token generation rate, in permits per second */
public static final int permitsPerSecond = 1;
/** Rate limiter */
private static final RateLimiter LIMITER = RateLimiter.create(permitsPerSecond);
/**
* Flow control / rate limiting
*/
public static void controlPause(){
Properties properties = new Properties();
// Kafka cluster
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
// Consumer group: consumers with the same group.id belong to the same group
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// Disable automatic offset commits
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
// Auto-commit interval
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
// Deserializer classes for keys and values
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
TopicPartition p1 = new TopicPartition(TOPIC_NAME,1);
// Assign specific partitions of a topic
consumer.assign(Arrays.asList(p0,p1));
// Subscribe to one or more topics
// consumer.subscribe(Arrays.asList(TOPIC_NAME));
while(true){
// Poll for messages at a fixed interval
try {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
// Handle each partition separately
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> recordList = records.records(partition);
for (ConsumerRecord<String, String> record : recordList) {
/**
* 1. After receiving a record, try to take a token from the bucket
* 2. If a token is obtained, continue with the business processing
* 3. If no token is available, pause and wait for tokens
* 4. Once the bucket has enough tokens again, resume the consumer
*/
// Rate limiting
if (!LIMITER.tryAcquire()) {
System.out.println("No token available, pausing consumption");
consumer.pause(Arrays.asList(p0, p1));
}else {
System.out.println("Token acquired, resuming consumption");
consumer.resume(Arrays.asList(p0, p1));
}
System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
record.partition(),record.offset(),record.key(),record.value());
// If business processing fails, do not commit
// throw new RuntimeException("business processing failed");
}
long lastOffset = recordList.get(recordList.size() - 1).offset();
// The offset within this single partition, to be committed
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(partition,new OffsetAndMetadata(lastOffset+1));
// Commit the offset for each partition individually
consumer.commitSync(offset);
System.out.println("--------------------- partition - " + partition + "end---------------------");
}
} catch (Exception e) {
log.error("consumer offset error",e);
}
}
}
2.9 Multi-threaded consumers
- The classic pattern: each thread creates its own KafkaConsumer, which keeps the (non-thread-safe) consumer confined to a single thread.
public class ConsumerThreadSample {
private static final String TOPIC_NAME = "yibo_topic";
/**
* The classic pattern: each thread creates its own KafkaConsumer, keeping the consumer confined to a single thread
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
KafkaConsumerRunner r1 = new KafkaConsumerRunner();
Thread t1 = new Thread(r1);
t1.start();
Thread.sleep(15000);
r1.shutdown();
}
public static class KafkaConsumerRunner implements Runnable{
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer<String,String> consumer;
public KafkaConsumerRunner() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.174.128:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
consumer.assign(Arrays.asList(p0, p1));
}
@Override
public void run() {
try {
while(!closed.get()){
// Process messages
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);
// Process the messages of each partition
for (ConsumerRecord<String, String> record : pRecord) {
System.out.printf("partition = %d , offset = %d, key = %s, value = %s%n",
record.partition(),record.offset(), record.key(), record.value());
}
// Report the new offset back to Kafka
long lastOffset = pRecord.get(pRecord.size() - 1).offset();
// Note the +1
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} catch (Exception e) {
if(!closed.get()) {
throw e;
}
} finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
}
2.10 One consumer pulling data, with business processing in multiple threads
- Records are handed off to a thread pool and processed asynchronously. With this approach offsets cannot be committed manually, so end-to-end consistency of message processing is not guaranteed (one possible workaround is sketched after this example).
public class ConsumerRecordThreadSample {
private static final String TOPIC_NAME = "yibo_topic";
public static void main(String[] args) throws InterruptedException {
String brokerList = "192.168.174.128:9092";
String groupId = "test";
int workerNum = 5;
ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);
consumers.execute(workerNum);
Thread.sleep(1000000);
consumers.shutdown();
}
// The consumer wrapper
public static class ConsumerExecutor{
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
public ConsumerExecutor(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
public void execute(int workerNum) {
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200);
for (final ConsumerRecord record : records) {
executors.submit(new ConsumerRecordWorker(record));
}
}
}
public void shutdown() {
if (consumer != null) {
consumer.close();
}
if (executors != null) {
executors.shutdown();
}
try {
if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("Timeout.... Ignore for this case");
}
} catch (InterruptedException ignored) {
System.out.println("Other thread interrupted this shutdown, ignore for this case.");
Thread.currentThread().interrupt();
}
}
}
// Record processing
public static class ConsumerRecordWorker implements Runnable {
private ConsumerRecord<String, String> record;
public ConsumerRecordWorker(ConsumerRecord record) {
this.record = record;
}
@Override
public void run() {
//The actual business logic, e.g. writing the data to a database
System.out.println("Thread - "+ Thread.currentThread().getName());
System.err.printf("partition = %d , offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
}
}
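If offsets must still be committed manually while a worker pool does the processing, one option (mentioned above) is to wait for the whole polled batch to finish before committing. The sketch below assumes a consumer created with enable.auto.commit=false and an existing subscription; the class and method names are illustrative:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BatchWorkerPoolSketch {
    // Processes each polled batch on a fixed-size pool, waits for the whole batch to finish,
    // and only then commits, so offsets are never committed ahead of processing
    public static void consume(KafkaConsumer<String, String> consumer, int workerNum) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workerNum);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            List<Callable<Void>> tasks = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                tasks.add(() -> {
                    // business logic for a single record goes here
                    System.out.printf("partition=%d, offset=%d, value=%s%n",
                            record.partition(), record.offset(), record.value());
                    return null;
                });
            }
            // invokeAll blocks until every task in this batch has completed
            pool.invokeAll(tasks);
            // only now are the offsets of this batch committed
            consumer.commitSync();
        }
    }
}

The trade-off is that each batch is only as fast as its slowest record, and a record that never finishes blocks the commit of the whole batch.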
2.11 Missed consumption and duplicate consumption
Whether offsets are committed synchronously or asynchronously, both missed consumption and duplicate consumption are possible. Committing the offset before processing the data can lead to missed messages, while processing the data before committing the offset can lead to duplicates.
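A minimal sketch of the two orderings described above; the process method is a hypothetical stand-in for the business logic:

public static void orderingSketch(KafkaConsumer<String, String> consumer) {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // Ordering 1: commit first, then process. A crash after the commit but before
        // process() finishes means these records are never processed -> missed consumption.
        //consumer.commitSync();
        //records.forEach(record -> process(record));

        // Ordering 2: process first, then commit (what the examples above do). A crash after
        // process() but before the commit means the records are delivered again -> duplicate consumption.
        records.forEach(record -> process(record));
        consumer.commitSync();
    }
}

private static void process(ConsumerRecord<String, String> record) {
    // business logic, e.g. writing the record to a database
}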
2.12 Custom offset storage
Before Kafka 0.9, offsets were stored in ZooKeeper; from 0.9 onward they are stored by default in an internal Kafka topic (__consumer_offsets). Beyond that, Kafka also lets you store offsets in a system of your own choosing.
Maintaining offsets yourself is fairly involved, because you have to account for consumer rebalances.
When a new consumer joins the group, an existing consumer leaves the group, or the partitions of the subscribed topics change, the partitions are reassigned among the consumers; this reassignment is called a rebalance.
After a rebalance, each consumer may own a different set of partitions, so it first has to find out which partitions it has been assigned, then seek to the most recently committed offset of each partition and continue consuming from there.
To implement custom offset storage you need a ConsumerRebalanceListener. The example below is a skeleton: the methods that commit and fetch offsets have to be implemented against whatever offset storage system you choose.
public class CustomConsumer {
private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
public static void main(String[] args) {
//Create the configuration
Properties props = new Properties();
//Kafka cluster
props.put("bootstrap.servers","192.168.174.128:9092");
//Consumer group: consumers with the same group.id belong to the same group
props.put("group.id", "test");
//Disable automatic offset commits
props.put("enable.auto.commit", "false");
//Deserializer classes for keys and values
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//Create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//Subscribe to the topic
consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
//Called before a rebalance
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
//Called after a rebalance
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
consumer.seek(partition, getOffset(partition));//Seek to the most recently committed offset and continue from there
}
}
});
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(100);//消费者拉取数据
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
}
commitOffset(currentOffset);//Commit the offsets to custom storage
}
}
//Fetch the most recently committed offset of a partition from custom storage
private static long getOffset(TopicPartition partition) {
return 0;
}
//Commit the offsets of all partitions owned by this consumer to custom storage
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
}
}
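For illustration only, the two stubs above could be filled in against a simple store. The sketch below persists offsets in a local properties file (the file name and key format are made up; a real deployment would more likely use a database or Redis):

private static final String OFFSET_FILE = "offsets.properties";

//Return the next offset to consume for a partition (0 if nothing has been stored yet)
private static long getOffset(TopicPartition partition) {
    java.util.Properties store = new java.util.Properties();
    try (java.io.FileInputStream in = new java.io.FileInputStream(OFFSET_FILE)) {
        store.load(in);
    } catch (java.io.IOException e) {
        return 0;
    }
    return Long.parseLong(store.getProperty(partition.topic() + "-" + partition.partition(), "0"));
}

//Persist, for every partition, the offset of the last consumed record plus one
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    java.util.Properties store = new java.util.Properties();
    currentOffset.forEach((tp, offset) ->
            store.setProperty(tp.topic() + "-" + tp.partition(), String.valueOf(offset + 1)));
    try (java.io.FileOutputStream out = new java.io.FileOutputStream(OFFSET_FILE)) {
        store.store(out, "consumer offsets");
    } catch (java.io.IOException e) {
        throw new RuntimeException("failed to persist offsets", e);
    }
}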
3. Spring Boot Integration with Kafka
3.1 Add the Maven dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.1</version>
</dependency>
3.2 Configure application.properties
# Kafka broker address(es); for a cluster, list multiple addresses separated by commas
spring.kafka.bootstrap-servers=192.168.174.128:9092
#=============== producer =======================
# Number of retries on a failed write. When a leader fails, a replica takes over as the new leader and writes may fail during the transition.
# With retries=0 the producer does not resend; with retries enabled it resends once the replica has fully become the leader, so the message is not lost.
spring.kafka.producer.retries=0
# Batch size: the producer accumulates records and sends them in batches
spring.kafka.producer.batch-size=16384
# Total memory the producer uses to buffer records waiting to be sent; data is flushed once the buffer fills up
spring.kafka.producer.buffer-memory=33554432
# acks is the number of acknowledgements the leader must receive before a request is considered complete; it controls the durability of records on the server side. Possible values:
# acks=0: the producer does not wait for any acknowledgement from the server; the record is added to the socket buffer and considered sent. There is no guarantee the server received it, retries do not take effect (the client generally will not learn of failures), and the offset returned for each record is always -1.
# acks=1: the leader writes the record to its local log and responds without waiting for full acknowledgement from all replicas. If the leader fails right after acknowledging but before the followers have replicated the record, the record is lost.
# acks=all: the leader waits for the full set of in-sync replicas to acknowledge the record. As long as at least one in-sync replica stays alive, the record is not lost. This is the strongest guarantee and is equivalent to acks=-1.
# Allowed values: all, -1, 0, 1
spring.kafka.producer.acks=1
# Serializers for message keys and values
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# Default consumer group id. Consumers in the same group never read the same message twice; the group is identified by group.id.
spring.kafka.consumer.group-id=testGroup
# Where to start when there is no committed offset: earliest reads from the beginning of the log, latest from the end; earliest is the usual choice here
spring.kafka.consumer.auto-offset-reset=earliest
# Whether to commit offsets automatically
spring.kafka.consumer.enable-auto-commit=false
# If enable.auto.commit is true, the frequency (in milliseconds) at which offsets are auto-committed; the default is 5000
spring.kafka.consumer.auto-commit-interval=100
# Deserializers for message keys and values
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#=============== listener =======================
# Number of threads running in the listener container
spring.kafka.listener.concurrency=5
# The listener is responsible for acking; with manual_immediate the offset is committed immediately each time acknowledge() is called
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false
3.3 Create a consumer
@Component
@Slf4j
public class KafkaDemoConsumer {
private static final String TOPIC_NAME = "yibo_topic";
private static final String TOPIC_GROUP1 = "topic_group1";
private static final String TOPIC_GROUP2 = "topic_group2";
@KafkaListener(topics = TOPIC_NAME, groupId = TOPIC_GROUP1)
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
ack.acknowledge();
}
}
@KafkaListener(topics = TOPIC_NAME, groupId = TOPIC_GROUP2)
public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test1 消费了: Topic:" + topic + ",Message:" + msg);
ack.acknowledge();
}
}
}
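To exercise these listeners, a message can be sent through the KafkaTemplate that spring-kafka auto-configures from the producer settings in application.properties. A minimal sketch (the controller class and request mapping are illustrative, not part of the original code):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaDemoProducer {

    private static final String TOPIC_NAME = "yibo_topic";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaDemoProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/kafka/send/{msg}")
    public String send(@PathVariable String msg) {
        // both listener groups (topic_group1 and topic_group2) will receive this record
        kafkaTemplate.send(TOPIC_NAME, msg);
        return "sent: " + msg;
    }
}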