Kafka 生产数据与多进程消费数据

2021-04-25  本文已影响0人  千沙qiansha

配置信息（写在 Django 的 settings 中）

    # Kafka settings; the producer and consumer snippets read them via django.conf.settings.
    # Topic the producer publishes to and the consumer subscribes to.
    KAFKA_FROM_ALTER_MANGER_TOPIC = "alter_xxx_from_alter_manager_123"
    # Consumer group id handed to KafkaConsumer.
    KAFKA_CUSTOMER_GROUP = "group_alter_xxx_from_manager"
    # Bootstrap servers of the Kafka cluster (the 'xx' octets are placeholders).
    KAFKA_ALERT_MANAGER_CLUSTER = ['192.168.xx.xx:9092', '192.168.xx.xx:9092', '192.168.xx.xx:9092']

生产数据

from kafka import KafkaProducer
from django.conf import settings
import json
import os


class KP(object):
    """
    Simulated Kafka producer that publishes messages to the configured topic.

    The topic name and the bootstrap servers are both read from the Django
    settings when an instance is created.
    """

    # Producer handle and topic name; both are filled in by __init__.
    p = None
    tp = None

    def __init__(self):
        self.tp = settings.KAFKA_FROM_ALTER_MANGER_TOPIC
        self.p = KafkaProducer(
            bootstrap_servers=settings.KAFKA_ALERT_MANAGER_CLUSTER,
        )

    def send_msg(self, dat):
        """
        Serialize ``dat`` to JSON and publish it to the topic.

        Failures are printed rather than raised, so the method always
        returns ``None``.

        :param dat: any JSON-serializable object.
        """
        # Step 1: serialize; give up early if the payload is not JSON-able.
        try:
            serialized = json.dumps(dat)
        except Exception as exc:
            print("json dumps error: {}".format(exc))
            return None

        # Step 2: send, flush, then wait up to one second for the result.
        try:
            pending = self.p.send(self.tp, serialized.encode())
            self.p.flush()
            print("return: {}".format(pending.get(timeout=1)))
        except Exception as exc:
            print("error: {}".format(exc))
        return None

多进程消费数据

import json
import os
import multiprocessing
from django.conf import settings
from django.db import connections
from common.utils import get_logger
from kafka import KafkaConsumer
from datetime import datetime

# Module-level logger obtained from the project's get_logger helper, keyed by this file's path.
logger = get_logger(__file__)


class AlertWatcher:
    """
    Consume alert messages from Kafka and hand them to a pool of worker processes.

    The original description says the messages (published to Kafka by alert
    manager) are processed and saved to the database; the handler shown here
    only decodes, validates and logs each message.
    """
    def __init__(self):
        self.name = 'alter_msg_watcher'

    def run(self, processes=20):
        """
        Iterate over the Kafka topic and dispatch every record to the pool.

        :param processes: number of worker processes in the pool. Defaults
            to 20, the value that used to be hard-coded.

        If the consumer loop ends normally the pool is closed and joined so
        queued messages are finished before returning. If anything raises,
        the consumer is closed, the pool is terminated and the exception is
        re-raised.
        """
        # The pool is created first, as before, so the worker processes are
        # started before the Kafka consumer is constructed.
        executor = multiprocessing.Pool(processes=processes)
        try:
            custom = KafkaConsumer(settings.KAFKA_FROM_ALTER_MANGER_TOPIC,
                                   group_id=settings.KAFKA_CUSTOMER_GROUP,
                                   bootstrap_servers=settings.KAFKA_ALERT_MANAGER_CLUSTER,
                                   max_poll_records=1)
            try:
                for msg in custom:
                    # error_callback surfaces exceptions raised in a worker;
                    # without it they are dropped with the unused result.
                    executor.apply_async(self.handle_alert_msg, (msg,),
                                         error_callback=self._log_worker_error)
            finally:
                custom.close()
        except BaseException:
            # Abnormal exit: do not wait for queued work, just stop the workers.
            executor.terminate()
            raise
        else:
            executor.close()
        finally:
            # join() is only valid after close() or terminate().
            executor.join()

    @staticmethod
    def _log_worker_error(exc):
        """Log an exception that escaped ``handle_alert_msg`` in a worker."""
        logger.error("handle alert msg failed: {!r}".format(exc))

    @staticmethod
    def handle_alert_msg(_msg):
        """
        Decode and validate one Kafka record; runs inside a worker process.

        :param _msg: record yielded by the consumer; its ``value`` is
            expected to be UTF-8 encoded JSON with an ``alerts`` entry.

        Invalid records are logged and skipped; nothing is returned.
        """
        # Close database connections that are unusable or obsolete
        for conn in connections.all():
            conn.close_if_unusable_or_obsolete()

        # --- log the received message ---
        handle_start = datetime.now()
        logger.info("===handle alert=== %s pid:%s got alert msg from kafka: %s" % (handle_start, os.getpid(), _msg))

        try:
            dat = json.loads(_msg.value.decode())
        except Exception as e:
            logger.error("msg: {}, load err: {}".format(_msg, e))
            return

        # --- handle alert msg ---
        # A payload that is valid JSON but not an object has no "alerts" key;
        # treat it like a missing value instead of raising AttributeError.
        alerts = dat.get("alerts", None) if isinstance(dat, dict) else None
        if not alerts:
            logger.error("dat: {}, no ALTERS value.".format(dat))
            return

上一篇 下一篇

猜你喜欢

热点阅读