kafka安装配置SASL_PLAINTEXT
1.准备
系统环境:centOs
软件包:zookeeper-3.4.10、kafka_2.12-0.10.2.1(可自行在官网下载)
安装包
2.安装
创建一个kafka的文件夹,把安装包放到目录下,然后在当前目录下,解压tar包
tar -xvf zookeeper-3.4.10.tar.gz
tar -xvf kafka_2.12-0.10.2.1.tgz
解压后得到两个文件夹,接下来进入配置阶段,挨个配置并启动
执行效果
2.1 配置zookeeper
进入zookeeper配置文件夹
cd /kafka/zookeeper-3.4.10/conf/
复制一个zoo_sample.cfg为zoo.cfg
cp zoo_sample.cfb zoo.cfg
配置文件夹目录内容
编辑zoo.cfg配置文件
vi zoo.cfg
添加如下配置
dautopurge.purgeInterval=1
#默认值为3000,单位是毫秒,不支持以系统属性方式配置。tickTime用于配置Zookeeper中最小时间单元的长度,很多>运行时的时间间隔都是使用tickTime的倍数来表示的
tickTime=2000
#默认值为10,表示是tickTime的10倍,必须配置,并且是正整数,不支持以系统属性方式配置,用于配置Leader服务器等待Follower启动,并完成数据同步的时间
initLimit=10
#默认值为5,表示是tickTime的5倍,必须配置,并且是正整数,不支持以系统属性方式配置,用于配置Leader服务器和Follower之间进行心跳检测的最大延时时间,在Zookeeper运行期间,Leader服务器会与所有的Follower进行心跳检测来确定该服务器是否存活。如果在syncLimit时间内无法获取到Follower的心跳检测响应,就认为Follower已经脱离了和自己的同步。在网络环境较差时,可以适当调大这个参数
syncLimit=5
#无默认值,不支持以系统属性方式配置,用于配置Zookeeper服务器存储快照文件的目录。默认情况下,如果没有配置dataLogDir,那么事务日志也会存储到该目录中
dataDir=/kafka/zookeeperdata/data
#默认值为dataDir的值,不支持以系统属性方式配置,用于配置Zookeeper服务器存储事务日志的目录。默认情况下与快照数据保存在相同的目录中。在条件允许的情况下,可以将事务日志的存储配置在一个单独的磁盘上,因为事务日志记>录对于磁盘的性能要求非常高,为了保证数据一致性,Zookeeper在返回客户端事务请求的响应之前,必须将本次请求对应的事务日志写入到磁盘中,因此,事务日志的写性能直接决定了ookeeper在处理事务请求时的吞吐。针对同一块磁盘的其他并发读写操作,尤其是数据快照操作,会极大的影响事务日志的写性能
dataLogDir=/kafka/zookeeperdata/log
#无默认值,不支持以系统属性方式配置,用于配置服务器对外的服务端口。客户端通过此端口和Zookeeper服务器创建>连接,通常设置为2181。每台服务器可以使用不同的端口,并且集群中的所有服务器也不需要保持此端口一致
clientPort=2181
#server.1=192.168.137.2:8886
#server.2=192.168.137.2:8887
#server.3=192.168.137.2:8888
#默认值为3,不支持以系统属性方式配置。用于配置Zookeeper在自动清理的时候需要保留的快照数据文件数量和对应的事务日志文件。此参数的最小值为3,如果配置的值小于3会自动调整到3
autopurge.snapRetainCount=3
#默认值为0,单位为小时,不支持以系统属性方式配置。用于配置Zookeeper进行历史文件自动清理的频率。如果配置为0或负数,表示不需要开启定时清理功能
autopurge.purgeInterval=1
2.2启动zookeeper
../bin/zkServer.sh start
控制台输出
从zookeeper.out中,可以看到启动的详细日志,没有报错信息
zookeeper.out日志文件
执行zkServer.sh命令,查看zookeeper启动状态
../bin/zkServer.sh status
zookeeper状态
还可以使用jps查看java运行进程,看到zookeeper运行状态
image
使用netstat查看当前网络监听状态
netstat -npotl
image
到此,确定zookeeper已完全启动
2.2配置kafka
进入kafka配置目录,编辑kafka配置文件
cd /kafka/kafka_2.12-0.10.2.1/config/
vi server.properties
#每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id=0
#broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads=3
#broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads=8
#socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes=102400
#socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes=102400
#socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600
#kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs=/kafka/kafkadata/kafka-logs
# 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
num.partitions=1
#数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
#log.retention.bytes和log.retention.hours任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.hours=168
#topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes=1073741824
#文件大小检查的周期时间,是否触发 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=300000
#zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=192.168.137.2:2181
#ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.connection.timeout.ms=6000
#host.name=192.168.137.2
#port=9092
#advertised.listeners=PLAINTEXT://192.168.137.2:9092
#使用的认证协议
listeners=SASL_PLAINTEXT://192.168.137.2:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
启动kafka
./bin/kafka-server-start.sh ./config/server.properties
会发现报错
image需要配置jaas,新建kafka_saas.conf文件,内容如下
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd"
user_admin="adminpwd";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd";
};
在KafkaServer部分,username和password是broker用于初始化连接到其他的broker,user_XXX 为自定义的用户,所有可以使用的用户必须在此定义(user_xxx必须配置admin用户,否则报错)。
- username/password:broker之间通信使用的用户名密码。
- user_admin:客户端连接broker时所使用到的用户名密码。
在KafkaClient部分,username和password是客户端用来配置客户端连接broker的用户,在上面配置中,客户端使用admin用户连接到broker。
新建启动脚本start_kafka.sh,内容如下
export KAFKA_OPTS=-Djava.security.auth.login.config=/kafka/kafka_2.12-0.10.2.1/config/kafka_saas.conf
/kafka/kafka_2.12-0.10.2.1/bin/kafka-server-start.sh /kafka/kafka_2.12-0.10.2.1/config/server.properties >/dev/null 2>&1 &
用脚本启动kafka
./start_kafka.sh
在日志文件夹中,从server.log中,可以看到启动日志,启动无报错信息。
再可以用jps,netstat -npotl 查看到9092监听的进程。
2.3 测试
创建topic
./bin/kafka-topics.sh --create --zookeeper 192.168.137.2:2181 --replication-factor 1 --partitions 1 --topic topic1
查看topic列表
./bin/kafka-topics.sh --list --zookeeper 192.168.137.2:2181
查看topic详情
./bin/kafka-topics.sh --describe --zookeeper 192.168.137.2:2181 --topic topic1
删除topic
./bin/kafka-topics.sh --delete --zookeeper 192.168.137.2:2181 --topic topic1
2.配置producer
编辑配置文件
vi producer.properties
添加如下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
编辑启动脚本,添加一行
vi kafka-console-producer.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka/kafka_2.12-0.10.2.1/config/kafka_saas.conf"
启动producer
./bin/kafka-console-producer.sh --broker-list 192.168.137.2:9092 --topic topic1 --producer.config /kafka/kafka_2.12-0.10.2.1/config/producer.properties
ps:
1).配置认证后,需要使用新版本producer方式连接。
2).由于kafka_saas.conf配置的KafkaClient用的是admin,所以不需要配置限,如使用其它账号需要添加权限。
3.添加权限
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.137.2:2181 --add --allow-principal User:kafkaclient1 --operation Read --operation Write --topic topic1
查看权限
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.137.2:2181 --list --topic topic1
4.配置consumer
vi consumer.properties
添加如下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
vi kafka-console-consumer.sh
添加一行
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka/kafka_2.12-0.10.2.1/config/kafka_saas.conf"
启动consumer
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.2:9092 --topic topic1 --from-beginning --consumer.config /kafka/kafka_2.12-0.10.2.1/config/consumer.properties
这样就可以愉快的生产和消费消息了,至此,安装配置结束。