大数据大数据,机器学习,人工智能

kafka 的集群搭建

2018-08-29  本文已影响1人  清风_d587

启动zookeeper

在本地2181端口启动ZK。zookeeper集群启动参考 

https://blog.csdn.net/qiushisoftware/article/details/79043379

bin/zookeeper-server-start.shconfig/zookeeper.properties

1

如果你需要对zookeeper开启SASL认证,请在配置文件中加上

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProviderrequireClientAuthScheme=sasljaasLoginRenew=360000

1

2

3

并编写JAAS文件

Server {  org.apache.kafka.common.security.plain.PlainLoginModulerequired  username="admin"password="admin-secret"user_admin="admin-secret";};

1

2

3

4

5

6

启动命令行如下

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/app/gdl/zookeeper-3.4.11/conf/zookeeper_jaas.conf"nohup sh bin/zkServer.shstart conf/zoo.cfg>zoo.log&

1

2

如果你使用的是kafka自带的zookeeper,请参考 

https://blog.csdn.net/geting/article/details/52044055

启动kafka

复制kafka的配置文件,两个配置文件的端口号应该不同

bin/kafka-server-start.sh config/server.propertiesbin/kafka-server-start.sh config/server-1.propertiesbin/kafka-server-start.sh config/server-2.properties

1

2

3

config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2

1

2

3

4

5

6

7

8

9

如果你在云环境部署,建议在listeners中配置上该主机的IP。否则,broker注册到zk的将是自己的主机名,如果云环境不能很好的解析主机名,就会导致问题。 

如果你的主机内网访问IP和外网访问IP不同,listeners中应配置内网访问IP,advertised listeners应配置外网访问IP。

创建topic(使用命令行)

bin/kafka-topics.sh--create--zookeeper localhost:2181--replication-factor3--partitions1--topic my-replicated-topic//查看已经创建的topicbin/kafka-topics.sh--list--zookeeper localhost:2181//查看topic的具体信息bin/kafka-topics.sh--describe--zookeeper localhost:2181--topic my-replicated-topic

1

2

3

4

5

6

生产消息(使用命令行)

bin/kafka-console-producer.sh--broker-listlocalhost:9092--topic my-replicated-topic

1

消费消息(使用命令行)

bin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--from-beginning--topic my-replicated-topic

1

部署kafka监控

这里使用kafkamanager,这个使用play框架 

下载地址 

https://github.com/yahoo/kafka-manager/releases

首先,在用户目录的./sbt下建立如下repositories文件

[repositories]localaliyun: http://maven.aliyun.com/nexus/content/groups/publictypesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly

1

2

3

4

解压缩并进入上述文件夹

./sbt clean dist

1

kafka manager应配置登陆权限,避免配置被意外更改。kafka manager通过访问zookeeper来操纵kafka集群,即使没有broker存活,也可以对相关信息进行更改。

注意,现在kafka集群采取在集群中topic中存储offset的做法。但是kafka manager只支持PLAINTEXT的方式访问集群。所以必须为集群配置PLAINTEXT

启动前,需要对配置文件做如下修改

kafka-manager.zkhosts="10.237.64.46:2181"#这是你zookeeper的IP

1

运行启动脚本即可。

部署schema-registry的插件

使用请参考如下文档: 

https://docs.confluent.io/current/schema-registry/docs/maven-plugin.html 

启动前修改schema-registry.properties文件

listeners=http://10.237.64.46:9096#对外提供服务的端口kafkastore.connection.url=localhost:2181#kafka zk的ip 用于持久化

1

2

运行启动脚本即可。

部署schema-registry ui

请参考如下文档 

https://github.com/landoop/schema-registry-ui 

使用UI一定要设置跨域访问。跨域访问除了服务端设置,还要在浏览器上设置跨域访问。

为kafka配置SSL

首先,如果要使用外网来传递消息,安全起见,需要加上SSL 

在server.properties文件上加上如下配置:

