kafka在python中的运行机制

2020-03-04  本文已影响0人  yangqingqing

第一次了解kafka不是很深入,后期会继续补更,完善~

kafaka工作原理:生产者生产消息-->消费者接收消息后消费

1. 消费者工作原理

连接kafka

先设置消费组id,并制定消费哪个topic:

# trans

    trans_cons = Consumer(

        {**sys_conf.get_kafka_config(consumer_id=f'{func_mark}trans')}

    )

    trans_cons.subscribe(trans_topics)

消费对应topic的消息:

msgs = cons.consume(sys_conf.CONSUMER_BZ, sys_conf.CONSUMER_TIMEOUT)

因为msgs里面有很多个消息,需要将消息一个个解读出来然后计算处理,最后输入到数据库或者将结果塞进生产者中推到kafka等对方进行消费。

将msg解析:

for msg in msgs:

    print(msg.value())

如需转发请加载连接https://www.jianshu.com/p/6e00abd7780a

上一篇 下一篇

猜你喜欢

热点阅读