微服务架构和实践Dubbo剖析程序员

dubbo剖析:六 网络通信之 -- 请求响应模型

2018-05-07  本文已影响442人  益文的圈

注:文章中使用的dubbo源码版本为2.5.4

零、文章目录

一、Dubbo的三种RPC调用方式

1.1 异步&无返回值

a)服务引用配置如下:

    <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
        <dubbo:method name="sayHello" async="true" return="false"/>
    </dubbo:reference>

b)服务调用方式如下:

String hello = demoService.sayHello("world" + i);

1.2 异步&有返回值

a)服务引用配置如下:

    <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
        <dubbo:method name="sayHello" async="true"/>
    </dubbo:reference>

b)服务调用方式如下:

demoService.sayHello("world" + i);
Future<String> temp = RpcContext.getContext().getFuture();
String hello = temp.get();

1.3 异步变同步

a)服务引用配置如下:

    <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
        <dubbo:method name="sayHello" async="false"/>
    </dubbo:reference>

b)服务调用方式如下:

String hello = demoService.sayHello("world" + i);

二、关键类介绍

2.1 RPC请求消息封装(Request)

dubbo的交换层定义了RPC请求的封装类Request,它包含了一个RPC请求所具备的关键信息。

public class Request {

    //请求ID生成器,AtomicLong.inc实现
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);

    //RPC调用的请求ID,在单个Client端内全局唯一
    private final long mId;
    
    //RPC请求响应消息协议版本
    private String mVersion;

    //是否为双向请求响应
    private boolean mTwoWay = true;
    
    //实际RPC调用的请求数据,对应了Invocation类,调用参数都封装在这里了
    private Object mData;

    //Request初始化时,生成请求ID
    public Request() {
        mId = newId();
    }

    //请求ID生成方法
    private static long newId() {
        // getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID
        return INVOKE_ID.getAndIncrement();
    }
}

2.2 RPC响应消息封装(Response)

dubbo的交换层定义了RPC响应的封装类Response,它包含了一个RPC响应所具备的关键信息。

public class Response {

    /**
     * ok.
     */
    public static final byte OK = 20;

    /**
     * clien side timeout.
     */
    public static final byte CLIENT_TIMEOUT = 30;

    /**
     * server side timeout.
     */
    public static final byte SERVER_TIMEOUT = 31;

     // ...省略一些状态码...
    
    //RPC调用的请求ID,默认为0,从Request中获取
    private long mId = 0;

    //RPC请求响应消息协议版本
    private String mVersion;

    //响应状态码,默认OK,出现异常时重新设置
    private byte mStatus = OK;

    //响应错误信息
    private String mErrorMsg;
    
    //实际RPC调用的响应数据,对应实际实现类的方法执行结果
    private Object mResult;
}

2.3 RPC调用Future接口(ResponseFuture)

dubbo的交换层定义了RPC调用的响应Future接口ResponseFuture,它封装了请求响应模式,例如提供了将异步网络通信转换成同步RPC调用的关键方法Object get(int timeoutInMillis)

public interface ResponseFuture {

    /**
     * 获取RPC远程执行结果,异步IO转同步RPC的关键方法
     */
    Object get() throws RemotingException;

    /**
     * 获取RPC远程执行结果,异步IO转同步RPC的关键方法
     */
    Object get(int timeoutInMillis) throws RemotingException;

    /**
     * set callback. 响应回调模式
     */
    void setCallback(ResponseCallback callback);

    /**
     * RPC调用是否完成
     */
    boolean isDone();

}

三、DefaultFuture实现

DefaultFutureResponseFuture接口的实现类,具体实现了接口定义的方法。

3.1 请求响应信息的承载

public class DefaultFuture implements ResponseFuture {
    
    //<请求ID,消息通道> 的映射关系
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
    
    //<请求ID,未完成状态的RPC请求> 的映射关系
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

    //RPC调用的请求ID,构造器中从Request获取
    private final long id;

    //消息通道,构造器传入
    private final Channel channel;

    //RPC请求消息,构造器传入
    private final Request request;

    //RPC响应消息
    private volatile Response response;

    //RPC响应回调器
    private volatile ResponseCallback callback;

    //构造器
    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
}

3.2 RPC执行超时检测

public class DefaultFuture implements ResponseFuture {

    //RPC超时轮询线程,不断轮询超时状态的FUTURES,主动移除并返回超时结果
    static {
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
        th.setDaemon(true);
        th.start();
    }

    //RPC执行超时时间,构造器传入,默认值1s
    private final int timeout;
    
    //DefaultFuture构建时间
    private final long start = System.currentTimeMillis();

}

3.3 异步网络通信转同步RPC调用

public class DefaultFuture implements ResponseFuture {

    //响应消息处理互斥锁,get()、doReceived()、setCallback()方法中使用
    private final Lock lock = new ReentrantLock();
    //请求响应模式Condition,通过get()中的await和doReceived()中的signal完成IO异步转RPC同步
    private final Condition done = lock.newCondition();

    //RPC响应消息接收方法
    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
    //将响应结果放入response,通知在done上等待的业务线程,并执行invokeCallback方法触发所有绑定的Callbask执行
    private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }

    //RPC执行结果同步获取方法,RPC的同步请求模式就依赖此方法完成,依赖done.await同步等待RPC执行结果
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

}

3.4 异步网络通信转同步RPC调用的两个关键点

a)发起RPC调用请求的业务线程,是如何同步阻塞等待直到RPC响应返回的?

b)对于全双工的网络通信,在多线程并发请求响应的情况下,如果找到RPC响应Response对应的RPC请求Request?

四、调用入口流程

dubbo剖析:五 请求发送与接收 中讲到,服务引用方调用dubbo的代理对象发起RPC请求时,最终会执行到DubboInvoker.doInvoke()方法:

    protected Result doInvoke(final Invocation invocation) throws Throwable {
        
        //入参构建及获取ExchangeClient
        RpcInvocation inv = (RpcInvocation) invocation;
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }

        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            //case1. 异步,无返回值
            if (isOneway) { 
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } 
            //case2. 异步,有返回值
            else if (isAsync) {   
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } 
            //case3. 异步转同步(默认的通信方式)
            else {   
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

五、服务引用方请求响应模型总结

服务引用方请求响应模型总结
上一篇 下一篇

猜你喜欢

热点阅读