Flink 使用之重启策略
Flink 使用介绍相关文档目录
Flink作业重启策略
实际生产作业中,我们期望Flink作业遇到错误的时候,能够自动重启恢复到正常运行状态。
Flink支持多种作业重启策略,但默认作业重启策略为none,即不会自动重启。作业一旦出现异常,会被标记为failed直接退出。
接下来为大家带来Flink支持的重启策略类型和配置方法。
重启策略类型
Flink支持的重启策略类型如下:
- none, off, disable:无重启策略,作业遇到问题直接失败,不会重启。
- fixeddelay, fixed-delay:作业失败后,延迟一定时间重启。但是有最大重启次数限制,超过这个限制后作业失败,不再重启。
- failurerate, failure-rate:作业失败后,延迟一定时间重启。但是有最大失败率限制。如果一定时间内作业失败次数超过配置值,则标记为真的失败,不再重启。
- exponentialdelay, exponential-delay:作业失败后重启延迟时间随着失败次数指数递增。没有最大重启次数限制,无限尝试重启作业。
注意:如果启用了checkpoint并且没有显式配置重启策略,会默认使用fixeddelay策略,最大重试次数为
Integer.MAX_VALUE
。
全局配置
全局配置影响Flink提交的所有作业的。修改全局配置需要编辑flink-conf.yaml
文件。
配置重启策略的方式:
restart-strategy: none, off, disable | fixeddelay, fixed-delay | failurerate, failure-rate | exponentialdelay, exponential-delay
接下来分别列出各个重启策略专属的配置参数和含义。
fixeddelay
# 尝试重启次数
restart-strategy.fixed-delay.attempts: 10
# 两次连续重启的间隔时间
restart-strategy.fixed-delay.delay: 20 s
failurerate
# 两次连续重启的间隔时间
restart-strategy.failure-rate.delay: 10 s
# 计算失败率的统计时间跨度
restart-strategy.failure-rate.failure-rate-interval: 2 min
# 计算失败率的统计时间内的最大失败次数
restart-strategy.failure-rate.max-failures-per-interval: 10
exponentialdelay
# 初次失败后重启时间间隔(初始值)
restart-strategy.exponential-delay.initial-backoff: 1 s
# 以后每次失败,重启时间间隔为上一次重启时间间隔乘以这个值
restart-strategy.exponential-delay.backoff-multiplier: 2
# 每次重启间隔时间的最大抖动值(加或减去该配置项范围内的一个随机数),防止大量作业在同一时刻重启
restart-strategy.exponential-delay.jitter-factor: 0.1
# 最大重启时间间隔,超过这个最大值后,重启时间间隔不再增大
restart-strategy.exponential-delay.max-backoff: 1 min
# 多长时间作业运行无失败后,重启间隔时间会重置为初始值(第一个配置项的值)
restart-strategy.exponential-delay.reset-backoff-threshold: 1 h
作业级别配置
作业级别的配置仅仅会影响到单个job的行为,通常使用代码的方式。通过调用env.setRestartStrategy
方法,可以为指定作业级别的重启策略。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(...)
创建重启策略需要用到RestartStrategies
这个类。接下来讲解下如何创建第二节所述的各种重启策略。
fixeddelay
// 设置重启次数为10,重启间隔时间为1000ms
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000))
// 时间既可以使用long类型(毫秒为单位),也可以使用org.apache.flink.api.common.time.Time类型,更加直观
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.milliseconds(1000)))
failurerate
// 设置如果10分钟内失败10次不再尝试重启,每次重启间隔5秒
env.setRestartStrategy(RestartStrategies.failureRateRestart(10, Time.minutes(10), Time.seconds(5)))
exponentialdelay
// 设置初始重启间隔为1秒,最大重启间隔为5分钟,每次失败重启间隔扩大2倍,1小时内作业无失败重启时间间隔重置,抖动值为0.5
env.setRestartStrategy(
RestartStrategies.exponentialDelayRestart(
Time.seconds(1),
Time.minutes(5),
2.0,
Time.hours(1),
0.5
)
)
演示代码
为了在测试环境演示自动重启效果,我们可以使用socketTextStream
实时接收用户输入,通过输入错误来模拟程序崩溃的场景。
一段示例代码如下所示:
def main(args: Array[String]): Unit = {
val conf = new Configuration
// IDE本地运行启用WebUI
conf.setInteger(RestOptions.PORT, 8080)
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
// 设置重启策略为fixedDelay
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000))
val stream = env.socketTextStream("127.0.0.1", 9000)
// 这里将输入转换为整数,潜在的崩溃点
.map(s => s.toInt)
.print()
env.execute()
}
通过这段代码,我们可以模拟字符串转整数报错,从而引起作业重启。
接下来我们重点观察下作业重启过程的日志输出。
首先开启socket端口,启动作业。作业启动成功后,我们看到类似如下输出:
11:16:44,458 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Print to Std. Out (2/12) (9992bdbd2191dfee866dd6f47c8d7815) switched from INITIALIZING to RUNNING.
11:16:44,458 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Print to Std. Out (11/12) (efa872145cbb71236739c2cffc085cc9) switched from INITIALIZING to RUNNING.
11:16:44,458 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Print to Std. Out (9/12) (dce870ff8e69fcbd2fc9dcdec454d4f6) switched from INITIALIZING to RUNNING.
11:16:44,459 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Print to Std. Out (6/12) (c86cd1b0771887f188bd96acca83e2db) switched from INITIALIZING to RUNNING.
说明作业状态已从INITIALIZING
切换为RUNNING
,作业启动成功。
然后故意输入一个无法转换为整数类型的字符,例如s
。我们在程序报错之后看到了如下内容:
11:17:48,964 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task 20ba6b65f97481d5570070de90e4e791_7.
11:17:48,964 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - 13 tasks should be restarted to recover the failed task 20ba6b65f97481d5570070de90e4e791_7.
11:17:48,966 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (9dd4540acc7f786b009bd8e986d9a9d0) switched from state RUNNING to RESTARTING.
Flink首先计算需要重启的task数量,然后将Job的状态从RUNNING
切换为RESTARTING
。接下来和正常启动的流程基本一致。有兴趣的读者可以尝试下上述其他重启策略的具体行为。