.NET Core gRPC 流式调用
gRPC 使用 Protocol buffers 作为接口定义语言(IDL)来描述服务接口和输入输出消息的结构,目前支持 4 种定义服务方法类型:
类型 | 说明 |
---|---|
简单 RPC | 客户端传入一个请求对象,服务端返回一个结果对象 |
客户端流式 RPC | 客户端传入多个请求对象,服务端返回一个结果对象 |
服务端流式 RPC | 客户端传入一个请求对象,服务端返回多个结果对象 |
双向流式 RPC | 客户端传入多个请求对象,服务端返回多个结果对象 |
RPC 定义
简单 RPC:一般这种方式使用较多,如下:定义 SayHello
方法,输入 HelloRequest
,返回 HelloResponse
。
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}
message HelloRequest {
string greeting = 1;
}
message HelloResponse {
string reply = 1;
}
而流式 RPC 定义与 简单 RPC 的区别只是在请求或返回参数前增加了 stream
关键词,如下:
service HelloService {
// 客户端流式 RPC
rpc SayHello1 (stream HelloRequest) returns (HelloResponse);
// 服务端流式 RPC
rpc SayHello2 (HelloRequest) returns (stream HelloResponse);
// 双向流式 RPC
rpc SayHello3 (stream HelloRequest) returns (stream HelloResponse);
}
gRPC 能支持流式调用本质是因为 gRPC 通信是基于 HTTP/2 实现的,HTTP/2 具有流的概念,流是为了实现 HTTP/2 的多路复用。流是服务器和客户端在 HTTP/2 连接内用于交换帧数据的独立双向序列,逻辑上可看做一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程。
使用场景
在 gRPC 中接收消息大小限制参数 grpc.max_receive_message_length
默认是 4M ,如果大于该值,则会提示:Received message larger than max (xxxxxx vs. 4194304)
,当然我们可以修改默认值解决问题,但如果默认值支持过大对服务器资源也是一种消耗,这时候其实应该考虑使用流式调用,有效将数据进行分批处理,提高性能。
示例
这里主要介绍一下双向流式 RPC(客户端和服务端流式 RPC 类似),完整代码请 前往这里查看 。双向流模拟功能是客户端流式输入文件路径,服务端针对每个文件每次最多读取 1M 的数据返回,客户端拿到数据后生成新文件。
接口定义
syntax = "proto3";
package GrpcStream;
service StreamTest {
// 双向流程 RPC
rpc BidirectionalStream(stream BidirectionalStreamRequest) returns (stream BidirectionalStreamResponse) {}
}
message BidirectionalStreamRequest {
// 文件路径
string file_path = 1;
}
message BidirectionalStreamResponse {
// 文件路径
string file_path = 1;
// 数据
bytes data = 2;
}
代码实现
这里是基于 .NET Core 3.0 使用 gRPC,可以通过 VS 预置的 gRPC 服务 模板来创建服务端,创建后将默认的 porto 文件替换成上面的内容。
服务端代码实现:
public override async Task BidirectionalStream(IAsyncStreamReader<BidirectionalStreamRequest> requestStream, IServerStreamWriter<BidirectionalStreamResponse> responseStream, ServerCallContext context)
{
var i = 0;
// 监听客户端数据输入
while (await requestStream.MoveNext())
{
// 打印次数
Console.WriteLine(i++);
using var fs = File.Open(requestStream.Current.FilePath, FileMode.Open);
var leftSize = fs.Length;
// 1M
var buff = new byte[1048576];
while (leftSize > 0)
{
var len = await fs.ReadAsync(buff);
leftSize -= len;
Console.WriteLine($"response {requestStream.Current.FilePath} {len} bytes");
// 流式返回数据
await responseStream.WriteAsync(new BidirectionalStreamResponse
{
FilePath = requestStream.Current.FilePath,
Data = ByteString.CopyFrom(buff, 0, len)
});
}
}
}
客户端代码实现:
// 测试文件,key 是已存在的文件,value 是需要生成的文件
static readonly Dictionary<string, string> fileDic = new Dictionary<string, string>()
{
{@"d:\dapr\daprd_windows_amd64.zip", @"d:\dapr\daprd_windows_amd64_new.zip" },
{@"d:\dapr\injector_windows_amd64.zip", @"d:\dapr\injector_windows_amd64_new.zip" },
};
static StreamTest.StreamTestClient client;
static async Task Main(string[] args)
{
// 连接 gRPC 服务
var channel = GrpcChannel.ForAddress("https://localhost:5001");
client = new StreamTest.StreamTestClient(channel);
await BidirectionalStreamTestAsync();
Console.ReadKey();
}
static async Task BidirectionalStreamTestAsync()
{
using var call = client.BidirectionalStream();
var responseTask = Task.Run(async () =>
{
// 接收返回值
var iterator = call.ResponseStream;
// 监听服务端数据返回
while (await iterator.MoveNext())
{
Console.WriteLine($"write to new file {fileDic[iterator.Current.FilePath]} {iterator.Current.Data.Length} bytes");
// 写入新文件
using var fs = new FileStream(fileDic[iterator.Current.FilePath], FileMode.Append);
iterator.Current.Data.WriteTo(fs);
}
});
var rand = new Random();
foreach (var item in fileDic)
{
// 流式输入
await call.RequestStream.WriteAsync(new BidirectionalStreamRequest
{
FilePath = item.Key
});
await Task.Delay(rand.Next(200));
}
await call.RequestStream.CompleteAsync();
await responseTask;
}
执行结果:
result