##内网client和broker可以通过9092端口明文访问,外网client则通过9094端口SSL方式访问listeners=PLAINTEXT://10.83.1.48:9092,SSL://10.83.1.48:9094ssl.keystore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/server.keystore.jksssl.keystore.password=123456ssl.key.password=123456ssl.truststore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/server.truststore.jksssl.truststore.password=123456

1

2

3

4

5

6

7

server.keystore.jks和server.truststore.jks的取得请参考kafka官方文档(其实就是SSL的公钥库和信任库)。 

http://kafka.apache.org/documentation/#security_overview 

使用JDK自带的keytool工具。具体命令如下

#!/bin/bash#Step 1##产生服务端的keystoreserver.keystore.jks文件(加密算法为RSA,有效期为36500,存储在server.keystore.jks,别名为localhost)keytool -keystore server.keystore.jks -aliaslocalhost -validity36500-keyalg RSA -genkey##产生客户端的keystorekeytool -keystore client.keystore.jks -aliaslocalhost -validity36500-keyalg RSA -genkey#Step 2##产生ca-cert ca-key文件 就是一个密钥对 注意集群中的ca-key和ca-cert应该保持一致!(也就是说你配置除了第一台服务器外,其他服务器应该吧这两个文件和ca-cert.srl直接拷贝过去)openssl req -new-x509 -keyout ca-key -out ca-cert -days36500##添加server和client对CA的信任。keytool -keystore server.truststore.jks -aliasCARoot -import -fileca-cert            keytool -keystore client.truststore.jks -aliasCARoot -import -fileca-cert#Step 3##将第一步生成的localhost密钥导出(公钥)为cert-file文件keytool -keystore server.keystore.jks -aliaslocalhost -certreq -filecert-file##对client进行同样的操作keytool -keystore client.keystore.jks -aliaslocalhost -certreq -fileclient-cert-file##使用ca-key(CA的私钥)对上一步导出的cert-file进行加密,加密结果为cert-signed文件openssl x509 -req -CA ca-cert -CAkey ca-key -incert-file-out cert-signed -days36500##对于client做同样操作openssl x509 -req -CA ca-cert -CAkey ca-key -inclient-cert-file-out client-cert-signed -days36500##Step 4#将CA证书导入服务器keystorekeytool -keystore server.keystore.jks -aliasCARoot -import -fileca-cert            keytool -keystore client.keystore.jks -aliasCARoot -import -fileca-cert#签名后的server认证加入server的keystore,client认证加到client的keystorekeytool -keystore server.keystore.jks -aliaslocalhost -import -filecert-signed            keytool -keystore client.keystore.jks -aliaslocalhost -import -fileclient-cert-signed

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

配置完成后可以作如下验证,验证SSL是否配置成功

openssl s_client-debug-connectlocalhost:9094-tls1

1

配置命令行producer 

在producer.properties文件下加入如下设置

bootstrap.servers=localhost:9094security.protocol=SSLssl.truststore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/client.truststore.jksssl.truststore.password=123456

1

2

3

4

启动之

shkafka-console-producer.sh--broker-list10.221.198.126:9094--topictest--producer.config../config/producer.properties

1

配置命令行consumer 

修改consumer.propeties文件,加入如上producer.properties的设置。启动之。

shkafka-console-consumer.sh--bootstrap-server10.221.198.126:9094--topictest--consumer.config../config/consumer.properties

1

访问控制

即使在内网,我们也可能会面临员工误操作带来悲剧事件。所以,我们需要加上SASL_JAAS来进行访问控制。简单来说,就是明文校验用户名和密码。

首先在broker端的配置目录新建kafka_server_jaas.conf

KafkaServer {      org.apache.kafka.common.security.plain.PlainLoginModulerequired      username="admin"password="admin"user_admin="admin"

有需要的联系我2317384986     yxxy1717

上一篇下一篇

猜你喜欢

热点阅读