Versioned Tables

    Dynamic tables define relations over time. Often, particularly when working with metadata, a key’s old value does not become irrelevant when it changes.

    Flink SQL can define versioned tables over any dynamic table with a PRIMARY KEY constraint and a time attribute.

    A primary key constraint in Flink means that a column or set of columns of a table or view is unique and non-null. The primary key semantics on an upserting table mean that the materialized changes for a particular key (INSERT/UPDATE/DELETE) represent the changes to a single row over time. The time attribute on an upserting table defines when each change occurred.

    Taken together, these allow Flink to track the changes to a row over time and maintain the period for which each value was valid for that key.

    Suppose a changelog tracks how the price of a scooter changes over time. The scooter is initially priced at $11.11 when it is added to the catalog at 00:01:00. The price then rises to $12.99 at 12:00:00, and the scooter is deleted from the catalog at 18:00:00.

    If we queried the table for various products’ prices at different times, we would retrieve different results. At 10:00:00 the table would show one set of prices.

    update_time  product_id  product_name  price
    ===========  ==========  ============  =====
    00:01:00     p_001       scooter       11.11
    00:02:00     p_002       basketball    23.11

    At 13:00:00, we would find a different set of prices: the scooter, for example, would be listed at 12.99.

    Versioned tables are defined implicitly for any table whose underlying source or format directly defines a changelog. Examples include the upsert Kafka source as well as database changelog formats such as debezium and canal. As discussed above, the only additional requirement is that the CREATE TABLE statement must contain a PRIMARY KEY and an event-time attribute.

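    CREATE TABLE products (  -- the name products is illustrative
        product_id    STRING,
        product_name  STRING,
        price         DECIMAL(32, 2),  -- column taken from the price table above; the type is assumed
        update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
        PRIMARY KEY (product_id) NOT ENFORCED,
        WATERMARK FOR update_time AS update_time
    ) WITH (...);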
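
    A versioned table is typically consumed through an event-time temporal join, which looks up the version of a row that was valid at a given time. The following is a minimal sketch, assuming the table above is registered as products and exposes a price column. The orders table, its columns, and the choice of the datagen connector are hypothetical and exist only to make the example self-contained.

```sql
-- Hypothetical probe-side table; the datagen connector simply generates random rows.
CREATE TABLE orders (
    order_id    STRING,
    product_id  STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time   -- order_time must be an event-time attribute
) WITH (
    'connector' = 'datagen'
);

-- For each order, look up the product version that was valid at order_time.
-- Assumes a versioned table named products with a price column (illustrative names).
SELECT
    o.order_id,
    o.order_time,
    p.product_name,
    p.price
FROM orders AS o
JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p
    ON o.product_id = p.product_id;
```

    With the scooter history described earlier, an order for p_001 placed at 10:00:00 would be joined with the 11.11 price, and one placed at 13:00:00 with 12.99.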

    Versioned tables can also be defined over append-only sources. The table currency_rates contains a row for each currency's rate with respect to USD and receives a new row each time a rate changes. The JSON format does not support native changelog semantics, so Flink can only read this table as append-only.

    (changelog kind)  update_time  currency  rate
    ================  ===========  ========  ====
    +(INSERT)         09:00:00     Yen       102
    +(INSERT)         09:00:00     Euro      114
    +(INSERT)         09:00:00     USD       1
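
    For reference, an append-only table like this might be declared as follows. This is a sketch under assumptions: the Kafka connector, topic name, broker address, consumer group, startup mode, and DECIMAL precision are placeholders that do not come from the text above. Only the column names and the JSON format do.

```sql
-- Append-only source: JSON carries no changelog kind, so every record is read as an INSERT.
-- No PRIMARY KEY is declared, in line with the append-only interpretation.
CREATE TABLE currency_rates (
    currency     STRING,
    rate         DECIMAL(32, 10),                       -- precision is an assumption
    update_time  TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time            -- makes update_time an event-time attribute
) WITH (
    'connector' = 'kafka',                              -- assumed connector
    'topic' = 'rates',                                  -- hypothetical topic
    'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical broker
    'properties.group.id' = 'rates-reader',             -- hypothetical consumer group
    'scan.startup.mode' = 'earliest-offset',            -- assumed startup mode
    'format' = 'json'
);
```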

    Flink interprets each row as an INSERT to the table, meaning we cannot define a PRIMARY KEY over currency. However, it is clear to us (the query developer) that this table has all the necessary information to define a versioned table. Flink can reinterpret this table as a versioned table by defining a deduplication query, which produces an ordered changelog stream with an inferred primary key (currency) and event time (update_time).

    Flink has a special optimization step that efficiently transforms this query into a versioned table usable in subsequent queries. In general, a query of the following form produces a versioned table:

    SELECT [column_list]
    FROM (
        SELECT [column_list],
            ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
                ORDER BY time_attr DESC) AS rownum
        FROM table_name)
    WHERE rownum = 1

    Parameter Specification:

    • ROW_NUMBER(): Assigns a unique, sequential number to each row, starting with one.
    • PARTITION BY col1[, col2...]: Specifies the partition columns, i.e. the deduplication key. These columns form the primary key of the subsequent versioned table.
    • ORDER BY time_attr DESC: Specifies the ordering column, which must be a time attribute.
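
    Applied to the currency_rates example, the pattern above could be written as the following view. This is a sketch rather than a definitive definition: the view name versioned_rates is chosen here for illustration, and it assumes currency_rates has the columns currency, rate, and update_time, with update_time declared as an event-time attribute.

```sql
-- Keep only the latest row per currency. Flink can treat the result as a versioned
-- table with inferred primary key (currency) and event time (update_time).
CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time          -- update_time is retained as the event time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY currency           -- deduplication key, becomes the primary key
            ORDER BY update_time DESC       -- must be a time attribute
        ) AS rownum
    FROM currency_rates)
WHERE rownum = 1;
```

    The view can then be referenced in later queries wherever a versioned table is expected.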