消息堆积问题排查

2018-10-19  本文已影响304人  zxRay

背景

我们有一个类似于消息中间件的产品,会将数据push给下游业务系统消费。业务系统可以通过我们的sdk获取消息,然后消费

问题

有一次收到报警,一个业务方消息堆积了几十万条,捞取一条消息观察,发现消息的发送跟收到消费完成ack有几分钟的时间间隔,这消费RT也太长了

分析

初步分析后,怀疑可能是以下原因造成:
1. 服务器load问题,导致发送消息线程未及时发出,造成堆积
2. 网络原因没有发出去
3. 客户端原因,消费速度太慢,严重阻塞消息的发送

排查过程

1. 服务端排查

先用top查看,cpu、load都比较低,用dstat查看网络流量也比较正常,初步感觉服务器没啥问题。
使用netstat查看,引起注意的是Send-Q有40多k,Send-Q的意思是
Send-Q:在发送缓冲区实际未发送或者没有收到对端Ack的字节数

说明:应用已经把数据写到Socket缓冲区,因网络原因导致数据没有发送。

2. 网络原因排查

使用tcpdump抓取业务方ip的数据包,win被不时的变成0
win:发送窗口,表示对端能够接受的数据量

说明:大致可以表明业务方消费能力不足,导致消息IO线程阻塞,没有调用recv接受消息

3. 客户端排查

咨询业务方后,业务方表示获取消息后就直接提交到线程池中消费,应该不会阻塞消息IO线程。
登录业务方服务器,使用netstat发现Recv-Q很大,基本可以确定业务应用消息IO线程没有及时调用recv造成数据堆积。
使用jstack抓取线程栈,果然消息IO线程全部处于BLOCK状态,并且全部阻塞与一个LinkedBlockingQueue. 到此,基本确定因为消费线程处理能力不足,任务提交到阻塞队列,但阻塞队列满后阻塞了消息IO线程,查看业务代码,基本确定是这个原因。

总结

通过上面的排查,该问题的原因:
业务消费能力不足->线程池忙->阻塞队列满->消息IO线程阻塞->没有调用recv->数据堆积到Recv-Q->发送窗口被置为0->数据堆积到服务端Send-Q->消息堆积

解决方法: 业务方优化消费能力

...

这是一个典型的生产与消费速度不匹配的问题,这类问题非常常见:

  1. 线程池:消费能力不足,任务会被放到任务队列中,如果队列无界,那么可能会爆,如果队列有界,那么任务入队列失败,会选择一种拒绝策略拒绝该任务,或者直接阻塞生产线程
  2. 连接池:很多db类产品基本还是使用连接池获取连接,当连接池满时,此时获取连接线程会阻塞一会,如果maxWaitTime还不能获取,那就直接抛异常

在Netty发送数据的时候,也需要考虑一个发送速度过快问题,以一个简单的echo client demo为例(连接创建时,向服务端写1000000000次数据)

public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channelActive..");
        for(int i = 0; i<1000000000;i++){
            ctx.write(Unpooled.copiedBuffer("test test test test test!", CharsetUtil.UTF_8));
            System.out.println(i);
        }
        ctx.flush();
    }

jstat -gcutil pid 3000抓下gc情况

S0     S1     E      O      M     CCS    YGC     YGCT    FGC    FGCT     GCT   
  0.00   0.00  24.00   0.00  17.39  19.90      0    0.000     0    0.000    0.000
  0.00  99.96  47.50   7.20  95.82  87.88      1    0.231     0    0.000    0.231
 99.93   0.00  87.49  22.08  95.92  87.92      2    0.530     0    0.000    0.530
 99.96   0.00   5.88  52.14  95.92  87.92      4    1.150     0    0.000    1.150
  0.00  99.93  35.52  67.18  95.92  87.92      5    1.479     0    0.000    1.479
 99.93   0.00   0.00  82.22  95.92  87.92      6    1.833     1    0.000    1.833
  0.00   0.00 100.00  87.62  95.92  87.92      6    1.833     2    2.285    4.118
  0.00   0.00 100.00  94.14  95.92  87.92      6    1.833     3    3.862    5.695
  0.00   0.00  74.11  99.98  95.92  87.92      6    1.833     4    7.324    9.156
  0.00   0.00 100.00  99.98  95.92  87.92      6    1.833     6    9.103   10.935
 

很快系统就不停在fullgc,查看内存中主要的类为:

jmap -histo 11184

 num     #instances         #bytes  class name
----------------------------------------------
   1:       2915777      256588376  io.netty.buffer.PooledUnsafeDirectByteBuf
   2:       2915777      186609728  io.netty.channel.ChannelOutboundBuffer$Entry
   3:       5831554      186609728  io.netty.util.Recycler$DefaultHandle
   4:       2915777      116631080  io.netty.channel.DefaultChannelPromise

在使用非阻塞IO的发送数据时,应用都需要配一个发送缓冲区,因为你不知道啥时候IO可写. netty在write数据的时候,会将数据包装成一个Entry,放入到自己的发送缓冲区ChannelOutboundBuffer(每个channel一个), 这个ChannelOutboundBuffer实际上就是一个链表,它是无界的. 代码如下:

--io.netty.channel.ChannelOutboundBuffer#addMessage
public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

即write会一直的创建新节点,直到OOM.

那Netty中高水位WRITE_BUFFER_WATER_MARK为什么没启作用 ? 因为,这个配置只控制IO是否可写并且触发fireChannelWritabilityChanged事件有用户自行处理。它并不会控制channel.write行为. 代码在上面的addMessage的最后一句incrementPendingOutboundBytes

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }

private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0 && newValue != 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

所以,在用Netty发送数据的时候,要注意发送速度,避免OOM,同时也避免发送不及时,引起不必要的FullGC

上一篇下一篇

猜你喜欢

热点阅读