Spark Doris Connector
Github: https://github.com/apache/incubator-doris-spark-connector
- Support reading data from .
- Support
Spark DataFrame
batch/stream writing data toDoris
- You can map the
Doris
table toDataFrame
orRDD
, it is recommended to useDataFrame
. - Support the completion of data filtering on the
Doris
side to reduce the amount of data transmission.
Build and Install
Ready to work
2.Specify the thrift installation directory
Execute following command in source dir
sh build.sh --spark 2.3.4 --scala 2.11 ## spark 2.3.4, scala 2.11
sh build.sh --spark 3.1.2 --scala 2.12 ## spark 3.1.2, scala 2.12
sh build.sh --spark 3.2.0 --scala 2.12 \
--mvn-args "-Dnetty.version=4.1.68.Final -Dfasterxml.jackson.version=2.12.3" ## spark 3.2.0, scala 2.12
After successful compilation, the file doris-spark-2.3.4-2.11-1.0.0-SNAPSHOT.jar
will be generated in the output/
directory. Copy this file to ClassPath
in Spark
to use Spark-Doris-Connector
. For example, Spark
running in Local
mode, put this file in the jars/
folder. Spark
running in Yarn
cluster mode, put this file in the pre-deployment package ,for example upload doris-spark-2.3.4-2.11-1.0.0-SNAPSHOT.jar
to hdfs and add hdfs file path in spark.yarn.jars.
- Upload doris-spark-connector-3.1.2-2.12-1.0.0.jar Jar to hdfs.
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/doris-spark-connector-3.1.2-2.12-1.0.0.jar /spark-jars/
- Add doris-spark-connector-3.1.2-2.12-1.0.0.jar depence in Cluster.
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.1_2.12</artifactId>
<!--artifactId>spark-doris-connector-2.3_2.11</artifactId-->
</dependency>
Notes
Example
SQL
CREATE TEMPORARY VIEW spark_doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);
SELECT * FROM spark_doris;
DataFrame
RDD
import org.apache.doris.spark._
val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
cfg = Some(Map(
"doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
"doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
))
)
dorisSparkRDD.collect()
pySpark
dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()
# show 5 lines data
dorisSparkDF.show(5)
SQL
DataFrame(batch/stream)
## batch sink
val mockDataDF = List(
(3, "440403001005", "21.cn"),
(33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.write.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//other options
//specify the fields to write
.option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
.save()
## stream sink(StructuredStreaming)
val kafkaSource = spark.readStream
.option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
.option("startingOffsets", "latest")
.option("subscribe", "$YOUR_KAFKA_TOPICS")
.format("kafka")
.load()
kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//other options
//specify the fields to write
.option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
.start()
Doris & Spark Column Type Mapping
- Note: In Connector,
DATE
andDATETIME
are mapped to . Due to the processing logic of the Doris underlying storage engine, when the time type is used directly, the time range covered cannot meet the demand. So useString
type to directly return the corresponding time readable text.