06-dubbo客户端接收请求过程
前一篇文章分析了客户端发送请求的过程05-dubbo客户端调用流程分析现在再来分析客户端接收响应的过程。下面是接受响应的调用链
org.apache.dubbo.remoting.transport.AbstractPeer#received
com.alibaba.dubbo.remoting.transport.MultiMessageHandler#received
com.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler#received
com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
com.alibaba.dubbo.remoting.transport.DecodeHandler#received
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleResponse
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#received
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#doReceived
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#get(int)
在netty中客户端接收响应会调用ChannelHandle接口里面的received方法我们的调用链就从AbstractPeer#received
开始netty中一般发送请求和接收请求都有响应的编码解码器,我们下一篇再来分析这个编码解码器。中间进过一些列的调用最终是执行到AllChannelHandle类里面
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//省略部分代码
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
可以看到客户端接收请求是通过线程池来执行的响应cexecutor.execute(Runnable)
进入Runnable里面可以看到线程立刻执行run方法
// com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
public void run() {
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
//忽略部分代码
}
}
run()方法主要执行received方法
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
首先解码执行decode(((Response) message).getResult());
将响应对象进行解码,然后继续调用handler.received(channel, message);
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
//忽略部分代码
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
//忽略部分代码
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
前面解码器将对象解析成Response对象,所以进入handleResponse(channel, (Response) message);
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
有看到亲切的DefaultFuture。在发送请求的时候就实例了DefaultFuture然后将DefaultFuture放到了Map里面,这里调用了DefaultFuture的静态方法准备写入响应对象。
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
//省略部分代码
}
} finally {
CHANNELS.remove(response.getId());
}
}
通过FUTURES.remove(response.getId());拿到前面发送请求时放入的DefaultFuture,然后调用doReceived将response对象赋值到DefaultFuture对象里面。注意此事使用的DefaultFuture和发送请求时使用的DefaultFuture是同一个。这样在发送请求的时候我们最后不是调用了get()方法进行阻塞等待吗?发送时候阻塞等待,在获得响应之后将response写到同一个DefaultFuture里面这样get()就可以拿到Response对象这样整个就调用成功了
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
以上是get阻塞等待的逻辑,比较简单就是调用await方法进行等待,设置了超时时间