【Flink】Flink Watermark 产生和传递原理
Flink Watermark 的产生原理和传递过程
1. 产生 Watermark
1.1 定义 WatermarkStrategy 并在创建 SourceOperator 时传入
WatermarkStrategy 配置传入 SourceOperatorFactory
- StreamExecutionEnvironment#fromSource
- DataStreamSource#DataStreamSource
- SourceTransformation#SourceTransformation
- SourceTransformationTranslator#translateInternal
- SourceOperatorFactory#SourceOperaotorFactory
- SourceOperatorFactory#createStreamOperator
- SourceOperator#SourceOperator
1.2 初始化 SourceOperator 时创建 TimestampsAndWatermarks 对象
- SourceOperator#open
- TimestampsAndWatermarks#createProgressiveEventTimeLogic
TimestampsAndWatermarks 接口定义了时间戳提取和 watermark 生成 - ProgressiveTimestampsAndWatermarks#ProgressiveTimestampsAndWatermarks
- ProgressiveTimestampsAndWatermarks#startPeriodicWatermarkEmits
- ProgressiveTimestampsAndWatermarks#triggerPeriodicEmit 如果
pipeline.auto-watermark-interval配置不为 0 开启周期性触发 watermark
1.3 SourceOperator 在开始推送数据时创建 MainOutput 和 SplitLocalOutput
- SourceOperator#emitNext
- SourceOperator#emitNextNotReading
- SourceOperator#initializeMainOutput
- TimestampsAndWatermarks#createMainOutput
- ProgressiveTimestampsAndWatermarks#createMainOutput 创建 MainOutput 和 SplitLocalOutput
1.4 触发 MainOutput 周期性输出 Watermark
- ProgressiveTimestampsAndWatermarks#triggerPeriodicEmit
- SourceOutputWithWatermarks#emitPeriodicWatermark
- WatermarkGenerator#onPeriodicEmit
- BoundedOutOfOrdernessWatermarks#onPeriodicEmit 回到 WatermarkStrategy 里配置的 WatermarkGenerator,调用定义输出 watermark
- WatermarkOutput#emitWatermark
1.5 触发 SplitLocalOutputs 周期性输出 Watermark
- ProgressiveTimestampsAndWatermarks#triggerPeriodicEmit
- SplitLocalOutputs#emitPeriodicWatermark
2.1 SourceOutputWithWatermarks#emitPeriodicWatermark 后面的逻辑同 MainOutput
2.2 WatermarkOutputMultiplexer#onPeriodicEmit
2.3 WatermarkOutputMultiplexer#updateCombinedWatermark
更新 combinedWatermark 并推送到 underlyingOutput
2.3.1 CombinedWatermarkStatus#updateCombinedWatermark
取所有 partialWatermarks 里的最小值作为 combinedWatermark
2.3.2 WatermarkToDataOutput#emitWatermark
2.3.3 PushingAsyncDataInput.DataOutput#emitWatermark
将 watermark 推送到下游
1.6 DataSource 触发 MainOutput watermark 更新
- SourceOutput#collect(T, long) 这里需要 DataSource 支持,在 【Flink】Data Source API 结构及实现 里有介绍
- SourceReaderBase.SourceOutputWrapper#collect(T, long)
- SourceOutputWithWatermarks#collect(T, long)
3.1 TimestampAssigner#extractTimestamp 根据定义从数据和传入的时间戳中提取 watermark
3.1 PushingAsyncDataInput.DataOutput#emitRecord 先将带 watermark 的数据推送到下游
3.2 WatermarkGenerator#onEvent
3.2.1 BoundedOutOfOrdernessWatermarks#onEvent 更新最大的 watermark (以 BoundedOutOfOrdernessWatermarks 为例)
1.7 TimestampsAndWatermarksOperator 产生 Watermark
如果 watermark 的配置是通过 DataStream#assignTimestampsAndWatermarks 方法在 DataSource 之后配置的,Flink 会在 Source 的 OperatorChain 里创建 TimestampsAndWatermarksOperator 来处理 watermark 的生成和传递。
- TimestampsAndWatermarksOperator 继承了 ProcessingTimeService.ProcessingTimeCallback 接口,如果是流式任务 Operator 会循环注册定时器,时间间隔为
pipeline.auto-watermark-interval,每次触发都会调用会调用WatermarkGenerator#onPeriodicEmit接口 - TimestampsAndWatermarksOperator 会完全忽略上游发送的
WatermarkStatus,但会将上游发送的 watermark 传递给下游 - 上游发送的 StreamRecord 在 TimestampsAndWatermarksOperator#processElement 方法里被替换成 TimestampAssigner 产生的新 timestamp,然后被传给
WatermarkGenerator#onEvent接口更新 watermark
2. 传递 Watermark
- BoundedOutOfOrdernessWatermarks#onPeriodicEmit
- WatermarkToDataOutput#emitWatermark
- PushingAsyncDataInput.DataOutput#emitWatermark
- Input#processWatermark
- AbstractStreamOperator#processWatermark
2.1 TimeService 处理 Watermark
- AbstractStreamOperator#processWatermark
- InternalTimeServiceManager#advanceWatermark
- InternalTimeServiceManagerImpl#advanceWatermark
- InternalTimerServiceImpl#advanceWatermark
- Triggerable#onEventTime
2.2 传递给 Output
- AbstractStreamOperator#processWatermark
- Output#emitWatermark
- RecordWriterOutput#emitWatermark 将 watermark 推送到下游算子
- RecordWriter#broadcastEmit 将 watermark 封装成 Record 推送到 targetPartition
- ChannelSelectorRecordWriter#broadcastEmit 将 Record 序列化为 ByteBuffer 并推到所有 subPartitions
- ResultPartitionWriter#emitRecord
- BufferWritingResultPartition#emitRecord 将数据写入 Buffer
- BufferWritingResultPartition#appendUnicastDataForRecordContinuation
- BufferWritingResultPartition#addToSubpartition
- ResultSubpartition#add
- PipelinedSubpartition#add 数据写入 ResultSubpartition 的 Buffer
2.3 Input 读出 Watermark (LocalInputChannel 为例)
- StatusWatermarkValve#inputWatermark
- AbstractStreamTaskNetworkInput#processElement
- AbstractStreamTaskNetworkInput#processBuffer
- CheckpointedInputGate#pollNext
- InputGate#pollNext
- SingleInputGate#pollNext
- SingleInputGate#getNextBufferOrEvent
- SingleInputGate#waitAndGetNextData
- SingleInputGate#readBufferFromInputChannel
- InputChannel#getNextBuffer
- LocalInputChannel#getNextBuffer 从 subpartitionView 读取 Buffer
- LocalInputChannel#requestSubpartition 从 partitionManager 获取 ResultSubpartitionView
2.4 Input 输出 Watermark 到下游
- StreamTask#processInput
- StreamInputProcessor#processInput
- StreamOneInputProcessor#processInput
- PushingAsyncDataInput#emitNext
- AbstractStreamTaskNetworkInput#emitNext
- AbstractStreamTaskNetworkInput#processElement
- StatusWatermarkValve#inputWatermark
更新该 inputChannel 的 watermark 和 watermark aligned 状态,并更新所有 channel 的 watermark 组成的 priorityQueue 顺序 - StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels
将当前所有 channel 里最小的 watermark 输出到下游 - DataOutput#emitWatermark
3. 总结
3.1 Flink 的时间类型
Processing Time
Processing Time 是指 Flink 系统的时间,是 Flink 默认的时间类型无需额外配置。Processing Time 对应的 Operator 无需 watermark 触发(比如 TumblingProcessingTimeWindows ),而是由系统的时间触发
Ingestion Time
Ingestion Time 指 record 进入 Flink 系统时的时间,可以由数据源的 SourceOutput 指定,也可以配置 IngestionTimeAssigner 由 SourceOutputWithWatermarks 调用将时间绑定到 record 并生成 watermark。
Event Time
Event Time 指 record 在外部系统产生时的时间,作为 record 的一部分一起传入 Flink,其产生逻辑和 watermark 产生方式和 Ingestion Time 相同,区别在于时间戳从 record 内提取或从外部系统中获取(如 Kafka)。
3.2 watermark 的形成过程
watermark 是什么:当 Flink 的时间类型为 EventTime 时,Flink 系统为了评估数据流 EventTime 的进度引入 watermark,watermark 根据配置随 EventTime 的更新而更新,并定时作为特殊的 record 从数据源下发到整个数据流,下游基于 EventTime 的算子接收到 watermark 后触发相应的计算 。
- 数据源的 SourceReader 调用 SourceOutput 输出 record 或同时输出 rocord 和时间戳
- 如果是流式任务 SourceOperator 会用 ProgressiveTimestampsAndWatermarks 作为 TimestampsAndWatermarks,并周期性触发 WatermarkGenerator 的
onPeriodicEmit方法; - SourceOutputWithWatermarks 实现了 SourceOutput 接口,接收 record 后使用 TimestampAssigner 从 record 和传入时间戳中提取新的时间戳,并将提取的时间戳通过 WatermarkGenerator 的
onEvent方法传递给 WatermarkGenerator; - WatermarkGenerator 决定 watermark 的累积逻辑以及何时推送到下游
- watermark 被转换成 Record 并推送到下游
4. 关于空闲数据源和 Watermark 对齐
4.1 空闲数据源
如果 Source 长时间没有数据输出,会导致 watermark 无法更新,下游依赖 watermark 更新的算子无法继续处理数据
- 为解决这一问题,WatermarkStrategy 提供了
withIdleness选项设置空闲数据源的 idle timeout duration,当超过这个时间没有数据输出时会将WatermarkStatus#IDLE_STATUS通过Output#emitWatermark传递到下游 - 如果数据源的一个或多个 partitions/splits/shards 没有数据输出,但剩余的 partitions/splits/shards 还有数据输出,CombinedWatermarkStatus 会忽略掉 idle 的
partialWatermark,如果所有都 idle 则会输出WatermarkStatus#IDLE_STATUS到下游 - 当下游的算子从一个 InputChannel 接收到
WatermarkStatus#IDEL_STATUS时会将该 channel 设置成Unaligned状态,并忽略该 channel 之前的 watermark - 当下游算子的所有 InputChannel 都接收到
WatermarkStatus#IDLE_STATUS时,算子也进入 idle 状态并向下游发送WatermarkStatus#IDLE_STATUS - 如果 Source 结束输出数据,应该往下游推
Long.MAX_VALUE作为 watermark 而不是WatermarkStatus#IDLE_STATUS - 所有发送过
WatermarkStatus#IDLE_STATUS的算子在重新发送WatermarkStatus#ACTIVE_STATUS后就能恢复到正常工作状态
4.2 Watermark 对齐
如果数据源的一个或多个 partitions/splits/shards 输出数据的速度比其他的快很多/慢很多,会导致 watermark 更新速度差距太大,下游依赖 watermark 更新的算子无法正确处理数据,比如 TumblingEventTimeWindows 缓存很多数据而不触发聚合
TODO