【Pulsar 精选】CLI 常用命令
2021-12-20 本文已影响0人
熊本极客
1.数据方面
1.1 查询 topic 的生产和消费状态
$./pulsar-admin topics partitioned-stats --per-partition [topicName]
{
"msgRateIn":0,
"msgThroughputIn":0,
"msgRateOut":0,
"msgThroughputOut":0,
"bytesInCounter":79093,
"msgInCounter":95,
"bytesOutCounter":79093,
"msgOutCounter":95,
"averageMsgSize":0,
"msgChunkPublished":false,
"storageSize":79093,
"backlogSize":0,
"offloadedStorageSize":0,
"lastOffloadLedgerId":0,
"lastOffloadSuccessTimeStamp":0,
"lastOffloadFailureTimeStamp":0,
"publishers":[
{
"msgRateIn":0,
"msgThroughputIn":0,
"averageMsgSize":0,
"chunkedMessageRate":0,
"producerId":0
},
{
"msgRateIn":0,
"msgThroughputIn":0,
"averageMsgSize":0,
"chunkedMessageRate":0,
"producerId":0
}
],
"waitingPublishers":0,
"subscriptions":{
"test-pulsar-sb-name":{
"msgRateOut":0,
"msgThroughputOut":0,
"bytesOutCounter":79093,
"msgOutCounter":95,
"msgRateRedeliver":0,
"chunkedMessageRate":0,
"msgBacklog":0,
"backlogSize":0,
"msgBacklogNoDelayed":0,
"blockedSubscriptionOnUnackedMsgs":false,
"msgDelayed":0,
"unackedMessages":0,
"msgRateExpired":0,
"totalMsgExpired":0,
"lastExpireTimestamp":0,
"lastConsumedFlowTimestamp":0,
"lastConsumedTimestamp":0,
"lastAckedTimestamp":0,
"lastMarkDeleteAdvancedTimestamp":0,
"consumers":[
{
"msgRateOut":0,
"msgThroughputOut":0,
"bytesOutCounter":79093,
"msgOutCounter":95,
"msgRateRedeliver":0,
"chunkedMessageRate":0,
"availablePermits":905,
"unackedMessages":0,
"avgMessagesPerEntry":0,
"blockedConsumerOnUnackedMsgs":false,
"lastAckedTimestamp":0,
"lastConsumedTimestamp":0
},
{
"msgRateOut":0,
"msgThroughputOut":0,
"bytesOutCounter":0,
"msgOutCounter":0,
"msgRateRedeliver":0,
"chunkedMessageRate":0,
"availablePermits":1000,
"unackedMessages":0,
"avgMessagesPerEntry":0,
"blockedConsumerOnUnackedMsgs":false,
"lastAckedTimestamp":0,
"lastConsumedTimestamp":0
}
],
"isDurable":true,
"isReplicated":false,
"allowOutOfOrderDelivery":false,
"consumersAfterMarkDeletePosition":{
},
"nonContiguousDeletedMessagesRanges":0,
"nonContiguousDeletedMessagesRangesSerializedSize":0,
"subscriptionProperties":{
},
"durable":true,
"replicated":false
}
},
"replication":{
},
"nonContiguousDeletedMessagesRanges":0,
"nonContiguousDeletedMessagesRangesSerializedSize":0,
"compaction":{
"lastCompactionRemovedEventCount":0,
"lastCompactionSucceedTimestamp":0,
"lastCompactionFailedTimestamp":0,
"lastCompactionDurationTimeInMills":0
},
"metadata":{
"partitions":1
},
"partitions":{
"persistent://public/default/test-topic-partition-0":{
"msgRateIn":0,
"msgThroughputIn":0,
"msgRateOut":0,
"msgThroughputOut":0,
"bytesInCounter":79093,
"msgInCounter":95,
"bytesOutCounter":79093,
"msgOutCounter":95,
"averageMsgSize":0,
"msgChunkPublished":false,
"storageSize":79093,
"backlogSize":0,
"offloadedStorageSize":0,
"lastOffloadLedgerId":0,
"lastOffloadSuccessTimeStamp":0,
"lastOffloadFailureTimeStamp":0,
"publishers":[
{
"accessMode":"Shared",
"msgRateIn":0,
"msgThroughputIn":0,
"averageMsgSize":0,
"chunkedMessageRate":0,
"producerId":0,
"metadata":{
},
"connectedSince":"2021-12-20T03:09:51.715569Z",
"clientVersion":"2.8.1-hw-0.1.3",
"producerName":"pulsar-2-3856",
"address":"/10.244.3.58:38030"
}
],
"waitingPublishers":0,
"subscriptions":{
"test-pulsar-sb-name":{
"msgRateOut":0,
"msgThroughputOut":0,
"bytesOutCounter":79093,
"msgOutCounter":95,
"msgRateRedeliver":0,
"chunkedMessageRate":0,
"msgBacklog":0,
"backlogSize":0,
"msgBacklogNoDelayed":0,
"blockedSubscriptionOnUnackedMsgs":false,
"msgDelayed":0,
"unackedMessages":0,
"type":"Failover",
"activeConsumerName":"ca523",
"msgRateExpired":0,
"totalMsgExpired":0,
"lastExpireTimestamp":0,
"lastConsumedFlowTimestamp":1639969783099,
"lastConsumedTimestamp":1639981115294,
"lastAckedTimestamp":1639981115515,
"lastMarkDeleteAdvancedTimestamp":1639981115515,
"consumers":[
{
"msgRateOut":0,
"msgThroughputOut":0,
"bytesOutCounter":79093,
"msgOutCounter":95,
"msgRateRedeliver":0,
"chunkedMessageRate":0,
"consumerName":"ca523",
"availablePermits":905,
"unackedMessages":0,
"avgMessagesPerEntry":6,
"blockedConsumerOnUnackedMsgs":false,
"lastAckedTimestamp":1639981115515,
"lastConsumedTimestamp":1639981115294,
"metadata":{
},
"connectedSince":"2021-12-20T03:09:41.605318Z",
"clientVersion":"2.8.1-hw-0.1.3",
"address":"/10.244.3.28:58786"
}
],
"isDurable":true,
"isReplicated":false,
"allowOutOfOrderDelivery":false,
"consumersAfterMarkDeletePosition":{
},
"nonContiguousDeletedMessagesRanges":0,
"nonContiguousDeletedMessagesRangesSerializedSize":0,
"subscriptionProperties":{
},
"durable":true,
"replicated":false
}
},
"replication":{
},
"deduplicationStatus":"Disabled",
"nonContiguousDeletedMessagesRanges":0,
"nonContiguousDeletedMessagesRangesSerializedSize":0,
"compaction":{
"lastCompactionRemovedEventCount":0,
"lastCompactionSucceedTimestamp":0,
"lastCompactionFailedTimestamp":0,
"lastCompactionDurationTimeInMills":0
}
}
}
}
topic 的生产和消费状态详解:
# msgRateIn 表示 topic 级别生产速率,其单位是 msg/s
# msgThroughputIn 表示 topic 级别生产吞吐量,其单位是 byte/s
# msgRateOut 表示 topic 级别消费速率,其单位是 msg/s
# msgThroughputOut 表示 topic 级别消费吞吐量,其单位是 byte/s
# publishers 表示生产者信息,以下是部分信息
# "publishers":[
# {
# "accessMode":"Shared",
# "msgRateIn":0, #生产者级别生产速率 msg/s
# "msgThroughputIn":0, #生产者级别生产吞吐量 byte/s
# "connectedSince":"2021-12-20T03:09:51.715569Z",
# "clientVersion":"2.8.1-hw-0.1.3",
# "producerName":"pulsar-2-3856",
# "address":"/10.244.3.58:38030" #生产者的 IP 地址
# }
# ]
# subscriptions 表示订阅组信息,以下是部分信息
# "subscriptions":{
# "test-pulsar-sb-name":{ #订阅组的名称
# "msgRateOut":0, #订阅(消费组)级别消费速率 msg/s
# "msgThroughputOut":0, #订阅(消费组)级别消费吞吐量 byte/s
# "msgBacklog":0, #消息积压数
# "unackedMessages":0, #订阅(消费组)级别 unack 消息数量
# "type":"Failover", #订阅方式
# "consumers":[ #消费者列表,消费者创建成功才会在这体现
# {
# "msgRateOut":0, #消费者级别消费速率 msg/s
# "msgThroughputOut":0, #消费者级别消费吞吐量 byte/s
# "unackedMessages":0, #消费者级别 unack 消息数量
# "address":"/10.244.3.28:58786" #消费者的 IP 地址
# }
# ],
# "isDurable":true
# }
# }
# partitions 表示分区信息,根据 partition 维度统计的数据。下面是部分信息
# "partitions":{
# "persistent://public/default/test-topic-partition-0":{ #分区名
# "msgRateIn":0,
# "msgThroughputIn":0,
# "msgRateOut":0,
# "msgThroughputOut":0,
# "publishers":[ #生产者
# {
# "accessMode":"Shared",
# "msgRateIn":0,
# "msgThroughputIn":0,
# "connectedSince":"2021-12-20T03:09:51.715569Z", #生产者建链时间
# "clientVersion":"2.8.1-hw-0.1.3",
# "producerName":"pulsar-2-3856",
# "address":"/10.244.3.58:38030" #生产者的 IP 地址
# }
# ],
# "waitingPublishers":0,
# "subscriptions":{ #订阅信息
# "test-pulsar-sb-name":{ #消费者的订阅组名
# "msgRateOut":0,
# "msgThroughputOut":0,
# "unackedMessages":0, #partition 级别的 unack 消息数量
# "type":"Failover",
# "consumers":[ #消费者信息
# {
# "msgRateOut":0,
# "msgThroughputOut":0,
# "connectedSince":"2021-12-20T03:09:41.605318Z", #消费者建链时间
# "address":"/10.244.3.28:58786" #消费者的 IP 地址
# }
# ],
# "isDurable":true,
# "isReplicated":false,
# "allowOutOfOrderDelivery":false
# }
# }
# }
# }
直接查询某一 partition 分区的状态
#./pulsar-admin topics stats [topicName-partition-n]
$./pulsar-admin topics stats persistent://public/default/lt-test-partition-0
1.2 创建 consumer 消费数据
#./pulsar-client consume --subscription-name [subscriptionName] [topicName]
$./pulsar-client consume --subscription-name lt-test persistent://public/default/lt-test
#从头开始消费
$./pulsar-client consume --subscription-name lt-test --subscription-position Earliest persistent://public/default/lt-test
#指定 messageId 消费数据
#说明:messageId 由 ledgerId,entryId,partition index 按顺序组成,如 messageId 是 2346:21:0,表示 ledgerId 为 2346,entryId 为 21,partition index 为 0
#./pulsar-admin topics get-message-by-id --entryId [entryId] --ledgerId [ledgerId] [topicName]
$./pulsar-admin topics get-message-by-id --entryId 21 --ledgerId 2346 persistent://public/default/lt-test
1.3 创建 producer 生产数据
#./pulsar-client produce --files [filePath] [topicName]
#首先创建 txt 文件,指定文件生产数据到 pulsar
$./pulsar-client produce --files /tmp/data.txt persistent://public/default/lt-test
2.管理方面
pulsar topic 的组成:{persistent|non-persistent}://tenant/namespace/topic[-partition-n]
topic 样例:persistent://public/default/lt-test-partition-0
2.1 创建和删除 tenant
#创建 tenant
#./pulsar-admin tenants create [tenantName]
$./pulsar-admin tenants create public
#删除 tenant
#./pulsar-admin tenants delete [tenantName]
$./pulsar-admin tenants delete public
2.2 创建和删除 namespace
#创建 namespace
#./pulsar-admin namespaces create [namespaceName]
$./pulsar-admin namespaces create public/default
#删除 namespace
#./pulsar-admin namespaces delete [namespaceName]
$./pulsar-admin namespaces delete public/default
#强制删除 --force
$./pulsar-admin namespaces delete --force public/default
2.3 创建、删除和查询 topic
#创建 topic
#./pulsar-admin topics create-partitioned-topic --partitions [partitionNum] [topicName]
$./pulsar-admin topics create-partitioned-topic --partitions 2 persistent://public/default/lt-test
#删除 topic
#./pulsar-admin topics delete-partitioned-topic [topicName]
$./pulsar-admin topics delete-partitioned-topic persistent://public/default/lt-test
#强制删除 --force
$./pulsar-admin topics delete-partitioned-topic --force persistent://public/default/lt-test
#查询 topic
#./pulsar-admin topics list-partitioned-topics [namespaceName]
$./pulsar-admin topics list-partitioned-topics public/default
2.4 消费组相关操作
#清除某 topic 的消费组消息积压
#./pulsar-admin topics clear-backlog --subscription [subscriptionName] [topicName]
$./pulsar-admin topics clear-backlog --subscription lt-test persistent://public/default/lt-test
#删除某 topic 的消费组
#./pulsar-admin topics unsubscribe --subscription [subscriptionName] [topicName]
$./pulsar-admin topics unsubscribe --subscription lt-test persistent://public/default/lt-test
#重置某 topic 的消费组消费位置
#./pulsar-admin topics reset-cursor --time [time, eg: 100m, 3h, 2d] --subscription [subscriptionName] [topicName]
$./pulsar-admin topics reset-cursor --time 3h --subscription lt-test persistent://public/default/lt-test
2.5 补齐 topic 的 partition 分区
#./pulsar-admin topics create-missed-partitions [topicName]
$./pulsar-admin topics create-missed-partitions persistent://public/default/lt-test
2.6 查找 topic 的 broker owner
#./pulsar-admin topics lookup [topicName]
$./pulsar-admin topics lookup persistent://public/default/lt-test