Kafka部署及基本使用

2023-11-08  本文已影响0人  这货不是王马勺

安装部署

1.集群规划
hadoop102: 安装zk和kafka
hadoop103: 安装zk和kafka
hadoop104: 安装zk和kafka

2.集群部署
官方下载地址:

http://kafka.apache.org/downloads.html

producer和consumer是用java写的
broker是用scala写的,scala版本对应截图中的scala2.12和2.13
我们下载2.12对应的压缩包即可

上传服务器后解压:

tar -zxvf kafka_2.12-3.6.0.tgz -C /app/
cd /app
mv kafka_2.12-3.6.0 kafka
cd kafka

进入kafka路径后可以看到如下子目录和文件:

3.配置
配置集群

vim config/server.properties

需要配置如下参数:
找到参数broker.id,这个参数是唯一标识,不能重复;
找到参数log.dirs,这个参数是存储kafka数据的,默认在/tmp中,我们需要改到其他目录,如/opt/module/kafka/data
找到参数zookeeper.connect,配置成zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

配置环境变量

vim /etc/profile.d/my_env.sh

添加

export KAFKA_HOME=/app/kafka
export PATH=$PATH:$KAFKA_HOME/bin

然后source /etc/profile

其他节点也按此进行配置

4.启动
启动kafka前必须先启动zookeeper
zk启动后启动kafka:

bin/kafka-server-start.sh -daemon config/server.properties

用jps查看启动情况

5.启停脚本
使用脚本进行kafka启停,vim kf.sh:

#!/bin/bash
case $1 in
"start"){
 for i in hadoop102 hadoop103 hadoop104
 do
 echo " --------启动 $i Kafka-------"
 ssh $i "nohup /app/kafka/bin/kafka-server-start.sh  
/app/kafka/config/server.properties 2>&1 &"
 done
};;
"stop"){
 for i in hadoop102 hadoop103 hadoop104
 do
 echo " --------停止 $i Kafka-------"
 ssh $i "/app/kafka/bin/kafka-server-stop.sh "
 done
};;
esac

停止集群:

kf.sh stop 

注:

操作topic命令

对于每个模块,kafka有对应的脚本

先从kafka-topics.sh开始介绍:
查看用法

bin/kafka-topics.sh

1.核心参数

参数 描述
--bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号
--topic <String: topic> 操作的 topic 名称
--create 创建主题
--delete 删除主题
--alter 修改主题
--list 查看所有主题
--describe 查看主题详细描述
--partitions <Integer: # of partitions> 设置分区数
--replication-factor<Integer: replication factor> 设置分区副本
--config <String: name=value> 更新系统默认的配置

2.基本操作
查看当前服务器中的所有 topic:

bin/kafka-topics.sh --bootstrap-server ip:9092 --list

例:

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list

我们只要连上了102节点,103、104节点都可以查看的到;
如果需要提高可靠性(万一102挂了),可以写成如下形式:

bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092  --list

企业里一般也采取这种方式

创建 first topic:

bin/kafka-topics.sh --bootstrap-server ip:9092 --create --partitions 1 --replication-factor 3 --topic first

选项说明:
topic 定义 topic 名
replication-factor 定义副本数
partitions 定义分区数

查看 first 主题的详情:

bin/kafka-topics.sh --bootstrap-server ip:9092 --describe --topic first

其中可以看到topic名,有多少分区、多少副本;
此外segment.bytes代表了底层存储一个segment是1个G
分区号为0(指定了1给分区,从0开始)
leader显示了leader副本节点
后面也显示了其他节点

修改分区数(注意:分区数只能增加,不能减少,会报错,因为想象一下多个消费者消费不同分区,到不同位置,没法合并)

bin/kafka-topics.sh --bootstrap-server ip:9092 --alter --topic first --partitions 3

再次查看 first 主题的详情

bin/kafka-topics.sh --bootstrap-server  ip:9092 --describe --topic first

无法通过命令行的方式修改副本,后续我们有其他手段修改。

删除 topic

bin/kafka-topics.sh --bootstrap-server ip:9092 --delete --topic first

producer命令行操作

查看操作生产者命令参数

bin/kafka-console-producer.sh
参数 描述
--bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号
--topic <String: topic> 操作的 topic 名称

发送消息

bin/kafka-console-producer.sh --bootstrap-server ip:9092 --topic first
>hello world

如何确定数据有没有发送到topic?
我们还需创建一个消费者去消费,但是消费者会增量的消费topic里的数据,如果需要消费一开始的数据需要加参数--from-beginning

consumer命令行操作

查看操作消费者命令参数

bin/kafka-console-consumer.sh
参数 描述
--bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号
--topic <String: topic> 操作的 topic 名称
--from-beginning 从头开始消费
--group <String: consumer group id> 指定消费者组名称

消费消息
消费 first 主题中的数据。

bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic first

把主题中所有的数据都读取出来(包括历史数据)。

bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --from-beginning --topic first

参考:

https://cloud.tencent.com/developer/article/2218258
上一篇下一篇

猜你喜欢

热点阅读