k8s容器容器框架

kafka(二)Kafka快速入门

2021-06-05  本文已影响0人  万事万物

集群部署

  1. 配置 server.properties
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

其他服务器一样配置

  1. 启动集群
bin/kafka-server-start.sh -daemon config/server.properties

其他服务器一样。

Kafka 命令行操作

topic 操作

脚本
kafka]$ bin\kafka-topics.sh
命令选项

选项 描述
--alter 更改分区数,副本分配,和/或主题的配置。
--at-min-isr-partitions 如果在描述主题时设置,则仅显示 isr 计数为的分区等于配置的最小值。 不是支持 --zookeeper 选项。
--bootstrap-server <String: server to connect to> 必需:要连接的 Kafka 服务器。 如果提供此项,则不需要直接的 Zookeeper 连接。
--command-config <String: command config property file> 包含要传递给管理客户端的配置的属性文件。 这仅与 --bootstrap-server 选项一起用于描述和更改代理配置。
--config <String: name=value>
--create 创建一个新的topic
--delete 删除一个topic
--delete-config <String: name> 要为现有主题删除的主题配置覆盖(请参阅 --config 选项下的配置列表)。 不支持 --bootstrap-server 选项。
--describe 列出给定主题的详细信息。
--disable-rack-aware 禁用机架感知副本分配
--exclude-internal 运行 list 或 describe 命令时排除内部主题。 默认会列出内部主题
--force 禁止控制台提示
--help 打印帮助信息。
--if-exists 如果在更改或删除或描述主题时设置,则该操作仅在主题存在时执行。 不支持 --bootstrap-server 选项。
--if-not-exists 如果在创建主题时设置,则只有在主题不存在时才会执行操作。 不支持 --bootstrap- 服务器选项。
--list 列出所有可用的topic。
--partitions <Integer: # of partitions> 设置topic 分区数
--replication-factor <Integer:replication factor> 指定topic的副本数
--topic <String: topic> 指定topic 名称
--topics-with-overrides 如果在描述主题时设置,则仅显示已覆盖配置的主题
--unavailable-partitions 如果在描述主题时设置,则只显示其领导者不可用的分区
--under-min-isr-partitions 如果在描述主题时设置,则仅显示 isr 计数小于配置的最小值的分区。 不支持 --zookeeper 选项。
--under-replicated-partitions 如果在描述主题时设置,则仅显示在复制分区下
--version 展示Kafka版本
--zookeeper <String: hosts> 已弃用,zookeeper 连接的连接字符串,格式为 host:port。 可以提供多个主机以允许故障转移。

案例

  1. 创建一个 topic
    语法:kafka-topics.sh --create --zookeeper <host>:<port> --if-not-exists --replication-factor <副本数> --partitions <分区数> --topic <副本名称>
bin]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --if-not-exists --replication-factor 3 --partitions 3 --topic test
#输出结果
Created topic test.
  1. 查看当前服务器中的所有 topic
    语法: kafka-topics.sh --zookeeper <host>:<port> --list
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --list
# 输出结果
__consumer_offsets
abc
test
  1. 删除一个topic
    语法:kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
    需要server.properties中设置delete.topic.enable=true否则只是标记删除。
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
# 输出结果
Topic test is marked for deletion. # 并不会马上删除,而是先对该topic做一个标记,后面再进行删除
#需要在 配置中设置 delete.topic.enable=true ,否则不会进行删除
Note: This will have no impact if delete.topic.enable is not set to true.
  1. 查看 topic 详情
    语法:--describe
[atguigu@hadoop102 bin]$ kafka-topics.sh  --describe --bootstrap-server hadoop102:9092 --topic abc
#  topic  abc 详细信息
Topic: abc  PartitionCount: 1   ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: abc  Partition: 0    Leader: 1   Replicas: 2,0,1 Isr: 1,2,0
参数 描述
Topic topic名称
PartitionCount 分区数
ReplicationFactor 定义的分区数
Configs 配置
Partition 当前分区位置
Leader 当前那个broker为Leader
Replicas 副本位置
Isr lsr同步队列

producer 操作

脚本
kafka]$ bin\kafka-console-producer.sh
命令选项

