kafka

Kafka集群监控、安全机制与最佳实践

2020-07-09  本文已影响0人  端碗吹水

Kafka监控安装

Kafka集群监控方案选择:

所以本小节先介绍该监控工具的安装及配置,到如下地址可以下载各个版本的Kafka Manager:

我这里下载的是当前最新的3.0.0.5版本,需要注意的是3.x版本需要基于JDK11环境,2.0.0.2及以下版本才兼容JDK1.8。复制下载链接后,使用wget命令进行下载:

[root@localhost ~]# cd /usr/local/src
[root@localhost /usr/local/src]# wget https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip

解压下载好的压缩包,放到kafka的安装目录下,并且更名为kafka-manager,不然总感觉cmak这个名称容易和cmake搞混:

[root@localhost /usr/local/src]# unzip cmak-3.0.0.5.zip
[root@localhost /usr/local/src]# mv cmak-3.0.0.5 /usr/local/kafka/kafka-manager

然后修改一下配置文件,主要是配置Kafka集群中Zookeeper的连接地址,也就是要监控哪个Kafka集群就配置哪个Kafka集群的Zookeeper地址:

[root@localhost ~]# vim /usr/local/kafka/kafka-manager/conf/application.conf
# 配置多个地址使用逗号分隔即可
cmak.zkhosts="127.0.0.1:2181"

配置完成后,使用如下命令启动即可:

[root@localhost ~]# nohup /usr/local/kafka/kafka-manager/bin/cmak &

如下,正常监听了9000端口代表已经启动成功:

[root@localhost ~]# netstat -lntp |grep 9000
tcp6       0      0 :::9000         :::*            LISTEN      25237/java
[root@localhost ~]#

如果你的机器打开了防火墙的话,还需要在防火墙上放开9000端口:

[root@localhost ~]# firewall-cmd --zone=public --add-port=9000/tcp --permanent
[root@localhost ~]# firewall-cmd --reload

然后就可以在浏览器上打开监控界面了,如下所示:


image.png

Kafka监控界面

当我们首次打开CMAK的监控页面时,是一片空白的。因为此时我们还没有添加任何需要被监控的集群,所以首先第一步就是要添加集群:


image.png

需要注意的是,如果要开启JMX轮询,则必须事先在Kafka的启动脚本中打开JMX的端口号:

[root@localhost ~]# vim /usr/local/kafka/bin/kafka-server-start.sh
# 打开JMX端口
export JMX_PORT=9999

然后重启Kafka:

[root@localhost ~]# kafka-server-stop.sh
[root@localhost ~]# nohub kafka-server-start.sh /usr/local/kafka/config/server.properties &

剩下的配置基本保持默认即可,然后点击“Save”进行保存:


image.png

保存成功后,点击“Go to cluster view”:


image.png

就可以查看到我们添加的这个集群信息,在“Cluster Summary”一栏中显示了该集群的Topic数量和Broker数量:


image.png

点击“Topics”的数字就可以进入到Topic的监控界面:


image.png

点击“Brokers”的数字就则是进入到Broker的监控界面:


image.png

Tips:这些指标的监控需要打开JMX

在Topic的监控界面中,点击一个具体的Topic可以进入到该Topic的监控页面,并且提供了对Topic的操作支持:


image.png

Topic的监控指标里也有与Broker一样的监控指标,只不过这里是针对Topic的,指标的含义都一样:


image.png

Kafka SSL签名库生成

Kafka的安全措施:

值得一提的是通常情况下都不会给Kafka加安全措施,类似的其他中间件也是。因为通常我们都会将这些中间件部署在一个可信的网络里,例如与外网隔离的内部网络,并且有防火墙进行保护。

而且给Kafka加上SSL或SASL安全机制也会导致性能有所损耗,通常这个损耗在20~30%左右。但如果你的Kafka是允许在外网进行访问的话,那么就需要考虑增加安全机制了。

在本文中主要介绍一下SSL这种安全机制,SSL如今也不算啥冷门知识了,对HTTPS有所了解的话都应该清楚,这里就不进行赘述了。首先我们知道SSL是需要证书的,所以第一步就是创建证书,但在此之前需要先创建密钥仓库,用于存储证书文件。具体命令如下:

