
Dubbo Protocol 服务发布

2019-10-18  本文已影响0人  晴天哥_王志


 这篇文章的目的是尝试讲解Dubbo Protocol使用Netty Server作为服务端的服务发布过程,为了便于说明清楚,在网上找了一张Dubbo服务发布的流程图,这里我们关注Protocol到Server的部分逻辑,其他的分布其他文章。



public class DubboProtocol extends AbstractProtocol {

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // 生成exporter对象并保存至exporterMap
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        // 开始开启服务端监控

        return exporter;

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        // 通过createServer创建Server对象
                        serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override

    private ExchangeServer createServer(URL url) {

        // 省略非核心代码
        ExchangeServer server;
        try {
            // 服务发布的入口,requestHandler包含invokerMap的对象
            // server 是HeaderExchangeServer对象
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);

        return server;

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        // 省略代码



public class Exchangers {
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);

    // String DEFAULT_EXCHANGER = "header";
    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);

    // header=org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger
    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, 
               new DecodeHandler(new HeaderExchangeHandler(handler))), true);

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, 
               new DecodeHandler(new HeaderExchangeHandler(handler))));


public class Transporters {

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        return getTransporter().bind(url, handler);

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
public class Transporter$Adaptive implements Transporter {
    public Server bind(URL uRL, ChannelHandler channelHandler)
        throws RemotingException {

        if (uRL == null) {
            throw new IllegalArgumentException("url == null");

        URL uRL2 = uRL;
        String string = uRL2.getParameter("server",
                uRL2.getParameter("transporter", "netty"));

        Transporter transporter = (Transporter) ExtensionLoader

        return transporter.bind(uRL, channelHandler);

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty3";

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);


public class NettyServer extends AbstractServer implements Server {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    // 保存连接的Channel信息
    private Map<String, Channel> channels; // <ip:port, channel>

    private ServerBootstrap bootstrap;

    private org.jboss.netty.channel.Channel channel;

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));

    protected void doOpen() throws Throwable {
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, 
                       getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
        // bind
        channel = bootstrap.bind(getBindAddress());
public class ChannelHandlers {

    // 关注核心代码逻辑
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));

public class Dispatcher$Adaptive implements Dispatcher {
    public ChannelHandler dispatch(ChannelHandler channelHandler, URL uRL) {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");

        URL uRL2 = uRL;
        String string = uRL2.getParameter("dispatcher",
                    uRL2.getParameter("channel.handler", "all")));

        Dispatcher dispatcher = (Dispatcher) ExtensionLoader.getExtensionLoader(Dispatcher.class)

        return dispatcher.dispatch(channelHandler, uRL);

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
public abstract class AbstractServer extends AbstractEndpoint implements Server {

    protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
    private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
    ExecutorService executor;
    private InetSocketAddress localAddress;
    private InetSocketAddress bindAddress;
    // 最大连接数
    private int accepts;
    private int idleTimeout;

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        // 最大连接数从URL当中获取
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
        try {
            // 开启服务监听
        } catch (Throwable t) {
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));


上一篇 下一篇

