使用kazoo改写grpc程序

2019-08-06  本文已影响0人  bwisgood

之前我们用过grpc来实现过远程过程调用,这次我们将Zookeeper加入到我们的实践当中,所以需要改写其中的一部分程序
server:
在实现服务端的时候我们需要在开始服务之前将自己的服务信息注册到Zookeeper去,值得注意的是当我们需要测试断开连接是否会从Zookeeper中取消注册的时候我们最好在关闭的时候使用KazooClient.stop()方法,要不然连接不会瞬间关闭而是有一定的延迟,得出的测试效果也会大大折扣

import req_pb2_grpc as PService
import req_pb2 as Mess
import grpc
from concurrent import futures


# 实现被调用的具体代码
class DemoService(PService.DemoServicer):

    def __init__(self):
        self.city_db = {
            "beijing": [' python ', 'c++', 'go'],
            "shanghai": [' 产品 ', '123', '数学'],
            "wuhan": [' 语文 ', '英语', '数学']
        }
        self.answers = list(range(10))

    def Calculate(self, request, context):
        if request.num1 == 0:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("cannot divide by 0")
            return Mess.Result()

        if request.op == Mess.Work.ADD:
            result = request.num1 + request.num2
        elif request.op == Mess.Work.SUBTRACT:
            result = request.num1 - request.num2
        else:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("invalid op parameter")
            return Mess.Result()

        return Mess.Result(result=result)

        pass

    def GetCommunity(self, request, context):
        city = request.name
        subs = self.city_db.get(city)
        print("*" * 100)
        for sub in subs:
            import time
            # time.sleep(1)
            yield Mess.Subject(name=sub)

    def Accumulate(self, request_iterator, context):
        sum = 0
        for num in request_iterator:
            sum += num.val
            print(sum)
        return Mess.Sum(val=sum)

    def GuessNumber(self, request_iterator, context):
        for request in request_iterator:
            if request.val in self.answers:
                print(request.val)
                yield Mess.Answer(val=request.val, desc='bingo')


from kazoo.client import KazooClient
import json
import sys


class DistributeServer(object):
    def register_zookeeper(self, data):
        """
        注册到zookeeper
        :return:
        """
        # 创建kazoo客户端
        self.zk = KazooClient("127.0.0.1:2181")
        # 建立连接
        self.zk.start()
        # 在zookeeper中创建节点信息
        self.zk.ensure_path("/rpc")
        self.zk.create('/rpc/server', data, ephemeral=True, sequence=True)
        print("成功注册到zookeeper:", data.decode())

    # 开启服务器
    def serve(self, host, port):
        # 创建服务器-对象 多线程服务器
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        # 注册实现的服务方法到服务器对象中
        PService.add_DemoServicer_to_server(DemoService(), server)
        # 设置服务地址
        server.add_insecure_port('{}:{}'.format(host, port))
        # 开启服务
        print('start server')
        # 注册到zookeeper
        addr = {"host": host, "port": port}
        self.register_zookeeper(json.dumps(addr).encode())

        server.start()
        # 关闭服务
        import time

        try:
            time.sleep(1000)
        except KeyboardInterrupt:
            self.zk.stop()
            server.stop(0)
            print("stop server")
        pass


if __name__ == '__main__':
    port = sys.argv[1]
    host = sys.argv[2]
    server = DistributeServer()
    server.serve(port, host)

client:
在客户端需要进行rpc调用的时候首先从Zookeeper中获取当前的正常运行的服务有哪些,然后从中选取一个服务器去进行连接

import grpc
from grpc_demo import req_pb2_grpc, req_pb2


def invoke(stub):
    work = req_pb2.Work()
    work.num1 = 1
    work.num2 = 2
    work.op = req_pb2.Work.ADD
    result = stub.Calculate(work)
    print("result :", result.result)

    work.num1 = 0
    try:
        result = stub.Calculate(work)
    except grpc.RpcError as e:
        print('{}:{}'.format(e.code(), e.details()))


def invoke_get_sub(stub):
    city = req_pb2.City(name="beijing")
    subs = stub.GetCommunity(city)
    for sub in subs:
        print(sub.name)


def generate_delta():
    for i in range(10):
        import random
        import time
        time.sleep(0.5)
        delta = random.randint(1, 100)
        yield req_pb2.Delta(val=delta)


def invoke_sum(stub):
    delta_iterator = generate_delta()
    sum = stub.Accumulate(delta_iterator)
    print(sum.val)


def generate_number():
    for i in range(10):
        import random
        import time
        time.sleep(0.5)
        delta = random.randint(1, 100)
        yield req_pb2.Number(val=delta)


def invoke_guess(stub):
    number_iterator = generate_number()
    answers = stub.GuessNumber(number_iterator)
    for an in answers:
        print("{}:{}".format(an.val, an.desc))


from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
import json


class Client(object):
    def __init__(self):
        self.zk = KazooClient("127.0.0.1:2181")
        self.zk.start()
        self.servers = []
        self.get_servers()

    def get_servers(self, event=None):
        # 有变化的话回调g重新获取server列表
        servers = self.zk.get_children("/rpc", watch=self.get_servers)
        print("="*20)
        print("更新了!!!!!")
        print("="*20)

        if not servers:
            raise KazooException("暂无子节点")
        for server in servers:
            # server_list.append("/rpc/" + json(server_data))
            addr_data, node_state = self.zk.get("/rpc/" + server)
            self.servers.append(addr_data.decode())

    def get_server(self):
        import random
        server = json.loads(random.choice(self.servers))
        return "{}:{}".format(server["host"], server["port"])

    def run(self):
        # 获取分布式的channel 从zookeeper中获取
        ipport = self.get_server()
        print("REQUEST: ", ipport)
        # channel = None
        while True:
            try:
                channel = grpc.insecure_channel(ipport)
            except Exception:
                continue
            else:
                break
        # with grpc.insecure_channel(ipport) as channel:
        # 创建stub对象
        stub = req_pb2_grpc.DemoStub(channel)
        # invoke_get_sub(stub)
        try:
            invoke_get_sub(stub)
            # invoke(stub)
        except Exception as e:
            pass
        # invoke_guess(stub)

        channel.close()


if __name__ == '__main__':
    import time

    ins = Client()
    for i in range(50):
        time.sleep(1)
        ins.run()

测试:
我们这里可以开启多个server分别监听不同的端口,并且让客户端在执行的时候打印出请求的服务是哪一个
然后在客户端运行的过程中,关闭某一个server,这时候我们就可以看到客户端的终端会打印出”更新了“的字样,并且后续的请求都不会再去请求那个已经被我们关闭的服务,代表测试成功。

上一篇下一篇

猜你喜欢

热点阅读