dubbo源码分析(二) 从dubbo协议分析Protocol层
2017-09-23 本文已影响176人
hahaee
上一篇我们简单描述了dubbo服务暴露-服务引用的流程
这一篇我们从dubbo协议来具体分析一下Protocol层
export()过程
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
//将exporter存到map里
exporterMap.put(key, exporter);
...
openServer(url);
return exporter;
}
/**
* 开启服务
*
* @param url
*/
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
//map中不存在创建server
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
}
}
/**
* 创建服务
*
* @param url
* @return
*/
private ExchangeServer createServer(URL url) {
...
ExchangeServer server;
try {//启动服务监听 传入了requestHandler
//当收到客户端调用时会调用requestHandler.received()方法
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
...
return server;
}
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
//获取到对应的invoker
Invoker<?> invoker = getInvoker(channel, inv);
...
//根据 Invocation 调用信息,调用真正服务实现
return invoker.invoke(inv);
}
...
}
...
};
/**
* 根据请求参数获取到对应的服务端invoker
*
* @param channel
* @param inv
* @return
* @throws RemotingException
*/
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(Constants.PATH_KEY);
...
//生成serviceKey
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv
.getAttachments().get(Constants.GROUP_KEY));
//从map中找到exporter
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
...
return exporter.getInvoker();
}
概括一下:
export()就是根据服务url生成Exporter并存在map中,然后暴露服务,设置回调.
当客户端调用请求时进入回调,根据请求url找到存在map中的Exporter,
最后用Exporter中的Invoker调用真正的服务
注意:服务端客户端都存在Invoker对象,但两者有所区别.
客户端Invoker用于沟通服务端实现远程调用 如:DubboInvoker
服务端Invoker用于调用真正服务实现 一般都继承AbstractProxyInvoker 见下述代码段
com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory#getInvoker
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}
refer()过程
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
refer()就是根据url生成Invoker
调用过程来看DubboInvoker.doInvoke()方法
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
...
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//是否有返回值
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {//无返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {//异步
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {//同步
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (Exception e) {
...
}
}
/**
* Invocation. (API, Prototype, NonThreadSafe)
* 封装远程调用信息(方法名 参数)
*/
public interface Invocation {
/**
*方法名
*/
String getMethodName();
/**
*方法参数类型
*/
Class<?>[] getParameterTypes();
/**
*方法参数
*/
Object[] getArguments();
/**
*冗余参数
*/
Map<String, String> getAttachments();
String getAttachment(String key);
String getAttachment(String key, String defaultValue);
Invoker<?> getInvoker();
}
可以看到doInvoker方法会通过client对象执行远程调用
到此,大家对Protocol层三大对象应该有了一个简单的了解.