Hive Read & Write

    Flink supports reading data from Hive in both BATCH and STREAMING modes. When run as a BATCH application, Flink will execute its query over the state of the table at the point in time when the query is executed. STREAMING reads will continuously monitor the table and incrementally fetch new data as it is made available. Flink will read tables as bounded by default.

    STREAMING reads support consuming both partitioned and non-partitioned tables. For partitioned tables, Flink will monitor the generation of new partitions, and read them incrementally when available. For non-partitioned tables, Flink will monitor the generation of new files in the folder and read new files incrementally.

    SQL Hints can be used to apply configurations to a Hive table without changing its definition in the Hive metastore.
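
    As an illustration, a streaming read could be requested through a hint roughly as follows. This is a minimal sketch: hive_table is a hypothetical table name and the start offset is an arbitrary example value. Some Flink versions also require table.dynamic-table-options.enabled to be set to true before OPTIONS hints take effect.

    ```sql
    -- hive_table is a hypothetical table; the offset value is only an example.
    -- The hint switches this one query to a streaming read without
    -- touching the table definition stored in the Hive metastore.
    SELECT * FROM hive_table
    /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
    ```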

    Notes

    • The monitoring strategy scans all directories/files currently in the location path. A large number of partitions may degrade performance.
    • Streaming reads of non-partitioned tables require that each file be written atomically into the target directory.
    • Streaming reads of partitioned tables require that each partition be added atomically from the point of view of the Hive metastore. If not, new data added to an existing partition will be consumed.
    • Streaming reads do not support watermark grammar in Flink DDL. These tables cannot be used for window operators.

    Flink can read from Hive-defined views, but some limitations apply:

    1. The Hive catalog must be set as the current catalog before you can query the view. This can be done by either tableEnv.useCatalog(...) in Table API or USE CATALOG ... in SQL Client.

    2. Hive and Flink SQL have different syntax, e.g. different reserved keywords and literals. Make sure the view’s query is compatible with Flink grammar.
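
    For the first limitation, a SQL Client session might look like the sketch below. The catalog name myhive and the view name my_view are hypothetical; substitute the names registered in your environment.

    ```sql
    -- myhive and my_view are hypothetical names.
    -- Make the Hive catalog the current catalog, then query the view.
    USE CATALOG myhive;
    SELECT * FROM my_view;
    ```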

    Vectorized Optimization upon Read

    Flink will automatically use vectorized reads of Hive tables when the following conditions are met:

    • Format: ORC or Parquet.
    • Columns: no complex data types, such as the Hive types List, Map, Struct, and Union.

    This feature is enabled by default. It may be disabled with the following configuration.

    table.exec.hive.fallback-mapred-reader=true

    Flink allows you to flexibly configure the policy of parallelism inference. You can configure the following parameters in TableConfig (note that these parameters affect all sources of the job):
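
    The parameter table is not reproduced in this copy. As a sketch, assuming the option names used by recent Flink releases (verify them against the configuration reference of your version), parallelism inference could be configured in a SQL Client session like this:

    ```sql
    -- Assumed option names and example values; check your Flink version.
    -- Infer source parallelism from the number of splits (file count, block count).
    SET table.exec.hive.infer-source-parallelism=true;
    -- Upper bound for the inferred source parallelism.
    SET table.exec.hive.infer-source-parallelism.max=1000;
    ```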

    Load Partition Splits

    Multiple threads are used to split Hive's partitions. You can use table.exec.hive.load-partition-splits.thread-num to configure the number of threads. The default value is 3, and the configured value must be greater than 0.
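
    For example, the thread count could be raised in a SQL Client session as sketched below. The value 8 is an arbitrary illustration, not a recommendation.

    ```sql
    -- 8 is an arbitrary example value; it must be greater than 0.
    SET table.exec.hive.load-partition-splits.thread-num=8;
    ```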

    You can use a Hive table as a temporal table, and a stream can then correlate with the Hive table through a temporal join. See the temporal join documentation for more information.

    Flink supports processing-time temporal joins against Hive tables. A processing-time temporal join always joins the latest version of the temporal table. Both partitioned and non-partitioned Hive tables can serve as the temporal table. For partitioned tables, Flink can automatically track the latest partition of the Hive table.

    NOTE: Flink does not yet support event-time temporal joins against Hive tables.

    A partitioned table that changes over time can be read as an unbounded stream. If every partition contains the complete data of one version, each partition can act as a version of the temporal table, and that version holds the data of the partition.

    Flink can automatically track the latest partition (version) of the temporal table in a processing-time temporal join. The latest partition (version) is defined by the ‘streaming-source.partition-order’ option. This is the most common use case for a Hive table: serving as a dimension table in a Flink streaming job.

    NOTE: This feature is only supported in Flink STREAMING Mode.

    The following demo shows a classic business pipeline. The dimension table comes from Hive and is updated once a day by a batch pipeline job or a Flink job. The Kafka stream comes from real-time online business data or logs and needs to be joined with the dimension table to enrich the stream.
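
    A minimal sketch of such a pipeline is given below. It makes several assumptions: the partition columns (pt_year, pt_month, pt_day) are hypothetical, the 12 h monitor interval is an example value, and a datagen source stands in for the Kafka stream so that the statements are self-contained.

    ```sql
    -- Assumption: one new partition arrives per day, and each partition
    -- contains the latest, complete dimension data.
    SET table.sql-dialect=hive;
    CREATE TABLE dimension_table (
      product_id STRING,
      product_name STRING,
      unit_price DECIMAL(10, 4)
    ) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
      'streaming-source.enable' = 'true',
      -- Use only the latest partition as the temporal table.
      'streaming-source.partition.include' = 'latest',
      -- Check for a newer partition every 12 hours (example value).
      'streaming-source.monitor-interval' = '12 h',
      -- Decide which partition is "latest" by comparing partition names.
      'streaming-source.partition-order' = 'partition-name'
    );

    SET table.sql-dialect=default;
    -- Stand-in for the Kafka stream; replace with your real connector options.
    CREATE TABLE orders_table (
      order_id STRING,
      order_amount DOUBLE,
      product_id STRING,
      log_ts TIMESTAMP(3),
      proctime AS PROCTIME()
    ) WITH ('connector' = 'datagen');

    -- Streaming SQL: each order is joined against the latest partition.
    SELECT * FROM orders_table AS o
    JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
    ON o.product_id = dim.product_id;
    ```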

    Temporal Join The Latest Table

    When performing a temporal join against the latest Hive table, the Hive table is cached in slot memory, and each record from the stream is joined against the table by key to decide whether a match is found. Using the latest Hive table as a temporal table does not require any additional configuration. Optionally, you can configure the TTL of the Hive table cache with the lookup.join.cache.ttl property. After the cache expires, the Hive table is scanned again to load the latest data.

    The following demo loads all data of a Hive table as a temporal table.

    -- Assume the data in the Hive table is overwritten by a batch pipeline.
    SET table.sql-dialect=hive;
    CREATE TABLE dimension_table (
      product_id STRING,
      product_name STRING,
      unit_price DECIMAL(10, 4),
      pv_count BIGINT,
      like_count BIGINT,
      comment_count BIGINT,
      update_time TIMESTAMP(3),
      update_user STRING,
      ...
    ) TBLPROPERTIES (
      'streaming-source.partition.include' = 'all', -- option with default value, can be omitted.
      'lookup.join.cache.ttl' = '12 h'
    );

    SET table.sql-dialect=default;
    CREATE TABLE orders_table (
      order_id STRING,
      order_amount DOUBLE,
      product_id STRING,
      log_ts TIMESTAMP(3),
      proctime AS PROCTIME()
    ) WITH (...);

    -- Streaming SQL: the Kafka stream joins a Hive dimension table.
    -- Flink reloads all data from dimension_table after the cache TTL expires.
    SELECT * FROM orders_table AS o
    JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
    ON o.product_id = dim.product_id;

    Note:

    1. Each joining subtask needs to keep its own cache of the Hive table. Please make sure the Hive table can fit into the memory of a TM task slot.
    2. It is recommended to set a relatively large value for both streaming-source.monitor-interval (latest partition as temporal table) and lookup.join.cache.ttl (all partitions as temporal table). Otherwise, jobs are prone to performance issues because the table needs to be updated and reloaded too frequently.
    3. Currently we simply load the whole Hive table whenever the cache needs refreshing. There’s no way to differentiate new data from the old.

    Flink supports writing data to Hive in both BATCH and STREAMING modes. When run as a BATCH application, Flink writes to a Hive table and makes the records visible only when the job finishes. BATCH writes support both appending to and overwriting existing tables.
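
    The two batch write modes can be sketched as follows, assuming a hypothetical non-partitioned table mytable with a STRING column and an INT column:

    ```sql
    -- mytable is a hypothetical two-column table (STRING, INT).
    -- Append a row to the existing data.
    INSERT INTO mytable SELECT 'Tom', 25;
    -- Replace the existing data with the query result.
    INSERT OVERWRITE mytable SELECT 'Tom', 25;
    ```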

    Data can also be inserted into particular partitions.

    # ------ Insert with static partition ------
    Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;
    # ------ Insert with dynamic partition ------
    Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
    # ------ Insert with static(my_type) and dynamic(my_date) partition ------
    Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';

    STREAMING writes continuously add new data to Hive, committing records (making them visible) incrementally. Users control when and how commits are triggered through several properties. INSERT OVERWRITE is not supported for streaming writes.

    The example below shows how the streaming sink can be used to write a streaming query that moves data from Kafka into a Hive table with partition commit, and then runs a batch query to read that data back out.

    See the streaming sink documentation for a full list of available configurations.

    If the watermark is defined on a TIMESTAMP_LTZ column and partition-time is used to commit, sink.partition-commit.watermark-time-zone must be set to the session time zone. Otherwise, partitions may be committed a few hours late.

    SET table.sql-dialect=hive;
    CREATE TABLE hive_table (
      user_id STRING,
      order_amount DOUBLE
    ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
      'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
      'sink.partition-commit.trigger'='partition-time',
      'sink.partition-commit.delay'='1 h',
      'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume user configured time zone is 'Asia/Shanghai'
      'sink.partition-commit.policy.kind'='metastore,success-file'
    );

    SET table.sql-dialect=default;
    CREATE TABLE kafka_table (
      user_id STRING,
      order_amount DOUBLE,
      ts BIGINT, -- time in epoch milliseconds
      ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
      WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
    ) WITH (...);

    -- streaming sql, insert into hive table
    INSERT INTO TABLE hive_table
    SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH')
    FROM kafka_table;

    -- batch sql, select with partition pruning
    SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

    Flink’s Hive integration has been tested against the following file formats:

    • Text
    • CSV
    • SequenceFile
    • Parquet