本帖最后由 YXW95 于 2019-12-13 11:23 编辑
Flink的重启策略 Flink支持不同的重启策略,这些重启策略控制着job失败后如何重启。集群可以通过默认的重启策略来重启,这个默认的重启策略通常在未指定重启策略的情况下使用,而如果Job提交的时候指定了重启策略,这个重启策略就会覆盖掉集群的默认重启策略。
概览 默认的重启策略是通过Flink的flink-conf.yaml来指定的,这个配置参数restart-strategy定义了哪种策略会被采用。如果checkpoint未启动,就会采用no restart策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用fixed-delay策略,重试Integer.MAX_VALUE次。请参考下面的可用重启策略来了解哪些值是支持的。 每个重启策略都有自己的参数来控制它的行为,这些值也可以在配置文件中设置,每个重启策略的描述都包含着各自的配置值信息。
除了定义一个默认的重启策略之外,你还可以为每一个Job指定它自己的重启策略,这个重启策略可以在ExecutionEnvironment中调用setRestartStrategy()方法来程序化地调用,主意这种方式同样适用于StreamExecutionEnvironment。
下面的例子展示了我们如何为我们的Job设置一个固定延迟重启策略,一旦有失败,系统就会尝试每10秒重启一次,重启3次。 val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔))
下面部分描述了重启策略特定的配置项
固定延迟重启策略(Fixed Delay Restart Strategy) 固定延迟重启策略会尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。 重启策略可以配置flink-conf.yaml的下面配置参数来启用,作为默认的重启策略:
|
| | | restart-strategy.fixed-delay.attempts |
| 在Job最终宣告失败之前,Flink尝试执行的次数 | 1,如果启用checkpoint的话是Integer.MAX_VALUE | restart-strategy.fixed-delay.delay |
| 延迟重启意味着一个执行失败之后,并不会立即重启,而是要等待一段时间。 | akka.ask.timeout,如果启用checkpoint的话是1s |
例子: restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s
固定延迟重启也可以在程序中设置: val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数Time.of(10, TimeUnit.SECONDS) // 重启时间间隔))
失败率重启策略 失败率重启策略在Job失败后会重启,但是超过失败率后,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。 失败率重启策略可以在flink-conf.yaml中设置下面的配置参数来启用:
| | | restart-strategy.failure-rate.max-failures-per-interval | | | restart-strategy.failure-rate.failure-rate-interval | | | restart-strategy.failure-rate.delay | | |
例子: restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 5 min restart-strategy.failure-rate.delay: 10 s
失败率重启策略也可以在程序中设置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量时间间隔最大失败次数Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔))
无重启策略 Job直接失败,不会尝试进行重启 restart-strategy: none
无重启策略也可以在程序中设置 val env = ExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.noRestart()) 例子 输入五次zhangsan,程序挂掉 [Java] 纯文本查看 复制代码 [table]
[tr][td=600][align=left][b]import [/b]org.apache.flink.api.common.restartstrategy.RestartStrategies
[b]import [/b]org.apache.flink.runtime.state.filesystem.FsStateBackend
[b]import [/b]org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
[b]import [/b]org.apache.flink.streaming.api.scala._
[b]object [/b]FixDelayRestartStrategiesDemo {
[b]def [/b]main(args: Array[String]): Unit = {
[b]val [/b]env = StreamExecutionEnvironment.[i]getExecutionEnvironment[/i][i]//如果想要开启重启策略,就必须开启CheckPoint[/i]env.enableCheckpointing(5000L)
[i]//指定状态存储后端,默认就是内存[/i][i] //现在指定的是FsStateBackend,支持本地系统、[/i][i] //new FsStateBackend要指定存储系统的协议: scheme (hdfs://, file://, etc)[/i]env.setStateBackend([b]new [/b]FsStateBackend(args(0)))
[i]//如果程序被cancle,保留以前做的checkpoint[/i]env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.[i]RETAIN_ON_CANCELLATION[/i])
[i]//指定以后存储多个checkpoint目录[/i]env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
[i]//指定重启策略,默认的重启策略是不停的重启[/i][i] //程序出现异常是会重启,重启五次,每次延迟5秒,如果超过了5次,程序退出[/i]env.setRestartStrategy(RestartStrategies.[i]fixedDelayRestart[/i](5, 5000))
[b]val [/b]lines: DataStream[String] = env.socketTextStream(args(1), 8888)
[b]val [/b]result = lines.flatMap(_.split([b]" "[/b]).map(word => {
[b]if[/b](word.equals([b]"zhangsan"[/b])) {
[b]throw new [/b]RuntimeException([b]"zhangsan,程序重启!"[/b]);
}
(word, 1)
})).keyBy(0).sum(1)
result.print()
env.execute()
}
}[/align][/td][/tr]
[/table]
例子
需求 假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,然后对统计的结果值进行checkpoint处理
数据规划 1) 使用自定义算子每秒钟产生大约10000条数据。 2) 产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count) 3) 数据经统计后,统计结果打印到终端输出 4) 打印输出的结果为Long类型的数据
开发思路 1) source算子每隔1秒钟发送10000条数据,并注入到Window算子中。 2) window算子每隔1秒钟统计一次最近4秒钟内数据数量。 3) 每隔1秒钟将统计结果打印到终端 4) 每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。
开发步骤: 1.获取流处理执行环境 2.设置检查点机制 3.自定义数据源 4.数据分组 5.划分时间窗口 6.数据聚合 7.数据打印 8.触发执行 [Java] 纯文本查看 复制代码
[align=center][table]
[tr][td=602][align=left][i]//发送数据形式[/i][b]case class [/b]SEvent(id: Long, name: String, info: String, count: Int)[/align]
[align=left][b]class [/b]SEventSourceWithChk [b]extends [/b]RichSourceFunction[SEvent]{
[b]private var [/b][i]count [/i]= 0L
[b]private var [/b][i]isRunning [/i]= [b]true[/b][b] private val [/b][i]alphabet [/i]= [b]"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"[/b][i]// 任务取消时调用[/i][b]override def [/b]cancel(): Unit = {
[i]isRunning [/i]= [b]false[/b]}
[i]//// source算子的逻辑,即:每秒钟向流图中注入10000个元组[/i][b]override def [/b]run(sourceContext: SourceContext[SEvent]): Unit = {
[b]while[/b]([i]isRunning[/i]) {
[b]for [/b](i <- 0 until 10000) {
sourceContext.collect([i]SEvent[/i](1, [b]"hello-"[/b]+[i]count[/i], [i]alphabet[/i],1))
[i]count [/i]+= 1L
}
Thread.[i]sleep[/i](1000)
}
}
}[/align]
[align=left][i]/**[/i][i]该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。[/i][i] */[/i][b]object [/b]FlinkEventTimeAPIChkMain {
[b]def [/b]main(args: Array[String]): Unit ={
[b]val [/b]env = StreamExecutionEnvironment.[i]getExecutionEnvironment[/i]env.setStateBackend([b]new [/b]FsStateBackend([b]"hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"[/b]))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.[i]EXACTLY_ONCE[/i])
env.getCheckpointConfig.setCheckpointInterval(6000)
env.setStreamTimeCharacteristic(TimeCharacteristic.[i]EventTime[/i])[/align][align=left][i]//保留策略:默认情况下,检查点不会被保留,仅用于故障中恢复作业,可以启用外部持久化检查点,同时指定保留策略[/i][i]//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业取消时保留检查点,注意在这种情况下,您必须在取消后手动清理检查点状态[/i][i]//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业被cancel时,删除检查点,检查点状态仅在作业失败时可用[/i]env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.[i]DELETE_ON_CANCELLATION[/i])[/align][align=left]
[i]// 应用逻辑[/i][b]val [/b]source: DataStream[SEvent] = env.addSource([b]new [/b]SEventSourceWithChk)
source.assignTimestampsAndWatermarks([b]new [/b]AssignerWithPeriodicWatermarks[SEvent] {
[i]// 设置watermark[/i][b]override def [/b]getCurrentWatermark: Watermark = {
[b]new [/b]Watermark(System.[i]currentTimeMillis[/i]())
}
[i]// 给每个元组打上时间戳[/i][b]override def [/b]extractTimestamp(t: SEvent, l: Long): Long = {
System.[i]currentTimeMillis[/i]()
}
})
.keyBy(0)
.window(SlidingEventTimeWindows.[i]of[/i](Time.[i]seconds[/i](4), Time.[i]seconds[/i](1)))
.apply([b]new [/b]WindowStatisticWithChk)
.print()
env.execute()
}
}
[i]//该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。[/i][i]// 用户自定义状态[/i][b]class [/b]UDFState [b]extends [/b]Serializable{
[b]private var [/b][i]count [/i]= 0L
[i]// 设置用户自定义状态[/i][b]def [/b]setState(s: Long) = [i]count [/i]= s
[i]// 获取用户自定状态[/i][b]def [/b]getState = [i]count[/i]}
[i]//该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。[/i][b]class [/b]WindowStatisticWithChk [b]extends [/b]WindowFunction[SEvent, Long, Tuple, TimeWindow] [b]with [/b]ListCheckpointed[UDFState]{
[b]private var [/b][i]total [/i]= 0L
[i]// window算子的实现逻辑,即:统计window中元组的数量[/i][b]override def [/b]apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
[b]var [/b]count = 0L
[b]for [/b](event <- input) {
count += 1L
}
[i]total [/i]+= count
out.collect(count)
}
[i]// 从自定义快照中恢复状态[/i][b]override def [/b]restoreState(state: util.List[UDFState]): Unit = {
[b]val [/b]udfState = state.get(0)
[i]total [/i]= udfState.getState
}
[i]// 制作自定义状态快照[/i][b]override def [/b]snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
[b]val [/b]udfList: util.ArrayList[UDFState] = [b]new [/b]util.ArrayList[UDFState]
[b]val [/b]udfState = [b]new [/b]UDFState
udfState.setState([i]total[/i])
udfList.add(udfState)
udfList
}
}[/align]
|