Stateful Stream Processing
- What is State?
- Keyed State
- State Persistence
- State and Fault Tolerance in Batch Programs
What is State?
While many operations in a dataflow simply look at one individual event at a time (for example an event parser), some operations remember information across multiple events (for example window operators). These operations are called stateful.
Some examples of stateful operations:
- When an application searches for certain event patterns, the state stores the sequence of events encountered so far.
- When aggregating events per minute/hour/day, the state holds the pending aggregates.
- When training a machine learning model over a stream of data points, the state holds the current version of the model parameters.
- When historic data needs to be managed, the state allows efficient access to events that occurred in the past.
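The distinction between stateless and stateful operations can be sketched in plain Python (a toy illustration, not the Flink API — the names `parse` and `RunningSum` are made up for this example):

```python
# Toy illustration: a stateless operation looks at each event in
# isolation, while a stateful one remembers information across events.

def parse(raw):
    # Stateless: the result depends only on the current event.
    user, amount = raw.split(",")
    return user, int(amount)

class RunningSum:
    """Stateful: keeps a running total per key across events."""
    def __init__(self):
        self.totals = {}  # this dict is the operator's state

    def process(self, event):
        user, amount = parse(event)
        self.totals[user] = self.totals.get(user, 0) + amount
        return self.totals[user]

op = RunningSum()
results = [op.process(e) for e in ["a,1", "b,2", "a,3"]]
# results == [1, 2, 4]: the third event for "a" sees the earlier total
```

If the `RunningSum` instance is lost, so are the totals — which is why Flink must know about such state to make it fault tolerant.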
Flink needs to be aware of the state in order to make it fault tolerant using checkpoints and savepoints.
Knowledge about the state also allows for rescaling Flink applications, meaning that Flink takes care of redistributing state across parallel instances.
Queryable state allows you to access state from outside of Flink during runtime.
When working with state, it might also be useful to read about Flink's state backends. Flink provides different state backends that specify how and where state is stored.
Keyed State
Keyed state is maintained in what can be thought of as an embedded key/value store. The state is partitioned and distributed strictly together with the streams that are read by the stateful operators. Hence, access to the key/value state is only possible on keyed streams, i.e. after a keyed/partitioned data exchange, and is restricted to the values associated with the current event's key. Aligning the keys of streams and state makes sure that all state updates are local operations, guaranteeing consistency without transaction overhead. This alignment also allows Flink to redistribute the state and adjust the stream partitioning transparently.
Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution, each parallel instance of a keyed operator works with the keys for one or more Key Groups.
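The mapping from keys to Key Groups, and from Key Groups to parallel operator instances, can be modeled roughly as follows. This is a simplified sketch, not Flink's actual implementation: Flink internally applies a murmur hash where plain `hash()` stands in here, and the exact range arithmetic differs in detail.

```python
# Simplified model of Key Group assignment (hypothetical, not Flink code).

def key_group_for(key, max_parallelism):
    # Every key deterministically lands in one of max_parallelism groups.
    # Flink uses a murmur hash here; plain hash() is a stand-in.
    return hash(key) % max_parallelism

def operator_for_key_group(key_group, parallelism, max_parallelism):
    # Each operator instance owns a contiguous range of Key Groups, so
    # rescaling only moves whole groups between instances.
    return key_group * parallelism // max_parallelism

max_parallelism = 128
for parallelism in (2, 4):
    # A key's group never changes when the job is rescaled from 2 to 4
    # instances; only the group-to-instance mapping changes, so state
    # can be relocated one whole Key Group at a time.
    group = key_group_for("user-17", max_parallelism)
    instance = operator_for_key_group(group, parallelism, max_parallelism)
    assert 0 <= instance < parallelism
```

This is why the maximum parallelism is fixed per job: it determines the number of Key Groups, the finest granularity at which state can ever be redistributed.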
State Persistence
Flink implements fault tolerance using a combination of stream replay and checkpointing. A checkpoint marks a specific point in each of the input streams along with the corresponding state for each of the operators. A streaming dataflow can be resumed from a checkpoint while maintaining consistency (exactly-once processing semantics) by restoring the state of the operators and replaying the records from the point of the checkpoint.
The checkpoint interval is a means of trading off the overhead of fault tolerance during execution with the recovery time (the number of records that need to be replayed).
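The trade-off can be quantified with a back-of-the-envelope calculation (the rate and intervals below are made-up numbers for illustration):

```python
# Hypothetical numbers: at a fixed ingest rate, the worst-case number of
# records that must be replayed on recovery grows linearly with the
# checkpoint interval, while checkpointing overhead shrinks.
rate = 10_000                            # records per second (assumed)
replay_10s = rate * 10                   # worst case with a 10 s interval
replay_60s = rate * 60                   # worst case with a 60 s interval
checkpoints_per_hour_10s = 3600 // 10    # snapshots taken per hour
checkpoints_per_hour_60s = 3600 // 60
```

A 60 s interval means up to six times more records to replay after a failure, but six times fewer snapshots during normal execution.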
The fault tolerance mechanism continuously draws snapshots of the distributed streaming dataflow. For streaming applications with small state, these snapshots are very light-weight and can be drawn frequently without much impact on performance. The state of the streaming applications is stored at a configurable place, usually in a distributed file system.
In case of a program failure (due to machine, network, or software failure), Flink stops the distributed streaming dataflow. The system then restarts the operators and resets them to the latest successful checkpoint. The input streams are reset to the point of the state snapshot. Any records that are processed as part of the restarted parallel dataflow are guaranteed to not have affected the previously checkpointed state.
Note: By default, checkpointing is disabled. See Checkpointing for details on how to enable and configure checkpointing.
Note: For this mechanism to realize its full guarantees, the data stream source (such as a message queue or broker) needs to be able to rewind the stream to a defined recent point. Apache Kafka has this ability and Flink's connector to Kafka exploits it. See Fault Tolerance Guarantees of Data Sources and Sinks for more information about the guarantees provided by Flink's connectors.
Note: Because Flink's checkpoints are realized through distributed snapshots, we use the words snapshot and checkpoint interchangeably. Often we also use the term snapshot to mean either a checkpoint or a savepoint.
Checkpointing
The central part of Flink's fault tolerance mechanism is drawing consistent snapshots of the distributed data stream and operator state. These snapshots act as consistent checkpoints to which the system can fall back in case of a failure. Flink's mechanism for drawing these snapshots is described in "Lightweight Asynchronous Snapshots for Distributed Dataflows". It is inspired by the standard Chandy-Lamport algorithm for distributed snapshots and is specifically tailored to Flink's execution model.
Keep in mind that everything to do with checkpointing can be done asynchronously. The checkpoint barriers don't travel in lock step, and operations can asynchronously snapshot their state.
Barriers
A core element in Flink's distributed snapshotting are the stream barriers. These barriers are injected into the data stream and flow with the records as part of the data stream. Barriers never overtake records; they flow strictly in line. A barrier separates the records in the data stream into the set of records that goes into the current snapshot, and the records that go into the next snapshot. Each barrier carries the ID of the snapshot whose records it pushed in front of it. Barriers do not interrupt the flow of the stream and are hence very lightweight. Multiple barriers from different snapshots can be in the stream at the same time, which means that various snapshots may happen concurrently.
Stream barriers are injected into the parallel data flow at the stream sources. The point where the barriers for snapshot n are injected (let's call it Sn) is the position in the source stream up to which the snapshot covers the data. For example, in Apache Kafka, this position would be the last record's offset in the partition. This position Sn is reported to the checkpoint coordinator (Flink's JobManager).
The barriers then flow downstream. When an intermediate operator has received a barrier for snapshot n from all of its input streams, it emits a barrier for snapshot n into all of its outgoing streams. Once a sink operator (the end of a streaming DAG) has received barrier n from all of its input streams, it acknowledges snapshot n to the checkpoint coordinator. After all sinks have acknowledged a snapshot, it is considered completed.
Once snapshot n has been completed, the job will never again ask the source for records from before Sn, since at that point these records (and their descendant records) will have passed through the entire data flow topology.
Operators that receive more than one input stream need to align the input streams on the snapshot barriers. The figure above illustrates this:
- As soon as the operator receives snapshot barrier n from an incoming stream, it cannot process any further records from that stream until it has received barrier n from the other inputs as well. Otherwise, it would mix records that belong to snapshot n with records that belong to snapshot n+1.
- Streams that report barrier n are temporarily set aside. Records that are received from these streams are not processed, but put into an input buffer.
- Once the last stream has received barrier n, the operator emits all pending outgoing records, and then emits snapshot n barriers itself.
- After that, it resumes processing records from all input streams, processing records from the input buffers before processing the records from the streams.
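The steps above can be simulated with a toy Python model (not Flink code — the function, input encoding, and barrier marker `"B"` are inventions of this sketch):

```python
def simulate_alignment(interleaved):
    """interleaved: list of (input_id, record) pairs in arrival order;
    the record "B" is the barrier for checkpoint n. Returns two lists:
    records that belong to snapshot n, and records that belong to n+1."""
    inputs = {src for src, _ in interleaved}
    barriered = set()          # inputs whose barrier has arrived
    buffered = []              # held-back records from barriered inputs
    part_of_n, part_of_n_plus_1 = [], []
    aligned = False
    for src, rec in interleaved:
        if aligned:
            part_of_n_plus_1.append(rec)       # snapshot n already taken
        elif rec == "B":
            barriered.add(src)
            if barriered == inputs:            # all barriers received:
                aligned = True                 # snapshot n is taken here
                part_of_n_plus_1.extend(buffered)  # flush held-back records
        elif src in barriered:
            buffered.append(rec)               # input already barriered: hold
        else:
            part_of_n.append(rec)              # still belongs to snapshot n
    return part_of_n, part_of_n_plus_1

# Input 'a' delivers its barrier first; a2 arrives afterwards and must
# wait in the buffer until 'b' catches up, while b2 is still processed.
events = [("a", "a1"), ("b", "b1"), ("a", "B"), ("a", "a2"),
          ("b", "b2"), ("b", "B"), ("a", "a3")]
part_n, part_n1 = simulate_alignment(events)
# part_n == ["a1", "b1", "b2"], part_n1 == ["a2", "a3"]
```

Note how b2 is processed into snapshot n even though it arrives after a's barrier: only the barriered input is blocked, not the operator as a whole.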
Snapshotting Operator State
When operators contain any form of state, this state must be part of the snapshots as well.
Operators snapshot their state at the point in time when they have received all snapshot barriers from their input streams, and before emitting the barriers to their output streams. At that point, all updates to the state from records before the barriers will have been made, and no updates that depend on records from after the barriers have been applied. Because the state of a snapshot may be large, it is stored in a configurable state backend. By default, this is the JobManager’s memory, but for production use a distributed reliable storage should be configured (such as HDFS). After the state has been stored, the operator acknowledges the checkpoint, emits the snapshot barrier into the output streams, and proceeds.
The resulting snapshot now contains:
- For each parallel stream data source, the offset/position in the stream when the snapshot was started
- For each operator, a pointer to the state that was stored as part of the snapshot
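Conceptually, a completed snapshot can be modeled as a small record of source offsets plus operator state handles. The following is a hypothetical sketch, not Flink's actual on-disk format, and all field and operator names are made up:

```python
from dataclasses import dataclass, field

@dataclass
class Checkpoint:
    """Hypothetical model of what a completed snapshot contains."""
    checkpoint_id: int
    # For each parallel source: the position in the stream when the
    # snapshot was started (e.g. a Kafka partition offset).
    source_offsets: dict = field(default_factory=dict)
    # For each operator: a pointer/handle to the state stored as part
    # of the snapshot, e.g. a path in a distributed file system.
    state_handles: dict = field(default_factory=dict)

cp = Checkpoint(
    checkpoint_id=7,
    source_offsets={"kafka-source[0]": 41_532, "kafka-source[1]": 40_998},
    state_handles={"window-agg[0]": "hdfs:///checkpoints/7/window-agg-0"},
)
```

The snapshot itself stays small: the bulky operator state lives in the state backend, and the checkpoint only points to it.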
Recovery
Recovery under this mechanism is straightforward: Upon a failure, Flink selects the latest completed checkpoint k. The system then re-deploys the entire distributed dataflow, and gives each operator the state that was snapshotted as part of checkpoint k. The sources are set to start reading the stream from position Sk. For example in Apache Kafka, that means telling the consumer to start fetching from offset Sk.
If state was snapshotted incrementally, the operators start with the state of the latest full snapshot and then apply a series of incremental snapshot updates to that state.
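Restoring from an incremental snapshot chain can be sketched as follows (a minimal illustration under assumed semantics: deltas are ordered oldest-first and a `None` value marks a deleted key — Flink's actual incremental format, e.g. RocksDB's, works differently):

```python
def restore(full_snapshot, deltas):
    """Rebuild state from the latest full snapshot plus ordered deltas."""
    state = dict(full_snapshot)
    for delta in deltas:                 # apply oldest delta first
        for key, value in delta.items():
            if value is None:
                state.pop(key, None)     # key removed since the full snapshot
            else:
                state[key] = value       # key added or updated
    return state

full = {"a": 1, "b": 2}
deltas = [{"b": 5}, {"c": 7, "a": None}]
restored = restore(full, deltas)
# restored == {"b": 5, "c": 7}: b updated, c added, a deleted
```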
See Restart Strategies for more information.
State Backends
The exact data structures in which the key/values indexes are stored depends on the chosen state backend. One state backend stores data in an in-memory hash map, another state backend uses RocksDB as the key/value store. In addition to defining the data structure that holds the state, the state backends also implement the logic to take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint. State backends can be configured without changing your application logic.
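The idea that operator logic stays the same while the backend decides how state is held and snapshotted can be sketched like this (a toy model, not Flink's state backend API):

```python
import copy

class InMemoryBackend:
    """Toy backend keeping key/value state in a plain dict, in the
    spirit of Flink's hash-map-based backend. A RocksDB-style backend
    would expose the same operations over an on-disk store instead."""

    def __init__(self):
        self._kv = {}

    def put(self, key, value):
        self._kv[key] = value

    def get(self, key):
        return self._kv.get(key)

    def snapshot(self):
        # A point-in-time copy that a checkpoint could persist.
        return copy.deepcopy(self._kv)

backend = InMemoryBackend()
backend.put("user-1", 3)
snap = backend.snapshot()
backend.put("user-1", 4)      # later updates don't affect the snapshot
# snap["user-1"] == 3, backend.get("user-1") == 4
```

Because the operator only sees `put`/`get`, swapping the backend is a configuration change, not an application change.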
Savepoints
All programs that use checkpointing can resume execution from a savepoint. Savepoints allow both updating your programs and your Flink cluster without losing any state.
Savepoints are manually triggered checkpoints, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this.
Savepoints are similar to checkpoints except that they are triggered by the user and don’t automatically expire when newer checkpoints are completed.
Exactly Once vs. At Least Once
The alignment step may add latency to the streaming program. Usually, this extra latency is on the order of a few milliseconds, but we have seen cases where the latency of some outliers increased noticeably. For applications that require consistently super low latencies (few milliseconds) for all records, Flink has a switch to skip the stream alignment during a checkpoint. Checkpoint snapshots are still drawn as soon as an operator has seen the checkpoint barrier from each input.
When the alignment is skipped, an operator keeps processing all inputs, even after some checkpoint barriers for checkpoint n arrived. That way, the operator also processes elements that belong to checkpoint n+1 before the state snapshot for checkpoint n was taken. On a restore, these records will occur as duplicates, because they are both included in the state snapshot of checkpoint n, and will be replayed as part of the data after checkpoint n.
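The duplication effect can be seen in a toy count (a hypothetical illustration, not Flink code):

```python
# Records r1, r2 arrive before the barrier for checkpoint n; r3 arrives
# after the barrier but, with alignment skipped, is processed before
# the snapshot for checkpoint n is taken.
count = 0
for record in ("r1", "r2", "r3"):
    count += 1
snapshot_n = count            # snapshot stores count == 3, including r3

# Failure and restore: state comes from snapshot n, and the source is
# rewound to the barrier position, so r3 is replayed and counted again.
count = snapshot_n
for record in ("r3",):
    count += 1
# count == 4 although only 3 distinct records existed: at-least-once
```

With alignment enabled, r3 would have been buffered, kept out of snapshot n, and counted exactly once after the replay.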
Note: Alignment happens only for operators with multiple predecessors (joins) as well as operators with multiple senders (after a stream repartitioning/shuffle). Because of that, dataflows with only embarrassingly parallel streaming operations (map(), flatMap(), filter(), …) actually give exactly once guarantees even in at least once mode.
State and Fault Tolerance in Batch Programs
Flink executes batch programs as a special case of streaming programs, where the streams are bounded (finite number of elements). A DataSet is treated internally as a stream of data. The concepts above thus apply to batch programs in the same way as they apply to streaming programs, with minor exceptions:
- Fault tolerance for batch programs does not use checkpointing. Recovery happens by fully replaying the streams. That is possible, because inputs are bounded. This pushes the cost more towards the recovery, but makes the regular processing cheaper, because it avoids checkpoints.
- Stateful operations in the DataSet API use simplified in-memory/out-of-core data structures, rather than key/value indexes.
- The DataSet API introduces special synchronized (superstep-based) iterations, which are only possible on bounded streams. For details, check out the iteration docs.