dubbo - server的bind过程分析
2019-05-21 本文已影响37人
晴天哥_王志
开篇
这篇文章主要是为了讲清楚dubbo server端在bind过程中的整个调用链。之前在dubbo服务发布流程的文章中已经讲过,服务发布的底层最终是通过bind()方法来实现监听的。
这篇文章会对bind的过程进行细化讲解,包括核心的Exchangers、HeaderExchanger、HeaderExchangeServer等类。
调用链
服务调用链路图说明:
- 重点关注类Exchangers、HeaderExchanger、HeaderExchangeServer、Transporters、NettyTransporter、NettyServer。
DubboProtocol
/**
 * Excerpt of the Dubbo protocol implementation showing how exporting a service
 * leads to opening (binding) a server. Parts of the original class are omitted
 * from this excerpt, so some identifiers used below are declared elsewhere.
 */
public class DubboProtocol extends AbstractProtocol {
/**
 * Exports the given invoker. Only the server-opening and
 * serialization-optimization steps are shown in this excerpt.
 * NOTE(review): {@code url} and {@code exporter} are presumably declared in the
 * omitted code; confirm against the full source.
 */
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// (related code omitted in this excerpt)
openServer(url);
optimizeSerialization(url);
return exporter;
}
/**
 * Ensures a server exists for the address of {@code url}: creates and registers
 * one in {@code serverMap} when none is registered under that address, otherwise
 * resets the existing server with the new URL. Nothing happens when the
 * IS_SERVER_KEY parameter (default {@code true}) is false.
 * NOTE(review): the get-then-put below is not atomic; verify whether two
 * exports for the same address can run concurrently.
 */
