2019-10-27

2019-10-27  本文已影响0人  自己为王

Kafka基本操作

1. 查看Topic

--list命令(列举所有主题):

kafka-topics.sh –list –zookeeper server-1:2181,server-2:2181

--describe命令(不指定topic参数则查看所有主题信息,若指定topic参数则查看特定主题的信息):

kafka-topics.sh –describe –zookeeper server-1:2181,server-2:2181

#查看指定主题已覆盖的配置:

Kafka-topics.sh –describe –zookeeper server-1:2181,server-2:2181,server-3:2181–topics-with-overrides–topic config-test

2.创建Topic

命令:

kafka-topics.sh –create–zookeeper server-1:2181,server-2:2181,server-3:2181 --replication-factor 2 –partitions3 –topic kafka-action

此时会在${log.dir}目录下创建相应的分区文件目录,副本分别分布在不同的节点上。

同时登陆ZooKeeper客户端查看所创建的主题元数据信息,登陆ZK之后,“kafka-action”元数据信息如下:

Ls /brokers/topics/kafka-action/partitions

[0,1,2]

ls /brokers/topics/kafka-action

{“version”:1,”partitions”:{“2”:[3,1], “1”:[2,3],”0”:[1,2]}}

注意:

1)创建topic时,主要要指定副本数、分区数等,如果不指定副本和分区数,默认分别创建${num.partitions}个分区数和${default.replcation.factor}个副本。

  2)当生产者向一个还未创建的topic发送消息时,会自动创建该副本,但是前提是:auto.create.topics.enable=true

3. 修改Topic

    当创建一个主题之后,可以通过alter命令对主题进行修改,包括修改主题级别的配置、增加主题分区、修改副本分配方案、修改主题Offset等。

命令:

kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –config max.message.bytes=204800

修改max.message.bytes参数值,某些版本使用kafka-topics.sh脚本修改参数可以成功,但是可能会提示该脚本已过时,在后续版本可能会被移除,推荐使用kafka-configs.sh脚本修改参数。

kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –configsegment.bytes=209715200

修改segment.bytes参数后,可通过topics-with-overrides查看主题config-test已覆盖的配置信息。

删除参数segment.bytes配置:

kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –delete-configsegment.bytes

再次通过topics-with-overrides查看主题config-test已覆盖的配置信息,此时已不包含segment.bytes。

增加分区:

Kafka并不支持减少分区的操作,只能为一个主题增加分区。

假设当前分区为3个,要增加至5个:

kafka-topics.sh --alter –zookeeper server-1:2181,server-2:2181,server-3:2181–topic config-test –partitions 5

4.删除Topic

删除Kafka主题,一般有以下两种方式:

[if !supportLists](1)      [endif]手动删除各节点${log.dir}目录下该主题分区文件夹,同时登录ZK客户端删除待删除主题对应的节点,主题元数据保存在/brokers/topics和/config/topics目录下。

[if !supportLists](2)      [endif]执行kafka-topics.sh脚本进行删除,若希望通过该脚本彻底删除主题,则需要保证在启动Kafka时所加载的server.properties文件中配置delete.topic.enable=true,该配置默认为false。否则执行该脚本并未真正删除主题,而是在ZK的/admin/delete_topics目录下创建一个与待删除主题同名的节点,将该主题标记为删除状态。

5.启动生产者发送消息

启动一个向主题kafka-action发送消息的生产者,同时指定每条消息包含有Key:

Kafka-console-producer.sh –broker-listserver-1:9092,server-2:9092,server-3:9092 –topic kafka-action –property parse.key=true

该命令执行后,控制台等待客户端输入消息。由于没有指定消息Key与消息净荷(payload)之间的分隔符,默认是以制表符分隔。可以通过配置项key.separator指定。

Kafka-console-producer.sh –broker-list server-1:9092,server-2:9092,server-3:9092–topic kafka-action –property parse.key=true –property key.separator=’ ’

在控制台分别输入一批消息,消息key与消息实际数据之间以空格分隔。然后执行以下命令,验证消息是否发送成功。

Kafka-run-class.shkafka.tools.GetOffsetShell –broker-list server-1:9092,server-2:9092,server-3:9092–topic kafka-action –time -1

以上命令用于查看某个主题各分区对应消息偏移量。可以通过partitions参数指定一个或多个分区,多个分区之间以逗号分隔,若不指定则默认查看该主题所有分区;time参数表示查看在指定时间之前的数据,支持-1(latest)、-2(earliest)两个时间选项,默认取值为-1.

执行以上命令输出结果如下,共3列,分别表示主题名、分区编号、消息偏移量:

Kafka-action:2:6

Kafka-action:1:6

Kafka-action:0:4

通过结果信息可知,总共产生了16条消息,3个分区按编号从大到小依次有6条、6条、4条消息。

6.查看消息

Kafka生产的消息以二进制的形式在文件中,为了便于查看消息内容,Kafka提供了一个查看日志文件的工具类kafka.tools.DumpLogSegments.命令如下:

Kafka-run-class.shkafka.tools.DumpLogSegments –files ./././00000000000000.log

7.启动消费者接受消息

上一篇下一篇

猜你喜欢

热点阅读