Checkpointing
In order to make state fault tolerant, Flink needs to checkpoint the state. Checkpoints allow Flink to recover state and positions in the streams to give the application the same semantics as a failure-free execution.
The documentation on streaming fault tolerance describes in detail the technique behind Flink’s streaming fault tolerance mechanism.
Flink’s checkpointing mechanism interacts with durable storage for streams and state. In general, it requires:
- A persistent (or durable) data source that can replay records for a certain amount of time. Examples for such sources are persistent messages queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file systems (e.g., HDFS, S3, GFS, NFS, Ceph, …).
- A persistent storage for state, typically a distributed filesystem (e.g., HDFS, S3, GFS, NFS, Ceph, …)
By default, checkpointing is disabled. To enable checkpointing, call on the StreamExecutionEnvironment
, where n is the in milliseconds.
Other parameters for checkpointing include:
checkpoint storage: You can set the location where checkpoint snapshots are made durable. By default Flink will use the JobManager’s heap. For production deployments it is recommended to instead use a durable filesystem. See checkpoint storage for more details on the available options for job-wide and cluster-wide configuration.
exactly-once vs. at-least-once: You can optionally pass a mode to the
enableCheckpointing(n)
method to choose between the two guarantee levels. Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.minimum time between checkpoints: To make sure that the streaming application makes a certain amount of progress between checkpoints, one can define how much time needs to pass between checkpoints. If this value is set for example to 5000, the next checkpoint will be started no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval. Note that this implies that the checkpoint interval will never be smaller than this parameter.
It is often easier to configure applications by defining the “time between checkpoints” than the checkpoint interval, because the “time between checkpoints” is not susceptible to the fact that checkpoints may sometimes take longer than on average (for example if the target storage system is temporarily slow).
tolerable checkpoint failure number: This defines how many consecutive checkpoint failures will be tolerated, before the whole job is failed over. The default value is
0
, which means no checkpoint failures will be tolerated, and the job will fail on first reported checkpoint failure.number of concurrent checkpoints: By default, the system will not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpoints and not make progress with processing the streams. It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example because the functions call external services that need some time to respond) but that still want to do very frequent checkpoints (100s of milliseconds) to re-process very little upon failures.
This option cannot be used when a minimum time between checkpoints is defined.
externalized checkpoints: You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the job fails. This way, you will have a checkpoint around to resume from if your job fails. There are more details in the .
unaligned checkpoints: You can enable unaligned checkpoints to greatly reduce checkpointing times under backpressure. This only works for exactly-once checkpoints and with one concurrent checkpoint.
Scala
Python
Some more parameters and/or defaults may be set via conf/flink-conf.yaml
(see for a full guide):
By default, checkpoints are stored in memory in the JobManager. For proper persistence of large state, Flink supports various approaches for checkpointing state in other locations. The choice of checkpoint storage can be configured via StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)
. It is strongly encouraged that checkpoints be stored in a highly-available filesystem for production deployments.
See checkpoint storage for more details on the available options for job-wide and cluster-wide configuration.
Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)
.
Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
Starting from Flink 1.14 it is possible to continue performing checkpoints even if parts of the job graph have finished processing all data, which might happen if it contains bounded sources. This feature must be enabled via a feature flag:
Once the tasks/subtasks are finished, they do not contribute to the checkpoints any longer. This is an an important consideration when implementing any custom operators or UDFs (User-Defined Functions).
In order to support checkpointing with tasks that finished, we adjusted the and introduced the StreamOperator#finish method. This method is expected to be a clear cutoff point for flushing any remaining buffered state. All checkpoints taken after the finish method has been called should be empty (in most cases) and should not contain any buffered data since there will be no way to emit this data. One notable exception is if your operator has some pointers to transactions in external systems (i.e. order to implement the exactly-once semantic). In such a case, checkpoints taken after invoking the finish()
method should keep a pointer to the last transaction(s) that will be committed in the final checkpoint before the operator is closed. A good built-in example of this are exactly-once sinks and the .
How does this impact the operator state?
There is a special handling for UnionListState
, which has often been used to implement a global view over offsets in an external system (i.e. storing current offsets of Kafka partitions). If we had discarded a state for a single subtask that had its finish
method called, we would have lost the offsets for partitions that it had been assigned. In order to work around this problem, we let checkpoints succeed only if none or all subtasks that use UnionListState
are finished.
We have not seen ListState
used in a similar way, but you should be aware that any state checkpointed after the method will be discarded and not be available after a restore.
Any operator that is prepared to be rescaled should work well with tasks that partially finish. Restoring from a checkpoint where only a subset of tasks finished is equivalent to restoring such a task with the number of new subtasks equal to the number of finished tasks.