nifi源码剖析-Processor调用处理过程

2017-06-15  本文已影响0人  皮皮猿的博客

从AbstractProcessor方法开始

public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {
    @Override
    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession session = sessionFactory.createSession(); // 创建StandardProcessSession
        try {
            onTrigger(context, session);
            session.commit();  // checkout + commit 
        } catch (final Throwable t) {
            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
            session.rollback(true);
            throw t;
        }
    }
    // 具体到Processor的onTrigger方法
    public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
}

首先进入每个的Processor的onTrigger()方法,该方法里先会调用session.write()方法,然后调用session.transfer方法
write方法里构建流

StandardFlowFileQueue: 队列的putAll方法

调度的循环开始: TimerDrivenSchedulingAgent->doSchedule()方法(只是一种策略)

FlowController里设置调度策略和对应agent的关系的map(StandardProcessScheduler)

上一篇 下一篇

猜你喜欢

热点阅读