JDBC Connector

    To use it, add the following dependency to your project (along with your JDBC driver):


    Note that the streaming connectors are currently NOT part of the binary distribution; see the Flink documentation on how to link with them for cluster execution. A driver dependency is also required to connect to a specific database. Please consult your database documentation on how to add the corresponding driver.

    The JDBC sink provides an at-least-once guarantee. In practice, however, exactly-once results can be achieved by crafting upsert SQL statements or idempotent SQL updates. Configuration is as follows (see also the API documentation):

    JdbcSink.sink(
        sqlDmlStatement,        // mandatory
        jdbcStatementBuilder,   // mandatory
        jdbcExecutionOptions,   // optional
        jdbcConnectionOptions   // mandatory
    );
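    To illustrate why idempotent writes turn the at-least-once guarantee into effectively exactly-once results, here is a minimal, stdlib-only sketch that models a table in memory. The class and method names are hypothetical, and no JDBC or Flink code is involved. A batch is delivered twice, as can happen when a job restarts from a checkpoint after that batch was already executed. With plain INSERT semantics (and no unique key enforced) the replay duplicates rows; with upsert semantics keyed on the primary key (for example PostgreSQL's `INSERT ... ON CONFLICT (id) DO UPDATE ...`) the final table matches a single delivery.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a table; it only illustrates write semantics.
public class UpsertReplayDemo {

    static final class Row {
        final long id;
        final String title;

        Row(long id, String title) {
            this.id = id;
            this.title = title;
        }
    }

    static final List<Row> BATCH = List.of(
            new Row(101L, "Stream Processing with Apache Flink"),
            new Row(102L, "Streaming Systems"));

    // Plain INSERT semantics (no unique key enforced): every delivery appends rows.
    static int rowsAfterInserts(int deliveries) {
        List<Row> table = new ArrayList<>();
        for (int i = 0; i < deliveries; i++) {
            table.addAll(BATCH);
        }
        return table.size();
    }

    // Upsert semantics keyed on id: a replayed row overwrites the existing one.
    static int rowsAfterUpserts(int deliveries) {
        Map<Long, Row> table = new LinkedHashMap<>();
        for (int i = 0; i < deliveries; i++) {
            for (Row row : BATCH) {
                table.put(row.id, row);
            }
        }
        return table.size();
    }

    public static void main(String[] args) {
        // Two deliveries model one replay after recovery.
        System.out.println("INSERT: " + rowsAfterInserts(2) + " rows"); // INSERT: 4 rows
        System.out.println("UPSERT: " + rowsAfterUpserts(2) + " rows"); // UPSERT: 2 rows
    }
}
```

    With a real primary key, the replayed plain INSERT would typically fail with a constraint violation instead of duplicating rows; either way, the upsert form is what makes the replay harmless.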

    The sink builds one JDBC prepared statement from a user-provided SQL string, e.g.:

    INSERT INTO some_table (field1, field2) VALUES (?, ?)
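    Each `?` in such a statement is a positional parameter, numbered from 1, that the user-provided function fills in for every stream record. The stdlib-only sketch below shows that binding step outside of Flink. `StatementBuilder` is a hypothetical interface that only mirrors the `(statement, record)` shape of Flink's JdbcStatementBuilder, and a `java.lang.reflect.Proxy` stands in for a real driver's PreparedStatement by recording each `setXxx(index, value)` call instead of talking to a database.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import java.util.TreeMap;

public class StatementBindingDemo {

    // Hypothetical stand-in mirroring the (statement, record) shape of Flink's JdbcStatementBuilder.
    @FunctionalInterface
    interface StatementBuilder<T> {
        void accept(PreparedStatement statement, T record) throws SQLException;
    }

    static final class Book {
        final long id;
        final String title;

        Book(long id, String title) {
            this.id = id;
            this.title = title;
        }
    }

    // For "INSERT INTO books (id, title) VALUES (?, ?)": parameter indexes are 1-based.
    static final StatementBuilder<Book> BOOK_BUILDER = (statement, book) -> {
        statement.setLong(1, book.id);
        statement.setString(2, book.title);
    };

    // A PreparedStatement proxy that records setXxx(index, value) calls.
    static PreparedStatement recordingStatement(Map<Integer, Object> bound) {
        return (PreparedStatement) Proxy.newProxyInstance(
                StatementBindingDemo.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                    if (method.getName().startsWith("set") && args != null
                            && args.length == 2 && args[0] instanceof Integer) {
                        bound.put((Integer) args[0], args[1]);
                    }
                    return null; // only void setters are invoked in this sketch
                });
    }

    // Runs the builder once for a record and returns the parameters it bound, by index.
    static <T> Map<Integer, Object> bind(StatementBuilder<T> builder, T record) {
        Map<Integer, Object> bound = new TreeMap<>();
        try {
            builder.accept(recordingStatement(bound), record);
        } catch (SQLException e) {
            throw new IllegalStateException("binding failed", e);
        }
        return bound;
    }

    public static void main(String[] args) {
        System.out.println(bind(BOOK_BUILDER, new Book(102L, "Streaming Systems")));
        // prints {1=102, 2=Streaming Systems}
    }
}
```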

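    The sink then repeatedly calls a user-provided function (a JdbcStatementBuilder) to update that prepared statement with each value of the stream. The statements are executed in batches, which can optionally be configured with a JdbcExecutionOptions instance, e.g.: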

    JdbcExecutionOptions.builder()
        .withBatchIntervalMs(200)   // optional: default = 0, meaning no time-based execution is done
        .withBatchSize(1000)        // optional: default = 5000 values
        .withMaxRetries(5)          // optional: default = 3
        .build()

    A JDBC batch is executed as soon as one of the following conditions is true:

    • the configured batch interval has elapsed
    • the maximum batch size is reached
    • a Flink checkpoint has started
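    The three flush conditions above can be modelled with a small buffer. This is a simplified, stdlib-only sketch, not Flink's internal implementation: `BatchTriggerDemo` and its methods are hypothetical, timer ticks and checkpoints are simulated by explicit method calls with a caller-supplied clock, and a batch interval of 0 disables time-based flushing, matching the `withBatchIntervalMs` default described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of when a JDBC batch would be executed.
public class BatchTriggerDemo {

    private final int batchSize;
    private final long batchIntervalMs; // 0 disables time-based flushing
    private final List<String> buffer = new ArrayList<>();
    private final List<Integer> executedBatchSizes = new ArrayList<>();
    private long lastFlushMs;

    BatchTriggerDemo(int batchSize, long batchIntervalMs, long startMs) {
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
        this.lastFlushMs = startMs;
    }

    // Condition: the maximum batch size is reached.
    void add(String record, long nowMs) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush(nowMs);
        }
    }

    // Condition: the configured batch interval has elapsed (checked on a timer tick).
    void onTimer(long nowMs) {
        if (batchIntervalMs > 0 && nowMs - lastFlushMs >= batchIntervalMs) {
            flush(nowMs);
        }
    }

    // Condition: a checkpoint has started, so buffered records must be written.
    void onCheckpoint(long nowMs) {
        flush(nowMs);
    }

    private void flush(long nowMs) {
        if (!buffer.isEmpty()) {
            // A real sink would call PreparedStatement.executeBatch() here.
            executedBatchSizes.add(buffer.size());
            buffer.clear();
        }
        lastFlushMs = nowMs;
    }

    List<Integer> executedBatchSizes() {
        return executedBatchSizes;
    }

    public static void main(String[] args) {
        BatchTriggerDemo sink = new BatchTriggerDemo(3, 200, 0);
        sink.add("a", 10);
        sink.add("b", 20);
        sink.add("c", 30);      // size limit reached: batch of 3
        sink.add("d", 50);
        sink.onTimer(100);      // only 70 ms since the last flush: nothing happens
        sink.onTimer(240);      // 210 ms since the last flush: batch of 1
        sink.add("e", 250);
        sink.onCheckpoint(260); // checkpoint started: batch of 1
        System.out.println(sink.executedBatchSizes()); // [3, 1, 1]
    }
}
```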

    The connection to the database is configured with a JdbcConnectionOptions instance; see the API documentation for details. A complete example:

    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class JdbcSinkExample {

        static class Book {
            public Book(Long id, String title, String authors, Integer year) {
                this.id = id;
                this.title = title;
                this.authors = authors;
                this.year = year;
            }
            final Long id;
            final String title;
            final String authors;
            final Integer year;
        }

        public static void main(String[] args) throws Exception {
            var env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(
                    new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
                    new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
                    new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
                    new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
            ).addSink(
                    JdbcSink.sink(
                            "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
                            // Bind each Book to the positional (1-based) parameters above.
                            (statement, book) -> {
                                statement.setLong(1, book.id);
                                statement.setString(2, book.title);
                                statement.setString(3, book.authors);
                                statement.setInt(4, book.year);
                            },
                            JdbcExecutionOptions.builder()
                                    .withBatchIntervalMs(200)
                                    .withMaxRetries(5)
                                    .build(),
                            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                    .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                                    .withDriverName("org.postgresql.Driver")
                                    .withUsername("someUser")
                                    .build()
                    ));
            env.execute();
        }
    }

    JdbcSink.exactlyOnceSink

    Since Flink 1.13, the JDBC sink supports an exactly-once mode. The implementation relies on the JDBC driver's support of the XA standard. Most drivers support XA if the database also supports XA, so the usual driver can typically be used.
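    XA coordinates a two-phase commit: writes go into a transaction that is first prepared (parked durably by the database) and only later committed. The stdlib-only sketch below is a simplified, hypothetical model of how such transactions can be aligned with Flink checkpoints; it does not use `javax.transaction.xa`, and the class, states and method names are illustrative assumptions rather than the connector's implementation. Records are written into an open transaction, a checkpoint prepares it, completion of the checkpoint commits it, and on recovery, transactions recorded in the restored checkpoint are committed while anything else still pending is rolled back, so records replayed by the source are not written twice. Keeping prepared transactions around is also why the database-side XA settings described below matter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of checkpoint-aligned two-phase commit.
public class TwoPhaseCommitDemo {

    enum State { OPEN, PREPARED, COMMITTED, ROLLED_BACK }

    final Map<String, State> transactions = new LinkedHashMap<>();
    final Map<String, List<String>> pendingRows = new LinkedHashMap<>();
    final List<String> visibleRows = new ArrayList<>(); // what other readers can see
    private int nextId = 0;
    private String current;

    TwoPhaseCommitDemo() {
        begin();
    }

    private void begin() {
        current = "tx-" + (nextId++);
        transactions.put(current, State.OPEN);
        pendingRows.put(current, new ArrayList<>());
    }

    void write(String row) {
        pendingRows.get(current).add(row);
    }

    // Checkpoint starts: phase 1 prepares the current transaction, then a new one is opened.
    String onCheckpoint() {
        String prepared = current;
        transactions.put(prepared, State.PREPARED);
        begin();
        return prepared; // would be stored in the checkpoint state
    }

    // Checkpoint completes: phase 2 makes the prepared transaction visible.
    void commit(String txId) {
        if (transactions.get(txId) == State.PREPARED) {
            visibleRows.addAll(pendingRows.get(txId));
            transactions.put(txId, State.COMMITTED);
        }
    }

    // Recovery: commit what the restored checkpoint recorded, roll back everything else pending.
    void recover(List<String> checkpointedTxIds) {
        for (String txId : checkpointedTxIds) {
            commit(txId);
        }
        for (Map.Entry<String, State> entry : transactions.entrySet()) {
            if (entry.getValue() == State.OPEN || entry.getValue() == State.PREPARED) {
                entry.setValue(State.ROLLED_BACK);
            }
        }
    }

    public static void main(String[] args) {
        TwoPhaseCommitDemo sink = new TwoPhaseCommitDemo();
        sink.write("a");
        sink.write("b");
        String checkpointed = sink.onCheckpoint(); // tx-0 prepared, tx-1 opened
        sink.write("c");                           // written after the checkpoint
        // Assume the checkpoint completed but the job failed before tx-0 was committed.
        sink.recover(List.of(checkpointed));
        System.out.println(sink.visibleRows);      // [a, b]; "c" is replayed by the source
    }
}
```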

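    To use exactly-once mode, create a sink with the exactlyOnceSink() method, passing the SQL statement, statement builder and execution options as above, and additionally provide JdbcExactlyOnceOptions and a supplier of an XADataSource, which takes the place of JdbcConnectionOptions.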

    NOTE: Some databases only allow a single XA transaction per connection (e.g. PostgreSQL, MySQL). In such cases, please use the following API to construct JdbcExactlyOnceOptions:

    JdbcExactlyOnceOptions.builder()
        .withTransactionPerConnection(true)
        .build()

    Furthermore, XA needs to be enabled and/or configured in some databases. For PostgreSQL, set max_prepared_transactions to a value greater than zero. For MySQL v8+, grant XA_RECOVER_ADMIN to the Flink database user.

    ATTENTION: Currently, JdbcSink.exactlyOnceSink can only ensure exactly-once semantics with JdbcExecutionOptions.maxRetries == 0; otherwise, duplicate results may be produced.

    PostgreSQL XADataSource example:

    import org.postgresql.xa.PGXADataSource;

    // username and password are assumed to be defined elsewhere
    PGXADataSource xaDataSource = new PGXADataSource();
    xaDataSource.setUrl("jdbc:postgresql://localhost:5432/postgres");
    xaDataSource.setUser(username);
    xaDataSource.setPassword(password);

    MySQL XADataSource example:

    Oracle XADataSource example:

    OracleXADataSource xaDataSource = new oracle.jdbc.xa.OracleXADataSource();
    xaDataSource.setURL("jdbc:oracle:oci8:@");
    xaDataSource.setUser("scott");

    Please also take Oracle connection pooling into account.