vert.x & nettyjava

序列化

2019-08-03  本文已影响0人  suxin1932

Apache Dubbo 提供了较为完善的序列化实现方案, 可参考其实现方法

Apache Dubbo--Serialization.png

1.序列化概述

#序列化 (编码, write)
把对象转换为字节序列的过程称为对象的序列化;常用于:
>> 把对象的字节序列永久地写到硬盘上,通常存放在一个文件中;
>> 在网络上传输(写)对象的字节序列。

#反序列化 (解码, read)
把字节序列恢复为对象的过程称为对象的反序列化。常用于:
>> 把永久地保存到硬盘上文件中的字节序列读取到内存中成一个对象;
>> 把网络上传送的字节序列读取到内存中成一个对象。

序列化与反序列化普遍应用在网络传输、RMI等场景中。

在很多应用中,需要对某些对象进行序列化,让它们离开内存空间,入住物理硬盘,以便长期保存。
比如最常见的是Web服务器中的Session对象,当有 10万用户并发访问,
就有可能出现10万个Session对象,内存可能吃不消,
于是Web容器就会把一些seesion先序列化到硬盘中,等要用了,再把保存在硬盘中的对象还原到内存中。

当两个进程在进行远程通信时,彼此可以发送各种类型的数据。
无论是何种类型的数据,都会以二进制序列的形式在网络上传送。
>> 发送方需要把这个Java对象转换为字节序列,才能在网络上传送;
>> 接收方则需要把字节序列再恢复为Java对象。

2.常见的序列化方式

2.1 jdk的序列化机制

jdk中除了使用Serializable接口进行序列化以外,还可以使用Externalizable接口来进行序列化。

2.1.1 类图

jdk序列化-反序列化--类图.png

2.1.2 序列化关键字说明

#transient
用来修饰对象属性,表示此属性在默认序列化过程中不会被处理,但是可以采用另外的手段进行处理。

#transient 与 static
静态变量不是对象状态的一部分,因此它不参与序列化。
所以将静态变量声明为transient变量是没有用处的。

#final 与 transient
final变量将直接通过值参与序列化,所以将final变量声明为transient变量不会产生任何影响。

#serialVersionUID
表示序列化版本号,用于对象的版本控制。
当两个类的序列化ID一致时允许反序列化,默认可以采用编译器提供的值1L。
Java序列化过程依赖于正确的serialVersionUID恢复序列化对象的状态,
并在serialVersionUID不匹配时抛出java.io.InvalidClassException 异常。
为提高serialVersionUID的独立性和确定性,建议在一个可序列化类中显式定义serialVersionUID,并赋值。

2.1.3 序列化类中相关方法

ObjectOutputStream#writeObject0.png

相关问题

#采用默认序列化机制,类的静态字段会被序列化吗?
类的静态字段不会被序列化,不过可以采用自定义序列化逻辑对静态变量进行序列化。

#共享对象序列化问题  
当序列化的两个对象都包含另外一个对象的引用时,在反序列化时,另外一个对象只会出现一次。

2.gRPC

build.gradle


group 'com.zy'
version '1.0-SNAPSHOT'

apply plugin: 'com.google.protobuf'
apply plugin: 'java'
sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    // mavenCentral()
    maven {
        url 'http://maven.aliyun.com/nexus/content/groups/public/'
    }
}
// api或compile关键字引用的包对于其他module来说是可见的, api 用来替换 compile
// implementation关键字引用的包对于其他module来说是不可见的。
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile 'io.vertx:vertx-core:3.6.3'
    compile 'io.vertx:vertx-web:3.6.3'
    compile 'io.vertx:vertx-web-client:3.6.3'
    compile 'io.vertx:vertx-rx-java2:3.6.3'
    compile 'org.projectlombok:lombok:1.16.20'
    compile group: 'com.google.protobuf', name: 'protobuf-java', version: '3.9.1'
    compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.9.1'
    implementation 'io.grpc:grpc-netty-shaded:1.23.0'
    implementation 'io.grpc:grpc-protobuf:1.23.0'
    implementation 'io.grpc:grpc-stub:1.23.0'
}
// 生成grpc代码 --> 参考 https://github.com/grpc/grpc-java
// For protobuf-based codegen, you can put your proto files in the
// src/main/proto and src/test/proto directories along with an appropriate plugin.
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.8'
    }
}

protobuf {
    // 看源码: 修改生成的protobuf下类的路径
    generateProtoTasks.generatedFilesBaseDir = "src"
    protoc {
        artifact = "com.google.protobuf:protoc:3.9.0"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.23.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {
                // 看源码: 修改生成的grpc包下类的路径
                outputSubDir = 'java'
            }
        }
    }
}

src/main/proto 目录下定义 .proto文件, 如: Stu.proto

// 编写.proto文件参考 --> https://developers.google.cn/protocol-buffers/docs/proto3

