java版gRPC实战之五:双向流

2021-07-27  本文已影响0人  程序员欣宸

欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

本篇概览

  1. 在proto文件中定义双向流类型的gRPC接口,再通过proto生成java代码
  2. 开发服务端应用
  3. 开发客户端应用
  4. 验证

源码下载

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
在这里插入图片描述 在这里插入图片描述

在proto文件中定义双向流类型的gRPC接口

// gRPC服务,这是个在线商城的库存服务
service StockService {
    // 双向流式:批量扣减库存
    rpc BatchDeduct (stream ProductOrder) returns (stream DeductReply) {}
}

// 扣减库存返回结果的数据结构
message DeductReply {
    // 返回码
    int32 code = 1;
    // 描述信息
    string message = 2;
}
在这里插入图片描述 在这里插入图片描述

开发服务端应用

// 使用springboot插件
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    // 作为gRPC服务提供方,需要用到此库
    implementation 'net.devh:grpc-server-spring-boot-starter'
    // 依赖自动生成源码的工程
    implementation project(':grpc-lib')
    // annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor
    annotationProcessor 'org.projectlombok:lombok'
}
spring:
  application:
    name: double-stream-server-side
# gRPC有关的配置,这里只需要配置服务端口号
grpc:
  server:
    port: 9901
package grpctutorials;

import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService
@Slf4j
public class GrpcServerService extends StockServiceGrpc.StockServiceImplBase {

    @Override
    public StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductReply> responseObserver) {
        // 返回匿名类,给上层框架使用
        return new StreamObserver<ProductOrder>() {

            private int totalCount = 0;

            @Override
            public void onNext(ProductOrder value) {
                log.info("正在处理商品[{}],数量为[{}]",
                        value.getProductId(),
                        value.getNumber());

                // 增加总量
                totalCount += value.getNumber();

                int code;
                String message;

                // 假设单数的都有库存不足的问题
                if (0 == value.getNumber() % 2) {
                    code = 10000;
                    message = String.format("商品[%d]扣减库存数[%d]成功", value.getProductId(), value.getNumber());
                } else {
                    code = 10001;
                    message = String.format("商品[%d]扣减库存数[%d]失败", value.getProductId(), value.getNumber());
                }

                responseObserver.onNext(DeductReply.newBuilder()
                        .setCode(code)
                        .setMessage(message)
                        .build());
            }

            @Override
            public void onError(Throwable t) {
                log.error("批量减扣库存异常", t);
            }

            @Override
            public void onCompleted() {
                log.info("批量减扣库存完成,共计[{}]件商品", totalCount);
                responseObserver.onCompleted();
            }
        };
    }
}

开发客户端应用

plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'net.devh:grpc-client-spring-boot-starter'
    implementation project(':grpc-lib')
}
server:
  port: 8082
spring:
  application:
    name: double-stream-client-side

grpc:
  client:
    # gRPC配置的名字,GrpcClient注解会用到
    double-stream-server-side:
      # gRPC服务端地址
      address: 'static://127.0.0.1:9901'
      enableKeepAlive: true
      keepAliveWithoutCalls: true
      negotiationType: plaintext
package com.bolingcavalry.grpctutorials;

import io.grpc.stub.StreamObserver;

public interface ExtendResponseObserver<T> extends StreamObserver<T> {
    String getExtra();
}
package grpctutorials;

import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Service
@Slf4j
public class GrpcClientService {

    @GrpcClient("double-stream-server-side")
    private StockServiceGrpc.StockServiceStub stockServiceStub;

    /**
     * 批量减库存
     * @param count
     * @return
     */
    public String batchDeduct(int count) {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // responseObserver的onNext和onCompleted会在另一个线程中被执行,
        // ExtendResponseObserver继承自StreamObserver
        ExtendResponseObserver<DeductReply> responseObserver = new ExtendResponseObserver<DeductReply>() {

            // 用stringBuilder保存所有来自服务端的响应
            private StringBuilder stringBuilder = new StringBuilder();

            @Override
            public String getExtra() {
                return stringBuilder.toString();
            }

            /**
             * 客户端的流式请求期间,每一笔请求都会收到服务端的一个响应,
             * 对应每个响应,这里的onNext方法都会被执行一次,入参是响应内容
             * @param value
             */
            @Override
            public void onNext(DeductReply value) {
                log.info("batch deduct on next");
                // 放入匿名类的成员变量中
                stringBuilder.append(String.format("返回码[%d],返回信息:%s<br>" , value.getCode(), value.getMessage()));
            }

            @Override
            public void onError(Throwable t) {
                log.error("batch deduct gRPC request error", t);
                stringBuilder.append("batch deduct gRPC error, " + t.getMessage());
                countDownLatch.countDown();
            }

            /**
             * 服务端确认响应完成后,这里的onCompleted方法会被调用
             */
            @Override
            public void onCompleted() {
                log.info("batch deduct on complete");
                // 执行了countDown方法后,前面执行countDownLatch.await方法的线程就不再wait了,
                // 会继续往下执行
                countDownLatch.countDown();
            }
        };

        // 远程调用,此时数据还没有给到服务端
        StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);

        for(int i=0; i<count; i++) {
            // 每次执行onNext都会发送一笔数据到服务端,
            // 服务端的onNext方法都会被执行一次
            requestObserver.onNext(build(101 + i, 1 + i));
        }

        // 客户端告诉服务端:数据已经发完了
        requestObserver.onCompleted();

        try {
            // 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,
            // 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,
            // await的超时时间设置为2秒
            countDownLatch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("countDownLatch await error", e);
        }

        log.info("service finish");
        // 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得
        return responseObserver.getExtra();
    }

    /**
     * 创建ProductOrder对象
     * @param productId
     * @param num
     * @return
     */
    private static ProductOrder build(int productId, int num) {
        return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
    }
}
package grpctutorials;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class GrpcClientController {

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public String printMessage(@RequestParam(defaultValue = "1") int count) {
        return grpcClientService.batchDeduct(count);
    }
}

验证

在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos

上一篇下一篇

猜你喜欢

热点阅读