Checkpointing

    容错文档 中介绍了 Flink 流计算容错机制内部的技术原理。

    Flink 的 checkpoint 机制会和持久化存储进行交互,读写流与状态。一般需要:

    • 一个能够回放一段时间内数据的持久化数据源,例如持久化消息队列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 Google PubSub 等)或文件系统(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。

    默认情况下 checkpoint 是禁用的。通过调用 的 enableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。

    Checkpoint 其他的属性包括:

    • 精确一次(exactly-once)对比至少一次(at-least-once):你可以选择向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中传入一个模式来选择使用两种保证等级中的哪一种。 对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。

    • checkpoint 超时:如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。

    • checkpoints 之间的最小时间:该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。

      往往使用“checkpoints 之间的最小时间”来配置应用会比 checkpoint 间隔容易很多,因为“checkpoints 之间的最小时间”在 checkpoint 的执行时间超过平均值时不会受到影响(例如如果目标的存储系统忽然变得很慢)。

      注意这个值也意味着并发 checkpoint 的数目是

    • externalized checkpoints: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。 这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。更多的细节请看 。

      Java

    Scala

    Python

    更多的属性与默认值能在 conf/flink-conf.yaml 中设置(完整教程请阅读 配置)。

    Flink 的 会将 timer 以及 stateful 的 operator 进行快照,然后存储下来, 包括连接器(connectors),窗口(windows)以及任何用户自定义的状态。 Checkpoint 存储在哪里取决于所配置的 State Backend(比如 JobManager memory、 file system、 database)。

    默认情况下,状态是保持在 TaskManagers 的内存中,checkpoint 保存在 JobManager 的内存中。为了合适地持久化大体量状态, Flink 支持各种各样的途径去存储 checkpoint 状态到其他的 state backends 上。通过 来配置所选的 state backends。

    Flink 现在为没有迭代(iterations)的作业提供一致性的处理保证。在迭代作业上开启 checkpoint 会导致异常。为了在迭代程序中强制进行 checkpoint,用户需要在开启 checkpoint 时设置一个特殊的标志: env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)

    请注意在环形边上游走的记录(以及与之相关的状态变化)在故障时会丢失。

    从版本 1.14 开始 Flink 支持在部分任务结束后继续进行Checkpoint。 如果一部分数据源是有限数据集,那么就可以出现这种情况。 从版本 1.15 开始,这一特性被默认打开。如果想要关闭这一功能,可以执行:

    在这种情况下,结束的任务不会参与 Checkpoint 的过程。在实现自定义的算子或者 UDF (用户自定义函数)时需要考虑这一点。

    为了支持部分任务结束后的 Checkpoint 操作,我们调整了 并且引入了 StreamOperator#finish 方法。 在这一方法中,用户需要写出所有缓冲区中的数据。在 finish 方法调用后的 checkpoint 中,这一任务一定不能再有缓冲区中的数据,因为在 finish() 后没有办法来输出这些数据。 在大部分情况下,finish() 后这一任务的状态为空,唯一的例外是如果其中某些算子中包含外部系统事务的句柄(例如为了实现恰好一次语义), 在这种情况下,在 finish() 后进行的 checkpoint 操作应该保留这些句柄,并且在结束 checkpoint(即任务退出前所等待的 checkpoint)时提交。 一个可以参考的例子是满足恰好一次语义的 sink 接口与 TwoPhaseCommitSinkFunction

    在部分 Task 结束后的checkpoint中,Flink 对 UnionListState 进行了特殊的处理。 一般用于实现对外部系统读取位置的一个全局视图(例如,用于记录所有 Kafka 分区的读取偏移)。 如果我们在算子的某个并发调用 close() 方法后丢弃它的状态,我们就会丢失它所分配的分区的偏移量信息。 为了解决这一问题,对于使用 UnionListState 的算子我们只允许在它的并发都在运行或都已结束的时候才能进行 checkpoint 操作。

    ListState 一般不会用于类似的场景,但是用户仍然需要注意在调用 close() 方法后进行的 checkpoint 会丢弃算子的状态并且 这些状态在算子重启后不可用。

    任何支持并发修改操作的算子也可以支持部分并发实例结束后的恢复操作。从这种类型的快照中恢复等价于将算子的并发改为正在运行的并发实例数。

    为了保证使用两阶段提交的算子可以提交所有的数据,任务会在所有算子都调用 finish() 方法后等待下一次 checkpoint 成功后退出。 需要注意的是,这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显。 极端情况下,如果 checkpoint 的周期被设置为 Long.MAX_VALUE,那么任务永远不会结束,因为下一次 checkpoint 不会进行。