Spark Doris Connector

    Github: https://github.com/apache/incubator-doris-spark-connector

    • Support reading data from Doris.
    • Support Spark DataFrame batch/stream writing data to Doris.
    • You can map a Doris table to a DataFrame or an RDD; using DataFrame is recommended.
    • Support data filtering on the Doris side to reduce the amount of data transferred.
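As a sketch of the Doris-side filtering, the connector accepts a `doris.filter.query` option whose predicate is evaluated by Doris before rows are shipped to Spark. The connection values and the predicate below are placeholders:

```scala
// Sketch: push a filter down to Doris so only matching rows are transferred.
// All $YOUR_* values are placeholders; "age > 18" is an illustrative predicate.
val filteredDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .option("doris.filter.query", "age > 18") // evaluated on the Doris side, not in Spark
  .load()
```

Filters that Spark itself pushes down (e.g. from a `WHERE` clause) are also forwarded to Doris where possible; `doris.filter.query` lets you state the predicate explicitly.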

    Build and Install

    Preparation

    Specify the thrift installation directory.

    Execute the following command in the source directory:

    sh build.sh --spark 2.3.4 --scala 2.11 ## spark 2.3.4, scala 2.11
    sh build.sh --spark 3.1.2 --scala 2.12 ## spark 3.1.2, scala 2.12
    sh build.sh --spark 3.2.0 --scala 2.12 \
    --mvn-args "-Dnetty.version=4.1.68.Final -Dfasterxml.jackson.version=2.12.3" ## spark 3.2.0, scala 2.12

    After successful compilation, the file doris-spark-2.3.4-2.11-1.0.0-SNAPSHOT.jar will be generated in the output/ directory. Copy this file to the Spark ClassPath to use Spark-Doris-Connector. For example, if Spark runs in Local mode, put this file in the jars/ folder. If Spark runs in Yarn cluster mode, put this file in the pre-deployment package: for example, upload doris-spark-2.3.4-2.11-1.0.0-SNAPSHOT.jar to HDFS and add the HDFS file path to spark.yarn.jars.

    1. Upload doris-spark-connector-3.1.2-2.12-1.0.0.jar to HDFS.

       hdfs dfs -mkdir /spark-jars/
       hdfs dfs -put /your_local_path/doris-spark-connector-3.1.2-2.12-1.0.0.jar /spark-jars/

    2. Add the doris-spark-connector-3.1.2-2.12-1.0.0.jar dependency in the cluster.
    <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>spark-doris-connector-3.1_2.12</artifactId>
      <!--artifactId>spark-doris-connector-2.3_2.11</artifactId-->
    </dependency>
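One way to make the uploaded jar visible to a Yarn job without repackaging is to pass its HDFS path on submission. This is a sketch; the application jar name and HDFS path are placeholders:

```shell
# Sketch: attach the connector jar (previously uploaded to HDFS) to a Yarn job.
# your_spark_app.jar and the /spark-jars/ path are placeholders for your environment.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars hdfs:///spark-jars/doris-spark-connector-3.1.2-2.12-1.0.0.jar \
  your_spark_app.jar
```

Using --jars distributes only the connector; if you instead set spark.yarn.jars, remember that it replaces the full list of Spark jars, so the connector path must be added alongside them rather than on its own.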

    Notes

    Example

    SQL

    CREATE TEMPORARY VIEW spark_doris
    USING doris
    OPTIONS(
      "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
      "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT",
      "user"="$YOUR_DORIS_USERNAME",
      "password"="$YOUR_DORIS_PASSWORD"
    );

    SELECT * FROM spark_doris;

    DataFrame
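A Scala DataFrame read looks like the following sketch (all $YOUR_* option values are placeholders for your own cluster):

```scala
// Read a Doris table into a Spark DataFrame; option values are placeholders.
val dorisSparkDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .load()

// show 5 lines of data
dorisSparkDF.show(5)
```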

    RDD

    import org.apache.doris.spark._

    val dorisSparkRDD = sc.dorisRDD(
      tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
      cfg = Some(Map(
        "doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT",
        "doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
        "doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
      ))
    )

    dorisSparkRDD.collect()

    pySpark

    dorisSparkDF = spark.read.format("doris") \
        .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME") \
        .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT") \
        .option("user", "$YOUR_DORIS_USERNAME") \
        .option("password", "$YOUR_DORIS_PASSWORD") \
        .load()

    # show 5 lines of data
    dorisSparkDF.show(5)

    SQL
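Writing through SQL goes via the same temporary view used for reading; a sketch, where the values and the source table name are placeholders:

```sql
-- Assumes a spark_doris temporary view created as in the read example.
INSERT INTO spark_doris VALUES ("VALUE1", "VALUE2");
INSERT INTO spark_doris SELECT * FROM YOUR_TABLE;
```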

    DataFrame(batch/stream)

    // batch sink
    val mockDataDF = List(
      (3, "440403001005", "21.cn"),
      (33, null, "23.cn")
    ).toDF("id", "mi_code", "mi_name")

    mockDataDF.write.format("doris")
      .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
      .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
      .option("user", "$YOUR_DORIS_USERNAME")
      .option("password", "$YOUR_DORIS_PASSWORD")
      // other options
      // specify the fields to write
      .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
      .save()

    // stream sink (StructuredStreaming)
    val kafkaSource = spark.readStream
      .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
      .option("startingOffsets", "latest")
      .option("subscribe", "$YOUR_KAFKA_TOPICS")
      .format("kafka")
      .load()

    kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("doris")
      .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
      .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
      .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
      .option("user", "$YOUR_DORIS_USERNAME")
      .option("password", "$YOUR_DORIS_PASSWORD")
      // other options
      // specify the fields to write
      .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
      .start()

    Doris & Spark Column Type Mapping

    • Note: In the Connector, DATE and DATETIME are mapped to String. Due to the processing logic of the Doris underlying storage engine, the covered time range cannot meet the demand when the time types are used directly, so String is used to return the readable text of the corresponding time.