Queries
In order to access a table in a SQL query, it must be registered in the TableEnvironment. A table can be registered from a , Table, , DataStream. Alternatively, users can also to specify the location of the data sources.
For convenience, Table.toString()
automatically registers the table under a unique name in its TableEnvironment
and returns the name. So, Table
objects can be directly inlined into SQL queries as shown in the examples below.
Note: Queries that include unsupported SQL features cause a TableException
. The supported features of SQL on batch and streaming tables are listed in the following sections.
The following examples show how to specify a SQL queries on registered and inlined tables.
Java
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// SQL query with an inlined (unregistered) table
val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
val result = tableEnv.sqlQuery(
s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
// SQL query with a registered table
// register the DataStream under the name "Orders"
tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
// run a SQL query on the Table and retrieve the result as a new Table
val result2 = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
// create and register a TableSink
val schema = Schema.newBuilder()
.column("product", DataTypes.STRING())
.column("amount", DataTypes.INT())
.build()
val sinkDescriptor = TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("field-delimiter", ",")
.build())
tableEnv.createTemporaryTable("RubberOrders", sinkDescriptor)
// run an INSERT SQL on the Table and emit the result to the TableSink
tableEnv.executeSql(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
Python
A SELECT statement or a VALUES statement can be executed to collect the content to local through the TableEnvironment.executeSql()
method. The method returns the result of the SELECT statement (or the VALUES statement) as a TableResult
. Similar to a SELECT statement, a Table
object can be executed using the Table.execute()
method to collect the content of the query to the local client. TableResult.collect()
method returns a closeable row iterator. The select job will not be finished unless all result data has been collected. We should actively close the job to avoid resource leak through the CloseableIterator#close()
method. We can also print the select result to client console through the TableResult.print()
method. The result data in TableResult
can be accessed only once. Thus, collect()
and print()
must not be called after each other.
TableResult.collect()
and TableResult.print()
have slightly different behaviors under different checkpointing settings (to enable checkpointing for a streaming job, see checkpointing config).
- For batch jobs or streaming jobs without checkpointing,
TableResult.collect()
andTableResult.print()
have neither exactly-once nor at-least-once guarantee. Query results are immediately accessible by the clients once they’re produced, but exceptions will be thrown when the job fails and restarts. - For streaming jobs with exactly-once checkpointing,
TableResult.collect()
andTableResult.print()
guarantee an end-to-end exactly-once record delivery. A result will be accessible by clients only after its corresponding checkpoint completes. For streaming jobs with at-least-once checkpointing,
TableResult.collect()
andTableResult.print()
guarantee an end-to-end at-least-once record delivery. Query results are immediately accessible by the clients once they’re produced, but it is possible for the same result to be delivered multiple times.Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// execute SELECT statement
TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
// use try-with-resources statement to make sure the iterator will be closed automatically
while(it.hasNext()) {
Row row = it.next();
// handle row
}
}
// execute Table
TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
tableResult2.print();
Python
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env, settings)
# enable checkpointing
table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
# execute SELECT statement
table_result1 = table_env.execute_sql("SELECT * FROM Orders")
table_result1.print()
# execute Table
table_result2 = table_env.sql_query("SELECT * FROM Orders").execute()
table_result2.print()
Flink parses SQL using , which supports standard ANSI SQL.
The following BNF-grammar describes the superset of supported SQL features in batch and streaming queries. The Operations section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.
Grammar ↕
Flink SQL uses a lexical policy for identifier (table, attribute, function names) similar to Java:
- The case of identifiers is preserved whether or not they are quoted.
- After which, identifiers are matched case-sensitively.
- Unlike Java, back-ticks allow identifiers to contain non-alphanumeric characters (e.g.
SELECT a AS `my field` FROM t
).
Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
| EXPR$0 | EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set
Unicode characters are supported in string literals. If explicit unicode code points are required, use the following syntax:
- Use the backslash (
\
) as escaping character (default): - Use a custom escaping character:
SELECT U&'#263A' UESCAPE '#'