RabbitMQRabbitMQ

RabbitMQ+GRPC的快速使用(1)

2019-08-02  本文已影响0人  bwisgood

使用背景

之前写的同步发送通知随着业务量的增长,已经不再适用,所以快速实现一个基本的rq队列+grpc的方式来投递通知数据并交给rq的worker去调用grpc的服务。
但是之前调用的地方太多了,所以最好还是以patch的方式去修改

思路

原有的结构大致为图1所示


图1

首先flask调用grpc再由grpc请求微信服务器发送消息,然后由微信响应请求后返回通知结果给grpc,grpc再返回结果给flask最终返回给客户端,所以除非等到grpc返回调用结果,否则将会一直阻塞
现在则为


图2
flask投递消息到队列中去就就结束了,直接返回到客户端,这里就不会阻塞,而是让监听rabbitMQ的worker去执行

这里暂时只创建一个队列去分发所有类型的通知所以message的格式需要固定
{"method":"method_name", "data":{}},客户端调用publish传入对应的参数即可

# client.py
import pika
import pickle


class RabbitClient(object):
    def __init__(self, host="localhost", port=5672, routing_key=None):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=host, port=port))
        self.channel = self.connection.channel()
        self.routing_key = routing_key

    def publish(self, method_name, **kwargs):
        message = self.package(method_name, **kwargs)

        self.channel.basic_publish(exchange='', routing_key=self.routing_key, body=message)

    def package(self, method_name, **kwargs):
        temp = {"method": method_name}
        temp.update(data=kwargs)
        return pickle.dumps(temp)
# 这里是调用的工具module,原来的方式已经注释
from apps import rq


# def sen_message_test(user_id, message):
#     """
#
#     :param user_id:
#     :param message: {"title":"","message":""}
#     :return:
#     """
#     with grpc.insecure_channel("{0}:{1}".format(_HOST, _PORT)) as channel:
#         client = send_server_pb2_grpc.SendServiceStub(channel=channel)
#         response = client.SendMessage(send_server_pb2.SendMessageParam(user_id=user_id, message=json.dumps(message)))
#     print("received: " + str(response))

def sen_message_test(user_id, message):
    rq.publish("sen_message_test", user_id=user_id, message=message)

def debt_remind_test(user_id=None, bill_id=None):
    rq.publish("debt_remind_test", user_id, bill_id)

def repair_remind_test(user_id=None, repair_id=None):
    rq.publish("repair_remind_test", user_id=user_id, repair_id=repair_id)

# 太多了就不全列出来了,总之就是要保证原来的业务逻辑代码还能用
# worker.py
import pika
import pickle


class RabbitServer(object):
    def __init__(self, host="localhost", port=5672, queue=None):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=host, port=port))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=queue, durable=True)

        self.channel.basic_consume(on_message_callback=self.callback, queue=queue, auto_ack=True)
        self.dispatcher = RpcMethodDispatcher()
        self.setup = self.dispatcher.setup

    def callback(self, ch, method, properties, body):
        body = pickle.loads(body)
        print(body)

        func = self.dispatcher.dispatch(body.get("method"))
        if not func:
            return
        try:
            func(**body.get("data"))
        except Exception as e:
            print(e)

    def run(self):
        print("wait")
        self.channel.start_consuming()


class RpcMethodDispatcher(object):
    def __init__(self):
        self.map = []

    def setup(self, name):
        # 和message中的method相互对应类似于@app.route("/"),将所有路由添加过来
        def deco(f):
            self.map.append(MethodMap(name, f))

            def wrapper(*args, **kwargs):
                return f(*args, **kwargs)

            return wrapper

        return deco

    def dispatch(self, name):
        for i in self.map:
            if i.name == name:
                return i.method


class MethodMap(object):
    def __init__(self, name, method):
        self.name = name
        self.method = method


server = RabbitServer(queue="task_queue")

if __name__ == '__main__':
    server.run()

给标题后面加了个(1),我知道这玩意儿很快就会还要修改
可能看到这里就会有同学问了,为啥不new一个thread去执行嘞?


image.png
上一篇下一篇

猜你喜欢

热点阅读