Dubbo源码分析----处理请求
2018-07-22 本文已影响87人
_六道木
Dubbo默认是使用Netty进行通信的,那么Netty会配置一个Handler,来处理一些事件,所以这个Handler是核心,主要找一下Dubbo初始化Netty的时候设置的Handler
回顾一下网络通信相关,在DubboProtocol中,会调用createServer方法返回一个Server对象,这是一个切入点
private ExchangeServer createServer(URL url) {
//....
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
//....
return server;
}
bind方法内部对requestHandler进行了层层的装饰,但是这里也不是我们要找的地方,在NettyServer的doOpen才是我们要找的(具体流程可以看下网络通信的那篇文章)
protected void doOpen() throws Throwable {
//....
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// ....
}
可以看到,这个NettyHandler是处理请求的入口,看下NettyHandler的结构图,其中最下面的handler是requestHandler
image.png
当Netty接收到请求的时候,会调用NettyServer的messageReceived方法,请求依次经过图中红色框框的Handler,不同的Handler有不同的功能,只需要看下几个关键的Handler
AllChannelHandler
这个Handler在讲Dubbo的Dispatcher的ThreadPool的时候讲过,就是将请求交给线程池处理
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
那么看下ChannelEventRunnable的run方法
public void run() {
switch (state) {
case CONNECTED:
handler.connected(channel);
break;
case DISCONNECTED:
handler.disconnected(channel);
break;
case SENT:
handler.sent(channel,message);
break;
case RECEIVED:
handler.received(channel, message);
break;
case CAUGHT:
handler.caught(channel, exception);
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
在线程中将请求交由下一个handler处理,从上面的图中可以知道AllChannelHandler下两个handler是DecodeHandler和HeaderExchangeHandler,这两个Handler做了一些特殊处理,然后将请求转发给requestHandler
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);// 通过参数信息获取本地Invoker
//....
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);//调用
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
//....
};
getInvoker方法如下
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException{
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(Constants.PATH_KEY);//xxx.xxx.xxx.xxxService
//如果是客户端的回调服务.
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
if (isStubServiceInvoke){
port = channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if(isCallBackServiceInvoke){
path = inv.getAttachments().get(Constants.PATH_KEY)+"."+inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));// port+serviceName+version+group区分的key
// 暴露服务的时候会将Exporter放到map中
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null)
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
return exporter.getInvoker();
}
getInvoker也是比较简单的,这时候得到的Invoker相当于是本地服务的一个代理对象,invoke之后就会调用到真实的方法