Java 杂谈手写RPC框架

手写RPC框架(3)-引入Hessian序列化工具

2019-07-14  本文已影响12人  jwfy

本人微信公众号(jwfy)欢迎关注

手写RPC框架
1、手写一个RPC框架,看看100个线程同时调用效果如何
2、手写RPC框架(2)-引入zookeeper做服务治理

本次进行第三个版本的迭代,支持了自定义序列化工具,在代码中默认实现了Java内置的序列化方式以及Hessian序列化方式,并对比其两者的效果,以及泛型参数传递的小优化点、空指针的处理等。最后聊一下InputStream.read 读取数据的问题以及整个手写框架的进度情况

image

这张图在手写RPC框架(2)-引入zookeeper做服务治理已经贴出了过,而我们这次的重点也就是在这里的1-4个点上。

在使用类似于Hessian序列化工具时,需要先引入该jar包

<!--hessian-->
<dependency>
    <groupId>com.caucho</groupId>
    <artifactId>hessian</artifactId>
    <version>4.0.38</version>
</dependency>

RPC 实践 V3版本

image

圈住的代码就是本次的重点序列化和反序列化

MessageProtocol 消息协议

/**
 * 请求、应答 解析和反解析,包含了序列化以及反序列化操作
 *
 * @author jwfy
 */
public interface MessageProtocol {

    /**
     * 服务端解析从网络传输的数据,转变成request
     * @param inputStream
     * @return
     */
    RpcRequest serviceToRequest(InputStream inputStream);

    /**
     * 服务端把计算结果包装好,通过outputStream 返回给客户端
     * @param response
     * @param outputStream
     * @param <T>
     */
     <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream);

    /**
     * 客户端把请求拼接好,通过outputStream发送到服务端
     * @param request
     * @param outputStream
     */
     void clientToRequest(RpcRequest request, OutputStream outputStream);

    /**
     * 客户端接收到服务端响应的结果,转变成response
     * @param inputStream
     */
    <T> RpcResponse<T>  clientGetResponse(InputStream inputStream);
}

主要是修改了serviceToRequest以及clientGetResponse接口,从参数挪到返回值,使其更加容易理解

MessageProtocol 消息协议实现类

public class DefaultMessageProtocol implements MessageProtocol {

    private SerializeProtocol serializeProtocol;

    public DefaultMessageProtocol() {
        // 默认是采用了Hessian协议
        this.serializeProtocol = new HessianSerialize();
    }

    public void setSerializeProtocol(SerializeProtocol serializeProtocol) {
        // 可通过set方法替换序列化协议
        this.serializeProtocol = serializeProtocol;
    }

