flink kafka offset配置/提交

2020-08-14  本文已影响0人  sunTengSt

一:flink kafka offset配置


1. setStartFromGroupOffsets(默认的):

example:
Map specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

查看partition offset

kafka-consumer-groups --bootstrap-server xxx:9092 --group groupId  --describe
TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID                                       HOST            CLIENT-ID
xxx         0          13949           13949           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         1          13871           13871           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         2          13974           13974           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         3          14192           14192           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         4          14036           14036           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1

2. setStartFromEarliest()

从最早的记录开始,使用此配置,在kafka中已经提交的offset将被忽略,不会被使用


3. setStartFromLatest()

从最新的开始,使用此配置,在kafka中已经提交的offset将被忽略,不会被使用


4. setStartFromTimestamp(long)


5. properties配置offset

properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

解释:


二:kafka消费offset提交配置:


1. checkpoint禁用:

flink kafka消费依赖于内部kafka客户端自动定期的offset提交

配置:enable.auto.commit / auto.commit.interval.ms


2. checkpoint启用:

flink kafka consumer在checkpoint完成时自动提交offset在checkpoint state中;

配置:setCommitOffsetsOnCheckpoints(boolean) 来启用关闭;默认情况下,是开启的true
此模式下,配置在properties中自动周期性的offset提交将被忽略;


上一篇 下一篇

猜你喜欢

热点阅读