// 生成grpc代码 --> 参考 https://github.com/grpc/grpc-java
// For protobuf-based codegen, you can put your proto files in the
// src/main/proto and src/test/proto directories along with an appropriate plugin.

syntax = "proto3";

option java_package = "com.zy.proto";
option java_outer_classname = "Student";
option java_multiple_files = true;

// 这里的出入参 --> 必须是 message, 不能单独用 int32, string 等基本类型
service StudentService {
    // 1.入参是 对象, 出参也是对象
    rpc QueryStuById(SearchRequest) returns (SearchResponse);

    // 2.入参是 对象, 出参是流
    rpc QueryStuByIdStream2Resp(SearchRequest) returns (stream StuResponse);

    // 3.入参是 流, 出参是对象: 异步实现
    rpc QueryStuByIdReqByStream(stream SearchRequest) returns (StuResponseList);

    // 4.入参是 流, 出参也是流: 异步实现
    rpc QueryStuByIdReqAndResp2Stream(stream StuRequestStream) returns (stream StuResponseStream);
}

message SearchRequest {
    string id = 1;
//    int32 page_number = 2;
//    int32 result_per_page = 3;
}

message SearchResponse {
    string name = 1;
}

message StuResponse {
    string id = 1;
    string name = 2;
    string age = 3;
}

message StuResponseList {
    repeated StuResponse stuResponse = 1;
}

message StuRequestStream {
    string req = 1;
}

message StuResponseStream {
    string resp = 1;
}

执行 gradle generateProto 命令, 生成代码

生成代码.png

编写gRPC server 与 client

grpc server

package com.zy.grpc;

import com.zy.proto.*;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;

import java.io.IOException;

/**
 * server 与 client 参考
 * https://www.grpc.io/docs/quickstart/java/ 生成的 HelloWorldServer 与 HelloWorldClient
 */
public class GrpcServer {

    private Server server;

    public static void main(String[] args) throws InterruptedException, IOException {
        GrpcServer grpcServer = new GrpcServer();
        grpcServer.start();
        grpcServer.blockUntilShutdown();
    }

    private void start() throws IOException {
        this.server = ServerBuilder.forPort(9090).addService(new GrpcServer.StudentServiceImpl()).build().start();
        System.out.println("server started");
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.err.println("*** shutting down gRPC server since JVM is shutting down");
            GrpcServer.this.stop();
            System.err.println("*** server shut down");
        }));
    }

    private void stop() {
        if (this.server != null) {
            this.server.shutdown();
        }
    }

    private void blockUntilShutdown() throws InterruptedException {
        if (this.server != null) {
            this.server.awaitTermination();
        }
    }

    static class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
        /**
         * 1.入参是 对象, 出参也是对象
         *
         * @param request
         * @param responseObserver
         */
        @Override
        public void queryStuById(SearchRequest request, StreamObserver<SearchResponse> responseObserver) {
            System.out.println("客户端请求参数id为: " + request.getId());
            // 当发生异常时, 回调函数为:
            // responseObserver.onError(new RuntimeException("there is an error to request by id"));
            // 当成功时, 回调函数为:
            responseObserver.onNext(SearchResponse.newBuilder().setName("tom" + request.getId()).build());
            // 当完成时, 回调函数为:
            responseObserver.onCompleted();
        }

        /**
         * 2.入参是 对象, 出参是流
         *
         * @param request
         * @param responseObserver
         */
        @Override
        public void queryStuByIdStream2Resp(SearchRequest request, StreamObserver<StuResponse> responseObserver) {
            System.out.println("客户端请求参数id为: " + request.getId());
            responseObserver.onNext(StuResponse.newBuilder().setName("stream2Resp --> jerry" + request.getId()).setAge(request.getId()).setId(request.getId()).build());
            responseObserver.onNext(StuResponse.newBuilder().setName("stream2Resp --> john" + request.getId()).setAge(request.getId()).setId(request.getId()).build());
            responseObserver.onNext(StuResponse.newBuilder().setName("stream2Resp --> trump" + request.getId()).setAge(request.getId()).setId(request.getId()).build());
            responseObserver.onCompleted();
        }

        /**
         * 3.入参是 流, 出参是对象: 异步实现
         *
         * @param responseObserver
         * @return
         */
        @Override
        public StreamObserver<SearchRequest> queryStuByIdReqByStream(StreamObserver<StuResponseList> responseObserver) {
            return new StreamObserver<SearchRequest>() {
                @Override
                public void onNext(SearchRequest value) {
                    System.out.println("onNext --> " + value.getId());
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println(t.getMessage());
                }

                @Override
                public void onCompleted() {
                    StuResponse response1 = StuResponse.newBuilder().setId("1").setName("tommy1").setAge("10").build();
                    StuResponse response2 = StuResponse.newBuilder().setId("2").setName("tommy2").setAge("20").build();
                    StuResponseList list = StuResponseList.newBuilder().addStuResponse(response1).addStuResponse(response2).build();
                    responseObserver.onNext(list);
                    responseObserver.onCompleted();
                }
            };
        }

        /**
         * 4.入参是 流, 出参也是流: 异步实现
         *
         * @param responseObserver
         * @return
         */
        @Override
        public StreamObserver<StuRequestStream> queryStuByIdReqAndResp2Stream(StreamObserver<StuResponseStream> responseObserver) {
            return new StreamObserver<StuRequestStream>() {
                @Override
                public void onNext(StuRequestStream value) {
                    System.out.println("receive request: " + value.getReq());
                    responseObserver.onNext(StuResponseStream.newBuilder().setResp("hello" + value.getReq()).build());
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println(t.getMessage());
                }

                @Override
                public void onCompleted() {
                    responseObserver.onCompleted();
                }
            };
        }
    }
}

