用Python创建自定义的Kafka Topic

2017-07-26  本文已影响900人  L3nvy

背景

项目中需要创建分区(partitions)数不同的Topic。在server.properties中可以配置默认的Topic分区数量,但是不能在需要的时候任意改变。(使用Producer API会自动创建Topic)

简单的Solution

翻遍了Kafka-python的文档,没有发现kafka-python提供了类似client.create_topic(name='test', num_partitions=3)这样简单的API。只能往底层探索了,果然发现了两个关键信息。

KafkaClient API中有这样一个方法:

屏幕快照 2017-07-26 16.29.26.png

kafka.protocol.admin — kafka-python 1.3.4.dev documentation中:

屏幕快照 2017-07-26 16.30.46.png

显然,我们只需要构建一个CreateTopicsRequest的请求,然后通过KafkaClient的send()方法发送给控制节点(由于本小白也不太清楚Kafka的机制,测试的时候,不是控制节点,会报错。也不清楚各个版本的区别,下面代码用的是v0版本。🤣)

原理就是这样,还是很简单的。

糟糕的Code


def create_topic(self, topic='topic', num_partitions=3, configs=None, timeout_ms=3000, brokers=['localhost:9290'], no_partition_change=True):

        client = KafkaClient(bootstrap_servers=brokers)
        
        if topic not in client.cluster.topics(exclude_internal_topics=True): # Topic不存在

            request = admin.CreateTopicsRequest_v0(
                create_topic_requests=[(
                    topic,
                    num_partitions,
                    -1, # replication unset.
                    [], # Partition assignment.
                    [(key, value) for key, value in configs.items()],  # Configs
                )],
                timeout=timeout_ms
            )

            future = client.send(2, request)  # 2是Controller,发送给其他Node都创建失败。
            client.poll(timeout_ms=timeout_ms, future=future, sleep=False) # 这里

            result = future.value
            # error_code = result.topic_error_codes[0][1]
            print("CREATE TOPIC RESPONSE: ", result)  # 0 success, 41 NOT_CONTROLLER, 36 ALREADY_EXISTS
            client.close()
        else: # Topic已经存在
            print("Topic already exists!")
            return

最重要的Reference

上一篇 下一篇

猜你喜欢

热点阅读