[root@localhost ~]# mkdir ca-store  # 创建一个目录
[root@localhost ~]# cd ca-store/  # 进入该目录
[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -validity 100000 -genkey  # 创建密钥仓库
Enter keystore password:     # 输入密码
Re-enter new password:       # 确认密码
What is your first and last name?   # 输入你的姓名
  [Unknown]:  lingyi
What is the name of your organizational unit?  # 输入你的组织单位
  [Unknown]:  zj
What is the name of your organization?  # 输入你的组织名称
  [Unknown]:  zj
What is the name of your City or Locality?   # 输入你的所在城市
  [Unknown]:  beijing
What is the name of your State or Province?  # 输入你的所在省份
  [Unknown]:  beijing
What is the two-letter country code for this unit?   # 输入两个字母的国家代码
  [Unknown]:  cn
Is CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn correct?
  [no]:  y   # 输入y确认以上信息
[root@localhost ~/ca-store]# ls
server.keystore.jks  # 创建完成后,当前目录下会有这样一个文件
[root@localhost ~/ca-store]# 

创建CA证书:

[root@localhost ~]# openssl req -new -x509 -keyout ca-key -out ca-cert -days 100000
Generating a 2048 bit RSA private key
........................+++
........+++
writing new private key to 'ca-key'
Enter PEM pass phrase:   # 输入密码
Verifying - Enter PEM pass phrase:   # 确认密码
-----
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:cn  # 输入两个字母的国家代码
State or Province Name (full name) []:beijing      # 输入你的所在省份
Locality Name (eg, city) [Default City]:beijing    # 输入你的所在城市
Organization Name (eg, company) [Default Company Ltd]:zj    # 输入你的组织单位
Organizational Unit Name (eg, section) []:zj   # 输入你的组织名称
Common Name (eg, your name or your server's hostname) []:lingyi  # 输入你的姓名或服务器名称
Email Address []:email@example.com   # 输入你的邮箱地址
[root@localhost ~/ca-store]# ls  # 创建完成后,当前目录下会多出两个文件
ca-cert  ca-key  server.keystore.jks
[root@localhost ~/ca-store]# 

将生成的CA添加到客户端信任库:

[root@localhost ~]# keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
Enter keystore password:     # 输入密码
Re-enter new password:       # 确认密码
Owner: EMAILADDRESS=binary0_1@163.com, CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn
Issuer: EMAILADDRESS=binary0_1@163.com, CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn
Serial number: e6fa410f7c90ff2a
Valid from: Mon Jul 06 22:20:50 CST 2020 until: Sat Apr 21 22:20:50 CST 2294
Certificate fingerprints:
     SHA1: 1F:F3:8C:F4:37:9C:47:45:42:A4:51:77:7D:DA:05:E5:59:27:0C:9F
     SHA256: F6:7E:F6:E2:A9:12:8B:C4:04:6E:F0:23:49:6F:0D:3C:94:5F:AD:D6:76:42:42:63:24:69:96:C6:EE:02:70:91
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3

Extensions: 

#1: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: 9A 1D 7C 61 ED 94 C0 BC   13 EA 20 3B 59 05 6A F9  ...a...... ;Y.j.
0010: 40 3B E8 4D                                        @;.M
]
]

#2: ObjectId: 2.5.29.19 Criticality=false
BasicConstraints:[
  CA:true
  PathLen:2147483647
]

#3: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: 9A 1D 7C 61 ED 94 C0 BC   13 EA 20 3B 59 05 6A F9  ...a...... ;Y.j.
0010: 40 3B E8 4D                                        @;.M
]
]

Trust this certificate? [no]:  y   # 是否信任此证书
Certificate was added to keystore
[root@localhost ~/ca-store]# ls
ca-cert  ca-key  client.truststore.jks  server.keystore.jks
[root@localhost ~/ca-store]# 

为Broker提供信任库以及所有客户端签名了密钥的CA证书:

[root@localhost ~/ca-store]# keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
... 与之前的内容一致,略 ...
[root@localhost ~/ca-store]# ls
ca-cert  ca-key  client.truststore.jks  server.keystore.jks  server.truststore.jks
[root@localhost ~/ca-store]# 

完成以上步骤后,就是对证书进行签名,也就是用自己生成的CA来签名前面生成的证书。

1、从密钥仓库导出证书:

