Nebula Spark Connector

    • Reader

      提供一个Spark SQL接口,用户可以使用该接口编程读取Nebula Graph图数据,单次读取一个点或Edge type的数据,并将读取的结果组装成Spark的DataFrame。

    • Writer

      提供一个Spark SQL接口,用户可以使用该接口编程将DataFrame格式的数据逐条或批量写入Nebula Graph。

    更多使用说明请参见Nebula Spark Connector

    Nebula Spark Connector适用于以下场景:

    • 在不同的Nebula Graph集群之间迁移数据。

    • 在同一个Nebula Graph集群内不同图空间之间迁移数据。

    • Nebula Graph与其他数据源之间迁移数据。

    • 结合进行图计算。

    • 提供多种连接配置项,如超时时间、连接重试次数、执行重试次数等。

    • 提供多种数据配置项,如写入数据时设置对应列为点ID、起始点ID、目的点ID或属性。

    • Reader支持无属性读取和全属性读取。

    • Reader支持将Nebula Graph数据读取成Graphx的VertexRDD和EdgeRDD,支持非Long型点ID。

    • 统一了SparkSQL的扩展数据源,统一采用DataSourceV2进行Nebula Graph数据扩展。

    • 支持、updatedelete三种写入模式。insert模式会插入(覆盖)数据,update模式仅会更新已存在的数据,delete模式只删除数据。

    Note

    安装 Spark 2.3以上版本。

    1. 克隆仓库nebula-spark-connector

    2. 进入目录nebula-spark-connector

      1. $ cd nebula-spark-connector/nebula-spark-connector

    编译完成后,在目录nebula-spark-connector/nebula-spark-connector/target/下生成类似文件nebula-spark-connector-2.6.0-SHANPSHOT.jar

    下载地址

    使用Nebula Spark Connector读写Nebula Graph数据库时,只需要编写以下代码即可实现。

    1. # 从Nebula Graph读取点边数据。
    2. spark.read.nebula().loadVerticesToDF()
    3. spark.read.nebula().loadEdgesToDF()
    4. # 将dataframe数据作为点和边写入Nebula Graph中。
    5. dataframe.write.nebula().writeVertices()
    6. dataframe.write.nebula().writeEdges()

    nebula()接收两个配置参数,包括连接配置和读写配置。

    • NebulaConnectionConfig是连接Nebula Graph的配置,说明如下。

    • ReadNebulaConfig是读取Nebula Graph数据的配置,说明如下。

    1. val config = NebulaConnectionConfig
    2. .builder()
    3. .withMetaAddress("127.0.0.1:9559")
    4. .withConenctionRetry(2)
    5. .build()
    6. val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
    7. .builder()
    8. .withSpace("test")
    9. .withTag("person")
    10. .withVidField("id")
    11. .withVidPolicy("hash")
    12. .withVidAsProp(true)
    13. .withPasswd("nebula")
    14. .withBatch(1000)
    15. .build()
    16. df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
    17. val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
    18. .builder()
    19. .withSpace("test")
    20. .withEdge("friend")
    21. .withSrcIdField("src")
    22. .withSrcPolicy(null)
    23. .withDstIdField("dst")
    24. .withDstPolicy(null)
    25. .withRankField("degree")
    26. .withSrcAsProperty(true)
    27. .withDstAsProperty(true)
    28. .withRankAsProperty(true)
    29. .withUser("root")
    30. .withPasswd("nebula")
    31. .withBatch(1000)
    32. .build()
    33. df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()

    默认写入模式为insert,可以通过withWriteMode配置修改为update

    • NebulaConnectionConfig是连接Nebula Graph的配置,说明如下。

    • WriteNebulaEdgeConfig是写入边的配置,说明如下。