python连接有sasl认证的kafka

2023-03-16  本文已影响0人  大空翼123

公司的kafka做了安全升级,加入了sasl认证。

使用confluent_kafka  进行认证连接kafka

首先安装confluent_kafka 

pip install confluent_kafka

生产端示例代码

import json

from datetimeimport datetime

from confluent_kafkaimport Producer

topic_name ='TOPIC_NAME'

conf = {

'bootstrap.servers':'XXXX:xx,XXXXX:XX',

    'security.protocol':'SASL_PLAINTEXT',

    'sasl.mechanisms':'PLAIN',

    'sasl.username':'XX',

    'sasl.password':'XXXXXXXX'

}

def delivery_report(err, msg):

if erris not None:

print('Message delivery failed: {}'.format(err))

else:

print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

producer = Producer(**conf)

data = {

'name':'sheng',

    'time':str(datetime.now())

}

for iin range(10):

#print(data)

    producer.produce(topic_name, (json.dumps(data)).encode(), callback=delivery_report)

producer.flush()

消费端代码

from confluent_kafkaimport Consumer

topic_name ='TOPIC_NAME'

KAFKA_BROKER_SERVERS ="XXX:xx,XXXX:xx"

consumer = Consumer({

'bootstrap.servers': KAFKA_BROKER_SERVERS,

    'group.id':'test_sasl',

    'auto.offset.reset':'earliest',

    'security.protocol':'SASL_PLAINTEXT',

    'sasl.mechanisms':'PLAIN',

    'sasl.username':'XX',

    'sasl.password':'XXXXXXXX'

})

consumer.subscribe([topic_name])

while True:

msg = consumer.poll(1.0)

if msgis None:

continue

    if msg.error():

print("Consumer error: {}".format(msg.error()))

continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

consumer.close()

上一篇下一篇

猜你喜欢

热点阅读