Kafka部署及基本使用
安装部署
1.集群规划
hadoop102: 安装zk和kafka
hadoop103: 安装zk和kafka
hadoop104: 安装zk和kafka
2.集群部署
官方下载地址:
http://kafka.apache.org/downloads.html
producer和consumer是用java写的
broker是用scala写的,scala版本对应截图中的scala2.12和2.13
我们下载2.12对应的压缩包即可
上传服务器后解压:
tar -zxvf kafka_2.12-3.6.0.tgz -C /app/
cd /app
mv kafka_2.12-3.6.0 kafka
cd kafka
进入kafka路径后可以看到如下子目录和文件:
3.配置
配置集群
vim config/server.properties
需要配置如下参数:
找到参数broker.id,这个参数是唯一标识,不能重复;
找到参数log.dirs,这个参数是存储kafka数据的,默认在/tmp中,我们需要改到其他目录,如/opt/module/kafka/data
找到参数zookeeper.connect,配置成zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
配置环境变量
vim /etc/profile.d/my_env.sh
添加
export KAFKA_HOME=/app/kafka
export PATH=$PATH:$KAFKA_HOME/bin
然后source /etc/profile
其他节点也按此进行配置
4.启动
启动kafka前必须先启动zookeeper
zk启动后启动kafka:
bin/kafka-server-start.sh -daemon config/server.properties
用jps查看启动情况
5.启停脚本
使用脚本进行kafka启停,vim kf.sh:
#!/bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------启动 $i Kafka-------"
ssh $i "nohup /app/kafka/bin/kafka-server-start.sh
/app/kafka/config/server.properties 2>&1 &"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------停止 $i Kafka-------"
ssh $i "/app/kafka/bin/kafka-server-stop.sh "
done
};;
esac
停止集群:
kf.sh stop
注:
- 当传进来start参数,则进行start相关操作;传进来stop参数,进行stop相关操作;
- 一定要写绝对路径;
- 如果没配主机名和ip地址映射的话,主机名可以写成ip地址;
- 需要配ssh互信;
- 需要授予可执行权限;
- kafka集群停止的时候需要一段时间,不会很快就停;
- 必须在所有kafka全部停止后再关闭zookeeper,如果先关闭zookeeper则kafka就关不掉了,只能通过kill -9杀进程,因为kafka的信息都存在zookeeper中。
操作topic命令
对于每个模块,kafka有对应的脚本
- producer:kafka-console-producer.sh
- broker:kafka-topics.sh
- consumer:kafka-console-consumer.sh
先从kafka-topics.sh开始介绍:
查看用法
bin/kafka-topics.sh
1.核心参数
参数 | 描述 |
---|---|
--bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
--create | 创建主题 |
--delete | 删除主题 |
--alter | 修改主题 |
--list | 查看所有主题 |
--describe | 查看主题详细描述 |
--partitions <Integer: # of partitions> | 设置分区数 |
--replication-factor<Integer: replication factor> | 设置分区副本 |
--config <String: name=value> | 更新系统默认的配置 |
2.基本操作
查看当前服务器中的所有 topic:
bin/kafka-topics.sh --bootstrap-server ip:9092 --list
例:
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
我们只要连上了102节点,103、104节点都可以查看的到;
如果需要提高可靠性(万一102挂了),可以写成如下形式:
bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --list
企业里一般也采取这种方式
创建 first topic:
bin/kafka-topics.sh --bootstrap-server ip:9092 --create --partitions 1 --replication-factor 3 --topic first
选项说明:
topic 定义 topic 名
replication-factor 定义副本数
partitions 定义分区数
查看 first 主题的详情:
bin/kafka-topics.sh --bootstrap-server ip:9092 --describe --topic first
其中可以看到topic名,有多少分区、多少副本;
此外segment.bytes代表了底层存储一个segment是1个G
分区号为0(指定了1给分区,从0开始)
leader显示了leader副本节点
后面也显示了其他节点
修改分区数(注意:分区数只能增加,不能减少,会报错,因为想象一下多个消费者消费不同分区,到不同位置,没法合并)
bin/kafka-topics.sh --bootstrap-server ip:9092 --alter --topic first --partitions 3
再次查看 first 主题的详情
bin/kafka-topics.sh --bootstrap-server ip:9092 --describe --topic first
无法通过命令行的方式修改副本,后续我们有其他手段修改。
删除 topic
bin/kafka-topics.sh --bootstrap-server ip:9092 --delete --topic first
producer命令行操作
查看操作生产者命令参数
bin/kafka-console-producer.sh
参数 | 描述 |
---|---|
--bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
发送消息
bin/kafka-console-producer.sh --bootstrap-server ip:9092 --topic first
>hello world
如何确定数据有没有发送到topic?
我们还需创建一个消费者去消费,但是消费者会增量的消费topic里的数据,如果需要消费一开始的数据需要加参数--from-beginning
consumer命令行操作
查看操作消费者命令参数
bin/kafka-console-consumer.sh
参数 | 描述 |
---|---|
--bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
--from-beginning | 从头开始消费 |
--group <String: consumer group id> | 指定消费者组名称 |
消费消息
消费 first 主题中的数据。
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic first
把主题中所有的数据都读取出来(包括历史数据)。
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --from-beginning --topic first
参考:
https://cloud.tencent.com/developer/article/2218258