Kafka QuickStart
After the theory in the Kafka introduction, it's time for hands-on practice. Eight steps (Step 0 through Step 7) to try out Kafka:
Step 0: Run java -version to check whether Java is installed
Step 1: Download kafka_2.13-2.4.1.tgz from the official site
Step 2: Start the ZooKeeper server and the Kafka server
Step 3: Connect to the ZooKeeper server and create a topic
Step 4: Connect a producer to the Kafka server and publish messages
Step 5: Connect a consumer to the Kafka server and consume messages
Step 6: Connect to the ZooKeeper server and describe the created topic
Step 7: Set up a 3-node Kafka cluster
Step 0: Run java -version to check whether Java is installed; if not, download and install it from the OpenJDK website.
root@docker:~# java -version
openjdk version "14" 2020-03-17
OpenJDK Runtime Environment (build 14+36-1461)
OpenJDK 64-Bit Server VM (build 14+36-1461, mixed mode, sharing)
After downloading OpenJDK 14, upload it to /home/openjdk/ on the server and extract it with tar xvf openjdk-14_linux-x64_bin.tar.gz, then configure the Java environment variables and run java -version to verify. Configure JAVA_HOME via vi /etc/profile:
export JAVA_HOME=/home/openjdk/jdk-14
export PATH=$PATH:$JAVA_HOME/bin
Step 1: Download Kafka from the official site. At the time of writing, the latest release was kafka_2.13-2.4.1.tgz; let's try it out. Upload the file to /home/docker_123.
root@docker:~# cd /home/docker_123
root@docker:~# tar xvf kafka_2.13-2.4.1.tgz
root@docker:~# cd kafka_2.13-2.4.1
Step 2: Start the ZooKeeper server and the Kafka server
Before starting Kafka, start a ZooKeeper server. ZooKeeper is a coordination service; for now, think of it as coordinating Kafka producers and consumers. Use the convenience script packaged with Kafka to start a single-node ZooKeeper instance:
root@docker:/home/docker_123/kafka_2.13-2.4.1# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
Note: nohup {command} & runs the command in the background. If nohup is not available on the server, run the command in the foreground, but do not exit it with Ctrl+C, or sending Kafka messages later will fail.
Now start the Kafka server:
root@docker:/home/docker_123/kafka_2.13-2.4.1# nohup bin/kafka-server-start.sh config/server.properties &
Step 3: Connect to the ZooKeeper server and create a topic
Connect to the ZooKeeper server at localhost:2181 and create a topic named "test" with 1 partition and 1 replica:
root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
Connect to the ZooKeeper server at localhost:2181 and list the topics:
root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --list --zookeeper localhost:2181
test
Note: alternatively, the broker can be configured to auto-create a topic when a message is published to a topic that does not yet exist, instead of creating it manually.
Step 4: Connect a producer to the Kafka server and publish messages
Use Kafka's bundled command-line producer client to connect to the Kafka server at localhost:9092 and send messages to Kafka. By default, each line is sent as a separate message.
root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello kafka world! I am docker_123!
>Hi kafka world! what you are doing?
Step 5: Connect a consumer to the Kafka server and consume messages
Use Kafka's bundled command-line consumer client to connect to the Kafka server at localhost:9092 and consume messages from Kafka.
root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello kafka world! I am docker_123!
Hi kafka world! why you are not at you are doing?
Note: if Step 4 and Step 5 run in different terminals, messages typed into the producer client show up in the consumer client, so producer and consumer communicate in near real time. Both clients can also exit freely: the producer only connects to the Kafka server when it sends a message, and the consumer only connects when it consumes.
By default, Kafka messages are not consumed twice, because the consumer tracks an offset; you can also start from a specific offset to re-consume from that point. Running bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning again replays everything from the start: "hello kafka world! I am docker_123!" and so on.
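The offset behavior can be sketched with a toy model. This is a conceptual illustration, not Kafka's actual code; the Partition class and the committed dict are invented for the example. A partition is an append-only log, each consumer group remembers how far it has read, and --from-beginning simply starts reading at offset 0.

```python
# Conceptual sketch of a partition as an append-only log with offsets.
class Partition:
    def __init__(self):
        self.log = []          # messages, indexed by offset

    def append(self, msg):
        self.log.append(msg)

    def read(self, offset):
        # A consumer reads everything from its offset to the end of the log.
        return self.log[offset:]

p = Partition()
p.append("hello kafka world! I am docker_123!")
p.append("Hi kafka world! what you are doing?")

committed = {"group-1": 2}            # group-1 has already consumed both messages
print(p.read(committed["group-1"]))   # [] -> nothing new for group-1
print(p.read(0))                      # "--from-beginning": all messages again
```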
Step 6: Connect to the ZooKeeper server and describe the created topic
Connect to the ZooKeeper server at localhost:2181 and describe the topic: it has 1 partition and 1 replica, and that single partition has one leader.
root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
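The describe output is made of "Key: value" fields (tab-separated in the real output, flattened to spaces above), which makes it easy to consume from scripts. A small sketch, assuming that tab-separated format; the helper name is ours:

```python
# Sketch: parse one partition line of `kafka-topics.sh --describe` output
# into a dict. Assumes tab-separated "Key: value" fields.
def parse_partition_line(line):
    fields = {}
    for part in line.split("\t"):
        key, _, value = part.partition(": ")
        fields[key.strip()] = value.strip()
    return fields

line = "Topic: test\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0"
info = parse_partition_line(line)
print(info["Leader"])   # broker id of the partition leader -> 0
```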
Step 7: Set up a 3-node Kafka cluster
root@docker:/home/docker_123/kafka_2.13-2.4.1# cp config/server.properties config/server-1.properties
root@docker:/home/docker_123/kafka_2.13-2.4.1# cp config/server.properties config/server-2.properties
Edit server-1.properties and server-2.properties with vi, changing the following settings:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
Note: the broker.id property is the unique name of each Kafka node in the cluster. 9092 is Kafka's default port; the second and third brokers use 9093 and 9094 instead. log.dir is the directory where each broker stores its message log, i.e. the messages at each offset.
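The three overrides can also be generated with a short script instead of editing by hand. A hedged sketch: the helper name and file layout follow this article, and a real config must start from a full copy of config/server.properties, changing only these three lines.

```python
# Sketch: print the per-broker overrides used in this article. The helper
# name is ours; a real server.properties contains many more settings that
# must be kept unchanged.
def broker_overrides(broker_id, port, log_dir):
    return (
        f"broker.id={broker_id}\n"
        f"listeners=PLAINTEXT://:{port}\n"
        f"log.dir={log_dir}\n"
    )

for i in (1, 2):
    print(f"# config/server-{i}.properties")
    print(broker_overrides(i, 9092 + i, f"/tmp/kafka-logs-{i}"))
```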
Now start the Kafka nodes on ports 9093 and 9094:
root@docker:/home/docker_123/kafka_2.13-2.4.1# nohup bin/kafka-server-start.sh config/server-1.properties &
root@docker:/home/docker_123/kafka_2.13-2.4.1# nohup bin/kafka-server-start.sh config/server-2.properties &
Now create a new topic with 1 partition and 3 replicas:
root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
If resources allow, you can also create a topic with 5 partitions and 3 replicas and compare the describe output. Note that the name must differ from the topic created above, which already exists:
root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic my-replicated-topic-5
Unfortunately, fault tolerance across replicas could not be tested here: due to resource limits, 3 Kafka nodes would not stay up. The second node exited as soon as a topic was created, so even a 2-node setup was out of reach.
FAQ:
1) Creating a topic with a replication factor larger than the number of Kafka nodes fails:
root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Error while executing topic command : Replication factor: 3 larger than available brokers: 1.
[2020-04-24 23:42:11,705] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
(kafka.admin.TopicCommand$)
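The error comes from a simple constraint: each replica of a partition must live on a distinct broker, so the replication factor can never exceed the broker count. A conceptual sketch (the round-robin placement here is a simplification of Kafka's real assignment algorithm):

```python
# Sketch of the check behind InvalidReplicationFactorException: you cannot
# place more replicas of a partition than there are available brokers.
def assign_replicas(brokers, replication_factor, partitions):
    if replication_factor > len(brokers):
        raise ValueError(
            f"Replication factor: {replication_factor} larger than "
            f"available brokers: {len(brokers)}."
        )
    # Simple round-robin placement across brokers (a simplification).
    return [
        [brokers[(p + r) % len(brokers)] for r in range(replication_factor)]
        for p in range(partitions)
    ]

print(assign_replicas([0, 1, 2], replication_factor=3, partitions=1))  # [[0, 1, 2]]
try:
    assign_replicas([0], replication_factor=3, partitions=1)
except ValueError as e:
    print(e)   # mirrors the CLI error above
```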
2) What effect does starting the other 2 Kafka nodes have on consumers? While the brokers were starting, the consumer re-consumed messages, as shown below:
hello kafak worka world! I am docker_123!
Hi kafka world! why you are not at you are doing?
docker
tcpdump is working
tcpdump is working
tcpdump
hello kafak worka world! I am docker_123!
Hi kafka world! why you are not at you are doing?
docker
tcpdump is working
tcpdump is working
tcpdump
3) Kafka server startup flow:
The broker first connects to ZooKeeper at localhost:2181; if the connection times out, it exits and drops out of the Kafka cluster.
root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-server-start.sh config/server-2.properties
[2020-04-24 23:54:33,599] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-04-24 23:54:55,400] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-04-24 23:54:55,546] INFO starting (kafka.server.KafkaServer)
[2020-04-24 23:54:55,745] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2020-04-24 23:54:57,789] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-04-24 23:55:01,268] INFO Client environment:os.memory.free=980MB (org.apache.zookeeper.ZooKeeper)
[2020-04-24 23:55:01,268] INFO Client environment:os.memory.max=1024MB (org.apache.zookeeper.ZooKeeper)
[2020-04-24 23:55:01,268] INFO Client environment:os.memory.total=1024MB (org.apache.zookeeper.ZooKeeper)
[2020-04-24 23:55:01,887] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@7d3e8655 (org.apache.zookeeper.ZooKeeper)
[2020-04-24 23:55:02,797] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2020-04-24 23:55:08,488] INFO Opening socket connection to server localhost/[0:0:0:0:0:0:0:1]:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-04-24 23:55:08,910] INFO Socket connection established, initiating session, client: /[0:0:0:0:0:0:0:1]:35353, server: localhost/[0:0:0:0:0:0:0:1]:2181 (org.apache.zookeeper.ClientCnxn)
Branch 1: timed out, have not heard from server in 6001ms
[2020-04-24 23:55:14,910] WARN Client session timed out, have not heard from server in 6001ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
[2020-04-24 23:55:15,434] INFO Session: 0x0 closed (org.apache.zookeeper.ZooKeeper)
[2020-04-24 23:55:15,434] INFO EventThread shut down for session: 0x0 (org.apache.zookeeper.ClientCnxn)
[2020-04-24 23:55:15,438] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2020-04-24 23:55:15,580] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:259)
[2020-04-24 23:55:16,531] INFO shutting down (kafka.server.KafkaServer)
[2020-04-24 23:55:18,384] INFO shut down completed (kafka.server.KafkaServer)
[2020-04-24 23:55:18,399] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2020-04-24 23:55:18,810] INFO shutting down (kafka.server.KafkaServer)
Branch 2: once Connected, a Cluster ID is assigned
[2020-04-25 00:04:09,290] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2020-04-25 00:04:17,712] INFO Cluster ID = Z5diDnfIS924PrzGrYtFcg (kafka.server.KafkaServer)
[2020-04-25 00:04:23,407] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-04-25 00:04:23,410] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-04-25 00:04:23,470] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-04-25 00:04:25,645] INFO Log directory /tmp/kafka-logs-2 not found, creating it. (kafka.log.LogManager)