Catalogs

    One of the most crucial aspects of data processing is managing metadata. It may be transient metadata, such as temporary tables or UDFs registered against the table environment, or permanent metadata, such as that in a Hive Metastore. Catalogs provide a unified API for managing metadata and making it accessible from the Table API and SQL queries.

    Catalogs enable users to reference existing metadata in their data systems and automatically map it to Flink's corresponding metadata. For example, Flink can map JDBC tables to Flink tables automatically, so users don't have to manually rewrite DDL in Flink. Catalogs greatly simplify the steps required to get started with Flink on top of users' existing systems and significantly improve the user experience.

    GenericInMemoryCatalog

    The GenericInMemoryCatalog is an in-memory implementation of a catalog. All objects are available only for the lifetime of the session.
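
    For example, an in-memory catalog can be created and registered against a table environment; the following is a minimal sketch, where the catalog name "my_catalog" is arbitrary:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.GenericInMemoryCatalog;

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Everything registered in this catalog lives only as long as the session.
    GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog("my_catalog");
    tableEnv.registerCatalog("my_catalog", inMemoryCatalog);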

    JdbcCatalog

    The JdbcCatalog enables users to connect Flink to relational databases over the JDBC protocol. PostgresCatalog is the only implementation of the JDBC catalog at the moment. See the JdbcCatalog documentation for more details on setting up the catalog.
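
    As a rough illustration, a Postgres-backed catalog can be instantiated and registered as in the minimal sketch below; the connection values are placeholders, the flink-connector-jdbc and PostgreSQL driver dependencies are assumed to be on the classpath, and the JdbcCatalog constructor signature may differ between Flink versions:

    import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;

    String name            = "mypg";
    String defaultDatabase = "mydb";
    String username        = "<username>";
    String password        = "<password>";
    String baseUrl         = "jdbc:postgresql://localhost:5432";

    JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
    tableEnv.registerCatalog(name, catalog);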

    HiveCatalog

    The HiveCatalog serves two purposes: as persistent storage for pure Flink metadata, and as an interface for reading and writing existing Hive metadata. Flink's Hive documentation provides full details on setting up the catalog and interfacing with an existing Hive installation.

    User-Defined Catalog

    Catalogs are pluggable and users can develop custom catalogs by implementing the Catalog interface.

    In order to use custom catalogs with Flink SQL, users should also implement a corresponding catalog factory by implementing the CatalogFactory interface. The factory is discovered using Java's Service Provider Interface (SPI). Classes that implement this interface can be added to META-INF/services/org.apache.flink.table.factories.Factory in JAR files. The provided factory identifier will be used for matching against the required type property in a SQL CREATE CATALOG DDL statement.
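
    A minimal sketch of such a factory is shown below; MyCustomCatalog is a hypothetical Catalog implementation, and "my-catalog" is the identifier that a CREATE CATALOG ... WITH ('type' = 'my-catalog', ...) statement would be matched against:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.table.catalog.Catalog;
    import org.apache.flink.table.factories.CatalogFactory;

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class MyCatalogFactory implements CatalogFactory {

        private static final ConfigOption<String> DEFAULT_DATABASE =
                ConfigOptions.key("default-database").stringType().defaultValue("default");

        @Override
        public String factoryIdentifier() {
            // matched against the 'type' property in the CREATE CATALOG DDL
            return "my-catalog";
        }

        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            return Collections.emptySet();
        }

        @Override
        public Set<ConfigOption<?>> optionalOptions() {
            Set<ConfigOption<?>> options = new HashSet<>();
            options.add(DEFAULT_DATABASE);
            return options;
        }

        @Override
        public Catalog createCatalog(Context context) {
            // context.getOptions() holds the properties from the WITH (...) clause
            String defaultDatabase = context.getOptions()
                    .getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
            return new MyCustomCatalog(context.getName(), defaultDatabase);
        }
    }

    With the factory's fully qualified class name listed in META-INF/services/org.apache.flink.table.factories.Factory, a statement such as CREATE CATALOG my_custom WITH ('type' = 'my-catalog', 'default-database' = 'db1') would instantiate the custom catalog.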

    Using SQL DDL

    Users can use SQL DDL to create tables in catalogs in both Table API and SQL.

    Scala

    val tableEnv = ...
    // Create a HiveCatalog
    val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")
    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog)
    // Create a catalog database
    tableEnv.executeSql("CREATE DATABASE mydb WITH (...)")
    // Create a catalog table
    tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")
    tableEnv.listTables() // should return the tables in current catalog and database.

    Python

    from pyflink.table.catalog import HiveCatalog

    # Create a HiveCatalog
    catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
    # Register the catalog
    t_env.register_catalog("myhive", catalog)
    # Create a catalog database
    t_env.execute_sql("CREATE DATABASE mydb WITH (...)")
    # Create a catalog table
    t_env.execute_sql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")
    # should return the tables in current catalog and database.
    t_env.list_tables()

    SQL Client

    -- the catalog should have been registered via yaml file
    Flink SQL> CREATE DATABASE mydb WITH (...);
    Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
    Flink SQL> SHOW TABLES;
    mytable

    For detailed information, please check out Flink SQL CREATE DDL.

    Users can use Java, Scala or Python to create catalog tables programmatically.

    Java

    import org.apache.flink.table.api.*;
    import org.apache.flink.table.catalog.*;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Create a HiveCatalog
    Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");
    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog);
    // Create a catalog database
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));
    // Create a catalog table
    final Schema schema = Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .build();
    tableEnv.createTable("myhive.mydb.mytable", TableDescriptor.forConnector("kafka")
        .schema(schema)
        // …
        .build());
    List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
    Scala

    import org.apache.flink.table.api._
    import org.apache.flink.table.catalog._
    import org.apache.flink.table.catalog.hive.HiveCatalog

    val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
    // Create a HiveCatalog
    val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")
    // Register the catalog
    tableEnv.registerCatalog("myhive", catalog)
    // Create a catalog database
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
    // Create a catalog table
    val schema = Schema.newBuilder()
      .column("name", DataTypes.STRING())
      .column("age", DataTypes.INT())
      .build()
    tableEnv.createTable("myhive.mydb.mytable", TableDescriptor.forConnector("kafka")
      .schema(schema)
      // …
      .build())
    val tables = catalog.listTables("mydb") // tables should contain "mytable"

    Python

    from pyflink.table import *
    from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable

    settings = EnvironmentSettings.in_batch_mode()
    t_env = TableEnvironment.create(settings)
    # Create a HiveCatalog
    catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
    # Register the catalog
    t_env.register_catalog("myhive", catalog)
    # Create a catalog database
    database = CatalogDatabase.create_instance({"k1": "v1"}, None)
    catalog.create_database("mydb", database)
    # Create a catalog table
    schema = Schema.new_builder() \
        .column("name", DataTypes.STRING()) \
        .column("age", DataTypes.INT()) \
        .build()
    catalog_table = t_env.create_table("myhive.mydb.mytable", TableDescriptor.for_connector("kafka")
        .schema(schema)
        # …
        .build())
    # tables should contain "mytable"
    tables = catalog.list_tables("mydb")

    Note: only catalog program APIs are listed here. Users can achieve many of the same functionalities with SQL DDL. For detailed DDL information, please refer to the SQL CREATE DDL documentation.

    Database operations

    Java/Scala

    // create database
    catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
    // drop database
    catalog.dropDatabase("mydb", false);
    // alter database
    catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
    // get database
    catalog.getDatabase("mydb");
    // check if a database exists
    catalog.databaseExists("mydb");
    // list databases in a catalog
    catalog.listDatabases();

    Python

    from pyflink.table.catalog import CatalogDatabase

    # create database
    catalog_database = CatalogDatabase.create_instance({"k1": "v1"}, None)
    catalog.create_database("mydb", catalog_database, False)
    # drop database
    catalog.drop_database("mydb", False)
    # alter database
    catalog.alter_database("mydb", catalog_database, False)
    # get database
    catalog.get_database("mydb")
    # check if a database exists
    catalog.database_exists("mydb")
    # list databases in a catalog
    catalog.list_databases()

    Table operations

    Java/Scala

    // create table
    catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    // drop table
    catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
    // alter table
    catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
    // rename table
    catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);
    // get table
    catalog.getTable(new ObjectPath("mydb", "mytable"));
    // check if a table exists
    catalog.tableExists(new ObjectPath("mydb", "mytable"));
    // list tables in a database
    catalog.listTables("mydb");

    Python

    from pyflink.table import *
    from pyflink.table.catalog import CatalogBaseTable, ObjectPath
    from pyflink.table.descriptors import Kafka

    table_schema = TableSchema.builder() \
        .field("name", DataTypes.STRING()) \
        .field("age", DataTypes.INT()) \
        .build()
    table_properties = Kafka() \
        .version("0.11") \
        .start_from_earliest() \
        .to_properties()
    catalog_table = CatalogBaseTable.create_table(schema=table_schema, properties=table_properties, comment="my comment")

    # create table
    catalog.create_table(ObjectPath("mydb", "mytable"), catalog_table, False)
    # drop table
    catalog.drop_table(ObjectPath("mydb", "mytable"), False)
    # alter table
    catalog.alter_table(ObjectPath("mydb", "mytable"), catalog_table, False)
    # rename table
    catalog.rename_table(ObjectPath("mydb", "mytable"), "my_new_table")
    # get table
    catalog.get_table(ObjectPath("mydb", "mytable"))
    # check if a table exists
    catalog.table_exists(ObjectPath("mydb", "mytable"))
    # list tables in a database
    catalog.list_tables("mydb")

    View operations

    Java/Scala

    // create view
    catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
    // drop view
    catalog.dropTable(new ObjectPath("mydb", "myview"), false);
    // alter view
    catalog.alterTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
    // rename view
    catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
    // get view
    catalog.getTable(new ObjectPath("mydb", "myview"));
    // check if a view exists
    catalog.tableExists(new ObjectPath("mydb", "myview"));
    // list views in a database
    catalog.listViews("mydb");

    Python

    from pyflink.table import *
    from pyflink.table.catalog import CatalogBaseTable, ObjectPath

    table_schema = TableSchema.builder() \
        .field("name", DataTypes.STRING()) \
        .field("age", DataTypes.INT()) \
        .build()
    catalog_table = CatalogBaseTable.create_view(
        original_query="select * from t1",
        expanded_query="select * from test-catalog.db1.t1",
        schema=table_schema,
        properties={},
        comment="This is a view"
    )
    # create view
    catalog.create_table(ObjectPath("mydb", "myview"), catalog_table, False)
    # drop view
    catalog.drop_table(ObjectPath("mydb", "myview"), False)
    # alter view
    catalog.alter_table(ObjectPath("mydb", "myview"), catalog_table, False)
    # rename view
    catalog.rename_table(ObjectPath("mydb", "myview"), "my_new_view")
    # get view
    catalog.get_table(ObjectPath("mydb", "myview"))
    # check if a view exists
    catalog.table_exists(ObjectPath("mydb", "myview"))
    # list views in a database
    catalog.list_views("mydb")

    Partition operations

    Java/Scala

    // create partition
    catalog.createPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    // drop partition
    catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
    // alter partition
    catalog.alterPartition(
        new ObjectPath("mydb", "mytable"),
        new CatalogPartitionSpec(...),
        new CatalogPartitionImpl(...),
        false);
    // get partition
    catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    // check if a partition exists
    catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    // list partitions of a table
    catalog.listPartitions(new ObjectPath("mydb", "mytable"));
    // list partitions of a table under a given partition spec
    catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
    // list partitions of a table by expression filter
    catalog.listPartitionsByFilter(new ObjectPath("mydb", "mytable"), Arrays.asList(expr1, ...));

    Python

    from pyflink.table.catalog import ObjectPath, CatalogPartitionSpec, CatalogPartition

    catalog_partition = CatalogPartition.create_instance({}, "my partition")
    catalog_partition_spec = CatalogPartitionSpec({"third": "2010", "second": "bob"})
    # create partition
    catalog.create_partition(
        ObjectPath("mydb", "mytable"),
        catalog_partition_spec,
        catalog_partition,
        False)
    # drop partition
    catalog.drop_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec, False)
    # alter partition
    catalog.alter_partition(
        ObjectPath("mydb", "mytable"),
        CatalogPartitionSpec(...),
        catalog_partition,
        False)
    # get partition
    catalog.get_partition(ObjectPath("mydb", "mytable"), catalog_partition_spec)
    # check if a partition exists
    catalog.partition_exists(ObjectPath("mydb", "mytable"), catalog_partition_spec)
    # list partitions of a table
    catalog.list_partitions(ObjectPath("mydb", "mytable"))
    # list partitions of a table under a given partition spec
    catalog.list_partitions(ObjectPath("mydb", "mytable"), catalog_partition_spec)

    Function operations

    Java/Scala

    // create function
    catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    // drop function
    catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
    // alter function
    catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
    // get function
    catalog.getFunction(new ObjectPath("mydb", "myfunc"));
    // check if a function exists
    catalog.functionExists(new ObjectPath("mydb", "myfunc"));
    // list functions in a database
    catalog.listFunctions("mydb");

    Python

    from pyflink.table.catalog import ObjectPath, CatalogFunction

    catalog_function = CatalogFunction.create_instance(class_name="my.python.udf")
    # create function
    catalog.create_function(ObjectPath("mydb", "myfunc"), catalog_function, False)
    # drop function
    catalog.drop_function(ObjectPath("mydb", "myfunc"), False)
    # alter function
    catalog.alter_function(ObjectPath("mydb", "myfunc"), catalog_function, False)
    # get function
    catalog.get_function(ObjectPath("mydb", "myfunc"))
    # check if a function exists
    catalog.function_exists(ObjectPath("mydb", "myfunc"))
    # list functions in a database
    catalog.list_functions("mydb")

    Registering a Catalog

    Users have access to a default in-memory catalog named default_catalog that is always created. This catalog contains a single default database called default_database. Users can also register additional catalogs into an existing Flink session.
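
    The current defaults can be inspected from the table environment; a minimal sketch:

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    tableEnv.getCurrentCatalog();   // "default_catalog"
    tableEnv.getCurrentDatabase(); // "default_database"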

    Java/Scala

    tableEnv.registerCatalog("myCatalog", new CustomCatalog("myCatalog"));

    Python

    t_env.register_catalog("myCatalog", catalog)

    YAML

    catalogs:
      - name: myCatalog
        type: custom_catalog
        hive-conf-dir: ...

    Changing the Current Catalog And Database

    Flink will always search for tables, views, and UDFs in the current catalog and database.

    Java/Scala

    tableEnv.useCatalog("myCatalog");
    tableEnv.useDatabase("myDb");

    Python

    t_env.use_catalog("myCatalog")
    t_env.use_database("myDb")

    SQL

    Flink SQL> USE CATALOG myCatalog;
    Flink SQL> USE myDb;

    Metadata from catalogs that are not the current catalog are accessible by providing fully qualified names in the form catalog.database.object.

    Java/Scala

    tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");

    Python

    t_env.from_path("not_the_current_catalog.not_the_current_db.my_table")

    SQL

    Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;

    List Available Catalogs

    Java/Scala

    tableEnv.listCatalogs();

    Python

    t_env.list_catalogs()

    SQL

    Flink SQL> show catalogs;

    List Available Databases

    Java/Scala

    tableEnv.listDatabases();

    Python

    t_env.list_databases()

    SQL

    Flink SQL> show databases;

    List Available Tables

    Java/Scala

    tableEnv.listTables();

    Python

    t_env.list_tables()

    SQL

    Flink SQL> show tables;