netty

Netty高级功能(一):流控和流量整形

2019-10-03  本文已影响0人  雪飘千里

这一章节,我们通过例子学习netty的一些高级特性。

1、netty客户端流控

在有些场景下,由于各种原因,会导致客户端消息发送积压,进而导致OOM。

这三种情况下,如果客户端没有流控保护,这时候就很容易发生内存泄露。

原因:

在我们调用channel的write和writeAndFlush时
io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object, io.netty.channel.ChannelPromise),如果发送方为业务线程,则将发送操作封装成WriteTask(继承Runnable),放到Netty的NioEventLoop中执行,当NioEventLoop无法完成如此多的消息的发送的时候,发送任务队列积压,进而导致内存泄漏。

解决方案:

为了防止在高并发场景下,由于服务端处理慢导致的客户端消息积压,客户端需要做并发保护,防止自身发生消息积压。Netty提供了一个高低水位机制,可以实现客户端精准的流控

io.netty.channel.ChannelConfig#setWriteBufferHighWaterMark 高水位
io.netty.channel.ChannelConfig#setWriteBufferLowWaterMark 低水位

当发送队列待发送的字节数组达到高水位时,对应的channel就变为不可写状态,由于高水位并不影响业务线程调用write方法把消息加入到待发送队列,因此在消息发送时要先对channel的状态进行判断(ctx.channel().isWritable)。

这里涉及到的知识点是netty的消息发送机制。

netty的消息发送机制

业务调用write方法后,经过ChannelPipeline职责链处理,消息被投递到发送缓冲区待发送,调用flush之后会执行真正的发送操作,底层通过调用Java NIO的SocketChannel进行非阻塞write操作,将消息发送到网络上,

image.png

当用户线程(业务线程)发起write操作时,Netty会进行判断,如果发现不少NioEventLoop(I/O线程),则将发送消息封装成WriteTask,放入NioEventLoop的任务队列,由NioEventLoop线程执行,代码如下

io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)

    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }

        write(msg, true, promise);

        return promise;
    }

 private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }
 private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
//这里的executor执行的是netty自己实现的SingleThreadEventExecutor#execute方法,
            executor.execute(runnable);
        } catch (Throwable cause) {
            try {
                promise.setFailure(cause);
            } finally {
                if (msg != null) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }

io.netty.util.concurrent.SingleThreadEventExecutor#execute

@Override
  public void execute(Runnable task) {
      if (task == null) {
          throw new NullPointerException("task");
      }

      boolean inEventLoop = inEventLoop();
      if (inEventLoop) {
          addTask(task);
      } else {
          startThread();
          addTask(task);
          if (isShutdown() && removeTask(task)) {
              reject();
          }
      }

      if (!addTaskWakesUp && wakesUpForTask(task)) {
          wakeup(inEventLoop);
      }
  }

Netty的NioEventLoop线程内部维护了一个Queue<Runnable> taskQuue,除了处理网络IO读写操作,同时还负责执行网络读写相关的Task,NioEventLoop遍历taskQueue,执行消息发送任务,代码调用入路径如下,具体的就不贴了,太长了
io.netty.channel.nio.NioEventLoop#run
-----> io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
----->io.netty.util.concurrent.AbstractEventExecutor#safeExecute
这里safeExecute执行的task,就是前面write写入时包装的AbstractWriteTask,AbstractWriteTask的run中
io.netty.channel.AbstractChannelHandlerContext.AbstractWriteTask#run

经过一些系统处理操作,最终会调用io.netty.channel.ChannelOutboundBuffer#addMessage方法,将发送消息加入发送队列(链表)。

我们上面写的流程从NioSocketChannel到ChnnelOutbountBuffer,实际上在这个过程中,为了对发送速度和消息积压数进行控制,Netty还提供了高低水位机制,当消息队列中积压的待发送消息总字节数到达高水位时,修改Channel的状态为不可写,并发送通知事件;当消息发送完成后,对低水位进行判断,如果当前积压的待发送字节数低于低水位时,则修改channel状态为可写,并发送通知事件,具体代码见下
io.netty.channel.ChannelOutboundBuffer#incrementPendingOutboundBytes(long);
io.netty.channel.ChannelOutboundBuffer#decrementPendingOutboundBytes(long);

image.png

总结:在实际项目中,根据业务QPS规划,客户端处理性能、网络带宽、链路数、消息平均码流大小等综合因数,设置Netty高水位(setWriteBufferHighWaterMark)值,可以防止在发送队列处于高水位时继续发送消息,导致积压更严重,甚至发生内存泄漏。在系统中合理利用Netty的高低水位机制做消息发送的流控,既可以保护自身,同时又能减轻服务端的压力,可以提升系统的可靠性。

那么代码中,怎么使用呢?

image.png

同时在业务发送消息时,添加socketChannel.isWritable()是否可以发送判断

    public static boolean sendMessage(String clientId,Object message){
        if(StringUtils.isEmpty(clientId)){
            log.error(" clientId 为空,找不到客户端!");
            return false;
        }
        SocketChannel socketChannel = FactoryMap.getChannelByDevNo(clientId);
        if(socketChannel !=null ){
            if(socketChannel.isWritable()){
                socketChannel.writeAndFlush(message);
                //更新数据库中消息状态
                return true;
            }else {
                log.error("channel不可写");
                return false;
            }
        }else {
            log.error(" 客户端未连接服务器!发送消息失败!{}",clientId);
        }
        return false;
    }

2、netty服务端 流量整形

前面讲的流控(高低水位控制),主要是根据发送消息队列积压的大小来控制客户端channel的写状态,然后用户手动根据channel.isWritable()来控制消息是否发送,用户可以手动控制消息不能及时发送后的处理方案(比如,过期、超时)。通常用在客户端比较多。

流量整形呢,是一种主动调整流量输出速度的措施,一个典型的应用是基于下游网络节点的TPS指标控制本地流量的输出。大多数商用系统都由多个网元或者部件组成,例如参与短信互动,会涉及手机,基站,短信中心,短信网关,SP/CP等网元,不同网元或者部件的处理性能不同,为了防止突发的业务洪峰的 导致下游网元被冲垮,有时候需要消停提供流量整形功能。

image.png

Netty流量整形的主要作用:
1、防止由于上下游网元性能不均衡导致下游网元被冲垮,业务流程中断;
2、防止由于通信模块接收消息过快,后端业务线程处理不及时,导致出现“撑死”问题。
例如,之前有博客的读者咨询过我一个问题,他们设备向服务端不间断的上报数据,有1G左右,而服务端处理不过来这么多数据,这种情况下,其实就可以使用流量整形来控制接收消息速度。

原理和使用

原理:拦截channelRead和write方法,计算当前需要发送的消息大小,对读取和发送阈值进行判断,如果达到了阈值,则暂停读取和发送消息,待下一个周期继续处理,以实现在某个周期内对消息读写速度进行控制。

使用:将流量整形ChannelHandler添加到业务解码器之前,

image.png
注意事项:
上一篇 下一篇

猜你喜欢

热点阅读