Spring Boot 企业级实战GRPC

Spring Boot中使用cn.com.yd.commons.

2018-07-09  本文已影响28人  固安李庆海

本文主要讲解cn.com.yd.commons.grpc项目实战,开发环境为STS。

添加依赖

首先把cn.com.yd.commons.grpc项目使用mvn install命令进行编译打包,然后在新创建的Maven工程中添加依赖。

<dependency>
    <groupId>cn.com.yd.commons</groupId>
    <artifactId>grpc</artifactId>
    <version>1.0.0</version>
</dependency>

GRPC服务端

创建一个类并继承CommonServiceGrpc.CommonServiceImplBase,然后重写其中的方法以便进行具体的业务处理。

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import cn.com.yd.commons.grpc.CommonServiceGrpc;
import cn.com.yd.commons.grpc.Request;
import cn.com.yd.commons.grpc.Response;
import cn.com.yd.commons.grpc.util.GrpcHelper;
import io.grpc.stub.StreamObserver;
@Service
public class CommonService extends CommonServiceGrpc.CommonServiceImplBase{
/**
* 简单GRPC方法 
* @param request请求参数
* @param responseObserver响应结果
*/
@Override
public void handle(Request request, StreamObserver<Response> responseObserver) {
  try {
    //具体业务处理
    JSONObject json = GrpcHelper.toJSONObject(request);
    //获取指定的spring bean 名称
    String bean = json.getString("bean");
    //获取指定的执行方法
    String method = json.getString("method");
    //获取指定执行方法的参数
    Object[] args = (Object[])json.get("args");
    //获取指定的spring bean对象
    Object obj = ApplicationContextUtil.getBean(bean);
    //获取执行结果
    Object result = GrpcHelper.execute(obj, method, args);
    //将执行结果封装成Response对象
    Response res =GrpcHelper.toResponse();
    responseObserver.onNext(res);
  } catch (Exception e) {
    responseObserver.onError(e);
  } finally {
    //标识已经写完
    responseObserver.onCompleted();
  }
}

/**
* 客户端流式GRPC方法 
* @param responseObserver响应结果
*/
@Override
public StreamObserver<Request> clientStreamingHandle(StreamObserver<Response> responseObserver) {
  // 客户端每写入一个Request,服务端就会调用该方法
  return new StreamObserver<Request>() {
    int success = 0;
    int errors = 0;
    long startTime = System.nanoTime();
    @Override
    public void onNext(Request request) {
      //每一个数据的具体业务处理;
      success++;
    }

    @Override
    public void onError(Throwable throwable) {
      errors++;
      throwable.printStackTrace();
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      String msg = "成功 " + success + " 个,失败 " + errors + " 个,一共耗时 " + seconds + " 秒";
      Response rs = GrpcHelper.toResponse(true, msg);
      responseObserver.onNext(rs);
      responseObserver.onCompleted();
    }
  };
}

/**
 * 服务端流式GRPC方法
 * @param request请求参数
 * @param responseObserver响应结果
 */
@Override
public void serverStreamingHandle(Request request, StreamObserver<Response> responseObserver) {
  try {
    //具体业务处理
    JSONObject json = GrpcHelper.toJSONObject(request);
    //获取指定的spring bean 名称
    String bean = json.getString("bean");
    //获取指定的执行方法
    String method = json.getString("method");
    //获取指定执行方法的参数
    Object[] args = (Object[])json.get("args");
    //获取指定的spring bean对象
    Object obj = ApplicationContextUtil.getBean(bean);
    //获取执行结果,执行结果应该是一个集合如Collection|JSONArray,每个元素是JSONObject对象
    Object result = GrpcHelper.execute(obj, method, args);
    if (datas instanceof Collection) {
      for (Object item : (Collection) datas) {
        Response res = GrpcHelper.toResponse(item);
        responseObserver.onNext(res);
      }
    }
    //遍历结果
    for (JSONObject item : datas) {
      Response res = GrpcHelper.toResponse(item);
      responseObserver.onNext(res);
    }
  } catch (Exception e) {
    responseObserver.onError(e);
  } finally {
    responseObserver.onCompleted();
  }
}

/**
 * 双向流式GRPC方法
 * @param request请求参数
 * @param responseObserver响应结果
 */
@Override 
public StreamObserver<Request> bidirectionalStreamingHandle(StreamObserver<Response> responseObserver) {
  return new StreamObserver<Request>() {
    int success = 0;
    int errors = 0;
    long startTime = System.currentTimeMillis();

    @Override
    public void onNext(Request request) {
      System.out.println("GRPC服务端双向流式方法bidirectionalStreamingHandle中得到参数:" + request);
      inhandle(request);
      success++;
    }

    @Override
    public void onError(Throwable t) {
      errors++;
      responseObserver.onError(t);
      t.printStackTrace();
    }

    @Override
    public void onCompleted() {
      long ms = System.currentTimeMillis() - startTime;
      String msg = "成功 " + success + " 个,失败 " + errors + " 个,一共耗时 " + ms + " 毫秒";
      Response rs = GrpcHelper.toResponse(msg);
      responseObserver.onNext(rs);
      responseObserver.onCompleted();
    }
  };
  }
}

GRPC客户端

