【Flink】Fault Tolerance 原理和实现
Flink 是如何在程序异常后恢复运行的,本文简述一下其中的原理。
原理
Flink 容错机制的实现原理
- Flink 程序的状态(state)保存在 state backend 中;
- Flink 定时执行检查点(checkpoint)时,会对所有算子的状态做快照(snapshot),并把这些快照持久化保存到 checkpoint storage(比如分布式文件系统);
- 当任务出现故障,Flink 会重启任务并从最新的快照处重新执行任务,从而实现容错。
状态快照的执行过程
- Taskmananger 从 Jobmanager 接收到执行 checkpoint 的指令后,会让所有 source 算子记录当前的 offsets,并在数据流中插入一个编号的 checkpoint barrier;
- checkpoint barrier 会沿着数据流传递到所有的算子,各算子在接收到之后执行 state snapshot,执行完之后继续将 barrier 向下游传递;
- 有两个输入流的算子(如CoprocessFunction)会执行屏障对齐 (barrier aligment),使快照包含两个输入流在 checkpoint barrier 之前的所有 events 产生的 state;
状态快照的恢复过程
- Flink 重新启动部署整个分布式任务,从最新的 checkpoint 恢复每个算子的状态;
- 数据源从最新的 checkpoint 恢复 offset,继续消费数据流;
- 如果状态快照是增量式的,算子会先恢复到最新的全量快照,再逐个按增量快照更新。
非对齐的 checkpoint
算子在执行 checkpoint 时也可以不必 barrier aligment,其原理是:
- 算子在某个 input buffer 接收到 checkpoint 的第一个 barrier 时,就将其添加到 output buffer 的最后面(立即输出到下游);
- 算子将这次 checkpoint 对应的所有未输出到下游的数据(包含上游算子的 output buffer 在 barrier 之前的数据,这个算子 input buffer 在 barrier 之后的数据,这个算子 output buffer 里的所有数据)做 snapshot 保存到算子的 state
- 恢复时先将状态里所有 inflght 数据恢复到算子,再开始处理上游传来的数据
不同程度的容错机制
任务发生故障时,根据不同的容错机制可能会出现以下结果:
- 最多一次 at-most-once :Flink 不对故障做恢复,数据可能会丢失;
- 至少一次 at-least-once :Flink 从故障恢复,但可能产生重复结果;
at-least-once 需要数据源 source 支持回朔到快照发生的进度,比如 Kafka 回到某个 offset,同时整个数据链路保证在成功执行一次 checkpoint 快照时,所有数据源已发出的数据都被成功 sink 到下游。 - 精确一次 exactly-once :Flink 从故障恢复,没有结果丢失和重复。
exactly-once 在 at-least-once 的基础上还需要数据输出 Sink 支持事务写入或者幂等写入。
实现
JobMaster 发起 checkpoint
JobMaster 创建 ExecutionGraph
- JobMaster#JobMaster
- JobMaster#createScheduler
- DefaultSlotPoolServiceSchedulerFactory#createScheduler
- DefaultSchedulerFactory#createInstance
- DefaultScheduler#DefaultScheduler
- SchedulerBase#SchedulerBase
- SchedulerBase#createAndRestoreExecutionGraph
- DefaultExecutionGraphFactory#createAndRestoreExecutionGraph
- DefaultExecutionGraphBuilder#buildGraph
ExecutionGraph 通过 CheckpointCoordinator 开启 checkpoint
- DefaultExecutionGraph#enableCheckpointing
- CheckpointCoordinator#createActivatorDeactivator
在 CheckpointCoordinator 外包装一层 JobStatusListener,Job 状态变成 RUNNING 时开始 checkpoint schedule,从 RUNNING 变成其他状态时停止 checkpoint。 - CheckpointCoordinatorDeActivator#CheckpointCoordinatorDeActivator
- CheckpointCoordinator#startCheckpointScheduler
- CheckpointCoordinator#scheduleTriggerWithDelay
用一个 ScheduledExecutor 定期触发 ScheduledTrigger 执行 - CheckpointCoordinator.ScheduledTrigger#run
- CheckpointCoordinator#triggerCheckpoint
CheckpointCoordinator 触发 CheckpointTriggerRequest 执行
- CheckpointCoordinator.CheckpointTriggerRequest 创建一个 CheckpointTriggerRequest
- CheckpointRequestDecider#chooseRequestToExecute 对 CheckpointTriggerRequest 做一个限流
- CheckpointCoordinator#startTriggeringCheckpoint 触发 CheckpointTriggerRequest 执行
- Source Operator 的 OperatorCoordinator 执行 checkpoint
1. OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
2. OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpoints 触发所有 OperatorCoordinator 执行 checkpoint,并校验执行结果
3. OperatorCoordinatorCheckpoints#triggerAllCoordinatorCheckpoints
4. OperatorCoordinatorCheckpoints#triggerCoordinatorCheckpoint
5. OperatorCoordinatorHolder#checkpointCoordinator
6. OperatorCoordinatorHolder#checkpointCoordinatorInternal- OperatorCoordinatorHolder#closeGateways 关闭所有和 SubTask 通讯的 SubtaskGateway
- OperatorCoordinatorHolder#completeCheckpointOnceEventsAreDone 如果有已发送给 SubTask 但还未执行完成的 Events 等待其执行完成
- JobMaster snapshot state
- 触发 StreamTask 执行 checkpoint
1. CheckpointCoordinator#triggerCheckpointRequest
2. CheckpointCoordinator#triggerTasks
这里只对 CheckpointPlan 里的 tasksToTrigger 发送 checkpoint 请求,即 Source Tasks
3. Execution#triggerCheckpoint
4. Execution#triggerCheckpointHelper
5. TaskManagerGateway#triggerCheckpoint
6. RpcTaskManagerGateway#triggerCheckpoint RPC 请求触发 TaskManager 执行 checkpoint
6. TaskExecutorGateway#triggerCheckpoint
7. TaskExecutor#triggerCheckpoint
SourceStreamTask 触发 checkpoint
- Task#triggerCheckpointBarrier
- SourceOperatorStreamTask#triggerCheckpointAsync
这个接口继承自CheckpointableTask#triggerCheckpointAsync用于开启整个任务的 checkpoint,向数据流中注入 checkpoint barrier,后面普通 StreamTask 接收到 checkpoint barrier 后触发调用的是CheckpointableTask#triggerCheckpointOnBarrier接口 - SourceOperatorStreamTask#triggerCheckpointNowAsync
如果 Source 的 Source Reader 不是 ExternallyInducedSourceReader 则直接执行异步 checkpoint - StreamTask#triggerUnfinishedChannelsCheckpoint
创建 CheckpointBarrier 并下发到每个 InputGate 的每个 InputChannel - CheckpointBarrierHandler#processBarrier
CheckpointBarrierHandler 接口有两个实现:- CheckpointBarrierTracker 对应 checkpoint mode 为
at-least-once,不会阻塞 InputChannel 的输入,直到所有 InputChannel 都收到 barrier 调用 CheckpointBarrierHandler#notifyCheckpoint 触发 checkpoint 执行 - SingleCheckpointBarrierHandler 对应 checkpoint mode 为
exactly-once,接收和记录 barriers 并交由BarrierHandlerState决定何时触发 checkpoint 以及对 Inputchannel 的操作
5.1 SingleCheckpointBarrierHandler#processBarrier
SingleCheckpointBarrierHandler 支持aligned和unaligned两种 checkpoint 模式 5.2 SingleCheckpointBarrierHandler#markCheckpointAlignedAndTransformState
将 barrier 交由 BarrierHandlerState 处理并记录 InputChannel 的 barrier 对齐情况
5.3 BarrierHandlerState#barrierReceived
BarrierHandlerState 接收 barrier 并根据 barrier 转换自身的类型,由不同类型代表算子处理 checkpoint 时的多个状态:- WaitingForFirstBarrier
aligned模式下等待第一个 barrier - CollectingBarriers
aligned模式下等待所有 barrier - AlternatingWaitingForFirstBarrier
aligned模式下等待第一个 barrier,有超时限制 - AlternatingCollectingBarriers
aligned模式下等待所有 barrier,有超时限制 - AlternatingWaitingForFirstBarrierUnaligned
unaligned模式下等待第一个 barrier,有超时限制 - AlternatingCollectingBarriersUnaligned
unaligned模式下等待所有 barrier,有超时限制 5.4 AbstractAlignedBarrierHandlerState#barrierReceived 以aligned模式为例
SourceTask 并不暂停 InputChannel 的输入,所有 barrier 都收到后触发全局 checkpoint
5.5 AbstractAlignedBarrierHandlerState#triggerGlobalCheckpoint
执行全局 checkpoint ,完成后恢复所有 InputChannel 的输入,并进入 WaitingForFirstBarrier 的状态
5.6 SingleCheckpointBarrierHandler#triggerCheckpoint
- WaitingForFirstBarrier
- CheckpointBarrierTracker 对应 checkpoint mode 为
- CheckpointBarrierHandler#notifyCheckpoint
回调 StreamTask 执行 checkpoint - CheckpointableTask#triggerCheckpointOnBarrier
checkpoint barrier 在上下游 StreamTask 之间的传递
- OperatorChain#broadcastEvent
遍历所有 RecordWriters 广播 CheckpointBarrier - RecordWriterOutput#broadcastEvent
- RecordWriter#broadcastEvent
- ResultPartitionWriter#broadcastEvent
将 CheckpointBarrier 写到 Buffer
… 省略从上游算子的 Output Buffer 到下游算子的 Input Buffer - AbstractStreamTaskNetworkInput#emitNext
- CheckpointedInputGate#pollNext
- CheckpointedInputGate#handleEvent
- CheckpointBarrierHandler#processBarrier
… 这里同前面 SourceStreamTask 第 5 步一样 - CheckpointableTask#triggerCheckpointOnBarrier
StreamTask 执行 checkpoint
- StreamTask#triggerCheckpointOnBarrier
- StreamTask#performCheckpoint
- SubtaskCheckpointCoordinatorImpl#checkpointState
对所有 TaskOperators 执行 checkpoint
3.1 OperatorChain#prepareSnapshotPreBarrier 遍历所有 StreamOperator 依次执行
StreamOperator#prepareSnapshotPreBarrier
3.2 OperatorChain#broadcastEvent 向所有 SubTask 的下游广播 CheckpointBarrier
3.3 ChannelStateWriter#finishOutput 如果是 unaligned 的 checkpoint 停止持久化 channel state
3.4 SubtaskCheckpointCoordinatorImpl#takeSnapshotSync 执行 OperatorChain#snapshotState,这一步会传入 CheckpointStreamFactory 用于输出 State 持久化后的数据流,根据配置数据流会被写入不同的 State Backend
3.5 SubtaskCheckpointCoordinatorImpl#finishAndReportAsync
SubTask 执行 checkpoint 结束后通知 JobMaster - OperatorChain#snapshotState
从 3.4 往下执行,遍历 OperatorChain 里的所有 StreamOperator,执行两个持久化:
a. StreamOperator#snapshotState StreamOperator 的状态持久化; b. 如果开启了 unaligned checkpoint 或者 checkpoint 允许超时,则需要对 OperatorChain 的 channel state 持久化:被遍历的 StreamOperator 如果是 OperatorChain 的 main operator 或者 tail operator,对其InputChannel State或ResultSubPartition State做持久化
StreamOperator 执行 prepareSnapshot 和 snapshotState
JobMaster 确认 checkpoint 执行结果
- AsyncCheckpointRunnable#run
- AsyncCheckpointRunnable#reportCompletedSnapshotStates
- TaskStateManagerImpl#reportTaskStateSnapshots
- RpcCheckpointResponder#acknowledgeCheckpoint RPC 请求 JobMaster checkpoint 执行结束
- JobMaster#acknowledgeCheckpoint
- SchedulerBase#acknowledgeCheckpoint
- ExecutionGraphHandler#acknowledgeCheckpoint
- CheckpointCoordinator#receiveAcknowledgeMessage
- PendingCheckpoint#acknowledgeTask
PendingCheckpoint 记录完成 checkpoint 的 Task - CheckpointCoordinator#completePendingCheckpoint
- CheckpointCoordinator#finalizeCheckpoint