Flink Doris Connector
- The Flink Doris Connector supports operating on (reading, inserting, modifying, deleting) data stored in Doris through Flink.
- A Doris table can be mapped to a DataStream or a Table.
Build and Install
Execute the following command in the source directory, passing the Flink and Scala versions you are building against:
sh build.sh --flink 1.13.5 --scala 2.12
Note: If you check out the source code from a tag, you can simply run
sh build.sh --tag
without specifying the Flink and Scala versions, because the versions in a tag's source code are fixed. For example, 1.13.5-2.12-1.0.1 means Flink version 1.13.5, Scala version 2.12, and connector version 1.0.1.
After successful compilation, the file doris-flink-1.13.5-2.12-1.0.1-SNAPSHOT.jar will be generated in the output/ directory. Copy this file onto Flink's classpath to use Flink-Doris-Connector. For example, for Flink running in Local mode, put this file in the jars/ folder; for Flink running in Yarn cluster mode, put this file in the pre-deployment package.
- Doris FE must have HTTP v2 enabled in its configuration (conf/fe.conf):

enable_http_server_v2 = true

- Only Scala 2.12.x is currently supported
Add Dependency
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>doris-flink-connector</artifactId>
<version>1.11.6-2.12-SNAPSHOT</version>
</dependency>
Remarks: 1.11.6 can be substituted with 1.12.7 or 1.13.5, depending on the Flink version you are using.
How to use
- SQL
- DataStream
- DataSet
The Flink Doris Connector sink writes data to Doris via Stream Load, and also supports the Stream Load configuration options:
- SQL: configured through options with the sink.properties. prefix in the WITH clause
- DataStream: configured through DorisExecutionOptions.builder().setStreamLoadProp(Properties), as in the sketch below
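A minimal sketch of the DataStream configuration path (the format and strip_outer_array values here are illustrative; any Stream Load option can be passed this way):

Properties streamLoadProp = new Properties();
// Stream Load options are forwarded as-is to the load request
streamLoadProp.setProperty("format", "json");
streamLoadProp.setProperty("strip_outer_array", "true");

DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
        .setStreamLoadProp(streamLoadProp)
        .build();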
SQL
- Source
CREATE TABLE flink_doris_source (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
)
WITH (
'connector' = 'doris',
'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
'username' = '$YOUR_DORIS_USERNAME',
'password' = '$YOUR_DORIS_PASSWORD'
);
- Sink
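The INSERT statement below writes into a flink_doris_sink table. A minimal sketch of its DDL, mirroring the source table above (the schema and the $-placeholders are illustrative and must match your Doris table):

CREATE TABLE flink_doris_sink (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
)
WITH (
'connector' = 'doris',
'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
'username' = '$YOUR_DORIS_USERNAME',
'password' = '$YOUR_DORIS_PASSWORD'
);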
- Insert
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
DataStream
- Source
Properties properties = new Properties();
properties.put("fenodes","FE_IP:8030");
properties.put("username","root");
properties.put("password","");
properties.put("table.identifier","db.table");
env.addSource(new DorisSourceFunction(
new DorisStreamOptions(properties),
new SimpleListDeserializationSchema()
)
).print();
Json Stream
Properties pro = new Properties();
pro.setProperty("format", "json");
env.fromElements(
"{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}"
)
.addSink(
DorisSink.sink(
DorisReadOptions.builder().build(),
DorisExecutionOptions.builder()
.setBatchSize(3)
.setBatchIntervalMs(0L)
.setMaxRetries(3)
.setStreamLoadProp(pro).build(),
DorisOptions.builder()
.setFenodes("FE_IP:8030")
.setTableIdentifier("db.table")
.setUsername("root")
.setPassword("").build()
));
RowData Stream
DataStream<RowData> source = env.fromElements("")
.map(new MapFunction<String, RowData>() {
@Override
public RowData map(String value) throws Exception {
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, StringData.fromString("北京"));
genericRowData.setField(1, 116.405419);
genericRowData.setField(2, 39.916927);
return genericRowData;
}
});
String[] fields = {"city", "longitude", "latitude"};
LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
source.addSink(
DorisSink.sink(
fields,
types,
DorisReadOptions.builder().build(),
DorisExecutionOptions.builder()
.setBatchSize(3)
.setBatchIntervalMs(0L)
.setMaxRetries(3)
.build(),
DorisOptions.builder()
.setFenodes("FE_IP:8030")
.setTableIdentifier("db.table")
.setUsername("root")
.setPassword("").build()
));
DataSet
- Sink
MapOperator<String, RowData> data = env.fromElements("")
    .map(new MapFunction<String, RowData>() {
        @Override
        public RowData map(String value) throws Exception {
            // Build one row matching the fields/types declared below
            GenericRowData genericRowData = new GenericRowData(3);
            genericRowData.setField(0, StringData.fromString("北京"));
            genericRowData.setField(1, 116.405419);
            genericRowData.setField(2, 39.916927);
            return genericRowData;
        }
    });
DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("FE_IP:8030")
.setTableIdentifier("db.table")
.setUsername("root")
.setPassword("").build();
DorisReadOptions readOptions = DorisReadOptions.defaults();
DorisExecutionOptions executionOptions = DorisExecutionOptions.defaults();
LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
String[] fields = {"city", "longitude", "latitude"};
DorisDynamicOutputFormat outputFormat = new DorisDynamicOutputFormat(
dorisOptions, readOptions, executionOptions, types, fields
);
outputFormat.open(0, 1);
data.output(outputFormat);
outputFormat.close();
An example of using Flink CDC to access Doris (supports insert/update/delete events)
CREATE TABLE cdc_mysql_source (
id int
,name VARCHAR
,PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'database',
'table-name' = 'table'
);
-- Delete event synchronization is supported (sink.enable-delete='true'); it requires the batch delete feature to be enabled on the Doris table (see the note after this example)
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true',
'sink.enable-delete' = 'true'
);
insert into doris_sink select id,name from cdc_mysql_source;
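For the delete synchronization above, the target Doris table needs the batch delete feature. A minimal sketch of enabling it on an existing table (the table name is a placeholder; newer Doris versions may enable it by default):

ALTER TABLE `database`.`table` ENABLE FEATURE "BATCH_DELETE";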