时区

    • TIMESTAMP(p)TIMESTAMP(p) WITHOUT TIME ZONE 的简写, 精度 p 支持的范围是0-9, 默认是6。
    • TIMESTAMP 用于描述年, 月, 日, 小时, 分钟, 秒 和 小数秒对应的时间戳。
    • TIMESTAMP 可以通过一个字符串来指定,例如:

    TIMESTAMP_LTZ 类型

    • TIMESTAMP_LTZ(p)TIMESTAMP(p) WITH LOCAL TIME ZONE 的简写, 精度 p 支持的范围是0-9, 默认是6。
    • TIMESTAMP_LTZ 用于描述时间线上的绝对时间点, 使用 long 保存从 epoch 至今的毫秒数, 使用int保存毫秒中的纳秒数。 epoch 时间是从 java 的标准 epoch 时间 1970-01-01T00:00:00Z 开始计算。 在计算和可视化时, 每个 TIMESTAMP_LTZ 类型的数据都是使用的 session (会话)中配置的时区。
    • TIMESTAMP_LTZ 没有字符串表达形式因此无法通过字符串来指定, 可以通过一个 long 类型的 epoch 时间来转化(例如: 通过 Java 来产生一个 long 类型的 epoch 时间 System.currentTimeMillis())
    1. Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
    2. Flink SQL> SET 'table.local-time-zone' = 'UTC';
    3. Flink SQL> SELECT * FROM T1;
    4. +---------------------------+
    5. | TO_TIMESTAMP_LTZ(4001, 3) |
    6. +---------------------------+
    7. | 1970-01-01 00:00:04.001 |
    8. +---------------------------+
    9. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
    10. Flink SQL> SELECT * FROM T1;
    11. +---------------------------+
    12. | TO_TIMESTAMP_LTZ(4001, 3) |
    13. +---------------------------+
    14. | 1970-01-01 08:00:04.001 |
    15. +---------------------------+
    • TIMESTAMP_LTZ 可以用于跨时区的计算,因为它是一个基于 epoch 的绝对时间点(比如上例中的 4001 毫秒)代表的就是不同时区的同一个绝对时间点。 补充一个背景知识:在同一个时间点, 全世界所有的机器上执行 System.currentTimeMillis() 都会返回同样的值。 (比如上例中的 4001 milliseconds), 这就是绝对时间的定义。

    本地时区定义了当前 session(会话)所在的时区, 你可以在 Sql client 或者应用程序中配置。

    SQL Client

    1. -- 设置为 UTC 时区
    2. Flink SQL> SET 'table.local-time-zone' = 'UTC';
    3. -- 设置为上海时区
    4. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
    5. -- 设置为Los_Angeles时区
    6. Flink SQL> SET 'table.local-time-zone' = 'America/Los_Angeles';

    Java

    1. EnvironmentSettings envSetting = EnvironmentSettings.inStreamingMode();
    2. TableEnvironment tEnv = TableEnvironment.create(envSetting);
    3. // 设置为 UTC 时区
    4. tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
    5. // 设置为上海时区
    6. tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
    7. // 设置为 Los_Angeles 时区
    8. tEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));

    Scala

    1. val envSetting = EnvironmentSettings.inStreamingMode()
    2. val tEnv = TableEnvironment.create(envSetting)
    3. // 设置为 UTC 时区
    4. tEnv.getConfig.setLocalTimeZone(ZoneId.of("UTC"))
    5. // 设置为上海时区
    6. tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
    7. // 设置为 Los_Angeles 时区
    8. tEnv.getConfig.setLocalTimeZone(ZoneId.of("America/Los_Angeles"))

    session(会话)的时区设置在 Flink SQL 中非常有用, 它的主要用法如下:

    session (会话)中配置的时区会对以下函数生效。

    • LOCALTIME
    • LOCALTIMESTAMP
    • CURRENT_DATE
    • CURRENT_TIME
    • CURRENT_TIMESTAMP
    • CURRENT_ROW_TIMESTAMP()
    • NOW()
    • PROCTIME()
    1. Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
    2. Flink SQL> CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME();
    3. Flink SQL> DESC MyView1;
    1. +------------------------+-----------------------------+-------+-----+--------+-----------+
    2. | name | type | null | key | extras | watermark |
    3. +------------------------+-----------------------------+-------+-----+--------+-----------+
    4. | LOCALTIME | TIME(0) | false | | | |
    5. | LOCALTIMESTAMP | TIMESTAMP(3) | false | | | |
    6. | CURRENT_DATE | DATE | false | | | |
    7. | CURRENT_TIME | TIME(0) | false | | | |
    8. | CURRENT_TIMESTAMP | TIMESTAMP_LTZ(3) | false | | | |
    9. |CURRENT_ROW_TIMESTAMP() | TIMESTAMP_LTZ(3) | false | | | |
    10. | NOW() | TIMESTAMP_LTZ(3) | false | | | |
    11. | PROCTIME() | TIMESTAMP_LTZ(3) *PROCTIME* | false | | | |
    12. +------------------------+-----------------------------+-------+-----+--------+-----------+
    1. Flink SQL> SET 'table.local-time-zone' = 'UTC';
    2. Flink SQL> SELECT * FROM MyView1;
    1. +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
    2. | LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() | PROCTIME() |
    3. +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
    4. | 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 | 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 |
    5. +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
    1. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
    2. Flink SQL> SELECT * FROM MyView1;
    1. +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
    2. | LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() | PROCTIME() |
    3. +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
    4. | 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 | 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 |
    5. +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+

    TIMESTAMP_LTZ 字符串表示

    当一个 TIMESTAMP_LTZ 值转为 string 格式时, session 中配置的时区会生效。 例如打印这个值,将类型强制转化为 STRING 类型, 将类型强制转换为 TIMESTAMP ,将 TIMESTAMP 的值转化为 TIMESTAMP_LTZ 类型:

    1. Flink SQL> CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP '1970-01-01 00:00:01.001' AS ntz;
    2. Flink SQL> DESC MyView2;
    1. +------+------------------+-------+-----+--------+-----------+
    2. | name | type | null | key | extras | watermark |
    3. +------+------------------+-------+-----+--------+-----------+
    4. | ntz | TIMESTAMP(3) | false | | | |
    5. +------+------------------+-------+-----+--------+-----------+
    1. Flink SQL> SET 'table.local-time-zone' = 'UTC';
    2. Flink SQL> SELECT * FROM MyView2;
    1. +-------------------------+-------------------------+
    2. | ltz | ntz |
    3. | 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
    4. +-------------------------+-------------------------+
    1. +-------------------------+-------------------------+
    2. | ltz | ntz |
    3. +-------------------------+-------------------------+
    4. | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
    5. +-------------------------+-------------------------+
    1. Flink SQL> CREATE VIEW MyView3 AS SELECT ltz, CAST(ltz AS TIMESTAMP(3)), CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2;
    1. Flink SQL> DESC MyView3;
    2. +-------------------------------+------------------+-------+-----+--------+-----------+
    3. | name | type | null | key | extras | watermark |
    4. +-------------------------------+------------------+-------+-----+--------+-----------+
    5. | ltz | TIMESTAMP_LTZ(3) | true | | | |
    6. | CAST(ltz AS TIMESTAMP(3)) | TIMESTAMP(3) | true | | | |
    7. | CAST(ltz AS STRING) | STRING | true | | | |
    8. | ntz | TIMESTAMP(3) | false | | | |
    9. | CAST(ntz AS TIMESTAMP_LTZ(3)) | TIMESTAMP_LTZ(3) | false | | | |
    10. +-------------------------------+------------------+-------+-----+--------+-----------+
    1. Flink SQL> SELECT * FROM MyView3;
    1. +-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
    2. | ltz | CAST(ltz AS TIMESTAMP(3)) | CAST(ltz AS STRING) | ntz | CAST(ntz AS TIMESTAMP_LTZ(3)) |
    3. +-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
    4. | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 |
    5. +-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+

    更多时间属性相关的详细介绍, 请参考 Time Attribute

    PROCTIME() 返回的是本地时区的时间, 使用 TIMESTAMP_LTZ 类型也可以支持夏令时时间。

    1. Flink SQL> SET 'table.local-time-zone' = 'UTC';
    2. Flink SQL> SELECT PROCTIME();
    1. +-------------------------+
    2. | PROCTIME() |
    3. +-------------------------+
    4. | 2021-04-15 14:48:31.387 |
    5. +-------------------------+
    1. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
    2. Flink SQL> SELECT PROCTIME();
    1. +-------------------------+
    2. | PROCTIME() |
    3. +-------------------------+
    4. | 2021-04-15 22:48:31.387 |
    5. +-------------------------+
    1. Flink SQL> CREATE TABLE MyTable1 (
    2. item STRING,
    3. price DOUBLE,
    4. proctime as PROCTIME()
    5. ) WITH (
    6. 'connector' = 'socket',
    7. 'hostname' = '127.0.0.1',
    8. 'port' = '9999',
    9. 'format' = 'csv'
    10. );
    11. Flink SQL> CREATE VIEW MyView3 AS
    12. SELECT
    13. TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start,
    14. TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end,
    15. TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) as window_proctime,
    16. item,
    17. MAX(price) as max_price
    18. FROM MyTable1
    19. GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item;
    20. Flink SQL> DESC MyView3;
    1. +-----------------+-----------------------------+-------+-----+--------+-----------+
    2. | name | type | null | key | extras | watermark |
    3. +-----------------+-----------------------------+-------+-----+--------+-----------+
    4. | window_start | TIMESTAMP(3) | false | | | |
    5. | window_end | TIMESTAMP(3) | false | | | |
    6. | window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false | | | |
    7. | item | STRING | true | | | |
    8. | max_price | DOUBLE | true | | | |
    9. +-----------------+-----------------------------+-------+-----+--------+-----------+

    在终端执行以下命令写入数据到 MyTable1

    1. > nc -lk 9999
    2. A,1.1
    3. B,1.2
    4. A,1.8
    5. B,2.5
    6. C,3.8
    1. Flink SQL> SET 'table.local-time-zone' = 'UTC';
    2. Flink SQL> SELECT * FROM MyView3;
    1. +-------------------------+-------------------------+-------------------------+------+-----------+
    2. | window_start | window_end | window_procime | item | max_price |
    3. +-------------------------+-------------------------+-------------------------+------+-----------+
    4. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.005 | A | 1.8 |
    5. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | B | 2.5 |
    6. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | C | 3.8 |
    7. +-------------------------+-------------------------+-------------------------+------+-----------+

    相比在 UTC 时区下的计算结果, 在 Asia/Shanghai 时区下计算的窗口开始时间, 窗口结束时间和窗口处理时间是不同的。

    1. +-------------------------+-------------------------+-------------------------+------+-----------+
    2. | window_start | window_end | window_procime | item | max_price |
    3. +-------------------------+-------------------------+-------------------------+------+-----------+
    4. | 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.005 | A | 1.8 |
    5. | 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | B | 2.5 |
    6. | 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | C | 3.8 |
    7. +-------------------------+-------------------------+-------------------------+------+-----------+

    处理时间窗口是不确定的, 每次运行都会返回不同的窗口和聚合结果。 以上的示例只用于说明时区如何影响处理时间窗口。

    事件时间和时区

    Flink 支持在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定义时间属性。

    TIMESTAMP 上的事件时间属性

    如果 source 中的时间用于表示年-月-日-小时-分钟-秒, 通常是一个不带时区的字符串, 例如: 2020-04-15 20:13:40.564。 推荐在 TIMESTAMP 列上定义事件时间属性。

    1. Flink SQL> CREATE TABLE MyTable2 (
    2. item STRING,
    3. price DOUBLE,
    4. ts TIMESTAMP(3), -- TIMESTAMP data type
    5. WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
    6. ) WITH (
    7. 'connector' = 'socket',
    8. 'hostname' = '127.0.0.1',
    9. 'port' = '9999',
    10. 'format' = 'csv'
    11. Flink SQL> CREATE VIEW MyView4 AS
    12. SELECT
    13. TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start,
    14. TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end,
    15. TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) as window_rowtime,
    16. item,
    17. MAX(price) as max_price
    18. FROM MyTable2
    19. GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item;
    20. Flink SQL> DESC MyView4;
    1. +----------------+------------------------+------+-----+--------+-----------+
    2. | name | type | null | key | extras | watermark |
    3. +----------------+------------------------+------+-----+--------+-----------+
    4. | window_start | TIMESTAMP(3) | true | | | |
    5. | window_end | TIMESTAMP(3) | true | | | |
    6. | window_rowtime | TIMESTAMP(3) *ROWTIME* | true | | | |
    7. | item | STRING | true | | | |
    8. | max_price | DOUBLE | true | | | |
    9. +----------------+------------------------+------+-----+--------+-----------+

    在终端执行以下命令用于写入数据到 MyTable2

    1. > nc -lk 9999
    2. A,1.1,2021-04-15 14:01:00
    3. B,1.2,2021-04-15 14:02:00
    4. A,1.8,2021-04-15 14:03:00
    5. B,2.5,2021-04-15 14:04:00
    6. C,3.8,2021-04-15 14:05:00
    7. C,3.8,2021-04-15 14:11:00
    1. Flink SQL> SET 'table.local-time-zone' = 'UTC';
    2. Flink SQL> SELECT * FROM MyView4;
    1. +-------------------------+-------------------------+-------------------------+------+-----------+
    2. | window_start | window_end | window_rowtime | item | max_price |
    3. +-------------------------+-------------------------+-------------------------+------+-----------+
    4. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 |
    5. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 |
    6. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 |
    7. +-------------------------+-------------------------+-------------------------+------+-----------+
    1. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
    2. Flink SQL> SELECT * FROM MyView4;
    1. +-------------------------+-------------------------+-------------------------+------+-----------+
    2. | window_start | window_end | window_rowtime | item | max_price |
    3. +-------------------------+-------------------------+-------------------------+------+-----------+
    4. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 |
    5. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 |
    6. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 |
    7. +-------------------------+-------------------------+-------------------------+------+-----------+

    TIMESTAMP_LTZ 上的事件时间属性

    如果源数据中的时间为一个 epoch 时间, 通常是一个 long 值, 例如: 1618989564564 ,推荐将事件时间属性定义在 TIMESTAMP_LTZ 列上。

    1. Flink SQL> CREATE TABLE MyTable3 (
    2. item STRING,
    3. price DOUBLE,
    4. ts BIGINT, -- long time value in epoch milliseconds
    5. ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
    6. WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
    7. ) WITH (
    8. 'connector' = 'socket',
    9. 'hostname' = '127.0.0.1',
    10. 'port' = '9999',
    11. 'format' = 'csv'
    12. );
    13. Flink SQL> CREATE VIEW MyView5 AS
    14. SELECT
    15. TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start,
    16. TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end,
    17. TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) as window_rowtime,
    18. item,
    19. MAX(price) as max_price
    20. FROM MyTable3
    21. GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item;
    22. Flink SQL> DESC MyView5;
    1. +----------------+----------------------------+-------+-----+--------+-----------+
    2. | name | type | null | key | extras | watermark |
    3. +----------------+----------------------------+-------+-----+--------+-----------+
    4. | window_start | TIMESTAMP(3) | false | | | |
    5. | window_end | TIMESTAMP(3) | false | | | |
    6. | window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | true | | | |
    7. | item | STRING | true | | | |
    8. | max_price | DOUBLE | true | | | |
    9. +----------------+----------------------------+-------+-----+--------+-----------+

    MyTable3 的输入数据为:

    1. A,1.1,1618495260000 # The corresponding utc timestamp is 2021-04-15 14:01:00
    2. B,1.2,1618495320000 # The corresponding utc timestamp is 2021-04-15 14:02:00
    3. A,1.8,1618495380000 # The corresponding utc timestamp is 2021-04-15 14:03:00
    4. B,2.5,1618495440000 # The corresponding utc timestamp is 2021-04-15 14:04:00
    5. C,3.8,1618495500000 # The corresponding utc timestamp is 2021-04-15 14:05:00
    6. C,3.8,1618495860000 # The corresponding utc timestamp is 2021-04-15 14:11:00
    1. Flink SQL> SET 'table.local-time-zone' = 'UTC';
    2. Flink SQL> SELECT * FROM MyView5;
    1. +-------------------------+-------------------------+-------------------------+------+-----------+
    2. | window_start | window_end | window_rowtime | item | max_price |
    3. +-------------------------+-------------------------+-------------------------+------+-----------+
    4. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 |
    5. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 |
    6. | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 |
    7. +-------------------------+-------------------------+-------------------------+------+-----------+
    1. Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
    2. Flink SQL> SELECT * FROM MyView5;

    相比在 UTC 时区下的计算结果, 在 Asia/Shanghai 时区下计算的窗口开始时间, 窗口结束时间和窗口的 rowtime 是不同的。

    Flink SQL支持在 TIMESTAMP_LTZ列上定义时间属性, 基于这一特征,Flink SQL 在窗口中使用 TIMESTAMPTIMESTAMP_LTZ 类型优雅地支持了夏令时。

    Flink 使用时间戳的字符格式来分割窗口并通过每条记录对应的 epoch 时间来分配窗口。 这意味着 Flink 窗口开始时间和窗口结束时间使用的是 TIMESTAMP 类型(例如: TUMBLE_STARTTUMBLE_END), 窗口的时间属性使用的是 TIMESTAMP_LTZ 类型(例如: TUMBLE_PROCTIMETUMBLE_ROWTIME)。 给定一个 tumble window示例, 在 Los_Angeles 时区下夏令时从 2021-03-14 02:00:00 开始:

    1. long epoch1 = 1615708800000L; // 2021-03-14 00:00:00
    2. long epoch2 = 1615712400000L; // 2021-03-14 01:00:00
    3. long epoch3 = 1615716000000L; // 2021-03-14 03:00:00, 手表往前拨一小时,跳过 (2021-03-14 02:00:00)
    4. long epoch4 = 1615719600000L; // 2021-03-14 04:00:00

    在 Los_angele 时区下, tumble window [2021-03-14 00:00:00, 2021-03-14 00:04:00] 将会收集3个小时的数据, 在其他非夏令时的时区下将会收集4个小时的数据,用户只需要在 TIMESTAMP_LTZ 列上声明时间属性即可。

    Flink 的所有窗口(如 Hop window, Session window, Cumulative window)都会遵循这种方式, Flink SQL 中的所有操作都很好地支持了 TIMESTAMP_LTZ 类型,因此Flink可以非常优雅的支持夏令时。

    以下函数:

    • LOCALTIME
    • LOCALTIMESTAMP
    • CURRENT_DATE
    • CURRENT_TIME
    • CURRENT_TIMESTAMP
    • NOW()

    以下时间函数无论是在 Streaming 模式还是 Batch 模式下,都会为每条记录计算一次结果:

    • CURRENT_ROW_TIMESTAMP()
    • PROCTIME()