springboot

Java进阶-Netty-进阶

2022-04-08  本文已影响0人  GIT提交不上

一、Reactor线程

  源码基于4.1.6.Final版本。

1.1 Reactor线程启动

  NioEventLoop的run方法是reactor线程的主体,在第一次添加任务的时候被启动。

@Override
public void execute(Runnable task) {
    ...
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        ...
    }
    ...
}

1.2 Reactor线程执行

@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            processSelectedKeys();
            runAllTasks(...);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        ...
    }
image.png
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
      selector.wakeup();
}

定时任务截止事时间快到了,中断本次轮询
轮询过程中发现有任务加入,中断本次轮询
阻塞式select操作

  netty 会在每次进行 selector.select(timeoutMillis) 之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,判断select操作是否至少持续了timeoutMillis秒,如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector。

processSelectedKeys();

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

取出IO事件以及对应的netty channel类
处理该channel
判断是否该再来次轮询

对于boss NioEventLoop来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline将连接扔给一个worker NioEventLoop处理
对于worker NioEventLoop来说,轮询到的基本上都是io读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理
SelectedSelectionKeySet

  netty使用数组替换掉jdk原生的HashSet来保证IO事件的高效处理,每个SelectionKey上绑定了netty类AbstractChannel对象作为attachment,在处理每个SelectionKey的时候,就可以找到AbstractChannel,然后通过pipeline的方式将处理串行到ChannelHandler,回调到用户方法。

用户自定义普通任务
非当前reactor线程调用channel的各种方法
用户自定义定时任务:1)若干时间后执行一次 2)每隔一段时间执行一次 3)每次执行结束,隔一定时间再执行一次

  taskQueue在NioEventLoop中默认是mpsc队列,mpsc队列,即多生产者单消费者队列,netty使用mpsc,方便的将外部线程的task聚集,在reactor线程内部用单线程来串行执行。

  reactor线程task调度:

当前reactor线程调用当前eventLoop执行任务,直接执行,否则,添加到任务队列稍后执行
netty内部的任务分为普通任务和定时任务,分别落地到MpscQueue和PriorityQueue
netty每次执行任务循环之前,会将已经到期的定时任务从PriorityQueue转移到MpscQueue
netty每隔64个任务检查一下是否该退出任务循环

二、服务端启动

b.bind(8888).sync();

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
} 

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    //...
    final ChannelFuture regFuture = initAndRegister();
    //...
    final Channel channel = regFuture.channel();
    //...
    doBind0(regFuture, channel, localAddress, promise);
    //...
    return promise;
}
image.png

用户调用方法 Bootstrap.bind(port) 第一步就是通过反射的方式new一个NioServerSocketChannel对象,并且在new的过程中创建了一系列的核心组件。

设置option和attr
设置新接入channel的option和attr
加入新连接处理器

1)设置启动类参数,最重要的就是设置channel
创建server对应的channel,创建各大组件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
2)初始化server对应的channel,设置一些attr,option,以及设置子channel的attr,option,给server的channel添加新channel接入器,并出发addHandler,register等事件
3)调用到jdk底层做端口绑定,并触发active事件,active触发的时候,真正做服务端口绑定

三、新连接接入

所有的channel底层都会有一个与unsafe绑定,每种类型的channel实际的操作都由unsafe来实现

image.png

  流水线的开始就是HeadContxt,流水线的结束就是TailConext,HeadContxt中调用Unsafe做具体的操作,TailConext中用于向用户抛出pipeline中未处理异常以及对未处理消息的警告.

//NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}

四、pipeline

4.1 pipeline 初始化

  pipeline是channel其中的一员,在AbstractChannel中被创建。

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

  pipeline中保存了channel的引用,默认情况下,一条pipeline会有两个节点,head和tail。

image.png

4.2 pipeline添加节点

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Spliter())
         p.addLast(new Decoder());
         p.addLast(new BusinessHandler())
         p.addLast(new Encoder());
     }
});
image.png
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1.检查是否有重复handler
        checkMultiplicity(handler);
        // 2.创建节点
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3.添加节点
        addLast0(newCtx);
    }
   
    // 4.回调用户方法
    callHandlerAdded0(handler);
    
    return this;
}

netty中用两个字段来表示这个channelHandlerContext属于inBound还是outBound,或者两者都是。

image.png
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1
    newCtx.next = tail; // 2
    prev.next = newCtx; // 3
    tail.prev = newCtx; // 4
}
image.png

4.3 pipeline删除节点

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    
    return this;
}
image.png

4.4 pipeline其他

五、writeAndFlush

image.png

六、拆包器

netty中的拆包内部会有一个累加器,每次读取到数据都会不断累加,然后尝试对累加到的数据进行拆包,拆成一个完整的业务数据包,这个基类叫做 ByteToMessageDecoder

netty将具体如何拆包抽象出一个decode方法,不同的拆包器实现不同的decode方法,就能实现不同协议的拆包

上一篇 下一篇

猜你喜欢

热点阅读