Dubbo源码分析(十一) 序列化机制

2018-11-13  本文已影响0人  skyguard

下面我们来说一下Dubbo的序列化机制。数据在网络上进行传输,先要把数据序列化成字节流,接收端收到数据后,再反序列化成对象。现在我们就来说一下Dubbo的序列化机制是怎么实现的。
先来看一个类TransportCodec,这个类有两个方法:encode和decode,分别是序列化和反序列化的实现。来看一下是怎么实现的

public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
    // 获得反序列化的 ObjectOutput 对象
    OutputStream output = new ChannelBufferOutputStream(buffer);
    ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output);
    // 写入 ObjectOutput
    encodeData(channel, objectOutput, message);
    objectOutput.flushBuffer();
    // 释放
    if (objectOutput instanceof Cleanable) {
        ((Cleanable) objectOutput).cleanup();
    }
}

public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    // 反序列化
    InputStream input = new ChannelBufferInputStream(buffer);
    ObjectInput objectInput = getSerialization(channel).deserialize(channel.getUrl(), input);
    Object object = decodeData(channel, objectInput);
    // 释放
    if (objectInput instanceof Cleanable) {
        ((Cleanable) objectInput).cleanup();
    }
    return object;
}

DubboCodec是TransportCodec的子类,这个类也实现了序列化和反序列化。看一下是怎么实现的

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2];
    // 获得 Serialization 对象
    byte proto = (byte) (flag & SERIALIZATION_MASK);
    Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
    // 获得请求||响应编号
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    // 解析响应
    if ((flag & FLAG_REQUEST) == 0) {
        // decode response.
        Response res = new Response(id);
        // 若是心跳事件,进行设置
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(Response.HEARTBEAT_EVENT);
        }
        // 设置状态
        // get status.
        byte status = header[3];
        res.setStatus(status);
        // 正常响应状态
        if (status == Response.OK) {
            try {
                Object data;
                // 解码心跳事件
                if (res.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                // 解码其它事件
                } else if (res.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                // 解码普通响应
                } else {
                    DecodeableRpcResult result;
                    // 在通信框架(例如,Netty)的 IO 线程,解码
                    if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto);
                        result.decode();
                    // 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
                    } else {
                        result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto);
                    }
                    data = result;
                }
                // 设置结果
                res.setResult(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode response failed: " + t.getMessage(), t);
                }
                res.setStatus(Response.CLIENT_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
        // 异常响应状态
        } else {
            res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
        }
        return res;
    // 解析请求
    } else {
        // decode request.
        Request req = new Request(id);
        req.setVersion("2.0.0");
        // 是否需要响应
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        // 若是心跳事件,进行设置
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            Object data;
            // 解码心跳事件
            if (req.isHeartbeat()) {
                data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
            // 解码其它事件
            } else if (req.isEvent()) {
                data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
            // 解码普通请求
            } else {
                // 在通信框架(例如,Netty)的 IO 线程,解码
                DecodeableRpcInvocation inv;
                if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    inv.decode();
                // 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
                } else {
                    inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // bad request
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}

再到deserialize方法

private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) throws IOException {
    return serialization.deserialize(url, is);
}

下面我们来看一个接口Serialization,这个接口是序列化器的接口

 /**
 * create serializer
 *
 * 创建 ObjectOutput 对象,序列化输出到 OutputStream
 *
 * @param url URL
 * @param output 输出流
 * @return serializer
 * @throws IOException 当发生 IO 异常时
 */
@Adaptive
ObjectOutput serialize(URL url, OutputStream output) throws IOException;

/**
 * create deserializer
 *
 * 创建 ObjectInput 对象,从 InputStream 反序列化
 *
 * @param url URL
 * @param input 输入流
 * @return deserializer
 * @throws IOException 当发生 IO 异常时
 */
@Adaptive
ObjectInput deserialize(URL url, InputStream input) throws IOException;

JavaSerialization实现了Serialization,是用jdk序列化的方式实现的

public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
    return new JavaObjectOutput(out);
}

public ObjectInput deserialize(URL url, InputStream is) throws IOException {
    return new JavaObjectInput(is);
}

看下JavaObjectInput的实现

public Object readObject() throws IOException, ClassNotFoundException {
    byte b = getObjectInputStream().readByte();
    if (b == 0)
        return null;

    return getObjectInputStream().readObject();
}

再看下JavaObjectOutput的实现

public void writeObject(Object obj) throws IOException {
    if (obj == null) { // 空
        getObjectOutputStream().writeByte(0); // 空
    } else {
        getObjectOutputStream().writeByte(1); // 非空
        getObjectOutputStream().writeObject(obj); // 对象
    }
}

下面我们再介绍一种序列化的实现方式,就是kryo,这是一个高性能的序列化框架,我们来看一下Dubbo是怎么实现的。先来看一个类KryoSerialization

public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
    return new KryoObjectOutput(out);
}

public ObjectInput deserialize(URL url, InputStream is) throws IOException {
    return new KryoObjectInput(is);
}

再看一下KryoObjectInput这个类

public Object readObject() throws IOException, ClassNotFoundException {
    // TODO optimization
    try {
        return kryo.readClassAndObject(input);
    } catch (KryoException e) {
        throw new IOException(e);
    }
}

再看一下KryoObjectOutput这个类

public void writeObject(Object v) throws IOException {
    // TODO carries class info every time.
    kryo.writeClassAndObject(output, v);
}

kryo对象是通过KryoUtils获取到的,看一下KryoUtils这个类

public static Kryo get() {
    return kryoFactory.getKryo();
}

public static void release(Kryo kryo) {
    kryoFactory.returnKryo(kryo);
}

public static void register(Class<?> clazz) {
    kryoFactory.registerClass(clazz);
}

public static void setRegistrationRequired(boolean registrationRequired) {
    kryoFactory.setRegistrationRequired(registrationRequired);
}

这样就实现了kryo的序列化机制。
Dubbo的序列化机制就介绍到这里了。

上一篇下一篇

猜你喜欢

热点阅读