kafka

Kafka扩容节点和分区迁移

2016-12-17  本文已影响2973人  eriolchan

背景

最近工作中碰到Kafka 节点的网卡成为了性能瓶颈,为了提高整个消息队列的输出吞吐量,需要将数据量大的Topic 迁移到新的Kafka节点上。

操作步骤

1. 新建Kafka 节点

通过CDH 管理界面在新机器上安装Kafka 服务,并得到相应的Kafka broker id。(假设为140, 141)

2. 创建要迁移的Topic 列表

查看所有的Topic

$ cd /opt/cloudera/parcels/KAFKA/bin
$ ./kafka-topics --describe --zookeeper 10.1.1.50:2181/kafka

如果要删除某些不用的Topic,可运行命令

$ ./kafka-run-class kafka.admin.TopicCommand --delete --topic test_p1_r1 --zookeeper 10.1.1.50:2181/kafka

新建文件topics-to-move.json,包含要迁移到Topic 列表。这里只迁移了一个Topic,也可以是多个Topic。

{
  "topics": [{"topic": "sdk_counters"}],
  "version": 1
}

3. 生成Topic 分区分配表

使用kafka-reassign-partitions 工具生成分区分配表,其中需要指定topics-to-move.json 文件和迁移目标节点的broker id

$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --topics-to-move-json-file ~/kafka/topics-to-move.json --broker-list "140,141" --generate

将生成以下结果

Current partition replica assignment

{"version":1,"partitions":[{"topic":"sdk_counters","partition":1,"replicas":[61,62]},{"topic":"sdk_counters","partition":0,"replicas":[62,61]}]}

Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"sdk_counters","partition":1,"replicas":[140,141]},{"topic":"sdk_counters","partition":0,"replicas":[141,140]}]}

将Current partition replica assignment 的内容保存到rollback-cluster-reassignment.json,用于回滚操作。将Proposed partition reassignment configuration 的内容保存到expand-cluster-reassignment.json,用于执行迁移操作。

在这里也可以手工编辑expand-cluster-reassignment.json 文件更改replica 和partition 配置。也可以在迁移之前更改Topic 的分区数 (6)。

$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --partitions 6

4. 执行迁移操作

$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --reassignment-json-file ~/kafka/expand-cluster-reassignment.json --execute

迁移操作会将指定Topic 的数据文件移动到新的节点目录下,这个过程可能需要等待很长时间,视Topic 的数据量而定。可以运行以下命令查看执行状态。

$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --reassignment-json-file ~/kafka/expand-cluster-reassignment.json --verify

状态有两种,in progress 表示正在迁移,completed successlly 表示已经成功完成迁移。在此过程中,可以在各个Kafka 的节点上使用iftop 工具实时监控网络带宽。可以观察到迁移的source 节点使用了大量的输出带宽,迁移的target 节点使用了大量的输入带宽。由于在迁移过程中,会占用大量的网卡带宽进行数据传输,可能会影响到其他Topic 和应用程序的带宽使用。

迁移完成后,原先的节点下将不存在该Topic 的数据文件。

优化

减少迁移的数据量

如果要迁移的Topic 有大量数据(Topic 默认保留1天的数据),可以在迁移之前临时动态地调整retention.ms 来减少数据量,Kafka 会主动purge 掉1个小时之前的数据。

$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --config retention.ms=3600000

在迁移完成后,恢复原先设置

$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --config retention.ms=86400000

在迁移过程中,不会影响应用程序写Kafka,在迁移完成后需要查看应用程序是否运行正常。

在已有的Topic 上增加分区

如果使用kafka-topics 动态地增加partition 数目,则新增的partition 可能会出现在迁移之前的机器上。这时可以使用kafka-reassign-partitions 工具并手动更改分区分配表以保证所有的分区都在迁移后的机器上。注意要保持旧的分区中的节点分配和replica 和之前相同,否则会导致Kafka 对旧分区的重新迁移,增加了迁移时间,并且可能导致正在运行的程序因为分区失效而出错。

重新指定partition leader

有时候由于节点down 了,partition 的leader 可能不是我们prefer 的,这时,可以通过kafka-preferred-replica-election 工具将replica 中的第一个节点作为该分区的leader。
手动编辑topicPartitionList.json 文件,指定要重新分配leader 的分区。

{"partitions":[{"topic":"sdk_counters","partition":5}]}

执行命令

$ ./kafka-preferred-replica-election --zookeeper 10.1.1.50:2181/kafka -path-to-json-file ~/kafka/topicPartitionList.json

中断迁移任务

一旦启动reassign 脚本,则无法停止迁移任务。如果需要强制停止,可以通过zookeeper 进行修改。

$ zookeeper-client -server 10.1.1.50:2181/kafka
[zk] delete /admin/reassign_partitions
上一篇下一篇

猜你喜欢

热点阅读