grpc client

package com.zy.grpc;

import com.zy.proto.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class GrpcClient {

    private final ManagedChannel channel;
    private final StudentServiceGrpc.StudentServiceBlockingStub blockingStub;
    private final StudentServiceGrpc.StudentServiceStub stub;

    public GrpcClient(String host, int port) {
        this(ManagedChannelBuilder.forAddress(host, port).usePlaintext().build());
    }

    GrpcClient(ManagedChannel channel) {
        this.channel = channel;
        this.blockingStub = StudentServiceGrpc.newBlockingStub(channel);
        this.stub = StudentServiceGrpc.newStub(channel);
    }

    /**
     * 1.入参是 对象, 出参也是对象
     * @param id
     */
    private void queryStuById(String id) {
        try {
            SearchResponse response = this.blockingStub.queryStuById(SearchRequest.newBuilder().setId(id).build());
            System.out.println("response --> " + response.getName());
        } catch (Exception e) {
            System.out.println("failed to request queryStuById");
            e.printStackTrace();
        }
    }

    /**
     * 2.入参是 对象, 出参是流
     * @param id
     */
    private void queryStuByIdStream2Resp(String id) {
        try {
            Iterator<StuResponse> resp = this.blockingStub.queryStuByIdStream2Resp(SearchRequest.newBuilder().setId(id).build());
            while (resp.hasNext()) {
                StuResponse next = resp.next();
                System.out.println(next);
            }
        } catch (Exception e) {
            System.out.println("failed to request queryStuByIdStream2Resp");
            e.printStackTrace();
        }
    }

    /**
     * 3.入参是 流, 出参是对象: 异步实现
     * @param list
     */
    private void queryStuByIdReqByStream(List<String> list) {
        if (list == null || list.size() == 0) {
            return;
        }
        StreamObserver<StuResponseList> responseObserver = new StreamObserver<StuResponseList>() {
            @Override
            public void onNext(StuResponseList value) {
                value.getStuResponseList().forEach(System.out::println);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        };

        StreamObserver<SearchRequest> requestObserver = stub.queryStuByIdReqByStream(responseObserver);
        list.forEach(id -> requestObserver.onNext(SearchRequest.newBuilder().setId(id).build()));
        requestObserver.onCompleted();
    }

    /**
     * 4.入参是 流, 出参也是流: 异步实现
     * @param list
     */
    private void queryStuByIdReqAndResp2Stream(List<String> list) {
        if (list == null || list.size() == 0) {
            return;
        }
        StreamObserver<StuResponseStream> responseStreamStreamObserver = new StreamObserver<StuResponseStream>() {
            @Override
            public void onNext(StuResponseStream value) {
                System.out.println(value.getResp());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        };
        StreamObserver<StuRequestStream> request = stub.queryStuByIdReqAndResp2Stream(responseStreamStreamObserver);
        list.forEach(req -> request.onNext(StuRequestStream.newBuilder().setReq(req).build()));
        request.onCompleted();
    }

    private void shutdown() throws InterruptedException {
        this.channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        GrpcClient grpcClient = new GrpcClient("127.0.0.1", 9090);
        try {
            System.out.println("-----------------------1.入参是 对象, 出参也是对象----------------------");
            grpcClient.queryStuById("1");
            System.out.println("-----------------------2.入参是 对象, 出参是流----------------------");
            grpcClient.queryStuByIdStream2Resp("2");
            System.out.println("-----------------------3.入参是 流, 出参是对象: 异步实现-----------------------");
            List<String> list = new ArrayList<>();
            list.add("1");
            list.add("2");
            list.add("3");
            grpcClient.queryStuByIdReqByStream(list);
            System.out.println("-----------------------4.入参是 流, 出参也是流: 异步实现-----------------------");
            grpcClient.queryStuByIdReqAndResp2Stream(list);
        } finally {
            grpcClient.shutdown();
        }
    }
}

https://www.cnblogs.com/leesf456/p/5328466.html
https://blog.csdn.net/u013597009/article/details/78538018
https://blog.csdn.net/xpsallwell/article/details/80421882
https://www.jb51.net/article/107434.htm (全面参考)

上一篇下一篇

猜你喜欢

热点阅读