导入SST文件数据

    • 仅Linux系统支持导入SST文件。

    • 不支持属性的Default值。

    背景信息

    Exchange支持两种数据导入模式:

    • 直接将数据源的数据通过nGQL语句的形式导入Nebula Graph。

    • 将数据源的数据生成SST文件,然后借助Console将SST文件导入Nebula Graph。

    下文将介绍生成SST文件并用其导入数据的适用场景、实现方法、前提条件、操作步骤等内容。

    • 适合在线业务,因为生成时几乎不会影响业务(只是读取Schema),导入速度快。

      Caution

      虽然导入速度快,但是导入期间(大约10秒)会阻塞对应空间的写操作,建议在业务低峰期进行导入。

    • 适合数据源数据量较大的场景,导入速度快。

    实现方法

    Nebula Graph底层使用RocksDB作为键值型存储引擎。RocksDB是基于硬盘的存储引擎,提供了一系列API用于创建及导入SST格式的文件,有助于快速导入海量数据。

    SST文件是一个内部包含了任意长度的有序键值对集合的文件,用于高效地存储大量键值型数据。生成SST文件的整个过程主要由Exchange的Reader、sstProcessor和sstWriter完成。整个数据处理过程如下:

    1. Reader从数据源中读取数据。

    2. sstProcessor根据Nebula Graph的Schema信息生成SST文件,然后上传至HDFS。SST文件的格式请参见数据存储格式

    3. sstWriter打开一个文件并插入数据。生成SST文件时,Key必须按照顺序写入。

    4. 生成SST文件之后,RocksDB通过方法将SST文件导入到Nebula Graph中。例如:

      调用IngestExternalFile()方法时,RocksDB默认会将文件拷贝到数据目录,并且阻塞RocksDB写入操作。如果SST文件中的键范围覆盖了Memtable键的范围,则将Memtable落盘(flush)到硬盘。将SST文件放置在LSM树最优位置后,为文件分配一个全局序列号,并打开写操作。

    本文以为例。

    环境配置

    本文示例在MacOS下完成,以下是相关的环境配置信息:

    • 硬件规格:

      • CPU:1.7 GHz Quad-Core Intel Core i7
      • 内存:16 GB
    • Spark:2.4.7 单机版

    • Hadoop:2.9.2 伪分布式部署

    • Nebula Graph:2.6.0。

    • 已经并获取如下信息:

      • Graph服务和Meta服务的的IP地址和端口。

      • 拥有Nebula Graph写权限的用户名和密码。

      • Meta服务配置文件中的--ws_storage_http_port和Storage服务配置文件中的--ws_http_port一致。例如都为19779

      • Graph服务配置文件中的--ws_meta_http_port和Meta服务配置文件中的--ws_http_port一致。例如都为19559

      • Schema的信息,包括Tag和Edge type的名称、属性等。

    • 已经编译Exchange,或者直接编译完成的.jar文件。本示例中使用Exchange 2.6.0。

    • 已经安装Spark。

    • 已经安装JDK 1.8或以上版本,并配置环境变量JAVA_HOME。

    • 确认Hadoop服务在所有部署Storage服务的机器上运行正常。

      Note

      • 如果需要生成其他数据源的SST文件,请参见相应数据源的文档,查看前提条件部分。

      • 如果只需要生成SST文件,不需要在部署Storage服务的机器上安装Hadoop服务。

    操作步骤

    分析CSV文件中的数据,按以下步骤在Nebula Graph中创建Schema:

    1. 确认Schema要素。Nebula Graph中的Schema要素如下表所示。

    2. 使用Nebula Console创建一个图空间basketballplayer,并创建一个Schema,如下所示。

      1. ## 创建图空间
      2. nebula> CREATE SPACE basketballplayer \
      3. (partition_num = 10, \
      4. replica_factor = 1, \
      5. vid_type = FIXED_STRING(30));
      6. ## 选择图空间basketballplayer
      7. nebula> USE basketballplayer;
      8. ## 创建Tag player
      9. nebula> CREATE TAG player(name string, age int);
      10. ## 创建Tag team
      11. nebula> CREATE TAG team(name string);
      12. ## 创建Edge type follow
      13. nebula> CREATE EDGE follow(degree int);
      14. ## 创建Edge type serve
      15. nebula> CREATE EDGE serve(start_year int, end_year int);

    更多信息,请参见。

    步骤 2:处理CSV文件

    确认以下信息:

    1. 处理CSV文件以满足Schema的要求。

      Note

      可以使用有表头或者无表头的CSV文件。

    2. 获取CSV文件存储路径。

    编译Exchange后,复制target/classes/application.conf文件设置相关配置。在本示例中,复制的文件名为sst_application.conf。各个配置项的详细说明请参见。

    1. {
    2. # Spark相关配置
    3. spark: {
    4. app: {
    5. name: Nebula Exchange 2.0
    6. }
    7. master:local
    8. driver: {
    9. cores: 1
    10. maxResultSize: 1G
    11. }
    12. executor: {
    13. memory:1G
    14. }
    15. cores:{
    16. max: 16
    17. }
    18. }
    19. # Nebula Graph相关配置
    20. nebula: {
    21. address:{
    22. graph:["127.0.0.1:9669"]
    23. meta:["127.0.0.1:9559"]
    24. }
    25. user: root
    26. pswd: nebula
    27. space: basketballplayer
    28. # SST文件相关配置
    29. path:{
    30. # 本地临时存放生成的SST文件的目录
    31. local:"/tmp"
    32. # SST文件在HDFS的存储路径
    33. remote:"/sst"
    34. # HDFS的NameNode地址
    35. hdfs.namenode: "hdfs://*.*.*.*:9000"
    36. }
    37. # 客户端连接参数
    38. connection {
    39. # socket连接、执行的超时时间,单位:毫秒。
    40. timeout: 30000
    41. error: {
    42. # 最大失败数,超过后会退出应用程序。
    43. max: 32
    44. # 失败的导入作业将记录在输出路径中。
    45. output: /tmp/errors
    46. }
    47. # 使用谷歌的RateLimiter来限制发送到NebulaGraph的请求。
    48. rate: {
    49. # RateLimiter的稳定吞吐量。
    50. limit: 1024
    51. # 从RateLimiter获取允许的超时时间,单位:毫秒
    52. timeout: 1000
    53. }
    54. }
    55. # 处理点
    56. tags: [
    57. {
    58. # 指定Nebula Graph中定义的Tag名称。
    59. name: player
    60. type: {
    61. # 指定数据源,使用CSV。
    62. source: csv
    63. # 指定如何将点数据导入Nebula Graph:Client或SST。
    64. sink: sst
    65. }
    66. # 指定CSV文件的路径。
    67. # 文件存储在HDFS上,用双引号括起路径,以hdfs://开头,例如"hdfs://ip:port/xx/xx.csv"。
    68. path: "hdfs://*.*.*.*:9000/dataset/vertex_player.csv"
    69. # 如果CSV文件没有表头,使用[_c0, _c1, _c2, ..., _cn]表示其表头,并将列指示为属性值的源。
    70. # 如果CSV文件有表头,则使用实际的列名。
    71. fields: [_c1, _c2]
    72. # 指定Nebula Graph中定义的属性名称。
    73. # fields与nebula.fields的顺序必须一一对应。
    74. nebula.fields: [age, name]
    75. # 指定一个列作为VID的源。
    76. # vertex的值必须与上述fields或者csv.fields中的列名保持一致。
    77. # 目前,Nebula Graph 2.6.0仅支持字符串或整数类型的VID。
    78. vertex: {
    79. field:_c0
    80. }
    81. # 指定的分隔符。默认值为英文逗号(,)。
    82. separator: ","
    83. # 如果CSV文件有表头,请将header设置为true。
    84. # 如果CSV文件没有表头,请将header设置为false。默认值为false。
    85. header: false
    86. # 指定单批次写入Nebula Graph的最大点数量。
    87. batch: 256
    88. # 指定Spark分片数量。
    89. partition: 32
    90. }
    91. # 设置Tag team相关信息。
    92. {
    93. # 指定Nebula Graph中定义的Tag名称。
    94. name: team
    95. type: {
    96. # 指定数据源,使用CSV。
    97. source: csv
    98. # 指定如何将点数据导入Nebula Graph:Client或SST。
    99. sink: sst
    100. }
    101. # 指定CSV文件的路径。
    102. # 文件存储在HDFS上,用双引号括起路径,以hdfs://开头,例如"hdfs://ip:port/xx/xx.csv"。
    103. path: "hdfs://*.*.*.*:9000/dataset/vertex_team.csv"
    104. # 如果CSV文件没有表头,使用[_c0, _c1, _c2, ..., _cn]表示其表头,并将列指示为属性值的源。
    105. # 如果CSV文件有表头,则使用实际的列名。
    106. fields: [_c1]
    107. # 指定Nebula Graph中定义的属性名称。
    108. # fields与nebula.fields的顺序必须一一对应。
    109. nebula.fields: [name]
    110. # 指定一个列作为VID的源。
    111. # vertex的值必须与上述fields或者csv.fields中的列名保持一致。
    112. # 目前,Nebula Graph 2.6.0仅支持字符串或整数类型的VID。
    113. vertex: {
    114. field:_c0
    115. }
    116. # 指定的分隔符。默认值为英文逗号(,)。
    117. separator: ","
    118. # 如果CSV文件有表头,请将header设置为true。
    119. # 如果CSV文件没有表头,请将header设置为false。默认值为false。
    120. header: false
    121. # 指定单批次写入Nebula Graph的最大点数量。
    122. batch: 256
    123. # 指定Spark分片数量。
    124. partition: 32
    125. }
    126. # 如果需要添加更多点,请参考前面的配置进行添加。
    127. ]
    128. # 处理边
    129. edges: [
    130. {
    131. # 指定Nebula Graph中定义的Edge type名称。
    132. name: follow
    133. type: {
    134. # 指定数据源,使用CSV。
    135. source: csv
    136. # 指定如何将点数据导入Nebula Graph:Client或SST。
    137. sink: sst
    138. }
    139. # 指定CSV文件的路径。
    140. # 文件存储在HDFS上,用双引号括起路径,以hdfs://开头,例如"hdfs://ip:port/xx/xx.csv"。
    141. # 如果CSV文件没有表头,使用[_c0, _c1, _c2, ..., _cn]表示其表头,并将列指示为属性值的源。
    142. # 如果CSV文件有表头,则使用实际的列名。
    143. fields: [_c2]
    144. # 指定Nebula Graph中定义的属性名称。
    145. # fields与nebula.fields的顺序必须一一对应。
    146. nebula.fields: [degree]
    147. # 指定一个列作为起始点和目的点的源。
    148. # vertex的值必须与上述fields或者csv.fields中的列名保持一致。
    149. # 目前,Nebula Graph 2.6.0仅支持字符串或整数类型的VID。
    150. source: {
    151. field: _c0
    152. }
    153. target: {
    154. field: _c1
    155. }
    156. # 指定的分隔符。默认值为英文逗号(,)。
    157. separator: ","
    158. # 指定一个列作为rank的源(可选)。
    159. #ranking: rank
    160. # 如果CSV文件有表头,请将header设置为true。
    161. # 如果CSV文件没有表头,请将header设置为false。默认值为false。
    162. header: false
    163. # 指定单批次写入Nebula Graph的最大边数量。
    164. batch: 256
    165. # 指定Spark分片数量。
    166. partition: 32
    167. }
    168. # 设置Edge type serve相关信息。
    169. {
    170. # 指定Nebula Graph中定义的Edge type名称。
    171. name: serve
    172. type: {
    173. # 指定数据源,使用CSV。
    174. source: csv
    175. # 指定如何将点数据导入Nebula Graph:Client或SST。
    176. sink: sst
    177. }
    178. # 指定CSV文件的路径。
    179. # 文件存储在HDFS上,用双引号括起路径,以hdfs://开头,例如"hdfs://ip:port/xx/xx.csv"。
    180. path: "hdfs://*.*.*.*:9000/dataset/edge_serve.csv"
    181. # 如果CSV文件没有表头,使用[_c0, _c1, _c2, ..., _cn]表示其表头,并将列指示为属性值的源。
    182. # 如果CSV文件有表头,则使用实际的列名。
    183. fields: [_c2,_c3]
    184. # 指定Nebula Graph中定义的属性名称。
    185. # fields与nebula.fields的顺序必须一一对应。
    186. nebula.fields: [start_year, end_year]
    187. # 指定一个列作为起始点和目的点的源。
    188. # vertex的值必须与上述fields或者csv.fields中的列名保持一致。
    189. # 目前,Nebula Graph 2.6.0仅支持字符串或整数类型的VID。
    190. source: {
    191. field: _c0
    192. }
    193. target: {
    194. field: _c1
    195. }
    196. # 指定的分隔符。默认值为英文逗号(,)。
    197. separator: ","
    198. # 指定一个列作为rank的源(可选)。
    199. #ranking: _c5
    200. # 如果CSV文件有表头,请将header设置为true。
    201. # 如果CSV文件没有表头,请将header设置为false。默认值为false。
    202. header: false
    203. # 指定单批次写入Nebula Graph的最大边数量。
    204. batch: 256
    205. # 指定Spark分片数量。
    206. partition: 32
    207. }
    208. ]
    209. # 如果需要添加更多边,请参考前面的配置进行添加。
    210. }

    步骤 4:生成SST文件

    Note

    生成SST文件时,会涉及到Spark的shuffle操作,请注意在提交命令中增加spark.sql.shuffle.partition的配置。

    Note

    JAR包有两种获取方式:或者从maven仓库下载。

    示例:

    1. ${SPARK_HOME}/bin/spark-submit --master "local" --conf spark.sql.shuffle.partition=200 --class com.vesoft.nebula.exchange.Exchange /root/nebula-exchange/nebula-exchange/target/nebula-exchange-2.6.0.jar -c /root/nebula-exchange/nebula-exchange/target/classes/sst_application.conf

    任务执行完成后,可以在HDFS上的/sst目录(nebula.path.remote参数指定)内查看到生成的SST文件。

    Note

    如果对Schema有修改操作,例如重建图空间、修改Tag、修改Edge type等,需要重新生成SST文件,因为SST文件会验证Space ID、Tag ID、Edge ID等信息。

    Note

    导入前请确认以下信息:

    • 确认所有部署Storage服务的机器上都已部署Hadoop服务,并配置HADOOP_HOME和JAVA_HOME。

    • Meta服务配置文件中的--ws_storage_http_port(如果没有,请手动添加)和Storage服务配置文件中的--ws_http_port一致。例如都为19779

    • Graph服务配置文件中的--ws_meta_http_port(如果没有,请手动添加)和Meta服务配置文件中的--ws_http_port一致。例如都为19559

    使用客户端工具连接Nebula Graph数据库,按如下操作导入SST文件:

    1. 执行命令选择之前创建的图空间。

      1. nebula> USE basketballplayer;
    2. 执行命令下载SST文件:

      示例:

      1. nebula> DOWNLOAD HDFS "hdfs://*.*.*.*:9000/sst";
    3. 执行命令导入SST文件:

      1. nebula> INGEST;

    Note

    • 如果需要重新下载,请在Nebula Graph安装路径内的data/storage/nebula目录内,将对应Space ID目录内的download文件夹删除,然后重新下载SST文件。如果图空间是多副本,保存副本的所有机器都需要删除download文件夹。

    步骤 6:(可选)验证数据

    用户可以在Nebula Graph客户端(例如Nebula Graph Studio)中执行查询语句,确认数据是否已导入。例如:

    用户也可以使用命令查看统计数据。