Nebula Spark Connector
Reader
提供一个Spark SQL接口,用户可以使用该接口编程读取Nebula Graph图数据,单次读取一个点或Edge type的数据,并将读取的结果组装成Spark的DataFrame。
Writer
提供一个Spark SQL接口,用户可以使用该接口编程将DataFrame格式的数据逐条或批量写入Nebula Graph。
更多使用说明请参见Nebula Spark Connector。
Nebula Spark Connector适用于以下场景:
在不同的Nebula Graph集群之间迁移数据。
在同一个Nebula Graph集群内不同图空间之间迁移数据。
Nebula Graph与其他数据源之间迁移数据。
结合进行图计算。
提供多种连接配置项,如超时时间、连接重试次数、执行重试次数等。
提供多种数据配置项,如写入数据时设置对应列为点ID、起始点ID、目的点ID或属性。
Reader支持无属性读取和全属性读取。
Reader支持将Nebula Graph数据读取成Graphx的VertexRDD和EdgeRDD,支持非Long型点ID。
统一了SparkSQL的扩展数据源,统一采用DataSourceV2进行Nebula Graph数据扩展。
支持、
update
和delete
三种写入模式。insert
模式会插入(覆盖)数据,update
模式仅会更新已存在的数据,delete
模式只删除数据。
Note
安装 Spark 2.3以上版本。
克隆仓库
nebula-spark-connector
。进入目录
nebula-spark-connector
。$ cd nebula-spark-connector/nebula-spark-connector
编译完成后,在目录nebula-spark-connector/nebula-spark-connector/target/
下生成类似文件nebula-spark-connector-2.6.0-SHANPSHOT.jar
。
使用Nebula Spark Connector读写Nebula Graph数据库时,只需要编写以下代码即可实现。
# 从Nebula Graph读取点边数据。
spark.read.nebula().loadVerticesToDF()
spark.read.nebula().loadEdgesToDF()
# 将dataframe数据作为点和边写入Nebula Graph中。
dataframe.write.nebula().writeVertices()
dataframe.write.nebula().writeEdges()
nebula()
接收两个配置参数,包括连接配置和读写配置。
NebulaConnectionConfig
是连接Nebula Graph的配置,说明如下。ReadNebulaConfig
是读取Nebula Graph数据的配置,说明如下。
val config = NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withConenctionRetry(2)
.build()
val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace("test")
.withTag("person")
.withVidField("id")
.withVidPolicy("hash")
.withVidAsProp(true)
.withPasswd("nebula")
.withBatch(1000)
.build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
.builder()
.withSpace("test")
.withEdge("friend")
.withSrcIdField("src")
.withSrcPolicy(null)
.withDstIdField("dst")
.withDstPolicy(null)
.withRankField("degree")
.withSrcAsProperty(true)
.withDstAsProperty(true)
.withRankAsProperty(true)
.withUser("root")
.withPasswd("nebula")
.withBatch(1000)
.build()
df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()
默认写入模式为insert
,可以通过withWriteMode
配置修改为update
:
NebulaConnectionConfig
是连接Nebula Graph的配置,说明如下。WriteNebulaEdgeConfig
是写入边的配置,说明如下。