黑马程序员技术交流社区

标题: Flink的重启策略 [打印本页]

作者: YXW95    时间: 2019-12-13 11:05
标题: Flink的重启策略
本帖最后由 YXW95 于 2019-12-13 11:23 编辑

       Flink的重启策略
        Flink支持不同的重启策略,这些重启策略控制着job失败后如何重启。集群可以通过默认的重启策略来重启,这个默认的重启策略通常在未指定重启策略的情况下使用,而如果Job提交的时候指定了重启策略,这个重启策略就会覆盖掉集群的默认重启策略。

  概览
     默认的重启策略是通过Flink的flink-conf.yaml来指定的,这个配置参数restart-strategy定义了哪种策略会被采用。如果checkpoint未启动,就会采用no restart策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用fixed-delay策略,重试Integer.MAX_VALUE次。请参考下面的可用重启策略来了解哪些值是支持的。
每个重启策略都有自己的参数来控制它的行为,这些值也可以在配置文件中设置,每个重启策略的描述都包含着各自的配置值信息。

重启策略
重启策略值
Fixed delay
fixed-delay
Failure rate
failure-rate
No restart
None

    除了定义一个默认的重启策略之外,你还可以为每一个Job指定它自己的重启策略,这个重启策略可以在ExecutionEnvironment中调用setRestartStrategy()方法来程序化地调用,主意这种方式同样适用于StreamExecutionEnvironment。

    下面的例子展示了我们如何为我们的Job设置一个固定延迟重启策略,一旦有失败,系统就会尝试每10秒重启一次,重启3次。
             val env = ExecutionEnvironment.getExecutionEnvironment()
             env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3, // 重启次数Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔))

          下面部分描述了重启策略特定的配置项

   固定延迟重启策略(Fixed Delay Restart Strategy)
     固定延迟重启策略会尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。
      重启策略可以配置flink-conf.yaml的下面配置参数来启用,作为默认的重启策略:

配置参数

描述
默认值
restart-strategy.fixed-delay.attempts

在Job最终宣告失败之前,Flink尝试执行的次数
1,如果启用checkpoint的话是Integer.MAX_VALUE
restart-strategy.fixed-delay.delay

延迟重启意味着一个执行失败之后,并不会立即重启,而是要等待一段时间。
akka.ask.timeout,如果启用checkpoint的话是1s

      例子:
                    restart-strategy.fixed-delay.attempts: 3
                    restart-strategy.fixed-delay.delay: 10 s

  固定延迟重启也可以在程序中设置:
           val env = ExecutionEnvironment.getExecutionEnvironment()
          env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
          3, // 重启次数Time.of(10, TimeUnit.SECONDS) // 重启时间间隔))

    失败率重启策略
       失败率重启策略在Job失败后会重启,但是超过失败率后,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。
       失败率重启策略可以在flink-conf.yaml中设置下面的配置参数来启用:

