记录一次线上频繁宕机的案例
一、业务描述
为了实时监控业务系统的健康状态,我们需要采集各业务系统的指标数据.所以定义了一系列的上报协议,其格式是以"|"分隔的字符串。例如:
30#module_name|time|dst_ip|dst_server|dst_service|src_ip|src_server|src_service|total_num|success_num|err_num|总等待队列|最大等待队列|总耗时|最大耗时|时耗
为了在监控曲线的查询条件中实时拉取到最新上报的列(比如:IDC信息,模块下的接口名),我们需要将上报协议中的所有可作为查询条件的列去重后存入DB,这样就可以在前端进行展示。出于这样一个需求,我们上线了一个纬度值汇总的服务,其大概流程可以简化为如下两步:
- 从kafka循环取出消息(格式是以|分隔的字符串),然后传给下面的bolt。
- bolt主要是完成消息的解析、过滤、去重操作,然后按规则入库。
二、技术架构
线上由3个节点组成的storm集群来做纬度值的汇总,storm中配置了5个bolt(不了解storm的童鞋可以理解成Task,即一个bolt处理完后自己预订的逻辑后传给下一个bolt,直到全部bolt处理完),使用了redis做数据去重。由于数据量非常大,一天的数据量在20亿左右,redis也是采用的集群模式(分片)。每个storm节点启动了4个worker(单独的进程),堆配置了4G。storm由一个supervisor节点、nimbus节点和worker组成,其中的supervisor和nimbus不负责具体的逻辑,只负责任务的协调和分配,真正做事的是worker节点,向storm集群提交完topology(拓扑)后,storm会给我们fork出一系列worker(可配置),然后触发任务,storm集群之间是通过zk来做协调。

三、现象描述
storm的worker进程平均每5分钟宕机一次,然后被supervisor节点拉起,然后5分钟又宕机。而且没有oom日志,通过查看dmsg,也没发现killer日志。在log文件中也没有任何oom或者jvm crash的蛛丝马迹。瞬间有点慌了,心里默念:"storm是什么鬼?"。
四、问题定位思路
由于对storm不太熟悉,所以刚开始一直怀疑是supervisor节点接受不到worker节点的心跳导致的。查看nimbus和supervisor的日志,都没有太多有用信息,只是打印出worker宕机,又被重新拉起的日志,然后再zk上注册worker的信息。查看zk的log,也没有发现可疑日志。看来问题不是太顺利,没有那么容易解决。所以尝试将worker的堆调到8G,worker从原来的5分钟勉强可以撑到10分钟。通过jstat查看发现old区只增不减,很快就达到100%。

通过jmap查看,发现其中最占内存的都是char[]:

由于我们处理的消息都是以|分隔的字符串,而且每个bolt都做了分隔操作,所以怀疑每个bolt同时做截取操作,但是会占用多份内存。查看jvm手册,配置了
-XX:StringTableSize=65536
这样一个jvm参数,希望能针对同一个字符串做到只耗费一份内存,但是依旧无效。通过jmap发现TupleImpl
也占用内存非常大,也怀疑从storm取出的消息没有手动ack,导致无限重复消费",将ack机制去掉后发现问题依旧存在。看来问题真心没有我们想象中的那么简单,不是top -> top -Hp就可以搞定的(呵呵)。后面还是沉下心来分析gc日志,平均几秒钟一次fullgc,ygc也是不断发生。而且fullgc的时间的有的持续在10几秒钟,每次fullgc后,old区基本回收不了,只能眼睁睁的看着宕机,可见数据量还是非常大的。
由于storm oom了也没有产生hprof日志,所以只能借助于人工
jmap -dump:format=b,file=xx.hprof
来手动dump。可能是由于worker内存不够的缘故,每次dump要么直接把worker弄宕机,要么就抛出一个attach EOF
的错误,瞬间感觉知识不够用。所以只能在worker被拉起的时候立马dump出来,虽然不能代表最终问题,但是应该不会相差太远(毕竟数据量很大,业务比较单一)。通过MAT查看最大对象,如下图:
然后通过list objects->list outgoing refrences查看它引用的内容:

果不其然,都是从kafka取出的消息,但是又不能作为解决问题的依据~
接下来通过Dominator视图,发现占最大内存的是Storm的DisruptorQueue,然后一层一层的展开内部引用,发现其中引用到了一个ThreadLocalBatcher,里面竟然引用到了一个无界队列ConcurrentLinkedQueue:

