Restart strategy | Value of restart-strategy |
Fixed delay | fixed-delay |
Failure rate | failure-rate |
No restart | none |
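Configuration parameter | Description | Default value |
restart-strategy.fixed-delay.attempts | Number of times Flink retries the execution before the job is finally declared failed | 1, or Integer.MAX_VALUE if checkpointing is enabled |
restart-strategy.fixed-delay.delay | Delay before a restart: after a failed execution the job is not restarted immediately, but only after waiting for this period | akka.ask.timeout, or 1s if checkpointing is enabled |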
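To make the interaction of the two fixed-delay parameters concrete, here is a minimal sketch in plain Scala. It is not Flink's implementation: `FixedDelaySketch` and `runWithFixedDelay` are hypothetical names, `attempts` stands in for restart-strategy.fixed-delay.attempts and `delayMs` for restart-strategy.fixed-delay.delay, and the "job" is just a function that may throw.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of Flink: a simplified model of the fixed-delay rule.
object FixedDelaySketch {
  // attempts: how many restarts are allowed before the job is declared failed
  // delayMs:  how long to wait before each restart
  // job:      receives the number of restarts performed so far
  def runWithFixedDelay[A](attempts: Int, delayMs: Long)(job: Int => A): Try[A] = {
    @annotation.tailrec
    def loop(restartsSoFar: Int): Try[A] =
      Try(job(restartsSoFar)) match {
        case s @ Success(_) => s
        case Failure(_) if restartsSoFar < attempts =>
          Thread.sleep(delayMs) // wait, then restart
          loop(restartsSoFar + 1)
        case f @ Failure(_) => f // restart budget used up: the failure is final
      }
    loop(0)
  }
}
```

Under this model, `attempts = 5` allows one initial run plus up to five restarts; the sixth failure is returned to the caller as final.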
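Configuration parameter | Description | Default value |
restart-strategy.failure-rate.max-failures-per-interval | Maximum number of restarts within the given interval before the job is declared failed | 1 |
restart-strategy.failure-rate.failure-rate-interval | Time interval over which the failure rate is measured | 1 minute |
restart-strategy.failure-rate.delay | Delay between two consecutive restart attempts | akka.ask.timeout |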
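The failure-rate rule is easier to follow with a small model. The sketch below is plain Scala and only an approximation of the behaviour described in the table, not Flink's own class: `FailureRateSketch` and `onFailure` are hypothetical names, timestamps are passed in explicitly as milliseconds, and the restart delay (restart-strategy.failure-rate.delay) is left out.

```scala
import scala.collection.mutable

// Hypothetical model, not Flink's class: decides whether another restart is allowed.
final class FailureRateSketch(maxFailuresPerInterval: Int, intervalMs: Long) {
  // Timestamps of the failures that still fall inside the measuring interval
  private val recentFailures = mutable.Queue.empty[Long]

  /** Records a failure at time nowMs and returns true if the job may restart. */
  def onFailure(nowMs: Long): Boolean = {
    // Forget failures that are older than the measuring interval
    while (recentFailures.nonEmpty && nowMs - recentFailures.head > intervalMs)
      recentFailures.dequeue()
    recentFailures.enqueue(nowMs)
    // The job fails for good once the interval holds more failures than allowed
    recentFailures.size <= maxFailuresPerInterval
  }
}
```

With the defaults from the table (1 failure per 1 minute), a single failure is restarted, while a second failure within the same minute makes the job fail; a second failure after more than a minute is restarted again.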
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

object FixDelayRestartStrategiesDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Enable checkpointing (every 5000 ms) so that state can be restored when the job restarts
    env.enableCheckpointing(5000L)

    // Specify the state backend; the default keeps state in memory.
    // FsStateBackend is used here. Its path must include the file system scheme (hdfs://, file://, etc.)
    env.setStateBackend(new FsStateBackend(args(0)))

    // If the job is cancelled, keep the checkpoints that were already taken
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Allow at most two checkpoints to be in progress at the same time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)

    // Set the restart strategy. With checkpointing enabled, the default strategy restarts the job indefinitely.
    // Here the job restarts when an exception occurs: at most 5 restarts, 5 seconds apart.
    // Once the 5 restarts are used up, the job exits.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000))

    val lines: DataStream[String] = env.socketTextStream(args(1), 8888)
    val result = lines.flatMap(_.split(" ").map(word => {
      // The word "zhangsan" triggers a failure so the restart can be observed
      if (word.equals("zhangsan")) {
        throw new RuntimeException("zhangsan, restarting the job!")
      }
      (word, 1)
    })).keyBy(0).sum(1)

    result.print()
    env.execute()
  }
}
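import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Format of the records that are sent
case class SEvent(id: Long, name: String, info: String, count: Int)

class SEventSourceWithChk extends RichSourceFunction[SEvent] {
  private var count = 0L
  // cancel() is called from another thread, so the flag is volatile
  @volatile private var isRunning = true
  private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"

  // Called when the job is cancelled
  override def cancel(): Unit = {
    isRunning = false
  }

  // Source logic: inject 10000 tuples into the stream graph every second
  override def run(sourceContext: SourceContext[SEvent]): Unit = {
    while (isRunning) {
      for (i <- 0 until 10000) {
        sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
        count += 1L
      }
      Thread.sleep(1000)
    }
  }
}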
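import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/** Defines the stream graph and implements the business logic. The windows are triggered on event time. */
object FlinkEventTimeAPIChkMain {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointInterval(6000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Retention policy: by default checkpoints are not retained and are only used to recover a job from failure.
    // Externalized (persisted) checkpoints can be enabled together with a retention policy:
    // RETAIN_ON_CANCELLATION keeps the checkpoint when the job is cancelled; you must then clean up the checkpoint state manually.
    // DELETE_ON_CANCELLATION deletes the checkpoint when the job is cancelled; the checkpoint state is only available if the job fails.
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

    // Application logic
    val source: DataStream[SEvent] = env.addSource(new SEventSourceWithChk)
    source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
      // Emit the current watermark
      override def getCurrentWatermark: Watermark = {
        new Watermark(System.currentTimeMillis())
      }
      // Assign a timestamp to each tuple
      override def extractTimestamp(t: SEvent, l: Long): Long = {
        System.currentTimeMillis()
      }
    })
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
      .apply(new WindowStatisticWithChk)
      .print()
    env.execute()
  }
}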
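// User-defined state: when the operator takes a snapshot, it stores the number of records the operator has counted so far.
class UDFState extends Serializable {
  private var count = 0L
  // Set the user-defined state
  def setState(s: Long): Unit = count = s
  // Get the user-defined state
  def getState: Long = count
}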
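import java.util
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Window operator: each time the window fires, it counts the tuples in the window.
class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState] {
  private var total = 0L

  // Window logic: count the tuples in the window
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
    var count = 0L
    for (event <- input) {
      count += 1L
    }
    total += count
    out.collect(count)
  }

  // Restore the state from the user-defined snapshot (guarding against an empty list)
  override def restoreState(state: util.List[UDFState]): Unit = {
    if (!state.isEmpty) {
      val udfState = state.get(0)
      total = udfState.getState
    }
  }

  // Take a snapshot of the user-defined state
  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
    val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]
    val udfState = new UDFState
    udfState.setState(total)
    udfList.add(udfState)
    udfList
  }
}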
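The snapshot and restore methods can be exercised outside Flink to see what survives a restart. The following is a minimal sketch in plain Scala under stated assumptions: `CountState` is a stand-in copy of UDFState, `TotalHolder` is a hypothetical class that mirrors only the two checkpoint methods of the window operator, and the list returned by `snapshotState` is simply handed to a fresh instance, which is roughly the role the framework plays on recovery.

```scala
import java.util

// Stand-in copy of the user-defined state so the sketch compiles on its own
class CountState extends Serializable {
  private var count = 0L
  def setState(s: Long): Unit = count = s
  def getState: Long = count
}

// Hypothetical holder that mirrors only the snapshot/restore logic of the operator
class TotalHolder {
  var total = 0L

  // Wrap the running total in a state object and return it as a one-element list
  def snapshotState(): util.List[CountState] = {
    val list = new util.ArrayList[CountState]
    val state = new CountState
    state.setState(total)
    list.add(state)
    list
  }

  // Read the total back from the first element, if there is one
  def restoreState(state: util.List[CountState]): Unit =
    if (!state.isEmpty) total = state.get(0).getState
}

object SnapshotRoundTrip {
  def main(args: Array[String]): Unit = {
    val before = new TotalHolder
    before.total = 42L
    val snapshot = before.snapshotState()

    val after = new TotalHolder // a fresh instance, as after a restart
    after.restoreState(snapshot)
    println(after.total)
  }
}
```

Only what is written into the snapshot list comes back: any field of the operator that is not copied into the state object starts from its initial value after a restart.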