Flink学习指南Java玩转大数据

Flink 使用之重启策略

2021-12-16  本文已影响0人  AlienPaul

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

Flink作业重启策略

实际生产作业中,我们期望Flink作业遇到错误的时候,能够自动重启恢复到正常运行状态。

Flink支持多种作业重启策略,但默认作业重启策略为none,即不会自动重启。作业一旦出现异常,会被标记为failed直接退出。

接下来为大家带来Flink支持的重启策略类型和配置方法。

重启策略类型

Flink支持的重启策略类型如下:

注意:如果启用了checkpoint并且没有显式配置重启策略,会默认使用fixeddelay策略,最大重试次数为Integer.MAX_VALUE

全局配置

全局配置影响Flink提交的所有作业的。修改全局配置需要编辑flink-conf.yaml文件。

配置重启策略的方式:

restart-strategy: none, off, disable | fixeddelay, fixed-delay | failurerate, failure-rate | exponentialdelay, exponential-delay

接下来分别列出各个重启策略专属的配置参数和含义。

fixeddelay

# 尝试重启次数
restart-strategy.fixed-delay.attempts: 10
# 两次连续重启的间隔时间
restart-strategy.fixed-delay.delay: 20 s

failurerate

# 两次连续重启的间隔时间
restart-strategy.failure-rate.delay: 10 s
# 计算失败率的统计时间跨度
restart-strategy.failure-rate.failure-rate-interval: 2 min
# 计算失败率的统计时间内的最大失败次数
restart-strategy.failure-rate.max-failures-per-interval: 10

exponentialdelay

# 初次失败后重启时间间隔(初始值)
restart-strategy.exponential-delay.initial-backoff: 1 s
# 以后每次失败,重启时间间隔为上一次重启时间间隔乘以这个值
restart-strategy.exponential-delay.backoff-multiplier: 2
# 每次重启间隔时间的最大抖动值(加或减去该配置项范围内的一个随机数),防止大量作业在同一时刻重启
restart-strategy.exponential-delay.jitter-factor: 0.1
# 最大重启时间间隔,超过这个最大值后,重启时间间隔不再增大
restart-strategy.exponential-delay.max-backoff: 1 min
# 多长时间作业运行无失败后,重启间隔时间会重置为初始值(第一个配置项的值)
restart-strategy.exponential-delay.reset-backoff-threshold: 1 h

作业级别配置

作业级别的配置仅仅会影响到单个job的行为,通常使用代码的方式。通过调用env.setRestartStrategy方法,可以为指定作业级别的重启策略。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(...)

创建重启策略需要用到RestartStrategies这个类。接下来讲解下如何创建第二节所述的各种重启策略。

fixeddelay

// 设置重启次数为10,重启间隔时间为1000ms
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000))
// 时间既可以使用long类型(毫秒为单位),也可以使用org.apache.flink.api.common.time.Time类型,更加直观
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.milliseconds(1000)))

failurerate

// 设置如果10分钟内失败10次不再尝试重启,每次重启间隔5秒
env.setRestartStrategy(RestartStrategies.failureRateRestart(10, Time.minutes(10), Time.seconds(5)))

exponentialdelay

// 设置初始重启间隔为1秒,最大重启间隔为5分钟,每次失败重启间隔扩大2倍,1小时内作业无失败重启时间间隔重置,抖动值为0.5
env.setRestartStrategy(
    RestartStrategies.exponentialDelayRestart(
      Time.seconds(1),
      Time.minutes(5),
      2.0,
      Time.hours(1),
      0.5
    )
)

演示代码

为了在测试环境演示自动重启效果,我们可以使用socketTextStream实时接收用户输入,通过输入错误来模拟程序崩溃的场景。

一段示例代码如下所示:

def main(args: Array[String]): Unit = {
    val conf = new Configuration
    // IDE本地运行启用WebUI
    conf.setInteger(RestOptions.PORT, 8080)
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 设置重启策略为fixedDelay
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000))

    val stream = env.socketTextStream("127.0.0.1", 9000)
    // 这里将输入转换为整数,潜在的崩溃点
    .map(s => s.toInt)
    .print()

    env.execute()
}

通过这段代码,我们可以模拟字符串转整数报错,从而引起作业重启。

接下来我们重点观察下作业重启过程的日志输出。

首先开启socket端口,启动作业。作业启动成功后,我们看到类似如下输出:

11:16:44,458 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Print to Std. Out (2/12) (9992bdbd2191dfee866dd6f47c8d7815) switched from INITIALIZING to RUNNING.
11:16:44,458 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Print to Std. Out (11/12) (efa872145cbb71236739c2cffc085cc9) switched from INITIALIZING to RUNNING.
11:16:44,458 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Print to Std. Out (9/12) (dce870ff8e69fcbd2fc9dcdec454d4f6) switched from INITIALIZING to RUNNING.
11:16:44,459 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Print to Std. Out (6/12) (c86cd1b0771887f188bd96acca83e2db) switched from INITIALIZING to RUNNING.

说明作业状态已从INITIALIZING切换为RUNNING,作业启动成功。

然后故意输入一个无法转换为整数类型的字符,例如s。我们在程序报错之后看到了如下内容:

11:17:48,964 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy  - Calculating tasks to restart to recover the failed task 20ba6b65f97481d5570070de90e4e791_7.
11:17:48,964 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy  - 13 tasks should be restarted to recover the failed task 20ba6b65f97481d5570070de90e4e791_7. 
11:17:48,966 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (9dd4540acc7f786b009bd8e986d9a9d0) switched from state RUNNING to RESTARTING.

Flink首先计算需要重启的task数量,然后将Job的状态从RUNNING切换为RESTARTING。接下来和正常启动的流程基本一致。有兴趣的读者可以尝试下上述其他重启策略的具体行为。

上一篇 下一篇

猜你喜欢

热点阅读