State Backends
Programs written in the Data Stream API often hold state in various forms:
- Windows gather elements or aggregates until they are triggered
- Transformation functions may use the key/value state interface to store values
- Transformation functions may implement the interface to make their local variables fault tolerant
See also the state section in the streaming API guide.
When checkpointing is activated, such state is persisted upon checkpoints to guard against data loss and recover consistently. How the state is represented internally, and how and where it is persisted upon checkpoints, depends on the chosen State Backend.
Out of the box, Flink bundles these state backends:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
If nothing else is configured, the system will use the MemoryStateBackend.
The MemoryStateBackend holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, etc.
Upon checkpoints, this state backend will snapshot the state and send it as part of the checkpoint acknowledgement messages to the JobManager (master), which stores it on its heap as well.
The MemoryStateBackend can be configured to use asynchronous snapshots. Asynchronous snapshots are strongly encouraged because they avoid blocking the pipeline, and they are currently enabled by default. To disable this feature, users can instantiate a MemoryStateBackend with the corresponding boolean flag in the constructor set to false (this should only be used for debugging), e.g.:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
Limitations of the MemoryStateBackend:
- The size of each individual state is by default limited to 5 MB. This value can be increased in the constructor of the MemoryStateBackend.
- Irrespective of the configured maximal state size, the state cannot be larger than the akka frame size (see ).
- The aggregate state must fit into the JobManager memory.
The MemoryStateBackend is encouraged for:
- Local development and debugging
- Jobs that hold little state, such as jobs that consist only of record-at-a-time functions (Map, FlatMap, Filter, …). The Kafka Consumer requires very little state.
The FsStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
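To make the three parts of such a URL (type, address, path) concrete, the following is a minimal sketch that splits the two example URLs with the standard java.net.URI class. It is an illustration only: the class name CheckpointUriParts and its methods are hypothetical and not part of Flink, which handles checkpoint paths with its own classes.

```java
import java.net.URI;

// Hypothetical helper, not part of Flink: splits a checkpoint URL into its parts.
class CheckpointUriParts {

    /** File system type, e.g. "hdfs" or "file". */
    static String type(String url) {
        return URI.create(url).getScheme();
    }

    /** Address as host:port, or an empty string when the URL names no host. */
    static String address(String url) {
        URI uri = URI.create(url);
        if (uri.getHost() == null) {
            return "";
        }
        return uri.getPort() == -1 ? uri.getHost() : uri.getHost() + ":" + uri.getPort();
    }

    /** Directory inside the file system. */
    static String path(String url) {
        return URI.create(url).getPath();
    }

    public static void main(String[] args) {
        String[] urls = {
            "hdfs://namenode:40010/flink/checkpoints",
            "file:///data/flink/checkpoints"
        };
        for (String url : urls) {
            System.out.println(type(url) + " | " + address(url) + " | " + path(url));
        }
    }
}
```

A local "file://" URL carries no address part, which is why the sketch returns an empty string in that case.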
The FsStateBackend holds in-flight data in the TaskManager’s memory. Upon checkpointing, it writes state snapshots into files in the configured file system and directory. Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).
The FsStateBackend uses asynchronous snapshots by default to avoid blocking the processing pipeline while writing state checkpoints. To disable this feature, users can instantiate a FsStateBackend with the corresponding boolean flag in the constructor set to false, e.g.:
new FsStateBackend(path, false);
The FsStateBackend is encouraged for:
- All high-availability setups.
The RocksDBStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
The RocksDBStateBackend holds in-flight data in a RocksDB database that is (per default) stored in the TaskManager data directories. Upon checkpointing, the whole RocksDB database will be checkpointed into the configured file system and directory. Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).
The RocksDBStateBackend always performs asynchronous snapshots.
Limitations of the RocksDBStateBackend:
- As RocksDB’s JNI bridge API is based on byte[], the maximum supported size per key and per value is 2^31 bytes each. IMPORTANT: states that use merge operations in RocksDB (e.g. ListState) can silently accumulate value sizes > 2^31 bytes and will then fail on their next retrieval. This is currently a limitation of RocksDB JNI.
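The arithmetic behind this warning can be sketched in plain Java. The example does not touch RocksDB or Flink. It only assumes that a merged value has to be handed back as a single byte[], whose length is a Java int, and it ignores any per-entry overhead. The class and method names are hypothetical.

```java
// Hypothetical illustration, not Flink or RocksDB code.
class MergedValueSize {

    // A Java array length is an int, so one byte[] holds at most 2^31 - 1 bytes.
    static final long MAX_ARRAY_BYTES = Integer.MAX_VALUE;

    /** Total bytes of a value built by appending `entries` elements of `bytesPerEntry` bytes each. */
    static long mergedSize(long entries, long bytesPerEntry) {
        // long arithmetic; multiplyExact throws instead of silently wrapping on overflow
        return Math.multiplyExact(entries, bytesPerEntry);
    }

    /** True if the merged value could still be returned as one byte[]. */
    static boolean fitsInByteArray(long entries, long bytesPerEntry) {
        return mergedSize(entries, bytesPerEntry) <= MAX_ARRAY_BYTES;
    }

    public static void main(String[] args) {
        // 1,000,000 entries of 1 KiB each: roughly 1 GB in total
        System.out.println(fitsInByteArray(1_000_000L, 1024L));
        // 3,000,000 entries of 1 KiB each: roughly 3 GB in total
        System.out.println(fitsInByteArray(3_000_000L, 1024L));
    }
}
```

Each individual append in the second case is small, which is why the problem only shows up when the accumulated value is read back.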
The RocksDBStateBackend is encouraged for:
- Jobs with very large state, long windows, large key/value states.
- All high-availability setups.
Note that the amount of state that you can keep is only limited by the amount of disk space available. This allows keeping very large state, compared to the FsStateBackend that keeps state in memory. This also means, however, that the maximum throughput that can be achieved will be lower with this state backend. All reads/writes from/to this backend have to go through de-/serialization to retrieve/store the state objects, which is also more expensive than always working with the on-heap representation as the heap-based backends are doing.
RocksDBStateBackend is currently the only backend that offers incremental checkpoints (see ).
Certain RocksDB native metrics are available but disabled by default. You can find full documentation here
Configuring a State Backend
The default state backend, if you specify nothing, is the jobmanager. If you wish to establish a different default for all jobs on your cluster, you can do so by defining a new default state backend in flink-conf.yaml. The default state backend can be overridden on a per-job basis, as shown below.
The per-job state backend is set on the StreamExecutionEnvironment of the job, as shown in the example below:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
If you want to use the RocksDBStateBackend, then you have to add the following dependency to your Flink project.
A default state backend can be configured in the flink-conf.yaml, using the configuration key state.backend.
Possible values for the config entry are jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend), or the fully qualified classname of the class that implements the state backend factory StateBackendFactory, such as org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory for RocksDBStateBackend.
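The rule above (three shortcut names, otherwise a factory class name) can be pictured with a small lookup. This is a hypothetical sketch and not Flink's actual loading code: the class StateBackendNames, its describe method, and the exact, case-sensitive matching are assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical sketch, not Flink's actual state backend loader.
class StateBackendNames {

    // Shortcut names and the backends they select, as listed in the text.
    private static final Map<String, String> SHORTCUTS = Map.of(
        "jobmanager", "MemoryStateBackend",
        "filesystem", "FsStateBackend",
        "rocksdb", "RocksDBStateBackend");

    /** Describes what a given state.backend value selects. */
    static String describe(String configValue) {
        String backend = SHORTCUTS.get(configValue);
        if (backend != null) {
            return backend;
        }
        // Any other value is taken to be the class name of a StateBackendFactory.
        return "factory class " + configValue;
    }

    public static void main(String[] args) {
        System.out.println(describe("filesystem"));
        System.out.println(describe("org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory"));
    }
}
```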
A sample section in the configuration file could look as follows:
state.backend: filesystem
# Directory for storing checkpoints