个人学习

GRPC以及python实现

2020-02-19  本文已影响0人  臻甄

GRPC简介

A high-performance, open-source universal RPC framework --官网

rpc通信

RPC(remote procedure call 远程过程调用)

$ python -m pip install grpcio   # 安装gprc
$ python -m pip install grpcio-tools  #安装grpc-tools

QuitStart Python gRPC

下载代码

$ git clone -b v1.27.0 https://github.com/grpc/grpc
$ cd grpc/examples/python/helloworld

运行gRPC应用:用两个终端窗口一个运行Server进程,一个运行Client进程

$ python greeter_server.py  # 启动Server
$ python greeter_client.py # 在另外一个terminal启动client

Server端代码

"""The Python implementation of the GRPC helloworld.Greeter server."""

from concurrent import futures
import logging

import grpc

import helloworld_pb2
import helloworld_pb2_grpc

class Greeter(helloworld_pb2_grpc.GreeterServicer):
    # 自定义对外开放的API接口
    def SayHello(self, request, context):
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)

def serve():
    # 使用ThreadPool并发处理Server任务
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # 把对应的Greeter任务添加到rpc server中
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 这里使用的非安全接口,gRPC支持TLS/SSL安全连接,以及各种鉴权机制
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    logging.basicConfig()
    serve()

Client端代码

"""The Python implementation of the GRPC helloworld.Greeter client."""

from __future__ import print_function
import logging

import grpc

import helloworld_pb2
import helloworld_pb2_grpc

def run():
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    # 使用with语法保证channel自动close
    with grpc.insecure_channel('localhost:50051') as channel:
        # 客户端通过stub来实现rpc通信
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        # 客户端必须使用定义好的类型,这里是HelloRequest类型
        response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
    print("Greeter client received: " + response.message)


if __name__ == '__main__':
    logging.basicConfig()
    run()

修改代码增添自己的定制API

从proto文件中生成自己的grpc代码

首先,拷贝examples/protos文件夹到examples/python/helloworld文件夹中并修改helloworld.proto

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
  // Sends another greeting 
  rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}  // 增加一行
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

在终端执行下面命令,重新生成helloworld_pb2.pyhelloworld_pb2_grpc.py

$ python -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/helloworld.proto

可以通过vimdiff对比一下update前后代码改动

修改greeter_server.py增加一个Server函数SayHelloAgain

class Greeter(helloworld_pb2_grpc.GreeterServicer):
    # 自定义对外开放的API接口
    def SayHello(self, request, context):
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
    def SayHelloAgain(self, request, context):     
        return helloworld_pb2.HelloReply(message='Hello again, %s!' % request.name)

修改greeter_client.py增加一个client端的调用SayHelloAgain函数的步骤

def run():
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    # 使用with语法保证channel自动close
    with grpc.insecure_channel('localhost:50051') as channel:
        # 客户端通过stub来实现rpc通信
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        # 客户端必须使用定义好的类型,这里是HelloRequest类型
        response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
        print("Greeter client received: " + response.message)
        # new
        response = stub.SayHelloAgain(helloworld_pb2.HelloRequest(name='you'))   
        print("Greeter client received: " + response.message)

重新在不同的终端运行Server和Client:

$ python greeter_server.py  # 启动Server 
$ python greeter_client.py # 在另外一个terminal启动client

Proto定义服务

service RouteGuide {
   // (Method definitions not shown)
}
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
// Obtains the Features available within the given Rectangle.  Results are 
// streamed rather than returned at once (e.g. in a response message with a 
// repeated field), as the rectangle may cover a large area and contain a 
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}

*(4)bidirectionally-streaming RPC:双向流式RPC,双方都使用读写流发送一系列消息。这两个流是独立运行的,因此客户端和服务器可以按照自己喜欢的顺序进行读写:例如,服务器可以在写响应之前等待接收所有客户端消息,或者可以先读取一条消息再写入一条消息,或其他一些读写组合。每个流中的消息顺序都会保留。您可以通过在请求和响应之前都放置stream关键字来指定这种类型的方法。

// Accepts a stream of RouteNotes sent while a route is being traversed, 
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}
{
    "location": {
        "latitude": 407838351,
        "longitude": -746143763
    },
    "name": "Patriots Path, Mendham, NJ 07945, USA"
}
# request:一个location的经度和纬度
# response:是location,包含name
def GetFeature(self, request, context):
  feature = get_feature(self.db, request)
  if feature is None:
    return route_guide_pb2.Feature(name="", location=request)
  else:
    return feature
# request:一个地区的四个边的经度纬度
# response:落在这个地区的所有点,按照yield的方式挨个返回,流式返回
def ListFeatures(self, request, context):
  left = min(request.lo.longitude, request.hi.longitude)
  right = max(request.lo.longitude, request.hi.longitude)
  top = max(request.lo.latitude, request.hi.latitude)
  bottom = min(request.lo.latitude, request.hi.latitude)
  for feature in self.db:
    if (feature.location.longitude >= left and
        feature.location.longitude <= right and
        feature.location.latitude >= bottom and
        feature.location.latitude <= top):
      yield feature
# request:是一个迭代器iterator,包含一系列location
# response: 筛选所有在数据库里的点,返回点的个数,数据库内点的个数,按顺序走完所有location的距离长度
def RecordRoute(self, request_iterator, context):
  point_count = 0
  feature_count = 0
  distance = 0.0
  prev_point = None

  start_time = time.time()
  for point in request_iterator:
    point_count += 1
    if get_feature(self.db, point):
      feature_count += 1
    if prev_point:
      distance += get_distance(prev_point, point)
    prev_point = point

  elapsed_time = time.time() - start_time
  return route_guide_pb2.RouteSummary(point_count=point_count,
                                      feature_count=feature_count,
                                      distance=int(distance),
                                      elapsed_time=int(elapsed_time))
# request: 是一个迭代器iterator,包含一系列location
# response: 根据每个location,找到location集里一样的点
def RouteChat(self, request_iterator, context):
  prev_notes = []
  for new_note in request_iterator:
    for prev_note in prev_notes:
      if prev_note.location == new_note.location:
        yield prev_note
    prev_notes.append(new_note)

启动Server

完成了RouteGuide的所有API,就可以启动Server了,另外start()不会阻塞,所以如果在服务期间代码没有其他事情要做,server则可能需要进入睡眠循环。

def serve():
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
      RouteGuideServicer(), server)
  server.add_insecure_port('[::]:50051')
  server.start()

创建Client

参考完整的Client代码:
examples/python/route_guide/route_guide_client.py.

channel = grpc.insecure_channel('localhost:50051') 
stub = route_guide_pb2_grpc.RouteGuideStub(channel)
# 同步调用
feature = stub.GetFeature(point)
# 异步调用
feature_future = stub.GetFeature.future(point) 
feature = feature_future.result()
for feature in stub.ListFeatures(rectangle):
# 同步
route_summary = stub.RecordRoute(point_iterator)
# 异步
route_summary_future = stub.RecordRoute.future(point_iterator)
route_summary = route_summary_future.result()
for received_route_note in stub.RouteChat(sent_route_note_iterator):

参考:

gRPC 官网
gRPC python QuickStart
gRPC python basic tutorial

上一篇 下一篇

猜你喜欢

热点阅读