Copying data from one Kafka topic to another

2017-09-29  君子月满楼

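Requirement: data is continuously written to Kafka topic A, and topic B needs the same data. The volume is far too large to copy by hand, hence the code below.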

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
from kafka import KafkaClient, SimpleProducer, SimpleConsumer


class Kafka_producer():
    '''
    Producer wrapper built on kafka-python's SimpleProducer.
    '''

    def __init__(self, kafkahost,kafkaport, kafkatopic):
        self.client = KafkaClient("%s:%s"%(kafkahost,kafkaport))
        self.producer = SimpleProducer(self.client)
        self.kafkatopic = kafkatopic

    def sendjsondata(self, msg):
        try:
            self.producer.send_messages(self.kafkatopic,str(msg))
        except KeyboardInterrupt as e:
            print(e)


class Kafka_consumer():
    '''
    Consumer wrapper built on kafka-python's SimpleConsumer.
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.client = KafkaClient("%s:%s"%(kafkahost,kafkaport))
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = SimpleConsumer(self.client, self.groupid, self.kafkatopic)


    def consume_data(self):
        try:
            for message in self.consumer:
                # print(json.loads(message.message.value))
                yield message
        except KeyboardInterrupt as e:
            print(e)

def main():
    '''
    Forward the data in the "bsaata_tcp_tmp" topic to the "internal_app_bsaata.nta_event_tmp" topic.
    '''
    # Test the producer module
    
    consumer = Kafka_consumer('10.67.19.12', 9092, "bsaata_tcp_tmp", 'com.nsfocus.bsaata.merge')
    message = consumer.consume_data()
    producer = Kafka_producer('10.67.19.12', 9092, "internal_app_bsaata.nta_event_tmp")
    for i in message:
        producer.sendjsondata(i.message.value)


if __name__ == '__main__':
    main()
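
The script above relies on SimpleProducer and SimpleConsumer, which later kafka-python releases deprecated and, as far as I know, removed in 2.0. Below is a minimal sketch of the same forwarding loop using KafkaConsumer and KafkaProducer instead. The names forward and main are my own, and the broker address, topics and group id are simply carried over from the script above. It has not been run against this cluster, so treat it as a starting point rather than a drop-in replacement.

```python
def forward(messages, send, dest_topic):
    """Send the payload of every consumed record to dest_topic.

    messages: any iterable of objects with a .value attribute
              (KafkaConsumer yields such records).
    send:     a callable taking (topic, value), e.g. KafkaProducer.send.
    Returns the number of records forwarded.
    """
    count = 0
    for record in messages:
        send(dest_topic, record.value)  # .value holds the raw payload
        count += 1
    return count


def main():
    # Imported lazily so forward() can be tried out without kafka-python.
    from kafka import KafkaConsumer, KafkaProducer

    broker = "10.67.19.12:9092"  # assumption: same broker as the script above
    consumer = KafkaConsumer(
        "bsaata_tcp_tmp",
        bootstrap_servers=broker,
        group_id="com.nsfocus.bsaata.merge",
    )
    producer = KafkaProducer(bootstrap_servers=broker)
    try:
        forward(consumer, producer.send, "internal_app_bsaata.nta_event_tmp")
    except KeyboardInterrupt:
        pass  # Ctrl-C stops the copy
    finally:
        producer.flush()  # push out anything still buffered
        producer.close()
        consumer.close()
```

Call main() to run it against a real broker. One difference worth keeping in mind: KafkaConsumer commits its offsets to Kafka rather than to ZooKeeper, so reusing the old group id would not resume from the position recorded under /consumers.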

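Notes:
1. The consumer has to be given port 9092; 2181 does not work. 9092 is the Kafka broker's default listener port, whereas 2181 is ZooKeeper's client port, and KafkaClient talks to the brokers directly.
2. To find the group id for a topic: run zkCli.sh from the bin directory of the ZooKeeper installation and walk the nodes one by one until you reach the topic name; the parent of its parent node is the group id.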

[zk: localhost:2181(CONNECTED) 5] ls /consumers/com.nsfocus.bsaata.merge/offsets
[internal_app_bsaata.nta_srcip_traffic_tmp, internal_app_bsaata.nta_attackip_traffic_tmp, internal_app_bsaata.nta_event_tmp, bsaata_tcp_tmp, internal_app_bsaata.nta_traffic_total_tmp, internal_app_bsaata.nta_inf_traffic_tmp]

3. A message obtained from the consumer API cannot be sent to topic B as-is, because it is not a str; the actual payload is message.message.value.
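
Notes 2 and 3 can be illustrated without a broker. In the sketch below, Message and OffsetAndMessage are stand-in namedtuples that only mimic the nesting I believe the legacy consumer yields; payload and group_of are hypothetical helper names introduced here for illustration. The ZooKeeper path layout is assumed from the zkCli.sh listing above.

```python
from collections import namedtuple

# Stand-ins mimicking the shape of what the legacy consumer yields
# (assumption: an offset/message pair wrapping the actual message).
Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])


def payload(item):
    """Note 3: the data sits two levels down, at item.message.value."""
    return item.message.value


def group_of(offset_path):
    """Note 2: split /consumers/<group>/offsets/<topic> into (group, topic)."""
    parts = offset_path.strip("/").split("/")
    if len(parts) != 4 or parts[0] != "consumers" or parts[2] != "offsets":
        raise ValueError("not a consumer offset path: %r" % offset_path)
    return parts[1], parts[3]


item = OffsetAndMessage(offset=42, message=Message(0, 0, None, b'{"k": 1}'))
print(payload(item))
print(group_of("/consumers/com.nsfocus.bsaata.merge/offsets/bsaata_tcp_tmp"))
```

The second helper just restates the rule from note 2: the topic's parent node is "offsets", and the node above that is the group id.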
