A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© YXW95 初级黑马   /  2019-12-13 11:05  /  1305 人查看  /  0 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

本帖最后由 YXW95 于 2019-12-13 11:23 编辑

       Flink的重启策略
        Flink支持不同的重启策略,这些重启策略控制着job失败后如何重启。集群可以通过默认的重启策略来重启,这个默认的重启策略通常在未指定重启策略的情况下使用,而如果Job提交的时候指定了重启策略,这个重启策略就会覆盖掉集群的默认重启策略。

  概览
     默认的重启策略是通过Flink的flink-conf.yaml来指定的,这个配置参数restart-strategy定义了哪种策略会被采用。如果checkpoint未启动,就会采用no restart策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用fixed-delay策略,重试Integer.MAX_VALUE次。请参考下面的可用重启策略来了解哪些值是支持的。
每个重启策略都有自己的参数来控制它的行为,这些值也可以在配置文件中设置,每个重启策略的描述都包含着各自的配置值信息。

重启策略
重启策略值
Fixed delay
fixed-delay
Failure rate
failure-rate
No restart
None

    除了定义一个默认的重启策略之外,你还可以为每一个Job指定它自己的重启策略,这个重启策略可以在ExecutionEnvironment中调用setRestartStrategy()方法来程序化地调用,主意这种方式同样适用于StreamExecutionEnvironment。

    下面的例子展示了我们如何为我们的Job设置一个固定延迟重启策略,一旦有失败,系统就会尝试每10秒重启一次,重启3次。
             val env = ExecutionEnvironment.getExecutionEnvironment()
             env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3, // 重启次数Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔))

          下面部分描述了重启策略特定的配置项

   固定延迟重启策略(Fixed Delay Restart Strategy)
     固定延迟重启策略会尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。
      重启策略可以配置flink-conf.yaml的下面配置参数来启用,作为默认的重启策略:

配置参数

描述
默认值
restart-strategy.fixed-delay.attempts

在Job最终宣告失败之前,Flink尝试执行的次数
1,如果启用checkpoint的话是Integer.MAX_VALUE
restart-strategy.fixed-delay.delay

延迟重启意味着一个执行失败之后,并不会立即重启,而是要等待一段时间。
akka.ask.timeout,如果启用checkpoint的话是1s

      例子:
                    restart-strategy.fixed-delay.attempts: 3
                    restart-strategy.fixed-delay.delay: 10 s

  固定延迟重启也可以在程序中设置:
           val env = ExecutionEnvironment.getExecutionEnvironment()
          env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
          3, // 重启次数Time.of(10, TimeUnit.SECONDS) // 重启时间间隔))

    失败率重启策略
       失败率重启策略在Job失败后会重启,但是超过失败率后,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。
       失败率重启策略可以在flink-conf.yaml中设置下面的配置参数来启用:

配置参数
描述
默认值
restart-strategy.failure-rate.max-failures-per-interval
在一个Job认定为失败之前,最大的重启次数
1
restart-strategy.failure-rate.failure-rate-interval
计算失败率的时间间隔
1分钟
restart-strategy.failure-rate.delay
两次连续重启尝试之间的时间间隔
akka.ask.timeout

       例子:
       restart-strategy.failure-rate.max-failures-per-interval: 3
       restart-strategy.failure-rate.failure-rate-interval: 5 min
       restart-strategy.failure-rate.delay: 10 s

       失败率重启策略也可以在程序中设置:

       val env = ExecutionEnvironment.getExecutionEnvironment()
       env.setRestartStrategy(RestartStrategies.failureRateRestart(
      3, // 每个测量时间间隔最大失败次数Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔))

    无重启策略
           Job直接失败,不会尝试进行重启
          restart-strategy: none

          无重启策略也可以在程序中设置
         val env = ExecutionEnvironment.getExecutionEnvironment()
         env.setRestartStrategy(RestartStrategies.noRestart())
例子
     输入五次zhangsan,程序挂掉
[Java] 纯文本查看 复制代码
[table]
[tr][td=600][align=left][b]import [/b]org.apache.flink.api.common.restartstrategy.RestartStrategies
[b]import [/b]org.apache.flink.runtime.state.filesystem.FsStateBackend
[b]import [/b]org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
[b]import [/b]org.apache.flink.streaming.api.scala._

[b]object [/b]FixDelayRestartStrategiesDemo {

  [b]def [/b]main(args: Array[String]): Unit = {
    [b]val [/b]env = StreamExecutionEnvironment.[i]getExecutionEnvironment[/i][i]//如果想要开启重启策略,就必须开启CheckPoint[/i]env.enableCheckpointing(5000L)

    [i]//指定状态存储后端,默认就是内存[/i][i]    //现在指定的是FsStateBackend,支持本地系统、[/i][i]    //new FsStateBackend要指定存储系统的协议: scheme (hdfs://, file://, etc)[/i]env.setStateBackend([b]new [/b]FsStateBackend(args(0)))

    [i]//如果程序被cancle,保留以前做的checkpoint[/i]env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.[i]RETAIN_ON_CANCELLATION[/i])

    [i]//指定以后存储多个checkpoint目录[/i]env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)

    [i]//指定重启策略,默认的重启策略是不停的重启[/i][i]    //程序出现异常是会重启,重启五次,每次延迟5秒,如果超过了5次,程序退出[/i]env.setRestartStrategy(RestartStrategies.[i]fixedDelayRestart[/i](5, 5000))

    [b]val [/b]lines: DataStream[String] = env.socketTextStream(args(1), 8888)

    [b]val [/b]result = lines.flatMap(_.split([b]" "[/b]).map(word => {
      [b]if[/b](word.equals([b]"zhangsan"[/b])) {
        [b]throw new [/b]RuntimeException([b]"zhangsan,程序重启!"[/b]);
      }
      (word, 1)
    })).keyBy(0).sum(1)
    result.print()
    env.execute()
  }
}[/align][/td][/tr]
[/table]


       例子
      需求
         假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,然后对统计的结果值进行checkpoint处理

数据规划
      1) 使用自定义算子每秒钟产生大约10000条数据。
       2) 产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count)
       3) 数据经统计后,统计结果打印到终端输出
      4) 打印输出的结果为Long类型的数据

  开发思路  
        1) source算子每隔1秒钟发送10000条数据,并注入到Window算子中。
        2) window算子每隔1秒钟统计一次最近4秒钟内数据数量。
        3) 每隔1秒钟将统计结果打印到终端
         4) 每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。

      开发步骤:
           1.获取流处理执行环境
           2.设置检查点机制
          3.自定义数据源
          4.数据分组
          5.划分时间窗口
          6.数据聚合
          7.数据打印
          8.触发执行
