Flume 1.9 Source Code Analysis (Part 2): Source, Sink, and Channel
1. Startup
1.1 The main startup entry point
As the previous post, "Flume 1.9 Source Code Analysis (Part 1): From Build to Startup", explained, Flume can be started in two ways: (1) with configuration-file monitoring, where changes are published as events through an EventBus and all components are restarted; (2) without monitoring, starting directly from the configuration file. Both paths share the same entry point in the Application class, the method public void handleConfigurationEvent(MaterializedConfiguration conf) {...}:
@Subscribe
public void handleConfigurationEvent(MaterializedConfiguration conf) {
  try {
    lifecycleLock.lockInterruptibly();
    stopAllComponents();
    startAllComponents(conf);
  } catch (InterruptedException e) {
    logger.info("Interrupted while trying to handle configuration event");
    return;
  } finally {
    // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
    if (lifecycleLock.isHeldByCurrentThread()) {
      lifecycleLock.unlock();
    }
  }
}
Let's take a closer look at the startAllComponents method, starting with its signature:
private void startAllComponents(MaterializedConfiguration materializedConfiguration) {...}
The MaterializedConfiguration object passed into startAllComponents is produced in Application by calling getConfiguration() on a PropertiesFileConfigurationProvider:
PropertiesFileConfigurationProvider configurationProvider =
    new PropertiesFileConfigurationProvider(agentName, configurationFile);
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
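For comparison, the other startup path (with configuration-file monitoring) wires Application to an EventBus: the polling provider re-reads the file on an interval and posts a new MaterializedConfiguration, which triggers the @Subscribe method shown above. A rough sketch of that wiring, based on Application.main (the poll interval and constructor arguments are abbreviated and may differ slightly from the actual source):

// Sketch of the monitored-configuration path (not verbatim): the polling
// provider posts materialized configurations to the EventBus, and
// handleConfigurationEvent (annotated with @Subscribe) restarts components.
EventBus eventBus = new EventBus(agentName + "-event-bus");
PollingPropertiesFileConfigurationProvider configurationProvider =
    new PollingPropertiesFileConfigurationProvider(
        agentName, configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);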
1.2 Materializing the configuration
The getConfiguration method materializes the configuration: it converts the configuration properties into the corresponding objects and associates each channel with its sources and sinks:
public MaterializedConfiguration getConfiguration() {
  MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
  FlumeConfiguration fconfig = getFlumeConfiguration();
  AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
  if (agentConf != null) {
    Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
    Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
    Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
    try {
      loadChannels(agentConf, channelComponentMap);
      loadSources(agentConf, channelComponentMap, sourceRunnerMap);
      loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
      Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
      for (String channelName : channelNames) {
        ChannelComponent channelComponent = channelComponentMap.get(channelName);
        if (channelComponent.components.isEmpty()) {
          LOGGER.warn(String.format("Channel %s has no components connected" +
              " and has been removed.", channelName));
          channelComponentMap.remove(channelName);
          Map<String, Channel> nameChannelMap =
              channelCache.get(channelComponent.channel.getClass());
          if (nameChannelMap != null) {
            nameChannelMap.remove(channelName);
          }
        } else {
          LOGGER.info(String.format("Channel %s connected to %s",
              channelName, channelComponent.components.toString()));
          conf.addChannel(channelName, channelComponent.channel);
        }
      }
      for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
        conf.addSourceRunner(entry.getKey(), entry.getValue());
      }
      for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
        conf.addSinkRunner(entry.getKey(), entry.getValue());
      }
    } catch (InstantiationException ex) {
      LOGGER.error("Failed to instantiate component", ex);
    } finally {
      channelComponentMap.clear();
      sourceRunnerMap.clear();
      sinkRunnerMap.clear();
    }
  } else {
    LOGGER.warn("No configuration found for this host:{}", getAgentName());
  }
  return conf;
}
With the component objects in hand, Application's startAllComponents method can start each of them. Channels are started first, and only once all channels are up are the sinks and sources started. Taking channels as an example, here is how Flume starts them:
private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
  // several lines omitted
  for (Entry<String, Channel> entry :
      materializedConfiguration.getChannels().entrySet()) {
    try {
      logger.info("Starting Channel " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }
  // several lines omitted
}
The components are started the same way as in Application's start method: both rely on the MonitorRunnable inside LifecycleSupervisor, which is scheduled to run every 3 seconds.
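For reference, the supervision step looks roughly like the following (a simplified sketch of LifecycleSupervisor.supervise; bookkeeping such as duplicate-registration checks and fail-fast handling is omitted):

// Sketch (not verbatim): supervise() registers the component together with its
// restart policy and desired state, then schedules a MonitorRunnable that
// nudges the component toward that state: run immediately, then every 3 seconds.
public synchronized void supervise(LifecycleAware lifecycleAware,
    SupervisorPolicy policy, LifecycleState desiredState) {
  Supervisoree process = new Supervisoree();
  process.status = new Status();
  process.policy = policy;
  process.status.desiredState = desiredState;
  supervisedProcesses.put(lifecycleAware, process);

  MonitorRunnable monitorRunnable = new MonitorRunnable();
  monitorRunnable.lifecycleAware = lifecycleAware;
  monitorRunnable.supervisoree = process;
  ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
      monitorRunnable, 0, 3, TimeUnit.SECONDS);
  monitorFutures.put(lifecycleAware, future);
}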
The startup order is: Channel -> wait for all channels to come up -> Sink -> Source.
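The "wait for all channels" step inside startAllComponents looks roughly like this (a sketch; logging and error handling in the actual source may differ):

// Sketch: block until every channel reaches the START state (or is flagged
// as errored by the supervisor) before the sinks and sources are started.
for (Channel ch : materializedConfiguration.getChannels().values()) {
  while (ch.getLifecycleState() != LifecycleState.START
      && !supervisor.isComponentInErrorState(ch)) {
    try {
      logger.info("Waiting for channel: " + ch.getName() + " to start");
      Thread.sleep(500);
    } catch (InterruptedException e) {
      logger.error("Interrupted while waiting for channel to start.", e);
    }
  }
}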
At startup, every SourceRunner, Channel, and SinkRunner in the MaterializedConfiguration is started. Starting a channel does nothing special: it initializes some state and creates its counters, since a channel plays an essentially passive role.
@Override
public synchronized void start() {
  channelCounter.start();
  channelCounter.setChannelSize(queue.size());
  channelCounter.setChannelCapacity(Long.valueOf(
      queue.size() + queue.remainingCapacity()));
  super.start();
}
Apart from the MonitorRunnable thread used for supervision, starting a channel spawns no extra threads; SourceRunner and SinkRunner, by contrast, do start threads of their own.
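For a pollable source such as TaildirSource, the runner's thread looks roughly like this (a simplified sketch of PollableSourceRunner; backoff counters and the configurable maximum backoff are omitted):

// Sketch: start() launches a thread that repeatedly calls source.process()
// and sleeps briefly whenever the source reports BACKOFF.
public void start() {
  PollableSource source = (PollableSource) getSource();
  runner = new PollingRunner();
  runner.source = source;
  runner.shouldStop = shouldStop;

  runnerThread = new Thread(runner);
  runnerThread.setName(getClass().getSimpleName() + "-" + source.getClass().getSimpleName());
  runnerThread.start();

  lifecycleState = LifecycleState.START;
}

// Inside PollingRunner.run(), simplified:
//   while (!shouldStop.get()) {
//     if (source.process() == PollableSource.Status.BACKOFF) {
//       Thread.sleep(backoffSleepIncrement);  // back off before polling again
//     }
//   }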
2. Associating Source and Sink with Channel
The Source interface:
public interface Source extends LifecycleAware, NamedComponent {
  /**
   * Specifies which channel processor will handle this source's events.
   *
   * @param channelProcessor
   */
  public void setChannelProcessor(ChannelProcessor channelProcessor);

  /**
   * Returns the channel processor that will handle this source's events.
   */
  public ChannelProcessor getChannelProcessor();
}
The Sink interface:
public interface Sink extends LifecycleAware, NamedComponent {
  /**
   * <p>Sets the channel the sink will consume from</p>
   * @param channel The channel to be polled
   */
  public void setChannel(Channel channel);

  /**
   * @return the channel associated with this sink
   */
  public Channel getChannel();

  /**
   * <p>Requests the sink to attempt to consume data from attached channel</p>
   * <p><strong>Note</strong>: This method should be consuming from the channel
   * within the bounds of a Transaction. On successful delivery, the transaction
   * should be committed, and on failure it should be rolled back.
   * @return READY if 1 or more Events were successfully delivered, BACKOFF if
   * no data could be retrieved from the channel feeding this sink
   * @throws EventDeliveryException In case of any kind of failure to
   * deliver data to the next hop destination.
   */
  public Status process() throws EventDeliveryException;

  public static enum Status {
    READY, BACKOFF
  }
}
2.1 Source to Channel
Using TaildirSource as an example, the flow from Source to Channel looks like this:
// several lines omitted
List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
if (events.isEmpty()) {
  return false;
}
sourceCounter.addToEventReceivedCount(events.size());
sourceCounter.incrementAppendBatchReceivedCount();
try {
  getChannelProcessor().processEventBatch(events);
  // several lines omitted
After obtaining the events, the source calls getChannelProcessor().processEventBatch(events) to put the events (the data wrapped as Event objects) into the channel. This happens inside a transaction, so the batch is committed to the channel's queue atomically: on failure the put is rolled back rather than partially applied. Along the way, any configured interceptors are applied first, and then the ChannelProcessor uses the channel selector to route each event to the corresponding channel(s). The ChannelProcessor's handling logic is as follows:
public void processEventBatch(List<Event> events) {
  Preconditions.checkNotNull(events, "Event list must not be null");
  events = interceptorChain.intercept(events);
  Map<Channel, List<Event>> reqChannelQueue =
      new LinkedHashMap<Channel, List<Event>>();
  Map<Channel, List<Event>> optChannelQueue =
      new LinkedHashMap<Channel, List<Event>>();
  for (Event event : events) {
    List<Channel> reqChannels = selector.getRequiredChannels(event);
    for (Channel ch : reqChannels) {
      List<Event> eventQueue = reqChannelQueue.get(ch);
      if (eventQueue == null) {
        eventQueue = new ArrayList<Event>();
        reqChannelQueue.put(ch, eventQueue);
      }
      eventQueue.add(event);
    }
    List<Channel> optChannels = selector.getOptionalChannels(event);
    for (Channel ch : optChannels) {
      List<Event> eventQueue = optChannelQueue.get(ch);
      if (eventQueue == null) {
        eventQueue = new ArrayList<Event>();
        optChannelQueue.put(ch, eventQueue);
      }
      eventQueue.add(event);
    }
  }
  // Process required channels
  for (Channel reqChannel : reqChannelQueue.keySet()) {
    Transaction tx = reqChannel.getTransaction();
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
      tx.begin();
      List<Event> batch = reqChannelQueue.get(reqChannel);
      for (Event event : batch) {
        reqChannel.put(event);
      }
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
      if (t instanceof Error) {
        LOG.error("Error while writing to required channel: " + reqChannel, t);
        throw (Error) t;
      } else if (t instanceof ChannelException) {
        throw (ChannelException) t;
      } else {
        throw new ChannelException("Unable to put batch on required " +
            "channel: " + reqChannel, t);
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }
  // Process optional channels
  for (Channel optChannel : optChannelQueue.keySet()) {
    Transaction tx = optChannel.getTransaction();
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
      tx.begin();
      List<Event> batch = optChannelQueue.get(optChannel);
      for (Event event : batch) {
        optChannel.put(event);
      }
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
      LOG.error("Unable to put batch on optional channel: " + optChannel, t);
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }
}
2.2 Channel to Sink
Now let's look at how a sink is associated with its channel, taking LoggerSink as an example:
@Override
public Status process() throws EventDeliveryException {
  Status result = Status.READY;
  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  Event event = null;
  try {
    transaction.begin();
    event = channel.take();
    if (event != null) {
      if (logger.isInfoEnabled()) {
        logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
      }
    } else {
      // No event found, request back-off semantics from the sink runner
      result = Status.BACKOFF;
    }
    transaction.commit();
    // several lines omitted
Each Sink holds a Channel object. It is initialized during configuration materialization (see section 1.2), where loadSinks() calls sink.setChannel(channelComponent.channel);.
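The relevant wiring inside loadSinks looks roughly like this (a condensed sketch; variable names such as comp, config, and sinkName follow the surrounding loop in AbstractConfigurationProvider but are abbreviated here, and error reporting is simplified):

// Sketch: create each configured sink, configure it, then attach the channel
// it should drain from and record the connection on the channel side.
Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
Configurables.configure(sink, config);

ChannelComponent channelComponent = channelComponentMap.get(config.getChannel());
if (channelComponent == null) {
  LOGGER.error("Sink {} is not connected to a channel", sinkName);  // sink is dropped
} else {
  sink.setChannel(channelComponent.channel);   // tie the sink to its channel
  sinks.put(comp.getComponentName(), sink);
  channelComponent.components.add(sinkName);   // lets unconnected channels be pruned later
}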
The process method is the sink's main execution method, so where does it get called from? It implements the Sink interface, and a "Find Usages" search in the IDE shows that it is the SinkProcessor that calls the sink's process. Opening one of the SinkProcessor implementations (LoadBalancingSinkProcessor, for example), the call site looks like this:
@Override
public Status process() throws EventDeliveryException {
  Status status = null;
  Iterator<Sink> sinkIterator = selector.createSinkIterator();
  while (sinkIterator.hasNext()) {
    Sink sink = sinkIterator.next();
    try {
      status = sink.process();
      break;
    } catch (Exception ex) {
      selector.informSinkFailed(sink);
      LOGGER.warn("Sink failed to consume event. "
          + "Attempting next sink if available.", ex);
    }
  }
  // several lines omitted
}
The SinkProcessor's own process method, in turn, implements the SinkProcessor interface, and another "Find Usages" search shows that it is ultimately SinkRunner's start() that drives it (see the SinkRunner sketch after the code below). SinkRunner instances are created during configuration materialization, in loadSinkGroups: each sink group gets one SinkProcessor, and sinks that belong to no group fall back to the DefaultSinkProcessor.
private void loadSinkGroups(AgentConfiguration agentConf,
    Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
    throws InstantiationException {
  Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
  Map<String, ComponentConfiguration> compMap =
      agentConf.getSinkGroupConfigMap();
  Map<String, String> usedSinks = new HashMap<String, String>();
  for (String groupName : sinkGroupNames) {
    ComponentConfiguration comp = compMap.get(groupName);
    if (comp != null) {
      SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
      List<Sink> groupSinks = new ArrayList<Sink>();
      for (String sink : groupConf.getSinks()) {
        Sink s = sinks.remove(sink);
        if (s == null) {
          String sinkUser = usedSinks.get(sink);
          if (sinkUser != null) {
            throw new InstantiationException(String.format(
                "Sink %s of group %s already " +
                "in use by group %s", sink, groupName, sinkUser));
          } else {
            throw new InstantiationException(String.format(
                "Sink %s of group %s does "
                + "not exist or is not properly configured", sink,
                groupName));
          }
        }
        groupSinks.add(s);
        usedSinks.put(sink, groupName);
      }
      try {
        SinkGroup group = new SinkGroup(groupSinks);
        Configurables.configure(group, groupConf);
        sinkRunnerMap.put(comp.getComponentName(),
            new SinkRunner(group.getProcessor()));
      } catch (Exception e) {
        String msg = String.format("SinkGroup %s has been removed due to " +
            "an error during configuration", groupName);
        LOGGER.error(msg, e);
      }
    }
  }
  // add any unassigned sinks to solo collectors
  for (Entry<String, Sink> entry : sinks.entrySet()) {
    if (!usedSinks.containsValue(entry.getKey())) {
      try {
        SinkProcessor pr = new DefaultSinkProcessor();
        List<Sink> sinkMap = new ArrayList<Sink>();
        sinkMap.add(entry.getValue());
        pr.setSinks(sinkMap);
        Configurables.configure(pr, new Context());
        sinkRunnerMap.put(entry.getKey(), new SinkRunner(pr));
      } catch (Exception e) {
        String msg = String.format("SinkGroup %s has been removed due to " +
            "an error during configuration", entry.getKey());
        LOGGER.error(msg, e);
      }
    }
  }
}
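To close the loop on where process() is driven from: SinkRunner.start() spawns a polling thread that keeps invoking the SinkProcessor. Roughly (a simplified sketch of SinkRunner; backoff counters and lifecycle bookkeeping are omitted):

// Sketch: start() brings up the processor (the "policy") and launches a
// PollingRunner thread; the thread loops, calling policy.process() and
// backing off when it reports BACKOFF.
public void start() {
  SinkProcessor policy = getPolicy();
  policy.start();

  runner = new PollingRunner();
  runner.policy = policy;
  runner.shouldStop = new AtomicBoolean();

  runnerThread = new Thread(runner);
  runnerThread.setName("SinkRunner-PollingRunner-" + policy.getClass().getSimpleName());
  runnerThread.start();

  lifecycleState = LifecycleState.START;
}

// Inside PollingRunner.run(), simplified:
//   while (!shouldStop.get()) {
//     if (policy.process() == Sink.Status.BACKOFF) {
//       Thread.sleep(backoffSleepIncrement);  // back off, up to a maximum
//     }
//   }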
To sum up, the sink call chain is: SinkRunner.start() -> PollingRunner thread -> SinkProcessor.process() -> Sink.process() -> Channel.take().