Streaming Concepts

    The following pages explain concepts, practical limitations, and stream-specific configuration parameters of Flink’s relational APIs on streaming data.

    Table programs that run in streaming mode leverage all capabilities of Flink as a stateful stream processor.

    In particular, a table program can be configured with a state backend and various checkpointing options for handling different requirements regarding state size and fault tolerance. It is possible to take a savepoint of a running Table API & SQL pipeline and to restore the application’s state at a later point in time.

    Due to the declarative nature of Table API & SQL programs, it is not always obvious where and how much state is used within a pipeline. The planner decides whether state is necessary to compute a correct result. A pipeline is optimized to claim as little state as possible given the current set of optimizer rules.

    Queries such as SELECT ... FROM ... WHERE that consist only of field projections or filters are usually stateless pipelines. However, operations such as joins, aggregations, or deduplications require keeping intermediate results in fault-tolerant storage, for which Flink’s state abstractions are used.

    For example, a regular SQL join of two tables requires the operator to keep both input tables entirely in state. For correct SQL semantics, the runtime must assume that a match could occur at any point in time from either side. Flink provides optimized window and interval joins that aim to keep the state size small by exploiting the concept of watermarks.
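
    The difference in state footprint can be illustrated with a toy model. The sketch below is plain Python, not Flink code: the class ToyJoin, its methods, and the integer timestamps are hypothetical names chosen for illustration. With bound=None it behaves like a regular join and never drops a buffered row; with a numeric bound it behaves like an interval join and discards rows once a watermark shows that no future record can match them.

```python
class ToyJoin:
    """Toy streaming equi-join that buffers both inputs (illustration only)."""

    def __init__(self, bound=None):
        # bound=None models a regular join; a number models an interval join
        # whose condition is |left.ts - right.ts| <= bound.
        self.bound = bound
        self.sides = {"left": [], "right": []}

    def on_record(self, side, key, ts, row):
        """Buffer the row and return all (left_row, right_row) matches."""
        other = "right" if side == "left" else "left"
        self.sides[side].append((key, ts, row))
        matches = [
            r for (k, t, r) in self.sides[other]
            if k == key and (self.bound is None or abs(ts - t) <= self.bound)
        ]
        return [(row, m) if side == "left" else (m, row) for m in matches]

    def on_watermark(self, watermark):
        """A watermark promises that no record with ts < watermark will arrive."""
        if self.bound is None:
            return  # regular join: a match may still arrive, keep everything
        horizon = watermark - self.bound
        for side in self.sides:
            # Rows older than the horizon cannot match any future record.
            self.sides[side] = [e for e in self.sides[side] if e[1] >= horizon]

    def state_size(self):
        return len(self.sides["left"]) + len(self.sides["right"])


regular = ToyJoin()
interval = ToyJoin(bound=5)
for join in (regular, interval):
    join.on_record("left", "a", 1, "L1")
    join.on_record("right", "a", 3, "R1")
    join.on_watermark(20)
print(regular.state_size(), interval.state_size())
```

    Both joins emit the same match here, but only the interval variant can shrink its buffers after the watermark advances.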

    Another example is the following query, which computes the number of clicks per session:

        SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

    The sessionId attribute is used as a grouping key, and the continuous query maintains a count for each sessionId it observes. The sessionId attribute evolves over time, and sessionId values are only active until the session ends, i.e., for a limited period of time. However, the continuous query cannot know about this property of sessionId and expects that every value can occur at any point in time. Consequently, the total state size of the query grows continuously as more and more sessionId values are observed.
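
    The growth of state can be made concrete with a minimal sketch. The function below is a plain-Python stand-in for the continuous per-session count, not Flink's implementation; the name count_clicks and the sample session ids are assumptions made for illustration. It emits an updated count for every incoming click together with the number of keys currently held in state.

```python
from collections import Counter


def count_clicks(session_ids):
    """Yield (sessionId, updated count, number of keys in state) per click."""
    state = Counter()  # one counter entry per sessionId ever observed
    for session_id in session_ids:
        state[session_id] += 1
        yield session_id, state[session_id], len(state)


for update in count_clicks(["s1", "s1", "s2", "s3"]):
    print(update)
```

    Because nothing ever removes an entry, the third component only grows, even after a session has ended and its key will never appear again.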

    Idle State Retention Time

    The Idle State Retention Time parameter (table.exec.state.ttl) defines how long the state of a key is retained without being updated before it is removed. For the previous example query, the count of a sessionId would be removed as soon as it has not been updated for the configured period of time.

    By removing the state of a key, the continuous query completely forgets that it has seen this key before. If a record arrives with a key whose state has already been removed, the record is treated as if it were the first record with that key. For the example above, this means that the count of a sessionId would start again at 0.
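
    This forgetting behavior can be sketched for the per-session count as follows. It is a simplified plain-Python model, not Flink's cleanup mechanism: the class CountWithIdleRetention is hypothetical, time is passed in explicitly as a number, and expired keys are dropped eagerly on every record, which may differ from when Flink actually frees state.

```python
class CountWithIdleRetention:
    """Per-key count whose state is dropped after `retention` idle time units."""

    def __init__(self, retention):
        self.retention = retention
        self.state = {}  # key -> (count, time of last update)

    def expire(self, now):
        """Remove every key that has not been updated for `retention` units."""
        idle = [
            key for key, (_, last) in self.state.items()
            if now - last >= self.retention
        ]
        for key in idle:
            del self.state[key]

    def on_record(self, key, now):
        """Process one record and return the updated count for its key."""
        self.expire(now)
        # A key without state is treated as never seen: counting restarts at 0.
        count, _ = self.state.get(key, (0, now))
        self.state[key] = (count + 1, now)
        return count + 1


query = CountWithIdleRetention(retention=10)
print(query.on_record("s1", now=0))   # first record for s1
print(query.on_record("s1", now=5))   # updated within the retention time
print(query.on_record("s1", now=20))  # idle for 15 units, state was removed
```

    The third call returns the same result as the first one, which is the trade-off of the setting: state stays bounded, but results for keys that reappear after the retention time are no longer exact.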

    Stateful Upgrades and Evolution

    Table programs that are executed in streaming mode are intended as standing queries, which means they are defined once and are continuously evaluated as static end-to-end pipelines.

    In stateful pipelines, any change to the query or to Flink’s planner can lead to a completely different execution plan. For example, after a filter predicate is added, the optimizer might decide to reorder joins or change the schema of an intermediate operator. This prevents restoring from a savepoint because of either a changed topology or a different column layout within the state of an operator.

    The query implementer must ensure that the optimized plans before and after the change are compatible. Use the EXPLAIN command in SQL or table.explain() in Table API to inspect the plans.
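
    One low-tech way to compare two plans is a textual diff of the explain output captured before and after the change. The helper below is a sketch using only Python's standard difflib module; the function name plan_diff is hypothetical, and the two plan strings are simplified stand-ins written for this example, not real EXPLAIN output. A diff only points at where the plans differ; deciding whether the difference affects state compatibility remains up to the query implementer.

```python
import difflib


def plan_diff(before, after):
    """Return unified-diff lines between two plan texts (empty if identical)."""
    return list(
        difflib.unified_diff(
            before.splitlines(),
            after.splitlines(),
            fromfile="before",
            tofile="after",
            lineterm="",
        )
    )


# Placeholder plans, assumed to have been saved as strings beforehand.
plan_before = "Source(clicks)\nAggregate(groupBy=sessionId)\nSink"
plan_after = "Source(clicks)\nFilter(predicate)\nAggregate(groupBy=sessionId)\nSink"

for line in plan_diff(plan_before, plan_after):
    print(line)
```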

    Since new optimizer rules are continuously added and operators become more efficient and specialized, upgrading to a newer Flink version could also lead to incompatible plans.

    Since the community rejects contributions that modify the optimized plan and the operator topology in a patch version (e.g. from 1.13.1 to 1.13.2), it should be safe to upgrade a Table API & SQL pipeline to a newer bug fix release. However, upgrades across major or minor versions (e.g. from 1.12 to 1.13) are not supported.
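
    The guidance in this paragraph can be written down as a small pre-upgrade check. The function below is a hypothetical helper, not part of Flink; it assumes version strings that consist only of dot-separated numbers such as 1.13.1 and simply encodes the rule that an upgrade should stay within the same major and minor version.

```python
def is_patch_upgrade(old, new):
    """True if `new` is a later bug fix release of the same major.minor line."""
    old_parts = [int(part) for part in old.split(".")]
    new_parts = [int(part) for part in new.split(".")]
    same_line = old_parts[:2] == new_parts[:2]
    # Compare whatever follows major.minor; a missing patch number sorts first.
    return same_line and new_parts[2:] > old_parts[2:]


print(is_patch_upgrade("1.13.1", "1.13.2"))
print(is_patch_upgrade("1.12.0", "1.13.0"))
```

    The first call describes an upgrade that should be safe for a stateful pipeline according to the rule above; the second crosses a minor version and is therefore not supported.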

    Where to go next?

    • Time Attributes: Explains time attributes and how time attributes are handled in Table API & SQL.
    • Versioned Tables: Describes the Temporal Table concept.
    • Query Configuration: Lists Table API & SQL specific configuration options.
