kafka 的集群搭建
启动zookeeper
在本地2181端口启动ZK。zookeeper集群启动参考
https://blog.csdn.net/qiushisoftware/article/details/79043379
bin/zookeeper-server-start.shconfig/zookeeper.properties
1
如果你需要对zookeeper开启SASL认证,请在配置文件中加上
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProviderrequireClientAuthScheme=sasljaasLoginRenew=360000
1
2
3
并编写JAAS文件
Server { org.apache.kafka.common.security.plain.PlainLoginModulerequired username="admin"password="admin-secret"user_admin="admin-secret";};
1
2
3
4
5
6
启动命令行如下
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/app/gdl/zookeeper-3.4.11/conf/zookeeper_jaas.conf"nohup sh bin/zkServer.shstart conf/zoo.cfg>zoo.log&
1
2
如果你使用的是kafka自带的zookeeper,请参考
https://blog.csdn.net/geting/article/details/52044055
复制kafka的配置文件,两个配置文件的端口号应该不同
bin/kafka-server-start.sh config/server.propertiesbin/kafka-server-start.sh config/server-1.propertiesbin/kafka-server-start.sh config/server-2.properties
1
2
3
config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2
1
2
3
4
5
6
7
8
9
如果你在云环境部署,建议在listeners中配置上该主机的IP。否则,broker注册到zk的将是自己的主机名,如果云环境不能很好的解析主机名,就会导致问题。
如果你的主机内网访问IP和外网访问IP不同,listeners中应配置内网访问IP,advertised listeners应配置外网访问IP。
bin/kafka-topics.sh--create--zookeeper localhost:2181--replication-factor3--partitions1--topic my-replicated-topic//查看已经创建的topicbin/kafka-topics.sh--list--zookeeper localhost:2181//查看topic的具体信息bin/kafka-topics.sh--describe--zookeeper localhost:2181--topic my-replicated-topic
1
2
3
4
5
6
bin/kafka-console-producer.sh--broker-listlocalhost:9092--topic my-replicated-topic
1
bin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--from-beginning--topic my-replicated-topic
1
这里使用kafkamanager,这个使用play框架
下载地址
https://github.com/yahoo/kafka-manager/releases
首先,在用户目录的./sbt下建立如下repositories文件
[repositories]localaliyun: http://maven.aliyun.com/nexus/content/groups/publictypesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
1
2
3
4
解压缩并进入上述文件夹
./sbt clean dist
1
kafka manager应配置登陆权限,避免配置被意外更改。kafka manager通过访问zookeeper来操纵kafka集群,即使没有broker存活,也可以对相关信息进行更改。
注意,现在kafka集群采取在集群中topic中存储offset的做法。但是kafka manager只支持PLAINTEXT的方式访问集群。所以必须为集群配置PLAINTEXT
启动前,需要对配置文件做如下修改
kafka-manager.zkhosts="10.237.64.46:2181"#这是你zookeeper的IP
1
运行启动脚本即可。
使用请参考如下文档:
https://docs.confluent.io/current/schema-registry/docs/maven-plugin.html
启动前修改schema-registry.properties文件
listeners=http://10.237.64.46:9096#对外提供服务的端口kafkastore.connection.url=localhost:2181#kafka zk的ip 用于持久化
1
2
运行启动脚本即可。
请参考如下文档
https://github.com/landoop/schema-registry-ui
使用UI一定要设置跨域访问。跨域访问除了服务端设置,还要在浏览器上设置跨域访问。
首先,如果要使用外网来传递消息,安全起见,需要加上SSL
在server.properties文件上加上如下配置:
##内网client和broker可以通过9092端口明文访问,外网client则通过9094端口SSL方式访问listeners=PLAINTEXT://10.83.1.48:9092,SSL://10.83.1.48:9094ssl.keystore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/server.keystore.jksssl.keystore.password=123456ssl.key.password=123456ssl.truststore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/server.truststore.jksssl.truststore.password=123456
1
2
3
4
5
6
7
server.keystore.jks和server.truststore.jks的取得请参考kafka官方文档(其实就是SSL的公钥库和信任库)。
http://kafka.apache.org/documentation/#security_overview
使用JDK自带的keytool工具。具体命令如下
#!/bin/bash#Step 1##产生服务端的keystoreserver.keystore.jks文件(加密算法为RSA,有效期为36500,存储在server.keystore.jks,别名为localhost)keytool -keystore server.keystore.jks -aliaslocalhost -validity36500-keyalg RSA -genkey##产生客户端的keystorekeytool -keystore client.keystore.jks -aliaslocalhost -validity36500-keyalg RSA -genkey#Step 2##产生ca-cert ca-key文件 就是一个密钥对 注意集群中的ca-key和ca-cert应该保持一致!(也就是说你配置除了第一台服务器外,其他服务器应该吧这两个文件和ca-cert.srl直接拷贝过去)openssl req -new-x509 -keyout ca-key -out ca-cert -days36500##添加server和client对CA的信任。keytool -keystore server.truststore.jks -aliasCARoot -import -fileca-cert keytool -keystore client.truststore.jks -aliasCARoot -import -fileca-cert#Step 3##将第一步生成的localhost密钥导出(公钥)为cert-file文件keytool -keystore server.keystore.jks -aliaslocalhost -certreq -filecert-file##对client进行同样的操作keytool -keystore client.keystore.jks -aliaslocalhost -certreq -fileclient-cert-file##使用ca-key(CA的私钥)对上一步导出的cert-file进行加密,加密结果为cert-signed文件openssl x509 -req -CA ca-cert -CAkey ca-key -incert-file-out cert-signed -days36500##对于client做同样操作openssl x509 -req -CA ca-cert -CAkey ca-key -inclient-cert-file-out client-cert-signed -days36500##Step 4#将CA证书导入服务器keystorekeytool -keystore server.keystore.jks -aliasCARoot -import -fileca-cert keytool -keystore client.keystore.jks -aliasCARoot -import -fileca-cert#签名后的server认证加入server的keystore,client认证加到client的keystorekeytool -keystore server.keystore.jks -aliaslocalhost -import -filecert-signed keytool -keystore client.keystore.jks -aliaslocalhost -import -fileclient-cert-signed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
配置完成后可以作如下验证,验证SSL是否配置成功
openssl s_client-debug-connectlocalhost:9094-tls1
1
配置命令行producer
在producer.properties文件下加入如下设置
bootstrap.servers=localhost:9094security.protocol=SSLssl.truststore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/client.truststore.jksssl.truststore.password=123456
1
2
3
4
启动之
shkafka-console-producer.sh--broker-list10.221.198.126:9094--topictest--producer.config../config/producer.properties
1
配置命令行consumer
修改consumer.propeties文件,加入如上producer.properties的设置。启动之。
shkafka-console-consumer.sh--bootstrap-server10.221.198.126:9094--topictest--consumer.config../config/consumer.properties
1
即使在内网,我们也可能会面临员工误操作带来悲剧事件。所以,我们需要加上SASL_JAAS来进行访问控制。简单来说,就是明文校验用户名和密码。
首先在broker端的配置目录新建kafka_server_jaas.conf
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModulerequired username="admin"password="admin"user_admin="admin"
有需要的联系我2317384986 yxxy1717