Flink内存设置思考
通过上一章节我们可以得到,flink把container内存分成了8块,一般来说我们只需要关注其中的两块内存(task heap和managed),由内存计算公式可以看出,这两部分是此消彼长的。
Task heap
- 默认4g的一个TM,其中大概只有35%分配给了heap内存,一般来说对于简单的任务足够了,我们通过GC的情况来观测heap内存是否充足
- 如果我们SQL任务的计算逻辑比较复杂,比如有较多算子情况下,或者有一些维表关联需要缓存数据,那我们就需要增大heap内存的分配。
- 如果任务没有
join
,没有group by
等聚合算子,我们可以调小managed
内存分配的比例,以此来增大heap内存的分配量(taskmanager.memory.freaction=0.4 默认),如果内存还是不足可以通过增加TM的总内存 - 如果任务需要聚合算子,调小
managed
内存可以导致managed
内存的OOM,应该直接增加TM总内存
- 如果任务没有
Managed
Flink是有状态的计算,对于我们目前线上的使用方式,所谓的状态使用全都依赖于 Managed
内存,用来保存我们的一些数据和结果(例如双流 join
时两张表的缓存数据,group by
时使用的缓存中间结果值等)
对于有时间窗口的任务来说,状态数据会随着窗口的销毁而清楚,释放空间,但是对于没有时间窗口的任务,我们使用TTL来进行任务状态的清除(在sql中使用 set
table.exec.state.ttl= ${time}ms)
。ttl不应设置的过大,且ttl只保证数据在设置的时间内不会被删除,但是何时执行删除动作是不确定的
由于Managed内存是堆外内存,flink任务本身无法很准确的进行观测和控制,容易造成堆外内存的使用超限。对于目前线上的运行模式(flink on yarn),yarn会对container进程的资源消耗进行监控,如果总内存使用量超过申请上线,则会kill container。如果我们遇到,flink任务运行一段时间,出现 failover
,查看运行信息,报错信息类似于 “ Connection unexpectedly closed by remote task manager 'xxx.com/xxx:'. This might indicate that the remote task manager was lost.”
一个Taskmanager丢失,很大概率是由于这个container的managed内存用量超限被yarn kill导致,则我们应该增大内存,。通常我们使用直接增大总内存的方式来增加managed内存分配,当然在heap内存充足的情况下,也可以适当调大managed比例(taskmanager.memory.managed.fraction=0.4
)的值
-
TaskManager中运行多个slot和task
slot是flink中的最小执行资源,我们目前线上的配置为1个container设置占用1个VCORE(1个container就是1个Taskmanager),通常情况下,为了提供cpu的利用率,默认设置1个taskmanager设为2个slot。如果一个任务里面有多个算子需要使用状态存储,比如多次group by,存在一个tm上运行多个tsdb实例,如下图:
这个任务的配置是1个tm2个slot,并且graph中包含了3个agg算子,这表示了一个tm中会包含2*3=6个rocksdb的实例,因此此处也需要根据agg算子的类型来判定是否需要增加,常用的max,sum一般默认值足够,如果是自定义的udaf可能需要增大managed内存的分配
双流的无时间窗口的join一般也需要存储大量的数据,因此也应该适当调大managed内存的分配。
总结
通过以上的说明,大家应该对如何配置flink tm的内存有了一些思路。内存是任务运行的重要资源,在任务稳定运行,gc正常的情况下,过多的内存分配不会增加任务的运行效率,我们在探查任务运行效率时,磁盘的IO wait,CPU的利用率,算子的执行速度等都是需要考虑的重要因素