Python 从 Kafka 消费数据并写入 Kafka

2021-01-13  本文已影响0人  hao_yu

简单记录一个读写 Kafka 的 demo。

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import logging
import time  # 引入time模块
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError

# Number of times each forwarded message is sent in a loop
n = 1

# Data is fetched from 174 and sent to 175
# (presumably the last octets of the two masked host addresses below)
KAFAKA_TOPIC = "poseidon_receiver_raw_data_RTCM3.2"
KAFAKA_HOST_PRODUCTER = "192.168.xx.xx"
KAFAKA_HOST_CONSUMER = "192.168.xx.xx"

KAFAKA_PORT = 9092

logging.basicConfig(
    level=logging.INFO,  # log level written to the file; records at or above this level are emitted
    # format='%(asctime)s  %(filename)s %(levelno)s : %(levelname)s %(message)s',  # alternative, more verbose log format
    format='%(asctime)s : %(message)s',  # log record format
    datefmt='%Y-%m-%d %A %H:%M:%S',  # timestamp format
    filename='obs_info.log',  # log file name
    filemode='w')  # file mode: "w" or "a"


class Kafka_producer():
    """Producer module: a ``KafkaProducer`` bound to a single topic.

    Messages are distinguished from one another by their key.
    """

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        """Create the underlying producer for ``kafkahost:kafkaport``.

        Args:
            kafkahost: Broker host used to build the bootstrap address.
            kafkaport: Broker port used to build the bootstrap address.
            kafkatopic: Topic that every ``send`` call publishes to.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        print("producer:h,p,t", kafkahost, kafkaport, kafkatopic)
        bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
        )
        print("boot svr:", bootstrap_servers)
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def send(self, k, v):
        """Publish ``v`` under key ``k`` to the configured topic, then flush.

        Args:
            k: Message key. A ``str`` is encoded as UTF-8; any other value
                (for example ``bytes`` or ``None``) is handed to the producer
                unchanged.
            v: Message value, handed to the producer unchanged.

        A ``KafkaError`` raised while sending or flushing is printed and not
        re-raised.
        """
        # Only text needs encoding. Calling .encode() unconditionally would
        # fail with AttributeError for keys that are already bytes or None.
        if isinstance(k, str):
            k = k.encode('utf-8')
        try:
            # NOTE(review): the object returned by producer.send() is not
            # inspected here; confirm against the kafka-python docs whether
            # delivery failures need to be checked on it.
            self.producer.send(self.kafkatopic, key=k, value=v)
            self.producer.flush()
        except KafkaError as e:
            print(e)


class Kafka_consumer():
    """Consumer module: a ``KafkaConsumer`` subscribed to one topic in one group."""

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        """Record the connection settings and create the underlying consumer.

        Args:
            kafkahost: Broker host used to build the bootstrap address.
            kafkaport: Broker port used to build the bootstrap address.
            kafkatopic: Topic to subscribe to.
            groupid: Consumer group id passed to ``KafkaConsumer``.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid

        broker_address = '{kafka_host}:{kafka_port}'.format(
            kafka_host=kafkahost,
            kafka_port=kafkaport,
        )
        self.consumer = KafkaConsumer(
            kafkatopic,
            group_id=groupid,
            bootstrap_servers=broker_address,
        )

    def consume_data(self):
        """Yield every message produced by iterating ``self.consumer``.

        A ``KeyboardInterrupt`` raised during iteration is printed and ends
        the generator instead of propagating.
        """
        try:
            # A plain loop is used on purpose: delegating with ``yield from``
            # would forward generator close() to the consumer object.
            for record in self.consumer:
                yield record
        except KeyboardInterrupt as interrupt:
            print(interrupt)


def make_forward_key(index):
    """Return the custom key for the ``index``-th copy of a message, as text.

    The key is ``(index << 8) + 1``.

    Args:
        index: Zero-based position of the copy within one forwarding round.

    Returns:
        The decimal string form of the computed key.
    """
    # The parentheses matter: ``+`` binds tighter than ``<<``, so the
    # unparenthesised ``index << 8 + 1`` evaluates as ``index << 9``.
    # NOTE(review): assumes "shift, then add 1" was the intended formula.
    return str((index << 8) + 1)


def main():
    """Forward messages from the consumer to the producer, at most once per second.

    Each consumed message is compared against the wall-clock second of the
    previously handled message; when the second has changed, its value is
    sent ``n`` times, each copy with its own custom key.
    """
    group = 'tunnel2QA'
    # NOTE(review): the consumer connects to KAFAKA_HOST_PRODUCTER and the
    # producer to KAFAKA_HOST_CONSUMER. Presumably the constant names describe
    # the remote hosts' roles (data source vs. destination); confirm that this
    # pairing is intended.
    consumer = Kafka_consumer(KAFAKA_HOST_PRODUCTER, KAFAKA_PORT, KAFAKA_TOPIC, group)
    producer = Kafka_producer(KAFAKA_HOST_CONSUMER, KAFAKA_PORT, KAFAKA_TOPIC)

    pre_time = 0
    for msg in consumer.consume_data():
        # The incoming key is intentionally not decoded: it was never used,
        # and decoding fails for messages that carry no key.
        ticks = int(time.time())
        if pre_time != ticks:
            for i in range(n):
                producer.send(make_forward_key(i), msg.value)
        pre_time = ticks


if __name__ == '__main__':
    main()


上一篇 下一篇

猜你喜欢

热点阅读