    @Override
    public RpcRequest serviceToRequest(InputStream inputStream) {
        try {
            // 2、bytes -> request 反序列化
            byte[] bytes = readBytes(inputStream);
            // System.out.println("[2]服务端反序列化出obj:[" + new String(bytes) + "], length:" + bytes.length);
            System.out.println("[2]服务端反序列化出obj length:" + bytes.length);
            RpcRequest request = serializeProtocol.deserialize(RpcRequest.class, bytes);
            return request;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream) {
        try {
            // 3、把response 序列化成bytes 传给客户端
            byte[] bytes = serializeProtocol.serialize(RpcResponse.class, response);
            // System.out.println("[3]服务端序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
            System.out.println("[3]服务端序列化出bytes length:" + bytes.length);
            outputStream.write(bytes);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void clientToRequest(RpcRequest request, OutputStream outputStream) {
        try {
            // 1、先把这个request -> bytes 序列化掉
            byte[] bytes = serializeProtocol.serialize(RpcRequest.class, request);
            // System.out.println("[1]客户端序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
            System.out.println("[1]客户端序列化出bytes length:" + bytes.length);
            outputStream.write(bytes);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public <T> RpcResponse<T>  clientGetResponse(InputStream inputStream) {
        try {
            // 4、bytes 反序列化成response
            byte[] bytes = readBytes(inputStream);
            // System.out.println("[4]客户端反序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
            System.out.println("[4]客户端反序列化出bytes length:" + bytes.length);
            RpcResponse response = serializeProtocol.deserialize(RpcResponse.class, bytes);
            return response;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private byte[] readBytes(InputStream inputStream) throws IOException {
        if (inputStream == null) {
            throw new RuntimeException("input为空");
        }
        // return fun1(inputStream);
        return fun2(inputStream);
        // return fun3(inputStream);
    }

    private byte[] fun1(InputStream inputStream) throws IOException {
        // 有个前提是数据最大是1024,并没有迭代读取数据
        byte[] bytes = new byte[1024];
        int count = inputStream.read(bytes, 0, 1024);
        return Arrays.copyOf(bytes, count);
    }

    private byte[] fun2(InputStream inputStream) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        int bufesize = 1024;
        while (true) {
            byte[] data = new byte[bufesize];
            int count = inputStream.read(data,0,bufesize);
            byteArrayOutputStream.write(data, 0, count);
            if (count < bufesize) {
                break;
            }
        }
        return byteArrayOutputStream.toByteArray();
    }

    /**
     * 有问题的fun3,调用之后会阻塞在read,可通过jstack查看相关信息
     * @param inputStream
     * @return
     * @throws IOException
     */
    private byte[] fun3(InputStream inputStream) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        int bufesize = 1024;

        byte[] buff = new byte[bufesize];
        int rc = 0;
        while ((rc = inputStream.read(buff, 0, bufesize)) > 0) {
            byteArrayOutputStream.write(buff, 0, rc);
            buff = new byte[bufesize];
        }
        byte[] bytes = byteArrayOutputStream.toByteArray();
        return bytes;
    }

}

四个方法也是依次针对上述四个步骤的流程操作,其中包含了对byte数据(内容和长度)的输出观察,在此就不细说了。重点关注下fun1、fun2、fun3 三个方法的细节操作,其中fun1和fun2都测试是没有问题的唯独第三个使用的inputStream.read会时常出现阻塞的情况,如下图观察线程运行情况。

image

目前还没有深入的了解InputStream.read方法的原理细节,本人也需要仔细学习和了解,后面将会作为一个单独的学习笔记进行补充说明

序列化&反序列化接口

public interface SerializeProtocol {
    /**
     * 序列化
     */
    <T> byte[] serialize(Class<T> clazz, T t);

    /**
     * 反序列化
     */
     <T> T deserialize(Class<T> clazz, byte[] bytes);
}

这个就没什么可说的,一个非常简单的序列化和反序列化接口,序列化返回byte数据,反序列化根据泛型返回对应实体对象

Hessian序列化&反序列化实现

public class HessianSerialize implements SerializeProtocol {

    @Override
    public <T> byte[] serialize(Class<T> clazz, T t) {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Hessian2Output hessian2Output = new Hessian2Output(outputStream);
        try {
            hessian2Output.writeObject(t);
            // NOTICE 验证过,一定需要在flush刷新之前关闭hessian2Output,否则无法有效获取字节数据
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage());
        } finally {
            try {
                hessian2Output.close();
            } catch (IOException e){
                e.printStackTrace();
            }
        }
        try {
            outputStream.flush();
            byte[] bytes = outputStream.toByteArray();
            return bytes;
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage());
        } finally {
            try {
                outputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
        Hessian2Input hessian2Input = new Hessian2Input(inputStream);
        try {
            T t = (T) hessian2Input.readObject();
            return t;
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage());
        } finally {
            try {
                hessian2Input.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

关于Hessian的使用细节,自行查询官网http://hessian.caucho.com/
这里并未对抛出的异常做很好的处理,只是简单的抛出、日志输出而已
关于Java内置的序列化方式就不贴出来了,代码较为简单,只需要按照类似的套路自行编写即可。

实践

样例按照手写RPC框架(2)-引入zookeeper做服务治理所说的保持一致。

Hessian序列化工具

image
image

Java内置序列化工具

image
image

对比这两者结果很明显

单就这一个简单试验而言,Hessian的效率就比Java内置的高出100%还要多,可见在RPC框架中一个优秀的序列化框架多么重要,毕竟数据的大小即影响网络传输的速率,也影响序列号和反序列化的执行性能

总结思考

本次更新并没有更新太多内容,只是在v2版本上替换了之前的Java内置的序列化工具,而且本文也只是介绍了Hessian序列化工具,其实Google的ProtoBuffer也是一个不错的选择,甚至FastJson也可作为序列化工具使用。

文中还遗留了一个问题:InputStream.read 何时会被阻塞?,暂时未对其细节原理有更多的认识,接下来会出一篇附加的学习笔记好好学习总结一下其原因。

整个RPC学习笔记不出意外的话,应该还剩下3篇,一篇引入Netty,替换当前的BIO模型;一篇引入日志,并完善整个的代码的一些异常点;最后一篇结合Spring,使其成为一个项目中真正可用的Simple-RPC框架。此外关于SPI、快速失败、监控、等由于本人能力&精力问题看时间更新。

本学习笔记主要目的是学习和了解RPC框架,并及时进行总结和反思

如有想需要demo代码的可关注本人微信公众号,给我发私信。

本人微信公众号(搜索jwfy)欢迎关注

微信公众号
上一篇 下一篇

猜你喜欢

热点阅读