[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -certreq -file cert-file
Enter keystore password:    # 这里输入server.keystore.jks的密码
[root@localhost ~/ca-store]# 

2、用CA签名:

[root@localhost ~/ca-store]# openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 100000 -CAcreateserial -passin pass:123456
Signature ok
subject=/C=cn/ST=beijing/L=beijing/O=zj/OU=zj/CN=lingyi
Getting CA Private Key
[root@localhost ~/ca-store]# 

3、导入CA的证书和已签名的证书到密钥仓库:

[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
... 与之前的内容一致,略 ...
[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -import -file cert-signed
Enter keystore password:  
Certificate reply was installed in keystore
[root@localhost ~/ca-store]# ls   # 完成所有步骤后,当前目录下会有如下文件
ca-cert  ca-cert.srl  ca-key  cert-file  cert-signed  client.truststore.jks  server.keystore.jks  server.truststore.jks
[root@localhost ~/ca-store]# 

Kafka SSL服务端集成

Kafka SSL服务端集成其实也比较简单,只需要修改一下Kafka的server.properties配置文件即可。具体如下所示:

[root@localhost ~]# vim /usr/local/kafka/config/server.properties
# 在原本的配置上追加SSL的监听端口及协议配置
listeners=PLAINTEXT://192.168.220.128:9092,SSL://192.168.220.128:8989
advertised.listeners=PLAINTEXT://192.168.220.128:9092,SSL://192.168.220.128:8989

# 增加SSL相关配置
ssl.keystore.location=/root/ca-store/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/root/ca-store/server.truststore.jks
ssl.truststore.password=123456

完成配置的修改后重启Kafka:

[root@localhost ~]# kafka-server-stop.sh
[root@localhost ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &

然后就可以使用openssl测试一下SSL配置是否成功,执行如下命令并输出了类似的内容则代表配置成功,已经能够通过SSL协议连接:

[root@localhost ~]# openssl s_client -debug -connect 192.168.220.128:8989 -tls1
CONNECTED(00000003)
write to 0xfcd230 [0xfd6d63] (181 bytes => 181 (0xB5))
0000 - 16 03 01 00 b0 01 00 00-ac 03 01 0c a9 85 ea 8f   ................
0010 - f2 f3 c1 ac fb 9d f6 78-9c ed f4 60 97 ad 91 33   .......x...`...3
0020 - 32 ab b2 81 9e 81 6b 3f-e0 db da 00 00 64 c0 14   2.....k?.....d..
0030 - c0 0a 00 39 00 38 00 37-00 36 00 88 00 87 00 86   ...9.8.7.6......
0040 - 00 85 c0 0f c0 05 00 35-00 84 c0 13 c0 09 00 33   .......5.......3
0050 - 00 32 00 31 00 30 00 9a-00 99 00 98 00 97 00 45   .2.1.0.........E
0060 - 00 44 00 43 00 42 c0 0e-c0 04 00 2f 00 96 00 41   .D.C.B...../...A
0070 - c0 12 c0 08 00 16 00 13-00 10 00 0d c0 0d c0 03   ................
0080 - 00 0a 00 07 c0 11 c0 07-c0 0c c0 02 00 05 00 04   ................
0090 - 00 ff 01 00 00 1f 00 0b-00 04 03 00 01 02 00 0a   ................
00a0 - 00 0a 00 08 00 17 00 19-00 18 00 16 00 23 00 00   .............#..
00b0 - 00 0f 00 01 01                                    .....
read from 0xfcd230 [0xfd2813] (5 bytes => 5 (0x5))
0005 - <SPACES/NULS>
write to 0xfcd230 [0xfdc2b0] (7 bytes => 7 (0x7))
0000 - 15 03 01 00 02 02 46                              ......F
140425165125520:error:1408F10B:SSL routines:SSL3_GET_RECORD:wrong version number:s3_pkt.c:365:
---
no peer certificate available
---
No client certificate CA names sent
---
SSL handshake has read 5 bytes and written 7 bytes
---
New, (NONE), Cipher is (NONE)
Secure Renegotiation IS NOT supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
SSL-Session:
    Protocol  : TLSv1
    Cipher    : 0000
    Session-ID: 
    Session-ID-ctx: 
    Master-Key: 
    Key-Arg   : None
    Krb5 Principal: None
    PSK identity: None
    PSK identity hint: None
    Start Time: 1594126681
    Timeout   : 7200 (sec)
    Verify return code: 0 (ok)
---

Kafka SSL客户端集成

完成服务端的配置后,接下来继续完成客户端的配置。首先我们需要把之前生成的client.truststore.jks文件从服务器上download下来,并存放在客户端工程目录里,例如resources目录:

image.png

然后在创建Producer客户端的时候,增加SSL相关配置项。如下代码示例:

/**
 * 创建支持SSL的Producer实例
 */
public static Producer<String, String> createProducerWitchSSL() {
    Properties properties = new Properties();
    // 注意,这里的端口一定得是Kafka服务器上配置的SSL协议监听端口
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.119.23:8989");
    properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
    properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
    properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");
    properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
            , "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
            , "org.apache.kafka.common.serialization.StringSerializer");
    // 配置事务支持
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, "1");
    properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "trans-id");
    
    // SSL配置
    properties.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    properties.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "src\\main\\resources\\client.truststore.jks");
    properties.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "123456");
    properties.setProperty("security.protocol", "SSL");

    return new KafkaProducer<>(properties);
}

/**
 * 发送消息
 */
public static void producerAsyncSend() {
    String topicName = "MyTopic";
    String key = "test-key";
    String value = "this is test message!";

    Producer<String, String> producer = createProducerWitchSSL();
    // 初始化事务
    producer.initTransactions();
    try {
        // 开启事务
        producer.beginTransaction();

        // 构建消息对象
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topicName, key, value);
        // 发送一条消息
        Future<RecordMetadata> future = producer.send(record);
        System.out.println(future.get().timestamp());

        // 提交事务
        producer.commitTransaction();
    } catch (Exception e) {
        e.printStackTrace();
        // 发生异常回滚事务
        producer.abortTransaction();
    } finally {
        producer.close();
    }
}

Consumer客户端也是同样的,只需要在创建客户端实例的时候增加相同的SSL配置即可。完整代码如下:

/**
 * 创建支持SSL的Consumer实例
 */
public static Consumer<String, String> createConsumerWithSSL() {
    Properties props = new Properties();
    // 注意,这里的端口一定得是Kafka服务器上配置的SSL协议监听端口
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.119.23:8989");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

    // SSL配置
    props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "src\\main\\resources\\client.truststore.jks");
    props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "123456");
    props.setProperty("security.protocol", "SSL");

    return new KafkaConsumer<>(props);
}

/**
 * 消费消息
 */
public static void autoCommitOffset() {
    Consumer<String, String> consumer = createConsumerWithSSL();
    List<String> topics = List.of("MyTopic");
    // 订阅一个或多个Topic
    consumer.subscribe(topics);
    while (true) {
        // 从Topic中拉取数据,每1000毫秒拉取一次
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // 每次拉取可能都是一组数据,需要遍历出来
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }
}

Kafka最佳实践配置项

服务端必要参数

zookeeper.connect:必配参数,建议在kafka集群的每台实例都配置所有的zk节点

broker.id:必配参数。集群节点的标示符,不得重复,取值范围0~n

log.dirs:不要使用默认的“/tmp/kafka-logs”,因为/tmp目录的性质没法保证数据的持久性

服务端推荐参数

advertised.host.name:注册到zk供用户使用的主机名

advertised.port:注册到zk供用户使用的服务端口

num.partitions:创建topic时的默认partition数量,默认是1

default.replication.factor:自动创建topic的默认副本数量,建议至少修改为2

min.insync.replicasISR:提交生成者请求的最小副本数,建议至少2~3个

unclean.leader.election.enable:是否允许不具备ISR资格的replicas被选举为leader,建议设置为否,除非能够允许数据的丢失

controlled.shutdown.enable:在kafka收到stop命令或者异常终止时,允许自动同步数据,建议开启

可动态调整的参数

unclean.leader.election.enable:不严格的leader选举,有助于集群健壮,但是存在数据丢失风险。

min.insync.replicas:如果同步状态的副本小于该值,服务器将不再接受request.required.acks为-1或all的写入请求。

max.message.bytes:单条消息的最大长度。如果修改了该值,那么replica.fetch.max.bytes和消费者的fetch.message.max.bytes也要跟着修改。

cleanup.policy:生命周期终结数据的处理,默认删除。

flush.messages:强制刷新写入的最大缓存消息数。

flush.ms:强制刷新写入的最大等待时长。

客户端配置:

Producer客户端:ack、压缩、同步生产 vs 异步生产、批处理大小(异步生产)

Consumer客户端方面主要考虑:partition数量及获取消息的大小


Kafka服务器配置最佳实践

JVM参数建议:

1、能分配较大堆的情况下使用JVM的G1垃圾回收器

以下是一段基于24GB内存、四核英特尔至强处理器,8x7200转的SATA硬盘机器的配置参考示例,可以参照该示例代入自己的机器配置进行调整:

-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

机器硬件建议:

1. 内存:建议使用64G内存的机器
2. CPU:尽量选择更多核心,将会获得多核带来的更好的并发处理性能
3. 磁盘:RAID是优先推荐的,SSD可以考虑
4. 网络:最好是万兆网络,千兆也可
5. 文件系统:ext4是最佳选择
6. 操作系统:任何Unix系统上运行良好,并且已经在Linux和Solaris上进行了测试

核心参数调整建议:

  1. 文件描述符数量调整:(number_of_partitions)*(partition_size / segment_size),通常都在100000左右
  2. 视具体情况调整最大套接字缓冲区大小
  3. pagecache:尽量分配与大多数日志的激活日志段大小一致
  4. 禁用swap
  5. 设计broker的数量:单broker上的分区数小于2000;分区大小则建议不要超过25GB
  6. 设计partition的数量:
    1. 至少和最大的消费者组中consumer的数量一致
    2. 分区不要太大,建议小于25GB
上一篇下一篇

猜你喜欢

热点阅读