选项 描述
--batch-size <Integer: size> 如果消息不是同步发送的,则要在单个批次中发送的消息数。 (默认值:200)
--broker-list <String: broker-list> 链接Kafka,必需:采用 HOST1:PORT1,HOST2:PORT2 形式的代理列表字符串。
--compression-codec [String: compression-codec] 支持的压缩方式'none', 'gzip', 'snappy', 'lz4', or 'zstd'. 默认 'gzip'
--help 打印帮助信息
--line-reader <String: reader_class> 用于从标准输入读取行的类的类名。默认情况下,每行都作为单独的消息读取。 (默认:kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: > 生产者发送的最大时间(默认:60000)
--max-memory-bytes <Long: > 缓冲大小,以字节为单位 (默认:33554432)
--max-partition-memory-bytes <Long: The buffer size allocated for a memory in bytes per partition> 合并数据的最小数 (默认: 16384)
--message-send-max-retries <Integer> 退休数,默认为3
--metadata-expiry-ms <Long:> 强制刷新数据条数默认为300000,元数据以毫秒为单位的过期间隔时间段
--producer-property <String> 传递用户定义的Producer_Prop的机制
--producer.config <String: config file> 指定配置文件。 请注意, [producer-property] 优先于此配置。
--property <String: prop> 一种将用户定义的属性以 key=value 的形式传递给消息阅读器的机制。 这允许对用户定义的消息阅读器进行自定义配置。
--request-required-acks <String:> 设置ack(确认收到)的三种模式(0,1,-1),默认为1
--request-timeout-ms <Integer:> 设置ack 的超时时间(单位毫秒)默认为 1500
--retry-backoff-ms <Integer> 等待选举时间,默认为100)
--socket-buffer-size <Integer: size> 设置 tcp RECV 大小(默认: 102400)
--sync 设置为同步的
--timeout <Integer: timeout_ms> 如果设置和生产者运行异步模式,这给一条消息的最长时间是否有足够的队列等待批处理大小。该值以ms为单位。(默认:1000)
--topic <String: topic> 生产的消息发送给定的主题
--version 显示Kafka版本
  1. 发送消息
    语法:kafka-console-producer.sh --broker-list <kafkaIP1>:<端口> <kafkaIP2>:<端口> --topic <topic名称>
bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 hadoop103:9092 --topic abc
#输出
>hello

hadoop102 接收 topic abc 消息

[admin@hadoop102 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic abc
#接收生产者推送的消息
hello

hadoop103 接收 topic abc 消息

[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生产者推送的消息
hello

consumer操作

脚本
kafka]$ bin/kafka-console-consumer.sh
命令选项

选项 描述
--bootstrap-server <String: server to connect to> 需:要连接的服务器。
--consumer-property <String: consumer_prop> 一种将用户定义的属性以 key=value 的形式传递给消费者的机制。
--consumer.config <String: config file> consumer配置属性文件。 请注意, [consumer-property] 优先于此配置。
--enable-systest-events 记录消费者的消息及生命周期,用于系统测试
--formatter <String: class> 用于格式化 kafka 消息以供显示的类的名称。 (默认:kafka.tools.DefaultMessageFormatter)
--from-beginning 如果消费者还没有一个既定的偏移量来消费,那么从日志中出现的最早的消息而不是最新的消息开始。
--group <String: consumer group id> 消费者的消费者组ID。
--help 打印帮助信息
--isolation-level <String> 设置为 read_committed 以过滤掉未提交的事务消息。 设置为 read_uncommitted 以读取所有消息。 (默认值:read_uncommitted)
--key-deserializer <String: deserializer for key> 设置 密钥的解串器
--max-messages <Integer: num_messages> 退出前消费的最大消息数。 如果未设置,则消耗是连续的。
--offset <String: consume offset> 要消耗的偏移量 id(非负数),或 'earliest' 表示从开始,或 'latest' 表示从结束(默认值:latest)
--partition <Integer: partition> 要消费的分区。 除非指定了“--offset”,否则消耗从分区的末尾开始。
--property <String: prop> 初始化消息格式化程序的属性
--skip-message-on-error 如果在处理消息时出现错误,请跳过而不是暂停。
--timeout-ms <Integer: timeout_ms> 如果指定,则在指定的时间间隔内没有可供消费的消息时退出。要消费的主题 ID。
--value-deserializer <String: deserializer for values> 值的解串器
--version 显示Kafka版本
--whitelist <String: whitelist> 指定要包含以供使用的主题白名单的正则表达式。

案例

  1. 消费消息
    语法:kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称>
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生产者推送的消息
hello
  1. 消费所有的消息
    语法:--from-beginning
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc --from-beginning
#接收生产者推送的消息
sh
nihao
发哦那旮
ka
niha
hdalfajkl
你好
股东大法师
hello
python
hello
haoh
hello
hello
hflahfla
flajklfja
flajla
afadf
上一篇 下一篇

猜你喜欢

热点阅读