1. The Spark IoTDB Connector only supports Spark 2.4.5 to 3.2.0 with Scala 2.12. If you need to support other versions, modify the Scala version in the POM file of the spark-iotdb-connector module in the source code and recompile it.
2. The Thrift versions used by IoTDB and Spark conflict. If you want to debug in spark-shell, resolve the conflict by executing rm -f $SPARK_HOME/jars/libthrift* and cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/; otherwise you can only debug the code in an IDE. If you want to run your task with spark-submit, you must package your application together with its dependencies, as in the build sketch below.
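
One common way to package with dependencies is the Maven Shade Plugin. A minimal sketch (the plugin version shown is an assumption; adjust it to your build):

    <!-- Sketch: bundle the application and its dependencies into a single JAR for spark-submit -->
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
            </execution>
        </executions>
    </plugin>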

    Maven Dependency

    <dependency>
        <groupId>org.apache.iotdb</groupId>
        <artifactId>spark-iotdb-connector</artifactId>
        <version>0.13.0</version>
    </dependency>

    spark-shell user guide

To partition the RDD:

    spark-shell --jars spark-iotdb-connector-0.13.0.jar,iotdb-jdbc-0.13.0-jar-with-dependencies.jar,iotdb-session-0.13.0-jar-with-dependencies.jar

    import org.apache.iotdb.spark.db._
    val df = spark.read.format("org.apache.iotdb.spark.db").
      option("url", "jdbc:iotdb://127.0.0.1:6667/").
      option("sql", "select * from root").
      option("numPartition", [the partition number you want]).load
    df.printSchema()
    df.show()
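
Once loaded, the DataFrame can be used like any other Spark DataFrame. For example, you can register it as a temporary view and query it with Spark SQL. A minimal sketch (the view name iotdb_table is arbitrary; note that wide-form column names are full time-series paths containing dots, so quote them with backticks when referencing them in SQL):

    // Register the loaded DataFrame as a temporary view for Spark SQL
    df.createOrReplaceTempView("iotdb_table")
    // Query the view with standard Spark SQL
    spark.sql("select * from iotdb_table").show()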

    Schema Inference

    The existing data in the TsFile is as follows:

    The wide (default) table form is as follows:

    Transform between wide and narrow table

    • From narrow to wide:

    import org.apache.iotdb.spark.db._
    val wide_df = Transformer.toWideForm(spark, narrow_df)
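
    • From wide to narrow: the connector's Transformer provides the reverse conversion. A minimal sketch, assuming wide_df holds a DataFrame in the wide form:

    import org.apache.iotdb.spark.db._
    // Convert a wide-form DataFrame back to the narrow form
    val narrow_df = Transformer.toNarrowForm(spark, wide_df)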

    Java user guide
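
The read path from the spark-shell example above can be used from Java as well. A minimal sketch using Spark's Java API (the class name, application name, and master setting are placeholders; adjust them for your deployment):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class IoTDBReadExample {
        public static void main(String[] args) {
            // Build a local SparkSession; adjust master/appName for your deployment
            SparkSession spark = SparkSession.builder()
                    .appName("iotdb-spark-read")
                    .master("local[*]")
                    .getOrCreate();

            // Same connector options as in the spark-shell example above
            Dataset<Row> df = spark.read()
                    .format("org.apache.iotdb.spark.db")
                    .option("url", "jdbc:iotdb://127.0.0.1:6667/")
                    .option("sql", "select * from root")
                    .load();

            df.printSchema();
            df.show();

            spark.stop();
        }
    }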

    Write Data to IoTDB

    // import narrow table
    val df = spark.createDataFrame(List(
      (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
      (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))

    val dfWithColumn = df.withColumnRenamed("_1", "Time")
      .withColumnRenamed("_2", "device_name")
      .withColumnRenamed("_3", "s0")
      .withColumnRenamed("_4", "s1")
      .withColumnRenamed("_5", "s2")
      .withColumnRenamed("_6", "s3")
      .withColumnRenamed("_7", "s4")
      .withColumnRenamed("_8", "s5")

    dfWithColumn.write
      .format("org.apache.iotdb.spark.db")
      .option("url", "jdbc:iotdb://127.0.0.1:6667/")
      .save

    // import wide table
    val df = spark.createDataFrame(List(
      (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
      (2L, 2, 2L, 2.0F, 2.0D, false, "world")))

    val dfWithColumn = df.withColumnRenamed("_1", "Time")
      .withColumnRenamed("_2", "root.test.d0.s0")
      .withColumnRenamed("_3", "root.test.d0.s1")
      .withColumnRenamed("_4", "root.test.d0.s2")
      .withColumnRenamed("_5", "root.test.d0.s3")
      .withColumnRenamed("_6", "root.test.d0.s4")
      .withColumnRenamed("_7", "root.test.d0.s5")

    dfWithColumn.write.format("org.apache.iotdb.spark.db")
      .option("url", "jdbc:iotdb://127.0.0.1:6667/")
      .option("numPartition", "10")
      .save

    Notes

    1. You can write data to IoTDB directly, whether the DataFrame contains a wide table or a narrow table.
    2. The numPartition parameter sets the number of partitions. The DataFrame you want to save will be repartitioned based on this parameter before writing. Each partition opens its own session to write data, which increases the number of concurrent requests.