超赞的Kafka命令行 !!!比原生好用n倍

2020-11-23  本文已影响0人  梨花菜

1、kaf安装和配置

Centos7源码安装Golang

[root@qa-cdh-001 bin]# kaf config add-cluster qa --brokers kafka-22-7:9093
Added cluster.

选择qa这个kafka集群(只需要执行一次,后续都会默认使用qa这个集群)

[root@qa-cdh-001 bin]# kaf config use-cluster qa
Switched to cluster "qa".

2、Golang kaf常用操作

2.1创建topic


创建名为test_topic1的topic,一共有5个分区

[root@qa-cdh-001 bin]# kaf topic create test_topic1 -p 5
✅ Created topic!
      Topic Name:            test_topic1
      Partitions:            5
      Replication Factor:    1
      Cleanup Policy:        delete

2.2发送topic

发送一条消息"send msg to test_topic1"到test_topic1这个主题

[root@qa-cdh-001 bin]# echo -n "send msg to test_topic1"|kaf produce test_topic1
Sent record to partition 0 at offset 0.

2.3消费topic

起一个名为lihuacai的groupid来消费test_topic1这个主题

[root@qa-cdh-001 ~]# kaf consume test_topic1 -g lihuacai
Partition:   0
Offset:      2
Timestamp:   2020-11-23 21:04:22.971 +0800 CST

Partition:   0
Offset:      3
Timestamp:   2020-11-23 21:05:32.733 +0800 CST
send msg to test_topic1

2.3.1消费多个topicszmsg_supply,szmsg_supply_all_new

[root@quanbu-qa-22-19 ~]# kaf consume szmsg_supply,szmsg_supply_all_new -g lihuacai2

Topic: szmsg_supply   Partition: 0    Offset: 663100   Timestamp: 2020-12-10 16:11:30.785 +0800 CST
1607587890wangdd2916075878

Topic: szmsg_supply_all_new   Partition: 0    Offset: 8261909   Timestamp: 2020-12-10 16:11:30.787 +0800 CST
{
  "clientid": "wangdd29",
  "cloudTime": 1607587890,
  "deviceTime": 1607587889,
  "lapCnt": 399000,
}

2.3.2 过滤topic指定字段hc

[root@quanbu-qa-22-19 ~]# kaf consume szmsg_supply,szmsg_supply_all_new -g lihuacai2 -F hc

Topic: szmsg_supply   Partition: 0    Offset: 663161   Timestamp: 2020-12-10 16:14:52.71 +0800 CST
1607588092hc01607588092A1

Topic: szmsg_supply_all_new   Partition: 0    Offset: 8261994   Timestamp: 2020-12-10 16:14:53.136 +0800 CST
{
  "clientid": "hc0",
  "cloudTime": 1607588092,
  "deviceTime": 1607588092,
  "factoryId": "factoryIdMChc0",
}


2.4查看topic

查看test_topic1这个主题的相关信息

[root@qa-cdh-001 bin]# kaf topic describe test_topic1
Name:        test_topic1
Internal:    false
Compacted:   false
Partitions:
  Partition  High Watermark  Leader  Replicas  ISR
  ---------  --------------  ------  --------  ---
  0          1               9       [9]       [9]
  1          0               7       [7]       [7]
  2          0               8       [8]       [8]
  3          0               9       [9]       [9]
  4          0               7       [7]       [7]
Config:
  Name            Value       ReadOnly  Sensitive
  ----            -----       --------  ---------
  cleanup.policy  delete      false     false
  segment.bytes   1073741824  false     false

2.5查看groupid消息积压

查看lihuacai这个groupid消息积压情况
在0分区,积压了2条消息
计算规则是Lag = High Watermark - Group Offset

[root@qa-cdh-001 bin]# kaf group describe lihuacai
Group ID:        lihuacai
State:           Empty
Protocol:
Protocol Type:   consumer
Offsets:
  test_topic1:
    Partition  Group Offset  High Watermark  Lag  Metadata
    ---------  ------------  --------------  ---  --------
    0          2             4               2

2.6重置groupid偏移量

lihuacai这个groupid全部分区的偏移量重置到最新状态
--all-partitions: 所有分区
-o: --offset,偏移量
--noconfirm:直接提交偏移量,不需要确认

[root@qa-cdh-001 bin]# kaf group commit lihuacai -t test_topic1 --all-partitions -o latest --noconfirm
Partition 0: determined offset 4 from timestamp.
Partition 1: determined offset 0 from timestamp.
Partition 2: determined offset 0 from timestamp.
Partition 3: determined offset 0 from timestamp.
Partition 4: determined offset 0 from timestamp.
Resetting offsets to: map[0:4 1:0 2:0 3:0 4:0]
Successfully committed offsets to map[0:4 1:0 2:0 3:0 4:0].
上一篇 下一篇

猜你喜欢

热点阅读