kafka的安装与使用
2022-08-27 本文已影响0人
木火应
- 安装与配置
- 下载kafka并解压
[mhy@server03 kafka]$ wget https://downloads.apache.org/kafka/3.2.1/kafka_2.12-3.2.1.tgz [mhy@server03 kafka]$ tar xvf kafka_2.13-3.2.1.tgz
- 配置zookeeper监听端口,分别修改
config
目录下zookeeper.properties与server.properties配置文件
server.properties
-
运行
- 启动zookeeper
[mhy@server03 kafka_2.13-3.2.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties
- 启动kafka
[mhy@server03 kafka_2.13-3.2.1]$ bin/kafka-server-start.sh config/server.properties
- 创建一个topic,并查看已创建的topic列表(
topic
相当于一个中间媒介,后面的消费者与生产者都需要通过同一个topic消费与生产数据,类似于一管道,这样比喻估计不妥,差不多是这意思吧)
[mhy@server03 kafka_2.13-3.2.1]$ bin/kafka-topics.sh --create --bootstrap-server 192.168.244.143:9092 --replication-factor 1 --partitions 1 --topic test Created topic test. [mhy@server03 kafka_2.13-3.2.1]$ bin/kafka-topics.sh --list --bootstrap-server 192.168.244.143:9092 __consumer_offsets test
- 创建信息消费者,需要指定一个topic,此时程序处于监听状态
[mhy@server03 kafka_2.13-3.2.1]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.244.143:9092 --topic test --from-beginning
- 创建信息生产者,指定以上同一个topic,写入数据,以上消费者会输出相应数据
生产者[mhy@server03 kafka_2.13-3.2.1]$ bin/kafka-console-producer.sh --broker-list 192.168.244.143:9092 --topic test
消费者- 删除主题
[mhy@server03 kafka_2.13-3.2.1]$ bin/kafka-topics.sh --delete --bootstrap-server 192.168.244.143:9092 --topic test
-
集群部署,修改另一台kafka主机
broker.id设置不唯一server.properties
配置文件,重要的配置有以下几处:
listeners改成本地ip:端口
log.dirs
num.partitions,分区数
zookeeper地址
集群中创建topic -
pyhon-demo
- producer.py
from kafka import KafkaProducer
from kafka.errors import kafka_errors
import traceback
import json
def producer_demo():
# 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
producer = KafkaProducer(
bootstrap_servers=['192.168.244.143:9092'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode())
# 发送三条消息
for i in range(0, 3):
future = producer.send(
'test',
key='count_num', # 同一个key值,会被送至同一个分区
value="木火应 is a handsome man",
partition=0) # 向分区0发送消息
print("send {}".format(str(i)))
try:
future.get(timeout=10) # 监控是否发送成功
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()
if __name__ == '__main__':
producer_demo()
- consumer.py
from kafka import KafkaConsumer
import json
def consumer_demo():
consumer = KafkaConsumer(
'test',
bootstrap_servers='192.168.244.143:9092',
group_id='aaa'
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())
)
)
if __name__ == '__main__':
onsumer_demo()