Flink源码解析flinkflink

Flink任务物理内存溢出问题定位

2019-03-21  本文已影响167人  铛铛铛clark

问题现象

一个使用10秒滚动窗口的任务在平稳运行一段时间之后出现了频繁的重启。在TaskManager日志中能看到以下文本:

2019-03-17 16:05:28,854 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.

原因定位

堆外内存排查(大型绕弯路现场,想知道直接原因可以直接跳到最后)

jeprof.pdf

最终问题定位(走完弯路)

    @Override
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        triggerContext.key = timer.getKey();
        triggerContext.window = timer.getNamespace();

        MergingWindowSet<W> mergingWindows;

        if (windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
            if (stateWindow == null) {
                // Timer firing for non-existent window, this can only happen if a
                // trigger did not clean up timers. We have already cleared the merging
                // window and therefore the Trigger state, however, so nothing to do.
                return;
            } else {
                windowState.setCurrentNamespace(stateWindow);
            }
        } else {
            windowState.setCurrentNamespace(triggerContext.window);
            mergingWindows = null;
        }

        TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());

        int randomInt = random.nextInt(1000);
        if (triggerResult.isFire()) {
            ACC contents = windowState.get();
            if (randomInt == 1) {
                LOG.info("ClarkTest: Window state namespace: " + triggerContext.window + " and key " + triggerContext.key);
                LOG.info("ClarkTest: Window state value is going to fire is null ? " + (windowState.get() == null));
            }
            if (contents != null) {
                emitWindowContents(triggerContext.window, contents);
            }
        }

        if (triggerResult.isPurge()) {
            if (randomInt == 1) {
                LOG.info("ClarkTest: Window state get purged. ");
            }
            windowState.clear();
        }

        if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            windowState.setCurrentNamespace(triggerContext.window);
            if (randomInt == 1) {
                LOG.info("ClarkTest: Window State namespace before cleaning: " + triggerContext.window + " and key " + triggerContext.key);
                LOG.info("ClarkTest: Window state value before clear is null ? " + (windowState.get() == null));
            }
            clearAllState(triggerContext.window, windowState, mergingWindows);
            if (randomInt == 1) {
                LOG.info("ClarkTest: Window state value after clear is null ? " + (windowState.get() == null));
            }

        }

        if (mergingWindows != null) {
            // need to make sure to update the merging state in state
            mergingWindows.persist();
        }
    }
上一篇下一篇

猜你喜欢

热点阅读