Flink Doris Connector

    • The Flink Doris Connector lets Flink read, insert, modify, and delete data stored in Doris.
    • A Doris table can be mapped to a DataStream or a Table.

    Build and Install

    Run the build script, build.sh, from the source directory.

    Note: If you check out the source code from a tag, you can run sh build.sh --tag without specifying the Flink and Scala versions, because the versions are fixed in the tagged source. For example, the tag 1.13.5-2.12-1.0.1 means Flink version 1.13.5, Scala version 2.12, and connector version 1.0.1.
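
    The tag naming scheme described above can be illustrated with a small parser. This is only a sketch: the class ConnectorTag and its fields are hypothetical names introduced here, and the pattern assumes every tag has exactly the three-part shape shown in the example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; it is not part of the connector.
final class ConnectorTag {
    // <flink version>-<scala version>-<connector version>, e.g. 1.13.5-2.12-1.0.1
    private static final Pattern TAG =
            Pattern.compile("(\\d+\\.\\d+\\.\\d+)-(\\d+\\.\\d+)-(\\d+\\.\\d+\\.\\d+)");

    final String flinkVersion;
    final String scalaVersion;
    final String connectorVersion;

    private ConnectorTag(String flinkVersion, String scalaVersion, String connectorVersion) {
        this.flinkVersion = flinkVersion;
        this.scalaVersion = scalaVersion;
        this.connectorVersion = connectorVersion;
    }

    // Splits a tag into its three versions, rejecting anything that does not match the assumed shape.
    static ConnectorTag parse(String tag) {
        Matcher m = TAG.matcher(tag);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected tag format: " + tag);
        }
        return new ConnectorTag(m.group(1), m.group(2), m.group(3));
    }

    public static void main(String[] args) {
        ConnectorTag t = ConnectorTag.parse("1.13.5-2.12-1.0.1");
        System.out.println("flink=" + t.flinkVersion
                + ", scala=" + t.scalaVersion
                + ", connector=" + t.connectorVersion);
    }
}
```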

    After a successful build, the file doris-flink-1.13.5-2.12-1.0.1-SNAPSHOT.jar is generated in the output/ directory. Copy this file to the Flink classpath to use the Flink Doris Connector. For example, if Flink runs in Local mode, put the file in the jars/ folder; if Flink runs in Yarn cluster mode, put the file in the pre-deployment package.

    1. Doris FE must have HTTP v2 enabled in its configuration.
    2. Only Scala 2.12.x is currently supported.

    conf/fe.conf

        enable_http_server_v2 = true

    Add Dependency

        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>doris-flink-connector</artifactId>
            <version>1.11.6-2.12-SNAPSHOT</version>
        </dependency>

    Remarks

    Replace 1.11.6 with 1.12.7 or 1.13.5, depending on the Flink version you are using.
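
    As a sketch of how the version string might be chosen, the helper below maps a Flink version to the matching dependency version. ConnectorVersion and forFlink are hypothetical names, and matching on the major.minor release line is an assumption made here; the remark above only lists the three versions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; it is not part of the connector.
final class ConnectorVersion {
    // Flink release line -> Flink version named in the remark above.
    private static final Map<String, String> LISTED = new LinkedHashMap<>();
    static {
        LISTED.put("1.11", "1.11.6");
        LISTED.put("1.12", "1.12.7");
        LISTED.put("1.13", "1.13.5");
    }

    // Returns the <version> value for a Flink version, assuming a match on major.minor.
    static String forFlink(String flinkVersion) {
        String[] parts = flinkVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Not a Flink version: " + flinkVersion);
        }
        String base = LISTED.get(parts[0] + "." + parts[1]);
        if (base == null) {
            throw new IllegalArgumentException("No version listed for Flink " + flinkVersion);
        }
        // Scala 2.12 is the only Scala version the document lists as supported.
        return base + "-2.12-SNAPSHOT";
    }

    public static void main(String[] args) {
        System.out.println(forFlink("1.12.7"));
    }
}
```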

    How to use

    • SQL
    • DataStream
    • DataSet

    The Flink Doris Connector sink writes data to Doris through Stream Load and also accepts Stream Load configuration:

    • SQL: set keys with the sink.properties. prefix in the WITH clause.
    • DataStream: pass a Properties object to DorisExecutionOptions.builder().setStreamLoadProp(Properties).

    SQL

    • Source

        CREATE TABLE flink_doris_source (
            name STRING,
            age INT,
            price DECIMAL(5,2),
            sale DOUBLE
        )
        WITH (
            'connector' = 'doris',
            'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
            'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
            'username' = '$YOUR_DORIS_USERNAME',
            'password' = '$YOUR_DORIS_PASSWORD'
        );

    • Sink
    • Insert

        INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;

    DataStream

    • Source

        // Connection settings for the Doris FE and the table to read.
        Properties properties = new Properties();
        properties.put("fenodes", "FE_IP:8030");
        properties.put("username", "root");
        properties.put("password", "");
        properties.put("table.identifier", "db.table");

        // Read the table as a stream and print each record.
        env.addSource(new DorisSourceFunction(
                new DorisStreamOptions(properties),
                new SimpleListDeserializationSchema()
            )
        ).print();
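
    The source example sets four properties and writes the table identifier as database.table. The sketch below checks a Properties object for those keys before it is handed to the source. DorisSourceProps is a hypothetical helper, and treating all four keys as mandatory is an assumption made here for illustration, not documented connector behavior.

```java
import java.util.Properties;

// Hypothetical pre-flight check for illustration; it is not part of the connector.
final class DorisSourceProps {
    // The four keys used in the DataStream source example.
    private static final String[] EXPECTED = {"fenodes", "username", "password", "table.identifier"};

    // Throws if a key is absent or the identifier is not of the form database.table.
    static void validate(Properties props) {
        for (String key : EXPECTED) {
            if (props.getProperty(key) == null) {
                throw new IllegalArgumentException("Missing property: " + key);
            }
        }
        splitIdentifier(props.getProperty("table.identifier"));
    }

    // Splits "db.table" into {"db", "table"}.
    static String[] splitIdentifier(String identifier) {
        String[] parts = identifier.split("\\.");
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException("Expected database.table, got: " + identifier);
        }
        return parts;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("fenodes", "FE_IP:8030");
        properties.put("username", "root");
        properties.put("password", "");
        properties.put("table.identifier", "db.table");
        validate(properties);
        String[] parts = splitIdentifier(properties.getProperty("table.identifier"));
        System.out.println("database=" + parts[0] + ", table=" + parts[1]);
    }
}
```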

    • Sink

    Json Stream

        // Stream Load properties: load the records as JSON.
        Properties pro = new Properties();
        pro.setProperty("format", "json");

        env.fromElements(
                "{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}"
            )
            .addSink(
                DorisSink.sink(
                    DorisReadOptions.builder().build(),
                    DorisExecutionOptions.builder()
                        .setBatchSize(3)
                        .setBatchIntervalMs(0L)
                        .setMaxRetries(3)
                        .setStreamLoadProp(pro).build(),
                    DorisOptions.builder()
                        .setFenodes("FE_IP:8030")
                        .setTableIdentifier("db.table")
                        .setUsername("root")
                        .setPassword("").build()
                ));
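
    The execution options in the example set a batch size of 3. This document does not describe how the connector batches records internally, so the following is only a rough mental model of size-triggered batching, not the connector's implementation. BatchBuffer and its methods are hypothetical names; the flush interval and retry handling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of size-triggered batching; not the connector's code.
final class BatchBuffer {
    private final int batchSize;
    private final Consumer<List<String>> flusher;
    private final List<String> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<String>> flusher) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    // Buffers one record and flushes as soon as the batch is full.
    void add(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Hands a copy of the buffered records to the flusher and empties the buffer.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        BatchBuffer batches = new BatchBuffer(3, batch -> System.out.println("flush " + batch));
        for (int i = 1; i <= 7; i++) {
            batches.add("record-" + i);
        }
        batches.flush(); // flush the remainder at shutdown
    }
}
```

    With a batch size of 3 and seven records, this model flushes two full batches while records arrive and one final batch of a single record on the explicit flush.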

    RowData Stream

        // Build a single RowData record with three fields: city, longitude, latitude.
        DataStream<RowData> source = env.fromElements("")
            .map(new MapFunction<String, RowData>() {
                @Override
                public RowData map(String value) throws Exception {
                    GenericRowData genericRowData = new GenericRowData(3);
                    genericRowData.setField(0, StringData.fromString("北京"));
                    genericRowData.setField(1, 116.405419);
                    genericRowData.setField(2, 39.916927);
                    return genericRowData;
                }
            });

        // Column names and types, in the same order as the RowData fields.
        String[] fields = {"city", "longitude", "latitude"};
        LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};

        source.addSink(
            DorisSink.sink(
                fields,
                types,
                DorisReadOptions.builder().build(),
                DorisExecutionOptions.builder()
                    .setBatchSize(3)
                    .setBatchIntervalMs(0L)
                    .setMaxRetries(3)
                    .build(),
                DorisOptions.builder()
                    .setFenodes("FE_IP:8030")
                    .setTableIdentifier("db.table")
                    .setUsername("root")
                    .setPassword("").build()
            ));

    DataSet

    • Sink

        // Build a single RowData record with three fields: city, longitude, latitude.
        MapOperator<String, RowData> data = env.fromElements("")
            .map(new MapFunction<String, RowData>() {
                @Override
                public RowData map(String value) throws Exception {
                    GenericRowData genericRowData = new GenericRowData(3);
                    genericRowData.setField(0, StringData.fromString("北京"));
                    genericRowData.setField(1, 116.405419);
                    genericRowData.setField(2, 39.916927);
                    return genericRowData;
                }
            });

        DorisOptions dorisOptions = DorisOptions.builder()
            .setFenodes("FE_IP:8030")
            .setTableIdentifier("db.table")
            .setUsername("root")
            .setPassword("").build();
        DorisReadOptions readOptions = DorisReadOptions.defaults();
        DorisExecutionOptions executionOptions = DorisExecutionOptions.defaults();

        // Column types and names, in the same order as the RowData fields.
        LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
        String[] fields = {"city", "longitude", "latitude"};

        DorisDynamicOutputFormat outputFormat = new DorisDynamicOutputFormat(
            dorisOptions, readOptions, executionOptions, types, fields
        );
        outputFormat.open(0, 1);
        data.output(outputFormat);
        outputFormat.close();

    MySQL CDC example

        CREATE TABLE cdc_mysql_source (
            id INT,
            name STRING,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'mysql-cdc',
            'hostname' = '127.0.0.1',
            'port' = '3306',
            'username' = 'root',
            'password' = 'password',
            'database-name' = 'database',
            'table-name' = 'table'
        );

        -- Delete events are synchronized when sink.enable-delete is 'true'.
        -- This requires the batch delete feature to be enabled on the Doris table.
        CREATE TABLE doris_sink (
            id INT,
            name STRING
        )
        WITH (
            'connector' = 'doris',
            'fenodes' = '127.0.0.1:8030',
            'table.identifier' = 'database.table',
            'username' = 'root',
            'password' = '',
            'sink.properties.format' = 'json',
            'sink.properties.strip_outer_array' = 'true',
            'sink.enable-delete' = 'true'
        );

        INSERT INTO doris_sink SELECT id, name FROM cdc_mysql_source;
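
    The doris_sink definition passes Stream Load settings through keys that start with sink.properties. in the WITH clause, while the DataStream API takes the same settings as a Properties object. The sketch below shows one way to picture the relationship between the two: strip the prefix and keep the rest of the key. SinkProperties and streamLoadProps are hypothetical names, and the prefix-stripping rule is an assumption used for illustration rather than a description of the connector's internals.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration; it is not part of the connector.
final class SinkProperties {
    static final String PREFIX = "sink.properties.";

    // Collects every option that starts with the prefix, with the prefix removed.
    static Properties streamLoadProps(Map<String, String> withOptions) {
        Properties props = new Properties();
        for (Map.Entry<String, String> option : withOptions.entrySet()) {
            if (option.getKey().startsWith(PREFIX)) {
                props.setProperty(option.getKey().substring(PREFIX.length()), option.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        // A subset of the WITH options from the doris_sink table.
        Map<String, String> withOptions = new LinkedHashMap<>();
        withOptions.put("connector", "doris");
        withOptions.put("sink.properties.format", "json");
        withOptions.put("sink.properties.strip_outer_array", "true");
        withOptions.put("sink.enable-delete", "true");

        Properties props = streamLoadProps(withOptions);
        System.out.println("format=" + props.getProperty("format"));
        System.out.println("strip_outer_array=" + props.getProperty("strip_outer_array"));
    }
}
```

    Under this reading, the resulting Properties object holds the same keys that the DataStream examples pass to setStreamLoadProp, such as format.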