Apache Hive

    Flink offers a two-fold integration with Hive.

    The first is to leverage Hive’s Metastore as a persistent catalog with Flink’s HiveCatalog for storing Flink-specific metadata across sessions. For example, users can store their Kafka or Elasticsearch tables in the Hive Metastore by using HiveCatalog, and reuse them later on in SQL queries.
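
    For instance, once a HiveCatalog has been registered and set as the current catalog (see the setup examples later on this page), a table defined with another connector is persisted in the Hive Metastore and can be queried again in a later session. The table name, topic, and broker address below are hypothetical placeholders; this is a minimal sketch rather than a complete setup:

        -- executed while the HiveCatalog is the current catalog
        CREATE TABLE clicks (
          user_id STRING,
          url     STRING,
          ts      TIMESTAMP(3)
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'clicks',
          'properties.bootstrap.servers' = 'localhost:9092',
          'format' = 'json'
        );

        -- in a later session, the table definition is still available
        SELECT url, COUNT(*) FROM clicks GROUP BY url;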

    The second is to offer Flink as an alternative engine for reading and writing Hive tables.

    The HiveCatalog is designed to be “out of the box” compatible with existing Hive installations. You do not need to modify your existing Hive Metastore or change the data placement or partitioning of your tables.

    Flink supports the following Hive versions.

    • 1.0
      • 1.0.0
      • 1.0.1
    • 1.1
      • 1.1.0
      • 1.1.1
    • 1.2
      • 1.2.0
      • 1.2.1
      • 1.2.2
    • 2.0
      • 2.0.0
      • 2.0.1
    • 2.1
      • 2.1.0
      • 2.1.1
    • 2.2
      • 2.2.0
    • 2.3
      • 2.3.0
      • 2.3.1
      • 2.3.2
      • 2.3.3
      • 2.3.4
      • 2.3.5
      • 2.3.6
    • 3.1
      • 3.1.0
      • 3.1.1
      • 3.1.2

    Please note that Hive itself has different features available in different versions, and these limitations are not caused by Flink:

    • Hive built-in functions are supported in 1.2.0 and later.
    • Column constraints, i.e. PRIMARY KEY and NOT NULL, are supported in 3.1.0 and later.
    • Altering table statistics is supported in 1.2.0 and later.
    • DATE column statistics are supported in 1.2.0 and later.
    • Writing to ORC tables is not supported in 2.0.x.

    To integrate with Hive, you need to add some extra dependencies to the /lib/ directory of the Flink distribution to make the integration work in a Table API program or in SQL via the SQL Client. Alternatively, you can put these dependencies in a dedicated folder and add them to the classpath with the -C option for Table API programs or the -l option for the SQL Client, respectively.

    Apache Hive is built on Hadoop, so you need to provide Hadoop dependencies by setting the HADOOP_CLASSPATH environment variable:
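
        # assuming the `hadoop` command is available on your PATH
        export HADOOP_CLASSPATH=`hadoop classpath`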

    There are two ways to add Hive dependencies. The first is to use Flink’s bundled Hive jars. You can choose a bundled Hive jar according to the version of the metastore you use. The second is to add each of the required jars separately. This can be useful if the Hive version you’re using is not listed here.

    Using bundled hive jar

    The following table lists all available bundled hive jars. You can pick one and copy it to the /lib/ directory of the Flink distribution.
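
    For example, copying a bundled jar into place might look like the following; the exact jar name depends on the bundle you pick, and flink-sql-connector-hive-2.3.6 is used here only as an illustration:

        cp flink-sql-connector-hive-2.3.6_2.11-1.14.4.jar /path/to/flink-1.14.4/lib/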

    User defined dependencies

    Please find the required dependencies for different Hive major versions below.

    Hive 2.3.4

        /flink-1.14.4
           /lib
               // Flink's Hive connector. Contains flink-hadoop-compatibility and flink-orc jars
               flink-connector-hive_2.11-1.14.4.jar
               // Hive dependencies
               hive-exec-2.3.4.jar
               // add antlr-runtime if you need to use hive dialect
               antlr-runtime-3.5.2.jar

    Hive 1.0.0

        /flink-1.14.4
           /lib
               // Flink's Hive connector
               flink-connector-hive_2.11-1.14.4.jar
               // Hive dependencies
               hive-metastore-1.0.0.jar
               hive-exec-1.0.0.jar
               libfb303-0.9.0.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately
               // Orc dependencies -- required by the ORC vectorized optimizations
               orc-core-1.4.3-nohive.jar
               aircompressor-0.8.jar // transitive dependency of orc-core
               antlr-runtime-3.5.2.jar

    Hive 1.1.0

        /flink-1.14.4
           /lib
               // Flink's Hive connector
               flink-connector-hive_2.11-1.14.4.jar
               // Hive dependencies
               hive-metastore-1.1.0.jar
               hive-exec-1.1.0.jar
               libfb303-0.9.2.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately
               // Orc dependencies -- required by the ORC vectorized optimizations
               orc-core-1.4.3-nohive.jar
               aircompressor-0.8.jar // transitive dependency of orc-core
               // add antlr-runtime if you need to use hive dialect
               antlr-runtime-3.5.2.jar

    Hive 1.2.1

        /flink-1.14.4
           /lib
               // Flink's Hive connector
               flink-connector-hive_2.11-1.14.4.jar
               // Hive dependencies
               hive-metastore-1.2.1.jar
               hive-exec-1.2.1.jar
               libfb303-0.9.2.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately
               // Orc dependencies -- required by the ORC vectorized optimizations
               orc-core-1.4.3-nohive.jar
               aircompressor-0.8.jar // transitive dependency of orc-core
               // add antlr-runtime if you need to use hive dialect
               antlr-runtime-3.5.2.jar

    Hive 2.0.0
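
    The listing for Hive 2.0.0 is not shown in this copy of the page; the sketch below assumes it follows the same pattern as the Hive 2.1.0 listing, with the hive-exec jar swapped for the 2.0.0 version:

        /flink-1.14.4
           /lib
               // Flink's Hive connector
               flink-connector-hive_2.11-1.14.4.jar
               // Hive dependencies
               hive-exec-2.0.0.jar
               // add antlr-runtime if you need to use hive dialect
               antlr-runtime-3.5.2.jar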

    Hive 2.1.0

        /flink-1.14.4
           /lib
               // Flink's Hive connector
               flink-connector-hive_2.11-1.14.4.jar
               // Hive dependencies
               hive-exec-2.1.0.jar
               // add antlr-runtime if you need to use hive dialect
               antlr-runtime-3.5.2.jar

    Hive 2.2.0

        /flink-1.14.4
           /lib
               // Flink's Hive connector
               flink-connector-hive_2.11-1.14.4.jar
               // Hive dependencies
               hive-exec-2.2.0.jar
               // Orc dependencies -- required by the ORC vectorized optimizations
               orc-core-1.4.3.jar
               aircompressor-0.8.jar // transitive dependency of orc-core
               // add antlr-runtime if you need to use hive dialect
               antlr-runtime-3.5.2.jar

    Hive 3.1.0

        /flink-1.14.4
           /lib
               // Flink's Hive connector
               flink-connector-hive_2.11-1.14.4.jar
               // Hive dependencies
               hive-exec-3.1.0.jar
               libfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately
               // add antlr-runtime if you need to use hive dialect
               antlr-runtime-3.5.2.jar

    Program maven

    If you are building your own program, you need the following dependencies in your Maven pom file. It’s recommended not to include these dependencies in the resulting jar file; instead, add them at runtime as stated above.

        <!-- Flink Dependency -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-hive_2.11</artifactId>
          <version>1.14.4</version>
          <scope>provided</scope>
        </dependency>

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-java-bridge_2.11</artifactId>
          <version>1.14.4</version>
          <scope>provided</scope>
        </dependency>

        <!-- Hive Dependency -->
        <dependency>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-exec</artifactId>
          <version>${hive.version}</version>
          <scope>provided</scope>
        </dependency>

    Connect to an existing Hive installation using the HiveCatalog through the table environment or YAML configuration.

    Following is an example of how to connect to Hive:

    Java
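
    The Java version mirrors the Scala and Python snippets below; it is shown here as a sketch using the same placeholder catalog name, database, and configuration directory:

        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
        import org.apache.flink.table.catalog.hive.HiveCatalog;

        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name            = "myhive";
        String defaultDatabase = "mydatabase";
        String hiveConfDir     = "/opt/hive-conf";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);

        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive");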

    Scala

        import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
        import org.apache.flink.table.catalog.hive.HiveCatalog

        val settings = EnvironmentSettings.inStreamingMode()
        val tableEnv = TableEnvironment.create(settings)

        val name            = "myhive"
        val defaultDatabase = "mydatabase"
        val hiveConfDir     = "/opt/hive-conf"

        val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
        tableEnv.registerCatalog("myhive", hive)

        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive")

    Python

        from pyflink.table import *
        from pyflink.table.catalog import HiveCatalog

        settings = EnvironmentSettings.in_batch_mode()
        t_env = TableEnvironment.create(settings)

        catalog_name = "myhive"
        default_database = "mydatabase"
        hive_conf_dir = "/opt/hive-conf"

        hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
        t_env.register_catalog("myhive", hive_catalog)

        # set the HiveCatalog as the current catalog of the session
        t_env.use_catalog("myhive")

    YAML

        execution:
            ...
            current-catalog: myhive  # set the HiveCatalog as the current catalog of the session
            current-database: mydatabase

        catalogs:
           - name: myhive
             type: hive
             hive-conf-dir: /opt/hive-conf
    SQL

        CREATE CATALOG myhive WITH (
            'type' = 'hive',
            'default-database' = 'mydatabase',
            'hive-conf-dir' = '/opt/hive-conf'
        );
        -- set the HiveCatalog as the current catalog of the session
        USE CATALOG myhive;

    Below are the options supported when creating a HiveCatalog instance with a YAML file or DDL.

    • type (required, String, default: none): Type of the catalog. Must be set to 'hive' when creating a HiveCatalog.
    • name (required, String, default: none): The unique name of the catalog. Only applicable to the YAML file.
    • hive-conf-dir (optional, String, default: none): URI to your Hive conf dir containing hive-site.xml. The URI needs to be supported by Hadoop FileSystem. If the URI is relative, i.e. without a scheme, the local file system is assumed. If the option is not specified, hive-site.xml is searched for on the class path.
    • default-database (optional, String, default: default): The default database to use when the catalog is set as the current catalog.
    • hive-version (optional, String, default: none): HiveCatalog is capable of automatically detecting the Hive version in use. It’s recommended NOT to specify the Hive version, unless the automatic detection fails.
    • hadoop-conf-dir (optional, String, default: none): Path to the Hadoop conf dir. Only local file system paths are supported. The recommended way to set the Hadoop conf is via the HADOOP_CONF_DIR environment variable. Use this option only if the environment variable doesn’t work for you, e.g. if you want to configure each HiveCatalog separately.
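
    For example, a catalog definition that uses the optional settings above could look like the following sketch; the paths and version are placeholders for your environment:

        CREATE CATALOG myhive WITH (
            'type' = 'hive',
            'hive-conf-dir' = '/opt/hive-conf',
            -- use hadoop-conf-dir only if the HADOOP_CONF_DIR environment variable is not an option
            'hadoop-conf-dir' = '/opt/hadoop-conf',
            -- pin the Hive version only if automatic detection fails
            'hive-version' = '2.3.4'
        );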

    Flink supports DML writing to Hive tables. Please refer to the documentation on reading and writing Hive tables for details.
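
    As a quick illustration, assuming a Hive table mytable with columns key INT and value STRING already exists in the current HiveCatalog and the query runs in batch mode:

        INSERT INTO mytable VALUES (1, 'hello'), (2, 'world');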