centos7.4/7.5搭建企业级kafka消息队列集群
kafka增加了账号认证后标志着它向企业级发展迈出了关键的一步,在这个功能后kafka也终于有了大版本,到现在已经演进到1.1.0,发展迅速以至于国内相关实践的资料很少,或者问题较多,经过一番折腾后发现有点复杂,所以把过程分享给大家,帮助大家少走弯路,主要使用SASL_PLAINTEXT 实现认证。
系统要求:
centos7.4/7.5 jdk1.8 /data为数据目录 各服务器的防火墙相互允许所有端口访问
服务器三台:
server1 172.16.99.35
server2 172.16.99.36
server3 172.16.99.37
下载:
wget http://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
解压:
tar -zxvf kafka_2.11-1.1.0.tgz -C /usr/local/
mv /usr/local/kafka_2.11-1.1.0 /usr/local/kafka
1.zookeeper配置
1.搭建集群
每台服务都执行:
mkdir -p /data/app/kafka/data/zookeeper
mkdir -p /data/app/logs/kafka/zookeeper
vi /usr/local/kafka/config/zookeeper.properties
dataDir= /data/app/kafka/data/zookeeper
dataLogDir=/data/app/logs/kafka/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=20
tickTime=2000
initLimit=5
syncLimit=2
server.1=172.16.99.35:2888:3888
server.2=172.16.99.36:2888:3888
server.3=172.16.99.37:2888:3888
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
2.设置zookeeper主机id
echo 1 > /data/app/kafka/data/zookeeper/myid
注意:这个id是zookeeper的主机标示,每个主机id不同第二台是2 第三台是3。
3.启动
逐次启动3台机器的zookeeper 构成一个集群
cd /usr/local/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2.kafka集群配置
每台服务器都需要重复以下操作
1.创建kafka登陆认证文件
vi /usr/local/kafka/config/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="test"
password="testpwd";
};
vi /usr/local/kafka/config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd"
user_admin="adminpwd"
user_test="testpwd"
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd";
};
vi /usr/local/kafka/config/kafka_zoo_jaas.conf
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd"
user_admin="adminpwd";
};
2. 绑定认证文件
修改kafka各项sh脚本,在最后一行之前添加绑定
在/usr/local/kafka/bin/zookeeper-server-start.sh中添加:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_zoo_jaas.conf"
在/usr/local/kafka/bin/kafka-server-start.sh中添加:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
在/usr/local/kafka/bin/kafka-console-consumer.sh/kafka-console-producer.sh中添加:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
3.kafka添加SASL_PLAINTEXT支持
在/usr/local/kafka/config/server.properties文件最后添加:
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
auto.create.topics.enable=false
allow.everyone.if.no.acl.found=false
在/usr/local/kafka/config/producer.properties文件最后添加:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
在/usr/local/kafka/config/consumer.properties文件最后添加:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
4.kafka集群设置
在/usr/local/kafka/config/server.properties修改如下的值:
zookeeper.connect=172.16.99.35:2181,172.16.99.36:2181,172.16.99.37:2181
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://本机内网IP:9092 (当需要提供外网服务时则为本机外网IP,防火墙须打开外网IP相互访问9092端口)
log.dirs=/data/app/logs/kafka/kafka-logs
broker.id=0 (数字,每个服务都是唯一值)
5.启动
依次启动
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
3.测试
创建topic:
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test_topic replication-factor
查看所有topic:
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
创建有权限的消费者
/usr/local/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Create --cluster kafka-cluster (consumer不包括create 会导致无法读,所以需要单独加,但producer却会自动加)
/usr/local/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --consumer --topic test_topic --group test-consumer-group
创建生产者
/usr/local/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --producer --topic test_topic
生产数据
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.99.35:9092 --topic test_topic --producer.config=/usr/local/kafka/config/producer.properties
消费数据:
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.99.36:9092 --topic test_topic --from-beginning --consumer.config=/usr/local/kafka/config/consumer.properties
4.压力测试
在/usr/local/kafka/bin/kafka-consumer-perf-test.sh/kafka-producer-perf-test.sh倒数第一行前添加:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
执行压测脚本:
/usr/local/kafka/bin/kafka-consumer-perf-test.sh --consumer.config /usr/local/kafka/config/consumer.properties --broker-list 172.16.99.35:9092,172.16.99.36:9092,172.16.99.37:9092 -messages 50000 --topic test_topic --group=test-consumer-group --threads 1
bin/kafka-producer-perf-test.sh --topic test_topic --num-records 5000000 --record-size 100 --throughput -1 --producer.config config/producer.properties --producer-props acks=1 bootstrap.servers=172.16.108.128:9092,172.16.108.139:9092,172.16.108.136:9092 buffer.memory=67108864 batch.size=8196