软件篇-kakfa(三) - API操作
Api操作
1.生产者
1.1 创建
通过之前的介绍可知,生产者主要往kafka
中发布消息,因此在发送消息之前先创建topic
创建topic
命令如下:
kafka-topics.sh --bootstrap-server 192.168.80.110:9092 --create --topic test_topic --replication-factor 1 --partitions 1
关于命令解释如下:
kafka-topics.sh
topic
相关的脚本
--bootstrap-server
broker
的地址清单
--create
代表创建 topic
--topic test_topic
指定topic
的名字
--replication-factor
副本数
--partitions
指定分区数
1.2 查看
查询所有的topic
kafka-topics.sh --list --bootstrap-server 192.168.80.110:9092

1.3 修改
修改topic
分区数
kafka-topics.sh --bootstrap-server 192.168.80.110:9092 --alter --topic test_topic --partitions 2
--alter
代表修改topic
注意:分区数只能增加,不能修改,如果非要修改则出现以下错误

修改副本数
单节点修改副本数没有意义,等到后面集群章节再来讨论
1.4 删除
删除topic
kafka-topics.sh --bootstrap-server 192.168.80.110:9092 --delete --topic test_topic
1.5 发布
kafka-console-producer.sh --bootstrap-server 192.168.80.110:9092 --topic my_topic
注意: my_topic之前没有创建,向my_topic发送消息会自动创建
topic

1.6 监控
在之前的章节搭建了kafka-eagle
平台,在这个平台也可以监控一些kakfa
数据,登录该平台,就可以看到相对应消息

1.6.1 创建
选择Topics/create
也可以创建topic
,如下:

1.6.2 查看
选择Topics/List
则可以查看所有的topic
,如下:

1.6.3 ksql
当然也可以选择Topics/KSQL
,通过ksql
语句来查看某个topic
里面的消息


关于KSQL
这里暂时不做太多阐述,比较简单,有兴趣可以翻阅官方文档
1.6.4 发布
可以选择Topics/Mock
给某个topic
发送消息

通过对比发现
kafka-eagle
去发布消息比起命令行更加容易
2. 消费者
2.1 消费
kafka-console-consumer.sh --bootstrap-server 192.168.80.110:9092 --from-beginning --group wangzh -topic my_topic
--from-beginning
代表从头开始消费
--group wangzh
代表消费者组
每个消费者都属于一个消费者组,一个消费者组可以拥有多个消费者,
一个消费者组对一个
topic
里面的消费消息只能读取一次
每个消息在partition
上都有一个自增长的序号,当消费者每次消费到消息就会记录消费的序号,当下次消费就会从序号开始消费而不是从头开始,这个需要称为偏移量

offset
(偏移量)保存在__consumer_offsets
中,这样当消费者因为某种原因断开与kafka
的连接,当下次消费时也不会从头消费
2.2 多个
当然也可以消费多个topic
的消息,而不一定是消费一个topic
消费之前先往my_topic
和test_topic
中发布一些消息,如下


消费多个topic
kafka-console-consumer.sh --bootstrap-server 192.168.80.110:9092 --from-beginning --group wangzh --whitelist "my_topic|test_topic"

可以看到两个
topic
的消息都被消费出来了
2.3 查看
查看消费者组
kafka-consumer-groups.sh --bootstrap-server 192.168.80.110:9092 --list
查看消费者组偏移量
kafka-consumer-groups.sh --bootstrap-server 192.168.80.110:9092 --describe --group wangzh

当然也可以在kafka-eagle
看到消费者组的信息,如下

甚至可以看到这个消费者组的偏移量,如下:

2.4 删除
kafka-consumer-groups.sh --bootstrap-server 192.168.80.110:9092 --delete --group wangzh
消费组中当前没有消费者时才能删除