private void openServer(URL url) {
// find server.
String key = url.getAddress();
// a client can export a service that is only meant for the server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
/**
 * Binds a new exchange server for {@code url} through {@code Exchangers.bind},
 * wrapping any RemotingException in an RpcException, and then checks that the
 * client transporter named by CLIENT_KEY (if present) is a supported Transporter
 * extension.
 * NOTE(review): {@code str} is presumably declared in the omitted code. The
 * client-type check runs after the server has been bound, so the RpcException
 * for an unsupported type is thrown with the server already open; confirm this
 * ordering is intended.
 */
private ExchangeServer createServer(URL url) {
// (related code omitted in this excerpt)
ExchangeServer server;
try {
// core step: bind the server
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
}
说明:
- DubboProtocol类的createServer()方法调用Exchangers.bind()进入Exchangers类。
Exchangers & HeaderExchanger
/**
 * Static entry point of the exchange layer: resolves the {@code Exchanger}
 * extension named by the URL and delegates the bind to it.
 */
public class Exchangers {

    /**
     * Binds an exchange server for the given URL.
     *
     * @param url     server URL; the codec parameter is defaulted to "exchange" when absent
     * @param handler handler that receives exchange-level events
     * @return the server produced by the selected exchanger
     * @throws IllegalArgumentException if {@code url} or {@code handler} is null
     * @throws RemotingException        if the underlying exchanger fails to bind
     */
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        // Fail fast with a descriptive message, mirroring the argument checks in Transporters.bind.
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // Entry point of the bind process.
        return getExchanger(url).bind(url, handler);
    }

    /**
     * Resolves the exchanger whose name is given by the URL's EXCHANGER_KEY
     * parameter, falling back to {@code Constants.DEFAULT_EXCHANGER}.
     */
    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    /**
     * Loads the exchanger extension registered under {@code type}.
     * SPI mapping: header=com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger
     */
    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }
}
/**
 * Exchanger registered under the name "header". It wraps the caller's handler in
 * a HeaderExchangeHandler and a DecodeHandler before handing it to the transport
 * layer, and wraps the resulting transport endpoint in the header exchange
 * client/server.
 */
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    /**
     * Connects through the transport layer and returns the result wrapped in a
     * HeaderExchangeClient.
     */
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        final HeaderExchangeHandler exchangeHandler = new HeaderExchangeHandler(handler);
        final DecodeHandler decodeHandler = new DecodeHandler(exchangeHandler);
        return new HeaderExchangeClient(Transporters.connect(url, decodeHandler), true);
    }

    /**
     * Binds through the transport layer and returns the result wrapped in a
     * HeaderExchangeServer.
     */
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        final HeaderExchangeHandler exchangeHandler = new HeaderExchangeHandler(handler);
        final DecodeHandler decodeHandler = new DecodeHandler(exchangeHandler);
        return new HeaderExchangeServer(Transporters.bind(url, decodeHandler));
    }
}
说明:
- Exchangers的bind()方法内部调用getExchanger(url).bind()。
- getExchanger()方法内部返回HeaderExchanger对象,涉及SPI机制。
- getExchanger(url).bind()执行HeaderExchanger的bind()方法。
- HeaderExchanger的bind内部返回HeaderExchangeServer对象。
- HeaderExchangeServer类的构造函数参数是Transporters.bind()返回的server值。
- Transporters.bind()方法返回NettyServer对象。
HeaderExchangeServer
public class HeaderExchangeServer implements ExchangeServer {
private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
new NamedThreadFactory(
"dubbo-remoting-server-heartbeat",
true));
private final Server server;
// heartbeat timer
private ScheduledFuture<?> heatbeatTimer;
// heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
private int heartbeatTimeout;
private AtomicBoolean closed = new AtomicBoolean(false);
public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
}
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
startHeatbeatTimer();
}
public void reset(URL url) {
server.reset(url);
try {
if (url.hasParameter(Constants.HEARTBEAT_KEY)
|| url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
if (t < h * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
if (h != heartbeat || t != heartbeatTimeout) {
heartbeat = h;
heartbeatTimeout = t;
startHeatbeatTimer();
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
@Deprecated
public void reset(com.alibaba.dubbo.common.Parameters parameters) {
reset(getUrl().addParameters(parameters.getParameters()));
}
public void send(Object message) throws RemotingException {
if (closed.get()) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
}
server.send(message);
}
public void send(Object message, boolean sent) throws RemotingException {
if (closed.get()) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
}
server.send(message, sent);
}
}
说明:
- HeaderExchangeServer类包含变量Server server,server为NettyServer对象。
- HeaderExchangeServer相当于对Server类进行了一层封装,所有对Server层的操作都通过HeaderExchangeServer进行封装并对外提供服务。
Transporters
/**
 * Static entry point of the transport layer: validates the arguments, collapses
 * the handlers into a single one and delegates the bind to the adaptive
 * Transporter extension.
 */
public class Transporters {

    /**
     * Binds a transport server for {@code url}.
     *
     * @param url      server URL, must not be null
     * @param handlers one or more handlers; several handlers are combined in a ChannelHandlerDispatcher
     * @throws IllegalArgumentException if {@code url} is null or no handler is supplied
     */
    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        final int handlerCount = (handlers == null) ? 0 : handlers.length;
        if (handlerCount == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        // A single handler is used directly; several are fanned out through a dispatcher.
        final ChannelHandler handler = (handlerCount == 1)
                ? handlers[0]
                : new ChannelHandlerDispatcher(handlers);
        return getTransporter().bind(url, handler);
    }

    /**
     * Returns the adaptive Transporter extension. SPI mappings listed by the article:
     * netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter,
     * netty=com.alibaba.dubbo.remoting.transport.netty.NettyTransporter,
     * mina=com.alibaba.dubbo.remoting.transport.mina.MinaTransporter
     */
    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
}
说明:
- Transporters.bind()方法通过getTransporter()获取Transporter的自适应扩展(Adaptive),运行时再根据URL参数选择具体实现,本文场景下最终选中NettyTransporter,涉及SPI机制。
- getTransporter().bind()方法执行到NettyTransporter.bind()方法,返回NettyServer对象。
NettyTransporter
/**
 * Transporter registered under the name "netty"; creates Netty-backed servers
 * and clients.
 */
public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    /** Creates a NettyServer for the URL, delivering events to {@code listener}. */
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        final Server nettyServer = new NettyServer(url, listener);
        return nettyServer;
    }

    /** Creates a NettyClient for the URL, delivering events to {@code listener}. */
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        final Client nettyClient = new NettyClient(url, listener);
        return nettyClient;
    }
}
/**
 * Server implementation on top of the Netty 3 (org.jboss.netty) ServerBootstrap.
 * The fields below are assigned in {@link #doOpen()} and deliberately carry no
 * initializers.
 */