配置参数
描述
默认值
restart-strategy.failure-rate.max-failures-per-interval
在一个Job认定为失败之前,最大的重启次数
1
restart-strategy.failure-rate.failure-rate-interval
计算失败率的时间间隔
1分钟
restart-strategy.failure-rate.delay
两次连续重启尝试之间的时间间隔
akka.ask.timeout

       例子:
       restart-strategy.failure-rate.max-failures-per-interval: 3
       restart-strategy.failure-rate.failure-rate-interval: 5 min
       restart-strategy.failure-rate.delay: 10 s

       失败率重启策略也可以在程序中设置:

       val env = ExecutionEnvironment.getExecutionEnvironment()
       env.setRestartStrategy(RestartStrategies.failureRateRestart(
      3, // 每个测量时间间隔最大失败次数Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔))

    无重启策略
           Job直接失败,不会尝试进行重启
          restart-strategy: none

          无重启策略也可以在程序中设置
         val env = ExecutionEnvironment.getExecutionEnvironment()
         env.setRestartStrategy(RestartStrategies.noRestart())
例子
     输入五次zhangsan,程序挂掉
[Java] 纯文本查看 复制代码
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

object FixDelayRestartStrategiesDemo {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment//如果想要开启重启策略,就必须开启CheckPointenv.enableCheckpointing(5000L)

    //指定状态存储后端,默认就是内存    //现在指定的是FsStateBackend,支持本地系统、    //new FsStateBackend要指定存储系统的协议: scheme (hdfs://, file://, etc)env.setStateBackend(new FsStateBackend(args(0)))

    //如果程序被cancle,保留以前做的checkpointenv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    //指定以后存储多个checkpoint目录env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)

    //指定重启策略,默认的重启策略是不停的重启    //程序出现异常是会重启,重启五次,每次延迟5秒,如果超过了5次,程序退出env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000))

    val lines: DataStream[String] = env.socketTextStream(args(1), 8888)

    val result = lines.flatMap(_.split(" ").map(word => {
      if(word.equals("zhangsan")) {
        throw new RuntimeException("zhangsan,程序重启!");
      }
      (word, 1)
    })).keyBy(0).sum(1)
    result.print()
    env.execute()
  }
}



       例子
      需求
         假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,然后对统计的结果值进行checkpoint处理

数据规划
      1) 使用自定义算子每秒钟产生大约10000条数据。
       2) 产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count)
       3) 数据经统计后,统计结果打印到终端输出
      4) 打印输出的结果为Long类型的数据

  开发思路  
        1) source算子每隔1秒钟发送10000条数据,并注入到Window算子中。
        2) window算子每隔1秒钟统计一次最近4秒钟内数据数量。
        3) 每隔1秒钟将统计结果打印到终端
         4) 每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。

      开发步骤:
           1.获取流处理执行环境
           2.设置检查点机制
          3.自定义数据源
          4.数据分组
          5.划分时间窗口
          6.数据聚合
          7.数据打印
          8.触发执行
[Java] 纯文本查看 复制代码

//发送数据形式case class SEvent(id: Long, name: String, info: String, count: Int)

class SEventSourceWithChk extends RichSourceFunction[SEvent]{
  private var count = 0L
  private var isRunning = true  private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"// 任务取消时调用override def cancel(): Unit = {
    isRunning = false}
  //// source算子的逻辑,即:每秒钟向流图中注入10000个元组override def run(sourceContext: SourceContext[SEvent]): Unit = {
    while(isRunning) {
      for (i <- 0 until 10000) {
        sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
        count += 1L
      }
      Thread.sleep(1000)
    }
  }
}

/**该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。  */object FlinkEventTimeAPIChkMain {
  def main(args: Array[String]): Unit ={
    val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointInterval(6000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//保留策略:默认情况下,检查点不会被保留,仅用于故障中恢复作业,可以启用外部持久化检查点,同时指定保留策略//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业取消时保留检查点,注意在这种情况下,您必须在取消后手动清理检查点状态//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业被cancel时,删除检查点,检查点状态仅在作业失败时可用env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

    // 应用逻辑val source: DataStream[SEvent] = env.addSource(new SEventSourceWithChk)
    source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
      // 设置watermarkoverride def getCurrentWatermark: Watermark = {
        new Watermark(System.currentTimeMillis())
      }
      // 给每个元组打上时间戳override def extractTimestamp(t: SEvent, l: Long): Long = {
        System.currentTimeMillis()
      }
    })
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
      .apply(new WindowStatisticWithChk)
      .print()
    env.execute()
  }
}

//该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。// 用户自定义状态class UDFState extends Serializable{
  private var count = 0L
  // 设置用户自定义状态def setState(s: Long) = count = s
  // 获取用户自定状态def getState = count}
//该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{
  private var total = 0L

  // window算子的实现逻辑,即:统计window中元组的数量override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
    var count = 0L
    for (event <- input) {
      count += 1L
    }
    total += count
    out.collect(count)
  }
  // 从自定义快照中恢复状态override def restoreState(state: util.List[UDFState]): Unit = {
    val udfState = state.get(0)
    total = udfState.getState
  }

  // 制作自定义状态快照override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
    val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]
    val udfState = new UDFState
    udfState.setState(total)
    udfList.add(udfState)
    udfList
  }
}









欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2