Hive Catalog

    For users who have both Hive and Flink deployments, HiveCatalog enables them to use Hive Metastore to manage Flink’s metadata.

    For users who have just a Flink deployment, HiveCatalog is the only persistent catalog provided out-of-box by Flink. Without a persistent catalog, users relying on Flink SQL CREATE DDL have to repeatedly create meta-objects like a Kafka table in each session, which wastes a lot of time. HiveCatalog fills this gap by empowering users to create tables and other meta-objects only once, and to reference and manage them conveniently across sessions later on.

    Setting up a HiveCatalog in Flink requires the same configuration as an overall Flink-Hive integration.

    Once configured properly, HiveCatalog should just work out of the box. Users can create Flink meta-objects with DDL, and should see them immediately afterwards.
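
    For example, once the Flink-Hive dependencies are in place, a HiveCatalog can also be registered directly from the SQL Client with DDL instead of a YAML config. A minimal sketch, assuming the catalog name myhive and the Hive conf directory /opt/hive-conf used later in this walkthrough:

        Flink SQL> CREATE CATALOG myhive WITH (
           'type' = 'hive',
           'hive-conf-dir' = '/opt/hive-conf'
        );
        Flink SQL> USE CATALOG myhive;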

    HiveCatalog can be used to handle two kinds of tables: Hive-compatible tables and generic tables. Hive-compatible tables are those stored in a Hive-compatible way, in terms of both metadata and data in the storage layer. Therefore, Hive-compatible tables created via Flink can be queried from the Hive side.

    Generic tables, on the other hand, are specific to Flink. When creating generic tables with HiveCatalog, we’re just using HMS to persist the metadata. While these tables are visible to Hive, Hive is unlikely to be able to understand the metadata, and therefore using such tables in Hive leads to undefined behavior.

    We will walk through a simple example here.

    step 1: set up a Hive Metastore

    Have a Hive Metastore running.

    Here, we set up a local Hive Metastore, with our hive-site.xml file placed at the local path /opt/hive-conf/hive-site.xml and containing configs like the following:
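
    A minimal sketch of what such a hive-site.xml might contain, assuming a standalone metastore service on the default port 9083 backed by a local MySQL database (the connection URL, user, and password are placeholders):

        <configuration>
           <property>
              <name>javax.jdo.option.ConnectionURL</name>
              <value>jdbc:mysql://localhost/metastore?createDatabaseIfNotExist=true</value>
           </property>
           <property>
              <name>javax.jdo.option.ConnectionDriverName</name>
              <value>com.mysql.jdbc.Driver</value>
           </property>
           <property>
              <name>javax.jdo.option.ConnectionUserName</name>
              <value>hive</value>
           </property>
           <property>
              <name>javax.jdo.option.ConnectionPassword</name>
              <value>hive</value>
           </property>
           <property>
              <name>hive.metastore.uris</name>
              <value>thrift://localhost:9083</value>
           </property>
        </configuration>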

    Test the connection to the HMS with the Hive CLI. Running a few commands, we can see that there is a database named default and no table in it.

        hive> show databases;
        OK
        default
        Time taken: 0.032 seconds, Fetched: 1 row(s)
        hive> show tables;
        OK
        Time taken: 0.028 seconds, Fetched: 0 row(s)

    step 2: configure Flink cluster and SQL CLI

    Add all Hive dependencies to the /lib dir in the Flink distribution, and modify the SQL CLI’s YAML config file sql-cli-defaults.yaml as follows:

        execution:
            type: streaming
            ...
            current-catalog: myhive  # set the HiveCatalog as the current catalog of the session
            current-database: mydatabase

        catalogs:
           - name: myhive
             type: hive
             hive-conf-dir: /opt/hive-conf  # contains hive-site.xml
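
    With the dependencies and config in place, the SQL Client can be started from the Flink distribution. The sketch below assumes the YAML above is picked up as the client’s default environment file; otherwise point the client at it explicitly:

        localhost$ ./bin/sql-client.sh embedded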

    step 3: set up a Kafka cluster

    Bootstrap a local Kafka 2.3.0 cluster with a topic named “test”, and produce some simple data to the topic as tuples of name and age.
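
    A sketch of one way to do that with the stock Kafka command-line tools, assuming a single local broker (the topic settings and sample records are just examples):

        localhost$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
        localhost$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
        >tom,15
        >john,21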

    These messages can be seen by starting a Kafka console consumer.

        localhost$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
        tom,15
        john,21

    step 4: start SQL Client, and create a Kafka table with Flink SQL DDL

    Start the SQL Client, and create a simple Kafka table with Flink SQL DDL:

        Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
           'connector.type' = 'kafka',
           'connector.version' = 'universal',
           'connector.topic' = 'test',
           'connector.properties.bootstrap.servers' = 'localhost:9092',
           'format.type' = 'csv'
        );
        [INFO] Table has been created.

        Flink SQL> DESCRIBE mykafka;
        root
         |-- name: STRING
         |-- age: INT

    Verify the table is also visible to Hive via the Hive CLI:
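
    The output should look much like the earlier show tables run, now listing the new table (exact timing will differ):

        hive> show tables;
        OK
        mykafka
        Time taken: 0.021 seconds, Fetched: 1 row(s)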

    Run a simple select query from the Flink SQL Client in a Flink cluster, either standalone or yarn-session.

        Flink SQL> select * from mykafka;

    Produce some more messages in the Kafka topic:

        localhost$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
        tom,15
        john,21
        kitty,30
        amy,24
        kaiky,18

    You should now see the results produced by Flink in the SQL Client.
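
    The exact rendering depends on the SQL Client’s result mode; a sketch of the rows you should end up with, given the five messages produced above:

        name    age
        tom     15
        john    21
        kitty   30
        amy     24
        kaiky   18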

    HiveCatalog supports all Flink types for generic tables.

    For Hive-compatible tables, HiveCatalog needs to map Flink data types to the corresponding Hive types.

    A few things to note about the type mapping:

    • Hive’s CHAR(p) has a maximum length of 255
    • Hive’s VARCHAR(p) has a maximum length of 65535
    • Hive’s MAP only supports primitive key types while Flink’s MAP can be any data type
    • Hive’s UNION type is not supported
    • Hive’s TIMESTAMP always has precision 9 and doesn’t support other precisions. Hive UDFs, on the other hand, can process TIMESTAMP values with a precision <= 9.
    • Hive doesn’t support Flink’s TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, and MULTISET