Queries

    A table must be registered in the TableEnvironment before a SQL query can access it. A table can be registered from a TableSource, a Table, a CREATE TABLE statement, or a DataStream. Alternatively, users can register catalogs in a TableEnvironment to specify the location of the data sources.

    For convenience, Table.toString() automatically registers the table under a unique name in its TableEnvironment and returns the name. So, Table objects can be directly inlined into SQL queries as shown in the examples below.
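
    As a rough illustration of the inlining mechanism described above, the following Python sketch models a table whose string form registers it under a generated name. ToyTableEnvironment, ToyTable, register_unnamed, and the UnnamedTable$N naming scheme are hypothetical stand-ins, not Flink's classes or its actual implementation; the sketch also assumes the name is cached after the first registration.

```python
import itertools


class ToyTableEnvironment:
    """Hypothetical stand-in for a TableEnvironment; not Flink's API."""

    def __init__(self):
        self.catalog = {}              # registered name -> table object
        self._ids = itertools.count()  # source of unique name suffixes

    def register_unnamed(self, table):
        # Generate a unique name and register the table under it.
        name = f"UnnamedTable${next(self._ids)}"
        self.catalog[name] = table
        return name


class ToyTable:
    """Hypothetical table whose string form is its registered name."""

    def __init__(self, env, rows):
        self.env = env
        self.rows = rows
        self._name = None

    def __str__(self):
        # Assumption: register on first use, then reuse the same name.
        if self._name is None:
            self._name = self.env.register_unnamed(self)
        return self._name


env = ToyTableEnvironment()
orders = ToyTable(env, [(1, "Rubber duck", 3)])

# Formatting the table into the query text triggers the registration.
query = f"SELECT SUM(amount) FROM {orders} WHERE product LIKE '%Rubber%'"
print(query)
```

    Because the string form of the table is its registered name, formatting it into the query text is enough for the query to refer to a table the environment knows about.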

    Note: Queries that include unsupported SQL features cause a TableException. The supported features of SQL on batch and streaming tables are listed in the following sections.

    The following examples show how to specify SQL queries on registered and inlined tables.

    Scala

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // read a DataStream from an external source
    val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

    // SQL query with an inlined (unregistered) table
    val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
    val result = tableEnv.sqlQuery(
      s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

    // SQL query with a registered table
    // register the DataStream under the name "Orders"
    tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
    // run a SQL query on the Table and retrieve the result as a new Table
    val result2 = tableEnv.sqlQuery(
      "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

    // create and register a TableSink
    val schema = Schema.newBuilder()
      .column("product", DataTypes.STRING())
      .column("amount", DataTypes.INT())
      .build()
    val sinkDescriptor = TableDescriptor.forConnector("filesystem")
      .schema(schema)
      .option("field-delimiter", ",")
      .build()
    tableEnv.createTemporaryTable("RubberOrders", sinkDescriptor)

    // run an INSERT SQL on the Table and emit the result to the TableSink
    tableEnv.executeSql(
      "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")


    A SELECT or VALUES statement can be executed through the TableEnvironment.executeSql() method, which returns the result as a TableResult whose content can be collected to the local client. Similarly, a Table object can be executed with the Table.execute() method to collect the content of the query to the local client.

    TableResult.collect() returns a closeable row iterator. The select job does not finish until all result data has been collected, so close the iterator with CloseableIterator#close() to actively end the job and avoid resource leaks. TableResult.print() prints the select result to the client console instead. The result data in a TableResult can be accessed only once, so collect() and print() must not both be called on the same TableResult.

    TableResult.collect() and TableResult.print() have slightly different behaviors under different checkpointing settings (to enable checkpointing for a streaming job, see checkpointing config).

    • For batch jobs or streaming jobs without checkpointing, TableResult.collect() and TableResult.print() provide neither exactly-once nor at-least-once guarantees. Query results are immediately accessible by the clients once they’re produced, but exceptions will be thrown when the job fails and restarts.
    • For streaming jobs with exactly-once checkpointing, TableResult.collect() and TableResult.print() guarantee an end-to-end exactly-once record delivery. A result will be accessible by clients only after its corresponding checkpoint completes.
    • For streaming jobs with at-least-once checkpointing, TableResult.collect() and TableResult.print() guarantee an end-to-end at-least-once record delivery. Query results are immediately accessible by the clients once they’re produced, but it is possible for the same result to be delivered multiple times.
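
    The difference between the exactly-once and at-least-once cases can be illustrated with a toy simulation in plain Python. It is only a sketch under simplifying assumptions: run_job, its parameters, the checkpoint interval counted in records, and the single simulated failure are all hypothetical, and the case without checkpointing (where a failure surfaces as an exception) is not modeled.

```python
def run_job(records, checkpoint_every, fail_at, mode):
    """Return the records a client would see, in delivery order.

    records:          input records, processed one at a time
    checkpoint_every: a checkpoint completes after this many records
    fail_at:          position at which the job fails once and restarts
    mode:             "EXACTLY_ONCE" or "AT_LEAST_ONCE"
    """
    delivered = []       # what the client has received so far
    buffered = []        # results held back until the next checkpoint
    last_checkpoint = 0  # position restored after a failure
    pos = 0
    failed = False

    while pos < len(records):
        if pos == fail_at and not failed:
            # Simulated failure: drop uncommitted results and rewind.
            failed = True
            buffered.clear()
            pos = last_checkpoint
            continue

        record = records[pos]
        pos += 1
        if mode == "EXACTLY_ONCE":
            buffered.append(record)   # visible only after a checkpoint
        else:
            delivered.append(record)  # visible immediately

        if pos % checkpoint_every == 0:
            # Checkpoint completes: release everything buffered so far.
            last_checkpoint = pos
            delivered.extend(buffered)
            buffered.clear()

    # Assumption: a job that ends normally releases what is still buffered.
    delivered.extend(buffered)
    return delivered


records = ["a", "b", "c", "d", "e"]
exactly_once = run_job(records, checkpoint_every=2, fail_at=3, mode="EXACTLY_ONCE")
at_least_once = run_job(records, checkpoint_every=2, fail_at=3, mode="AT_LEAST_ONCE")
print(exactly_once)   # ['a', 'b', 'c', 'd', 'e']
print(at_least_once)  # ['a', 'b', 'c', 'c', 'd', 'e']
```

    In this sketch the record "c" is processed twice because the failure happens after it was produced but before the next checkpoint. With buffering it reaches the client once; with immediate delivery it reaches the client twice.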

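    Java

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // `settings` is an EnvironmentSettings instance created elsewhere
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

    tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");

    // execute SELECT statement
    TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
    // use try-with-resources statement to make sure the iterator will be closed automatically
    try (CloseableIterator<Row> it = tableResult1.collect()) {
        while (it.hasNext()) {
            Row row = it.next();
            // handle row
        }
    }

    // execute Table
    TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
    tableResult2.print();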

    Python

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # `settings` is an EnvironmentSettings instance created elsewhere
    table_env = StreamTableEnvironment.create(env, environment_settings=settings)

    # enable checkpointing
    table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
    table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")

    # execute SELECT statement
    table_result1 = table_env.execute_sql("SELECT * FROM Orders")
    table_result1.print()

    # execute Table
    table_result2 = table_env.sql_query("SELECT * FROM Orders").execute()
    table_result2.print()

    Flink parses SQL using Apache Calcite, which supports standard ANSI SQL.

    The following BNF-grammar describes the superset of supported SQL features in batch and streaming queries. The Operations section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.

    Grammar ↕

    Flink SQL uses a lexical policy for identifiers (table, attribute, and function names) similar to Java:

    • The case of identifiers is preserved whether or not they are quoted.
    • After that, identifiers are matched case-sensitively.
    • Unlike Java, back-ticks allow identifiers to contain non-alphanumeric characters (e.g. SELECT a AS `my field` FROM t).
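
    When SQL text is assembled programmatically, the back-tick rule above can be applied with a small helper. This is a minimal sketch: quote_identifier is a hypothetical name, and it rejects identifiers that themselves contain a back-tick rather than guessing at an escaping rule.

```python
def quote_identifier(name):
    """Wrap an identifier in back-ticks, preserving its case exactly."""
    if not name:
        raise ValueError("identifier must not be empty")
    if "`" in name:
        # Assumption: identifiers containing back-ticks are out of scope here.
        raise ValueError("back-ticks inside identifiers are not handled by this sketch")
    return f"`{name}`"


alias = quote_identifier("my field")
query = f"SELECT a AS {alias} FROM t"
print(query)  # SELECT a AS `my field` FROM t
```

    Since case is preserved whether or not an identifier is quoted, quoting every identifier this way does not change how it is matched.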

    String literals are enclosed in single quotes. A single quote inside a literal is escaped by doubling it:

    Flink SQL> SELECT 'Hello World', 'It''s me';
    +-------------+---------+
    |      EXPR$0 |  EXPR$1 |
    +-------------+---------+
    | Hello World | It's me |
    +-------------+---------+
    1 row in set
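
    The quoting rule visible in the example above (single quotes around the literal, a doubled single quote inside it) can be sketched as a small helper. The function name sql_string_literal is hypothetical and is not part of any Flink API.

```python
def sql_string_literal(value):
    """Render a Python string as a single-quoted SQL string literal."""
    # Double every embedded single quote, then wrap in single quotes.
    return "'" + value.replace("'", "''") + "'"


statement = "SELECT {}, {}".format(
    sql_string_literal("Hello World"),
    sql_string_literal("It's me"),
)
print(statement)  # SELECT 'Hello World', 'It''s me'
```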

    Unicode characters are supported in string literals. If explicit unicode code points are required, use the following syntax:

    • Use the backslash (\) as escaping character (default): SELECT U&'\263A'
    • Use a custom escaping character: SELECT U&'#263A' UESCAPE '#'
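
    To show how the two forms relate, the following Python sketch renders a string as a U&'...' literal with four-digit hexadecimal code points. The helper name unicode_escaped_literal is hypothetical. As simplifying assumptions, it only handles code points up to U+FFFF, and it writes the escape character and the single quote as code points too, so that it does not rely on any other escaping rule.

```python
def unicode_escaped_literal(text, escape="\\"):
    """Render text as a U&'...' literal, optionally with a custom escape."""
    if len(escape) != 1 or escape in "0123456789ABCDEFabcdef+'\" ":
        raise ValueError("unsuitable escape character")

    parts = []
    for ch in text:
        code = ord(ch)
        if code > 0xFFFF:
            # Assumption: longer escapes are out of scope for this sketch.
            raise ValueError("code points above U+FFFF are not handled by this sketch")
        if ch in (escape, "'") or not (0x20 <= code < 0x7F):
            # Escape character, quote, and non-printable-ASCII as 4 hex digits.
            parts.append(f"{escape}{code:04X}")
        else:
            parts.append(ch)

    literal = "U&'" + "".join(parts) + "'"
    if escape != "\\":
        # A non-default escape character is declared with UESCAPE.
        literal += f" UESCAPE '{escape}'"
    return literal


print("SELECT " + unicode_escaped_literal("\u263a"))       # SELECT U&'\263A'
print("SELECT " + unicode_escaped_literal("\u263a", "#"))  # SELECT U&'#263A' UESCAPE '#'
```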