[Java] 纯文本查看 复制代码
[align=center][table]
[tr][td=602][align=left][i]//发送数据形式[/i][b]case class [/b]SEvent(id: Long, name: String, info: String, count: Int)[/align]
[align=left][b]class [/b]SEventSourceWithChk [b]extends [/b]RichSourceFunction[SEvent]{
  [b]private var [/b][i]count [/i]= 0L
  [b]private var [/b][i]isRunning [/i]= [b]true[/b][b]  private val [/b][i]alphabet [/i]= [b]"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"[/b][i]// 任务取消时调用[/i][b]override def [/b]cancel(): Unit = {
    [i]isRunning [/i]= [b]false[/b]}
  [i]//// source算子的逻辑,即:每秒钟向流图中注入10000个元组[/i][b]override def [/b]run(sourceContext: SourceContext[SEvent]): Unit = {
    [b]while[/b]([i]isRunning[/i]) {
      [b]for [/b](i <- 0 until 10000) {
        sourceContext.collect([i]SEvent[/i](1, [b]"hello-"[/b]+[i]count[/i], [i]alphabet[/i],1))
        [i]count [/i]+= 1L
      }
      Thread.[i]sleep[/i](1000)
    }
  }
}[/align]
[align=left][i]/**[/i][i]该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。[/i][i]  */[/i][b]object [/b]FlinkEventTimeAPIChkMain {
  [b]def [/b]main(args: Array[String]): Unit ={
    [b]val [/b]env = StreamExecutionEnvironment.[i]getExecutionEnvironment[/i]env.setStateBackend([b]new [/b]FsStateBackend([b]"hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"[/b]))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.[i]EXACTLY_ONCE[/i])
    env.getCheckpointConfig.setCheckpointInterval(6000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.[i]EventTime[/i])[/align][align=left][i]//保留策略:默认情况下,检查点不会被保留,仅用于故障中恢复作业,可以启用外部持久化检查点,同时指定保留策略[/i][i]//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业取消时保留检查点,注意在这种情况下,您必须在取消后手动清理检查点状态[/i][i]//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业被cancel时,删除检查点,检查点状态仅在作业失败时可用[/i]env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.[i]DELETE_ON_CANCELLATION[/i])[/align][align=left]
    [i]// 应用逻辑[/i][b]val [/b]source: DataStream[SEvent] = env.addSource([b]new [/b]SEventSourceWithChk)
    source.assignTimestampsAndWatermarks([b]new [/b]AssignerWithPeriodicWatermarks[SEvent] {
      [i]// 设置watermark[/i][b]override def [/b]getCurrentWatermark: Watermark = {
        [b]new [/b]Watermark(System.[i]currentTimeMillis[/i]())
      }
      [i]// 给每个元组打上时间戳[/i][b]override def [/b]extractTimestamp(t: SEvent, l: Long): Long = {
        System.[i]currentTimeMillis[/i]()
      }
    })
      .keyBy(0)
      .window(SlidingEventTimeWindows.[i]of[/i](Time.[i]seconds[/i](4), Time.[i]seconds[/i](1)))
      .apply([b]new [/b]WindowStatisticWithChk)
      .print()
    env.execute()
  }
}

[i]//该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。[/i][i]// 用户自定义状态[/i][b]class [/b]UDFState [b]extends [/b]Serializable{
  [b]private var [/b][i]count [/i]= 0L
  [i]// 设置用户自定义状态[/i][b]def [/b]setState(s: Long) = [i]count [/i]= s
  [i]// 获取用户自定状态[/i][b]def [/b]getState = [i]count[/i]}
[i]//该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。[/i][b]class [/b]WindowStatisticWithChk [b]extends [/b]WindowFunction[SEvent, Long, Tuple, TimeWindow] [b]with [/b]ListCheckpointed[UDFState]{
  [b]private var [/b][i]total [/i]= 0L

  [i]// window算子的实现逻辑,即:统计window中元组的数量[/i][b]override def [/b]apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
    [b]var [/b]count = 0L
    [b]for [/b](event <- input) {
      count += 1L
    }
    [i]total [/i]+= count
    out.collect(count)
  }
  [i]// 从自定义快照中恢复状态[/i][b]override def [/b]restoreState(state: util.List[UDFState]): Unit = {
    [b]val [/b]udfState = state.get(0)
    [i]total [/i]= udfState.getState
  }

  [i]// 制作自定义状态快照[/i][b]override def [/b]snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
    [b]val [/b]udfList: util.ArrayList[UDFState] = [b]new [/b]util.ArrayList[UDFState]
    [b]val [/b]udfState = [b]new [/b]UDFState
    udfState.setState([i]total[/i])
    udfList.add(udfState)
    udfList
  }
}[/align]




0 个回复

您需要登录后才可以回帖 登录 | 加入黑马