Dubbo Source Code 6: transporter-netty4

2019-01-05  modou1618

The transporter layer supports several transports such as netty, mina, and http. This article covers the netty4-based implementation.

1 NettyClient

1.1 Class diagram

Figure: NettyClient class diagram

1.2 Initialization

1.2.1 NettyClient initialization

    // NettyTransporter: entry point that creates the client
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

    // NettyClient: wrap the handler before passing it up to AbstractClient
    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }

    // AbstractClient: set the thread name and the default thread pool type, then wrap the handler
    protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }

    // ChannelHandlers: MultiMessageHandler -> HeartbeatHandler -> handler chosen by the Dispatcher extension -> handler
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
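
The nesting order in wrapInternal can be illustrated with a small, self-contained sketch. The Handler interface and the layer() helper below are simplified stand-ins with hypothetical names, not Dubbo's classes; they only record the order in which a received message passes through the chain.

```java
import java.util.ArrayList;
import java.util.List;

public class HandlerChainSketch {
    /** Simplified stand-in for Dubbo's ChannelHandler (receive path only). */
    public interface Handler {
        void received(Object message);
    }

    /** Records which layers a message went through, in order. */
    public static final List<String> TRACE = new ArrayList<>();

    /** Decorator that records its name and then delegates to the wrapped handler. */
    public static Handler layer(String name, Handler next) {
        return message -> {
            TRACE.add(name);
            next.received(message);
        };
    }

    /** Same nesting as wrapInternal: multi-message -> heartbeat -> dispatcher -> business handler. */
    public static Handler wrap(Handler business) {
        Handler dispatched = layer("dispatcher", business);
        return layer("multiMessage", layer("heartbeat", dispatched));
    }

    public static void main(String[] args) {
        Handler chain = wrap(message -> TRACE.add("business:" + message));
        chain.received("hello");
        System.out.println(TRACE); // [multiMessage, heartbeat, dispatcher, business:hello]
    }
}
```

The outermost wrapper sees the message first, which is why MultiMessageHandler can unpack a batch before any other layer runs.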

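1.2.2 AbstractClient initialization

    // Called from the AbstractClient constructor; NettyClient supplies the Netty-specific setup
    protected void doOpen() throws Throwable {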
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(NioSocketChannel.class);

        if (getConnectTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
        }

        bootstrap.handler(new ChannelInitializer() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("handler", nettyClientHandler);
            }
        });
    }
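
The if/else in doOpen puts a 3-second floor under the connect timeout. A minimal sketch of the same rule follows; the class and method names are hypothetical and exist only for illustration.

```java
public class ConnectTimeoutSketch {
    /** Floor applied in doOpen(), in milliseconds. */
    public static final int MIN_CONNECT_TIMEOUT_MS = 3000;

    /** Returns the value that would be passed to CONNECT_TIMEOUT_MILLIS. */
    public static int effectiveConnectTimeout(int configuredMs) {
        return Math.max(MIN_CONNECT_TIMEOUT_MS, configuredMs);
    }

    public static void main(String[] args) {
        System.out.println(effectiveConnectTimeout(1000)); // 3000
        System.out.println(effectiveConnectTimeout(5000)); // 5000
    }
}
```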

1.2.3 AbstractEndpoint initialization

1.2.4 AbstractPeer initialization

1.3 Receive-side handlers

1.3.1 MultiMessageHandler

    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }
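
The unpacking logic can be tried in isolation with a small sketch. Batch below is a simplified, hypothetical stand-in for MultiMessage (an iterable container of decoded messages), and deliver() collects the messages instead of calling a downstream handler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MultiMessageSketch {
    /** Simplified stand-in for MultiMessage: an iterable batch of decoded messages. */
    public static final class Batch implements Iterable<Object> {
        private final List<Object> messages;

        public Batch(Object... messages) {
            this.messages = Arrays.asList(messages);
        }

        @Override
        public Iterator<Object> iterator() {
            return messages.iterator();
        }
    }

    /** Same branching as received(): unpack a batch, pass a single message straight through. */
    public static List<Object> deliver(Object message) {
        List<Object> delivered = new ArrayList<>();
        if (message instanceof Batch) {
            for (Object each : (Batch) message) {
                delivered.add(each); // stands in for handler.received(channel, each)
            }
        } else {
            delivered.add(message); // stands in for handler.received(channel, message)
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliver(new Batch("req-1", "req-2"))); // [req-1, req-2]
        System.out.println(deliver("req-3"));                     // [req-3]
    }
}
```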

1.3.2 HeartbeatHandler

Handles heartbeat messages; see the article on the exchanger layer.

1.3.3 Asynchronous receive handling with thread pools

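Dispatcher options (which messages are handed to the thread pool):

Value       Description
all         All messages are dispatched to the thread pool, including requests, responses, connect events, disconnect events, and heartbeats.
direct      No message is dispatched to the thread pool; everything runs directly on the IO thread.
message     Only request and response messages are dispatched to the thread pool. Connect and disconnect events, heartbeats, and other messages run directly on the IO thread.
execution   Only request messages are dispatched to the thread pool. Responses, connect and disconnect events, heartbeats, and other messages run directly on the IO thread.
connection  Connect and disconnect events are handled by a dedicated single-thread pool; connect.queue.capacity sets the maximum number of connection events that can be queued. All other messages are dispatched to a separate thread pool.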
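
The routing rules in the table can be written down as a small decision function. This is only a sketch of the table, not Dubbo's dispatcher code: the Event and Target enums and the route() method are hypothetical names introduced for illustration.

```java
public class DispatchPolicySketch {
    /** Message kinds named in the table. */
    public enum Event { CONNECTED, DISCONNECTED, REQUEST, RESPONSE, HEARTBEAT }

    /** Where a message ends up being processed. */
    public enum Target { IO_THREAD, SHARED_POOL, CONNECTION_POOL }

    /** Maps a dispatcher option and a message kind to the thread that processes it. */
    public static Target route(String dispatcher, Event event) {
        boolean connectionEvent = event == Event.CONNECTED || event == Event.DISCONNECTED;
        switch (dispatcher) {
            case "all":
                return Target.SHARED_POOL;
            case "direct":
                return Target.IO_THREAD;
            case "message":
                return (event == Event.REQUEST || event == Event.RESPONSE)
                        ? Target.SHARED_POOL : Target.IO_THREAD;
            case "execution":
                return event == Event.REQUEST ? Target.SHARED_POOL : Target.IO_THREAD;
            case "connection":
                return connectionEvent ? Target.CONNECTION_POOL : Target.SHARED_POOL;
            default:
                throw new IllegalArgumentException("unknown dispatcher: " + dispatcher);
        }
    }

    public static void main(String[] args) {
        for (String d : new String[] {"all", "direct", "message", "execution", "connection"}) {
            System.out.println(d + ": RESPONSE -> " + route(d, Event.RESPONSE));
        }
    }
}
```

The difference between message and execution shows up only for responses: message sends them to the pool, execution keeps them on the IO thread.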

Thread pool options:

Value    Description
fixed    Fixed-size thread pool. new ThreadPoolExecutor(200, 200, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
cached   Cached thread pool. new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60*1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
limited  Thread pool capped at a maximum thread count; with a keep-alive of Long.MAX_VALUE, idle threads are effectively never reclaimed. new ThreadPoolExecutor(0, 200, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
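
All three pools above use a SynchronousQueue, so a task is never queued: it is either handed to an idle thread, started on a new thread, or rejected. The sketch below shows this for a pool shaped like the fixed row. It makes several assumptions for the sake of a runnable demo: the pool size is a small parameter instead of 200, the JDK's AbortPolicy stands in for AbortPolicyWithReport, and the thread factory is left at the JDK default.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolSketch {
    /** Builds a pool shaped like the "fixed" row, with a configurable size. */
    public static ThreadPoolExecutor fixed(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.AbortPolicy()); // JDK stand-in for AbortPolicyWithReport
    }

    /** Submits threads + 1 blocking tasks and reports whether the extra one was rejected. */
    public static boolean rejectsWhenSaturated(int threads) throws InterruptedException {
        ThreadPoolExecutor pool = fixed(threads);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the test is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < threads; i++) {
                pool.execute(blocker); // each call starts a new core thread
            }
            pool.execute(blocker);     // no idle thread and no queue capacity left
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(rejectsWhenSaturated(2)); // true
    }
}
```

In other words, once every thread of a fixed pool is busy, further messages are rejected rather than buffered, which is where the rejection policy comes into play.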

Each event handed to the thread pool is tagged with a ChannelState:

    public enum ChannelState {
        CONNECTED,
        DISCONNECTED,
        SENT,
        RECEIVED,
        CAUGHT
    }
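
A rough sketch of how such a state tag can drive a pooled task is given below. It is an illustration under assumptions rather than Dubbo's code: Event, dispatch(), and the string log are hypothetical, and a single-thread executor is used only so that the output order is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ChannelEventSketch {
    public enum State { CONNECTED, DISCONNECTED, SENT, RECEIVED, CAUGHT }

    /** Task handed to the pool: remembers which callback to run and with what payload. */
    public static final class Event implements Runnable {
        private final State state;
        private final Object payload;
        private final List<String> log;

        public Event(State state, Object payload, List<String> log) {
            this.state = state;
            this.payload = payload;
            this.log = log;
        }

        @Override
        public void run() {
            // Each branch stands in for the matching handler callback.
            switch (state) {
                case CONNECTED:    log.add("connected"); break;
                case DISCONNECTED: log.add("disconnected"); break;
                case SENT:         log.add("sent " + payload); break;
                case RECEIVED:     log.add("received " + payload); break;
                case CAUGHT:       log.add("caught " + payload); break;
                default: throw new IllegalStateException("unknown state " + state);
            }
        }
    }

    /** Wraps each state in an Event, runs the events on a pool thread, and returns the callback log. */
    public static List<String> dispatch(State... states) throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (State state : states) {
            pool.execute(new Event(state, "msg", log));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(dispatch(State.CONNECTED, State.RECEIVED, State.DISCONNECTED));
    }
}
```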

1.3.4 Complete receive-handler hierarchy

Figure: receive-handler hierarchy

2 NettyServer

2.1 Class diagram

Figure: NettyServer class diagram

2.2 Initialization

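2.2.1 NettyServer initialization

    // NettyTransporter: entry point that creates the server
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    // NettyServer: set the server thread name and wrap the handler before passing it up to AbstractServer
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }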

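2.2.2 AbstractServer initialization

    // Called from the AbstractServer constructor; NettyServer supplies the Netty-specific setup
    protected void doOpen() throws Throwable {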
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }
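
The line channels = nettyServerHandler.getChannels() suggests that the server and its handler share one live collection of connected channels. The sketch below shows that idea under stated assumptions: it assumes a concurrent map keyed by remote address, uses plain Object in place of a channel type, and all names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ChannelRegistrySketch {
    /** Live view of connected channels, keyed by remote address (assumed key format "ip:port"). */
    private final Map<String, Object> channels = new ConcurrentHashMap<>();

    /** Would be called when a connection becomes active. */
    public void channelActive(String remoteAddress, Object channel) {
        channels.put(remoteAddress, channel);
    }

    /** Would be called when a connection is closed. */
    public void channelInactive(String remoteAddress) {
        channels.remove(remoteAddress);
    }

    /** Returns the shared map itself, so the caller always sees the current set of channels. */
    public Map<String, Object> getChannels() {
        return channels;
    }

    public static void main(String[] args) {
        ChannelRegistrySketch handler = new ChannelRegistrySketch();
        Map<String, Object> serverView = handler.getChannels(); // taken once, as in doOpen()
        handler.channelActive("10.0.0.1:50000", new Object());
        System.out.println(serverView.size()); // 1
        handler.channelInactive("10.0.0.1:50000");
        System.out.println(serverView.size()); // 0
    }
}
```

Because the reference is taken once and the map is updated in place, the server does not need to be notified separately when clients connect or disconnect.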

2.3 Receive handling

Received messages go through the same handler flow as in NettyClient.
