Kafka集群
一、简介
-
Kafka
简介Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(Elasticsearch、Hadoop等)
Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。
-
Zookeeper
简介Zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
二、kafka
基本概念
在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。
概念/对象 | 简单说明 |
---|---|
Broker | Kafka节点 |
Leader | 用于处理消息的接收和消费等请求,也就是说producer是将消息push到leader,而consumer也是从leader上去poll消息 |
Follower | 主要用于备份消息数据,一个leader会有多个follower |
Topic | 主题,用来承载消息 |
Partition | 分区,用于主题分片存储 |
Producer | 生产者,向主题发布消息的应用 |
Consumer | 消费者,从主题订阅消息的应用 |
Consumer Group | 消费者组,由多个消费者组成 |
kafka
集群拓扑图:
三、部署步骤
由于 kafka
依赖 zookeeper
因此需要安装 zookeeper
,而kafka
是基于scala
语言编写,scala
又是基于 jdk
的,因此需要安装 jdk
。
本次搭建所用主机
kafka01:10.81.0.100 kafka02:10.81.0.101 kafka03:10.81.0.102
1、安装jdk8及以上版本
<!== 所有主机都执行 ==>
$ tar zxf jdk8.tar.gz -C /usr/local
$ echo 'export JAVA_HOME="/usr/local/jdk8"' >>/etc/profile
$ echo 'export PATH="$JAVA_HOME/bin:$PATH"' >> /etc/profile
$ source /etc/profile
$ java -version
2、Zookeeper
集群部署
<!== 所有主机都执行 ==>
- 安装软件
$ cd /usr/local/src && wget https://downloads.apache.org/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz
$ tar zxf apache-zookeeper-3.5.9-bin.tar.gz -C /usr/local/
$ cd .. && mv apache-zookeeper-3.5.9-bin/ zookeeper
$ echo 'export ZOOKEEPER_HOME="/usr/local/zookeeper"' >>/etc/profile
$ echo 'export PATN="$ZOOKEEPER_HOME/bin:$PATH"' >>/etc/profile
$ source /etc/profile
- 修改配置文件
$ cd zookeeper/conf && cp -p zoo_sample.cfg zoo.cfg
$ mkdir -p /data/zookeeper
$ mkdir /var/log/zookeeper
$ cat > zoo.cfg<<'EOF'
dataDir=/data/zookeeper
dataLogDir=/var/log/zookeeper/
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=1024
#standaloneEnabled=true
#admin.enableServer=true
server.1=10.81.0.100:2888:3888
server.2=10.81.0.101:2888:3888
server.3=10.81.0.200:2888:3888
clientPort=2181
EOF
<!== kafka01主机执行 ==>
$ echo 1 > /data/zookeeper/myid
<!== kafka02主机执行 ==>
$ echo 2 > /data/zookeeper/myid
<!== kafka03主机执行 ==>
$ echo 3 > /data/zookeeper/myid
- 设置systemd管理
$ cat > /etc/systemd/system/zookeeper.service <<'EOF'
[Unit]
Description=zookeeper.service
After=network.target
[Service]
User=zookeeper
Type=forking
Environment=ZOO_LOG_DIR=/var/log/zookeeper
Environment=JAVA_HOME=/usr/local/jdk8
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
Restart=on-failure
StartLimitInterval=60
StartLimitBurst=3000
[Install]
WantedBy=multi-user.target
EOF
$ groupadd zookeeper
$ useradd -g zookeeper -M -s /sbin/nologin zookeeper
$ chown -R zookeeper. /data/zookeeper/
$ chown -R zookeeper. /var/log/zookeeper/
$ systemctl daemon-reload
$ systemctl start zookeeper
$ systemctl enable zookeeper
- 查看主节点
$ /usr/local/zookeeper/bin/zkServer.sh status
3、kafka
集群部署
<!== 所有主机都执行 ==>
- 安装软件
$ cd /usr/local/src && wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.12-3.0.0.tgz --no-check-certificate
$ tar xf kafka_2.12-3.0.0.tgz -C /usr/local
$ cd .. && mv kafka_2.12-3.0.0 kafka
$ echo 'export KAFKA_HOME=/usr/local/kafka' >>/etc/profile
$ echo 'export PATH="$PATH:$KAFKA_HOME/bin"' >>/etc/profile
$ source /etc/profile
- 修改配置文件
<!== kafka01主机执行 ==>
$ cat >/usr/local/kafka/config/server.properties <<'EOF'
# broker的全局唯一编号,不能重复
broker.id=1
# kafka对外提供服务的监听地址及端口
listeners=PLAINTEXT://0.0.0.0:12315
# 代理将向生产者和消费者公布主机名和端口。如果没有设置,
# 如果已配置,它将使用“侦听器”的值。否则,它将使用该值
advertised.listeners=PLAINTEXT://10.81.0.100:12315
# 处理网络请求的线程数量,也就是接收消息的线程数。
# 接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3
# broker处理磁盘IO的线程数,数值应该大于你的硬盘数
num.io.threads=8
# socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes=102400
# socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes=102400
# socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600
# kafka数据的存放地址,多个地址的话用逗号分割
# 如果指定了多个地址,那么broker就会根据"最少使用原则"
# 把同一个分区数据片段放在同一个路径下
log.dirs=/data/kafka
# 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
num.partitions=3
# Kafka会使用可配置的线程池来处理日志片段:
# 服务器正常启动,用于打开每个分区的日志片段
# 服务器崩愤后重启,用于检查和截短每个分区的日志片段
# 服务器正常关闭,用于关闭日志片段。
# 默认情况下,每个日志目录只使用一个线程。
# 因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作
# 特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间
# 设置此参数时需要注意,所配置的数字对应的是log.dirs指定的单个日志目录。
# 也就是说,如果num.recovery.threads.per.data.dir被设为8,井且log.dirs指定了3个路径,那么总共需要24个线程。
num.recovery.threads.per.data.dir=1
# 组元数据内部topic"__consumer_offsets" and "__transaction_state"的复制系数
# 对于开发测试以外的任何测试,建议使用大于1的值来确保可用性,例如3。
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# 数据默认保存时间
log.retention.hours=168
# 基于大小的日志保留策略。将从日志中删除段,除非剩余的
# 段位于log.retention.bytes下面。独立于log.retention.hours的功能。
log.retention.bytes=10737418240
# topic的分区是以一堆segment文件存储的,这个控制每个segment的大小
# 大小达到指定的上限(默认是lGB)时,当前`segment`就会被关闭
# 一个新的`segment`被打开
log.segment.bytes=1073741824
# 文件大小检查的周期时间
log.retention.check.interval.ms=300000
# zookeeper连接地址
zookeeper.connect=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181
# zookeeper连接超时时间
zookeeper.connection.timeout.ms=18000
# 指定GroupCoordinator延迟初始使用者重新平衡的时间(以毫秒为单位)。
# 官方建议生产环境设置为3秒
group.initial.rebalance.delay.ms=0
EOF
<!== kafka02主机执行 ==>
$ cat >/usr/local/kafka/config/server.properties <<'EOF'
broker.id=2
listeners=PLAINTEXT://0.0.0.0:12315
advertised.listeners=PLAINTEXT://10.81.0.101:12315
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.retention.bytes=10737418240
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
<!== kafka03主机执行 ==>
$ cat >/usr/local/kafka/config/server.properties <<'EOF'
broker.id=3
listeners=PLAINTEXT://0.0.0.0:12315
advertised.listeners=PLAINTEXT://10.81.0.200:12315
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.retention.bytes=10737418240
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
- 创建数据目录并授权
$ groupadd kafka
$ useradd -g kafka -M -s /sbin/nologin kafka
$ mkdir /data/kafka
$ chown -R kafka. /data/kafka
$ chown -R kafka. /usr/local/kafka
- 设置systemd管理
$ cat >/etc/systemd/system/kafka.service <<'EOF'
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target
After=zookeeper.service
[Service]
Type=forking
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/local/jdk8
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
# 启动失败后重启
Restart=on-failure
# 每次尝试重启间隔60秒
StartLimitInterval=60
# 最终尝试重启50次
StartLimitBurst=50
[Install]
WantedBy=multi-user.target
EOF
$ systemctl daemon-reload
$ systemctl start kafka
$ systemctl status kafka
$ systemctl enable kafka
4、kafka
开启jmx监控
<!== 所有主机都执行 ==>
- 修改kafka启动脚本
$ vi /usr/local/kafka/bin/kafka-server-start.sh
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
fi
...
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.rmi.server.hostname=xxx.xxx.xxx.xxx kafka.Kafka "$@"
- 重启kafka
$ systemctl restart kafka
5、kafka-eagle监控部署
- 安装软件
$ cd /usr/local/src && wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.0.8.tar.gz
$ tar zxf v2.0.8.tar.gz
$ tar zxf efak-web-2.0.8-bin.tar.gz -C /usr/local
$ cd .. && mv efak-web-2.0.8 kafka-eagle
- 修改配置文件
$ cat >/usr/local/kafka-eagle/conf/system-config.properties <<'EOF'
efak.zk.cluster.alias=cluster1
cluster1.zk.list=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
cluster1.efak.broker.size=20
kafka.zk.limit.size=32
efak.webui.port=8048
cluster1.efak.offset.storage=kafka
cluster2.efak.offset.storage=zk
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
efak.metrics.charts=true
efak.metrics.retain=15
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
efak.topic.token=keadmin
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/kafka_eagle?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=kafka_eagle_user
efak.password=123456
EOF
$ chown -R kafka. /usr/local/kafka-eagle
- 当你的zookeeper版本是3.5以后,需要开启zkcli权限修复脚本内容
$ vim /usr/local/zookeeper/bin/zkServer.sh
...
// after 77 lines
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
$ systemctl restart zookeeper
- 设置systemd管理
$ cat >/etc/systemd/system/kafka-eagle.service <<'EOF'
[Unit]
Description=Kafka Eagle
After=kafka.service
[Service]
Environment=KE_HOME=/usr/local/kafka-eagle
Environment=JAVA_HOME=/usr/local/jdk8
User=kafka
Group=kafka
Type=forking
ExecStart=/usr/local/kafka-eagle/bin/ke.sh start
ExecReload=/usr/local/kafka-eagle/bin/ke.sh restart
ExecStop=/usr/local/kafka-eagle/bin/ke.sh stop
#启动失败后重启
Restart=on-failure
#每次尝试重启间隔60秒
StartLimitInterval=60
#最终尝试重启50次
StartLimitBurst=50
[Install]
WantedBy=multi-user.target
EOF
$ systemctl daemon-reload
$ systemctl start kafka-eagle
$ systemctl status kafka-eagle
$ systemctl enable kafka-eagle
- 访问网页
管理员用户admin
管理员密码123456
A330FC34-3C35-4fc5-A2BA-B11045B92876.png