python 操作 kafka consumer设置重启时从最新
2020-07-10 本文已影响0人
HAO延WEI
1.1 安装
pip install kafka-python
1.2 消费
# -*- coding: utf-8 -*-
"""
Create by Mr.Hao on 2019/12/6.
"""
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'test', # 指定topic
bootstrap_servers = "127.0.0.1:9092", # kafka集群地址
group_id = "newConsumerTest1", # 消费组id
client_id = '8eaa8c81edfd41f28a50f9121ad14572',
auto_offset_reset="latest"
max_poll_records=10, # 每次最大消费数量
enable_auto_commit = True, # 每过一段时间自动提交所有已消费的消息(在迭代时提交)
auto_commit_interval_ms = 5000, # 自动提交的周期(毫秒)
)
#consumer.subscribe(["auto_datacenter_spider_snapshot"])
"""
使用seek_to_end函数,seek_to_end会直接将位置定位到最新数据。
但是在之前需要poll一次数据,不然会报没有分配partition的错误。
这说明我们的框架也是懒加载的,只有在具体poll数据的时候才会分配partition
"""
res = consumer.poll(10)
consumer.seek_to_end()
for msg in consumer: # 迭代器,等待下一条消息
offset, value = msg.offset, msg.value
print value