State Schema Evolution
This page provides an overview of how you can evolve your state type’s data schema. The current restrictions vary across different types and state structures (ValueState, ListState, etc.).
Note that the information on this page is relevant only if you are using state serializers that are generated by Flink’s own type serialization framework. That is, when declaring your state, the provided state descriptor is not configured to use a specific TypeSerializer or TypeInformation, in which case Flink infers information about the state type.
Under the hood, whether or not the schema of state can be evolved depends on the serializer used to read and write persisted state bytes. Simply put, a registered state’s schema can only be evolved if its serializer properly supports it. This is handled transparently by serializers generated by Flink’s type serialization framework (the current scope of support is listed below).
If you intend to implement a custom TypeSerializer for your state type and would like to learn how to implement the serializer to support state schema evolution, please refer to Custom State Serialization. The documentation there also covers the necessary internal details about the interplay between state serializers and Flink’s state backends to support state schema evolution.
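To evolve the schema of a given state type, take the following steps:
- Take a savepoint of your Flink streaming job.
- Update state types in your application (e.g., modify your Avro type schema).
- Restore the job from the savepoint. When the state is accessed for the first time, Flink checks whether its schema has changed and migrates it if necessary.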
Further details about the migration process are out of the scope of this documentation; please refer to Custom State Serialization.
Currently, schema evolution is supported only for POJO and Avro types. Therefore, if you care about schema evolution for state, it is currently recommended to always use either POJO or Avro for state data types.
There are plans to extend support to more composite types; for more details, please refer to FLINK-10896.
Flink supports evolving the schema of POJO types, based on the following set of rules:
- Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints.
- New fields can be added. The new field will be initialized to the default value for its type, as defined by Java.
- Declared field types cannot change.
- The class name of the POJO type cannot change, including the namespace of the class.
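The rules above can be illustrated with a toy model in plain Java. This is a minimal sketch, not Flink’s PojoSerializer: the class UserState, the method restore, and the representation of old state as a field-name-to-value map are all hypothetical. It shows a removed field being dropped, an added field receiving its Java default, and a changed field type being rejected.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

public class PojoEvolutionSketch {

    // Hypothetical "new" version of a state POJO. Compared to the old version,
    // the field 'legacyScore' was removed and the field 'visits' was added.
    // The class name and package stay the same, as the rules require.
    public static class UserState {
        public String name;
        public long balance;
        public int visits; // newly added field

        public UserState() {} // POJOs need a public no-argument constructor
    }

    // Toy restore: builds an instance of the new class from field values that
    // were written with the old schema (modelled as a name -> value map).
    public static <T> T restore(Class<T> type, Map<String, Object> oldFields) {
        T instance;
        try {
            instance = type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("POJO needs a public no-arg constructor", e);
        }
        for (Field field : type.getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            Object value = oldFields.get(field.getName());
            if (value == null) {
                continue; // absent from old data (e.g. newly added): keeps its Java default
            }
            if (!boxed(field.getType()).isInstance(value)) {
                // Changing a declared field type is not supported.
                throw new IllegalStateException("Type of field '" + field.getName() + "' changed");
            }
            try {
                field.set(instance, value);
            } catch (IllegalAccessException e) {
                throw new IllegalArgumentException(e);
            }
        }
        // Old fields with no counterpart in the new class (here 'legacyScore')
        // are never read, so they disappear once the state is written again.
        return instance;
    }

    // Maps primitive field types to their wrapper classes for the type check.
    private static Class<?> boxed(Class<?> c) {
        if (!c.isPrimitive()) return c;
        if (c == int.class) return Integer.class;
        if (c == long.class) return Long.class;
        if (c == boolean.class) return Boolean.class;
        if (c == double.class) return Double.class;
        if (c == float.class) return Float.class;
        if (c == short.class) return Short.class;
        if (c == byte.class) return Byte.class;
        return Character.class;
    }

    public static void main(String[] args) {
        Map<String, Object> old = new HashMap<>();
        old.put("name", "alice");
        old.put("balance", 42L);
        old.put("legacyScore", 7); // field that no longer exists in UserState
        UserState restored = restore(UserState.class, old);
        // prints "alice 42 0"
        System.out.println(restored.name + " " + restored.balance + " " + restored.visits);
    }
}
```

Here name and balance are carried over, legacyScore is discarded, and visits starts at 0. Passing a String for balance would throw, mirroring the rule that declared field types cannot change.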
Note that the schema of POJO type state can only be evolved when restoring from a previous savepoint with Flink versions newer than 1.8.0. When restoring with Flink versions older than 1.8.0, the schema cannot be changed.
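Flink supports evolving the schema of Avro type state, as long as the schema change is considered compatible by Avro’s rules for schema resolution. One limitation is that Avro-generated classes used as the state type cannot be relocated or have different namespaces when the job is restored.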
Flink’s schema migration has some limitations that are required to ensure correctness. Users who need to work around these limitations, and understand them to be safe in their specific use case, should consider using a custom serializer or the State Processor API.
The structure of a key cannot be migrated, as this may lead to non-deterministic behavior. For example, if a POJO is used as a key and one field is dropped, multiple keys that used to be distinct may suddenly become identical. Flink has no way to merge the corresponding values.
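To make the key problem concrete, the following plain-Java sketch re-keys some per-key values after dropping a field from a hypothetical key type. KeyV1, KeyV2, and migrateKeys are illustrative names, not Flink APIs; the point is only that distinct old keys can collapse into one new key, leaving several values with no well-defined merge.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class KeyCollisionSketch {

    // Hypothetical original key type: (userId, region).
    public static final class KeyV1 {
        public final String userId;
        public final String region;
        public KeyV1(String userId, String region) { this.userId = userId; this.region = region; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof KeyV1)) return false;
            KeyV1 k = (KeyV1) o;
            return userId.equals(k.userId) && region.equals(k.region);
        }
        @Override public int hashCode() { return Objects.hash(userId, region); }
    }

    // Hypothetical evolved key type with the 'region' field dropped.
    public static final class KeyV2 {
        public final String userId;
        public KeyV2(String userId) { this.userId = userId; }
        @Override public boolean equals(Object o) {
            return o instanceof KeyV2 && userId.equals(((KeyV2) o).userId);
        }
        @Override public int hashCode() { return userId.hashCode(); }
    }

    // Re-keys per-key state under the new key type. Because several old keys can
    // map to the same new key, each new key collects a list of old values.
    public static Map<KeyV2, List<Long>> migrateKeys(Map<KeyV1, Long> state) {
        Map<KeyV2, List<Long>> migrated = new LinkedHashMap<>();
        for (Map.Entry<KeyV1, Long> entry : state.entrySet()) {
            KeyV2 newKey = new KeyV2(entry.getKey().userId);
            migrated.computeIfAbsent(newKey, k -> new ArrayList<>()).add(entry.getValue());
        }
        return migrated;
    }

    public static void main(String[] args) {
        Map<KeyV1, Long> state = new LinkedHashMap<>();
        state.put(new KeyV1("alice", "eu"), 3L);
        state.put(new KeyV1("alice", "us"), 5L);
        state.put(new KeyV1("bob", "eu"), 1L);

        for (Map.Entry<KeyV2, List<Long>> e : migrateKeys(state).entrySet()) {
            if (e.getValue().size() > 1) {
                // There is no generally correct way to pick or merge these values.
                System.out.println("collision for " + e.getKey().userId + ": " + e.getValue());
            }
        }
    }
}
```

For this input, main prints collision for alice: [3, 5]; whether the right answer is 3, 5, 8, or something else depends entirely on the application.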
Additionally, the RocksDB state backend relies on binary object identity, rather than the hashCode method. Any change to the keys’ object structure can lead to non-deterministic behavior.
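The following sketch illustrates the difference between hashCode-based and byte-based key identity. It assumes a hypothetical key type, CaseInsensitiveKey, and a hand-written serialize method that stands in for a state serializer; it does not use RocksDB or any Flink API. Two keys that are equal according to equals and hashCode still serialize to different bytes, and adding a field changes the bytes of the same logical key, so a store that compares serialized keys would not find the old entry.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Locale;

public class BinaryKeyIdentitySketch {

    // Hypothetical key type whose equals/hashCode ignore letter case.
    public static final class CaseInsensitiveKey {
        public final String name;
        public CaseInsensitiveKey(String name) { this.name = name; }
        private String normalized() { return name.toLowerCase(Locale.ROOT); }
        @Override public boolean equals(Object o) {
            return o instanceof CaseInsensitiveKey
                    && ((CaseInsensitiveKey) o).normalized().equals(normalized());
        }
        @Override public int hashCode() { return normalized().hashCode(); }
    }

    // Hypothetical field-by-field serializer. It never calls equals/hashCode;
    // 'withExtraField' simulates adding an int field to the key class.
    public static byte[] serialize(CaseInsensitiveKey key, boolean withExtraField) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(key.name);
            if (withExtraField) {
                out.writeInt(0); // new field written with its default value
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        CaseInsensitiveKey a = new CaseInsensitiveKey("Alice");
        CaseInsensitiveKey b = new CaseInsensitiveKey("alice");
        // A hashCode/equals-based map would treat a and b as the same key...
        System.out.println("equals: " + a.equals(b));                               // true
        System.out.println("same hashCode: " + (a.hashCode() == b.hashCode()));     // true
        // ...but a store that compares serialized bytes sees two different keys.
        System.out.println("same bytes: "
                + Arrays.equals(serialize(a, false), serialize(b, false)));         // false
        // Changing the key's structure changes the bytes of the very same key.
        System.out.println("same bytes after change: "
                + Arrays.equals(serialize(a, false), serialize(a, true)));          // false
    }
}
```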
When Kryo is used, the framework has no way to verify whether any incompatible changes have been made.