Spark IoTDB Connector
only supports Spark2.4.5
to3.2.0
withScala 2.12
. If you need to support other versions, you can modify the Scala version of the POM file in the modulespark-iotdb-connector
in the source code and then recompile it.- There is a conflict of thrift version between IoTDB and Spark. Therefore, if you want to debug in spark-shell, you need to execute
rm -f $SPARK_HOME/jars/libthrift*
andcp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/
to resolve it. Otherwise, you can only debug the code in IDE. If you want to run your task byspark-submit
, you must package with dependency.
Maven Dependency
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>spark-iotdb-connector</artifactId>
<version>0.13.0</version>
</dependency>
spark-shell user guide
To partition rdd:
spark-shell --jars spark-iotdb-connector-0.13.0.jar,iotdb-jdbc-0.13.0-jar-with-dependencies.jar,iotdb-session-0.13.0-jar-with-dependencies.jar
import org.apache.iotdb.spark.db._
val df = spark.read.format("org.apache.iotdb.spark.db").option("url","jdbc:iotdb://127.0.0.1:6667/").option("sql","select * from root").
option("numPartition", [the partition number you want]).load
df.printSchema()
df.show()
Schema Inference
The existing data in the TsFile is as follows:
The wide(default) table form is as follows:
Transform between wide and narrow table
- from wide to narrow
- from narrow to wide
import org.apache.iotdb.spark.db._
val wide_df = Transformer.toWideForm(spark, narrow_df)
Java user guide
User Guide
// import narrow table
val df = spark.createDataFrame(List(
(1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "device_name")
.withColumnRenamed("_3", "s0")
.withColumnRenamed("_4", "s1")
.withColumnRenamed("_5", "s2")
.withColumnRenamed("_6", "s3")
.withColumnRenamed("_7", "s4")
.withColumnRenamed("_8", "s5")
.write
.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.save
// import wide table
val df = spark.createDataFrame(List(
(1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_2", "root.test.d0.s0")
.withColumnRenamed("_3", "root.test.d0.s1")
.withColumnRenamed("_4", "root.test.d0.s2")
.withColumnRenamed("_5", "root.test.d0.s3")
.withColumnRenamed("_6", "root.test.d0.s4")
.withColumnRenamed("_7", "root.test.d0.s5")
dfWithColumn.write.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.option("numPartition", "10")
.save
Notes
- You can directly write data to IoTDB whatever the dataframe contains a wide table or a narrow table.
- The parameter is used to set the number of partitions. The dataframe that you want to save will be repartition based on this parameter before writing data. Each partition will open a session to write data to increase the number of concurrent requests.