Kafka QuickStart: Hands-On

2020-04-25  robot_test_boy

With the theory from the Kafka introduction covered, it's time to get hands-on. Here are the steps (zero through seven) for trying out Kafka.

Step 0: Run java -version to check whether Java is installed

Step 1: Download kafka_2.13-2.4.1.tgz from the official site

Step 2: Start the ZooKeeper server and the Kafka server

Step 3: Connect to the ZooKeeper server and create a topic

Step 4: Producer connects to the Kafka server and publishes messages to it

Step 5: Consumer connects to the Kafka server and consumes messages from it

Step 6: Connect to the ZooKeeper server and inspect the topic we created

Step 7: Set up a 3-node Kafka cluster

Step 0: Run java -version to check whether Java is installed; if it isn't, download and install it from the OpenJDK site

root@docker:~# java -version

openjdk version "14" 2020-03-17

OpenJDK Runtime Environment (build 14+36-1461)

OpenJDK 64-Bit Server VM (build 14+36-1461, mixed mode, sharing)

After downloading OpenJDK 14, upload it to the server under /home/openjdk/, run tar xvf openjdk-14_linux-x64_bin.tar.gz, configure the Java environment variables, then run java -version to check that it works. Set JAVA_HOME via vi /etc/profile:

export JAVA_HOME=/home/openjdk/jdk-14

export PATH=$PATH:$JAVA_HOME/bin
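Before editing /etc/profile, the same two lines can be tried in the current shell to confirm they behave; a minimal sketch, assuming the JDK really was unpacked to /home/openjdk/jdk-14 as above:

```shell
# Apply the same two exports to the current shell and verify PATH picked them up.
export JAVA_HOME=/home/openjdk/jdk-14
export PATH=$PATH:$JAVA_HOME/bin
echo "$PATH" | grep -q "$JAVA_HOME/bin" && echo "PATH updated"
# With the JDK actually installed there, this prints the openjdk 14 banner:
[ -x "$JAVA_HOME/bin/java" ] && "$JAVA_HOME/bin/java" -version || true
```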

Step 1: Download Kafka from the official site. At the time of writing the latest version is kafka_2.13-2.4.1.tgz, so let's try it out. Upload the file to /home/docker_123.

root@docker:~# cd /home/docker_123

root@docker:~# tar  xvf  kafka_2.13-2.4.1.tgz

root@docker:~# cd kafka_2.13-2.4.1

Step 2: Start the ZooKeeper server and the Kafka server

Before starting Kafka, start a ZooKeeper server first. ZooKeeper is a coordination service; for now, think of it simply as coordinating Kafka's producers and consumers. Create a single-node ZooKeeper instance with the convenience script packaged with Kafka.

root@docker:/home/docker_123/kafka_2.13-2.4.1# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

Note: nohup {command} & runs the command in the background. If the server does not have nohup, run the command directly, but do not exit it with Ctrl+C, or sending Kafka messages later will fail.
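Where nohup is missing, a small wrapper can fall back to a plain background job. A sketch (the start_bg helper is ours, not part of Kafka; demonstrated with a harmless command rather than the Kafka scripts):

```shell
# start_bg is our own helper: prefer nohup, otherwise fall back to a plain
# background job, and print the PID either way so the process can be stopped.
start_bg() {
  if command -v nohup >/dev/null 2>&1; then
    nohup "$@" >/dev/null 2>&1 &
  else
    "$@" >/dev/null 2>&1 &
  fi
  echo $!
}

# Real usage would be e.g.:
#   start_bg bin/zookeeper-server-start.sh config/zookeeper.properties
pid=$(start_bg sleep 5)
echo "started PID $pid"
```

In this Kafka version the start scripts themselves also take a -daemon flag, e.g. bin/kafka-server-start.sh -daemon config/server.properties, which backgrounds the broker and writes its output under the logs/ directory.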

Now start the Kafka server:

root@docker:/home/docker_123/kafka_2.13-2.4.1# nohup bin/kafka-server-start.sh config/server.properties &

Step 3: Connect to the ZooKeeper server and create a topic

Connect to the ZooKeeper server at localhost:2181 and create a topic named "test" with 1 partition and 1 replica:

root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Created topic test.

Connect to the ZooKeeper server at localhost:2181 and list the topics:

root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --list --zookeeper localhost:2181

test

Note: the broker can also be configured to create a topic automatically when a message is published to a topic that does not exist, instead of requiring it to be created by hand.
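The relevant broker settings live in config/server.properties; a sketch (auto.create.topics.enable defaults to true in this version, and the two defaults below determine the shape of auto-created topics):

```properties
# config/server.properties (broker side)
# Create a topic automatically the first time it is published to (default: true)
auto.create.topics.enable=true
# Partition count and replica count used for auto-created topics (defaults: 1 and 1)
num.partitions=1
default.replication.factor=1
```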

Step 4: Producer connects to the Kafka server and publishes messages to it

Use the command-line producer client that ships with Kafka to connect to the Kafka server at localhost:9092 and send messages. By default, each line is sent as a separate message.

root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

>hello kafka world! I am docker_123!

>Hi kafka world! what you are doing?

Step 5: Consumer connects to the Kafka server and consumes messages from it

Use the command-line consumer client that ships with Kafka to connect to the Kafka server at localhost:9092 and consume messages.

root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

hello kafka world! I am docker_123!

Hi kafka world! what you are doing?

Note: if step 4 and step 5 run in different terminals, messages typed into the producer client appear in the consumer client, so producer and consumer are communicating in real time. Both can of course be exited at any time: the producer only connects to the Kafka server when sending a message, and the consumer only connects when consuming.

By default, Kafka messages are not consumed twice: the consumer tracks its position with an offset, and you can also start consuming from a given offset. When bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning consumes from the beginning, the messages come back out again: "hello kafka world! I am docker_123!" and so on.

Step 6: Connect to the ZooKeeper server and inspect the topic we created

Connect to the ZooKeeper server at localhost:2181 and describe the topic we created: 1 partition, 1 replica, and a single leader for the one partition.

root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic: test    PartitionCount: 1      ReplicationFactor: 1    Configs:

        Topic: test    Partition: 0    Leader: 0      Replicas: 0    Isr: 0

Step 7: Set up a 3-node Kafka cluster

root@docker:/home/docker_123/kafka_2.13-2.4.1# cp config/server.properties config/server-1.properties

root@docker:/home/docker_123/kafka_2.13-2.4.1# cp config/server.properties config/server-2.properties

Use vi to edit server-1.properties and server-2.properties, changing the following parameters:

config/server-1.properties:

    broker.id=1

    listeners=PLAINTEXT://:9093

    log.dir=/tmp/kafka-logs-1

config/server-2.properties:

    broker.id=2

    listeners=PLAINTEXT://:9094

    log.dir=/tmp/kafka-logs-2

Note: broker.id is the unique name of each Kafka node in the cluster. 9092 is Kafka's default port; kafka1 and kafka2 are moved to 9093 and 9094. The log.dir directory holds, among other things, the Kafka messages at each offset.
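Instead of editing by hand, the two derived configs can also be generated with sed. A self-contained sketch: it fabricates a minimal server.properties under /tmp/kafka-demo so it can run anywhere (note the shipped file spells the property log.dirs, which the fabricated base mirrors); in the real tree you would run the loop inside kafka_2.13-2.4.1/.

```shell
# Fabricate a minimal base config so the example is self-contained.
mkdir -p /tmp/kafka-demo/config
cat > /tmp/kafka-demo/config/server.properties <<'EOF'
broker.id=0
#listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
EOF

cd /tmp/kafka-demo
# Derive server-1.properties / server-2.properties: unique broker.id,
# ports 9093/9094, and separate log directories.
for i in 1 2; do
  port=$((9092 + i))
  sed -e "s|^broker.id=.*|broker.id=$i|" \
      -e "s|^#listeners=.*|listeners=PLAINTEXT://:$port|" \
      -e "s|^log.dirs=.*|log.dirs=/tmp/kafka-logs-$i|" \
      config/server.properties > "config/server-$i.properties"
done
grep '^broker.id' config/server-?.properties
# prints: config/server-1.properties:broker.id=1
#         config/server-2.properties:broker.id=2
```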

Now start the Kafka nodes on 9093 and 9094:

root@docker:/home/docker_123/kafka_2.13-2.4.1# nohup bin/kafka-server-start.sh config/server-1.properties &

root@docker:/home/docker_123/kafka_2.13-2.4.1# nohup bin/kafka-server-start.sh config/server-2.properties &

Now create a new topic with 1 partition and 3 replicas:

root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

If resources allow, also create a topic with 5 partitions and 3 replicas under a different name, and compare the --describe output of the two.

root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic my-replicated-topic2

Unfortunately, fault tolerance across multiple replicas could not be tested here: due to resource limits, 3 Kafka nodes would not start, and the 2nd node exited on its own as soon as a topic was created, so even a 2-node experiment was out of reach.

FAQ:

1) Creating a topic whose replica count exceeds the number of Kafka nodes fails:

root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Error while executing topic command : Replication factor: 3 larger than available brokers: 1.

[2020-04-24 23:42:11,705] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

(kafka.admin.TopicCommand$)

2) Not sure what effect starting the other 2 Kafka nodes has on the consumer. While they were starting, the consumer re-consumed messages, as shown below:

hello kafka world! I am docker_123!

Hi kafka world! what you are doing?

docker

tcpdump is working

tcpdump is working

tcpdump

hello kafka world! I am docker_123!

Hi kafka world! what you are doing?

docker

tcpdump is working

tcpdump is working

tcpdump

3) Kafka server startup flow:

It first connects to ZooKeeper at localhost:2181; when the connection times out, the node automatically exits and drops out of the Kafka cluster.

root@docker:/home/docker_123/kafka_2.13-2.4.1# bin/kafka-server-start.sh config/server-2.properties

[2020-04-24 23:54:33,599] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)

[2020-04-24 23:54:55,400] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)

[2020-04-24 23:54:55,546] INFO starting (kafka.server.KafkaServer)

[2020-04-24 23:54:55,745] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)

[2020-04-24 23:54:57,789] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)

[2020-04-24 23:55:01,268] INFO Client environment:os.memory.free=980MB (org.apache.zookeeper.ZooKeeper)

[2020-04-24 23:55:01,268] INFO Client environment:os.memory.max=1024MB (org.apache.zookeeper.ZooKeeper)

[2020-04-24 23:55:01,268] INFO Client environment:os.memory.total=1024MB (org.apache.zookeeper.ZooKeeper)

[2020-04-24 23:55:01,887] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@7d3e8655 (org.apache.zookeeper.ZooKeeper)

[2020-04-24 23:55:02,797] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)

[2020-04-24 23:55:08,488] INFO Opening socket connection to server localhost/[0:0:0:0:0:0:0:1]:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)

[2020-04-24 23:55:08,910] INFO Socket connection established, initiating session, client: /[0:0:0:0:0:0:0:1]:35353, server: localhost/[0:0:0:0:0:0:0:1]:2181 (org.apache.zookeeper.ClientCnxn)

Branch 1: timed out, have not heard from server in 6001ms

[2020-04-24 23:55:14,910] WARN Client session timed out, have not heard from server in 6001ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)

[2020-04-24 23:55:15,434] INFO Session: 0x0 closed (org.apache.zookeeper.ZooKeeper)

[2020-04-24 23:55:15,434] INFO EventThread shut down for session: 0x0 (org.apache.zookeeper.ClientCnxn)

[2020-04-24 23:55:15,438] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)

[2020-04-24 23:55:15,580] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)

kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:259)

[2020-04-24 23:55:16,531] INFO shutting down (kafka.server.KafkaServer)

[2020-04-24 23:55:18,384] INFO shut down completed (kafka.server.KafkaServer)

[2020-04-24 23:55:18,399] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)

[2020-04-24 23:55:18,810] INFO shutting down (kafka.server.KafkaServer)

Branch 2: once Connected, a Cluster ID is assigned

[2020-04-25 00:04:09,290] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)

[2020-04-25 00:04:17,712] INFO Cluster ID = Z5diDnfIS924PrzGrYtFcg (kafka.server.KafkaServer)

[2020-04-25 00:04:23,407] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)

[2020-04-25 00:04:23,410] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)

[2020-04-25 00:04:23,470] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)

[2020-04-25 00:04:25,645] INFO Log directory /tmp/kafka-logs-2 not found, creating it. (kafka.log.LogManager)
