GRPC以及python实现
GRPC简介
rpc通信A high-performance, open-source universal RPC framework --官网
RPC(remote procedure call 远程过程调用)
- 是什么:提供一套应用程序之间可以通信的机制,使用C/S模型,Client进程调用Server进程提供的接口就像是调用本地函数一样。
-
为什么:相比较restful API的优势:
(1)gRPC都使用http底层传输协议建立C/S通信模型,但gRPC使用http2
(2)gRPC可以通过protobuf来定义由严格约束条件的接口,在提供公共API服务时,如果不希望client端给我们传递乱七八糟的数据,就可以定义输入的约束。
(3)gRPC通过protobuf将数据转化为二进制编码,减少传输数据量来提高传输性能。在需要服务器传递大量数据的场景下效率更高。
(4)gRPC通过http2.0可以方便的使用streaming模式做流式通信(通常的流式数据:视频流) -
怎么用:grpc官网的一个简单Python栗子
(1)在.proto文件中定义服务。
(2)使用协议缓冲区编译器生成服务器和客户端代码。
(3)使用Python gRPC API为您的服务编写一个简单的客户端和服务器。 - 安装:
$ python -m pip install grpcio # 安装gprc
$ python -m pip install grpcio-tools #安装grpc-tools
- Python grpc tutorial:The example code for this tutorial is in grpc/grpc/examples/python/route_guide.
QuitStart Python gRPC
下载代码
$ git clone -b v1.27.0 https://github.com/grpc/grpc
$ cd grpc/examples/python/helloworld
运行gRPC应用:用两个终端窗口一个运行Server进程,一个运行Client进程
$ python greeter_server.py # 启动Server
$ python greeter_client.py # 在另外一个terminal启动client
Server端代码
"""The Python implementation of the GRPC helloworld.Greeter server."""
from concurrent import futures
import logging
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
class Greeter(helloworld_pb2_grpc.GreeterServicer):
# 自定义对外开放的API接口
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
def serve():
# 使用ThreadPool并发处理Server任务
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# 把对应的Greeter任务添加到rpc server中
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 这里使用的非安全接口,gRPC支持TLS/SSL安全连接,以及各种鉴权机制
server.add_insecure_port('[::]:50051')
server.start()
server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig()
serve()
Client端代码
"""The Python implementation of the GRPC helloworld.Greeter client."""
from __future__ import print_function
import logging
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
def run():
# NOTE(gRPC Python Team): .close() is possible on a channel and should be
# used in circumstances in which the with statement does not fit the needs
# of the code.
# 使用with语法保证channel自动close
with grpc.insecure_channel('localhost:50051') as channel:
# 客户端通过stub来实现rpc通信
stub = helloworld_pb2_grpc.GreeterStub(channel)
# 客户端必须使用定义好的类型,这里是HelloRequest类型
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message)
if __name__ == '__main__':
logging.basicConfig()
run()
修改代码增添自己的定制API
从proto文件中生成自己的grpc代码
首先,拷贝examples/protos
文件夹到examples/python/helloworld
文件夹中并修改helloworld.proto
:
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
// Sends another greeting
rpc SayHelloAgain (HelloRequest) returns (HelloReply) {} // 增加一行
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
在终端执行下面命令,重新生成helloworld_pb2.py
和helloworld_pb2_grpc.py
$ python -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/helloworld.proto
可以通过vimdiff对比一下update前后代码改动
修改greeter_server.py
增加一个Server函数SayHelloAgain
class Greeter(helloworld_pb2_grpc.GreeterServicer):
# 自定义对外开放的API接口
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
def SayHelloAgain(self, request, context):
return helloworld_pb2.HelloReply(message='Hello again, %s!' % request.name)
修改greeter_client.py
增加一个client端的调用SayHelloAgain函数的步骤
def run():
# NOTE(gRPC Python Team): .close() is possible on a channel and should be
# used in circumstances in which the with statement does not fit the needs
# of the code.
# 使用with语法保证channel自动close
with grpc.insecure_channel('localhost:50051') as channel:
# 客户端通过stub来实现rpc通信
stub = helloworld_pb2_grpc.GreeterStub(channel)
# 客户端必须使用定义好的类型,这里是HelloRequest类型
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message)
# new
response = stub.SayHelloAgain(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message)
重新在不同的终端运行Server和Client:
$ python greeter_server.py # 启动Server
$ python greeter_client.py # 在另外一个terminal启动client
Proto定义服务
- 可以在
examples/protos/route_guide.proto
中看到完整的.proto
文件 - 定义Service
service RouteGuide {
// (Method definitions not shown)
}
- 然后,在服务定义中定义rpc方法,并指定它们的请求和响应类型。gRPC可以定义四种服务方法,所有这些方法都在RouteGuide服务中看到:
- (1)simple RPC :一种简单的RPC,客户端使用Stub将请求发送到服务器,然后等待响应返回,就像普通的函数调用一样。
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
- (2)response-streaming RPC:响应流式RPC,客户端向服务器发送请求,并获取 流 以读取回一系列消息。客户端从返回的流中读取,直到没有更多消息为止。如示例所示,您可以通过将stream关键字放在响应类型之前来指定响应流方法。
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
- (3)request-streaming RPC :请求流式RPC,客户端编写消息序列,然后再次使用提供的流将消息发送到服务器。客户端写完消息后,它将等待服务器读取所有消息并返回其响应。您可以通过将stream关键字放在请求类型之前来指定请求流方法。
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
*(4)bidirectionally-streaming RPC:双向流式RPC,双方都使用读写流发送一系列消息。这两个流是独立运行的,因此客户端和服务器可以按照自己喜欢的顺序进行读写:例如,服务器可以在写响应之前等待接收所有客户端消息,或者可以先读取一条消息再写入一条消息,或其他一些读写组合。每个流中的消息顺序都会保留。您可以通过在请求和响应之前都放置stream关键字来指定这种类型的方法。
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
-
.proto
文件还包含我们服务方法中使用的所有请求和响应类型的协议缓冲区消息类型定义-例如,这是Point消息类型:
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
- 举个栗子:
route_guide_server.py
里的class RouteGuideServicer
且数据库里的数据如下:每个地点都有name和location,其中location里包含经度和纬度:
{
"location": {
"latitude": 407838351,
"longitude": -746143763
},
"name": "Patriots Path, Mendham, NJ 07945, USA"
}
- (1)simple RPC:简单的RPC。
# request:一个location的经度和纬度
# response:是location,包含name
def GetFeature(self, request, context):
feature = get_feature(self.db, request)
if feature is None:
return route_guide_pb2.Feature(name="", location=request)
else:
return feature
- (2)response-streaming RPC:响应流式RPC。
# request:一个地区的四个边的经度纬度
# response:落在这个地区的所有点,按照yield的方式挨个返回,流式返回
def ListFeatures(self, request, context):
left = min(request.lo.longitude, request.hi.longitude)
right = max(request.lo.longitude, request.hi.longitude)
top = max(request.lo.latitude, request.hi.latitude)
bottom = min(request.lo.latitude, request.hi.latitude)
for feature in self.db:
if (feature.location.longitude >= left and
feature.location.longitude <= right and
feature.location.latitude >= bottom and
feature.location.latitude <= top):
yield feature
- (3)request-streaming RPC:请求流式RPC。
# request:是一个迭代器iterator,包含一系列location
# response: 筛选所有在数据库里的点,返回点的个数,数据库内点的个数,按顺序走完所有location的距离长度
def RecordRoute(self, request_iterator, context):
point_count = 0
feature_count = 0
distance = 0.0
prev_point = None
start_time = time.time()
for point in request_iterator:
point_count += 1
if get_feature(self.db, point):
feature_count += 1
if prev_point:
distance += get_distance(prev_point, point)
prev_point = point
elapsed_time = time.time() - start_time
return route_guide_pb2.RouteSummary(point_count=point_count,
feature_count=feature_count,
distance=int(distance),
elapsed_time=int(elapsed_time))
- (4)bidirectionally-streaming RPC:双向流式RPC。
# request: 是一个迭代器iterator,包含一系列location
# response: 根据每个location,找到location集里一样的点
def RouteChat(self, request_iterator, context):
prev_notes = []
for new_note in request_iterator:
for prev_note in prev_notes:
if prev_note.location == new_note.location:
yield prev_note
prev_notes.append(new_note)
启动Server
完成了RouteGuide
的所有API,就可以启动Server了,另外start()
不会阻塞,所以如果在服务期间代码没有其他事情要做,server则可能需要进入睡眠循环。
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
RouteGuideServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
创建Client
参考完整的Client代码:
examples/python/route_guide/route_guide_client.py.
- 首先,创建一个Stub:
channel = grpc.insecure_channel('localhost:50051')
stub = route_guide_pb2_grpc.RouteGuideStub(channel)
-
然后,Call service methods:
对于返回单个响应的RPC方法(“response-unary”方法),gRPC Python支持同步(阻塞)和异步(非阻塞)控制流语义。对于响应流式RPC方法,调用立即返回响应值的迭代器。调用该迭代器的next()
方法块,直到要从该迭代器产生的响应变为可用为止。 - (1)Simple RPC:简单RPC。GetFeature的同步调用几乎与调用本地方法一样简单。RPC调用等待服务器响应,并且将返回响应或引发异常;GetFeature的异步调用与此类似,但是就像在线程池中异步调用本地方法一样:
# 同步调用
feature = stub.GetFeature(point)
# 异步调用
feature_future = stub.GetFeature.future(point)
feature = feature_future.result()
- (2)Response-streaming RPC:响应流RPC。调用响应流ListFeatures类似于使用序列类型:
for feature in stub.ListFeatures(rectangle):
- (3)Request-streaming RPC:请求流式RPC。调用请求流式RecordRoute类似于将迭代器传递给本地方法。就像上面的简单RPC也返回单个响应一样,可以同步或异步调用它:
# 同步
route_summary = stub.RecordRoute(point_iterator)
# 异步
route_summary_future = stub.RecordRoute.future(point_iterator)
route_summary = route_summary_future.result()
- (4)Bidirectional streaming RPC:双向流RPC。调用双向流RouteChat具有服务流和响应流语义的组合(在服务端就是这种情况):
for received_route_note in stub.RouteChat(sent_route_note_iterator):