public class NettyServer extends AbstractServer implements Server {

    // Client channels keyed by "ip:port"; taken from the NettyHandler in doOpen().
    private Map<String, Channel> channels;

    private ServerBootstrap bootstrap;

    // The listening channel returned by bootstrap.bind().
    private org.jboss.netty.channel.Channel channel;

    /**
     * Wraps the handler via ChannelHandlers.wrap (after setting the thread name
     * on the URL) and passes both to the superclass constructor.
     */
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

    /**
     * Creates the boss/worker pools and the bootstrap, installs the
     * decoder/encoder/handler pipeline and binds to the configured address.
     */
    @Override
    protected void doOpen() throws Throwable {
        final ExecutorService bossPool =
                Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        final ExecutorService workerPool =
                Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        final int ioThreads =
                getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS);
        final ChannelFactory channelFactory = new NioServerSocketChannelFactory(bossPool, workerPool, ioThreads);
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                // A fresh codec adapter per pipeline; the handler instance is shared.
                final NettyCodecAdapter codecAdapter =
                        new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                final ChannelPipeline result = Channels.pipeline();
                result.addLast("decoder", codecAdapter.getDecoder());
                result.addLast("encoder", codecAdapter.getEncoder());
                result.addLast("handler", nettyHandler);
                return result;
            }
        });

        // bind
        channel = bootstrap.bind(getBindAddress());
    }

    /**
     * Shuts the server down in four independent steps; a failure in one step is
     * logged as a warning and does not prevent the following steps.
     */
    @Override
    protected void doClose() throws Throwable {
        // 1. Unbind: close the listening channel.
        try {
            if (channel != null) {
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }

        // 2. Close every connected client channel.
        try {
            final Collection<com.alibaba.dubbo.remoting.Channel> openChannels = getChannels();
            if (openChannels != null && !openChannels.isEmpty()) {
                for (com.alibaba.dubbo.remoting.Channel client : openChannels) {
                    try {
                        client.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }

        // 3. Release the external resources held by the bootstrap.
        try {
            if (bootstrap != null) {
                bootstrap.releaseExternalResources();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }

        // 4. Drop the channel registry.
        try {
            if (this.channels != null) {
                this.channels.clear();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Returns a new set holding the currently connected channels; channels found
     * disconnected are removed from the registry along the way.
     */
    public Collection<Channel> getChannels() {
        final Collection<Channel> connected = new HashSet<Channel>();
        for (Channel candidate : this.channels.values()) {
            if (!candidate.isConnected()) {
                channels.remove(NetUtils.toAddressString(candidate.getRemoteAddress()));
                continue;
            }
            connected.add(candidate);
        }
        return connected;
    }

    /** Looks up the channel registered under the address string of {@code remoteAddress}. */
    public Channel getChannel(InetSocketAddress remoteAddress) {
        final String addressKey = NetUtils.toAddressString(remoteAddress);
        return channels.get(addressKey);
    }

    /** Reports whether the listening channel is bound. */
    public boolean isBound() {
        return channel.isBound();
    }
}
说明:
- NettyTransporter的bind()返回NettyServer对象。
- NettyTransporter的connect()返回NettyClient对象。
- NettyServer的doOpen()内部通过Netty的ServerBootstrap完成端口绑定,这部分流程都是Netty自身的API调用。
GrizzlyTransporter
/**
 * Transporter registered under the name "grizzly"; creates Grizzly-backed
 * servers and clients.
 */
public class GrizzlyTransporter implements Transporter {

    public static final String NAME = "grizzly";

    /** Creates a GrizzlyServer for the URL, delivering events to {@code listener}. */
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        final Server grizzlyServer = new GrizzlyServer(url, listener);
        return grizzlyServer;
    }

    /** Creates a GrizzlyClient for the URL, delivering events to {@code listener}. */
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        final Client grizzlyClient = new GrizzlyClient(url, listener);
        return grizzlyClient;
    }
}
/**
 * Server implementation on top of a Grizzly TCPNIOTransport. Connected channels
 * are tracked in a map that is maintained by the connected/disconnected callbacks.
 */
public class GrizzlyServer extends AbstractServer {

    private static final Logger logger = LoggerFactory.getLogger(GrizzlyServer.class);

    // Connected channels keyed by "ip:port".
    private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>();

    private TCPNIOTransport transport;

    public GrizzlyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
    }

    /**
     * Builds the filter chain (transport filter, codec adapter, handler), sizes
     * the worker thread pool from the URL's threadpool settings, then builds,
     * binds and starts the transport.
     *
     * @throws IllegalArgumentException if the threadpool type is neither the default nor "cached"
     */
    @Override
    protected void doOpen() throws Throwable {
        final FilterChainBuilder chain = FilterChainBuilder.stateless();
        chain.add(new TransportFilter());
        chain.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
        chain.add(new GrizzlyHandler(getUrl(), this));

        final TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
        final ThreadPoolConfig config = builder.getWorkerThreadPoolConfig();
        config.setPoolName(SERVER_THREAD_POOL_NAME).setQueueLimit(-1);

        // Work out the pool dimensions first, then apply them in one place.
        final String threadpool = getUrl().getParameter(Constants.THREADPOOL_KEY, Constants.DEFAULT_THREADPOOL);
        final int corePoolSize;
        final int maxPoolSize;
        final long keepAliveSeconds;
        if (Constants.DEFAULT_THREADPOOL.equals(threadpool)) {
            // Default type: core and max are both the configured thread count, no keep-alive.
            final int threads = getUrl().getPositiveParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            corePoolSize = threads;
            maxPoolSize = threads;
            keepAliveSeconds = 0L;
        } else if ("cached".equals(threadpool)) {
            // Cached type: no core threads, max defaults to Integer.MAX_VALUE, 60 s keep-alive.
            maxPoolSize = getUrl().getPositiveParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
            corePoolSize = 0;
            keepAliveSeconds = 60L;
        } else {
            throw new IllegalArgumentException("Unsupported threadpool type " + threadpool);
        }
        config.setCorePoolSize(corePoolSize).setMaxPoolSize(maxPoolSize)
                .setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);

        builder.setKeepAlive(true).setReuseAddress(false)
                .setIOStrategy(SameThreadIOStrategy.getInstance());

        transport = builder.build();
        transport.setProcessor(chain.build());
        transport.bind(getBindAddress());
        transport.start();
    }

    /** Stops the transport; a failure is logged as a warning instead of being propagated. */
    @Override
    protected void doClose() throws Throwable {
        try {
            transport.stop();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /** Reports the server as bound for as long as the transport is not stopped. */
    public boolean isBound() {
        final boolean stopped = transport.isStopped();
        return !stopped;
    }

    /** Returns the values view of the channel registry. */
    public Collection<Channel> getChannels() {
        return channels.values();
    }

    /** Looks up the channel registered under the address string of {@code remoteAddress}. */
    public Channel getChannel(InetSocketAddress remoteAddress) {
        final String addressKey = NetUtils.toAddressString(remoteAddress);
        return channels.get(addressKey);
    }

    /** Registers the channel under its remote address, then forwards the event to the superclass. */
    @Override
    public void connected(Channel ch) throws RemotingException {
        final String addressKey = NetUtils.toAddressString(ch.getRemoteAddress());
        channels.put(addressKey, ch);
        super.connected(ch);
    }

    /** Unregisters the channel by its remote address, then forwards the event to the superclass. */
    @Override
    public void disconnected(Channel ch) throws RemotingException {
        final String addressKey = NetUtils.toAddressString(ch.getRemoteAddress());
        channels.remove(addressKey);
        super.disconnected(ch);
    }
}
说明:
- GrizzlyTransporter和NettyTransporter对象的实现逻辑是一致的。