Fault Tolerance via State Snapshots

    This state that Flink manages is stored in a state backend. Two implementations of state backends are available – one based on RocksDB, an embedded key/value store that keeps its working state on disk, and another heap-based state backend that keeps its working state in memory, on the Java heap.

    When working with state kept in a heap-based state backend, accesses and updates involve reading and writing objects on the heap. But for objects kept in the , accesses and updates involve serialization and deserialization, and so are much more expensive. But the amount of state you can have with RocksDB is limited only by the size of the local disk. Note also that only the is able to do incremental snapshotting, which is a significant benefit for applications with large amounts of slowly changing state.

    Both of these state backends are able to do asynchronous snapshotting, meaning that they can take a snapshot without impeding the ongoing stream processing.

    Flink periodically takes persistent snapshots of all the state in every operator and copies these snapshots somewhere more durable, such as a distributed file system. In the event of the failure, Flink can restore the complete state of your application and resume processing as though nothing had gone wrong.

    The location where these snapshots are stored is defined via the jobs checkpoint storage. Two implementations of checkpoint storage are available - one that persists its state snapshots to a distributed file system, and another that users the JobManager’s heap.

    NameState Backup
    FileSystemCheckpointStorageDistributed file system
    • Supports very large state size
    • Highly durable
    • Recommended for production deployments
    JobManagerCheckpointStorageJobManager JVM Heap
    • Good for testing and experimentation with small state (locally)
    • Snapshot – a generic term referring to a global, consistent image of the state of a Flink job. A snapshot includes a pointer into each of the data sources (e.g., an offset into a file or Kafka partition), as well as a copy of the state from each of the job’s stateful operators that resulted from having processed all of the events up to those positions in the sources.
    • Checkpoint – a snapshot taken automatically by Flink for the purpose of being able to recover from faults. Checkpoints can be incremental, and are optimized for being restored quickly.
    • Externalized Checkpoint – normally checkpoints are not intended to be manipulated by users. Flink retains only the n-most-recent checkpoints (n being configurable) while a job is running, and deletes them when a job is cancelled. But you can configure them to be retained instead, in which case you can manually resume from them.
    • Savepoint – a snapshot triggered manually by a user (or an API call) for some operational purpose, such as a stateful redeploy/upgrade/rescaling operation. Savepoints are always complete, and are optimized for operational flexibility.

    When a task manager is instructed by the checkpoint coordinator (part of the job manager) to begin a checkpoint, it has all of the sources record their offsets and insert numbered checkpoint barriers into their streams. These barriers flow through the job graph, indicating the part of the stream before and after each checkpoint.

    Checkpoint n will contain the state of each operator that resulted from having consumed every event before checkpoint barrier n, and none of the events after it.

    As each operator in the job graph receives one of these barriers, it records its state. Operators with two input streams (such as a ) perform barrier alignment so that the snapshot will reflect the state resulting from consuming events from both input streams up to (but not past) both barriers.

    Barrier alignment

    When things go wrong in a stream processing application, it is possible to have either lost, or duplicated results. With Flink, depending on the choices you make for your application and the cluster you run it on, any of these outcomes is possible:

    • Flink makes no effort to recover from failures (at most once)
    • Nothing is lost, but you may experience duplicated results (at least once)

    Given that Flink recovers from faults by rewinding and replaying the source data streams, when the ideal situation is described as exactly once this does not mean that every event will be processed exactly once. Instead, it means that every event will affect the state being managed by Flink exactly once.

    Barrier alignment is only needed for providing exactly once guarantees. If you don’t need this, you can gain some performance by configuring Flink to use , which has the effect of disabling barrier alignment.

    To achieve exactly once end-to-end, so that every event from the sources affects the sinks exactly once, the following must be true:

    1. your sources must be replayable, and
    2. your sinks must be transactional (or idempotent)

    The Flink Operations Playground includes a section on .