本帖最后由 YXW95 于 2019-12-13 11:23 编辑
Flink的重启策略 Flink支持不同的重启策略,这些重启策略控制着job失败后如何重启。集群可以通过默认的重启策略来重启,这个默认的重启策略通常在未指定重启策略的情况下使用,而如果Job提交的时候指定了重启策略,这个重启策略就会覆盖掉集群的默认重启策略。
概览 默认的重启策略是通过Flink的flink-conf.yaml来指定的,这个配置参数restart-strategy定义了哪种策略会被采用。如果checkpoint未启动,就会采用no restart策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用fixed-delay策略,重试Integer.MAX_VALUE次。请参考下面的可用重启策略来了解哪些值是支持的。 每个重启策略都有自己的参数来控制它的行为,这些值也可以在配置文件中设置,每个重启策略的描述都包含着各自的配置值信息。
除了定义一个默认的重启策略之外,你还可以为每一个Job指定它自己的重启策略,这个重启策略可以在ExecutionEnvironment中调用setRestartStrategy()方法来程序化地调用,主意这种方式同样适用于StreamExecutionEnvironment。
下面的例子展示了我们如何为我们的Job设置一个固定延迟重启策略,一旦有失败,系统就会尝试每10秒重启一次,重启3次。 val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔))
下面部分描述了重启策略特定的配置项
固定延迟重启策略(Fixed Delay Restart Strategy) 固定延迟重启策略会尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。 重启策略可以配置flink-conf.yaml的下面配置参数来启用,作为默认的重启策略:
|
| | | restart-strategy.fixed-delay.attempts |
| 在Job最终宣告失败之前,Flink尝试执行的次数 | 1,如果启用checkpoint的话是Integer.MAX_VALUE | restart-strategy.fixed-delay.delay |
| 延迟重启意味着一个执行失败之后,并不会立即重启,而是要等待一段时间。 | akka.ask.timeout,如果启用checkpoint的话是1s |
例子: restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s
固定延迟重启也可以在程序中设置: val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数Time.of(10, TimeUnit.SECONDS) // 重启时间间隔))
失败率重启策略 失败率重启策略在Job失败后会重启,但是超过失败率后,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。 失败率重启策略可以在flink-conf.yaml中设置下面的配置参数来启用:
| | | restart-strategy.failure-rate.max-failures-per-interval | | | restart-strategy.failure-rate.failure-rate-interval | | | restart-strategy.failure-rate.delay | | |
例子: restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 5 min restart-strategy.failure-rate.delay: 10 s
失败率重启策略也可以在程序中设置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量时间间隔最大失败次数Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔))
无重启策略 Job直接失败,不会尝试进行重启 restart-strategy: none
无重启策略也可以在程序中设置 val env = ExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.noRestart()) 例子 输入五次zhangsan,程序挂掉 [mw_shl_code=java,true]import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
object FixDelayRestartStrategiesDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment//如果想要开启重启策略,就必须开启CheckPointenv.enableCheckpointing(5000L)
//指定状态存储后端,默认就是内存 //现在指定的是FsStateBackend,支持本地系统、 //new FsStateBackend要指定存储系统的协议: scheme (hdfs://, file://, etc)env.setStateBackend(new FsStateBackend(args(0)))
//如果程序被cancle,保留以前做的checkpointenv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//指定以后存储多个checkpoint目录env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
//指定重启策略,默认的重启策略是不停的重启 //程序出现异常是会重启,重启五次,每次延迟5秒,如果超过了5次,程序退出env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000))
val lines: DataStream[String] = env.socketTextStream(args(1), 8888)
val result = lines.flatMap(_.split(" ").map(word => {
if(word.equals("zhangsan")) {
throw new RuntimeException("zhangsan,程序重启!");
}
(word, 1)
})).keyBy(0).sum(1)
result.print()
env.execute()
}
} |
[/mw_shl_code]
例子
需求 假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,然后对统计的结果值进行checkpoint处理
数据规划 1) 使用自定义算子每秒钟产生大约10000条数据。 2) 产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count) 3) 数据经统计后,统计结果打印到终端输出 4) 打印输出的结果为Long类型的数据
开发思路 1) source算子每隔1秒钟发送10000条数据,并注入到Window算子中。 2) window算子每隔1秒钟统计一次最近4秒钟内数据数量。 3) 每隔1秒钟将统计结果打印到终端 4) 每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。
开发步骤: 1.获取流处理执行环境 2.设置检查点机制 3.自定义数据源 4.数据分组 5.划分时间窗口 6.数据聚合 7.数据打印 8.触发执行 [mw_shl_code=java,true]
//发送数据形式case class SEvent(id: Long, name: String, info: String, count: Int)
class SEventSourceWithChk extends RichSourceFunction[SEvent]{
private var count = 0L
private var isRunning = true private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"// 任务取消时调用override def cancel(): Unit = {
isRunning = false}
//// source算子的逻辑,即:每秒钟向流图中注入10000个元组override def run(sourceContext: SourceContext[SEvent]): Unit = {
while(isRunning) {
for (i <- 0 until 10000) {
sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
count += 1L
}
Thread.sleep(1000)
}
}
}
/**该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。 */object FlinkEventTimeAPIChkMain {
def main(args: Array[String]): Unit ={
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointInterval(6000)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //保留策略:默认情况下,检查点不会被保留,仅用于故障中恢复作业,可以启用外部持久化检查点,同时指定保留策略//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业取消时保留检查点,注意在这种情况下,您必须在取消后手动清理检查点状态//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业被cancel时,删除检查点,检查点状态仅在作业失败时可用env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
// 应用逻辑val source: DataStream[SEvent] = env.addSource(new SEventSourceWithChk)
source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
// 设置watermarkoverride def getCurrentWatermark: Watermark = {
new Watermark(System.currentTimeMillis())
}
// 给每个元组打上时间戳override def extractTimestamp(t: SEvent, l: Long): Long = {
System.currentTimeMillis()
}
})
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
.apply(new WindowStatisticWithChk)
.print()
env.execute()
}
}
//该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。// 用户自定义状态class UDFState extends Serializable{
private var count = 0L
// 设置用户自定义状态def setState(s: Long) = count = s
// 获取用户自定状态def getState = count}
//该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{
private var total = 0L
// window算子的实现逻辑,即:统计window中元组的数量override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
var count = 0L
for (event <- input) {
count += 1L
}
total += count
out.collect(count)
}
// 从自定义快照中恢复状态override def restoreState(state: util.List[UDFState]): Unit = {
val udfState = state.get(0)
total = udfState.getState
}
// 制作自定义状态快照override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]
val udfState = new UDFState
udfState.setState(total)
udfList.add(udfState)
udfList
}
} [/mw_shl_code]
|
|