ASP.NET Core见识录Amazing .NETdotNET

.NET Core gRPC 流式调用

2019-10-26  本文已影响0人  BeckJin

gRPC 使用 Protocol buffers 作为接口定义语言(IDL)来描述服务接口和输入输出消息的结构,目前支持 4 种定义服务方法类型:

类型 说明
简单 RPC 客户端传入一个请求对象,服务端返回一个结果对象
客户端流式 RPC 客户端传入多个请求对象,服务端返回一个结果对象
服务端流式 RPC 客户端传入一个请求对象,服务端返回多个结果对象
双向流式 RPC 客户端传入多个请求对象,服务端返回多个结果对象

RPC 定义

简单 RPC:一般这种方式使用较多,如下:定义 SayHello 方法,输入 HelloRequest,返回 HelloResponse

service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
  string greeting = 1;
}

message HelloResponse {
  string reply = 1;
}

而流式 RPC 定义与 简单 RPC 的区别只是在请求或返回参数前增加了 stream 关键词,如下:

service HelloService {
  // 客户端流式 RPC
  rpc SayHello1 (stream  HelloRequest) returns (HelloResponse);
  // 服务端流式 RPC
  rpc SayHello2 (HelloRequest) returns (stream HelloResponse);
  // 双向流式 RPC
  rpc SayHello3 (stream  HelloRequest) returns (stream HelloResponse);
}

gRPC 能支持流式调用本质是因为 gRPC 通信是基于 HTTP/2 实现的,HTTP/2 具有流的概念,流是为了实现 HTTP/2 的多路复用。流是服务器和客户端在 HTTP/2 连接内用于交换帧数据的独立双向序列,逻辑上可看做一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程。

使用场景

在 gRPC 中接收消息大小限制参数 grpc.max_receive_message_length 默认是 4M ,如果大于该值,则会提示:Received message larger than max (xxxxxx vs. 4194304),当然我们可以修改默认值解决问题,但如果默认值支持过大对服务器资源也是一种消耗,这时候其实应该考虑使用流式调用,有效将数据进行分批处理,提高性能。

示例

这里主要介绍一下双向流式 RPC(客户端和服务端流式 RPC 类似),完整代码请 前往这里查看 。双向流模拟功能是客户端流式输入文件路径,服务端针对每个文件每次最多读取 1M 的数据返回,客户端拿到数据后生成新文件。

接口定义

syntax = "proto3";

package GrpcStream;

service StreamTest {
  // 双向流程 RPC
  rpc BidirectionalStream(stream BidirectionalStreamRequest) returns (stream BidirectionalStreamResponse) {}
}

message BidirectionalStreamRequest {
  // 文件路径
  string file_path = 1;
}
message BidirectionalStreamResponse {
  // 文件路径
  string file_path = 1;
  // 数据
  bytes data = 2;
}

代码实现

这里是基于 .NET Core 3.0 使用 gRPC,可以通过 VS 预置的 gRPC 服务 模板来创建服务端,创建后将默认的 porto 文件替换成上面的内容。

服务端代码实现:

public override async Task BidirectionalStream(IAsyncStreamReader<BidirectionalStreamRequest> requestStream, IServerStreamWriter<BidirectionalStreamResponse> responseStream, ServerCallContext context)
{
  var i = 0;
  // 监听客户端数据输入
  while (await requestStream.MoveNext()) 
  {
    // 打印次数
    Console.WriteLine(i++); 
    using var fs = File.Open(requestStream.Current.FilePath, FileMode.Open);
    var leftSize = fs.Length;
    // 1M
    var buff = new byte[1048576]; 
    while (leftSize > 0)
    {
      var len = await fs.ReadAsync(buff);
      leftSize -= len;
      Console.WriteLine($"response {requestStream.Current.FilePath} {len} bytes");
      // 流式返回数据
      await responseStream.WriteAsync(new BidirectionalStreamResponse
      {
        FilePath = requestStream.Current.FilePath,
        Data = ByteString.CopyFrom(buff, 0, len)
      });
    }
  }
}

客户端代码实现:

// 测试文件,key 是已存在的文件,value 是需要生成的文件
static readonly Dictionary<string, string> fileDic = new Dictionary<string, string>()
{
  {@"d:\dapr\daprd_windows_amd64.zip", @"d:\dapr\daprd_windows_amd64_new.zip" },
  {@"d:\dapr\injector_windows_amd64.zip", @"d:\dapr\injector_windows_amd64_new.zip" },
};
static StreamTest.StreamTestClient client;

static async Task Main(string[] args)
{
  // 连接 gRPC 服务 
  var channel = GrpcChannel.ForAddress("https://localhost:5001");
  client = new StreamTest.StreamTestClient(channel);
  await BidirectionalStreamTestAsync();
  Console.ReadKey();
}

static async Task BidirectionalStreamTestAsync()
{
  using var call = client.BidirectionalStream();
  var responseTask = Task.Run(async () =>
  {
    // 接收返回值
    var iterator = call.ResponseStream;
    // 监听服务端数据返回
    while (await iterator.MoveNext())
    {
      Console.WriteLine($"write to new file {fileDic[iterator.Current.FilePath]} {iterator.Current.Data.Length} bytes");
      // 写入新文件
      using var fs = new FileStream(fileDic[iterator.Current.FilePath], FileMode.Append);
      iterator.Current.Data.WriteTo(fs);
    }
  });

  var rand = new Random();
  foreach (var item in fileDic)
  {
    // 流式输入
    await call.RequestStream.WriteAsync(new BidirectionalStreamRequest
    {
      FilePath = item.Key
    });
    await Task.Delay(rand.Next(200));
  }
  await call.RequestStream.CompleteAsync();
  await responseTask;
}

执行结果:

result

参考资料

上一篇下一篇

猜你喜欢

热点阅读