TiSpark 用户指南

    TiSpark 是 PingCAP 为解决用户复杂 OLAP 需求而推出的产品。它借助 Spark 平台,同时融合 TiKV 分布式集群的优势,和 TiDB 一起为用户一站式解决 HTAP (Hybrid Transactional/Analytical Processing) 的需求。TiSpark 依赖于 TiKV 集群和 Placement Driver (PD),也需要你搭建一个 Spark 集群。

    本文简单介绍如何部署和使用 TiSpark。本文假设你对 Spark 有基本认知。你可以参阅 了解 Spark 的相关信息。

    TiSpark 是将 Spark SQL 直接运行在分布式存储引擎 TiKV 上的 OLAP 解决方案。其架构图如下:

    • TiSpark 深度整合了 Spark Catalyst 引擎, 可以对计算提供精确的控制,使 Spark 能够高效的读取 TiKV 中的数据,提供索引支持以实现高速的点查。
    • 通过多种计算下推减少 Spark SQL 需要处理的数据大小,以加速查询;利用 TiDB 的内建的统计信息选择更优的查询计划。
    • 从数据集群的角度看,TiSpark + TiDB 可以让用户无需进行脆弱和难以维护的 ETL,直接在同一个平台进行事务和分析两种工作,简化了系统架构和运维。
    • 用户借助 TiSpark 项目可以在 TiDB 上使用 Spark 生态圈提供的多种工具进行数据处理。例如,使用 TiSpark 进行数据分析和 ETL;使用 TiKV 作为机器学习的数据源;借助调度系统产生定时报表等等。
    • 除此之外,TiSpark 还提供了分布式写入 TiKV 的功能。相比使用 Spark 结合 JDBC 的方式写入 TiDB,分布式写入 TiKV 可以实现事务(要么全部数据写入成功,要么全部都写入失败),并且写入速度会更快。

    环境准备

    现有 TiSpark 2.x 版本支持 Spark 2.3.x 和 Spark 2.4.x。如果你希望使用 Spark 2.1.x 版本,需使用 TiSpark 1.x。

    TiSpark 需要 JDK 1.8+ 以及 Scala 2.11(Spark2.0+ 默认 Scala 版本)。

    TiSpark 可以在 YARN,Mesos,Standalone 等任意 Spark 模式下运行。

    推荐配置

    本部分描述了 TiKV 与 TiSpark 集群分开部署、Spark 与 TiSpark 集群独立部署,以及TiSpark 与 TiKV 集群混合部署的建议配置。

    对于 TiKV 与 TiSpark 分开部署的场景,可以参考如下建议配置:

    • 硬件配置建议

      普通场景可以参考 TiDB 和 TiKV 硬件配置建议,但是如果是偏重分析的场景,可以将 TiKV 节点增加到至少 64G 内存。

    关于 Spark 的详细硬件推荐配置请参考,如下是 TiSpark 所需环境的简单描述:

    Spark 推荐 32G 内存以上的配额。请在配置中预留 25% 的内存给操作系统。

    Spark 推荐每台计算节点配备 CPU 累计 8 到 16 核以上。你可以初始设定分配所有 CPU 核给 Spark。

    Spark 的具体配置方式也请参考官方说明。以下为根据 配置的范例:

    spark-defaults.conf 中,增加如下配置:

    1. spark.tispark.pd.addresses $your_pd_servers
    2. spark.sql.extensions org.apache.spark.sql.TiExtensions

    CDH spark 版本中添加如下配置:

    1. spark.tispark.pd.addresses=$your_pd_servers
    2. spark.sql.extensions=org.apache.spark.sql.TiExtensions

    your_pd_servers 是用逗号分隔的 PD 地址,每个地址使用 地址:端口 的格式。

    例如你有一组 PD 在10.16.20.110.16.20.210.16.20.3,那么 PD 配置格式是10.16.20.1:2379,10.16.20.2:2379,10.16.20.3:2379

    对于 TiKV 与 TiSpark 混合部署的场景,需在原有 TiKV 预留资源之外累加 Spark 所需部分,并分配 25% 的内存作为系统本身占用。

    如果在已有 Spark 集群上运行 TiSpark,无需重启集群。可以使用 Spark 的 --jars 参数将 TiSpark 作为依赖引入:

    1. spark-shell --jars $TISPARK_FOLDER/tispark-${name_with_version}.jar

    如果没有使用中的 Spark 集群,推荐使用 Spark Standalone 方式部署。这里简单介绍下 Standalone 部署方式。如果遇到问题,可以去官网寻求;也欢迎在 GitHub 上提 issue

    下载安装包并安装

    你可以在 下载 Apache Spark。

    对于 Standalone 模式且无需 Hadoop 支持,则选择 Spark 2.3.x 或者 Spark 2.4.x 且带有 Hadoop 依赖的 Pre-build with Apache Hadoop 2.x 任意版本。如有需要配合使用的 Hadoop 集群,则选择对应的 Hadoop 版本号。你也可以选择从源代码自行构建以配合官方 Hadoop 2.x 之前的版本。

    如果你已经有了 Spark 二进制文件,并且当前 PATH 为 SPARKPATH,需将 TiSpark jar 包拷贝到 ${SPARKPATH}/jars 目录下。

    启动 Master

    在选中的 Spark Master 节点执行如下命令:

    1. cd $SPARKPATH
    1. ./sbin/start-master.sh

    在这步完成以后,屏幕上会打印出一个 log 文件。检查 log 文件确认 Spark-Master 是否启动成功。你可以打开 查看集群信息(如果你没有改动 Spark-Master 默认 Port Numebr)。在启动 Spark-Worker 的时候,也可以通过这个面板来确认 Worker 是否已经加入集群。

    启动 Worker

    类似地,可以用如下命令启动 Spark-Worker 节点:

    1. ./sbin/start-slave.sh spark://spark-master-hostname:7077

    命令返回以后,即可通过刚才的面板查看这个 Worker 是否已经正确地加入了 Spark 集群。在所有 Worker 节点重复刚才的命令。确认所有的 Worker 都可以正确连接 Master,这样你就拥有了一个 Standalone 模式的 Spark 集群。

    Spark SQL shell 和 JDBC 服务器

    当前版本的 TiSpark 可以直接使用 spark-sql 和 Spark 的 ThriftServer JDBC 服务器。

    一个使用范例

    假设你已经按照上述步骤成功启动了 TiSpark 集群,下面简单介绍如何使用 Spark SQL 来做 OLAP 分析。这里我们用名为 tpch 数据库中的 lineitem 表作为范例。

    假设你的 PD 节点位于 192.168.1.100,端口为 2379,在 $SPARK_HOME/conf/spark-defaults.conf 加入:

    1. spark.tispark.pd.addresses 192.168.1.100:2379

      然后在 Spark-Shell 里像原生 Spark 一样输入下面的命令:

      1. spark.sql("select count(*) from lineitem").show

      结果为:

      1. +-------------+
      2. | Count (1) |
      3. +-------------+
      4. | 600000000 |
      5. +-------------+

      Spark SQL 交互 Shell 和原生 Spark 一致:

      1. spark-sql> use tpch;
      1. Time taken: 0.015 seconds
      1. spark-sql> select count(*) from lineitem;
      1. Time taken: 0.673 seconds, Fetched 1 row(s)

      SQuirreLSQL 和 hive-beeline 可以使用 JDBC 连接 Thrift 服务器。例如,使用 beeline 连接:

      1. ./beeline
      1. Beeline version 1.2.2 by Apache Hive
      1. 1: jdbc:hive2://localhost:10000> use testdb;
      1. +---------+--+
      2. | Result |
      3. +---------+--+
      4. +---------+--+
      5. No rows selected (0.013 seconds)
      1. select count(*) from account;
      1. +-----------+--+
      2. | count(1) |
      3. +-----------+--+
      4. | 1000000 |
      5. +-----------+--+
      6. 1 row selected (1.97 seconds)

      和 Hive 一起使用 TiSpark

      TiSpark 可以和 Hive 混合使用。在启动 Spark 之前,需要添加 HADOOP_CONF_DIR 环境变量指向 Hadoop 配置目录并且将 hive-site.xml 拷贝到 $SPARK_HOME/conf 目录下。

      1. val tisparkDF = spark.sql("select * from tispark_table").toDF
      2. tisparkDF.write.saveAsTable("hive_table") // save table to hive
      3. spark.sql("select * from hive_table a, tispark_table b where a.col1 = b.col1").show // join table across Hive and Tispark

      TiSpark 从 v2.3 版本开始原生支持将 DataFrame 批量写入 TiDB 集群,该写入模式通过 TiKV 的两阶段提交协议实现。

      TiSpark 批量写入相比 Spark + JDBC 写入,有以下特点:

      以下通过 scala API 演示如何使用 TiSpark 批量写入:

      1. // select data to write
      2. val df = spark.sql("select * from tpch.ORDERS")
      3. // write data to tidb
      4. df.write.
      5. option("tidb.addr", "127.0.0.1").
      6. option("tidb.port", "4000")
      7. option("tidb.user", "root").
      8. option("database", "tpch").
      9. option("table", "target_orders").
      10. mode("append").
      11. save()
      1. update mysql.tidb set VARIABLE_VALUE="6h" where VARIABLE_NAME="tikv_gc_life_time";

      详细使用手册请参考该文档

      通过 JDBC 将 DataFrame 写入 TiDB

      除了使用 TiSpark 将 DataFrame 批量写入 TiDB 集群以外,也可以使用 Spark 原生的 JDBC 支持进行写入:

      1. import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
      2. val customer = spark.sql("select * from customer limit 100000")
      3. // you might repartition source to make it balance across nodes
      4. // and increase concurrency
      5. val df = customer.repartition(32)
      6. df.write
      7. .mode(saveMode = "append")
      8. .format("jdbc")
      9. .option("driver", "com.mysql.jdbc.Driver")
      10. // replace host and port as your and be sure to use rewrite batch
      11. .option("url", "jdbc:mysql://127.0.0.1:4000/test?rewriteBatchedStatements=true")
      12. .option("useSSL", "false")
      13. // As tested, 150 is good practice
      14. .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 150)
      15. .option("dbtable", s"cust_test_select") // database name and table name here
      16. .option("isolationLevel", "NONE") // recommended to set isolationLevel to NONE if you have a large DF to load.
      17. .option("user", "root") // TiDB user here
      18. .save()

      推荐将 isolationLevel 设置为 NONE,否则单一大事务有可能造成 TiDB 服务器内存溢出。

      统计信息

      TiSpark 可以使用 TiDB 的统计信息:

      1. 选择代价最低的索引或扫表访问
      2. 估算数据大小以决定是否进行广播优化

      如果希望使用统计信息支持,需要确保所涉及的表已经被分析。请阅读了解如何进行表分析。

      从 TiSpark 2.0 开始,统计信息将会默认被读取。

      统计信息将在 Spark Driver 进行缓存,请确定 Driver 内存足够缓存统计信息。可以在spark-defaults.conf中开启或关闭统计信息读取:

      Property Name Default Description
      spark.tispark.statistics.auto_load true 是否默认进行统计信息读取
      • Q. 是独立部署还是和现有 Spark/Hadoop 集群共用资源?

        A. 可以利用现有 Spark 集群无需单独部署,但是如果现有集群繁忙,TiSpark 将无法达到理想速度。

      • Q. 是否可以和 TiKV 混合部署?

        A. 如果 TiDB 以及 TiKV 负载较高且运行关键的线上任务,请考虑单独部署 TiSpark;并且考虑使用不同的网卡保证 OLTP 的网络资源不被侵占而影响线上业务。如果线上业务要求不高或者机器负载不大,可以考虑与 TiKV 混合部署。

      • Q. Spark 执行中报 warning:WARN ObjectStore:568 - Failed to get database

        A. Warning 忽略即可,原因是 Spark 找不到对应的 hive 库,因为这个库是在 TIKV 中,而不是在 hive 中。可以考虑调整 log4j 日志,将该参数添加到 spark 下 conf 里 log4j 文件(如果后缀是 template 那先 mv 成后缀 properties)。

      • Q. Spark 执行中报 java.sql.BatchUpdateException: Data Truncated

        A. 写入的数据长度超过了数据库定义的数据类型的长度,可以确认 target table 的字段长度,进行调整。

      • Q. TiSpark 执行 Spark 任务时报:Error:java.io.InvalidClassException: com.pingcap.tikv.region.TiRegion; local class incompatible: stream classdesc serialVersionUID …