8.gRPC四种数据传输

2019-02-28  本文已影响0人  未知的证明

1.proto文件的编写(gRPC基于proto3语法)

四种方式的数据传输:

syntax = "proto3";

package com.liyuanfeng.proto;

option java_package = "com.liyuanfeng.proto";
option java_outer_classname = "StudentProto";
option java_multiple_files = true;


service StudentService {
    rpc getRealNameByUsername (MyRequest) returns (MyResponse) {
    }
    rpc GetStudentByAge (StudentRequest) returns (stream StudentResponse) {
    }
    rpc GetStudentsWrapperByAges (stream StudentRequest) returns (StudentResponseList) {
    }
    rpc BiTalk(stream StreamRequest) returns (StreamResponse){}
}


message MyRequest {
    string username = 1;
}

message MyResponse {
    string realname = 2;
}

message StudentRequest {
    int32 age = 1;
}
message StudentResponse {
    string name = 1;
    int32 age = 2;
    string city = 3;
}

message StudentResponseList {
    repeated StudentResponse studentResponse = 1;
}


message StreamRequest{
    string request_info = 1;
}
message StreamResponse{
    string response_info = 1;
}

2.配置gradle文件,使其stub和server代码生成到合适的位置

apply plugin: 'java'
apply plugin: 'com.google.protobuf'
group 'com.liyuanfeng'
version '1.0'
sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    maven { url 'https://maven.aliyun.com/repository/central' }
    maven { url 'https://maven.aliyun.com/repository/jcenter' }
    maven {//配置Maven仓库的地址
        url "http://repo.springsource.org/libs-milestone-local"
    }
}
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    // https://mvnrepository.com/artifact/io.netty/netty-all
    compile group: 'io.netty', name: 'netty-all', version: '4.1.6.Final'
    // https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java
    compile group: 'com.google.protobuf', name: 'protobuf-java', version: '3.3.1'
    // https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java-util
    compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.3.1'
    // https://mvnrepository.com/artifact/org.apache.thrift/libthrift
    compile group: 'org.apache.thrift', name: 'libthrift', version: '0.12.0'

    compile 'io.grpc:grpc-netty-shaded:1.18.0'
    compile 'io.grpc:grpc-protobuf:1.18.0'
    compile 'io.grpc:grpc-stub:1.18.0'

}


buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.5'
    }
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.5.1-1"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.18.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {
                setOutputSubDir  "java"
            }
        }
    }

    generateProtoTasks.generatedFilesBaseDir = "src"
}


tasks.withType(JavaCompile){
    options.encoding = "UTF-8"
}

sourceSets{
    main{
        proto{
            srcDir 'src/main/proto'
            srcDir 'src/main'
        }
    }
}

这里需要注意:proto文件的存放位置是有特殊要求,具体存放位置截图如下

proto文件存放位置

3.生成相应的proto的java代码,具体命令行是:

D:\Java\netty_lecture>gradle clean generateProto
命令行截图
生成代码截图

4.Server端的编写

package com.liyuanfeng.grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class GrpcServer {

    private Server server;

    private void start() throws IOException {
        this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl()).build().start();
        System.out.println("server started!");
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {

            System.out.println("关闭JVM");
            GrpcServer.this.stop();
        }));
        System.out.println("执行到这里");
    }

    private void stop() {
        if (null != server) {
            this.server.shutdown();
        }
    }

    private void awaitTermination() throws InterruptedException {
        if (null != server) {
            this.server.awaitTermination();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {

        GrpcServer grpcServer = new GrpcServer();
        grpcServer.start();
        grpcServer.awaitTermination();

    }
}

5.Service部分的代码编写

package com.liyuanfeng.grpc;

import com.liyuanfeng.proto.*;
import io.grpc.stub.StreamObserver;

import java.util.UUID;

public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
    @Override
    public void getRealNameByUsername(MyRequest request, StreamObserver<MyResponse> responseObserver) {
        System.out.println("接收到客户端信息:" + request.getUsername());
        responseObserver.onNext(MyResponse.newBuilder().setRealname("李远锋").build());
        responseObserver.onCompleted();
    }

    @Override
    public void getStudentByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
        System.out.println("接收到了客户端信息:" + request.getAge());

        responseObserver.onNext(StudentResponse.newBuilder().setName("周杰伦").setAge(35).setCity("中国").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("林俊杰").setAge(35).setCity("台湾").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("李连杰").setAge(35).setCity("香港").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("林志颖").setAge(35).setCity("中国").build());
        responseObserver.onCompleted();

    }

    @Override
    public StreamObserver<StudentRequest> getStudentsWrapperByAges(StreamObserver<StudentResponseList> responseObserver) {
        return new StreamObserver<StudentRequest>() {
            @Override
            public void onNext(StudentRequest value) {
                System.out.println("onNext" + value.getAge());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                StudentResponse studentResponse1 = StudentResponse.newBuilder().setName("zhangsan").setAge(20).setCity("Beijing").build();
                StudentResponse studentResponse2 = StudentResponse.newBuilder().setName("lisi").setAge(20).setCity("Beijing").build();
                StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse1).addStudentResponse(studentResponse2).build();
                responseObserver.onNext(studentResponseList);
                responseObserver.onCompleted();
            }
        };
    }

    @Override
    public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
        return new StreamObserver<StreamRequest>() {
            @Override
            public void onNext(StreamRequest value) {
                System.out.println(value.getRequestInfo());

                responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
                responseObserver.onCompleted();
            }
        };
    }
}

6.客户端代码的编写

package com.liyuanfeng.grpc;

import com.liyuanfeng.proto.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.time.LocalDateTime;
import java.util.Iterator;

public class GrpcClient {
    public static void main(String[] args) throws InterruptedException {

        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8899)
                .usePlaintext().build();//usePlaintext没有加密
        StudentServiceGrpc.StudentServiceBlockingStub blockingStub = StudentServiceGrpc.newBlockingStub(managedChannel);


        StudentServiceGrpc.StudentServiceStub studentServiceStub = StudentServiceGrpc.newStub(managedChannel);

        MyResponse myResponse = blockingStub.getRealNameByUsername(MyRequest.newBuilder().setUsername("秦子豪").build());
        System.out.println(myResponse.getRealname());

        System.out.println("--------------------------------------");

        Iterator<StudentResponse> studentByAge = blockingStub.getStudentByAge(StudentRequest.newBuilder().setAge(20).build());

        while (studentByAge.hasNext()) {
            StudentResponse next = studentByAge.next();
            System.out.println(next.getName() + "," + next.getCity() + "," + next.getAge());
        }


        System.out.println("------------------------------------------------------------------------------");

        StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
            @Override
            public void onNext(StudentResponseList value) {

                value.getStudentResponseList().forEach(studentResponse -> {
                    System.out.println(studentResponse.getName());
                    System.out.println(studentResponse.getAge());
                    System.out.println(studentResponse.getCity());
                    System.out.println("********************");
                });

            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
            }
        };


        StreamObserver<StudentRequest> studentsWrapperByAges = studentServiceStub.getStudentsWrapperByAges(studentResponseListStreamObserver);
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(10).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(20).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(30).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(40).build());
        studentsWrapperByAges.onCompleted();




        StreamObserver<StreamRequest> streamRequestStreamObserver = studentServiceStub.biTalk(new StreamObserver<StreamResponse>() {
            @Override
            public void onNext(StreamResponse value) {
                System.out.println(value.getResponseInfo());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("lalala");
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
            }
        });

        for (int i = 0; i < 10; i++) {            streamRequestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
        }
        Thread.sleep(100000);
    }
}
上一篇 下一篇

猜你喜欢

热点阅读