Flink实战

Flink实战之入库任务调优

2020-12-26  本文已影响0人  〇白衣卿相〇

背景

在调试flink写hdfs和hive时,任务总是报各种各样的异常,其中255问题最多,异常信息如下:

java.lang.Exception: Exception from container-launch.
Container id: container_1597847003686_5818_01_000002
Exit code: 255
Stack trace: ExitCodeException exitCode=255: 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)
    at org.apache.hadoop.util.Shell.run(Shell.java:507)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:789)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)


Container exited with a non-zero exit code 255

    at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

这段异常是yarn报出来的,根本原因是Direct Memory OOM了。那么该如何调优呢,容我慢慢道来。
我们先看下Flink的内存模型。

Flink内存模型

JVM Heap内存

堆内存包括:

  1. Framework Heap内存:flink框架使用的堆内存
  2. Task Heap内存:任务使用堆内存(java对象,基于内存的backend存储的state对象)

配置参数:

taskmanager.memory.framework.heap.size

taskmanager.memory.task.heap.size

JVM Off-Heap内存

对外内存:

  1. Framework Off-Heap内存:flink框架使用的对外内存
  2. Task Off-Heap内存:任务使用的对外内存

配置参数:

taskmanager.memory.framework.off-heap.size

taskmanager.memory.task.off-heap.size

Framework vs Task

区分:是否计入Slot资源

Framework:flink框架运行使用的内存

Task:任务运行使用的内存,包括heap、off-heap、managed、direct

Heap vs Off-Heap

区分:jvm堆内存和对外内存

Heap:jvm堆

Off-Heap:包括Direct、Native

Framework Heap+Task Heap = -Xmx

Framework off-heap +task off-heap + network = -XX:MaxDirectMemorySize

Network Memory(网络buffer)

属于Directory Memory

用途:

用于task之间缓冲数据,input buffer pool / output buffer pool

配置参数:

taskmanager.memory.network.min
taskmanager.memory.network.max
taskmanager.memory.network.fraction

Managed Memory(托管内存)

属于Native Memory

用途:

  1. streaming任务RocksDB Backend
  2. batch任务的sort、hash table、中间结果缓存
  3. python任务的UDF使用

配置参数:

设置大小:taskmanager.memory.managed.size

设置比率:taskmanager.memory.managed.fraction

JVM Metaspace & Overhead

都是jvm本身的开销

JVM Metaspace

用途:存放JVM加载的类的元数据,加载的类越多需要空间越大

所以如果任务需要加载大量第三方库时,可以调大Metaspace内存

配置参数:

taskmanager.memory.jvm-metaspace.size

JVM Overhead

属于Native Memory

用途:用于其他JVM开销,比如Code Cache、Thread Stack、garbage collection space 等。

配置参数:

taskmanager.memory.jvm-overhead.min
taskmanager.memory.jvm-overhead.max
taskmanager.memory.jvm-overhead.fraction

看完上面的总结,想必大家已经有了大概了解,回到我们的入库任务,理解入库任务主要会使用哪一块的内存,那么如何调优也就一目了然了。

入库任务使用内存

入库任务底层原理都是基于StreamingFileSink写Hdfs文件。借助BulkWriter进行写入,数据是先写到Direct Memory当中,然后在文件滚动时flush到hdfs。所以主要使用的Direct Memory,其属于task off-heap内存。
同时我们任务使用了RocksDB的状态后端,但是状态不是很大,也就1M左右。所以可以适当减少Managed Memory的大小。最终效果是调大了task off-heap的内存,调小了Managed Memory的内存,然后任务就不再报255了。
配置参数taskmanager.memory.task.off-heap.size和taskmanager.memory.managed.fraction,具体配置多大,需要根据你的数据量、单条数据大小、Checkpoint间隔时长来计算出大概会在Direct Memory中存多少数据。

后续

最近发现有两个任务也报255,但是并不是Direct Memory超用,而是堆内存超用,所以调大了TM的内存。
使用堆内存主要是Bucket对象,如果分区时间选择不合理,会导致分区很多,分配了大量Bucket,导致堆内存OOM。

上一篇 下一篇

猜你喜欢

热点阅读