GRPC客户端的代码应该位于一个新的Maven或者SpringBoot工程中。
新建一个Controller并在业务方法中调用GRPC服务端的具体业务实现方法。
以MongoDB的CRUD操作为例进行GRPC客户端代码说明。

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import cn.com.yd.commons.grpc.CommonServiceGrpc;
import cn.com.yd.commons.grpc.Request;
import cn.com.yd.commons.grpc.Response;
import cn.com.yd.commons.grpc.util.GrpcHelper;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
@RestController
@RequestMapping("${api-url}/mongo")
public class MongoCrudController {
@Autowired
private GrpcClient grpcClient;
/**
 * 向MongoDB中添加一行记录,简单GRPC方法使用示例
 * @param collection 数据表名称
 * @param json 要新增的一行数据
 * @return
 * @throws Exception
 */
@PostMapping("/{collection}")
public JSONObject create(String collection,String json) throws Exception {
  Request req = GrpcHelper.toRequest("mongodbService","create",new Object[]{collection, JSON.parseObject(json)});
  Response res = grpcClient.getBlockingStub().handle(req);
  return GrpcHelper.toJSONObject(res);
}

/**
 * 向MongoDB中批量添加记录,客户端流式方法使用示例
 * @param collection 数据表名称
 * @param jsons 要新增的数据,JSON格式的数组
 * @return
 * @throws Exception
 */
@PostMapping("/{collection}/batch")
public JSONObject batchCreate(String collection,String jsons) throws Exception {
  //建一个应答者接受返回数据
  StreamObserver<Response> responseObserver = new StreamObserver<Response>() {
    @Override
    public void onNext(Response res) {
      JSONObject obj = GrpcHelper.toJSONObject(res);
      System.out.println("服务端返回 :" + obj);
     }
    @Override
    public void onError(Throwable t) {
      System.out.println("服务端返回错误:"+t.getMessage());
    }
    
    @Override
    public void onCompleted() {
      System.out.println("execute finish");
    }
  };
  //客户端写入操作
  StreamObserver<Request> requestObserver =   grpcClient.getAsyncStub().clientStreamingHandle(responseObserver);
  try {
    for (Object obj:JSON.parseArray(jsons)) {
      Request res= GrpcHelper.toRequest("mongodbService","create",new Object[]{collection, JSON.parseObject(json)});
      requestObserver.onNext(res);
    }
  } catch (RuntimeException e) {
    requestObserver.onError(e);
  }finally{
    //标识已经写完
    requestObserver.onCompleted();
  }
  return GrpcHelper.success();
}

/**
 *从MongoDB中进行综合条件查询,服务端流式GRPC方式使用示例
 * @param collection 数据表名称
 * @param queryCriterias json数组格式的查询条件
 * @param orders 排序字段排序方式,如age:ASC##tall:DESC
 * @param page 页数,从0开始
 * @param pageSize 每页的行数,默认10
 * @return
 */
@GetMapping()
public JSONObject query(String collection,String queryCriterias, String orders, Integer page, Integer pageSize) {
  JSONOArray qc = JSON.parseArray(queryCriterias);
  Request req = GrpcHelper.toRequest("mongodbService","findDatas",New Object[]{collection, qc, orders, page, pageSize});
  Iterator<Response> it = grpcClient.getBlockingStub().serverStreamingHandle(req);
  while(it.hasNext()){
    Response res = it.next();
    JSONObject item = GrpcHelper.toJSONObject(res);
    System.out.println(item);
  }
  return GrpcHelper.success();
  }
}

启动GrpcServer

在Spring Boot启动类中增加

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import cn.com.yd.commons.grpc.util.GrpcServer;
import cn.com.yd.fis.server.service.CommonService;
import cn.com.yd.fis.server.util.ApplicationContextUtil;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
        ApplicationContextUtil.getBean(StartGrpcServer.class).start();
    }
}

辅助工具类

Spring Bean 工具类

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextUtil implements ApplicationContextAware {
    private static ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(org.springframework.context.ApplicationContext arg0) throws BeansException {
        if (null == applicationContext) {
            applicationContext = arg0;
        }
    }

    // 通过name获取 Bean.
    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    // 通过class获取Bean.
    public static <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }

    // 通过name,以及Clazz返回指定的Bean
    public static <T> T getBean(String name, Class<T> clazz) {
        return applicationContext.getBean(name, clazz);
    }
}

启动GRPCServer 工具类

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import cn.com.yd.commons.grpc.util.GrpcServer;
import cn.com.yd.fis.server.service.CommonService;

@Component
public class StartGrpcServer {
    @Value("${grpc.port}")
    private int port;
    public void start(){
        try {
            CommonService cs = ApplicationContextUtil.getBean(CommonService.class);
            GrpcServer gs = new GrpcServer(port,cs);
            gs.start();
            gs.blockUntilShutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在客户端工程中配置GrpcClient

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import cn.com.yd.commons.grpc.util.GrpcClient;

@Configuration
public class GrpcClientConfig {
    /** GRPC 服务器IP地址 */
    @Value("${grpc.server}")
    private String server;
    /** GRPC 监听的端口 */
    @Value("${grpc.port}")
    private int port;
    
    @Bean
    public GrpcClient create(){
        return new GrpcClient(server, port);
    }
}

参数配置

服务端参数配置

在服务端的application.properties中增加

# grpc端口号,只要没有被占用即可,具体数值根据喜好设定
grpc.port=50051

客户端参数配置

在客户端的application.properties中增加

# GRPC服务器地址
grpc.server=localhost
# GRPC监听的端口号
grpc.port=50051

运行

打包

和普通的SpringBoot工程没有区别,在STS中鼠标点击工程名称弹出右键菜单,然后选择Run as->Maven Install菜单项即可进行打包。第一次打包时需要联网下载一些必须的jar,过程会有点慢,耐心等待。

启动

将打包后的jar和配置文件拷贝到一个新的文件夹中,注意路径中不能包含有中文。
文件目录大致如下图


Spring Boot 打包结果.png

其中fis-server.bat是启动脚本,直接点击即可运行。

启动脚本

title "Finance information system V1.0"
java -jar fis-server-1.0.0.jar --spring.config.location=application-dev.properties
上一篇 下一篇

猜你喜欢

热点阅读