Spring Boot中使用cn.com.yd.commons.
2018-07-09 本文已影响28人
固安李庆海
本文主要讲解cn.com.yd.commons.grpc项目实战,开发环境为STS。
添加依赖
首先把cn.com.yd.commons.grpc项目使用mvn install命令进行编译打包,然后在新创建的Maven工程中添加依赖。
<dependency>
<groupId>cn.com.yd.commons</groupId>
<artifactId>grpc</artifactId>
<version>1.0.0</version>
</dependency>
GRPC服务端
创建一个类并继承CommonServiceGrpc.CommonServiceImplBase,然后重写其中的方法以便进行具体的业务处理。
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import cn.com.yd.commons.grpc.CommonServiceGrpc;
import cn.com.yd.commons.grpc.Request;
import cn.com.yd.commons.grpc.Response;
import cn.com.yd.commons.grpc.util.GrpcHelper;
import io.grpc.stub.StreamObserver;
@Service
public class CommonService extends CommonServiceGrpc.CommonServiceImplBase{
/**
* 简单GRPC方法
* @param request请求参数
* @param responseObserver响应结果
*/
@Override
public void handle(Request request, StreamObserver<Response> responseObserver) {
try {
//具体业务处理
JSONObject json = GrpcHelper.toJSONObject(request);
//获取指定的spring bean 名称
String bean = json.getString("bean");
//获取指定的执行方法
String method = json.getString("method");
//获取指定执行方法的参数
Object[] args = (Object[])json.get("args");
//获取指定的spring bean对象
Object obj = ApplicationContextUtil.getBean(bean);
//获取执行结果
Object result = GrpcHelper.execute(obj, method, args);
//将执行结果封装成Response对象
Response res =GrpcHelper.toResponse();
responseObserver.onNext(res);
} catch (Exception e) {
responseObserver.onError(e);
} finally {
//标识已经写完
responseObserver.onCompleted();
}
}
/**
* 客户端流式GRPC方法
* @param responseObserver响应结果
*/
@Override
public StreamObserver<Request> clientStreamingHandle(StreamObserver<Response> responseObserver) {
// 客户端每写入一个Request,服务端就会调用该方法
return new StreamObserver<Request>() {
int success = 0;
int errors = 0;
long startTime = System.nanoTime();
@Override
public void onNext(Request request) {
//每一个数据的具体业务处理;
success++;
}
@Override
public void onError(Throwable throwable) {
errors++;
throwable.printStackTrace();
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
String msg = "成功 " + success + " 个,失败 " + errors + " 个,一共耗时 " + seconds + " 秒";
Response rs = GrpcHelper.toResponse(true, msg);
responseObserver.onNext(rs);
responseObserver.onCompleted();
}
};
}
/**
* 服务端流式GRPC方法
* @param request请求参数
* @param responseObserver响应结果
*/
@Override
public void serverStreamingHandle(Request request, StreamObserver<Response> responseObserver) {
try {
//具体业务处理
JSONObject json = GrpcHelper.toJSONObject(request);
//获取指定的spring bean 名称
String bean = json.getString("bean");
//获取指定的执行方法
String method = json.getString("method");
//获取指定执行方法的参数
Object[] args = (Object[])json.get("args");
//获取指定的spring bean对象
Object obj = ApplicationContextUtil.getBean(bean);
//获取执行结果,执行结果应该是一个集合如Collection|JSONArray,每个元素是JSONObject对象
Object result = GrpcHelper.execute(obj, method, args);
if (datas instanceof Collection) {
for (Object item : (Collection) datas) {
Response res = GrpcHelper.toResponse(item);
responseObserver.onNext(res);
}
}
//遍历结果
for (JSONObject item : datas) {
Response res = GrpcHelper.toResponse(item);
responseObserver.onNext(res);
}
} catch (Exception e) {
responseObserver.onError(e);
} finally {
responseObserver.onCompleted();
}
}
/**
* 双向流式GRPC方法
* @param request请求参数
* @param responseObserver响应结果
*/
@Override
public StreamObserver<Request> bidirectionalStreamingHandle(StreamObserver<Response> responseObserver) {
return new StreamObserver<Request>() {
int success = 0;
int errors = 0;
long startTime = System.currentTimeMillis();
@Override
public void onNext(Request request) {
System.out.println("GRPC服务端双向流式方法bidirectionalStreamingHandle中得到参数:" + request);
inhandle(request);
success++;
}
@Override
public void onError(Throwable t) {
errors++;
responseObserver.onError(t);
t.printStackTrace();
}
@Override
public void onCompleted() {
long ms = System.currentTimeMillis() - startTime;
String msg = "成功 " + success + " 个,失败 " + errors + " 个,一共耗时 " + ms + " 毫秒";
Response rs = GrpcHelper.toResponse(msg);
responseObserver.onNext(rs);
responseObserver.onCompleted();
}
};
}
}
GRPC客户端
GRPC客户端的代码应该位于一个新的Maven或者SpringBoot工程中。
新建一个Controller并在业务方法中调用GRPC服务端的具体业务实现方法。
以MongoDB的CRUD操作为例进行GRPC客户端代码说明。
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import cn.com.yd.commons.grpc.CommonServiceGrpc;
import cn.com.yd.commons.grpc.Request;
import cn.com.yd.commons.grpc.Response;
import cn.com.yd.commons.grpc.util.GrpcHelper;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
@RestController
@RequestMapping("${api-url}/mongo")
public class MongoCrudController {
@Autowired
private GrpcClient grpcClient;
/**
* 向MongoDB中添加一行记录,简单GRPC方法使用示例
* @param collection 数据表名称
* @param json 要新增的一行数据
* @return
* @throws Exception
*/
@PostMapping("/{collection}")
public JSONObject create(String collection,String json) throws Exception {
Request req = GrpcHelper.toRequest("mongodbService","create",new Object[]{collection, JSON.parseObject(json)});
Response res = grpcClient.getBlockingStub().handle(req);
return GrpcHelper.toJSONObject(res);
}
/**
* 向MongoDB中批量添加记录,客户端流式方法使用示例
* @param collection 数据表名称
* @param jsons 要新增的数据,JSON格式的数组
* @return
* @throws Exception
*/
@PostMapping("/{collection}/batch")
public JSONObject batchCreate(String collection,String jsons) throws Exception {
//建一个应答者接受返回数据
StreamObserver<Response> responseObserver = new StreamObserver<Response>() {
@Override
public void onNext(Response res) {
JSONObject obj = GrpcHelper.toJSONObject(res);
System.out.println("服务端返回 :" + obj);
}
@Override
public void onError(Throwable t) {
System.out.println("服务端返回错误:"+t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("execute finish");
}
};
//客户端写入操作
StreamObserver<Request> requestObserver = grpcClient.getAsyncStub().clientStreamingHandle(responseObserver);
try {
for (Object obj:JSON.parseArray(jsons)) {
Request res= GrpcHelper.toRequest("mongodbService","create",new Object[]{collection, JSON.parseObject(json)});
requestObserver.onNext(res);
}
} catch (RuntimeException e) {
requestObserver.onError(e);
}finally{
//标识已经写完
requestObserver.onCompleted();
}
return GrpcHelper.success();
}
/**
*从MongoDB中进行综合条件查询,服务端流式GRPC方式使用示例
* @param collection 数据表名称
* @param queryCriterias json数组格式的查询条件
* @param orders 排序字段排序方式,如age:ASC##tall:DESC
* @param page 页数,从0开始
* @param pageSize 每页的行数,默认10
* @return
*/
@GetMapping()
public JSONObject query(String collection,String queryCriterias, String orders, Integer page, Integer pageSize) {
JSONOArray qc = JSON.parseArray(queryCriterias);
Request req = GrpcHelper.toRequest("mongodbService","findDatas",New Object[]{collection, qc, orders, page, pageSize});
Iterator<Response> it = grpcClient.getBlockingStub().serverStreamingHandle(req);
while(it.hasNext()){
Response res = it.next();
JSONObject item = GrpcHelper.toJSONObject(res);
System.out.println(item);
}
return GrpcHelper.success();
}
}
启动GrpcServer
在Spring Boot启动类中增加
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import cn.com.yd.commons.grpc.util.GrpcServer;
import cn.com.yd.fis.server.service.CommonService;
import cn.com.yd.fis.server.util.ApplicationContextUtil;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
ApplicationContextUtil.getBean(StartGrpcServer.class).start();
}
}
辅助工具类
Spring Bean 工具类
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(org.springframework.context.ApplicationContext arg0) throws BeansException {
if (null == applicationContext) {
applicationContext = arg0;
}
}
// 通过name获取 Bean.
public static Object getBean(String name) {
return applicationContext.getBean(name);
}
// 通过class获取Bean.
public static <T> T getBean(Class<T> clazz) {
return applicationContext.getBean(clazz);
}
// 通过name,以及Clazz返回指定的Bean
public static <T> T getBean(String name, Class<T> clazz) {
return applicationContext.getBean(name, clazz);
}
}
启动GRPCServer 工具类
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import cn.com.yd.commons.grpc.util.GrpcServer;
import cn.com.yd.fis.server.service.CommonService;
@Component
public class StartGrpcServer {
@Value("${grpc.port}")
private int port;
public void start(){
try {
CommonService cs = ApplicationContextUtil.getBean(CommonService.class);
GrpcServer gs = new GrpcServer(port,cs);
gs.start();
gs.blockUntilShutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
在客户端工程中配置GrpcClient
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import cn.com.yd.commons.grpc.util.GrpcClient;
@Configuration
public class GrpcClientConfig {
/** GRPC 服务器IP地址 */
@Value("${grpc.server}")
private String server;
/** GRPC 监听的端口 */
@Value("${grpc.port}")
private int port;
@Bean
public GrpcClient create(){
return new GrpcClient(server, port);
}
}
参数配置
服务端参数配置
在服务端的application.properties中增加
# grpc端口号,只要没有被占用即可,具体数值根据喜好设定
grpc.port=50051
客户端参数配置
在客户端的application.properties中增加
# GRPC服务器地址
grpc.server=localhost
# GRPC监听的端口号
grpc.port=50051
运行
打包
和普通的SpringBoot工程没有区别,在STS中鼠标点击工程名称弹出右键菜单,然后选择Run as->Maven Install菜单项即可进行打包。第一次打包时需要联网下载一些必须的jar,过程会有点慢,耐心等待。
启动
将打包后的jar和配置文件拷贝到一个新的文件夹中,注意路径中不能包含有中文。
文件目录大致如下图

其中fis-server.bat是启动脚本,直接点击即可运行。
启动脚本
title "Finance information system V1.0"
java -jar fis-server-1.0.0.jar --spring.config.location=application-dev.properties