kafka

kafka的安装与使用

2022-08-27  本文已影响0人  木火应
zookeeper.properties
server.properties
from kafka import KafkaProducer
from kafka.errors import kafka_errors
import traceback
import json


def producer_demo():
    # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
    producer = KafkaProducer(
        bootstrap_servers=['192.168.244.143:9092'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    # 发送三条消息
    for i in range(0, 3):
        future = producer.send(
            'test',
            key='count_num',  # 同一个key值,会被送至同一个分区
            value="木火应 is a handsome man",
            partition=0)  # 向分区0发送消息
        print("send {}".format(str(i)))
        try:
            future.get(timeout=10) # 监控是否发送成功
        except kafka_errors:  # 发送失败抛出kafka_errors
            traceback.format_exc()



if __name__ == '__main__':
    producer_demo()
  1. consumer.py
from kafka import KafkaConsumer
import json


def consumer_demo():
    consumer = KafkaConsumer(
        'test',
        bootstrap_servers='192.168.244.143:9092',
        group_id='aaa'
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
        )
        )
if __name__ == '__main__':
    onsumer_demo()

参考

上一篇下一篇

猜你喜欢

热点阅读