用python来监控kafka运行情况

2018-12-29  本文已影响0人  陈亮2019

除了用kafka自带的命令行方式和java方式外,现在可以用kafka-python中的KafkaAdminClient来获取kafka运行时消费者在topic的分区中的偏移量信息。

首先需要安装kafka-python包,一条命令就搞定:

pip install -U kafka-python

然后就创建KafkaAdminClient对象

from kafka.admin import KafkaAdminClient

adminClient = KafkaAdminClient(bootstrap_servers='192.168.*.*:9092')  #运行时填入kafka服务器实际地址,如果是多台服务器,该参数就写成列表形式。

然后就可以获取消费者组的列表:

adminClient.list_consumer_groups()

返回值: [('console-consumer-95318', 'consumer'),

('cgtrcSpring', 'consumer'),

('msgSealBackup', 'consumer'),

('es2', 'consumer'),

('pushSubscription1', 'consumer'),

('es1', 'consumer'),

('es4', 'consumer'),

('es3', 'consumer'),

('python_client_1', 'consumer')]

list_consumer_groups()的返回值是一个元组组成的列表,每个元组前一个元素是消费者组的名称,第二个元素是消费组协议类型。

得到消费者组后,利用list_consumer_group_offsets就可以获得某一个消费者组在各topic和分区的偏移量。下面的语句取es4消费组的偏移量。返回值是一个字典,字典的key是TopicPartition,值是OffsetAndMetada 。

adminClient.list_consumer_group_offsets('es4')

返回值: {TopicPartition(topic='log-eport-bizlog-push', partition=0): OffsetAndMetadata(offset=1781083, metadata=''),

TopicPartition(topic='log-eport-bizlog-push', partition=1): OffsetAndMetadata(offset=1784580, metadata=''),

TopicPartition(topic='log-eport-bizlog-push', partition=2): OffsetAndMetadata(offset=1784585, metadata=''),

TopicPartition(topic='log-eport-msglog-gatewaylog-eport-msglog-gateway', partition=0): OffsetAndMetadata(offset=17740, metadata=''),

TopicPartition(topic='log-eport-msglog-gatewaylog-eport-msglog-gateway', partition=1): OffsetAndMetadata(offset=17736, metadata=''),

TopicPartition(topic='log-eport-msglog-gatewaylog-eport-msglog-gateway', partition=2): OffsetAndMetadata(offset=17739, metadata='')}

从返回值中很容易就可以得到消费者组在每个topic的各个分区的偏移量。

用python来获取这种运行时的信息比java简单快捷,相比命令行方式,可以对取到的信息进行后续处理,和其他应用集成也比较方便。

kafka-python还提供了往kafka发送消息和从kafka读取消息的功能,详情参见如下:

kafka-python 项目地址: https://github.com/dpkp/kafka-python

文档地址:https://kafka-python.readthedocs.io/en/master/apidoc/modules.html

上一篇 下一篇

猜你喜欢

热点阅读