SQL Client

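    The SQL Client aims to provide an easy way to write, debug, and submit table programs to a Flink cluster without writing a single line of Java or Scala code. The SQL Client command-line interface (CLI) retrieves and visualizes, on the command line, the real-time results produced by the running distributed application.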

    This section describes how to set up and run your first Flink SQL program from the command line.

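    The SQL Client is bundled with the regular Flink distribution and can therefore be run out of the box. It requires only a running Flink cluster in which table programs can be executed. For more information about setting up a Flink cluster, see the Cluster & Deployment part. If you only want to try out the SQL Client, you can also start a local cluster using the following command:

    ./bin/start-cluster.sh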

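    The SQL Client script is also located in the bin directory of Flink. Users can start the SQL Client CLI either by launching an embedded standalone process or by connecting to a remote SQL Client gateway. Currently only the embedded mode is supported, and it is the default mode. You can start the CLI as follows:

    ./bin/sql-client.sh

    or explicitly use the embedded mode:

    ./bin/sql-client.sh embedded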

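    Running SQL Queries

    Once the CLI has started, you can use the HELP command to list all available SQL statements. To validate your setup and the cluster connection, enter your first SQL query and press the Enter key to execute it:

    SELECT 'Hello World';

    This query requires no table source and produces a single-row result. The CLI retrieves the result from the cluster and visualizes it. Press the Q key to close the result view.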

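    The CLI supports three modes for maintaining and visualizing results.

    Table mode materializes results in memory and visualizes them in a regular, paginated table representation. Enable it with the following command:

    SET 'sql-client.execution.result-mode' = 'table';

    Changelog mode does not materialize results. Instead, it visualizes the result stream produced by a continuous query, which consists of insertions (+) and retractions (-).

    SET 'sql-client.execution.result-mode' = 'changelog';

    Tableau mode is closer to a traditional database: it prints the results directly on the screen in tableau format. What is displayed depends on the job's execution mode (execution.type):

    SET 'sql-client.execution.result-mode' = 'tableau';

    Note that when you run a streaming query in this mode, Flink prints the results to the screen continuously. If the input of the streaming query is bounded, Flink stops the job automatically once all data has been processed, and the printing stops with it. To terminate a query early, press CTRL-C; this cancels the job and stops the printing.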

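    You can use the following query to see all three result modes in action:

    SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

    This query performs a bounded word count example.

    In changelog mode, the visualized changelog consists of the same sequence of changes that the streaming tableau output further below shows in its +/- column: Bob, Alice, and Greg are each inserted with a count of 1, then the row (Bob, 1) is retracted and the row (Bob, 2) is inserted.

    In table mode, the visualized result table is continuously updated until the table program ends with:

    Bob, 2
    Alice, 1
    Greg, 1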

    In tableau mode, if the query is executed in streaming mode, the displayed result would be:

    +-----+----------------------+----------------------+
    | +/- |                 name |                  cnt |
    +-----+----------------------+----------------------+
    |   + |                  Bob |                    1 |
    |   + |                Alice |                    1 |
    |   + |                 Greg |                    1 |
    |   - |                  Bob |                    1 |
    |   + |                  Bob |                    2 |
    +-----+----------------------+----------------------+
    Received a total of 5 rows

    If the query is executed in batch mode, the displayed result would be:

    +-------+-----+
    |  name | cnt |
    +-------+-----+
    | Alice |   1 |
    |   Bob |   2 |
    |  Greg |   1 |
    +-------+-----+
    3 rows in set
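
To make the relationship between the changelog and the final table concrete, the following Python sketch reproduces the word count above outside of Flink. It is an illustration only: the names `word_count_changelog` and `materialize` are made up for this example and are not part of Flink or the SQL Client, and the code does not claim to mirror how Flink implements retractions internally.

```python
from collections import Counter
from typing import Iterable, List, Tuple

Change = Tuple[str, str, int]  # (op, name, cnt) where op is "+" or "-"


def word_count_changelog(names: Iterable[str]) -> List[Change]:
    """Emit the insert/retract changes of a continuous COUNT(*) ... GROUP BY name."""
    counts: Counter = Counter()
    changes: List[Change] = []
    for name in names:
        if counts[name] > 0:
            # The previously emitted count for this key is outdated: retract it.
            changes.append(("-", name, counts[name]))
        counts[name] += 1
        changes.append(("+", name, counts[name]))
    return changes


def materialize(changes: Iterable[Change]) -> List[Tuple[str, int]]:
    """Apply all changes in order and return the rows that survive, sorted by name."""
    rows: Counter = Counter()
    for op, name, cnt in changes:
        rows[(name, cnt)] += 1 if op == "+" else -1
    return sorted(row for row, n in rows.items() for _ in range(n))


changelog = word_count_changelog(["Bob", "Alice", "Greg", "Bob"])
for op, name, cnt in changelog:
    print(op, name, cnt)       # the stream that changelog mode visualizes
print(materialize(changelog))  # the rows that table mode ends with
```

The five printed changes correspond to the +/- column of the streaming tableau output, and the materialized rows correspond to the final table (the sketch sorts them by name, so the row order may differ from what the CLI shows).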

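    All of these result modes are useful when prototyping SQL queries. In every mode, results are stored in the Java heap memory of the SQL Client. To keep the CLI responsive, changelog mode shows only the latest 1000 changes. Table mode allows navigating through larger results, which are limited only by the available main memory and the configured maximum number of rows (sql-client.execution.max-table-result.rows).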
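
The "latest 1000 changes" behavior can be pictured as a bounded buffer that discards its oldest entry whenever a new one arrives. The sketch below is a hypothetical model written for this page (the class `ChangelogView` does not exist in Flink); it only illustrates why memory use stays flat in changelog mode no matter how long the query runs.

```python
from collections import deque

MAX_CHANGES = 1000  # the limit quoted above for changelog mode


class ChangelogView:
    """Hypothetical model of a view that remembers only the latest changes."""

    def __init__(self, max_changes: int = MAX_CHANGES) -> None:
        self._changes = deque(maxlen=max_changes)

    def append(self, change) -> None:
        # Once the deque is full, appending silently drops the oldest entry.
        self._changes.append(change)

    def visible(self) -> list:
        return list(self._changes)


view = ChangelogView()
for i in range(1500):
    view.append(("+", f"row-{i}"))

print(len(view.visible()), view.visible()[0])
```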

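    Attention: Queries that are executed in a batch environment can only be retrieved using the table or tableau result mode.

    After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. The configuration section explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.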

    SQL Client startup options

    The SQL Client can be started with the following optional command-line options. They are discussed in detail in the subsequent paragraphs.

    ./bin/sql-client.sh --help

    Mode "embedded" (default) submits Flink jobs from the local machine.

      Syntax: [embedded] [OPTIONS]
      "embedded" mode options:
       -f,--file <script file>               Script file that should be executed.
                                             In this mode, the client will not
                                             open an interactive terminal.
       -h,--help                             Show the help message with
                                             descriptions of all options.
       -hist,--history <History file path>   The file which you want to save the
                                             command history into. If not
                                             specified, we will auto-generate one
                                             under your user's home directory.
       -i,--init <initialization file>       Script file that used to init the
                                             session context. If get error in
                                             execution, the sql client will exit.
                                             Notice it's not allowed to add query
                                             or insert into the init file.
       -j,--jar <JAR file>                   A JAR file to be imported into the
                                             session. The file might contain
                                             user-defined classes needed for the
                                             execution of statements such as
                                             functions, table sources, or sinks.
                                             Can be used multiple times.
       -l,--library <JAR directory>          A JAR file directory with which every
                                             new session is initialized. The files
                                             might contain user-defined classes
                                             needed for the execution of
                                             statements such as functions, table
                                             sources, or sinks. Can be used
                                             multiple times.
       -pyarch,--pyArchives <arg>            Add python archive files for job. The
                                             archive files will be extracted to
                                             the working directory of python UDF
                                             worker. For each archive file, a
                                             target directory be specified. If the
                                             target directory name is specified,
                                             the archive file will be extracted to
                                             a directory with the
                                             specified name. Otherwise, the
                                             archive file will be extracted to a
                                             directory with the same name of the
                                             archive file. The files uploaded via
                                             this option are accessible via
                                             relative path. '#' could be used as
                                             the separator of the archive file
                                             path and the target directory name.
                                             Comma (',') could be used as the
                                             separator to specify multiple archive
                                             files. This option can be used to
                                             upload the virtual environment, the
                                             data files used in Python UDF (e.g.:
                                             --pyArchives
                                             file:///tmp/py37.zip,file:///tmp/data
                                             .zip#data --pyExecutable
                                             py37.zip/py37/bin/python). The data
                                             files could be accessed in Python
                                             UDF, e.g.: f = open('data/data.txt',
                                             'r').
       -pyexec,--pyExecutable <arg>          Specify the path of the python
                                             interpreter used to execute the
                                             python UDF worker (e.g.:
                                             --pyExecutable
                                             UDF worker depends on Python 3.6+,
                                             Apache Beam (version == 2.27.0), Pip
                                             (version >= 7.1.0) and SetupTools
                                             (version >= 37.0.0). Please ensure
                                             that the specified environment meets
                                             the above requirements.
       -pyfs,--pyFiles <pythonFiles>         Attach custom files for job.
                                             The standard resource file suffixes
                                             directory are all supported. These
                                             files will be added to the PYTHONPATH
                                             of both the local client and the
                                             remote python UDF worker. Files
                                             suffixed with .zip will be extracted
                                             and added to PYTHONPATH. Comma (',')
                                             could be used as the separator to
                                             specify multiple files (e.g.:
                                             --pyFiles
                                             file:///tmp/myresource.zip,hdfs:///$n
                                             amenode_address/myresource2.zip).
       -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
                                             defines the third-party dependencies.
                                             These dependencies will be installed
                                             and added to the PYTHONPATH of the
                                             python UDF worker. A directory which
                                             contains the installation packages of
                                             these dependencies could be specified
                                             optionally. Use '#' as the separator
                                             if the optional parameter exists
                                             (e.g.: --pyRequirements
                                             file:///tmp/requirements.txt#file:///
                                             tmp/cached_dir).
       -s,--session <session identifier>     The identifier for a session.
                                             'default' is the default identifier.
       -u,--update <SQL update statement>    Deprecated Experimental (for testing
                                             only!) feature: Instructs the SQL
                                             Client to immediately execute the
                                             given update statement after starting
                                             up. The process is shut down after
                                             the statement has been submitted to
                                             the cluster and returns an
                                             appropriate return code. Currently,
                                             this feature is only supported for
                                             INSERT INTO statements that declare
                                             the target sink table.Please use
                                             option -f to submit update statement.
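
The separator rules that the help text gives for --pyArchives (a comma between archives, an optional '#' between an archive path and its target directory) are easy to misread in the wrapped output. The Python sketch below restates them; `parse_py_archives` is a hypothetical helper written for this page, not a function offered by Flink or PyFlink, and it assumes that an archive without an explicit target is extracted into a directory named after the archive file, as the help text describes.

```python
from typing import List, Tuple


def parse_py_archives(value: str) -> List[Tuple[str, str]]:
    """Split a --pyArchives value into (archive_path, target_directory) pairs."""
    entries: List[Tuple[str, str]] = []
    for item in value.split(","):          # ',' separates multiple archives
        item = item.strip()
        if not item:
            continue
        path, _, target = item.partition("#")  # '#' separates path and target
        if not target:
            # No target given: fall back to the archive's own file name.
            target = path.rstrip("/").rsplit("/", 1)[-1]
        entries.append((path, target))
    return entries


for archive, target in parse_py_archives(
    "file:///tmp/py37.zip,file:///tmp/data.zip#data"
):
    print(archive, "->", target)
```

With the value from the help text, the first archive lands in a directory called py37.zip (which is why the example passes --pyExecutable py37.zip/py37/bin/python) and the second lands in data.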

    SQL Client Configuration

    A SQL query needs a configuration environment in which it is executed. The SQL Client supports the -i startup option, which executes an initialization SQL file to set up the environment when the SQL Client starts. This initialization SQL file can use DDL statements to define available catalogs, table sources and sinks, user-defined functions, and other properties required for execution and deployment.

    An example of such a file is presented below.

    -- Define available catalogs
    CREATE CATALOG MyCatalog
      WITH (
        'type' = 'hive'
      );

    USE CATALOG MyCatalog;

    -- Define available database
    CREATE DATABASE MyDatabase;

    USE MyDatabase;

    -- Define TABLE
    CREATE TABLE MyTable(
      MyField1 INT,
      MyField2 STRING
    ) WITH (
      'connector' = 'filesystem',
      'path' = '/path/to/something',
      'format' = 'csv'
    );

    -- Define VIEW
    CREATE VIEW MyCustomView AS SELECT MyField2 FROM MyTable;

    -- Define user-defined functions here.
    CREATE FUNCTION myUDF AS 'foo.bar.AggregateUDF';

    -- Properties that change the fundamental execution behavior of a table program.
    SET 'execution.runtime-mode' = 'streaming'; -- execution mode either 'batch' or 'streaming'
    SET 'sql-client.execution.result-mode' = 'table'; -- available values: 'table', 'changelog' and 'tableau'
    SET 'sql-client.execution.max-table-result.rows' = '10000'; -- optional: maximum number of maintained rows
    SET 'parallelism.default' = '1'; -- optional: Flink's parallelism (1 by default)
    SET 'pipeline.auto-watermark-interval' = '200'; -- optional: interval for periodic watermarks
    SET 'pipeline.max-parallelism' = '10'; -- optional: Flink's maximum parallelism
    SET 'table.exec.state.ttl' = '1000'; -- optional: table program's idle state time
    SET 'restart-strategy' = 'fixed-delay';

    -- Configuration options for adjusting and tuning table programs.
    SET 'table.optimizer.join-reorder-enabled' = 'true';
    SET 'table.exec.spill-compression.enabled' = 'true';
    SET 'table.exec.spill-compression.block-size' = '128kb';

    This configuration:

    • connects to a Hive catalog and uses MyCatalog as the current catalog with MyDatabase as the current database of the catalog,
    • defines a table MyTable that can read data from a CSV file,
    • defines a view MyCustomView that declares a virtual table using a SQL query,
    • defines a user-defined function myUDF that can be instantiated using the class name,
    • uses streaming mode for running statements and a parallelism of 1,
    • runs exploratory queries in the table result mode,
    • and makes some planner adjustments around join reordering and spilling via configuration options.

    When using the -i <init.sql> option to initialize a SQL Client session, the following statements are allowed in the initialization SQL file:

    • DDL (CREATE/DROP/ALTER),
    • USE CATALOG/DATABASE,
    • LOAD/UNLOAD MODULE,
    • SET command.

    To execute queries or INSERT statements, enter interactive mode or use the -f option to submit the SQL statements.

    Attention: If the SQL Client encounters an error during initialization, it exits with an error message.
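
Before handing a file to -i, it can help to check that it contains only the statement kinds listed above. The following Python sketch does this by looking at the first keyword of each statement. It is a rough pre-check under stated assumptions, not the SQL Client's own validation: the helper names are hypothetical, and the keyword set mirrors only the list on this page.

```python
ALLOWED_IN_INIT_FILE = {
    "CREATE", "DROP", "ALTER",  # DDL
    "USE",                      # USE CATALOG / USE <database>
    "LOAD", "UNLOAD",           # LOAD/UNLOAD MODULE
    "SET",                      # SET command
}


def first_keyword(statement: str) -> str:
    """Return the upper-cased first keyword, skipping blank and '--' comment lines."""
    for line in statement.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("--"):
            return stripped.split()[0].upper().rstrip(";")
    return ""


def allowed_in_init_file(statement: str) -> bool:
    """True if the statement starts with a keyword from the list above."""
    return first_keyword(statement) in ALLOWED_IN_INIT_FILE


for stmt in (
    "CREATE TABLE MyTable (MyField1 INT, MyField2 STRING)",
    "SET 'parallelism.default' = '1'",
    "INSERT INTO MyTableSink SELECT * FROM MyTableSource",
):
    print(allowed_in_init_file(stmt), "<-", stmt)
```

A statement that fails this check, such as a query or an INSERT INTO, belongs in interactive mode or in a file passed with -f.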

    Dependencies

    The SQL Client does not require setting up a Java project using Maven, Gradle, or sbt. Instead, you can pass the dependencies as regular JAR files that get submitted to the cluster. You can either specify each JAR file separately (using --jar) or define entire library directories (using --library). For connectors to external systems (such as Apache Kafka) and corresponding data formats (such as JSON), Flink provides ready-to-use JAR bundles. These JAR files can be downloaded for each release from the Maven central repository.

    The full list of offered SQL JARs can be found on the connection to external systems page.

    Refer to the configuration section for information on how to configure connector and format dependencies.
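
When a session needs several JAR files, assembling the command line by hand gets error-prone. The Python sketch below builds the argument list from the options shown in the --help output (--jar, --library, --init, --file). The function name `build_sql_client_command` and all JAR and file paths are placeholders invented for this example.

```python
from typing import List, Optional, Sequence


def build_sql_client_command(
    jars: Sequence[str] = (),
    libraries: Sequence[str] = (),
    init_file: Optional[str] = None,
    script_file: Optional[str] = None,
    client: str = "./bin/sql-client.sh",
) -> List[str]:
    """Assemble an argv list using only options listed in the --help output."""
    command = [client, "embedded"]
    for jar in jars:
        command += ["--jar", jar]            # --jar can be used multiple times
    for directory in libraries:
        command += ["--library", directory]  # --library can be used multiple times
    if init_file:
        command += ["--init", init_file]
    if script_file:
        command += ["--file", script_file]
    return command


# The JAR and file names below are placeholders, not real artifacts.
command = build_sql_client_command(
    jars=["/opt/jars/my-connector.jar", "/opt/jars/my-format.jar"],
    libraries=["/opt/sql-libs"],
    init_file="init.sql",
)
print(" ".join(command))
# To launch it for real (requires a Flink distribution in the working directory):
#   import subprocess; subprocess.run(command, check=True)
```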

    The SQL Client allows users to submit jobs either from the interactive command line or by using the -f option to execute a SQL file.

    In both modes, the SQL Client parses and executes every type of SQL statement that Flink supports.

    Interactive Command Line

    In the interactive command line, the SQL Client reads user input and executes each statement once it is terminated by a semicolon (;).

    The SQL Client prints a success message if the statement is executed successfully, and an error message if it fails. By default, the error message contains only the error cause. To print the full exception stack for debugging, set sql-client.verbose to true with the command SET 'sql-client.verbose' = 'true';.
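
The rule that a statement ends at a semicolon has one subtlety: a semicolon inside a quoted string does not end the statement. The Python sketch below shows one simple way to split input on that basis. It is not the SQL Client's actual parser; `split_statements` is a hypothetical helper that handles only single-quoted strings and ignores comments.

```python
from typing import List


def split_statements(text: str) -> List[str]:
    """Split text into statements at semicolons that are outside single quotes."""
    statements: List[str] = []
    current: List[str] = []
    in_string = False
    for ch in text:
        if ch == "'":
            # An escaped quote ('') toggles twice, so the state ends up unchanged.
            in_string = not in_string
        if ch == ";" and not in_string:
            statement = "".join(current).strip()
            if statement:
                statements.append(statement)
            current = []
        else:
            current.append(ch)
    # Text after the last semicolon is an unfinished statement and is not returned.
    return statements


for statement in split_statements("SET 'a' = 'x;y'; SELECT 'Hello World';"):
    print(statement)
```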

    Execute SQL Files

    An example of a SQL file that can be executed with the -f option is presented below.

    CREATE TEMPORARY TABLE users (
      user_id BIGINT,
      user_name STRING,
      user_level STRING,
      region STRING,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'users',
      'properties.bootstrap.servers' = '...',
      'value.format' = 'avro'
    );

    -- set sync mode
    SET 'table.dml-sync' = 'true';

    -- set the job name
    SET 'pipeline.name' = 'SqlJob';

    -- set the queue that the job is submitted to
    SET 'yarn.application.queue' = 'root';

    -- set the job parallelism
    SET 'parallelism.default' = '100';

    -- restore from the specific savepoint path
    SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab';

    INSERT INTO pageviews_enriched
    SELECT *
    FROM pageviews AS p
    LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
    ON p.user_id = u.user_id;

    This SQL file:

    • defines a temporary table users, backed by an upsert-kafka topic with Avro values, that serves as the temporal table in the join,
    • sets properties such as synchronous DML execution, the job name, the YARN queue, and the parallelism,
    • sets the savepoint path,
    • submits a SQL job that restores its state from the specified savepoint path.

    Attention: Unlike interactive mode, when executing a SQL file the SQL Client stops execution and exits as soon as an error occurs.
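
The difference in error handling between the two modes can be sketched as two small loops. The Python code below is a model written for this page, with hypothetical names (`run_script`, `run_interactive`, `fake_execute`); it does not call Flink and only illustrates the stop-at-first-error behavior described above.

```python
from typing import Callable, List, Optional, Tuple


def run_script(
    statements: List[str], execute: Callable[[str], None]
) -> Tuple[List[str], Optional[Exception]]:
    """File mode: execute in order and stop at the first failing statement."""
    executed: List[str] = []
    for statement in statements:
        try:
            execute(statement)
        except Exception as exc:
            return executed, exc  # the remaining statements are never run
        executed.append(statement)
    return executed, None


def run_interactive(
    statements: List[str], execute: Callable[[str], None]
) -> List[Tuple[str, Exception]]:
    """Interactive mode: report each error and keep accepting statements."""
    errors: List[Tuple[str, Exception]] = []
    for statement in statements:
        try:
            execute(statement)
        except Exception as exc:
            errors.append((statement, exc))
    return errors


def fake_execute(statement: str) -> None:
    # Stand-in for statement execution: anything containing BROKEN fails.
    if "BROKEN" in statement:
        raise ValueError(f"cannot execute: {statement}")


script = ["SET 'pipeline.name' = 'SqlJob'", "BROKEN STATEMENT", "INSERT INTO t SELECT 1"]
executed, error = run_script(script, fake_execute)
print(executed, error)
print(len(run_interactive(script, fake_execute)))
```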

    The SQL Client executes each INSERT INTO statement as a separate Flink job. This is not always optimal, because parts of the pipeline could be shared between statements. The SQL Client therefore supports the STATEMENT SET syntax for executing a set of SQL statements, which is equivalent to StatementSet in the Table API. A STATEMENT SET block encloses one or more INSERT INTO statements. All statements in the block are optimized holistically and executed as a single Flink job. Joint optimization and execution allows common intermediate results to be reused and can therefore significantly improve the efficiency of executing multiple queries.

    Syntax

    EXECUTE STATEMENT SET
    BEGIN
      -- one or more INSERT INTO statements
      { INSERT INTO|OVERWRITE <select_statement>; }+
    END;

    Attention: The statements enclosed in the STATEMENT SET must be separated by semicolons (;). The old syntax BEGIN STATEMENT SET; ... END; is deprecated and may be removed in a future version.

    SQL File

    CREATE TABLE pageviews (
      user_id BIGINT,
      page_id BIGINT,
      viewtime TIMESTAMP,
      proctime AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'pageviews',
      'properties.bootstrap.servers' = '...',
      'format' = 'avro'
    );

    CREATE TABLE pageview (
      page_id BIGINT,
      cnt BIGINT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydatabase',
      'table-name' = 'pageview'
    );

    CREATE TABLE uniqueview (
      page_id BIGINT,
      cnt BIGINT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydatabase',
      'table-name' = 'uniqueview'
    );

    EXECUTE STATEMENT SET
    BEGIN

    INSERT INTO pageview
    SELECT page_id, count(1)
    FROM pageviews
    GROUP BY page_id;

    INSERT INTO uniqueview
    SELECT page_id, count(distinct user_id)
    FROM pageviews
    GROUP BY page_id;

    END;
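
If the INSERT INTO statements are generated by a tool, wrapping them in the block syntax can be automated. The Python sketch below produces an EXECUTE STATEMENT SET block from a list of statements and makes sure each one ends with a semicolon. `build_statement_set` is a hypothetical helper written for this page; it only concatenates text and performs no SQL validation beyond checking the leading INSERT keyword.

```python
from typing import Iterable, List


def build_statement_set(insert_statements: Iterable[str]) -> str:
    """Wrap INSERT statements in an EXECUTE STATEMENT SET ... END; block."""
    lines: List[str] = ["EXECUTE STATEMENT SET", "BEGIN"]
    count = 0
    for statement in insert_statements:
        body = statement.strip().rstrip(";").rstrip()
        if not body.upper().startswith("INSERT "):
            raise ValueError(f"only INSERT statements are allowed: {body!r}")
        lines.append(body + ";")  # every enclosed statement ends with ';'
        count += 1
    if count == 0:
        raise ValueError("a statement set needs at least one INSERT statement")
    lines.append("END;")
    return "\n".join(lines)


print(build_statement_set([
    "INSERT INTO pageview SELECT page_id, count(1) FROM pageviews GROUP BY page_id",
    "INSERT INTO uniqueview SELECT page_id, count(distinct user_id) FROM pageviews GROUP BY page_id;",
]))
```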

    Execute DML statements sync/async

    By default, SQL Client executes DML statements asynchronously. That means, SQL Client will submit a job for the DML statement to a Flink cluster, and not wait for the job to finish. So SQL Client can submit multiple jobs at the same time. This is useful for streaming jobs, which are long-running in general.

    SQL Client makes sure that a statement is successfully submitted to the cluster. Once the statement is submitted, the CLI will show information about the Flink job.

    Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
    [INFO] Table update statement has been successfully submitted to the cluster:
    Cluster ID: StandaloneClusterId
    Job ID: 6f922fe5cba87406ff23ae4a7bb79044

    Attention: The SQL Client does not track the status of the running Flink job after submission. The CLI process can be shut down after the submission without affecting the detached query. Flink’s restart strategy takes care of fault tolerance. A query can be cancelled using Flink’s web interface, command line, or REST API.

    For batch users, however, it is more common that the next DML statement has to wait until the previous DML statement has finished. To execute DML statements synchronously, set the table.dml-sync option to true in the SQL Client.

    Flink SQL> SET 'table.dml-sync' = 'true';
    [INFO] Session property has been set.

    Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] Execute statement in sync mode. Please wait for the execution finish...
    [INFO] Complete execution of the SQL update statement.

    Attention: To terminate the job, press CTRL-C to cancel the execution.
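
The difference between the two submission styles can be modelled without a cluster. In the Python sketch below, `FakeCluster` and `execute_dml` are stand-ins invented for this page: a "job" is just a function run on a thread, and the `dml_sync` flag plays the role of table.dml-sync. It shows only the control flow (return immediately versus wait for completion), not how Flink submits jobs.

```python
import threading
import uuid
from typing import Callable, Tuple


class FakeCluster:
    """Stand-in for a Flink cluster: every submitted job runs on its own thread."""

    def submit(self, work: Callable[[], object]) -> Tuple[str, threading.Event]:
        job_id = uuid.uuid4().hex
        finished = threading.Event()

        def run() -> None:
            work()
            finished.set()

        threading.Thread(target=run, daemon=True).start()
        return job_id, finished


def execute_dml(cluster: FakeCluster, work, dml_sync: bool = False):
    """Submit a job; block until it finishes only when dml_sync is True."""
    job_id, finished = cluster.submit(work)
    if dml_sync:
        finished.wait()
    return job_id, finished


cluster = FakeCluster()

# Asynchronous (the default): the call returns while the job is still running.
gate = threading.Event()
async_job, async_finished = execute_dml(cluster, gate.wait)
still_running = not async_finished.is_set()
gate.set()  # let the simulated long-running job complete
async_finished.wait(timeout=5)

# Synchronous: the call returns only after the job has finished.
sync_job, sync_finished = execute_dml(cluster, lambda: None, dml_sync=True)
print(still_running, sync_finished.is_set())
```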

    Start a SQL Job from a savepoint

    Flink supports starting a job from a specified savepoint. In the SQL Client, use the SET command to specify the path of the savepoint.

    Flink SQL> SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab';
    [INFO] Session property has been set.

    -- all the following DML statements will be restored from the specified savepoint path
    Flink SQL> INSERT INTO ...

    When the savepoint path is specified, Flink tries to restore the state from the savepoint when executing each of the following DML statements.

    Because the specified savepoint path affects all the following DML statements, you can use the RESET command to reset this config option, i.e. to disable restoring from the savepoint.

    Flink SQL> RESET execution.savepoint.path;
    [INFO] Session property has been reset.

    For more details about creating and managing savepoints, please refer to Flink's savepoint documentation.
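
The reason RESET matters is that session properties are sticky: once set, a value applies to every later statement until it is reset. The Python sketch below models that with a small class. `SessionConfig` is a hypothetical illustration written for this page, not a Flink class, and the default of None for the savepoint path is an assumption made for the example.

```python
from typing import Dict, Optional


class SessionConfig:
    """Hypothetical model of session properties layered over defaults."""

    def __init__(self, defaults: Dict[str, Optional[str]]) -> None:
        self._defaults = dict(defaults)
        self._overrides: Dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        # Like SET 'key' = 'value': applies to all following statements.
        self._overrides[key] = value

    def reset(self, key: str) -> None:
        # Like RESET key: fall back to the default value.
        self._overrides.pop(key, None)

    def get(self, key: str) -> Optional[str]:
        return self._overrides.get(key, self._defaults.get(key))


config = SessionConfig({"execution.savepoint.path": None})
config.set("execution.savepoint.path", "/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab")
first_insert_path = config.get("execution.savepoint.path")   # restores from savepoint
second_insert_path = config.get("execution.savepoint.path")  # still restores from it
config.reset("execution.savepoint.path")
third_insert_path = config.get("execution.savepoint.path")   # no savepoint any more
print(first_insert_path, second_insert_path, third_insert_path)
```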

    Define a Custom Job Name

    The SQL Client supports defining a job name for queries and DML statements through the SET command.

    Flink SQL> SET 'pipeline.name' = 'kafka-to-hive';
    [INFO] Session property has been set.

    -- all the following DML statements will use the specified job name.
    Flink SQL> INSERT INTO ...

    Because the specified job name affects all the following queries and DML statements, you can also use the RESET command to reset this configuration, i.e. to use default job names.

    Flink SQL> RESET pipeline.name;
    [INFO] Session property has been reset.

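    The current SQL Client supports only the embedded mode. In the future, the community plans to extend its functionality by providing a REST-based SQL Client Gateway. For details, see FLIP-91.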