查看一下ThreadLocalBatcher
的源码:
private class ThreadLocalBatcher implements ThreadLocalInserter {
private final ReentrantLock _flushLock;
private final ConcurrentLinkedQueue<ArrayList<Object>> _overflow;
private ArrayList<Object> _currentBatch;
public ThreadLocalBatcher() {
_flushLock = new ReentrantLock();
_overflow = new ConcurrentLinkedQueue<ArrayList<Object>>();
_currentBatch = new ArrayList<Object>(_inputBatchSize);
}
//called by the main thread and should not block for an undefined period of time
public synchronized void add(Object obj) {
_currentBatch.add(obj);
_overflowCount.incrementAndGet();
if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) {
try {
if (!_throttleOn) {
_throttleOn = true;
_cb.highWaterMark();
}
} catch (Exception e) {
throw new RuntimeException("Exception during calling highWaterMark callback!", e);
}
}
if (_currentBatch.size() >= _inputBatchSize) {
boolean flushed = false;
if (_overflow.isEmpty()) {
try {
publishDirect(_currentBatch, false);
_overflowCount.addAndGet(0 - _currentBatch.size());
_currentBatch.clear();
flushed = true;
} catch (InsufficientCapacityException e) {
//Ignored we will flush later
}
}
if (!flushed) {
_overflow.add(_currentBatch);
_currentBatch = new ArrayList<Object>(_inputBatchSize);
}
}
}
//May be called by a background thread
public synchronized void forceBatch() {
if (!_currentBatch.isEmpty()) {
_overflow.add(_currentBatch);
_currentBatch = new ArrayList<Object>(_inputBatchSize);
}
}
//May be called by a background thread
public void flush(boolean block) {
if (block) {
_flushLock.lock();
} else if (!_flushLock.tryLock()) {
//Someone else if flushing so don't do anything
return;
}
try {
while (!_overflow.isEmpty()) {
publishDirect(_overflow.peek(), block);
_overflowCount.addAndGet(0 - _overflow.poll().size());
}
} catch (InsufficientCapacityException e) {
//Ignored we should not block
} finally {
_flushLock.unlock();
}
}
}
大概心里有点眉目了,一般这种使用无界队列的场景只要没控制好生产者和消费者的处理速度,就非常容易导致内存溢出。所以问题基本锁定在storm客户端上,这里省略一些细节,直接看从ConcurrentLinkedQueue中消费数据的 flush
方法:
_flushLock.lock();
try {
while (!_overflow.isEmpty()) {
publishDirect(_overflow.peek(), block);
_overflowCount.addAndGet(0 - _overflow.poll().size());
}
} catch (InsufficientCapacityException e) {
//Ignored we should not block
} finally {
_flushLock.unlock();
}
其逻辑是: 如果队列不空,就直接从队列取出一个元素,但是不从队列中移除(peek),然后发布到disrutor中;发布完成后,移除掉刚刚发布的元素(poll)。看似没问题,但是在数据量非常大的情况下,如果发布disruptor一直失败,会导致永远没法从ConcurrentLinkedQueue中移除数据,oom是必然的。所以重新编译了该类,在catch InsufficientCapacityException
出打出了错误日志,错误日志确实在刷屏,正好验证猜想。
那怎么解决呢?摆在我面前的有如下几种方案:
- storm限流
明显不合适,如果做了限流会导致kafka积压非常严重,业务方目前还有提速的要求。 - storm有反压机制,能自动根据下游的积压情况来反向控制入口流量。
- 升级storm client版本。我们目前使用的是storm 1.2.1,而storm 1.2.2确实将flush方法标记成了错误,可能是storm 1.2.1的一个bug(待验证)。
- 重新编译DisruptorQueue
我尝试了第二种和第三种方案,反压机制一直没生效,不知道是不是客户端版本问题.而升级storm client又导致服务启动失败,不得已采用了第四种方案->重新编译DisrutorQueue。控制ConcurrentLinkedQueue的容量,以及当publish
失败的时候,控制丢弃消息的频率(业务上能容忍),如果超过我设置的一个最大容忍阈值,我就完全丢弃。而且触发flush
是通过一个TimerTask
来实现的:
TimerTask t = new TimerTask() {
@Override
public void run() {
invokeAll(flushInterval);
}
};
_pendingFlush.put(flushInterval, pending);
_timer.schedule(t, flushInterval, flushInterval);
_tt.put(flushInterval, t);
if (_isFlushing.compareAndSet(false, true)) {
for (ThreadLocalInserter batcher: _batchers.values()) {
batcher.forceBatch();
batcher.flush(true);
}
_isFlushing.set(false);
}
在run方法里会获取到所有的Flusher,然后调用最终的flush方法。客户端没有考虑掉flush出现异常的场景,必须将invokeAll catch住,然后改成ExecutorService的方式,减少定时调度间隔,问题得以解决。
五、最后
解决这个问题是漫长的,当然解决方式可能也不是最佳的,但至少业务上是能接受的。分析的过程中当然也涉及到一些jvm调优的内容,最终的jvm参数如下:
-Xmx13312m -Xms13312m -XX:MaxMetaspaceSize=512m
-XX:MetaspaceSize=512m
-XX:StringTableSize=65536
-XX:+UseG1GC -XX:MaxGCPauseMillis=1000
-XX:+ParallelRefProcEnabled -XX:ErrorFile=crash/hs_err_pid%p.log
-XX:HeapDumpPath=crash
-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails -Xloggc:artifacts/gc.log
-XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+PrintClassHistogramBeforeFullGC
-XX:+PrintCommandLineFlags
-XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC
-Xloggc:artifacts/gc.log -XX:+OmitStackTraceInFastThrow -XX:+UnlockExperimentalVMOptions
-XX:G1NewSizePercent=20 -XX:G1MaxNewSizePercent=20
-XX:SurvivorRatio=6
考虑到我们的业务都是流水作业式的,所以把新生代调大、survivor的比例也调大了。
在一般情况下,线上的问题都比较好解决,难免会遇到些奇葩,比如:
- 没有日志(oom/crash)
- jvm相关的异常(attach 失败、EOF之类的)
- 第三方中间件客户端bug
要是遇到这种问题,一定要沉着冷静。一般靠google是搞不定的,还不如自己静下心来想办法。解决这个问题,得益于如下工具:
- jmap
- jstat
- MAT
- 阿里Arthas
- jstack
今天就到这吧~很晚了。毕竟知识有限,如果里面有讲错的地方,还请大家原谅,如果能指点一二,那就更好了!感谢大家阅读!