由于 TiSpark 并没有直接修改 Apache Spark 的代码,因此只要是 Apache Spark 2.1 以上版本,就可以找到对应兼容的 TiSpark。具体版本的对应,可以查看官网文档。无论是通过 YARN 部署还是通过 Standalone 部署,都可以参考 Apache Spark 官网的部署环节。通过匹配 Spark 版本,可以从 下载对应的 TiSpark 版本的 JAR 包。
实际开启和部署 TiSpark 需要确保如下两点:
- Spark Driver 以及 Executor 可以访问到 TiSpark JAR 包。
- Spark 开启如下参数(推荐通过修改 spark-defaults.conf 来控制)
以 Standalone 集群为例,加入 TiSpark 流程如下:
- 按照上述介绍修改 SPARK_HOME/spark-defaults.conf,加入上述必要配置。
- 启动 Spark 应用时引用 TiSpark JAR 包。以 spark-shell 为例:
TiSpark 的使用方法与原生 Spark 类似,提供多种方式查询数据。
11.2.2.1 通过 spark-shell 查询数据
使用 spark-shell 启动,通过 Spark 提供的 API 即可访问数据。
示例一:利用 spark-shell 查询 lineitem 表的数据
scala>spark.sql("use tpch")
使用 tpch 库
scala>spark.sql("select count(*) from lineitem").show
查询 lineitem 表的总行数
查询结果显示如下:
+-------------+
| Count (1) |
+-------------+
| 600000000 |
+-------------+
scala> spark.sql(
"""select
| l_returnflag,
| l_linestatus,
| sum(l_quantity) as sum_qty,
| sum(l_extendedprice) as sum_base_price,
| sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
| avg(l_quantity) as avg_qty,
| avg(l_extendedprice) as avg_price,
| count(*) as count_order
|from
| lineitem
|where
| l_shipdate <= date '1998-12-01' - interval '90' day
|group by
| l_returnflag,
| l_linestatus
| l_returnflag,
| l_linestatus
""".stripMargin).show
scala>
+------------+------------+---------+--------------+--------------+
|l_returnflag|l_linestatus| sum_qty|sum_base_price|sum_disc_price|
+------------+------------+---------+--------------+--------------+
| A| F|380456.00| 532348211.65|505822441.4861|
| N| F| 8971.00| 12384801.37| 11798257.2080|
| N| O|742802.00| 1041502841.45|989737518.6346|
| R| F|381449.00| 534594445.35|507996454.4067|
+------------+------------+---------+--------------+--------------+
11.2.2.2 通过 Spark SQL 查询数据
TiSpark同样支持利用 Spark SQL 查询数据。使用 spark-sql 命令即可进入交互式数据查询页面,接入输入 sql 即可。
示例三:利用 spark-sql 查询 lineitem 表的数据
11.2.2.3 利用 JDBC 访问 TiSpark
部署时启动 Thrift 服务器后,可以通过 JDBC 的方式使用 TiSpark。
示例四:利用 beeline 工具使用 JDBC 的方式访问 TiSpark
beeline> !connect jdbc:hive2://localhost:10000
1: jdbc:hive2://localhost:10000> use testdb;
+---------+--+
| Result |
+---------+--+
+---------+--+
No rows selected (0.013 seconds)
select count(*) from account;
+-----------+--+
| count(1) |
+-----------+--+
| 1000000 |
+-----------+--+
11.2.3.1 使用 PySpark 访问 TiSpark
TiSpark on PySpark 是 TiSpark 用来支持 Python 语言而构建的 Python 包。PySpark 支持直接使用也可以通过 python 的包管理工具来安装使用。
示例一:直接使用 PySpark 访问 TiSpark
./bin/pyspark --jars /PATH/tispark-${name_with_version}.jar
# Query as you are in spark-shell
spark.sql("show databases").show()
spark.sql("show tables").show()
spark.sql("select count(*) from customer").show()
# Result
+--------+
|count(1)|
+--------+
| 150|
+--------+
首先,利用 pip 来安装 pytispark,相关命令如下:
pip install pytispark
安装完成之后,创建一个用以查询数据的 python 文件 test.py,文件示例如下:
创建完成之后使用 spark-submit 来查询数据,相关命令如下:
./bin/spark-submit --jars /PATH/tispark-${name_with_version}.jar test.py
# Result:
+--------+
|count(1)|
+--------+
| 150|
+--------+
11.2.3.2 使用 TiSparkR 访问 TiSpark
TiSparkR 是 TiSpark 用来支持R语言来构建的 R 包。同 PySpark 类似,TiSparkR 同样支持直接使用也可以通过加载 library 的方式使用。
示例三:直接使用 PySpark 访问 TiSpark
./bin/sparkR --jars /PATH/tispark-${name_with_version}.jar
sql("use tpch_test")
count <- sql("select count(*) from customer")
head(count)
# Result
+--------+
|count(1)|
+--------+
| 150|
+--------+
示例四:利用 SparkR 包的形式使用 TiSpark 首先,创建一个用以查询数据的 R 文件 test.R,文件示例如下:
library(SparkR)
sparkR.session()
sql("use tpch_test")
11.2.3.3 TiSpark 访问 TiFlash
参见 TiSpark 访问 TiFlash 章节