flink

高吞吐量Flume Agent调优小结

2020-10-27  本文已影响0人  LittleMagic

前言

所有电商企业在一年一度的双11都要迎来大促与大考,我司也不例外(所以最近真是前所未有的忙乱)。前段时间在配合执行全链路压测的过程中,发现平时不太关注的Flume配置可能存在瓶颈。Flume在笔者负责的实时计算平台里用于收集所有后端访问日志和埋点日志,其效率和稳定性比较重要。除了及时扩容之外,也有必要对Flume进行调优。今天在百忙之中挤出一点时间来写写。

Flume系统以一个或多个Flume-NG Agent的形式部署,一个Agent对应一个JVM进程,并且由三个部分组成:Source、Channel和Sink,示意图如下。

Source

Flume有3种能够监听文件的Source,分别是Exec Source(配合tail -f命令)、Spooling Directory Source和Taildir Source。Taildir Source显然是最好用的,在我们的实践中,需要注意的参数列举如下。

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /data/logs/ng1/access.log
a1.sources.r1.headers.f1.headerKey1 = ng1
a1.sources.r1.filegroups.f2 = /data/logs/ng2/.*log
a1.sources.r1.headers.f2.headerKey1 = ng2
a1.sources.r1.batchSize = 1000
a1.sources.r1.maxBatchCount = 100
a1.sources.r1.writePosInterval = 1000

Channel

Flume内置了多种Channel的实现,比较常用的有Memory Channel、File Channel、JDBC Channel、Kafka Channel等。我们的选择主要针对Memory Channel和File Channel两种,对比一下:

鉴于我们下游业务的主要痛点在吞吐量与实时性,且可以容忍数据少量丢失,日志服务器的磁盘压力也已经比较大了,故Memory Channel更加合适。需要注意的参数如下。

a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 5000
a1.channels.c1.capacity = 10000
a1.channels.c1.keep-alive = 15

当然File Channel也很常用,其参数就不再赘述,看官可参考官方文档。

Sink

我们实时数仓接入层的起点是Kafka,自然要利用Kafka Sink。需要注意的参数列举如下。

示例:

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.flumeBatchSize = 1000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 50
a1.sinks.k1.kafka.producer.compression.type = snappy

Kafka Sink也支持其他Producer参数,可以按需配置。

还有一点需要注意的是,Flume默认引用的Kafka Client版本为0.9,其产生的消息在较高版本的Kafka Broker上没有时间戳,因此非常建议手动将$FLUME_HOME/lib目录下的kafka-client JAR包替换成0.10.2或更高的版本。

Interceptor

拦截器方面就比较简单粗暴,在注重吞吐量的场合一定不要使用或者自定义规则复杂的拦截器(比如自带的Regex Interceptor、Search and Replace Interceptor),最好是不使用任何拦截器,把数据清洗的任务交给下游去处理(Flink它不香嘛

Agent Process

在flume-env.sh中添加JVM参数,避免默认堆内存太小导致OOM。

export JAVA_OPTS="-Xms8192m -Xmx8192m -Xmn3072m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError"

另外,Taildir Source会积极地使用堆外内存,如果发现Flume消耗的总内存量过大,可以适当限制直接内存的用量,如:-XX:MaxDirectMemorySize=4096m

Flume原生并没有传统意义上的“高可用”配置(Sink Group Failover不算)。为了防止Agent进程因为各种原因静默地挂掉,需要用一个“保姆脚本”(nanny script)定期检测Agent进程的状态,并及时拉起来。当然也可以在下游采用两级Collector的架构增强鲁棒性,本文不表。Cloudera Community上有一个关于Flume HA的提问,参见这里

The End

经过上述适当的调优过程,我们的单个Flume-NG Agent能够轻松承受高达5W+ RPS的持续流量高峰,比较令人满意了。

民那晚安晚安。

上一篇下一篇

猜你喜欢

热点阅读