Flume选择器

2019-06-01  本文已影响0人  叫我不矜持

前言

Channel选择器是决定Source接受的特定事件写入到哪个Channel的组件,他们告诉Channel处理器,然后由其将事件写入到Channel。

Agent中各个组件的交互

由于Flume不是两阶段提交,事件被写入到一个Channel,然后事件在写入下一个Channel之前提交,如果写入一个Channel出现异常,那么之前已经写入到其他Channel的相同事件不能被回滚。当这样的异常发生时,Channel处理器抛出ChannelException异常,事务失败,如果Source试图再次写入相同的事件(大多数情况下,会再次写入,只有Syslog,Exec等Source不能重试,因为没有办法生成相同的数据),重复的事件将写入到Channel中,而先前的提交是成功的,这样在Flume中就发生了重复。

Channel选择器的配置是通过Channel处理器完成的,Channel选择器可以指定一组Channel是必须的,另一组的可选的。

Flume内置两种选择器,replicating 和 multiplexing,如果Source配置中没有指定选择器,那么会自动使用复制Channel选择器。

复制Channel选择器

如果Source没有指定Channel选择器,则该Source使用复制Channel选择器。该选择器复制每个事件到通过Source的Channels参数指定的所有Channel中。

复制Channel选择器还有一个配置参数optional,该参数指定的所有Channel都是可选的,当事件写入到这些Channel时又失败发生,则忽略这些失败。写入其他Channel的失败将导致Source抛出异常,并要求Source重试。

配置示例

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

如上所示,未能写入c3不会导致ChannelException异常抛出到Source,并且Source将通知前一阶段写入是成功的。

多路复用Channel选择器

多路复用Channel选择器是一种专门用于动态路由事件的Channel选择器,通过选择事件应该写入到哪个Channel,基于一个特定的事件头的值进行路由。可以结合拦截器使用。

多路复用Channel选择器寻找一个特定的报头,该报头通过通过选择器的配置指定,基于该报头的值,选择器返回事件写入Channel的一个子集,如果配置没有指定一个特定事件的报头,则该事件写入到默认Channel。

参数 描述
selector.type 默认replicating,多路复用为multiplexing
selector.header 默认flume.selector.header,配置为需要的报头名
selector.default 默认channel
selector.mapping.* 报头对应的值为多少时,写入到哪些Channel

对于每个事件,选择器查找配置中header参数指定的报头的键值,接下来检查该值是否与mapping中配置的值一致,如果一致Channel处理器会将事件写入到mapping配置的所有channel中。可选的映射也可以使用optional指定。如果选择器没有找到匹配或者报头不存在,那么他将事件写入到default参数配置的channel中。

如果一个事件没有映射到必须的channel,但是映射到一个或者多个可选的channel,那么该事件会被写到可选channel和默认的channel,任何写入到默认channel的失败将会导致抛出ChannelException异常。

下面是配置示例

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.optional.US = c4
a1.sources.r1.selector.default = c4

state值为CZ的事件写入到c1,state值为US的事件写入到c2,c3,如果都不满足写入到c4。

自定义Channel选择器

自定义的Channel需要实现ChannelSelector接口,或者继承AbstractChannelSelector类。

对应于每个事件,Channel处理器调用Channel选择器的getRequiredChannels和getoptionalChannels方法,返回需要和可选的将要写入事件的Channel列表。

AbstractChannelSelector 抽象类

public abstract class AbstractChannelSelector implements ChannelSelector {

  private List<Channel> channels;
  private String name;

  @Override
  public List<Channel> getAllChannels() {
    return channels;
  }

  @Override
  public void setChannels(List<Channel> channels) {
    this.channels = channels;
  }

  @Override
  public synchronized void setName(String name) {
    this.name = name;
  }

  @Override
  public synchronized String getName() {
    return name;
  }

  /**
   *
   * @return A map of name to channel instance.
   */

  protected Map<String, Channel> getChannelNameMap() {
    Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
    for (Channel ch : getAllChannels()) {
      channelNameMap.put(ch.getName(), ch);
    }
    return channelNameMap;
  }

  /**
   * Given a list of channel names as space delimited string,
   * returns list of channels.
   * @return List of {@linkplain Channel}s represented by the names.
   */
  protected List<Channel> getChannelListFromNames(String channels,
          Map<String, Channel> channelNameMap) {
    List<Channel> configuredChannels = new ArrayList<Channel>();
    if (channels == null || channels.isEmpty()) {
      return configuredChannels;
    }
    String[] chNames = channels.split(" ");
    for (String name : chNames) {
      Channel ch = channelNameMap.get(name);
      if (ch != null) {
        configuredChannels.add(ch);
      } else {
        throw new FlumeException("Selector channel not found: "
                + name);
      }
    }
    return configuredChannels;
  }

}

ChannelSelector 接口

/**
 * <p>
 * Allows the selection of a subset of channels from the given set based on
 * its implementation policy. Different implementations of this interface
 * embody different policies that affect the choice of channels that a source
 * will push the incoming events to.
 * </p>
 */
public interface ChannelSelector extends NamedComponent, Configurable {

  /**
   * @param channels all channels the selector could select from.
   */
  public void setChannels(List<Channel> channels);

  /**
   * Returns a list of required channels. A failure in writing the event to
   * these channels must be communicated back to the source that received this
   * event.
   * @param event
   * @return the list of required channels that this selector has selected for
   * the given event.
   */
  public List<Channel> getRequiredChannels(Event event);


  /**
   * Returns a list of optional channels. A failure in writing the event to
   * these channels must be ignored.
   * @param event
   * @return the list of optional channels that this selector has selected for
   * the given event.
   */
  public List<Channel> getOptionalChannels(Event event);

  /**
   * @return the list of all channels that this selector is configured to work
   * with.
   */
  public List<Channel> getAllChannels();

}

Channel处理器调用SetChannel方法,该方法传递给所有的Channel,即选择器必须为每个事件选择Channel。该类实现了Configuration接口,所以当选择器初始化时调用Configure 方法。当每个事件要处理时,调用getRequiredChannel和getOptionChannel方法,getAllChannel方法必须返回在创建期间Channel处理器设置的所有Channel。

上一篇下一篇

猜你喜欢

热点阅读