JDBC SQL Connector

    The JDBC connector allows for reading data from and writing data into any relational database with a JDBC driver. This document describes how to set up the JDBC connector to run SQL queries against relational databases.

    The JDBC sink operates in upsert mode for exchanging UPDATE/DELETE messages with the external system if a primary key is defined in the DDL; otherwise, it operates in append mode and does not support consuming UPDATE/DELETE messages.

    In order to use the JDBC connector, the following dependencies are required, both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

    A driver dependency is also required to connect to a specified database. Here are the drivers currently supported:

    Driver       | Group Id          | Artifact Id           | JAR
    MySQL        | mysql             | mysql-connector-java  | Download
    PostgreSQL   | org.postgresql    | postgresql            | Download
    Derby        | org.apache.derby  | derby                 | Download

    The JDBC connector and drivers are not currently part of Flink’s binary distribution. See how to link with them for cluster execution.

    A JDBC table can be defined as follows:

    -- register a MySQL table 'users' in Flink SQL
    CREATE TABLE MyUserTable (
      id BIGINT,
      name STRING,
      age INT,
      status BOOLEAN,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydatabase',
      'table-name' = 'users'
    );

    -- write data into the JDBC table from the other table "T"
    INSERT INTO MyUserTable
    SELECT id, name, age, status FROM T;

    -- scan data from the JDBC table
    SELECT id, name, age, status FROM MyUserTable;

    -- temporal join the JDBC table as a dimension table
    -- (assumes the source table myTopic has a processing-time attribute named proctime)
    SELECT * FROM myTopic
    LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
    ON myTopic.key = MyUserTable.id;

    Flink uses the primary key that is defined in the DDL when writing data to external databases. The connector operates in upsert mode if the primary key is defined; otherwise, it operates in append mode.

    In upsert mode, Flink inserts a new row or updates the existing row according to the primary key; this is how Flink ensures idempotence. To guarantee that the output result is as expected, it is recommended to define a primary key for the table and to make sure the primary key is one of the unique key sets or the primary key of the underlying database table. In append mode, Flink interprets all records as INSERT messages, and the INSERT operation may fail if a primary key or unique constraint violation happens in the underlying database.
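    For illustration, a minimal sketch of an append-mode sink: the definition mirrors MyUserTable above but declares no PRIMARY KEY, so every record is written as a plain INSERT (the target table name 'user_events' is a hypothetical placeholder):

    CREATE TABLE MyAppendOnlyTable (
      id BIGINT,
      name STRING,
      age INT,
      status BOOLEAN
      -- no PRIMARY KEY: the connector writes plain INSERT statements (append mode)
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydatabase',
      'table-name' = 'user_events'
    );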

    See CREATE TABLE DDL for more details about PRIMARY KEY syntax.

    To accelerate reading data in parallel Source task instances, Flink provides a partitioned scan feature for the JDBC table, controlled by the following options (see the sketch after this list):

    • scan.partition.column: The column name used for partitioning the input.
    • scan.partition.num: The number of partitions.
    • scan.partition.lower-bound: The smallest value of the first partition.
    • scan.partition.upper-bound: The largest value of the last partition.
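
    As an illustration, the sketch below scans the users table from the example above in four parallel partitions over the id range; the partition count and bounds are placeholder values to adapt to the actual data:

    CREATE TABLE MyPartitionedUserTable (
      id BIGINT,
      name STRING,
      age INT,
      status BOOLEAN
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydatabase',
      'table-name' = 'users',
      -- split the scan into 4 parallel queries over the id range [1, 1000]
      'scan.partition.column' = 'id',
      'scan.partition.num' = '4',
      'scan.partition.lower-bound' = '1',
      'scan.partition.upper-bound' = '1000'
    );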

    The JDBC connector can be used in a temporal join as a lookup source (a.k.a. dimension table). Currently, only synchronous lookup mode is supported.
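
    For example, a lookup join against the JDBC table might look like the sketch below; the upstream table Orders and its processing-time attribute proc_time are hypothetical names used only to illustrate the FOR SYSTEM_TIME AS OF syntax:

    SELECT o.order_id, o.user_id, u.name, u.age
    FROM Orders AS o
    JOIN MyUserTable FOR SYSTEM_TIME AS OF o.proc_time AS u
      ON o.user_id = u.id;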

    By default, lookup cache is not enabled. You can enable it by setting both lookup.cache.max-rows and lookup.cache.ttl.

    The lookup cache is used to improve the performance of temporal joins with the JDBC connector. By default, the lookup cache is not enabled, so all requests are sent to the external database. When the lookup cache is enabled, each process (i.e. TaskManager) holds a cache. Flink looks up the cache first and only sends requests to the external database on a cache miss, updating the cache with the returned rows. The oldest rows in the cache are expired when the cache reaches the maximum number of cached rows lookup.cache.max-rows or when a row exceeds the maximum time to live lookup.cache.ttl. The cached rows might not be the latest; users can tune lookup.cache.ttl to a smaller value for fresher data, but this may increase the number of requests sent to the database. It is therefore a balance between throughput and correctness.
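
    A sketch of enabling the lookup cache on the dimension table definition; the cache size and TTL below are placeholder values to be tuned per workload:

    CREATE TABLE MyCachedUserTable (
      id BIGINT,
      name STRING,
      age INT,
      status BOOLEAN
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydatabase',
      'table-name' = 'users',
      -- cache at most 5000 rows, each for at most 10 minutes
      'lookup.cache.max-rows' = '5000',
      'lookup.cache.ttl' = '10min'
    );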

    The JDBC sink will use upsert semantics rather than plain INSERT statements if a primary key is defined in the DDL. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a unique constraint violation in the underlying database, which provides idempotence.

    If there are failures, the Flink job will recover and re-process from the last successful checkpoint, which can lead to messages being re-processed during recovery. Upsert mode is highly recommended as it helps avoid constraint violations or duplicate data if records need to be re-processed.

    Aside from failure recovery, the source topic may also naturally contain multiple records over time with the same primary key, making upserts desirable.

    As there is no standard syntax for upsert, the following table describes the database-specific DML that is used.

    Database     | Upsert Grammar
    MySQL        | INSERT .. ON DUPLICATE KEY UPDATE ..
    PostgreSQL   | INSERT .. ON CONFLICT .. DO UPDATE SET ..
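
    As an illustration of the MySQL grammar above, a hand-written upsert for the users table might look as follows (this is illustrative and not necessarily the exact statement the connector generates):

    INSERT INTO users (id, name, age, status)
    VALUES (?, ?, ?, ?)
    ON DUPLICATE KEY UPDATE
      name = VALUES(name),
      age = VALUES(age),
      status = VALUES(status);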

    The JdbcCatalog enables users to connect Flink to relational databases over the JDBC protocol.

    Currently, PostgresCatalog is the only implementation of the JDBC Catalog. PostgresCatalog supports only a limited set of Catalog methods:

    // The supported methods by Postgres Catalog.
    PostgresCatalog.databaseExists(String databaseName)
    PostgresCatalog.listDatabases()
    PostgresCatalog.getDatabase(String databaseName)
    PostgresCatalog.listTables(String databaseName)
    PostgresCatalog.getTable(ObjectPath tablePath)
    PostgresCatalog.tableExists(ObjectPath tablePath)

    Other Catalog methods are currently unsupported.

    Usage of PostgresCatalog

    Postgres catalog supports the following options:

    • name: required, name of the catalog.
    • default-database: required, default database to connect to.
    • username: required, username of Postgres account.
    • password: required, password of the account.
    • base-url: required, should be of the format "jdbc:postgresql://<ip>:<port>", and should not contain the database name.

    SQL
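    A minimal sketch of registering the catalog with SQL DDL, assuming the generic CREATE CATALOG syntax together with the options listed above (the values are placeholders):

    CREATE CATALOG mypg WITH (
      'type' = 'jdbc',
      'default-database' = 'mydb',
      'username' = '...',
      'password' = '...',
      'base-url' = 'jdbc:postgresql://<ip>:<port>'
    );

    -- set the JdbcCatalog as the current catalog of the session
    USE CATALOG mypg;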

    Java

    EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    String name = "mypg";
    String defaultDatabase = "mydb";
    String username = "...";
    String password = "...";
    String baseUrl = "...";

    JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
    tableEnv.registerCatalog("mypg", catalog);

    // set the JdbcCatalog as the current catalog of the session
    tableEnv.useCatalog("mypg");

    Scala

    val settings = EnvironmentSettings.inStreamingMode()
    val tableEnv = TableEnvironment.create(settings)

    val name = "mypg"
    val defaultDatabase = "mydb"
    val username = "..."
    val password = "..."
    val baseUrl = "..."

    val catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl)
    tableEnv.registerCatalog("mypg", catalog)

    // set the JdbcCatalog as the current catalog of the session
    tableEnv.useCatalog("mypg")

    Python

    YAML

    execution:
        ...
        current-catalog: mypg  # set the JdbcCatalog as the current catalog of the session
        current-database: mydb

    catalogs:
       - name: mypg
         type: jdbc
         default-database: mydb
         username: ...
         password: ...
         base-url: ...

    PostgreSQL Metaspace Mapping

    PostgreSQL has an additional namespace, the schema, besides the database. A Postgres instance can have multiple databases; each database can have multiple schemas, with a default one named “public”; each schema can have multiple tables. In Flink, when querying tables registered by the Postgres catalog, users can use either schema_name.table_name or just table_name. The schema_name is optional and defaults to “public”.

    Therefore, the metaspace mapping between the Flink Catalog and Postgres is as follows:

    Flink Catalog Metaspace Structure     | Postgres Metaspace Structure
    catalog name (defined in Flink only)  | N/A
    database name                         | database name
    table name                            | [schema_name.]table_name

    The full path of a Postgres table in Flink should be "<catalog>.<db>.`<schema.table>`" if a schema is specified; note that <schema.table> needs to be escaped.

    -- scan table 'test_table' of 'public' schema (i.e. the default schema), the schema name can be omitted
    SELECT * FROM mypg.mydb.test_table;
    SELECT * FROM mydb.test_table;
    SELECT * FROM test_table;

    -- scan table 'test_table2' of 'custom_schema' schema,
    -- the custom schema can not be omitted and must be escaped together with the table name
    SELECT * FROM mypg.mydb.`custom_schema.test_table2`;
    SELECT * FROM mydb.`custom_schema.test_table2`;

    Flink supports connecting to several databases which use dialects, such as MySQL, PostgreSQL, and Derby. The Derby dialect is usually used for testing purposes. The field data type mappings from relational database data types to Flink SQL data types are listed in the following table; the mapping table can help to define a JDBC table in Flink easily.

    MySQL type                        | PostgreSQL type                      | Flink SQL type
    TINYINT                           |                                      | TINYINT
    SMALLINT, TINYINT UNSIGNED        | SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT
    INT, MEDIUMINT, SMALLINT UNSIGNED | INTEGER, SERIAL                      | INT
    BIGINT, INT UNSIGNED              | BIGINT, BIGSERIAL                    | BIGINT
    BIGINT UNSIGNED                   |                                      | DECIMAL(20, 0)
    BIGINT                            | BIGINT                               | BIGINT
    FLOAT                             | REAL, FLOAT4                         | FLOAT
    DOUBLE, DOUBLE PRECISION          | FLOAT8, DOUBLE PRECISION             | DOUBLE
    NUMERIC(p, s), DECIMAL(p, s)      | NUMERIC(p, s), DECIMAL(p, s)         | DECIMAL(p, s)
    BOOLEAN, TINYINT(1)               | BOOLEAN                              | BOOLEAN
    DATE                              | DATE                                 | DATE
    TIME [(p)]                        | TIME [(p)] [WITHOUT TIMEZONE]        | TIME [(p)] [WITHOUT TIMEZONE]
    DATETIME [(p)]                    | TIMESTAMP [(p)] [WITHOUT TIMEZONE]   | TIMESTAMP [(p)] [WITHOUT TIMEZONE]
    CHAR(n), VARCHAR(n), TEXT         | CHAR(n), CHARACTER(n), VARCHAR(n),   | STRING
                                      | CHARACTER VARYING(n), TEXT           |
    BINARY, VARBINARY, BLOB           | BYTEA                                | BYTES
                                      | ARRAY                                | ARRAY