Kafka Cluster

2022-02-28  想成为大师的学徒小纪

1. Introduction

2. Basic Kafka Concepts

In a Kafka cluster, each Kafka node is a Broker. Messages are carried by Topics and can be stored in one or more Partitions. An application that publishes messages is a Producer; an application that consumes them is a Consumer. Multiple Consumers can form a Consumer Group to jointly consume the messages of a Topic.

| Concept | Description |
| --- | --- |
| Broker | A Kafka node |
| Leader | Handles message reads and writes for a partition: producers push messages to the leader, and consumers poll messages from the leader |
| Follower | Replicates the leader's message data; one leader can have multiple followers |
| Topic | A named feed that carries messages |
| Partition | A shard of a topic's message storage |
| Producer | An application that publishes messages to a topic |
| Consumer | An application that subscribes to messages from a topic |
| Consumer Group | A group made up of multiple consumers |

Kafka cluster topology:

(topology diagram omitted)
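These concepts map directly onto the CLI tools that ship with Kafka. As a sketch (the broker address is hypothetical and assumes the cluster built in section 3 below), creating a topic, producing to its leader, and consuming as part of a group looks like this:

```shell
# Hypothetical broker address; assumes the three-node cluster from section 3.
BROKER="10.81.0.100:12315"
TOPIC="demo"
GROUP="demo-group"

if command -v kafka-topics.sh >/dev/null 2>&1; then
  # A topic with 3 partitions, each replicated to 3 brokers (1 leader + 2 followers)
  kafka-topics.sh --bootstrap-server "$BROKER" --create --topic "$TOPIC" \
    --partitions 3 --replication-factor 3
  # A producer pushes a message to the partition leader
  echo "hello" | kafka-console-producer.sh --bootstrap-server "$BROKER" --topic "$TOPIC"
  # Consumers sharing the same --group split the topic's partitions among
  # themselves; each message is delivered to exactly one member of the group
  kafka-console-consumer.sh --bootstrap-server "$BROKER" --topic "$TOPIC" \
    --group "$GROUP" --from-beginning --max-messages 1
else
  echo "Kafka CLI tools not on PATH; install Kafka first (section 3)." >&2
fi
```

Running a second `kafka-console-consumer.sh` with the same `--group` would demonstrate the partition sharing described above.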

3. Deployment Steps

Kafka depends on ZooKeeper, so ZooKeeper must be installed. Kafka itself is written in Scala, which runs on the JVM, so a JDK is required as well.

Hosts used in this setup:

kafka01: 10.81.0.100  kafka02: 10.81.0.101  kafka03: 10.81.0.200

1. Install JDK 8 or later

<!== Run on all hosts ==>

Download link: https://www.oracle.com/java/technologies/javase/javase8u211-later-archive-downloads.html#license-lightbox

$ tar zxf jdk8.tar.gz -C /usr/local
$ echo 'export JAVA_HOME="/usr/local/jdk8"' >>/etc/profile
$ echo 'export PATH="$JAVA_HOME/bin:$PATH"' >> /etc/profile
$ source /etc/profile
$ java -version

2. ZooKeeper Cluster Deployment

<!== Run on all hosts ==>

$ cd /usr/local/src && wget https://downloads.apache.org/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz
$ tar zxf apache-zookeeper-3.5.9-bin.tar.gz -C /usr/local/
$ cd .. && mv apache-zookeeper-3.5.9-bin/ zookeeper
$ echo 'export ZOOKEEPER_HOME="/usr/local/zookeeper"' >>/etc/profile
$ echo 'export PATH="$ZOOKEEPER_HOME/bin:$PATH"' >>/etc/profile
$ source /etc/profile
$ cd zookeeper/conf && cp -p zoo_sample.cfg zoo.cfg
$ mkdir -p /data/zookeeper
$ mkdir /var/log/zookeeper
$ cat > zoo.cfg<<'EOF'
dataDir=/data/zookeeper
dataLogDir=/var/log/zookeeper/
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=1024
#standaloneEnabled=true
#admin.enableServer=true
server.1=10.81.0.100:2888:3888
server.2=10.81.0.101:2888:3888
server.3=10.81.0.200:2888:3888
clientPort=2181
EOF

<!== Run on kafka01 ==>

$ echo 1 > /data/zookeeper/myid

<!== Run on kafka02 ==>

$ echo 2 > /data/zookeeper/myid

<!== Run on kafka03 ==>

$ echo 3 > /data/zookeeper/myid

<!== Run on all hosts ==>

$ cat > /etc/systemd/system/zookeeper.service <<'EOF'
[Unit]
Description=zookeeper.service
After=network.target

[Service]
User=zookeeper
Type=forking
Environment=ZOO_LOG_DIR=/var/log/zookeeper
Environment=JAVA_HOME=/usr/local/jdk8
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
Restart=on-failure
StartLimitInterval=60
StartLimitBurst=3000

[Install]
WantedBy=multi-user.target
EOF
$ groupadd zookeeper
$ useradd -g zookeeper -M -s /sbin/nologin zookeeper
$ chown -R zookeeper. /data/zookeeper/
$ chown -R zookeeper. /var/log/zookeeper/
$ systemctl daemon-reload
$ systemctl start zookeeper
$ systemctl enable zookeeper
$ /usr/local/zookeeper/bin/zkServer.sh status
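Beyond `zkServer.sh status` on each node, the whole ensemble can be probed from one machine. A minimal sketch, using this guide's IPs (the four-letter-word commands are blocked by ZooKeeper's default whitelist, so the client CLI is used instead):

```shell
# Probe every ensemble member; each should answer a basic "ls /" request.
ZK_BIN=/usr/local/zookeeper/bin
checked=0
for host in 10.81.0.100 10.81.0.101 10.81.0.200; do
  if [ -x "$ZK_BIN/zkCli.sh" ]; then
    "$ZK_BIN/zkCli.sh" -server "$host:2181" ls / >/dev/null 2>&1 \
      && echo "$host: OK" || echo "$host: unreachable"
  else
    echo "$host: zkCli.sh not found on this machine"
  fi
  checked=$((checked + 1))
done
```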

3. Kafka Cluster Deployment

<!== Run on all hosts ==>

$ cd /usr/local/src && wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.12-3.0.0.tgz --no-check-certificate
$ tar xf kafka_2.12-3.0.0.tgz -C /usr/local
$ cd .. && mv kafka_2.12-3.0.0 kafka
$ echo 'export KAFKA_HOME=/usr/local/kafka' >>/etc/profile
$ echo 'export PATH="$PATH:$KAFKA_HOME/bin"' >>/etc/profile
$ source /etc/profile

<!== Run on kafka01 ==>

$ cat >/usr/local/kafka/config/server.properties <<'EOF'
# Globally unique broker id; must not be duplicated
broker.id=1

# Address and port the broker listens on
listeners=PLAINTEXT://0.0.0.0:12315

# Hostname and port the broker advertises to producers and consumers.
# If not set, the value of "listeners" is used, if configured.
advertised.listeners=PLAINTEXT://10.81.0.100:12315

# Number of threads handling network requests, i.e. the message-receiving threads.
# Received messages are buffered in memory before being written to disk.
num.network.threads=3

# Number of threads the broker uses for disk I/O; should exceed the number of disks
num.io.threads=8

# Socket send buffer (SO_SNDBUF tuning parameter)
socket.send.buffer.bytes=102400

# Socket receive buffer (SO_RCVBUF tuning parameter)
socket.receive.buffer.bytes=102400

# Maximum size of a socket request, to protect the broker against OOM;
# message.max.bytes must be smaller than socket.request.max.bytes.
# Can be overridden by parameters given at topic creation time.
socket.request.max.bytes=104857600

# Directories where Kafka stores data; separate multiple paths with commas.
# With multiple paths, the broker places partitions by a "least used" rule,
# keeping the segments of one partition under the same path.
log.dirs=/data/kafka

# Default number of partitions per topic; overridden by the value
# given at topic creation time
num.partitions=3

# Kafka uses a configurable thread pool to handle log segments:
#    on normal startup, to open each partition's segments
#    on restart after a crash, to check and truncate each partition's segments
#    on clean shutdown, to close segments
# By default only one thread per log directory is used. Since these threads
# are only active at startup and shutdown, a large value is safe and enables
# parallelism; for brokers with many partitions, parallel recovery after a
# crash can save hours.
# Note that this setting applies per log directory in log.dirs: with
# num.recovery.threads.per.data.dir=8 and 3 paths in log.dirs, 24 threads total.
num.recovery.threads.per.data.dir=1

# Replication factor for the internal topics "__consumer_offsets" and
# "__transaction_state". For anything other than development and testing,
# a value greater than 1 (e.g. 3) is recommended to ensure availability.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

# Default data retention time
log.retention.hours=168

# Size-based log retention policy: segments are deleted from the log unless
# the remaining segments fit under log.retention.bytes.
# Independent of log.retention.hours.
log.retention.bytes=10737418240

# A topic partition is stored as a set of segment files; this controls the
# segment size. When a segment reaches this limit (default 1GB), it is
# closed and a new segment is opened.
log.segment.bytes=1073741824

# Interval at which retention is checked against file sizes
log.retention.check.interval.ms=300000

# ZooKeeper connection string
zookeeper.connect=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181

# ZooKeeper connection timeout
zookeeper.connection.timeout.ms=18000

# Time in milliseconds the GroupCoordinator delays the initial consumer
# rebalance. 3 seconds is recommended for production.
group.initial.rebalance.delay.ms=0
EOF
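The per-broker files below differ only in `broker.id` and `advertised.listeners`. As a sketch, they can be generated from one template instead of being hand-edited three times; `gen_server_properties` is a hypothetical helper using this guide's host IPs, and only the varying keys are shown (the shared keys from the file above would be appended as well):

```shell
# Generate one server-<id>.properties per broker from a single template.
OUT="${OUT:-$(mktemp -d)}"

gen_server_properties() {
  local id="$1" ip="$2"
  cat > "$OUT/server-$id.properties" <<EOF
broker.id=$id
listeners=PLAINTEXT://0.0.0.0:12315
advertised.listeners=PLAINTEXT://$ip:12315
EOF
}

gen_server_properties 1 10.81.0.100
gen_server_properties 2 10.81.0.101
gen_server_properties 3 10.81.0.200
```

Each generated file would then be copied to `/usr/local/kafka/config/server.properties` on the matching host.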

<!== Run on kafka02 ==>

$ cat >/usr/local/kafka/config/server.properties <<'EOF'
broker.id=2
listeners=PLAINTEXT://0.0.0.0:12315
advertised.listeners=PLAINTEXT://10.81.0.101:12315
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.retention.bytes=10737418240
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF

<!== Run on kafka03 ==>

$ cat >/usr/local/kafka/config/server.properties <<'EOF'
broker.id=3
listeners=PLAINTEXT://0.0.0.0:12315
advertised.listeners=PLAINTEXT://10.81.0.200:12315
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.retention.bytes=10737418240
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF

<!== Run on all hosts ==>

$ groupadd kafka
$ useradd -g kafka -M -s /sbin/nologin kafka
$ mkdir /data/kafka
$ chown -R kafka. /data/kafka
$ chown -R kafka. /usr/local/kafka
$ cat >/etc/systemd/system/kafka.service <<'EOF'
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target
After=zookeeper.service
 
[Service]
Type=forking
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/local/jdk8
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
# Restart after a failed start
Restart=on-failure
# Rate-limit restarts: at most 50 attempts within a 60-second window
StartLimitInterval=60
StartLimitBurst=50

[Install]
WantedBy=multi-user.target
EOF
$ systemctl daemon-reload
$ systemctl start kafka
$ systemctl status kafka
$ systemctl enable kafka
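With all three brokers up, a quick smoke test confirms replication is working: create a fully replicated topic and check that every partition reports a leader and a complete ISR. A sketch, run from any broker host:

```shell
# Post-install smoke test against the cluster built above.
BROKER="10.81.0.100:12315"
TOPIC="smoke-test"

if command -v kafka-topics.sh >/dev/null 2>&1; then
  kafka-topics.sh --bootstrap-server "$BROKER" --create --topic "$TOPIC" \
    --partitions 3 --replication-factor 3
  # Each partition line should show a Leader and an Isr containing 1,2,3
  kafka-topics.sh --bootstrap-server "$BROKER" --describe --topic "$TOPIC"
  # Clean up the test topic
  kafka-topics.sh --bootstrap-server "$BROKER" --delete --topic "$TOPIC"
else
  echo "kafka-topics.sh not on PATH; run 'source /etc/profile' first." >&2
fi
```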

4. Enable JMX Monitoring for Kafka

<!== Run on all hosts ==>

$ vi /usr/local/kafka/bin/kafka-server-start.sh
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    # JDK 8 removed the permanent generation, so use MetaspaceSize, not PermSize
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:MetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
fi
...
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.rmi.server.hostname=xxx.xxx.xxx.xxx kafka.Kafka "$@"
$ systemctl restart kafka
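After the restart, the JMX port configured above should be listening. A minimal check, assuming `ss` from iproute2 is available on the host:

```shell
# Verify the broker came back up with JMX enabled on port 9999.
JMX_PORT=9999
if command -v ss >/dev/null 2>&1; then
  if ss -tln 2>/dev/null | grep -q ":$JMX_PORT "; then
    echo "JMX listening on port $JMX_PORT"
  else
    echo "JMX port $JMX_PORT not open; re-check the kafka-server-start.sh edit" >&2
  fi
fi
```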

5. Deploy kafka-eagle Monitoring

$ cd /usr/local/src && wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.0.8.tar.gz
$ tar zxf v2.0.8.tar.gz
$ tar zxf efak-web-2.0.8-bin.tar.gz -C /usr/local
$ cd .. && mv efak-web-2.0.8 kafka-eagle
$ cat >/usr/local/kafka-eagle/conf/system-config.properties <<'EOF'
efak.zk.cluster.alias=cluster1
cluster1.zk.list=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
cluster1.efak.broker.size=20
kafka.zk.limit.size=32
efak.webui.port=8048
cluster1.efak.offset.storage=kafka
cluster2.efak.offset.storage=zk
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
efak.metrics.charts=true
efak.metrics.retain=15
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
efak.topic.token=keadmin
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/kafka_eagle?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=kafka_eagle_user
efak.password=123456
EOF
$ chown -R kafka. /usr/local/kafka-eagle
$ vim /usr/local/zookeeper/bin/zkServer.sh
...
# add after line 77:
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
$ systemctl restart zookeeper
$ cat >/etc/systemd/system/kafka-eagle.service <<'EOF'
[Unit]
Description=Kafka Eagle
After=kafka.service

[Service]
Environment=KE_HOME=/usr/local/kafka-eagle
Environment=JAVA_HOME=/usr/local/jdk8
User=kafka
Group=kafka
Type=forking
ExecStart=/usr/local/kafka-eagle/bin/ke.sh start
ExecReload=/usr/local/kafka-eagle/bin/ke.sh restart
ExecStop=/usr/local/kafka-eagle/bin/ke.sh stop
# Restart after a failed start
Restart=on-failure
# Rate-limit restarts: at most 50 attempts within a 60-second window
StartLimitInterval=60
StartLimitBurst=50

[Install]
WantedBy=multi-user.target
EOF
$ systemctl daemon-reload
$ systemctl start kafka-eagle
$ systemctl status kafka-eagle
$ systemctl enable kafka-eagle

http://xxx.xxx.xxx.xxx:8048

Default admin user: admin

Default admin password: 123456
