Time Attributes

    • Processing time refers to the system time (also known as epoch time, e.g. Java’s System.currentTimeMillis()) of the machine that is executing the respective operation.
    • Event time refers to the processing of streaming data based on timestamps that are attached to each row. The timestamps can encode when an event happened.

    For more information about time handling in Flink, see the introduction about event time and watermarks.

    Time attributes can be part of every table schema. They are defined when creating a table from a CREATE TABLE DDL or a DataStream. Once a time attribute is defined, it can be referenced as a field and used in time-based operations. As long as a time attribute is not modified, and is simply forwarded from one part of a query to another, it remains a valid time attribute. Time attributes behave like regular timestamps, and are accessible for calculations. When used in calculations, time attributes are materialized and act as standard timestamps. However, ordinary timestamps cannot be used in place of, or be converted to, time attributes.

    Event time allows a table program to produce results based on timestamps in every record, allowing for consistent results despite out-of-order or late events. It also ensures the replayability of the results of the table program when reading records from persistent storage.
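    The sketch below makes the consistency claim concrete. It is plain Java with no Flink dependency, and the class and method names are hypothetical. It counts events per 10-minute tumbling window using only the timestamp each record carries, with windows assumed to be aligned to the epoch. Each event's window depends only on its own timestamp, so shuffling the input does not change the counts. Watermarks and late-event dropping are deliberately left out, so the sketch only shows why the grouping itself is order-independent.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not a Flink API): groups events into epoch-aligned
// tumbling windows by their own event-time timestamps and counts them.
// Watermarks and late-event handling are deliberately left out.
final class EventTimeWindowCount {

    // Start of the tumbling window of length sizeMs that contains timestampMs.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Maps each window start to the number of events that fall into that window.
    // Only the timestamps matter, so the arrival order of the events does not.
    static Map<Long, Integer> countPerWindow(List<Long> eventTimestampsMs, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : eventTimestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1, Integer::sum);
        }
        return counts;
    }
}
```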

    Additionally, event time allows for unified syntax for table programs in both batch and streaming environments. A time attribute in a streaming environment can be a regular column of a row in a batch environment.

    To handle out-of-order events and to distinguish between on-time and late events in streaming, Flink needs to know the timestamp of each row, and it also needs regular indications of how far event time processing has progressed (via so-called watermarks).
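    The watermark idea fits in a few lines of plain Java. The class below is hypothetical and is not Flink's WatermarkGenerator interface. It assumes a fixed out-of-orderness bound (5 seconds in the test) and uses a simplified rule: an event is late when its timestamp is not greater than the current watermark.

```java
// Hypothetical sketch (not Flink's WatermarkGenerator): a bounded-out-of-orderness
// watermark that trails the highest event timestamp seen so far by a fixed delay.
final class BoundedDelayWatermark {
    private final long delayMs;
    private long maxTimestampMs = Long.MIN_VALUE; // no event seen yet

    BoundedDelayWatermark(long delayMs) {
        this.delayMs = delayMs;
    }

    // Event time has progressed up to this value; events at or before it count as late.
    long currentWatermark() {
        return maxTimestampMs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestampMs - delayMs;
    }

    // Classifies an incoming event, then advances the watermark (it never moves backwards).
    // Returns true if the event is late (simplified rule: timestamp <= current watermark).
    boolean onEvent(long timestampMs) {
        boolean late = timestampMs <= currentWatermark();
        maxTimestampMs = Math.max(maxTimestampMs, timestampMs);
        return late;
    }
}
```

    With a 5-second delay, an event at 10000 ms moves the watermark to 5000 ms. A later-arriving event at 7000 ms is out of order but still on time, while one at 4000 ms is behind the watermark and counts as late.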

    Event time attributes can be defined in a CREATE TABLE DDL or during DataStream-to-Table conversion.

    Flink supports defining the event time attribute on TIMESTAMP and TIMESTAMP_LTZ columns. If the timestamp data in the source is represented as year-month-day-hour-minute-second, usually a string value without time-zone information, e.g. 2020-04-15 20:13:40.564, it’s recommended to define the event time attribute as a TIMESTAMP column.
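    As an analogy outside Flink, a zone-less string like the one above maps naturally onto java.time's LocalDateTime, which likewise stores a date and wall-clock time with no zone attached. The sketch below is illustrative only. The class name and the parsing pattern are assumptions chosen to match the sample value.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustrative analogy (not Flink code): parse a zone-less timestamp string
// into LocalDateTime, the java.time type closest to SQL TIMESTAMP.
final class WallClockTimestamp {
    // Assumed layout, matching values such as "2020-04-15 20:13:40.564".
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, FORMAT);
    }
}
```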

    If the timestamp data in the source is represented as an epoch time, usually a long value, e.g. 1618989564564, it’s recommended to define the event time attribute as a TIMESTAMP_LTZ column:

    CREATE TABLE user_actions (
      user_name STRING,
      data STRING,
      ts BIGINT,
      time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
      -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
      WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
    ) WITH (
      ...
    );

    SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
    FROM user_actions
    GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
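    java.time shows why an epoch value fits TIMESTAMP_LTZ (again an analogy, not Flink API). The long 1618989564564 identifies a single absolute Instant, and only its wall-clock reading depends on the zone it is rendered in. TO_TIMESTAMP_LTZ(ts, 3) interprets ts as epoch milliseconds, which is also what Instant.ofEpochMilli does. The class name and the zone IDs in the test are example choices.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Analogy (not Flink code): an epoch-millisecond value is one absolute instant;
// only its wall-clock rendering depends on a time zone.
final class EpochMillisView {
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Wall-clock date-time of the same instant as seen in the given zone.
    static LocalDateTime wallClockIn(long epochMillis, String zoneId) {
        return LocalDateTime.ofInstant(toInstant(epochMillis), ZoneId.of(zoneId));
    }
}
```

    For 1618989564564 the instant is 2021-04-21T07:19:24.564Z. Rendered in Asia/Shanghai (UTC+8), the same instant reads 2021-04-21 15:19:24.564.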

    When converting a DataStream to a table, an event time attribute can be defined with the .rowtime property during schema definition. Timestamps and watermarks must already have been assigned in the DataStream being converted. During the conversion, Flink always derives the rowtime attribute as TIMESTAMP WITHOUT TIME ZONE, because DataStream has no notion of time zones, and treats all event time values as UTC.

    There are two ways of defining the time attribute when converting a DataStream into a Table. Depending on whether the specified .rowtime field name exists in the schema of the DataStream, the timestamp is either (1) appended as a new column, or (2) replaces an existing column.

    In either case, the event time timestamp field will hold the value of the DataStream event time timestamp.

    Scala

    // Option 1:

    // extract timestamp and assign watermarks based on knowledge of the stream
    val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

    // declare an additional logical field as an event time attribute
    val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)

    // Option 2:

    // extract timestamp from first field, and assign watermarks based on knowledge of the stream
    val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

    // the first field has been used for timestamp extraction, and is no longer necessary
    // replace first field with a logical event time attribute
    val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")

    // Usage:

    val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

    Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time, but it will generate non-deterministic results. Processing time does not require timestamp extraction or watermark generation.
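    The non-determinism can be illustrated with a small plain-Java sketch. The names are hypothetical and this is not Flink API. A record is assigned to a 10-minute tumbling window either by the timestamp it carries or by the clock reading at the moment it is processed. A java.time.Clock is injected so that two runs at different wall-clock times can be simulated.

```java
import java.time.Clock;

// Hypothetical sketch (not Flink code): event-time vs processing-time
// assignment of a record to an epoch-aligned 10-minute tumbling window.
final class TimeNotionWindows {
    static final long WINDOW_MS = 10 * 60 * 1000L;

    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
    }

    // Event time: uses the timestamp stored in the record, so a replay
    // assigns the record to the same window.
    static long eventTimeWindow(long recordTimestampMs) {
        return windowStart(recordTimestampMs);
    }

    // Processing time: uses the local clock at the moment of processing,
    // so the same record can land in a different window on a replay.
    static long processingTimeWindow(Clock clock) {
        return windowStart(clock.millis());
    }
}
```

    Outside a test, the clock would typically be Clock.systemUTC(). A fixed clock is used in the test only to make the two simulated runs reproducible.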

    There are two ways to define a processing time attribute.

    In a CREATE TABLE DDL, the processing time attribute is defined as a computed column using the system PROCTIME() function, whose return type is TIMESTAMP_LTZ. See the CREATE TABLE DDL documentation for more information about computed columns.

    When converting a DataStream to a table, the processing time attribute is defined with the .proctime property during schema definition. The time attribute may only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
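    The placement rule can be modeled as a simple check. The sketch below is a hypothetical illustration, not how Flink validates schemas. Given the number of physical fields in the DataStream and the declared field names, it accepts the declaration only if the processing time field is the single extra field, placed at the end.

```java
import java.util.List;

// Hypothetical model (not Flink's validation code) of the rule that a .proctime
// field may only be appended after all physical fields.
final class ProctimePlacement {
    static boolean isValid(int physicalFieldCount, List<String> declaredFields, String proctimeField) {
        int last = declaredFields.size() - 1;
        // Exactly one logical field extends the physical schema...
        boolean oneExtraField = declaredFields.size() == physicalFieldCount + 1;
        // ...and its first (and only) occurrence is the final position.
        boolean proctimeOnlyAtEnd = last >= 0 && declaredFields.indexOf(proctimeField) == last;
        return oneExtraField && proctimeOnlyAtEnd;
    }
}
```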

    Java

    DataStream<Tuple2<String, String>> stream = ...;

    // declare an additional logical field as a processing time attribute
    Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

    WindowedTable windowedTable = table.window(
            Tumble.over(lit(10).minutes())
                .on($("user_action_time"))
                .as("userActionWindow"));

    Scala