
Flume 1.9 Source Code Analysis (2): Source, Sink, and Channel

2019-06-07  CarsonCao

1. Startup

1.1 The Startup Entry Point

From the previous article, Flume 1.9 Source Code Analysis (1): From Build to Startup, we know that Flume can be started in two ways: (1) watching the configuration file for changes and using an EventBus to publish an event that restarts all components; (2) starting directly, without watching the configuration file. In both cases the entry point is the Application class's public void handleConfigurationEvent(MaterializedConfiguration conf) {...} method.


  @Subscribe
  public void handleConfigurationEvent(MaterializedConfiguration conf) {
    try {
      lifecycleLock.lockInterruptibly();
      stopAllComponents();
      startAllComponents(conf);
    } catch (InterruptedException e) {
      logger.info("Interrupted while trying to handle configuration event");
      return;
    } finally {
      // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
      if (lifecycleLock.isHeldByCurrentThread()) {
        lifecycleLock.unlock();
      }
    }
  }
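
The @Subscribe annotation above comes from Guava's EventBus, which Flume uses to deliver configuration events. As a simplified sketch of the watching startup path (not the actual wiring in Application.main, and using a hypothetical agent name and file path), registering the Application on an EventBus and posting a materialized configuration is what ends up invoking handleConfigurationEvent:

import java.io.File;

import com.google.common.eventbus.EventBus;
import org.apache.flume.node.Application;
import org.apache.flume.node.PropertiesFileConfigurationProvider;

// Simplified sketch; the real wiring also supervises a polling provider that
// re-posts the configuration whenever the file changes.
public class EventBusStartupSketch {
  public static void main(String[] args) {
    // Hypothetical agent name and config path, for illustration only.
    PropertiesFileConfigurationProvider provider =
        new PropertiesFileConfigurationProvider("a1", new File("conf/flume.properties"));
    Application application = new Application();

    EventBus eventBus = new EventBus("flume-conf-events");
    eventBus.register(application); // handleConfigurationEvent becomes a subscriber

    // EventBus dispatches the posted object to the @Subscribe method above,
    // which stops and restarts all components.
    eventBus.post(provider.getConfiguration());
  }
}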

Let's take a closer look at the startAllComponents method. First, its declaration:

private void startAllComponents(MaterializedConfiguration materializedConfiguration) {...}

A MaterializedConfiguration object is the parameter of startAllComponents. In Application, this object is obtained by calling PropertiesFileConfigurationProvider's getConfiguration() method:

PropertiesFileConfigurationProvider configurationProvider =
    new PropertiesFileConfigurationProvider(agentName, configurationFile);
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());

1.2 Materializing the Configuration

The getConfiguration method materializes the configuration: it converts the configuration properties into the corresponding objects and associates each channel with its sources and sinks:

public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration();
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
        for (String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.get(channelName);
          if (channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap =
                channelCache.get(channelComponent.channel.getClass());
            if (nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
  }
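
For reference, loadChannels, loadSources and loadSinks work off an agent definition such as the following (hypothetical names a1, r1, c1, k1, f1). It is the channels / channel keys on the source and sink that produce the source-channel and sink-channel associations materialized above:

# Hypothetical agent "a1": one TAILDIR source, one memory channel, one logger sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /var/log/app/.*\.log
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Note that a source may list several channels (fan-out), which is where the required/optional channel selector logic in section 2.1 comes into play.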

With the component objects in hand, Application's startAllComponents method can start each component.
The channels are started first; only after all channels are up are the sinks and sources started. Taking the channels as an example, let's see how Flume starts them:

  private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
// ... lines omitted ...
    for (Entry<String, Channel> entry :
        materializedConfiguration.getChannels().entrySet()) {
      try {
        logger.info("Starting Channel " + entry.getKey());
        supervisor.supervise(entry.getValue(),
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }
// ... lines omitted ...
}

The components are started the same way as in Application's start method: the MonitorRunnable inside LifecycleSupervisor re-checks each supervised component every 3 seconds.
The startup order is: Channel -> wait until all channels are up -> Sink -> Source.
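
As a rough sketch of the supervision mechanism (a simplification, not the actual LifecycleSupervisor code), supervise() essentially records the desired state of a component and schedules a periodic check that keeps driving the component toward that state:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;

// Simplified sketch of the supervise() idea; the real LifecycleSupervisor also
// applies a SupervisorPolicy and tracks failures per component.
public class SuperviseSketch {

  private final ScheduledExecutorService monitorService =
      Executors.newScheduledThreadPool(10);

  public void supervise(final LifecycleAware component, final LifecycleState desiredState) {
    Runnable monitor = () -> {
      // Re-checked on every tick: if the component is not in its desired
      // state yet (or has failed), push it toward that state again.
      if (desiredState == LifecycleState.START
          && component.getLifecycleState() != LifecycleState.START) {
        component.start();
      } else if (desiredState == LifecycleState.STOP
          && component.getLifecycleState() != LifecycleState.STOP) {
        component.stop();
      }
    };
    monitorService.scheduleWithFixedDelay(monitor, 0, 3, TimeUnit.SECONDS);
  }
}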

When the program starts, all of the SourceRunners, Channels and SinkRunners in the MaterializedConfiguration are started. Starting a channel does nothing special: it just initializes some state and sets up a counter, so the channel plays a passive role (the listing below is MemoryChannel's start method):

  @Override
  public synchronized void start() {
    channelCounter.start();
    channelCounter.setChannelSize(queue.size());
    channelCounter.setChannelCapacity(Long.valueOf(
            queue.size() + queue.remainingCapacity()));
    super.start();
  }

Apart from the MonitorRunnable thread used for supervision, starting a channel spawns no additional threads, whereas SourceRunner and SinkRunner do start threads of their own.

2. How Source and Sink Are Associated with Channel

The Source interface:

public interface Source extends LifecycleAware, NamedComponent {

  /**
   * Specifies which channel processor will handle this source's events.
   *
   * @param channelProcessor
   */
  public void setChannelProcessor(ChannelProcessor channelProcessor);

  /**
   * Returns the channel processor that will handle this source's events.
   */
  public ChannelProcessor getChannelProcessor();

}
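
To make these two methods concrete, here is a hedged, minimal custom source (a hypothetical class for illustration, not part of Flume). It extends AbstractSource, which stores the ChannelProcessor injected via setChannelProcessor during materialization, and simply hands its events to that processor:

import java.nio.charset.StandardCharsets;

import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

// Hypothetical minimal source, for illustration only: emits a single event on start().
public class HelloSource extends AbstractSource implements EventDrivenSource, Configurable {

  private String message;

  @Override
  public void configure(Context context) {
    message = context.getString("message", "hello flume");
  }

  @Override
  public synchronized void start() {
    super.start();
    // The ChannelProcessor was injected via setChannelProcessor() during
    // materialization; it routes the event to the configured channel(s).
    getChannelProcessor().processEvent(
        EventBuilder.withBody(message, StandardCharsets.UTF_8));
  }
}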

The Sink interface:

public interface Sink extends LifecycleAware, NamedComponent {
  /**
   * <p>Sets the channel the sink will consume from</p>
   * @param channel The channel to be polled
   */
  public void setChannel(Channel channel);

  /**
   * @return the channel associated with this sink
   */
  public Channel getChannel();

  /**
   * <p>Requests the sink to attempt to consume data from attached channel</p>
   * <p><strong>Note</strong>: This method should be consuming from the channel
   * within the bounds of a Transaction. On successful delivery, the transaction
   * should be committed, and on failure it should be rolled back.
   * @return READY if 1 or more Events were successfully delivered, BACKOFF if
   * no data could be retrieved from the channel feeding this sink
   * @throws EventDeliveryException In case of any kind of failure to
   * deliver data to the next hop destination.
   */
  public Status process() throws EventDeliveryException;

  public static enum Status {
    READY, BACKOFF
  }
}

2.1 Source to Channel

The flow from Source to Channel

Take TaildirSource as an example:

// ... lines omitted ...
      List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
      if (events.isEmpty()) {
        return false;
      }
      sourceCounter.addToEventReceivedCount(events.size());
      sourceCounter.incrementAppendBatchReceivedCount();
      try {
        getChannelProcessor().processEventBatch(events);
// ... lines omitted ...

After the events are read, getChannelProcessor().processEventBatch(events) is called to put them into the channel. The put is wrapped in a channel transaction, so the whole batch is committed to the channel's queue atomically and rolled back on failure. Along the way, if interceptors are configured, the interceptor chain runs first; the ChannelProcessor then asks the channel selector which channel(s) each event should be routed to. The ChannelProcessor's handling logic is shown below (a minimal interceptor sketch follows after the listing):

public void processEventBatch(List<Event> events) {
    Preconditions.checkNotNull(events, "Event list must not be null");

    events = interceptorChain.intercept(events);

    Map<Channel, List<Event>> reqChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    Map<Channel, List<Event>> optChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    for (Event event : events) {
      List<Channel> reqChannels = selector.getRequiredChannels(event);

      for (Channel ch : reqChannels) {
        List<Event> eventQueue = reqChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          reqChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }

      List<Channel> optChannels = selector.getOptionalChannels(event);

      for (Channel ch : optChannels) {
        List<Event> eventQueue = optChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          optChannelQueue.put(ch, eventQueue);
        }

        eventQueue.add(event);
      }
    }

    // Process required channels
    for (Channel reqChannel : reqChannelQueue.keySet()) {
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = reqChannelQueue.get(reqChannel);

        for (Event event : batch) {
          reqChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " + reqChannel, t);
          throw (Error) t;
        } else if (t instanceof ChannelException) {
          throw (ChannelException) t;
        } else {
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }

    // Process optional channels
    for (Channel optChannel : optChannelQueue.keySet()) {
      Transaction tx = optChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = optChannelQueue.get(optChannel);

        for (Event event : batch) {
          optChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          throw (Error) t;
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
  }
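
Since interceptorChain.intercept(events) runs before any channel routing, a custom interceptor is the hook for tagging or filtering events on their way into the channel. As a hedged illustration (a hypothetical class, not part of Flume), a minimal interceptor that stamps a header on every event looks like this:

import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Hypothetical interceptor for illustration: adds a static header to every event.
public class StampInterceptor implements Interceptor {

  @Override
  public void initialize() { }

  @Override
  public Event intercept(Event event) {
    event.getHeaders().put("stamped-by", "StampInterceptor");
    return event; // returning null would drop the event
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() { }

  // Flume instantiates interceptors through a Builder.
  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new StampInterceptor();
    }

    @Override
    public void configure(Context context) { }
  }
}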

2.2 Channel to Sink

Now let's look at how a sink is associated with its channel.

Take LoggerSink as an example:

  @Override
  public Status process() throws EventDeliveryException {
    Status result = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;

    try {
      transaction.begin();
      event = channel.take();

      if (event != null) {
        if (logger.isInfoEnabled()) {
          logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
        }
      } else {
        // No event found, request back-off semantics from the sink runner
        result = Status.BACKOFF;
      }
      transaction.commit();
//...
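
The tail of the method is omitted above; it rolls the transaction back on failure and closes it in a finally block. For reference, the canonical consume-from-channel pattern that the Sink javadoc asks for looks roughly like this (a sketch with a hypothetical helper, not LoggerSink's exact code):

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;

// Sketch of the take-side transaction pattern (simplified, hypothetical helper).
public final class TakePatternSketch {

  public static Event takeOne(Channel channel) throws EventDeliveryException {
    Transaction tx = channel.getTransaction();
    try {
      tx.begin();
      Event event = channel.take();   // may be null if the channel is empty
      // ... deliver the event to the destination here ...
      tx.commit();                    // the take becomes permanent
      return event;
    } catch (Throwable t) {
      tx.rollback();                  // the event stays in the channel
      throw new EventDeliveryException("Failed to deliver event", t);
    } finally {
      tx.close();
    }
  }
}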

A Sink implementation holds a Channel object. It is initialized during configuration materialization (see section 1.2): when loadSinks() runs, it calls sink.setChannel(channelComponent.channel).
The process method is the main execution method of a Sink. So where is it called from?
It is declared in the Sink interface, and "Find Usages" in the IDE shows that it is the SinkProcessor that calls the sink's process.

Let's open one of the SinkProcessor implementations (LoadBalancingSinkProcessor, for example) at the call site:

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    Iterator<Sink> sinkIterator = selector.createSinkIterator();
    while (sinkIterator.hasNext()) {
      Sink sink = sinkIterator.next();
      try {
        status = sink.process();
        break;
      } catch (Exception ex) {
        selector.informSinkFailed(sink);
        LOGGER.warn("Sink failed to consume event. "
            + "Attempting next sink if available.", ex);
      }
    }
    // ... lines omitted ...
  }

Notice that SinkProcessor has a process method of its own, declared in the SinkProcessor interface. Running "Find Usages" on it shows that it is ultimately driven by the polling thread launched from SinkRunner's start() (a simplified sketch of that loop follows after the listing below). The SinkRunner instances are created during configuration materialization, in loadSinkGroups. Each sink group has one SinkProcessor; a sink that belongs to no group gets the DefaultSinkProcessor.

private void loadSinkGroups(AgentConfiguration agentConf,
      Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
          throws InstantiationException {
    Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
    Map<String, ComponentConfiguration> compMap =
        agentConf.getSinkGroupConfigMap();
    Map<String, String> usedSinks = new HashMap<String, String>();
    for (String groupName: sinkGroupNames) {
      ComponentConfiguration comp = compMap.get(groupName);
      if (comp != null) {
        SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
        List<Sink> groupSinks = new ArrayList<Sink>();
        for (String sink : groupConf.getSinks()) {
          Sink s = sinks.remove(sink);
          if (s == null) {
            String sinkUser = usedSinks.get(sink);
            if (sinkUser != null) {
              throw new InstantiationException(String.format(
                  "Sink %s of group %s already " +
                      "in use by group %s", sink, groupName, sinkUser));
            } else {
              throw new InstantiationException(String.format(
                  "Sink %s of group %s does "
                      + "not exist or is not properly configured", sink,
                      groupName));
            }
          }
          groupSinks.add(s);
          usedSinks.put(sink, groupName);
        }
        try {
          SinkGroup group = new SinkGroup(groupSinks);
          Configurables.configure(group, groupConf);
          sinkRunnerMap.put(comp.getComponentName(),
              new SinkRunner(group.getProcessor()));
        } catch (Exception e) {
          String msg = String.format("SinkGroup %s has been removed due to " +
              "an error during configuration", groupName);
          LOGGER.error(msg, e);
        }
      }
    }
    // add any unassigned sinks to solo collectors
    for (Entry<String, Sink> entry : sinks.entrySet()) {
      if (!usedSinks.containsValue(entry.getKey())) {
        try {
          SinkProcessor pr = new DefaultSinkProcessor();
          List<Sink> sinkMap = new ArrayList<Sink>();
          sinkMap.add(entry.getValue());
          pr.setSinks(sinkMap);
          Configurables.configure(pr, new Context());
          sinkRunnerMap.put(entry.getKey(), new SinkRunner(pr));
        } catch (Exception e) {
          String msg = String.format("SinkGroup %s has been removed due to " +
              "an error during configuration", entry.getKey());
          LOGGER.error(msg, e);
        }
      }
    }
  }
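
As promised above, here is a simplified sketch of the polling loop behind SinkRunner (an approximation of the idea, not the actual SinkRunner/PollingRunner source): start() launches a thread that keeps calling the SinkProcessor's process() and backs off briefly whenever it reports BACKOFF:

import org.apache.flume.Sink;
import org.apache.flume.SinkProcessor;

// Simplified sketch of the SinkRunner idea; the real class also tracks
// lifecycle state, counters and a capped, growing backoff interval.
public class SinkRunnerSketch {

  private final SinkProcessor policy;
  private volatile boolean shouldStop = false;
  private Thread runnerThread;

  public SinkRunnerSketch(SinkProcessor policy) {
    this.policy = policy;
  }

  public void start() {
    runnerThread = new Thread(() -> {
      while (!shouldStop) {
        try {
          if (policy.process() == Sink.Status.BACKOFF) {
            Thread.sleep(1000);   // nothing to do: wait before polling again
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        } catch (Exception e) {
          // the real runner logs the failure and keeps polling
        }
      }
    }, "SinkRunner-polling-sketch");
    runnerThread.start();
  }

  public void stop() throws InterruptedException {
    shouldStop = true;
    if (runnerThread != null) {
      runnerThread.interrupt();
      runnerThread.join();
    }
  }
}

This is also why a sink's process() is expected to return BACKOFF when its channel is empty: it lets the runner throttle its polling instead of spinning.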

To sum up, the sink invocation chain looks like this:

SinkRunner.start() -> SinkRunner's polling thread -> SinkProcessor.process() -> Sink.process() -> Channel.take()