Intro to the Python Table API

    All Table API and SQL programs, both batch and streaming, follow the same pattern. The following code example shows the common structure of Table API and SQL programs.
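
    A minimal sketch of that common structure is shown below. The datagen/print table definitions are illustrative placeholders that mirror the connector examples later on this page:

        from pyflink.table import EnvironmentSettings, TableEnvironment

        # 1. create a TableEnvironment
        env_settings = EnvironmentSettings.in_streaming_mode()
        table_env = TableEnvironment.create(env_settings)

        # 2. create a source table
        table_env.execute_sql("""
            CREATE TABLE datagen (
                id INT,
                data STRING
            ) WITH (
                'connector' = 'datagen',
                'fields.id.kind' = 'sequence',
                'fields.id.start' = '1',
                'fields.id.end' = '10'
            )
        """)

        # 3. create a sink table
        table_env.execute_sql("""
            CREATE TABLE print (
                id INT,
                data STRING
            ) WITH (
                'connector' = 'print'
            )
        """)

        # 4. query the source table and perform some transformations
        source_table = table_env.from_path("datagen")
        result_table = source_table.select(source_table.id + 1, source_table.data)

        # 5. emit the query result to the sink table
        result_table.execute_insert("print").wait()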

    Create a TableEnvironment

    The TableEnvironment is a central concept of the Table API and SQL integration. The following code example shows how to create a TableEnvironment:

        from pyflink.table import EnvironmentSettings, TableEnvironment

        # create a streaming TableEnvironment
        env_settings = EnvironmentSettings.in_streaming_mode()
        table_env = TableEnvironment.create(env_settings)

        # or create a batch TableEnvironment
        env_settings = EnvironmentSettings.in_batch_mode()
        table_env = TableEnvironment.create(env_settings)

    For more details about the different ways to create a TableEnvironment, please refer to the TableEnvironment documentation.

    The TableEnvironment is responsible for:

    • Creating Tables
    • Registering Tables as a temporary view
    • Executing SQL queries, see SQL for more details
    • Registering user-defined (scalar, table, or aggregation) functions, see General User-defined Functions and Vectorized User-defined Functions for more details
    • Configuring the job, see Python Configuration for more details
    • Managing Python dependencies, see Dependency Management for more details
    • Submitting the jobs for execution

    Table is a core component of the Python Table API. A Table is a logical representation of the intermediate result of a Table API Job.

    A Table is always bound to a specific TableEnvironment. It is not possible to combine tables from different TableEnvironments in the same query, e.g., to join or union them.

    Create using a List Object

    You can create a Table from a list object:

        from pyflink.table import EnvironmentSettings, TableEnvironment

        # create a batch TableEnvironment
        env_settings = EnvironmentSettings.in_batch_mode()
        table_env = TableEnvironment.create(env_settings)

        table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
        table.to_pandas()

    The result is:

           _1     _2
        0   1     Hi
        1   2  Hello

    You can also create the Table with specified column names:

        table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
        table.to_pandas()

    The result is:

           id   data
        0   1     Hi
        1   2  Hello

    By default the table schema is extracted from the data automatically.

    If the automatically generated table schema isn’t satisfactory, you can specify it manually:

        table_without_schema = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

        # by default the type of the "id" column is 64 bit int
        default_type = table_without_schema.to_pandas()["id"].dtype
        print('By default the type of the "id" column is %s.' % default_type)

        from pyflink.table import DataTypes
        table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
                                        DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),
                                                       DataTypes.FIELD("data", DataTypes.STRING())]))

        # now the type of the "id" column is 8 bit int
        type = table.to_pandas()["id"].dtype
        print('Now the type of the "id" column is %s.' % type)

    The result is:

        By default the type of the "id" column is int64.
        Now the type of the "id" column is int8.

    Create using a Connector

    You can create a Table using connector DDL:

        from pyflink.table import EnvironmentSettings, TableEnvironment

        # create a stream TableEnvironment
        env_settings = EnvironmentSettings.in_streaming_mode()
        table_env = TableEnvironment.create(env_settings)

        table_env.execute_sql("""
            CREATE TABLE random_source (
                id BIGINT,
                data TINYINT
            ) WITH (
                'connector' = 'datagen',
                'fields.id.kind'='sequence',
                'fields.id.start'='1',
                'fields.id.end'='3',
                'fields.data.kind'='sequence',
                'fields.data.start'='4',
                'fields.data.end'='6'
            )
        """)

        table = table_env.from_path("random_source")
        table.to_pandas()

    The result is:

           id  data
        0   2     5
        1   1     4
        2   3     6

    Create using a Catalog

    A TableEnvironment maintains a map of catalogs of tables which are created with an identifier.

    The tables in a catalog may either be temporary, and tied to the lifecycle of a single Flink session, or permanent, and visible across multiple Flink sessions.

    The tables and views created via SQL DDL, e.g. “create table …” and “create view …” are also stored in a catalog.

    If you want to use tables from a catalog with the Table API, you can use the “from_path” method to create the Table API objects:

        # prepare the catalog
        # register Table API tables in the catalog
        table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
        table_env.create_temporary_view('source_table', table)

        # create Table API table from catalog
        new_table = table_env.from_path('source_table')
        new_table.to_pandas()

    The result is:

           id   data
        0   1     Hi
        1   2  Hello

    Write Queries

    The Table object offers many methods for applying relational operations. These methods return new Table objects representing the result of applying the relational operations on the input Table. These relational operations may be composed of multiple method calls, such as table.group_by(...).select(...).

    The Table API documentation describes all Table API operations that are supported on streaming and batch tables.

    The following example shows a simple Table API aggregation query:
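
    A sketch of such a query is given below; it assumes a batch TableEnvironment and reuses the same orders data as the row-based example further down this page:

        from pyflink.table import EnvironmentSettings, TableEnvironment

        # use a batch TableEnvironment to execute the query
        env_settings = EnvironmentSettings.in_batch_mode()
        table_env = TableEnvironment.create(env_settings)

        orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
                                         ['name', 'country', 'revenue'])

        # sum the revenue of all orders from FRANCE, grouped by customer name
        revenue = orders \
            .select(orders.name, orders.country, orders.revenue) \
            .where(orders.country == 'FRANCE') \
            .group_by(orders.name) \
            .select(orders.name, orders.revenue.sum.alias('rev_sum'))

        revenue.to_pandas()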

    The result is:

           name  rev_sum
        0  Jack       30

    Row-based operations are also supported in the Python Table API, including Map Operation, FlatMap Operation, Aggregate Operation and FlatAggregate Operation.

    The following example shows a simple row-based operation query:

        from pyflink.table import EnvironmentSettings, TableEnvironment
        from pyflink.table import DataTypes
        from pyflink.table.udf import udf
        import pandas as pd

        # using batch table environment to execute the queries
        env_settings = EnvironmentSettings.in_batch_mode()
        table_env = TableEnvironment.create(env_settings)

        orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
                                         ['name', 'country', 'revenue'])

        map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),
                           result_type=DataTypes.ROW(
                               [DataTypes.FIELD("name", DataTypes.STRING()),
                                DataTypes.FIELD("revenue", DataTypes.BIGINT())]),
                           func_type="pandas")  # the pandas-based map function must be a vectorized UDF

        orders.map(map_function).alias('name', 'revenue').to_pandas()

    The result is:

           name  revenue
        0  Jack      100
        1  Rose      300
        2  Jack      200

    Write SQL Queries

    Flink’s SQL integration is based on Apache Calcite, which implements the SQL standard. SQL queries are specified as Strings.

    The SQL documentation describes Flink’s SQL support for streaming and batch tables.

    The following example shows a simple SQL aggregation query:

        from pyflink.table import EnvironmentSettings, TableEnvironment

        # use a stream TableEnvironment to execute the queries
        env_settings = EnvironmentSettings.in_streaming_mode()
        table_env = TableEnvironment.create(env_settings)

        table_env.execute_sql("""
            CREATE TABLE random_source (
                id BIGINT,
                data TINYINT
            ) WITH (
                'connector' = 'datagen',
                'fields.id.kind'='sequence',
                'fields.id.start'='1',
                'fields.id.end'='8',
                'fields.data.kind'='sequence',
                'fields.data.start'='4',
                'fields.data.end'='11'
            )
        """)

        table_env.execute_sql("""
            CREATE TABLE print_sink (
                id BIGINT,
                data_sum TINYINT
            ) WITH (
                'connector' = 'print'
            )
        """)

        table_env.execute_sql("""
            INSERT INTO print_sink
                SELECT id, sum(data) as data_sum FROM
                    (SELECT id / 2 as id, data FROM random_source)
                WHERE id > 1
                GROUP BY id
        """).wait()

    The result is:

        2> +I(4,11)
        6> +I(2,8)
        8> +I(3,10)
        6> -U(2,8)
        8> -U(3,10)
        6> +U(2,15)
        8> +U(3,19)

    The output above shows the change logs received by the print sink. The output format of a change log entry is:

        {subtask id}> {message type}{string format of the value}

    For example, “2> +I(4,11)” means the message comes from the 2nd subtask, “+I” marks it as an insert message, and “(4,11)” is the message content. “-U” marks a retract record (i.e. update-before), which means this message should be deleted or retracted from the sink, while “+U” marks an update record (i.e. update-after), which means this message should be updated or inserted by the sink.

    So, we get this result from the change logs above:

        (4, 11)
        (2, 15)
        (3, 19)

    Mix the Table API and SQL

    The Table objects used in Table API and the tables used in SQL can be freely converted to each other.

    The following example shows how to use a Table object in SQL:

        # create a sink table to emit results
        table_env.execute_sql("""
            CREATE TABLE table_sink (
                id BIGINT,
                data VARCHAR
            ) WITH (
                'connector' = 'print'
            )
        """)

        # convert the Table API table to a SQL view
        table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
        table_env.create_temporary_view('table_api_table', table)

        # emit the Table API table
        table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()

    The result is:

        6> +I(1,Hi)
        6> +I(2,Hello)

    And the following example shows how to use SQL tables in the Table API:

        # create a sql source table
        table_env.execute_sql("""
            CREATE TABLE sql_source (
                id BIGINT,
                data TINYINT
            ) WITH (
                'connector' = 'datagen',
                'fields.id.kind'='sequence',
                'fields.id.start'='1',
                'fields.id.end'='4',
                'fields.data.kind'='sequence',
                'fields.data.start'='4',
                'fields.data.end'='7'
            )
        """)

        # convert the sql table to Table API table
        table = table_env.from_path("sql_source")

        # or create the table from a sql query
        table = table_env.sql_query("SELECT * FROM sql_source")

        # emit the table
        table.to_pandas()

    The result is:

           id  data
        0   2     5
        1   1     4
        2   4     7
        3   3     6

    Collect Results to Client

    You can call the TableResult.collect method to collect the results of a table to the client. The result type is an auto-closeable iterator.

    The following code shows how to use the TableResult.collect() method:
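
    A minimal sketch that would produce the result shown below; the source data, the column names "a", "b", "c" and the view name "collect_source" are illustrative assumptions:

        # prepare a source table with three columns: a, b, c
        source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
        table_env.create_temporary_view("collect_source", source)

        # execute the query and obtain a TableResult
        table_result = table_env.execute_sql("SELECT a + 1, b, c FROM collect_source")

        # iterate over the rows with the auto-closeable iterator
        with table_result.collect() as results:
            for result in results:
                print(result)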

    The result is:

        <Row(2, 'Hi', 'Hello')>
        <Row(3, 'Hello', 'Hello')>

    Collect Results to Client by converting it to pandas DataFrame

    You can call the “to_pandas” method to convert a Table to a pandas DataFrame:

        table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
        table.to_pandas()

    The result is:

           id   data
        0   1     Hi
        1   2  Hello

    Note: “to_pandas” triggers the materialization of the table and collects its content into the memory of the client, so it is good practice to limit the number of rows collected via Table.limit.
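
    For example, a minimal sketch that caps the number of collected rows (reusing the "table" object from the snippet above):

        # collect at most 10 rows to the client before converting to pandas
        limited_df = table.limit(10).to_pandas()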

    Note: “to_pandas” is not supported by the flink planner, and not all data types can be emitted to pandas DataFrames.

    Emit Results to One Sink Table

    You can call the “execute_insert” method to emit the data in a Table object to a sink table:

        table_env.execute_sql("""
            CREATE TABLE sink_table (
                id BIGINT,
                data VARCHAR
            ) WITH (
                'connector' = 'print'
            )
        """)

        table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
        table.execute_insert("sink_table").wait()

    The result is:

        6> +I(1,Hi)
        6> +I(2,Hello)

    This could also be done using SQL:

        table_env.create_temporary_view("table_source", table)
        table_env.execute_sql("INSERT INTO sink_table SELECT * FROM table_source").wait()

    Emit Results to Multiple Sink Tables

    You can use a StatementSet to emit the Tables to multiple sink tables in one job:

        # prepare source tables and sink tables
        table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
        table_env.create_temporary_view("simple_source", table)

        table_env.execute_sql("""
            CREATE TABLE first_sink_table (
                id BIGINT,
                data VARCHAR
            ) WITH (
                'connector' = 'print'
            )
        """)

        table_env.execute_sql("""
            CREATE TABLE second_sink_table (
                id BIGINT,
                data VARCHAR
            ) WITH (
                'connector' = 'print'
            )
        """)

        # create a statement set
        statement_set = table_env.create_statement_set()

        # emit the "table" object to the "first_sink_table"
        statement_set.add_insert("first_sink_table", table)

        # emit the "simple_source" to the "second_sink_table" via an INSERT SQL statement
        statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")

        # execute the statement set
        statement_set.execute().wait()

    The result is:

        7> +I(1,Hi)
        7> +I(1,Hi)
        7> +I(2,Hello)
        7> +I(2,Hello)

    Explain Tables

    The Table API provides a mechanism to explain the logical and optimized query plans used to compute a Table. This is done through the Table.explain() or StatementSet.explain() methods. Table.explain() returns the plan of a Table. StatementSet.explain() returns the plan for multiple sinks. These methods return a string describing three things:

    1. the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
    2. the optimized logical query plan, and
    3. the physical execution plan.

    TableEnvironment.explain_sql() and TableEnvironment.execute_sql() support executing an EXPLAIN statement to get the plans. Please refer to the EXPLAIN page for more details.
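
    For instance, a minimal sketch (assuming the sql_source table created earlier on this page):

        # explain a SQL query directly on the TableEnvironment
        print(table_env.explain_sql("SELECT id, data FROM sql_source WHERE id > 1"))

        # or run an EXPLAIN statement via execute_sql and print the returned plan
        table_env.execute_sql("EXPLAIN PLAN FOR SELECT id, data FROM sql_source WHERE id > 1").print()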

    The following code shows how to use the Table.explain() method:

        # using a stream TableEnvironment
        from pyflink.table import EnvironmentSettings, TableEnvironment

        env_settings = EnvironmentSettings.in_streaming_mode()
        table_env = TableEnvironment.create(env_settings)

        table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
        table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

        table = table1 \
            .where(table1.data.like('H%')) \
            .union_all(table2)

        print(table.explain())

    The result is:

        == Abstract Syntax Tree ==
        LogicalUnion(all=[true])
        :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'H%')])
        :  +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]]])
        +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]]])

        == Optimized Logical Plan ==
        Union(all=[true], union=[id, data])
        :- Calc(select=[id, data], where=[LIKE(data, _UTF-16LE'H%')])
        :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
        +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])

        == Physical Execution Plan ==
        Stage 133 : Data Source
            content : Source: PythonInputFormatTableSource(id, data)

            Stage 134 : Operator
                content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
                ship_strategy : FORWARD

                Stage 135 : Operator
                    content : Calc(select=[id, data], where=[(data LIKE _UTF-16LE'H%')])
                    ship_strategy : FORWARD

        Stage 136 : Data Source
            content : Source: PythonInputFormatTableSource(id, data)

            Stage 137 : Operator
                content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
                ship_strategy : FORWARD

    The following code shows how to use the StatementSet.explain() method:

        # using a stream TableEnvironment
        from pyflink.table import EnvironmentSettings, TableEnvironment

        env_settings = EnvironmentSettings.in_streaming_mode()
        table_env = TableEnvironment.create(env_settings)

        table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
        table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

        table_env.execute_sql("""
            CREATE TABLE print_sink_table (
                id BIGINT,
                data VARCHAR
            ) WITH (
                'connector' = 'print'
            )
        """)

        table_env.execute_sql("""
            CREATE TABLE black_hole_sink_table (
                id BIGINT,
                data VARCHAR
            ) WITH (
                'connector' = 'blackhole'
            )
        """)

        statement_set = table_env.create_statement_set()
        statement_set.add_insert("print_sink_table", table1.where(table1.data.like('H%')))
        statement_set.add_insert("black_hole_sink_table", table2)

        print(statement_set.explain())

    The result is: