Python kafka 操作 带SASL鉴权
2019-12-03 本文已影响0人
Sunnky
python-kafka
不支持 SASL_PLAINTEXT
, SCRAM-SHA-256
,所以选择confluent_kafka
- 客户端
from confluent_kafka import Consumer, KafkaError
topic_name = 'test-topic'
KAFKA_BROKER_SERVERS = "127.0.0.1:9092"
consumer = Consumer({
'bootstrap.servers': KAFKA_BROKER_SERVERS,
'group.id': 'omega-self-approve',
'auto.offset.reset': 'earliest',
'security.protocol': 'SASL_PLAINTEXT',
'sasl.mechanisms': 'SCRAM-SHA-256',
'sasl.username': 'admin',
'sasl.password': 'admin',
})
consumer.subscribe([topic_name])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
consumer.close()
- 服务端
import json
from confluent_kafka import Producer
topic_name = 'test-topic'
conf = {
'bootstrap.servers': '127.0.0.1:9092',
'security.protocol': 'SASL_PLAINTEXT',
'sasl.mechanisms': 'SCRAM-SHA-256',
'sasl.username': 'admin',
'sasl.password': 'admin',
}
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
producer = Producer(**conf)
for i in range(1):
data = {"test": 1, "test2": 2}
producer.produce(topic_name, (json.dumps(data)).encode(), callback=delivery_report)
producer.flush()