技术干货

【第14篇】Netty核心四大组件关系与构成分析

2019-05-27  本文已影响0人  爱学习的蹭蹭

1 FastThreadLocalThread

Summary:
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
简介:
Netty是一个NIO客户端服务器框架,支持快速、简单地开发协议服务器和客户端等网络应用程序。它大大简化和流线网络编程,如TCP和UDP套接字服务器。

FastThreadLocalThread

2、ThreadLocal

  public void set(T value) {
         Thread t = Thread.currentThread();
         ThreadLocalMap map = getMap(t);
         if (map != null)
             map.set(this, value);
         else
             createMap(t, value);
     }

2、 ChannelPipeline

3、DefaultChannelPipeline

public class DefaultChannelPipeline implements ChannelPipeline {
//...略
    /**
    * 当AbstractChannel注册的时候被设置为true,设置之后以后就不会被改变。
    */
    private boolean registered;
 //...略
   public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);//判断是否已经添加过。
            newCtx = newContext(group, filterName(name, handler), handler);//重要方法,创建有一个context
            addLast0(newCtx);//最后添加到Pipeline的handlers集合里边的对象,准确的说不是handler,而是context。
          //如果注册的是false,这意味着通道还没有在eventloop上注册。
          //在本例中,我们将上下文添加到管道中,并添加一个将调用的任务
          // ChannelHandler.handlerAdded(…)一旦注册了通道。
          //如果registered是false,意味着channel没有在事件循环组中注册过,
            //这种情况下我们将context添加到pipeline 当中,并且添加一个回调任务,当channel 被注册的时候,回调任务会执行
            //ChannelHandler.handlerAdded(...)方法。
            if (!registered) {
                newCtx.setAddPending();//将当前context挂起。
                callHandlerCallbackLater(newCtx, true);//建议一个线程任务稍后执行。
                return this;
            }
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        //我们自己重写的handler的handlerAdded方法会被执行。
        callHandlerAdded0(newCtx);
        return this;
    }

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {//不是共享的,并且被添加过直接抛出异常
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;//设置added 标志位为true
        }
    }
    //创建一个context,this是DefaultChannelPipeline,group为null,
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    //如果name为空,生成一个名字
    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            return generateName(handler);
        }
        //判断名字是否重复
        checkDuplicateName(name);
        return name;
    }

//生成名字的私有方法,nameCaches是一个FastThreadLocal(ThreadLocal原生ThreadLocal的封装,区别在于ThreadLocal是使用hash散列的 
方式,而FastThreadLocal使用的是数组,用的索引定位,比ThreadLocal性能上稍微快了一些,可以看到netty对性能要求非常高。)
    private String generateName(ChannelHandler handler) {
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();
        String name = cache.get(handlerType);
        if (name == null) {
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }

       //用户不太可能在同一类型中放置多个处理程序,但是一定要避免
      //任何名称冲突。注意,我们不缓存这里生成的名称。
        if (context0(name) != null) {
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }
//添加一个context到pipline操作(pipline默认只有tail和head2个节点),其实就是双向 链表的添加节点的操作。
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    //建立一个稍后执行的任务。
    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            //将新建的任务添加到链表里边
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }

    //context被添加到pipline之后调用callHandlerAdded0,我们自己写的handler的handlerAdded方法会被执行,这也是handlerAdded
    //为什么会被首先执行的原因。
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
       ...略 
            ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
           ...略      

4、DefaultChannelHandlerContext

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    private final ChannelHandler handler;//持有Handler的引用,从这里可以看出一个context对应一个Handler。
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    //获取持有的Handler
    public ChannelHandler handler() {
        return handler;
    }
//入栈处理器是ChannelInboundHandler的实现
    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }
//出栈处理器是ChannelOutboundHandler的实现
    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

5、DefaultChannelHandlerContext的super构造器结构:

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;//赋值pipeline( private final DefaultChannelPipeline pipeline;)
        //DefaultChannelPipeline 持有Channel的引用
        this.executor = executor;
        this.inbound = inbound;//入栈处理器
        this.outbound = outbound;//出栈处理器
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

ChannelInitializer

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
....略
    /**
     *当initChannel方法被调用完毕返回的时候,当前ChannelInitializer对象会被从pipline里边删除掉。
     */
    protected abstract void initChannel(C ch) throws Exception;


    /**
     * {@inheritDoc} If override this method ensure you call super!
     * 如果重写,确保调用父类的方法。
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            //对于当前的DefaultChannelPipeline实现,这应该总是正确的。
            //在handlerAdded(…)中调用initChannel(…)的好处是没有订单
            //如果一个通道初始化器将添加另一个通道初始化器,会让人感到惊讶。这是所有的处理程序
            //将按预期顺序添加。
            initChannel(ctx);
        }
    }
    //
    @SuppressWarnings("unchecked")
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
            //初始化。
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
            掉完initChannel之后从pipline删除当前对象
                remove(ctx);
            }
            return true;
        }
        return false;
    }
//删除逻辑,首先拿到ChannelPipeline ,然后remove掉
    private void remove(ChannelHandlerContext ctx) {
        try {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        } finally {
        //同时删除对应的context
            initMap.remove(ctx);
        }
    }

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

        pipeline.addLast(new TextWebSocketFrameHandler());
    }
}
上一篇 下一篇

猜你喜欢

热点阅读