JDBC Connector
To use it, add the following dependency to your project (along with your JDBC driver):
Note that the streaming connectors are currently NOT part of the binary distribution; see the documentation on linking with them for cluster execution. A driver dependency is also required to connect to a specific database. Please consult your database documentation on how to add the corresponding driver.
The JDBC sink provides an at-least-once guarantee. In effect, though, exactly-once can be achieved by crafting upsert SQL statements or idempotent SQL updates. The sink is configured as follows (see also the JdbcSink Javadoc):
JdbcSink.sink(
    sqlDmlStatement,        // mandatory
    jdbcStatementBuilder,   // mandatory
    jdbcExecutionOptions,   // optional
    jdbcConnectionOptions   // mandatory
);
The sink builds one JDBC prepared statement from a user-provided SQL string, e.g.:
INSERT INTO some_table (field1, field2) VALUES (?, ?)
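Because the sink is at-least-once, a replayed batch can write the same row twice. One way to make the statement idempotent is an upsert. The sketch below builds a PostgreSQL-style `INSERT ... ON CONFLICT ... DO UPDATE` string. The `UpsertSql` helper and the table and column names are hypothetical (not part of Flink), and other databases use different syntax, for example MySQL's `ON DUPLICATE KEY UPDATE`.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper, not part of Flink: builds a PostgreSQL upsert statement. */
final class UpsertSql {

    private UpsertSql() {}

    /**
     * @param table     target table name
     * @param keyColumn column carrying a unique or primary-key constraint
     * @param columns   all columns to write, including the key column
     */
    static String postgresUpsert(String table, String keyColumn, List<String> columns) {
        String columnList = String.join(", ", columns);
        // One "?" placeholder per column, bound later by the statement builder.
        String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        // On a key collision, overwrite every non-key column with the incoming value.
        String updates = columns.stream()
                .filter(c -> !c.equals(keyColumn))
                .map(c -> c + " = EXCLUDED." + c)
                .collect(Collectors.joining(", "));
        if (updates.isEmpty()) {
            throw new IllegalArgumentException("at least one non-key column is required");
        }
        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")"
                + " ON CONFLICT (" + keyColumn + ") DO UPDATE SET " + updates;
    }

    public static void main(String[] args) {
        System.out.println(postgresUpsert("books", "id", List.of("id", "title", "authors", "year")));
    }
}
```

The resulting string can be passed as the SQL statement argument of `JdbcSink.sink` in place of a plain `INSERT`, with parameters bound in the same column order.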
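The sink then repeatedly calls a user-provided function to update that prepared statement with each value of the stream. The statements are executed in batches, which can optionally be tuned with a JdbcExecutionOptions instance, e.g.: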
JdbcExecutionOptions.builder()
    .withBatchIntervalMs(200)   // optional: default = 0, meaning no time-based execution is done
    .withBatchSize(1000)        // optional: default = 5000 values
    .withMaxRetries(5)          // optional: default = 3
    .build()
A JDBC batch is executed as soon as one of the following conditions is true:
- the configured batch interval has elapsed
- the maximum batch size is reached
- a Flink checkpoint has started
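The three triggers can be pictured with a small stand-alone model. This is only an illustrative sketch, not Flink's implementation: the `BatchBuffer` class and its method names are hypothetical, the current time is passed in explicitly to keep the behavior deterministic, and a flush merely counts executions where a real sink would execute the JDBC batch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the flush triggers; not Flink's actual implementation. */
final class BatchBuffer<T> {
    private final int batchSize;         // flush when this many records are buffered
    private final long batchIntervalMs;  // 0 disables the time-based trigger
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMs;
    private int flushCount;

    BatchBuffer(int batchSize, long batchIntervalMs, long nowMs) {
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Buffers one record, then flushes if the size trigger fires. */
    void add(T record, long nowMs) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush(nowMs);
        }
    }

    /** Timer callback: flushes if the interval is enabled and has elapsed. */
    void onTimer(long nowMs) {
        if (batchIntervalMs > 0 && nowMs - lastFlushMs >= batchIntervalMs) {
            flush(nowMs);
        }
    }

    /** Checkpoint hook: always flushes pending records. */
    void onCheckpoint(long nowMs) {
        flush(nowMs);
    }

    private void flush(long nowMs) {
        lastFlushMs = nowMs;
        if (buffer.isEmpty()) {
            return;          // nothing to execute
        }
        buffer.clear();      // stands in for executing the JDBC batch
        flushCount++;
    }

    int flushCount() { return flushCount; }

    int pending() { return buffer.size(); }

    public static void main(String[] args) {
        BatchBuffer<String> batch = new BatchBuffer<>(3, 200, 0L);
        batch.add("a", 10L);
        batch.add("b", 20L);
        batch.add("c", 30L);       // size trigger
        batch.add("d", 40L);
        batch.onTimer(230L);       // interval trigger
        batch.add("e", 240L);
        batch.onCheckpoint(250L);  // checkpoint trigger
        System.out.println("flushes: " + batch.flushCount());
    }
}
```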
The connection to the database is configured with a JdbcConnectionOptions instance. Please see its Javadoc for details.
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcSinkExample {

    // Simple record type written to the "books" table.
    static class Book {
        public Book(Long id, String title, String authors, Integer year) {
            this.id = id;
            this.title = title;
            this.authors = authors;
            this.year = year;
        }
        final Long id;
        final String title;
        final String authors;
        final Integer year;
    }

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
                new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
                new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
                new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
        ).addSink(
                JdbcSink.sink(
                        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
                        // Statement builder: binds one Book to the four placeholders.
                        (statement, book) -> {
                            statement.setLong(1, book.id);
                            statement.setString(2, book.title);
                            statement.setString(3, book.authors);
                            statement.setInt(4, book.year);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                                .withDriverName("org.postgresql.Driver")
                                .withUsername("someUser")
                                .build()
                ));

        env.execute();
    }
}
JdbcSink.exactlyOnceSink
Since 1.13, the Flink JDBC sink supports exactly-once mode. The implementation relies on the JDBC driver's support for the XA standard. Most drivers support XA if the database also supports XA (so the driver is usually the same).
To use it, create a sink using the exactlyOnceSink() method as above and additionally provide:
- exactly-once options
- XA DataSource Supplier
For example:
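A minimal sketch of such a sink follows, assuming the PostgreSQL driver is on the classpath and the connector classes live in `org.apache.flink.connector.jdbc` (the package used by the Flink 1.13-era connector). The `book_titles` table, the class name, the checkpoint interval, and the credentials are hypothetical placeholders.

```java
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.postgresql.xa.PGXADataSource;

public class JdbcExactlyOnceSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once sinks rely on checkpoints; the interval is an arbitrary example value.
        env.enableCheckpointing(10_000L);

        env.fromElements("Stream Processing with Apache Flink", "Streaming Systems")
                .addSink(JdbcSink.exactlyOnceSink(
                        "insert into book_titles (title) values (?)",
                        // Statement builder: binds each title to the single placeholder.
                        (statement, title) -> statement.setString(1, title),
                        // Retries are disabled so that a batch is never re-sent.
                        JdbcExecutionOptions.builder()
                                .withMaxRetries(0)
                                .build(),
                        // PostgreSQL allows only one XA transaction per connection.
                        JdbcExactlyOnceOptions.builder()
                                .withTransactionPerConnection(true)
                                .build(),
                        // Supplier that creates the driver-specific XA DataSource.
                        () -> {
                            PGXADataSource xaDataSource = new PGXADataSource();
                            xaDataSource.setUrl("jdbc:postgresql://localhost:5432/postgres");
                            xaDataSource.setUser("someUser");
                            xaDataSource.setPassword("somePassword");
                            return xaDataSource;
                        }));

        env.execute();
    }
}
```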
NOTE: Some databases only allow a single XA transaction per connection (e.g. PostgreSQL, MySQL). In such cases, please use the following API to construct JdbcExactlyOnceOptions:
JdbcExactlyOnceOptions.builder()
    .withTransactionPerConnection(true)
    .build()
Furthermore, XA needs to be enabled and/or configured in some databases. For PostgreSQL, you should set max_prepared_transactions to some value greater than zero. For MySQL v8+, you should grant XA_RECOVER_ADMIN to the Flink DB user.
ATTENTION: Currently, JdbcSink.exactlyOnceSink can ensure exactly-once semantics only with JdbcExecutionOptions.maxRetries == 0; otherwise, duplicate results may be produced.
PostgreSQL XADataSource example:
PGXADataSource xaDataSource = new org.postgresql.xa.PGXADataSource();
xaDataSource.setUrl("jdbc:postgresql://localhost:5432/postgres");
xaDataSource.setUser(username);
xaDataSource.setPassword(password);
MySQL XADataSource example:
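A minimal sketch, assuming MySQL Connector/J 8.x (`com.mysql.cj.jdbc.MysqlXADataSource`) and a server on localhost. The `MySqlXaDataSourceExample` wrapper and its `create` method are hypothetical names, and the URL and credentials are placeholders.

```java
import com.mysql.cj.jdbc.MysqlXADataSource;
import javax.sql.XADataSource;

/** Hypothetical wrapper: creates an XA-capable MySQL data source. */
final class MySqlXaDataSourceExample {

    private MySqlXaDataSourceExample() {}

    static XADataSource create(String username, String password) {
        MysqlXADataSource xaDataSource = new MysqlXADataSource();
        xaDataSource.setUrl("jdbc:mysql://localhost:3306/");  // placeholder URL
        xaDataSource.setUser(username);
        xaDataSource.setPassword(password);
        return xaDataSource;
    }
}
```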
Oracle XADataSource example:
OracleXADataSource xaDataSource = new oracle.jdbc.xa.OracleXADataSource();
xaDataSource.setURL("jdbc:oracle:oci8:@");
xaDataSource.setUser("scott");
Please also take Oracle connection pooling into account.