Checkpoints
See Checkpointing for how to enable and configure checkpoints for your program.
To understand the differences between checkpoints and savepoints, see Checkpoints vs. Savepoints.
When checkpointing is enabled, managed state is persisted to ensure consistent recovery in case of failures. Where the state is persisted during checkpointing depends on the chosen Checkpoint Storage.
Out of the box, Flink bundles these checkpoint storage types:
- JobManagerCheckpointStorage
- FileSystemCheckpointStorage
The JobManagerCheckpointStorage stores checkpoint snapshots in the JobManager’s heap.
It can be configured to fail the checkpoint if it goes over a certain size, in order to avoid OutOfMemoryErrors on the JobManager. To enable this feature, users can instantiate a JobManagerCheckpointStorage with the corresponding maximum size:
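(A minimal sketch; the 10 MB limit and the MAX_MEM_STATE_SIZE name are placeholders chosen here for illustration.)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Placeholder limit: fail any checkpoint whose snapshot exceeds 10 MB.
int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
// Keep checkpoint snapshots on the JobManager heap, bounded by the given size.
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE));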
Limitations of the JobManagerCheckpointStorage:
- The size of each individual state is by default limited to 5 MB. This value can be increased in the constructor of the JobManagerCheckpointStorage.
- Irrespective of the configured maximal state size, the state cannot be larger than the Akka frame size (see the Configuration documentation).
- The aggregate state must fit into the JobManager memory.
The JobManagerCheckpointStorage is encouraged for:
- Local development and debugging
The FileSystemCheckpointStorage is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
Upon checkpointing, it writes state snapshots into files in the configured file system and directory. Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).
If a checkpoint directory is specified, FileSystemCheckpointStorage will be used to persist checkpoint snapshots.
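For example, a sketch of setting a checkpoint directory on the checkpoint configuration (the local path below is a placeholder taken from the example above):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Passing a directory URI makes Flink use FileSystemCheckpointStorage for this job.
env.getCheckpointConfig().setCheckpointStorage("file:///data/flink/checkpoints");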
The FileSystemCheckpointStorage is encouraged for:
- All high-availability setups.
It is also recommended to set managed memory to zero. This will ensure that the maximum amount of memory is allocated for user code on the JVM.
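One way to do this, assuming the taskmanager.memory.managed.size option is used for this purpose, is in the Flink configuration file:

taskmanager.memory.managed.size: 0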
Checkpoints are by default not retained and are only used to resume a job from failures. They are deleted when a program is cancelled. You can, however, configure periodic checkpoints to be retained. Depending on the configuration, these retained checkpoints are not automatically cleaned up when the job fails or is cancelled. This way, you will have a checkpoint around to resume from if your job fails.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Retain externalized checkpoints when the job is cancelled.
CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case.
- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails.
Similarly to savepoints, a checkpoint consists of a meta data file and, depending on the state backend, some additional data files. The meta data file and data files are stored in the directory that is configured via state.checkpoints.dir in the configuration files, and can also be specified per job in the code.
The current checkpoint directory layout (introduced by FLINK-8531) is as follows:
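(Sketch of the layout; the directory names follow recent Flink releases and the exact structure should be treated as illustrative.)

/user-defined-checkpoint-dir
    /{job-id}
        |
        + --shared/
        + --taskowned/
        + --chk-1/
        + --chk-2/
        + --chk-3/
        ...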
The SHARED directory is for state that is possibly part of multiple checkpoints, TASKOWNED is for state that must never be dropped by the JobManager, and EXCLUSIVE is for state that belongs to one checkpoint only.
The checkpoint directory layout is not part of a public API and can be changed in a future release.
Configure globally via configuration files
state.checkpoints.dir: hdfs:///checkpoints/
Configure per job via the checkpoint configuration
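For example (a sketch; the HDFS path mirrors the instance-based example below):

env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");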
Configure with checkpoint storage instance
Alternatively, checkpoint storage can be set by specifying the desired checkpoint storage instance, which allows for setting low-level configuration such as write buffer sizes.
env.getCheckpointConfig().setCheckpointStorage(
    new FileSystemCheckpointStorage("hdfs:///checkpoints-data/", FILE_SIZE_THRESHOLD));
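Here FILE_SIZE_THRESHOLD stands for the file state size threshold in bytes: state chunks below this threshold are stored inline in the checkpoint metadata rather than in separate files, which helps avoid creating many small files for tiny state.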