python3读写kafka

2019-10-09  本文已影响0人  一飞冲不了天

消费kafka数据,方式一

#pip install kafka-python#安装kafkapython库
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(
    #'topic1',#主题
    #重置偏移量,可以订阅最早的消息
    #earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    #latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    #none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    # auto_offset_reset='earliest',
  #默认为true,即默认是自动提交offset的,不过设置为false,需要手动提交。手动提交分为同步提交,异步提交,同步+异步提交。
    enable_auto_commit=False,
    group_id='group_id',#消费分组
    bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
consumer.subscribe(topics=('topic1','topic2'))#订阅多个主题的消息

for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value.decode()))
  #consumer.commit() #同步提交
  #consumer.commit_async(callback=function) #异步提交,function为回调函数
 #异步+同步的方式,即正常以异步提交,最后消费者退出时以同步的方式提交,同步提交可以放在finally块中。

消费kafka数据,方式二

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(
    enable_auto_commit=False,
    group_id='group_id',
    bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
tp = TopicPartition(topic='test', partition=0)
consumer.assign([tp])#指定多个主题分区,list形式
consumer.seek_to_beginning()#将偏移量设置为最早的
consumer.seek(tp,888)#指定偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value.decode()))

将消息写入kafka

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['192.168.1.1:31553','192.168.1.2:31554'])
msg = {'a':'xxx','b':'ccc'}
producer.send('dcar-company-news',bytes(json.dumps(msg,ensure_ascii=False),'utf-8'),partition=0)
producer.close()
上一篇 下一篇

猜你喜欢

热点阅读