DataStream API Integration

    The DataStream API offers the primitives of stream processing (namely time, state, and dataflow management) in a relatively low-level imperative programming API. The Table API abstracts away many internals and provides a structured and declarative API.

    Both APIs can work with bounded and unbounded streams.

    Bounded streams need to be managed when processing historical data. Unbounded streams occur in real-time processing scenarios that might be initialized with historical data first.

    For efficient execution, both APIs can process bounded streams in an optimized batch execution mode. However, since batch is just a special case of streaming, it is also possible to run pipelines of bounded streams in regular streaming execution mode.

    Pipelines in one API can be defined end-to-end without dependencies on the other API. However, it might be useful to mix both APIs for various reasons:

    • Use the table ecosystem for accessing catalogs or connecting to external systems easily, before implementing the main pipeline in DataStream API.
    • Access some of the SQL functions for stateless data normalization and cleansing, before implementing the main pipeline in DataStream API.
    • Switch to the DataStream API whenever a lower-level operation (e.g. custom timer handling) is not available in the Table API.

    Flink provides special bridging functionalities to make the integration with DataStream API as smooth as possible.


    Flink provides a specialized StreamTableEnvironment for integrating with the DataStream API. Those environments extend the regular TableEnvironment with additional methods and take the StreamExecutionEnvironment used in the DataStream API as a parameter.

    The following code shows an example of how to go back and forth between the two APIs. Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. Since the DataStream API does not support changelog processing natively, the code assumes append-only/insert-only semantics during the stream-to-table and table-to-stream conversion.

    Scala

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    // create environments of both APIs
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // create a DataStream
    val dataStream = env.fromElements("Alice", "Bob", "John")

    // interpret the insert-only DataStream as a Table
    val inputTable = tableEnv.fromDataStream(dataStream)

    // register the Table object as a view and query it
    tableEnv.createTemporaryView("InputTable", inputTable)
    val resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable")

    // interpret the insert-only Table as a DataStream again
    val resultStream = tableEnv.toDataStream(resultTable)

    // add a printing sink and execute in DataStream API
    resultStream.print()
    env.execute()

    // prints:
    // +I[Alice]
    // +I[Bob]
    // +I[John]

    Python

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment
    from pyflink.common.typeinfo import Types

    # create environments of both APIs
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # create a DataStream
    ds = env.from_collection(["Alice", "Bob", "John"], Types.STRING())

    # interpret the insert-only DataStream as a Table
    t = t_env.from_data_stream(ds)

    # register the Table object as a view and query it
    t_env.create_temporary_view("InputTable", t)
    res_table = t_env.sql_query("SELECT UPPER(f0) FROM InputTable")

    # interpret the insert-only Table as a DataStream again
    res_ds = t_env.to_data_stream(res_table)

    # add a printing sink and execute in DataStream API
    res_ds.print()
    env.execute()

    # prints:
    # +I[Alice]
    # +I[Bob]
    # +I[John]

    The complete semantics of fromDataStream and toDataStream can be found in the dedicated sections below. In particular, they discuss how to influence the schema derivation with more complex and nested types, and cover working with event-time and watermarks.

    Depending on the kind of query, in many cases the resulting dynamic table does not only produce insert-only changes when converted to a DataStream but also retractions and other kinds of updates. During table-to-stream conversion, this can lead to an exception similar to

    Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

    in which case one needs to revise the query again or switch to toChangelogStream.

    The following example shows how updating tables can be converted. Every result row represents an entry in a changelog with a change flag that can be queried by calling row.getKind() on it. In the example, the second score for Alice creates an update before (-U) and update after (+U) change.

    Java

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    // create environments of both APIs
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // create a DataStream
    DataStream<Row> dataStream = env.fromElements(
        Row.of("Alice", 12),
        Row.of("Bob", 10),
        Row.of("Alice", 100));

    // interpret the insert-only DataStream as a Table
    Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");

    // register the Table object as a view and query it
    // the query contains an aggregation that produces updates
    tableEnv.createTemporaryView("InputTable", inputTable);
    Table resultTable = tableEnv.sqlQuery(
        "SELECT name, SUM(score) FROM InputTable GROUP BY name");

    // interpret the updating Table as a changelog DataStream
    DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);

    // add a printing sink and execute in DataStream API
    resultStream.print();
    env.execute();

    // prints:
    // +I[Alice, 12]
    // +I[Bob, 10]
    // -U[Alice, 12]
    // +U[Alice, 112]

    Scala

    import org.apache.flink.api.scala.typeutils.Types
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.types.Row

    // create environments of both APIs
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // create a DataStream
    val dataStream = env.fromElements(
      Row.of("Alice", Int.box(12)),
      Row.of("Bob", Int.box(10)),
      Row.of("Alice", Int.box(100))
    )(Types.ROW(Types.STRING, Types.INT))

    // interpret the insert-only DataStream as a Table
    val inputTable = tableEnv.fromDataStream(dataStream).as("name", "score")

    // register the Table object as a view and query it
    // the query contains an aggregation that produces updates
    tableEnv.createTemporaryView("InputTable", inputTable)
    val resultTable = tableEnv.sqlQuery("SELECT name, SUM(score) FROM InputTable GROUP BY name")

    // interpret the updating Table as a changelog DataStream
    val resultStream = tableEnv.toChangelogStream(resultTable)

    // add a printing sink and execute in DataStream API
    resultStream.print()
    env.execute()

    // prints:
    // +I[Alice, 12]
    // +I[Bob, 10]
    // -U[Alice, 12]
    // +U[Alice, 112]

    Python

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment
    from pyflink.common.typeinfo import Types

    # create environments of both APIs
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # create a DataStream
    ds = env.from_collection([("Alice", 12), ("Bob", 10), ("Alice", 100)],
                             type_info=Types.ROW_NAMED(
                                 ["a", "b"],
                                 [Types.STRING(), Types.INT()]))

    # interpret the insert-only DataStream as a Table
    input_table = t_env.from_data_stream(ds).alias("name", "score")

    # register the Table object as a view and query it
    # the query contains an aggregation that produces updates
    t_env.create_temporary_view("InputTable", input_table)
    res_table = t_env.sql_query("SELECT name, SUM(score) FROM InputTable GROUP BY name")

    # interpret the updating Table as a changelog DataStream
    res_stream = t_env.to_changelog_stream(res_table)

    # add a printing sink and execute in DataStream API
    res_stream.print()
    env.execute()

    # prints:
    # +I[Alice, 12]
    # +I[Bob, 10]
    # -U[Alice, 12]
    # +U[Alice, 112]

    The complete semantics of fromChangelogStream and toChangelogStream can be found in the dedicated section below. In particular, the section discusses how to influence the schema derivation with more complex and nested types. It covers working with event-time and watermarks. It discusses how to declare a primary key and changelog mode for the input and output streams.

    The example above shows how the final result is computed incrementally by continuously emitting an updated result for each incoming record. However, in cases where the input streams are finite (i.e. bounded), a result can be computed more efficiently by leveraging batch processing principles.

    In batch processing, operators can be executed in successive stages that consume the entire input table before emitting results. For example, a join operator can sort both bounded inputs before performing the actual joining (i.e. sort-merge join algorithm), or build a hash table from one input before consuming the other (i.e. build/probe phase of the hash join algorithm).
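
    For illustration only, here is a minimal plain-Java sketch of the merge phase of such a sort-merge join over two inputs that have already been fully consumed and sorted by key. It is a conceptual aid under those assumptions, not Flink's actual operator implementation.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SortMergeJoinSketch {

        static class Entry {
            final int key;
            final String value;
            Entry(int key, String value) { this.key = key; this.value = value; }
        }

        // merge phase: both inputs were consumed entirely and sorted by key beforehand
        static List<String> mergeJoin(List<Entry> left, List<Entry> right) {
            List<String> joined = new ArrayList<>();
            int l = 0, r = 0;
            while (l < left.size() && r < right.size()) {
                int lk = left.get(l).key, rk = right.get(r).key;
                if (lk < rk) {
                    l++;
                } else if (lk > rk) {
                    r++;
                } else {
                    // emit the cross product of the runs sharing the same key
                    int runStart = r;
                    while (l < left.size() && left.get(l).key == lk) {
                        for (r = runStart; r < right.size() && right.get(r).key == lk; r++) {
                            joined.add(left.get(l).value + "=" + right.get(r).value);
                        }
                        l++;
                    }
                }
            }
            return joined;
        }

        public static void main(String[] args) {
            List<Entry> users = Arrays.asList(new Entry(1, "Alice"), new Entry(2, "Bob"));
            List<Entry> orders = Arrays.asList(new Entry(1, "122"), new Entry(2, "239"), new Entry(2, "999"));
            System.out.println(mergeJoin(users, orders)); // [Alice=122, Bob=239, Bob=999]
        }
    }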

    Both DataStream API and Table API offer a specialized batch runtime mode.

    The following example illustrates that the unified pipeline is able to process both batch and streaming data by just switching a flag.

    Java

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    // setup DataStream API
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // set the batch runtime mode
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    // uncomment this for streaming mode
    // env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

    // setup Table API
    // the table environment adopts the runtime mode during initialization
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // define the same pipeline as above

    // prints in BATCH mode:
    // +I[Bob, 10]
    // +I[Alice, 112]

    // prints in STREAMING mode:
    // +I[Alice, 12]
    // +I[Bob, 10]
    // -U[Alice, 12]
    // +U[Alice, 112]

    Scala

    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    // setup DataStream API
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    // set the batch runtime mode
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    // uncomment this for streaming mode
    // env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

    // setup Table API
    // the table environment adopts the runtime mode during initialization
    val tableEnv = StreamTableEnvironment.create(env)

    // define the same pipeline as above

    // prints in BATCH mode:
    // +I[Bob, 10]
    // +I[Alice, 112]

    // prints in STREAMING mode:
    // +I[Alice, 12]
    // +I[Bob, 10]
    // -U[Alice, 12]
    // +U[Alice, 112]

    Once the changelog is applied to an external system (e.g. a key-value store), one can see that both modes are able to produce exactly the same output table. By consuming all input data before emitting results, the changelog of the batch mode consists solely of insert-only changes. See also the dedicated batch mode section below for more insights.
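
    As a plain-Java illustration (independent of Flink, with the changelog entries hard-coded from the example above), applying either changelog to a simple name-keyed map yields the same final table:

    import java.util.HashMap;
    import java.util.Map;

    public class ChangelogMaterialization {

        // applies a changelog to a name-keyed "table"
        static Map<String, Integer> apply(Object[][] changelog) {
            Map<String, Integer> table = new HashMap<>();
            for (Object[] change : changelog) {
                String kind = (String) change[0];
                String name = (String) change[1];
                Integer score = (Integer) change[2];
                if (kind.equals("+I") || kind.equals("+U")) {
                    table.put(name, score);   // insert / update after
                } else {
                    table.remove(name);       // -U (update before) / -D (delete)
                }
            }
            return table;
        }

        public static void main(String[] args) {
            // changelog emitted in STREAMING mode (from the example above)
            Object[][] streaming = {
                {"+I", "Alice", 12}, {"+I", "Bob", 10}, {"-U", "Alice", 12}, {"+U", "Alice", 112}
            };
            // changelog emitted in BATCH mode: only the final insert-only changes
            Object[][] batch = {
                {"+I", "Bob", 10}, {"+I", "Alice", 112}
            };
            // both print a table containing Bob=10 and Alice=112
            System.out.println(apply(streaming));
            System.out.println(apply(batch));
        }
    }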

    Projects that combine Table API with DataStream API need to add one of the following bridging modules. They include transitive dependencies to flink-table-api-java or flink-table-api-scala and the corresponding language-specific DataStream API module.

    Java

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.12</artifactId>
      <version>1.15.0</version>
      <scope>provided</scope>
    </dependency>

    Scala

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
      <version>1.15.0</version>
      <scope>provided</scope>
    </dependency>

    The following imports are required to declare common pipelines using either the Java or Scala version of both DataStream API and Table API.

    Java

    // imports for Java DataStream API
    import org.apache.flink.streaming.api.*;
    import org.apache.flink.streaming.api.environment.*;

    // imports for Table API with bridging to Java DataStream API
    import org.apache.flink.table.api.*;
    import org.apache.flink.table.api.bridge.java.*;

    Scala

    // imports for Scala DataStream API
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala._

    // imports for Table API with bridging to Scala DataStream API
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._

    Please refer to the project configuration section for more information.

    Configuration

    The TableEnvironment will adopt all configuration options from the passed StreamExecutionEnvironment. However, it cannot be guaranteed that further changes to the configuration of StreamExecutionEnvironment are propagated to the StreamTableEnvironment after its instantiation. The propagation of options from Table API to DataStream API happens during planning.

    We recommend setting all configuration options in DataStream API early before switching to Table API.

    Java

    import java.time.ZoneId;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    // create Java DataStream API
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // set various configuration early
    env.setMaxParallelism(256);

    env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);

    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    // then switch to Java Table API
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // set configuration early
    tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));

    // start defining your pipelines in both APIs...

    Scala

    import java.time.ZoneId
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.table.api.bridge.scala._

    // create Scala DataStream API
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // set various configuration early
    env.setMaxParallelism(256)

    env.getConfig.addDefaultKryoSerializer(classOf[MyCustomType], classOf[CustomKryoSerializer])

    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // then switch to Scala Table API
    val tableEnv = StreamTableEnvironment.create(env)

    // set configuration early
    tableEnv.getConfig.setLocalTimeZone(ZoneId.of("Europe/Berlin"))

    // start defining your pipelines in both APIs...

    Python

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment
    from pyflink.datastream.checkpointing_mode import CheckpointingMode

    # create Python DataStream API
    env = StreamExecutionEnvironment.get_execution_environment()

    # set various configuration early
    env.set_max_parallelism(256)

    env.get_config().add_default_kryo_serializer("type_class_name", "serializer_class_name")

    env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)

    # then switch to Python Table API
    t_env = StreamTableEnvironment.create(env)

    # set configuration early
    t_env.get_config().set_local_timezone("Europe/Berlin")

    # start defining your pipelines in both APIs...

    Execution Behavior

    Both APIs provide methods to execute pipelines. In other words: if requested, they compile a job graph that will be submitted to the cluster and triggered for execution. Results will be streamed to the declared sinks.

    Usually, both APIs mark such behavior with the term execute in method names. However, the execution behavior is slightly different between Table API and DataStream API.

    DataStream API

    The DataStream API’s StreamExecutionEnvironment uses a builder pattern to construct a complex pipeline. The pipeline possibly splits into multiple branches that might or might not end with a sink. The environment buffers all these defined branches until it comes to job submission.

    StreamExecutionEnvironment.execute() submits the entire constructed pipeline and clears the builder afterward. In other words: no sources and sinks are declared anymore, and a new pipeline can be added to the builder. Thus, every DataStream program usually ends with a call to StreamExecutionEnvironment.execute(). Alternatively, DataStream.executeAndCollect() implicitly defines a sink for streaming the results to the local client.
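
    A minimal sketch of this builder behavior (the job names are arbitrary): each execute() call submits only the branches that were defined since the previous submission.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // first pipeline: a source with a printing sink
    env.fromElements(1, 2, 3).print();
    // submits the pipeline above as one job and clears the builder
    env.execute("first job");

    // second pipeline: defined against the now-empty builder
    env.fromElements("a", "b", "c").print();
    // submits only the second pipeline
    env.execute("second job");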

    Table API

    In the Table API, branching pipelines are only supported within a StatementSet where each branch must declare a final sink. Neither TableEnvironment nor StreamTableEnvironment offers a dedicated general execute() method. Instead, they offer methods for submitting a single source-to-sink pipeline or a statement set:

    // execute with explicit sink
    tableEnv.from("InputTable").insertInto("OutputTable").execute();

    tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");

    tableEnv.createStatementSet()
        .add(tableEnv.from("InputTable").insertInto("OutputTable"))
        .add(tableEnv.from("InputTable").insertInto("OutputTable2"))
        .execute();

    tableEnv.createStatementSet()
        .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
        .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
        .execute();

    // execute with implicit local sink
    tableEnv.from("InputTable").execute().print();

    tableEnv.executeSql("SELECT * FROM InputTable").print();

    To combine both execution behaviors, every call to StreamTableEnvironment.toDataStream or StreamTableEnvironment.toChangelogStream will materialize (i.e. compile) the Table API sub-pipeline and insert it into the DataStream API pipeline builder. This means that StreamExecutionEnvironment.execute() or DataStream.executeAndCollect() must be called afterwards. An execution in the Table API will not trigger these “external parts”.

    // (1)
    // adds a branch with a printing sink to the StreamExecutionEnvironment
    tableEnv.toDataStream(table).print();

    // (2)
    // executes a Table API end-to-end pipeline as a Flink job and prints locally,
    // thus (1) has still not been executed
    table.execute().print();

    // executes the DataStream API pipeline with the sink defined in (1) as a
    // Flink job, (2) was already running before
    env.execute();


    Batch Runtime Mode

    The batch runtime mode is a specialized execution mode for bounded Flink programs.

    Generally speaking, boundedness is a property of a data source that tells us whether all the records coming from that source are known before execution or whether new data will show up, potentially indefinitely. A job, in turn, is bounded if all its sources are bounded, and unbounded otherwise.

    Streaming runtime mode, on the other hand, can be used for both bounded and unbounded jobs.

    For more information on the different execution modes, see also the corresponding DataStream API section.

    The Table API & SQL planner provides a set of specialized optimizer rules and runtime operators for either of the two modes.

    Currently, the runtime mode is not derived automatically from sources; thus, it must be set explicitly or will be adopted from the StreamExecutionEnvironment when instantiating a StreamTableEnvironment:

    Java

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;

    // adopt mode from StreamExecutionEnvironment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // or

    // set mode explicitly for StreamTableEnvironment
    // it will be propagated to StreamExecutionEnvironment during planning
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());

    Scala

    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api.EnvironmentSettings

    // adopt mode from StreamExecutionEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    val tableEnv = StreamTableEnvironment.create(env)

    // or

    // set mode explicitly for StreamTableEnvironment
    // it will be propagated to StreamExecutionEnvironment during planning
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode)

    One must meet the following prerequisites before setting the runtime mode to BATCH:

    • All sources must declare themselves as bounded.

    • Currently, table sources must emit insert-only changes.

    • Operators need a sufficient amount of memory for sorting and other intermediate results.

    • All table operations must be available in batch mode. Currently, some of them are only available in streaming mode. Please check the corresponding Table API & SQL pages.

    A batch execution has the following implications (among others):

    • Progressive watermarks are neither generated nor used in operators. However, sources emit a maximum watermark before shutting down.

    • Exchanges between tasks might be blocking depending on the configured execution.batch-shuffle-mode (see the configuration sketch after this list). This also means potentially lower resource requirements compared to executing the same pipeline in streaming mode.

    • Checkpointing is disabled. Artificial state backends are inserted.

    • Table operations don’t produce incremental updates but only a complete final result which is converted into an insert-only changelog stream.
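
    As a rough sketch of how such options can be set early on the DataStream API side (the option values below are assumptions; check the configuration reference of your Flink version for the supported modes):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // assumed option values; see the configuration reference for the supported modes
    Configuration config = new Configuration();
    config.setString("execution.runtime-mode", "BATCH");
    config.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_BLOCKING");

    // the environment picks up the configuration at creation time
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);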

    Since batch processing can be considered as a special case of stream processing, we recommend implementing a streaming pipeline first as it is the most general implementation for both bounded and unbounded data.

    In theory, a streaming pipeline can execute all operators. However, in practice, some operations might not make much sense as they would lead to ever-growing state and are therefore not supported. A global sort would be an example that is only available in batch mode. Simply put: it should be possible to run a working streaming pipeline in batch mode but not necessarily vice versa.

    The following example shows how to play around with batch mode using the DataGen connector. Many sources offer options that implicitly make the connector bounded, for example, by defining a terminating offset or timestamp. In our example, we limit the number of rows with the number-of-rows option.

    Java

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableDescriptor;

    Table table =
        tableEnv.from(
            TableDescriptor.forConnector("datagen")
                .option("number-of-rows", "10") // make the source bounded
                .schema(
                    Schema.newBuilder()
                        .column("uid", DataTypes.TINYINT())
                        .column("payload", DataTypes.STRING())
                        .build())
                .build());

    // convert the Table to a DataStream and further transform the pipeline
    tableEnv.toDataStream(table)
        .keyBy(r -> r.<Byte>getFieldAs("uid"))
        .map(r -> "My custom operator: " + r.<String>getFieldAs("payload"))
        .executeAndCollect()
        .forEachRemaining(System.out::println);

    // prints:
    // My custom operator: 9660912d30a43c7b035e15bd...
    // My custom operator: 29f5f706d2144f4a4f9f52a0...
    // ...


    Changelog Unification

    Time-based operations that rely on event-time and leverage watermarks as a completeness marker are able to produce an insert-only changelog stream that is independent of the runtime mode.

    The following Java example illustrates a Flink program that is not only unified on an API level but also in the resulting changelog stream. The example joins two tables in SQL (UserTable and OrderTable) using an interval join based on the time attributes in both tables (ts). It uses DataStream API to implement a custom operator that deduplicates the user name using a KeyedProcessFunction and value state.

    import java.time.LocalDateTime;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    // setup DataStream API
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // use BATCH or STREAMING mode
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    // setup Table API
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // create a user stream
    DataStream<Row> userStream = env
        .fromElements(
            Row.of(LocalDateTime.parse("2021-08-21T13:00:00"), 1, "Alice"),
            Row.of(LocalDateTime.parse("2021-08-21T13:05:00"), 2, "Bob"),
            Row.of(LocalDateTime.parse("2021-08-21T13:10:00"), 2, "Bob"))
        .returns(
            Types.ROW_NAMED(
                new String[] {"ts", "uid", "name"},
                Types.LOCAL_DATE_TIME, Types.INT, Types.STRING));

    // create an order stream
    DataStream<Row> orderStream = env
        .fromElements(
            Row.of(LocalDateTime.parse("2021-08-21T13:02:00"), 1, 122),
            Row.of(LocalDateTime.parse("2021-08-21T13:07:00"), 2, 239),
            Row.of(LocalDateTime.parse("2021-08-21T13:11:00"), 2, 999))
        .returns(
            Types.ROW_NAMED(
                new String[] {"ts", "uid", "amount"},
                Types.LOCAL_DATE_TIME, Types.INT, Types.INT));

    // create corresponding tables
    tableEnv.createTemporaryView(
        "UserTable",
        userStream,
        Schema.newBuilder()
            .column("ts", DataTypes.TIMESTAMP(3))
            .column("uid", DataTypes.INT())
            .column("name", DataTypes.STRING())
            .watermark("ts", "ts - INTERVAL '1' SECOND")
            .build());

    tableEnv.createTemporaryView(
        "OrderTable",
        orderStream,
        Schema.newBuilder()
            .column("ts", DataTypes.TIMESTAMP(3))
            .column("uid", DataTypes.INT())
            .column("amount", DataTypes.INT())
            .watermark("ts", "ts - INTERVAL '1' SECOND")
            .build());

    // perform interval join
    Table joinedTable =
        tableEnv.sqlQuery(
            "SELECT U.name, O.amount " +
            "FROM UserTable U, OrderTable O " +
            "WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES");

    DataStream<Row> joinedStream = tableEnv.toDataStream(joinedTable);

    joinedStream.print();

    // implement a custom operator using ProcessFunction and value state
    joinedStream
        .keyBy(r -> r.<String>getFieldAs("name"))
        .process(
            new KeyedProcessFunction<String, Row, String>() {

                ValueState<String> seen;

                @Override
                public void open(Configuration parameters) {
                    seen = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("seen", String.class));
                }

                @Override
                public void processElement(Row row, Context ctx, Collector<String> out)
                        throws Exception {
                    String name = row.getFieldAs("name");
                    if (seen.value() == null) {
                        seen.update(name);
                        out.collect(name);
                    }
                }
            })
        .print();

    // execute unified pipeline
    env.execute();

    // prints (in both BATCH and STREAMING mode):
    // +I[Bob, 239]
    // +I[Alice, 122]
    // +I[Bob, 999]
    //
    // Bob
    // Alice

    A StreamTableEnvironment offers the following methods to convert from and to DataStream API:

    • fromDataStream(DataStream): Interprets a stream of insert-only changes and arbitrary type as a table. Event-time and watermarks are not propagated by default.

    • fromDataStream(DataStream, Schema): Interprets a stream of insert-only changes and arbitrary type as a table. The optional schema allows enriching column data types and adding time attributes, watermark strategies, other computed columns, or primary keys.

    • createTemporaryView(String, DataStream): Registers the stream under a name to access it in SQL. It is a shortcut for createTemporaryView(String, fromDataStream(DataStream)).

    • createTemporaryView(String, DataStream, Schema): Registers the stream under a name to access it in SQL. It is a shortcut for createTemporaryView(String, fromDataStream(DataStream, Schema)).

    • toDataStream(Table): Converts a table into a stream of insert-only changes. The default stream record type is org.apache.flink.types.Row. A single rowtime attribute column is written back into the DataStream API’s record. Watermarks are propagated as well.

    • toDataStream(Table, AbstractDataType): Converts a table into a stream of insert-only changes. This method accepts a data type to express the desired stream record type. The planner might insert implicit casts and reorder columns to map them to the fields of the (possibly nested) data type.

    • toDataStream(Table, Class): A shortcut for toDataStream(Table, DataTypes.of(Class)) to quickly create the desired data type reflectively.

    From a Table API’s perspective, converting from and to DataStream API is similar to reading from or writing to a virtual table connector that has been defined using a CREATE TABLE DDL in SQL.

    The schema part in the virtual CREATE TABLE name (schema) WITH (options) statement can be automatically derived from the DataStream’s type information, enriched, or entirely defined manually using org.apache.flink.table.api.Schema.

    The virtual DataStream table connector exposes a rowtime metadata column for every row, which gives access to the stream record's timestamp.

    The virtual DataStream table source implements SupportsSourceWatermark and thus allows calling the SOURCE_WATERMARK() built-in function as a watermark strategy to adopt watermarks from the DataStream API.

    The following code shows how to use fromDataStream for different scenarios.

    Java

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import java.time.Instant;

    // some example POJO
    public static class User {

        public String name;

        public Integer score;

        public Instant event_time;

        // default constructor for DataStream API
        public User() {}

        // fully assigning constructor for Table API
        public User(String name, Integer score, Instant event_time) {
            this.name = name;
            this.score = score;
            this.event_time = event_time;
        }
    }

    // create a DataStream
    DataStream<User> dataStream =
        env.fromElements(
            new User("Alice", 4, Instant.ofEpochMilli(1000)),
            new User("Bob", 6, Instant.ofEpochMilli(1001)),
            new User("Alice", 10, Instant.ofEpochMilli(1002)));

    // === EXAMPLE 1 ===

    // derive all physical columns automatically

    Table table = tableEnv.fromDataStream(dataStream);
    table.printSchema();
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT,
    //  `event_time` TIMESTAMP_LTZ(9)
    // )

    // === EXAMPLE 2 ===

    // derive all physical columns automatically
    // but add computed columns (in this case for creating a proctime attribute column)

    Table table = tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByExpression("proc_time", "PROCTIME()")
            .build());
    table.printSchema();
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT NOT NULL,
    //  `event_time` TIMESTAMP_LTZ(9),
    //  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
    // )

    // === EXAMPLE 3 ===

    // derive all physical columns automatically
    // but add computed columns (in this case for creating a rowtime attribute column)
    // and a custom watermark strategy

    Table table =
        tableEnv.fromDataStream(
            dataStream,
            Schema.newBuilder()
                .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
                .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
                .build());
    table.printSchema();
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT,
    //  `event_time` TIMESTAMP_LTZ(9),
    //  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
    //  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
    // )

    // === EXAMPLE 4 ===

    // derive all physical columns automatically
    // but access the stream record's timestamp for creating a rowtime attribute column
    // also rely on the watermarks generated in the DataStream API

    // we assume that a watermark strategy has been defined for `dataStream` before
    // (not part of this example)
    Table table =
        tableEnv.fromDataStream(
            dataStream,
            Schema.newBuilder()
                .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                .watermark("rowtime", "SOURCE_WATERMARK()")
                .build());
    table.printSchema();
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT,
    //  `event_time` TIMESTAMP_LTZ(9),
    //  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
    //  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
    // )

    // === EXAMPLE 5 ===

    // define physical columns manually
    // in this example,
    //   - we can reduce the default precision of timestamps from 9 to 3
    //   - we also project the columns and put `event_time` to the beginning

    Table table =
        tableEnv.fromDataStream(
            dataStream,
            Schema.newBuilder()
                .column("event_time", "TIMESTAMP_LTZ(3)")
                .column("name", "STRING")
                .column("score", "INT")
                .watermark("event_time", "SOURCE_WATERMARK()")
                .build());
    table.printSchema();
    // prints:
    // (
    //  `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
    //  `name` VARCHAR(200),
    //  `score` INT
    // )
    // note: the watermark strategy is not shown due to the inserted column reordering projection

    Scala

    import org.apache.flink.api.scala._
    import java.time.Instant

    // some example case class
    case class User(name: String, score: java.lang.Integer, event_time: java.time.Instant)

    // create a DataStream
    val dataStream = env.fromElements(
      User("Alice", 4, Instant.ofEpochMilli(1000)),
      User("Bob", 6, Instant.ofEpochMilli(1001)),
      User("Alice", 10, Instant.ofEpochMilli(1002)))

    // === EXAMPLE 1 ===

    // derive all physical columns automatically

    val table = tableEnv.fromDataStream(dataStream)
    table.printSchema()
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT,
    //  `event_time` TIMESTAMP_LTZ(9)
    // )

    // === EXAMPLE 2 ===

    // derive all physical columns automatically
    // but add computed columns (in this case for creating a proctime attribute column)

    val table = tableEnv.fromDataStream(
      dataStream,
      Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        .build())
    table.printSchema()
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT NOT NULL,
    //  `event_time` TIMESTAMP_LTZ(9),
    //  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
    // )

    // === EXAMPLE 3 ===

    // derive all physical columns automatically
    // but add computed columns (in this case for creating a rowtime attribute column)
    // and a custom watermark strategy

    val table =
      tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
          .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
          .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
          .build())
    table.printSchema()
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT,
    //  `event_time` TIMESTAMP_LTZ(9),
    //  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
    //  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
    // )

    // === EXAMPLE 4 ===

    // derive all physical columns automatically
    // but access the stream record's timestamp for creating a rowtime attribute column
    // also rely on the watermarks generated in the DataStream API

    // we assume that a watermark strategy has been defined for `dataStream` before
    // (not part of this example)
    val table =
      tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
          .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
          .watermark("rowtime", "SOURCE_WATERMARK()")
          .build())
    table.printSchema()
    // prints:
    // (
    //  `name` STRING,
    //  `score` INT,
    //  `event_time` TIMESTAMP_LTZ(9),
    //  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
    //  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
    // )

    // === EXAMPLE 5 ===

    // define physical columns manually
    // in this example,
    //   - we can reduce the default precision of timestamps from 9 to 3
    //   - we also project the columns and put `event_time` to the beginning

    val table =
      tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
          .column("event_time", "TIMESTAMP_LTZ(3)")
          .column("name", "STRING")
          .column("score", "INT")
          .watermark("event_time", "SOURCE_WATERMARK()")
          .build())
    table.printSchema()
    // prints:
    // (
    //  `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
    //  `name` VARCHAR(200),
    //  `score` INT
    // )
    // note: the watermark strategy is not shown due to the inserted column reordering projection

    Python

    from pyflink.common.time import Instant
    from pyflink.common.types import Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, Schema

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    ds = env.from_collection([
            Row("Alice", 12, Instant.of_epoch_milli(1000)),
            Row("Bob", 5, Instant.of_epoch_milli(1001)),
            Row("Alice", 10, Instant.of_epoch_milli(1002))],
        type_info=Types.ROW_NAMED(
            ['name', 'score', 'event_time'],
            [Types.STRING(), Types.INT(), Types.INSTANT()]))

    # === EXAMPLE 1 ===

    # derive all physical columns automatically

    table = t_env.from_data_stream(ds)
    table.print_schema()
    # prints:
    # (
    #  `name` STRING,
    #  `score` INT,
    #  `event_time` TIMESTAMP_LTZ(9)
    # )

    # === EXAMPLE 2 ===

    # derive all physical columns automatically
    # but add computed columns (in this case for creating a proctime attribute column)

    table = t_env.from_data_stream(
        ds,
        Schema.new_builder()
            .column_by_expression("proc_time", "PROCTIME()")
            .build())
    table.print_schema()
    # prints:
    # (
    #  `name` STRING,
    #  `score` INT,
    #  `event_time` TIMESTAMP_LTZ(9),
    #  `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
    # )

    # === EXAMPLE 3 ===

    # derive all physical columns automatically
    # but add computed columns (in this case for creating a rowtime attribute column)
    # and a custom watermark strategy

    table = t_env.from_data_stream(
        ds,
        Schema.new_builder()
            .column_by_expression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
            .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
            .build())
    table.print_schema()
    # prints:
    # (
    #  `name` STRING,
    #  `score` INT,
    #  `event_time` TIMESTAMP_LTZ(9),
    #  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
    #  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
    # )

    # === EXAMPLE 4 ===

    # derive all physical columns automatically
    # but access the stream record's timestamp for creating a rowtime attribute column
    # also rely on the watermarks generated in the DataStream API

    # we assume that a watermark strategy has been defined for `ds` before
    # (not part of this example)
    table = t_env.from_data_stream(
        ds,
        Schema.new_builder()
            .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)")
            .watermark("rowtime", "SOURCE_WATERMARK()")
            .build())
    table.print_schema()
    # prints:
    # (
    #  `name` STRING,
    #  `score` INT,
    #  `event_time` TIMESTAMP_LTZ(9),
    #  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
    #  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
    # )

    # === EXAMPLE 5 ===

    # define physical columns manually
    # in this example,
    #   - we can reduce the default precision of timestamps from 9 to 3
    #   - we also project the columns and put `event_time` to the beginning

    table = t_env.from_data_stream(
        ds,
        Schema.new_builder()
            .column("event_time", "TIMESTAMP_LTZ(3)")
            .column("name", "STRING")
            .column("score", "INT")
            .watermark("event_time", "SOURCE_WATERMARK()")
            .build())
    table.print_schema()
    # prints:
    # (
    #  `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
    #  `name` STRING,
    #  `score` INT
    # )
    # note: the watermark strategy is not shown due to the inserted column reordering projection

    Example 1 illustrates a simple use case when no time-based operations are needed.

    Example 4 is the most common use case when time-based operations such as windows or interval joins should be part of the pipeline. Example 2 is the most common use case when these time-based operations should work in processing time.

    Example 5 relies entirely on the user's manual declaration. This can be useful to replace generic types from the DataStream API (which would be RAW in the Table API) with proper data types.

    Since DataType is richer than TypeInformation, we can easily enable immutable POJOs and other complex data structures. The following example in Java shows what is possible. Check also the Data Types & Serialization page of the DataStream API for more information about the supported types there.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;

    // the DataStream API does not support immutable POJOs yet,
    // the class will result in a generic type that is a RAW type in Table API by default
    public static class User {

        public final String name;

        public final Integer score;

        public User(String name, Integer score) {
            this.name = name;
            this.score = score;
        }
    }

    // create a DataStream
    DataStream<User> dataStream = env.fromElements(
        new User("Alice", 4),
        new User("Bob", 6),
        new User("Alice", 10));

    // since fields of a RAW type cannot be accessed, every stream record is treated as an atomic type
    // leading to a table with a single column `f0`
    Table table = tableEnv.fromDataStream(dataStream);
    table.printSchema();
    // prints:
    // (
    //  `f0` RAW('User', '...')
    // )

    // instead, declare a more useful data type for columns using the Table API's type system
    // in a custom schema and rename the columns in a following `as` projection
    Table table = tableEnv
        .fromDataStream(
            dataStream,
            Schema.newBuilder()
                .column("f0", DataTypes.of(User.class))
                .build())
        .as("user");
    table.printSchema();
    // prints:
    // (
    //  `user` *User<`name` STRING,`score` INT>*
    // )

    // data types can be extracted reflectively as above or explicitly defined
    Table table3 = tableEnv
        .fromDataStream(
            dataStream,
            Schema.newBuilder()
                .column(
                    "f0",
                    DataTypes.STRUCTURED(
                        User.class,
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.INT())))
                .build())
        .as("user");
    table3.printSchema();
    // prints:
    // (
    //  `user` *User<`name` STRING,`score` INT>*
    // )

    Examples for createTemporaryView

    A DataStream can be registered directly as a view (possibly enriched with a schema).

    The following code shows how to use createTemporaryView for different scenarios.

    Java

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Schema;

    // create some DataStream
    DataStream<Tuple2<Long, String>> dataStream = env.fromElements(
        Tuple2.of(12L, "Alice"),
        Tuple2.of(0L, "Bob"));

    // === EXAMPLE 1 ===

    // register the DataStream as view "MyView" in the current session
    // all columns are derived automatically

    tableEnv.createTemporaryView("MyView", dataStream);

    tableEnv.from("MyView").printSchema();
    // prints:
    // (
    //  `f0` BIGINT NOT NULL,
    //  `f1` STRING
    // )

    // === EXAMPLE 2 ===

    // register the DataStream as view "MyView" in the current session,
    // provide a schema to adjust the columns similar to `fromDataStream`
    // in this example, the derived NOT NULL information has been removed

    tableEnv.createTemporaryView(
        "MyView",
        dataStream,
        Schema.newBuilder()
            .column("f0", "BIGINT")
            .column("f1", "STRING")
            .build());

    tableEnv.from("MyView").printSchema();
    // prints:
    // (
    //  `f0` BIGINT,
    //  `f1` STRING
    // )

    // === EXAMPLE 3 ===

    // use the Table API before creating the view if it is only about renaming columns

    tableEnv.createTemporaryView(
        "MyView",
        tableEnv.fromDataStream(dataStream).as("id", "name"));

    tableEnv.from("MyView").printSchema();
    // prints:
    // (
    //  `id` BIGINT NOT NULL,
    //  `name` STRING
    // )

    Scala

    // create some DataStream
    val dataStream: DataStream[(Long, String)] = env.fromElements(
      (12L, "Alice"),
      (0L, "Bob"))

    // === EXAMPLE 1 ===

    // register the DataStream as view "MyView" in the current session
    // all columns are derived automatically

    tableEnv.createTemporaryView("MyView", dataStream)

    tableEnv.from("MyView").printSchema()
    // prints:
    // (
    //  `_1` BIGINT NOT NULL,
    //  `_2` STRING
    // )

    // === EXAMPLE 2 ===

    // register the DataStream as view "MyView" in the current session,
    // provide a schema to adjust the columns similar to `fromDataStream`
    // in this example, the derived NOT NULL information has been removed

    tableEnv.createTemporaryView(
      "MyView",
      dataStream,
      Schema.newBuilder()
        .column("_1", "BIGINT")
        .column("_2", "STRING")
        .build())

    tableEnv.from("MyView").printSchema()
    // prints:
    // (
    //  `_1` BIGINT,
    //  `_2` STRING
    // )

    // === EXAMPLE 3 ===

    // use the Table API before creating the view if it is only about renaming columns

    tableEnv.createTemporaryView(
      "MyView",
      tableEnv.fromDataStream(dataStream).as("id", "name"))

    tableEnv.from("MyView").printSchema()
    // prints:
    // (
    //  `id` BIGINT NOT NULL,
    //  `name` STRING
    // )

    Python

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import DataTypes, StreamTableEnvironment, Schema

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    ds = env.from_collection([(12, "Alice"), (0, "Bob")],
                             type_info=Types.TUPLE([Types.LONG(), Types.STRING()]))

    # === EXAMPLE 1 ===

    # register the DataStream as view "MyView" in the current session
    # all columns are derived automatically

    t_env.create_temporary_view("MyView", ds)

    t_env.from_path("MyView").print_schema()
    # prints:
    # (
    #  `f0` BIGINT NOT NULL,
    #  `f1` STRING
    # )

    # === EXAMPLE 2 ===

    # register the DataStream as view "MyView" in the current session,
    # provide a schema to adjust the columns similar to `fromDataStream`
    # in this example, the derived NOT NULL information has been removed

    t_env.create_temporary_view(
        "MyView",
        ds,
        Schema.new_builder()
            .column("f0", "BIGINT")
            .column("f1", "STRING")
            .build())

    t_env.from_path("MyView").print_schema()
    # prints:
    # (
    #  `f0` BIGINT,
    #  `f1` STRING
    # )

    # === EXAMPLE 3 ===

    # use the Table API before creating the view if it is only about renaming columns

    t_env.create_temporary_view(
        "MyView",
        t_env.from_data_stream(ds).alias("id", "name"))

    t_env.from_path("MyView").print_schema()
    # prints:
    # (
    #  `id` BIGINT NOT NULL,
    #  `name` STRING
    # )


    Examples for toDataStream

    The following code shows how to use toDataStream for different scenarios.

    Java

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;
    import java.time.Instant;

    // POJO with mutable fields
    // since no fully assigning constructor is defined, the field order
    // is alphabetical [event_time, name, score]
    public static class User {

        public String name;

        public Integer score;

        public Instant event_time;
    }

    tableEnv.executeSql(
        "CREATE TABLE GeneratedTable "
            + "("
            + " name STRING,"
            + " score INT,"
            + " event_time TIMESTAMP_LTZ(3),"
            + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
            + ")"
            + "WITH ('connector'='datagen')");

    Table table = tableEnv.from("GeneratedTable");

    // === EXAMPLE 1 ===

    // use the default conversion to instances of Row

    // since `event_time` is a single rowtime attribute, it is inserted into the DataStream
    // metadata and watermarks are propagated
    DataStream<Row> dataStream = tableEnv.toDataStream(table);

    // === EXAMPLE 2 ===

    // a data type is extracted from class `User`,
    // the planner reorders fields and inserts implicit casts where possible to convert internal
    // data structures to the desired structured type

    // since `event_time` is a single rowtime attribute, it is inserted into the DataStream
    // metadata and watermarks are propagated
    DataStream<User> dataStream = tableEnv.toDataStream(table, User.class);

    // data types can be extracted reflectively as above or explicitly defined
    DataStream<User> dataStream =
        tableEnv.toDataStream(
            table,
            DataTypes.STRUCTURED(
                User.class,
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("score", DataTypes.INT()),
                DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));

    Scala

    import org.apache.flink.streaming.api.scala.DataStream
    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.types.Row

    case class User(name: String, score: java.lang.Integer, event_time: java.time.Instant)

    tableEnv.executeSql(
      """
      CREATE TABLE GeneratedTable (
        name STRING,
        score INT,
        event_time TIMESTAMP_LTZ(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
      )
      WITH ('connector'='datagen')
      """
    )

    val table = tableEnv.from("GeneratedTable")

    // === EXAMPLE 1 ===

    // use the default conversion to instances of Row

    // since `event_time` is a single rowtime attribute, it is inserted into the DataStream
    // metadata and watermarks are propagated
    val dataStream: DataStream[Row] = tableEnv.toDataStream(table)

    // === EXAMPLE 2 ===

    // a data type is extracted from class `User`,
    // the planner reorders fields and inserts implicit casts where possible to convert internal
    // data structures to the desired structured type

    // since `event_time` is a single rowtime attribute, it is inserted into the DataStream
    // metadata and watermarks are propagated
    val dataStream: DataStream[User] = tableEnv.toDataStream(table, classOf[User])

    // data types can be extracted reflectively as above or explicitly defined
    val dataStream: DataStream[User] =
      tableEnv.toDataStream(
        table,
        DataTypes.STRUCTURED(
          classOf[User],
          DataTypes.FIELD("name", DataTypes.STRING()),
          DataTypes.FIELD("score", DataTypes.INT()),
          DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))))

    Python

    1. t_env.execute_sql(
    2. "CREATE TABLE GeneratedTable "
    3. + "("
    4. + " name STRING,"
    5. + " score INT,"
    6. + " event_time TIMESTAMP_LTZ(3),"
    7. + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
    8. + ")"
    9. + "WITH ('connector'='datagen')");
    10. table = t_env.from_path("GeneratedTable");
    11. # === EXAMPLE 1 ===
    12. # use the default conversion to instances of Row
    13. # since `event_time` is a single rowtime attribute, it is inserted into the DataStream
    14. # metadata and watermarks are propagated
    15. ds = t_env.to_data_stream(table)

    Note that only non-updating tables are supported by toDataStream. Usually, time-based operations such as windows, interval joins, or the MATCH_RECOGNIZE clause are a good fit for insert-only pipelines next to simple operations like projections and filters.

    Pipelines with operations that produce updates can use toChangelogStream.
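    For illustration, the following is a minimal, hedged sketch (Java, reusing tableEnv and the GeneratedTable from the examples above) that contrasts the two cases: a windowed aggregation stays insert-only and can go through toDataStream, whereas a continuous GROUP BY produces updates and therefore has to go through toChangelogStream.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    // windowed aggregation: each window emits a final result, i.e. the table is insert-only
    Table windowed = tableEnv.sqlQuery(
        "SELECT name, SUM(score) AS total "
            + "FROM TABLE(TUMBLE(TABLE GeneratedTable, DESCRIPTOR(event_time), INTERVAL '5' SECOND)) "
            + "GROUP BY name, window_start, window_end");
    DataStream<Row> appendOnlyStream = tableEnv.toDataStream(windowed);

    // continuous aggregation: results are updated as new rows arrive,
    // so toDataStream cannot be used and toChangelogStream is required
    Table updating = tableEnv.sqlQuery(
        "SELECT name, SUM(score) AS total FROM GeneratedTable GROUP BY name");
    DataStream<Row> changelogStream = tableEnv.toChangelogStream(updating);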

    Back to top

    Handling of Changelog Streams

    Internally, Flink’s table runtime is a changelog processor. The concepts page describes how dynamic tables and streams relate to each other.

    A StreamTableEnvironment offers the following methods to expose these change data capture (CDC) functionalities:

    • fromChangelogStream(DataStream): Interprets a stream of changelog entries as a table. The stream record type must be org.apache.flink.types.Row since its RowKind flag is evaluated during runtime. Event-time and watermarks are not propagated by default. This method expects a changelog containing all kinds of changes (enumerated in org.apache.flink.types.RowKind) as the default ChangelogMode.

    • fromChangelogStream(DataStream, Schema): Allows defining a schema for the DataStream, similar to fromDataStream(DataStream, Schema). Otherwise, the semantics are equal to fromChangelogStream(DataStream).

    • fromChangelogStream(DataStream, Schema, ChangelogMode): Gives full control over how to interpret a stream as a changelog. The passed ChangelogMode helps the planner to distinguish between insert-only, upsert, or retract behavior.

    • toChangelogStream(Table): Reverse operation of fromChangelogStream(DataStream). It produces a stream with instances of org.apache.flink.types.Row and sets the RowKind flag for every record at runtime. All kinds of updating tables are supported by this method. If the input table contains a single rowtime column, it will be propagated into a stream record’s timestamp. Watermarks will be propagated as well.

    • toChangelogStream(Table, Schema): Reverse operation of fromChangelogStream(DataStream, Schema). The method can enrich the produced column data types. The planner might insert implicit casts if necessary. It is possible to write out the rowtime as a metadata column.

    • toChangelogStream(Table, Schema, ChangelogMode): Gives full control over how to convert a table to a changelog stream. The passed ChangelogMode helps the planner to distinguish between insert-only, upsert, or retract behavior.

    From a Table API’s perspective, converting from and to DataStream API is similar to reading from or writing to a virtual table connector that has been defined using a CREATE TABLE DDL in SQL.

    Because fromChangelogStream behaves similarly to fromDataStream, we recommend reading the previous section before continuing here.

    This virtual connector also supports reading and writing the rowtime metadata of the stream record.

    The virtual table source implements SupportsSourceWatermark.

    Examples for fromChangelogStream

    The following code shows how to use fromChangelogStream for different scenarios.

    Java

    1. import org.apache.flink.streaming.api.datastream.DataStream;
    2. import org.apache.flink.table.api.Schema;
    3. import org.apache.flink.table.api.Table;
    4. import org.apache.flink.table.connector.ChangelogMode;
    5. import org.apache.flink.types.Row;
    6. import org.apache.flink.types.RowKind;
    7. // === EXAMPLE 1 ===
    8. // interpret the stream as a retract stream
    9. // create a changelog DataStream
    10. DataStream<Row> dataStream =
    11. env.fromElements(
    12. Row.ofKind(RowKind.INSERT, "Alice", 12),
    13. Row.ofKind(RowKind.INSERT, "Bob", 5),
    14. Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
    15. Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
    16. // interpret the DataStream as a Table
    17. Table table = tableEnv.fromChangelogStream(dataStream);
    18. // register the table under a name and perform an aggregation
    19. tableEnv.createTemporaryView("InputTable", table);
    20. tableEnv
    21. .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    22. .print();
    23. // prints:
    24. // +----+--------------------------------+-------------+
    25. // | op | name | score |
    26. // +----+--------------------------------+-------------+
    27. // | +I | Bob | 5 |
    28. // | +I | Alice | 12 |
    29. // | -D | Alice | 12 |
    30. // | +I | Alice | 100 |
    31. // +----+--------------------------------+-------------+
    32. // === EXAMPLE 2 ===
    33. // interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)
    34. // create a changelog DataStream
    35. DataStream<Row> dataStream =
    36. env.fromElements(
    37. Row.ofKind(RowKind.INSERT, "Alice", 12),
    38. Row.ofKind(RowKind.INSERT, "Bob", 5),
    39. Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
    40. // interpret the DataStream as a Table
    41. Table table =
    42. tableEnv.fromChangelogStream(
    43. dataStream,
    44. Schema.newBuilder().primaryKey("f0").build(),
    45. ChangelogMode.upsert());
    46. // register the table under a name and perform an aggregation
    47. tableEnv.createTemporaryView("InputTable", table);
    48. tableEnv
    49. .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    50. .print();
    51. // prints:
    52. // +----+--------------------------------+-------------+
    53. // | op | name | score |
    54. // +----+--------------------------------+-------------+
    55. // | +I | Bob | 5 |
    56. // | +I | Alice | 12 |
    57. // | -U | Alice | 12 |
    58. // | +U | Alice | 100 |
    59. // +----+--------------------------------+-------------+

    Scala

    1. import org.apache.flink.api.scala.typeutils.Types
    2. import org.apache.flink.table.api.Schema
    3. import org.apache.flink.table.connector.ChangelogMode
    4. import org.apache.flink.types.{Row, RowKind}
    5. // === EXAMPLE 1 ===
    6. // interpret the stream as a retract stream
    7. // create a changelog DataStream
    8. val dataStream = env.fromElements(
    9. Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
    10. Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
    11. Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Int.box(12)),
    12. Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
    13. )(Types.ROW(Types.STRING, Types.INT))
    14. // interpret the DataStream as a Table
    15. val table = tableEnv.fromChangelogStream(dataStream)
    16. // register the table under a name and perform an aggregation
    17. tableEnv.createTemporaryView("InputTable", table)
    18. tableEnv
    19. .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    20. .print()
    21. // prints:
    22. // +----+--------------------------------+-------------+
    23. // | op | name | score |
    24. // +----+--------------------------------+-------------+
    25. // | +I | Bob | 5 |
    26. // | +I | Alice | 12 |
    27. // | -D | Alice | 12 |
    28. // | +I | Alice | 100 |
    29. // +----+--------------------------------+-------------+
    30. // === EXAMPLE 2 ===
    31. // interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)
    32. // create a changelog DataStream
    33. val dataStream = env.fromElements(
    34. Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
    35. Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
    36. Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
    37. )(Types.ROW(Types.STRING, Types.INT))
    38. // interpret the DataStream as a Table
    39. val table =
    40. tableEnv.fromChangelogStream(
    41. dataStream,
    42. Schema.newBuilder().primaryKey("f0").build(),
    43. ChangelogMode.upsert())
    44. // register the table under a name and perform an aggregation
    45. tableEnv.createTemporaryView("InputTable", table)
    46. tableEnv
    47. .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    48. .print()
    49. // prints:
    50. // +----+--------------------------------+-------------+
    51. // | op | name | score |
    52. // +----+--------------------------------+-------------+
    53. // | +I | Bob | 5 |
    54. // | +I | Alice | 12 |
    55. // | -U | Alice | 12 |
    56. // | +U | Alice | 100 |
    57. // +----+--------------------------------+-------------+

    Python

    1. from pyflink.common import Row, RowKind
    2. from pyflink.common.typeinfo import Types
    3. from pyflink.datastream import StreamExecutionEnvironment
    4. from pyflink.table import DataTypes, StreamTableEnvironment, Schema, ChangelogMode
    5. env = StreamExecutionEnvironment.get_execution_environment()
    6. t_env = StreamTableEnvironment.create(env)
    7. # === EXAMPLE 1 ===
    8. # create a changelog DataStream
    9. ds = env.from_collection([
    10. Row.of_kind(RowKind.INSERT, "Alice", 12),
    11. Row.of_kind(RowKind.INSERT, "Bob", 5),
    12. Row.of_kind(RowKind.UPDATE_BEFORE, "Alice", 12),
    13. Row.of_kind(RowKind.UPDATE_AFTER, "Alice", 100)],
    14. type_info=Types.ROW([Types.STRING(),Types.INT()]))
    15. # interpret the DataStream as a Table
    16. table = t_env.from_changelog_stream(ds)
    17. # register the table under a name and perform an aggregation
    18. t_env.create_temporary_view("InputTable", table)
    19. t_env.execute_sql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0").print()
    20. # prints:
    21. # +----+--------------------------------+-------------+
    22. # | op | name | score |
    23. # +----+--------------------------------+-------------+
    24. # | +I | Bob | 5 |
    25. # | +I | Alice | 12 |
    26. # | -D | Alice | 12 |
    27. # | +I | Alice | 100 |
    28. # +----+--------------------------------+-------------+
    29. # === EXAMPLE 2 ===
    30. # interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)
    31. # create a changelog DataStream
    32. ds = env.from_collection([Row.of_kind(RowKind.INSERT, "Alice", 12),
    33. Row.of_kind(RowKind.INSERT, "Bob", 5),
    34. Row.of_kind(RowKind.UPDATE_AFTER, "Alice", 100)],
    35. type_info=Types.ROW([Types.STRING(),Types.INT()]))
    36. # interpret the DataStream as a Table
    37. table = t_env.from_changelog_stream(
    38. ds,
    39. Schema.new_builder().primary_key("f0").build(),
    40. ChangelogMode.upsert())
    41. # register the table under a name and perform an aggregation
    42. t_env.create_temporary_view("InputTable", table)
    43. t_env.execute_sql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0").print()
    44. # prints:
    45. # +----+--------------------------------+-------------+
    46. # | op | name | score |
    47. # +----+--------------------------------+-------------+
    48. # | +I | Bob | 5 |
    49. # | +I | Alice | 12 |
    50. # | -U | Alice | 12 |
    51. # | +U | Alice | 100 |
    52. # +----+--------------------------------+-------------+

    The default ChangelogMode shown in example 1 should be sufficient for most use cases as it accepts all kinds of changes.

    However, example 2 shows how to limit the kinds of incoming changes for efficiency: interpreting the stream in upsert mode reduces the number of update messages by 50%, since no UPDATE_BEFORE messages are required. In the output direction, the number of result messages emitted by toChangelogStream can likewise be reduced by defining a primary key and an upsert changelog mode.
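    As a rough, unverified sketch (Java, reusing tableEnv and the InputTable view registered in the examples above) of how such an upsert conversion on the output side might look, before the full toChangelogStream examples below: the declared primary key tells the planner that UPDATE_BEFORE messages can be omitted.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.Row;

    Table aggregated = tableEnv.sqlQuery(
        "SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0");

    // declare `name` as the primary key and request upsert mode so that
    // only +I/+U/-D messages (and no -U messages) are emitted downstream
    DataStream<Row> upsertStream =
        tableEnv.toChangelogStream(
            aggregated,
            Schema.newBuilder().primaryKey("name").build(),
            ChangelogMode.upsert());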

    Examples for toChangelogStream

    The following code shows how to use toChangelogStream for different scenarios.

    Java

    1. import org.apache.flink.streaming.api.datastream.DataStream;
    2. import org.apache.flink.streaming.api.functions.ProcessFunction;
    3. import org.apache.flink.table.api.DataTypes;
    4. import org.apache.flink.table.api.Schema;
    5. import org.apache.flink.table.api.Table;
    6. import org.apache.flink.table.data.StringData;
    7. import org.apache.flink.types.Row;
    8. import org.apache.flink.util.Collector;
    9. import static org.apache.flink.table.api.Expressions.*;
    10. // create Table with event-time
    11. tableEnv.executeSql(
    12. "CREATE TABLE GeneratedTable "
    13. + "("
    14. + " name STRING,"
    15. + " score INT,"
    16. + " event_time TIMESTAMP_LTZ(3),"
    17. + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
    18. + ")"
    19. Table table = tableEnv.from("GeneratedTable");
    20. // === EXAMPLE 1 ===
    21. // convert to DataStream in the simplest and most general way possible (no event-time)
    22. Table simpleTable = tableEnv
    23. .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
    24. .as("name", "score")
    25. .groupBy($("name"))
    26. .select($("name"), $("score").sum());
    27. tableEnv
    28. .toChangelogStream(simpleTable)
    29. .executeAndCollect()
    30. .forEachRemaining(System.out::println);
    31. // prints:
    32. // +I[Bob, 12]
    33. // +I[Alice, 12]
    34. // -U[Alice, 12]
    35. // +U[Alice, 14]
    36. // === EXAMPLE 2 ===
    37. // convert to DataStream in the simplest and most general way possible (with event-time)
    38. DataStream<Row> dataStream = tableEnv.toChangelogStream(table);
    39. // since `event_time` is a single time attribute in the schema, it is set as the
    40. // stream record's timestamp by default; however, at the same time, it remains part of the Row
    41. dataStream.process(
    42. new ProcessFunction<Row, Void>() {
    43. @Override
    44. public void processElement(Row row, Context ctx, Collector<Void> out) {
    45. // prints: [name, score, event_time]
    46. System.out.println(row.getFieldNames(true));
    47. // timestamp exists twice
    48. assert ctx.timestamp() == row.<Instant>getFieldAs("event_time").toEpochMilli();
    49. }
    50. });
    51. env.execute();
    52. // === EXAMPLE 3 ===
    53. // convert to DataStream but write out the time attribute as a metadata column which means
    54. // it is not part of the physical schema anymore
    55. DataStream<Row> dataStream = tableEnv.toChangelogStream(
    56. table,
    57. Schema.newBuilder()
    58. .column("name", "STRING")
    59. .column("score", "INT")
    60. .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
    61. .build());
    62. // the stream record's timestamp is defined by the metadata; it is not part of the Row
    63. dataStream.process(
    64. new ProcessFunction<Row, Void>() {
    65. @Override
    66. public void processElement(Row row, Context ctx, Collector<Void> out) {
    67. // prints: [name, score]
    68. System.out.println(row.getFieldNames(true));
    69. // timestamp exists once
    70. System.out.println(ctx.timestamp());
    71. }
    72. });
    73. env.execute();
    74. // === EXAMPLE 4 ===
    75. // for advanced users, it is also possible to use more internal data structures for efficiency
    76. // note that this is only mentioned here for completeness because using internal data structures
    77. // adds complexity and additional type handling
    78. // however, converting a TIMESTAMP_LTZ column to `Long` or STRING to `byte[]` might be convenient,
    79. // also structured types can be represented as `Row` if needed
    80. DataStream<Row> dataStream = tableEnv.toChangelogStream(
    81. table,
    82. Schema.newBuilder()
    83. .column(
    84. "name",
    85. DataTypes.STRING().bridgedTo(StringData.class))
    86. .column(
    87. "score",
    88. DataTypes.INT())
    89. .column(
    90. "event_time",
    91. DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
    92. .build());
    93. // leads to a stream of Row(name: StringData, score: Integer, event_time: Long)

    Scala

    1. import org.apache.flink.api.scala._
    2. import org.apache.flink.streaming.api.functions.ProcessFunction
    3. import org.apache.flink.streaming.api.scala.DataStream
    4. import org.apache.flink.table.api._
    5. import org.apache.flink.types.Row
    6. import org.apache.flink.util.Collector
    7. import java.time.Instant
    8. // create Table with event-time
    9. tableEnv.executeSql(
    10. """
    11. CREATE TABLE GeneratedTable (
    12. name STRING,
    13. score INT,
    14. event_time TIMESTAMP_LTZ(3),
    15. WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
    16. )
    17. WITH ('connector'='datagen')
    18. """
    19. )
    20. val table = tableEnv.from("GeneratedTable")
    21. // === EXAMPLE 1 ===
    22. // convert to DataStream in the simplest and most general way possible (no event-time)
    23. val simpleTable = tableEnv
    24. .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
    25. .as("name", "score")
    26. .groupBy($"name")
    27. .select($"name", $"score".sum())
    28. tableEnv
    29. .toChangelogStream(simpleTable)
    30. .executeAndCollect()
    31. .foreach(println)
    32. // prints:
    33. // +I[Bob, 12]
    34. // +I[Alice, 12]
    35. // -U[Alice, 12]
    36. // +U[Alice, 14]
    37. // === EXAMPLE 2 ===
    38. // convert to DataStream in the simplest and most general way possible (with event-time)
    39. val dataStream: DataStream[Row] = tableEnv.toChangelogStream(table)
    40. // since `event_time` is a single time attribute in the schema, it is set as the
    41. // stream record's timestamp by default; however, at the same time, it remains part of the Row
    42. dataStream.process(new ProcessFunction[Row, Unit] {
    43. override def processElement(
    44. row: Row,
    45. ctx: ProcessFunction[Row, Unit]#Context,
    46. out: Collector[Unit]): Unit = {
    47. // prints: [name, score, event_time]
    48. println(row.getFieldNames(true))
    49. // timestamp exists twice
    50. assert(ctx.timestamp() == row.getFieldAs[Instant]("event_time").toEpochMilli)
    51. }
    52. })
    53. env.execute()
    54. // === EXAMPLE 3 ===
    55. // convert to DataStream but write out the time attribute as a metadata column which means
    56. // it is not part of the physical schema anymore
    57. val dataStream: DataStream[Row] = tableEnv.toChangelogStream(
    58. table,
    59. Schema.newBuilder()
    60. .column("name", "STRING")
    61. .column("score", "INT")
    62. .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
    63. .build())
    64. // the stream record's timestamp is defined by the metadata; it is not part of the Row
    65. dataStream.process(new ProcessFunction[Row, Unit] {
    66. override def processElement(
    67. row: Row,
    68. ctx: ProcessFunction[Row, Unit]#Context,
    69. out: Collector[Unit]): Unit = {
    70. // prints: [name, score]
    71. println(row.getFieldNames(true))
    72. // timestamp exists once
    73. println(ctx.timestamp())
    74. }
    75. })
    76. env.execute()
    77. // === EXAMPLE 4 ===
    78. // for advanced users, it is also possible to use more internal data structures for better
    79. // efficiency
    80. // note that this is only mentioned here for completeness because using internal data structures
    81. // adds complexity and additional type handling
    82. // however, converting a TIMESTAMP_LTZ column to `Long` or STRING to `byte[]` might be convenient,
    83. // also structured types can be represented as `Row` if needed
    84. val dataStream: DataStream[Row] = tableEnv.toChangelogStream(
    85. table,
    86. Schema.newBuilder()
    87. .column(
    88. "name",
    89. DataTypes.STRING().bridgedTo(classOf[StringData]))
    90. .column(
    91. "score",
    92. DataTypes.INT())
    93. .column(
    94. "event_time",
    95. DataTypes.TIMESTAMP_LTZ(3).bridgedTo(classOf[Long]))
    96. .build())
    97. // leads to a stream of Row(name: StringData, score: Integer, event_time: Long)

    Python

    1. from pyflink.common import Row
    2. from pyflink.datastream import StreamExecutionEnvironment
    3. from pyflink.datastream.functions import ProcessFunction
    4. from pyflink.table import DataTypes, StreamTableEnvironment, Schema
    5. from pyflink.table.expressions import col
    6. env = StreamExecutionEnvironment.get_execution_environment()
    7. t_env = StreamTableEnvironment.create(env)
    8. # create Table with event-time
    9. t_env.execute_sql(
    10. "CREATE TABLE GeneratedTable "
    11. + "("
    12. + " name STRING,"
    13. + " score INT,"
    14. + " event_time TIMESTAMP_LTZ(3),"
    15. + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
    16. + ")"
    17. + "WITH ('connector'='datagen')")
    18. table = t_env.from_path("GeneratedTable")
    19. # === EXAMPLE 1 ===
    20. # convert to DataStream in the simplest and most general way possible (no event-time)
    21. simple_table = t_env.from_elements([Row("Alice", 12), Row("Alice", 2), Row("Bob", 12)],
    22. DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),
    23. DataTypes.FIELD("score", DataTypes.INT())]))
    24. simple_table = simple_table.group_by(col('name')).select(col('name'), col('score').sum)
    25. t_env.to_changelog_stream(simple_table).print()
    26. env.execute()
    27. # prints:
    28. # +I[Bob, 12]
    29. # +I[Alice, 12]
    30. # -U[Alice, 12]
    31. # +U[Alice, 14]
    32. # === EXAMPLE 2 ===
    33. # convert to DataStream in the simplest and most general way possible (with event-time)
    34. ds = t_env.to_changelog_stream(table)
    35. # since `event_time` is a single time attribute in the schema, it is set as the
    36. # stream record's timestamp by default; however, at the same time, it remains part of the Row
    37. class MyProcessFunction(ProcessFunction):
    38. def process_element(self, row, ctx):
    39. print(row)
    40. assert ctx.timestamp() == row.event_time.to_epoch_milli()
    41. ds.process(MyProcessFunction())
    42. env.execute()
    43. # === EXAMPLE 3 ===
    44. # convert to DataStream but write out the time attribute as a metadata column which means
    45. # it is not part of the physical schema anymore
    46. ds = t_env.to_changelog_stream(
    47. table,
    48. Schema.new_builder()
    49. .column("name", "STRING")
    50. .column("score", "INT")
    51. .column_by_metadata("rowtime", "TIMESTAMP_LTZ(3)")
    52. .build())
    53. class MyProcessFunction(ProcessFunction):
    54. def process_element(self, row, ctx):
    55. print(row)
    56. print(ctx.timestamp())
    57. ds.process(MyProcessFunction())
    58. env.execute()

    For more information about which conversions are supported for data types in Example 4, see the Data Types documentation.

    The behavior of toChangelogStream(Table).executeAndCollect() is equal to calling Table.execute().collect(). However, toChangelogStream(Table) might be more useful for tests because it allows access to the produced watermarks in a subsequent ProcessFunction in the DataStream API.
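    A small sketch of this testing pattern (assuming the table and env variables from the examples above): the watermark emitted by the table pipeline becomes observable through the timer service of a downstream ProcessFunction.

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    tableEnv
        .toChangelogStream(table)
        .process(
            new ProcessFunction<Row, Void>() {
                @Override
                public void processElement(Row row, Context ctx, Collector<Void> out) {
                    // the watermark propagated from the table pipeline is visible here
                    System.out.println("current watermark: " + ctx.timerService().currentWatermark());
                }
            });

    env.execute();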

    Back to top

    Adding Table API Pipelines to DataStream API

    A single Flink job can consist of multiple disconnected pipelines that run next to each other.

    Source-to-sink pipelines defined in Table API can be attached as a whole to the StreamExecutionEnvironment and will be submitted when calling one of the execute methods in the DataStream API.

    However, a source does not necessarily have to be a table source but can also be another DataStream pipeline that was converted to Table API before. Thus, it is possible to use table sinks for DataStream API programs.

    The functionality is available through a specialized StreamStatementSet instance created with StreamTableEnvironment.createStatementSet(). By using a statement set, the planner can optimize all added statements together and come up with one or more end-to-end pipelines that are added to the StreamExecutionEnvironment when calling StreamStatementSet.attachAsDataStream().

    The following example shows how to add table programs to a DataStream API program within one job.

    Java

    1. import org.apache.flink.streaming.api.datastream.DataStream;
    2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    3. import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
    4. import org.apache.flink.table.api.*;
    5. import org.apache.flink.table.api.bridge.java.*;
    6. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    7. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    8. StreamStatementSet statementSet = tableEnv.createStatementSet();
    9. // create some source
    10. TableDescriptor sourceDescriptor =
    11. TableDescriptor.forConnector("datagen")
    12. .option("number-of-rows", "3")
    13. .schema(
    14. Schema.newBuilder()
    15. .column("myCol", DataTypes.INT())
    16. .column("myOtherCol", DataTypes.BOOLEAN())
    17. .build())
    18. .build();
    19. // create some sink
    20. TableDescriptor sinkDescriptor = TableDescriptor.forConnector("print").build();
    21. // add a pure Table API pipeline
    22. Table tableFromSource = tableEnv.from(sourceDescriptor);
    23. statementSet.add(tableFromSource.insertInto(sinkDescriptor));
    24. // use table sinks for the DataStream API pipeline
    25. DataStream<Integer> dataStream = env.fromElements(1, 2, 3);
    26. Table tableFromStream = tableEnv.fromDataStream(dataStream);
    27. statementSet.add(tableFromStream.insertInto(sinkDescriptor));
    28. // attach both pipelines to StreamExecutionEnvironment
    29. // (the statement set will be cleared after calling this method)
    30. statementSet.attachAsDataStream();
    31. // define other DataStream API parts
    32. env.fromElements(4, 5, 6).addSink(new DiscardingSink<>());
    33. // use DataStream API to submit the pipelines
    34. env.execute();
    35. // prints similar to:
    36. // +I[1618440447, false]
    37. // +I[1259693645, true]
    38. // +I[158588930, false]
    39. // +I[1]
    40. // +I[2]
    41. // +I[3]

    Scala

    1. import org.apache.flink.streaming.api.functions.sink.DiscardingSink
    2. import org.apache.flink.streaming.api.scala._
    3. import org.apache.flink.table.api._
    4. import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    5. val env = StreamExecutionEnvironment.getExecutionEnvironment
    6. val tableEnv = StreamTableEnvironment.create(env)
    7. val statementSet = tableEnv.createStatementSet()
    8. // create some source
    9. val sourceDescriptor = TableDescriptor.forConnector("datagen")
    10. .option("number-of-rows", "3")
    11. .schema(Schema.newBuilder
    12. .column("myCol", DataTypes.INT)
    13. .column("myOtherCol", DataTypes.BOOLEAN).build)
    14. .build
    15. // create some sink
    16. val sinkDescriptor = TableDescriptor.forConnector("print").build
    17. // add a pure Table API pipeline
    18. val tableFromSource = tableEnv.from(sourceDescriptor)
    19. statementSet.add(tableFromSource.insertInto(sinkDescriptor))
    20. // use table sinks for the DataStream API pipeline
    21. val dataStream = env.fromElements(1, 2, 3)
    22. val tableFromStream = tableEnv.fromDataStream(dataStream)
    23. statementSet.add(tableFromStream.insertInto(sinkDescriptor))
    24. // attach both pipelines to StreamExecutionEnvironment
    25. // (the statement set will be cleared after calling this method)
    26. statementSet.attachAsDataStream()
    27. // define other DataStream API parts
    28. env.fromElements(4, 5, 6).addSink(new DiscardingSink[Int]())
    29. // now use DataStream API to submit the pipelines
    30. env.execute()
    31. // prints similar to:
    32. // +I[1618440447, false]
    33. // +I[1259693645, true]
    34. // +I[158588930, false]
    35. // +I[1]
    36. // +I[2]
    37. // +I[3]

    Implicit Conversions in Scala

    Users of the Scala API can use all the conversion methods above in a more fluent way by leveraging Scala’s implicit feature.

    Those implicits are available in the API when importing the package object via org.apache.flink.table.api.bridge.scala._.

    The use of an implicit conversion should always be a conscious decision. One should pay attention to whether the IDE proposes an actual Table API method or a DataStream API method via implicits.

    For example, table.execute().collect() stays in the Table API, whereas table.executeAndCollect() implicitly uses the DataStream API’s executeAndCollect() method and therefore forces an API conversion.

    1. import org.apache.flink.streaming.api.scala._
    2. import org.apache.flink.table.api.bridge.scala._
    3. import org.apache.flink.types.Row
    4. val env = StreamExecutionEnvironment.getExecutionEnvironment
    5. val tableEnv = StreamTableEnvironment.create(env)
    6. val dataStream: DataStream[(Int, String)] = env.fromElements((42, "hello"))
    7. // call toChangelogTable() implicitly on the DataStream object
    8. val table: Table = dataStream.toChangelogTable(tableEnv)
    9. // force implicit conversion
    10. val dataStreamAgain1: DataStream[Row] = table
    11. // call toChangelogStream() implicitly on the Table object
    12. val dataStreamAgain2: DataStream[Row] = table.toChangelogStream

    Mapping between TypeInformation and DataType

    The DataStream API uses instances of org.apache.flink.api.common.typeinfo.TypeInformation to describe the record type that travels in the stream. In particular, it defines how to serialize and deserialize records from one DataStream operator to the other. It also helps in serializing state into savepoints and checkpoints.

    The Table API uses custom data structures to represent records internally and exposes org.apache.flink.table.types.DataType to users for declaring the external format into which the data structures are converted for easier usage in sources, sinks, UDFs, or DataStream API.

    DataType is richer than TypeInformation as it also includes details about the logical SQL type. Therefore, some details will be added implicitly during the conversion.

    Column names and types of a Table are automatically derived from the TypeInformation of the DataStream. Use DataStream.getType() to check whether the type information has been detected correctly via the DataStream API’s reflective type extraction facilities. If the outermost record’s TypeInformation is a CompositeType, it will be flattened in the first level when deriving a table’s schema.
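    As a quick, hedged illustration (reusing env and tableEnv from the examples above; the Tuple2 element type is only an assumption for the sketch), both the detected type and the derived flattened schema can be inspected:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;

    DataStream<Tuple2<Long, String>> dataStream =
        env.fromElements(Tuple2.of(12L, "Alice"), Tuple2.of(0L, "Bob"));

    // check what the reflective type extraction detected: a composite tuple type with fields f0, f1
    System.out.println(dataStream.getType());

    // the composite type is flattened in the first level into the columns `f0` and `f1`
    tableEnv.fromDataStream(dataStream).printSchema();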

    TypeInformation to DataType

    The following rules apply when converting TypeInformation to a DataType:

    • All subclasses of TypeInformation are mapped to logical types, including nullability that is aligned with Flink’s built-in serializers.

    • Subclasses of TupleTypeInfoBase are translated into a row (for Row) or structured type (for tuples, POJOs, and case classes).

    • BigDecimal is converted to DECIMAL(38, 18) by default.

    • The order of PojoTypeInfo fields is determined by a constructor with all fields as its parameters. If that is not found during the conversion, the field order will be alphabetical.

    • GenericTypeInfo and other TypeInformation that cannot be represented as one of the listed org.apache.flink.table.api.DataTypes will be treated as a black-box RAW type. The current session configuration is used to materialize the serializer of the raw type. Composite nested fields will not be accessible then.

    • See TypeInfoDataTypeConverter for the full translation logic.

    Use DataTypes.of(TypeInformation) to call the above logic in custom schema declaration or in UDFs.
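    For example (a sketch; the column name and the tuple type are just placeholders), the translated type can be plugged into a custom schema declaration:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;

    // translate a TypeInformation explicitly and use the result as a column type
    Schema schema =
        Schema.newBuilder()
            .column("payload", DataTypes.of(Types.TUPLE(Types.STRING, Types.INT)))
            .build();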

    DataType to TypeInformation

    The table runtime will make sure to properly serialize the output records to the first operator of the DataStream API.

    Afterward, the type information semantics of the DataStream API need to be considered.

    Legacy Conversion

    Convert a DataStream into a Table

    A DataStream can be directly converted to a Table in a StreamTableEnvironment. The schema of the resulting view depends on the data type of the registered collection.

    Java
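    A minimal Java equivalent of the Scala example below (a sketch using the legacy fromDataStream with expressions) would presumably look like this:

    StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
    DataStream<Tuple2<Long, String>> stream = ...;
    Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));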

    Scala

    1. val tableEnv: StreamTableEnvironment = ???
    2. val stream: DataStream[(Long, String)] = ???
    3. val table2: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")

    Python

    1. t_env = ... # type: StreamTableEnvironment
    2. stream = ... # type: DataStream of Types.TUPLE([Types.LONG(), Types.STRING()])
    3. table2 = t_env.from_data_stream(stream, col('my_long'), col('my_string'))

    Back to top

    The results of a Table can be converted into a DataStream. In this way, custom DataStream programs can be run on the result of a Table API or SQL query.

    When converting a Table into a DataStream you need to specify the data type of the resulting records, i.e., the data type into which the rows of the Table are to be converted. Often the most convenient conversion type is Row. The following list gives an overview of the features of the different options:

    • Row: fields are mapped by position, arbitrary number of fields, support for null values, no type-safe access.
    • POJO: fields are mapped by name (POJO fields must be named as Table fields), arbitrary number of fields, support for null values, type-safe access.
    • Case Class: fields are mapped by position, no support for null values, type-safe access.
    • Tuple: fields are mapped by position, limitation to 22 (Scala) or 25 (Java) fields, no support for null values, type-safe access.
    • Atomic Type: Table must have a single field, no support for null values, type-safe access.

    Convert a Table into a DataStream

    A Table that is the result of a streaming query will be updated dynamically, i.e., it is changing as new records arrive on the query’s input streams. Hence, the DataStream into which such a dynamic query is converted needs to encode the updates of the table.

    There are two modes to convert a Table into a DataStream:

    1. Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.
    2. Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.

    Java

    1. StreamTableEnvironment tableEnv = ...;
    2. Table table = tableEnv.fromValues(
    3. DataTypes.ROW(
    4. DataTypes.FIELD("name", DataTypes.STRING()),
    5. DataTypes.FIELD("age", DataTypes.INT())),
    6. row("john", 35),
    7. row("sarah", 32));
    8. // Convert the Table into an append DataStream of Row by specifying the class
    9. DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
    10. // Convert the Table into an append DataStream of Tuple2<String, Integer> with TypeInformation
    11. TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.INT());
    12. DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType);
    13. // Convert the Table into a retract DataStream of Row.
    14. // A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
    15. // The boolean field indicates the type of the change.
    16. // True is INSERT, false is DELETE.
    17. DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);

    Scala

    1. val tableEnv: StreamTableEnvironment = ???
    2. // Table with two fields (String name, Integer age)
    3. val table: Table = tableEnv.fromValues(
    4. DataTypes.ROW(
    5. DataTypes.FIELD("name", DataTypes.STRING()),
    6. DataTypes.FIELD("age", DataTypes.INT())),
    7. row("john", 35),
    8. row("sarah", 32))
    9. // Convert the Table into an append DataStream of Row by specifying the class
    10. val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
    11. // Convert the Table into an append DataStream of (String, Integer) with TypeInformation
    12. val dsTuple: DataStream[(String, Int)] =
    13. tableEnv.toAppendStream[(String, Int)](table)
    14. // Convert the Table into a retract DataStream of Row.
    15. // A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
    16. // The boolean field indicates the type of the change.
    17. // True is INSERT, false is DELETE.
    18. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

    Python

    1. from pyflink.table import DataTypes
    2. from pyflink.common.typeinfo import Types
    3. t_env = ...
    4. table = t_env.from_elements([("john", 35), ("sarah", 32)],
    5. DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),
    6. DataTypes.FIELD("age", DataTypes.INT())]))
    7. # Convert the Table into an append DataStream of Row by specifying the type information
    8. ds_row = t_env.to_append_stream(table, Types.ROW([Types.STRING(), Types.INT()]))
    9. # Convert the Table into an append DataStream of Tuple[str, int] with TypeInformation
    10. ds_tuple = t_env.to_append_stream(table, Types.TUPLE([Types.STRING(), Types.INT()]))
    11. # Convert the Table into a retract DataStream of Row by specifying the type information
    12. # A retract stream of type X is a DataStream of Tuple[bool, X].
    13. # The boolean field indicates the type of the change.
    14. # True is INSERT, false is DELETE.
    15. retract_stream = t_env.to_retract_stream(table, Types.ROW([Types.STRING(), Types.INT()]))

    Note: A detailed discussion about dynamic tables and their properties is given in the Dynamic Tables document.

    Once the Table is converted to a DataStream, please use the StreamExecutionEnvironment.execute() method to execute the DataStream program.

    Mapping of Data Types to Table Schema

    Flink’s DataStream API supports many diverse types. Composite types such as Tuples (built-in Scala tuples, Flink Java tuples, and Python tuples), POJOs, Scala case classes, and Flink’s Row type allow for nested data structures with multiple fields that can be accessed in table expressions. Other types are treated as atomic types. In the following, we describe how the Table API converts these types into an internal row representation and show examples of converting a DataStream into a Table.

    The mapping of a data type to a table schema can happen in two ways: based on the field positions or based on the field names.

    Position-based Mapping

    Position-based mapping can be used to give fields a more meaningful name while keeping the field order. This mapping is available for composite data types with a defined field order and for atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, fields of a POJO must be mapped based on the field names (see next section). Fields can be projected out, but they cannot be renamed using an alias, i.e. as (Java and Scala) or alias (Python).

    When defining a position-based mapping, the specified names must not exist in the input data type, otherwise, the API will assume that the mapping should happen based on the field names. If no field names are specified, the default field names and field order of the composite type are used or f0 for atomic types.

    Java

    1. StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section;
    2. DataStream<Tuple2<Long, Integer>> stream = ...;
    3. // convert DataStream into Table with field "myLong" only
    4. Table table = tableEnv.fromDataStream(stream, $("myLong"));
    5. // convert DataStream into Table with field names "myLong" and "myInt"
    6. Table table = tableEnv.fromDataStream(stream, $("myLong"), $("myInt"));

    Scala

    1. // get a TableEnvironment
    2. val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
    3. val stream: DataStream[(Long, Int)] = ...
    4. // convert DataStream into Table with field "myLong" only
    5. val table: Table = tableEnv.fromDataStream(stream, $"myLong")
    6. // convert DataStream into Table with field names "myLong" and "myInt"
    7. val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myInt")

    Python

    1. from pyflink.table.expressions import col
    2. # get a TableEnvironment
    3. t_env = ... # see "Create a TableEnvironment" section
    4. stream = ... # type: DataStream of Types.Tuple([Types.LONG(), Types.INT()])
    5. # convert DataStream into Table with field "my_long" only
    6. table = t_env.from_data_stream(stream, col('my_long'))
    7. # convert DataStream into Table with field names "my_long" and "my_int"
    8. table = t_env.from_data_stream(stream, col('my_long'), col('my_int'))

    Name-based Mapping

    Name-based mapping can be used for any data type, including POJOs. It is the most flexible way of defining a table schema mapping. All fields in the mapping are referenced by name and can optionally be renamed using an alias (as). Fields can be reordered and projected out.

    If no field names are specified, the default field names and field order of the composite type are used or f0 for atomic types.

    Java

    1. StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
    2. DataStream<Tuple2<Long, Integer>> stream = ...;
    3. // convert DataStream into Table with field "f1" only
    4. Table table = tableEnv.fromDataStream(stream, $("f1"));
    5. // convert DataStream into Table with swapped fields
    6. Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
    7. // convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
    8. Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));

    Scala

    1. // get a TableEnvironment
    2. val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
    3. val stream: DataStream[(Long, Int)] = ...
    4. // convert DataStream into Table with field "_2" only
    5. val table: Table = tableEnv.fromDataStream(stream, $"_2")
    6. // convert DataStream into Table with swapped fields
    7. val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")
    8. // convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
    9. val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myInt", $"_1" as "myLong")

    Python

    1. from pyflink.table.expressions import col
    2. # get a TableEnvironment
    3. t_env = ... # see "Create a TableEnvironment" section
    4. stream = ... # type: DataStream of Types.Tuple([Types.LONG(), Types.INT()])
    5. # convert DataStream into Table with field "f1" only
    6. table = t_env.from_data_stream(stream, col('f1'))
    7. # convert DataStream into Table with swapped fields
    8. table = t_env.from_data_stream(stream, col('f1'), col('f0'))
    9. # convert DataStream into Table with swapped fields and field names "my_int" and "my_long"
    10. table = t_env.from_data_stream(stream, col('f1').alias('my_int'), col('f0').alias('my_long'))

    Atomic Types

    Flink treats primitives (Integer, Double, String) or generic types (types that cannot be analyzed and decomposed) as atomic types. A DataStream of an atomic type is converted into a Table with a single column. The type of the column is inferred from the atomic type. The name of the column can be specified.

    Java

    1. StreamTableEnvironment tableEnv = ...;
    2. DataStream<Long> stream = ...;
    3. // Convert DataStream into Table with field name "myLong"
    4. Table table = tableEnv.fromDataStream(stream, $("myLong"));

    Scala

    1. val tableEnv: StreamTableEnvironment = ???
    2. val stream: DataStream[Long] = ...
    3. // Convert DataStream into Table with default field name "f0"
    4. val table: Table = tableEnv.fromDataStream(stream)
    5. // Convert DataStream into Table with field name "myLong"
    6. val table: Table = tableEnv.fromDataStream(stream, $"myLong")

    Python

    1. from pyflink.table.expressions import col
    2. t_env = ...
    3. stream = ... # type: DataStream of Types.LONG()
    4. # Convert DataStream into Table with default field name "f0"
    5. table = t_env.from_data_stream(stream)
    6. # Convert DataStream into Table with field name "my_long"
    7. table = t_env.from_data_stream(stream, col('my_long'))

    Tuples (Scala, Java, Python) and Case Classes (Scala only)

    Java

    Flink provides its own tuple classes for Java. DataStreams of the Java tuple classes can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used. If the original field names (f0, f1, … for Flink Tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows for reordering fields and projection with alias (as).

    1. StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
    2. DataStream<Tuple2<Long, String>> stream = ...;
    3. // convert DataStream into Table with renamed field names "myLong", "myString" (position-based)
    4. Table table = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
    5. // convert DataStream into Table with reordered fields "f1", "f0" (name-based)
    6. Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
    7. // convert DataStream into Table with projected field "f1" (name-based)
    8. Table table = tableEnv.fromDataStream(stream, $("f1"));
    9. // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
    10. Table table = tableEnv.fromDataStream(stream, $("f1").as("myString"), $("f0").as("myLong"));

    Scala

    Flink supports Scala’s built-in tuples. DataStreams of Scala’s built-in tuples can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used. If the original field names (_1, _2, … for Scala Tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows for reordering fields and projection with alias (as).

    1. // get a TableEnvironment
    2. val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
    3. val stream: DataStream[(Long, String)] = ...
    4. // convert DataStream into Table with field names "myLong", "myString" (position-based)
    5. val table: Table = tableEnv.fromDataStream(stream, $"myLong", $"myString")
    6. // convert DataStream into Table with reordered fields "_2", "_1" (name-based)
    7. val table: Table = tableEnv.fromDataStream(stream, $"_2", $"_1")
    8. // convert DataStream into Table with projected field "_2" (name-based)
    9. val table: Table = tableEnv.fromDataStream(stream, $"_2")
    10. // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
    11. val table: Table = tableEnv.fromDataStream(stream, $"_2" as "myString", $"_1" as "myLong")
    12. // define case class
    13. case class Person(name: String, age: Int)
    14. val streamCC: DataStream[Person] = ...
    15. // convert DataStream into Table with field names 'myName, 'myAge (position-based)
    16. val table = tableEnv.fromDataStream(streamCC, $"myName", $"myAge")
    17. // convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
    18. val table: Table = tableEnv.fromDataStream(streamCC, $"age" as "myAge", $"name" as "myName")

    Python

    Flink supports Python’s built-in Tuples. DataStreams of tuples can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used. If the original field names (f0, f1, … ) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows for reordering fields and projection with alias (alias).

    1. from pyflink.table.expressions import col
    2. stream = ... # type: DataStream of Types.TUPLE([Types.LONG(), Types.STRING()])
    3. # convert DataStream into Table with renamed field names "my_long", "my_string" (position-based)
    4. table = t_env.from_data_stream(stream, col('my_long'), col('my_string'))
    5. # convert DataStream into Table with reordered fields "f1", "f0" (name-based)
    6. table = t_env.from_data_stream(stream, col('f1'), col('f0'))
    7. # convert DataStream into Table with projected field "f1" (name-based)
    8. table = t_env.from_data_stream(stream, col('f1'))
    9. # convert DataStream into Table with reordered and aliased fields "my_string", "my_long" (name-based)
    10. table = t_env.from_data_stream(stream, col('f1').alias('my_string'), col('f0').alias('my_long'))

    POJO (Java and Scala)

    Flink supports POJOs as composite types. The rules for what determines a POJO are documented here.

    When converting a POJO DataStream into a Table without specifying field names, the names of the original POJO fields are used. The name mapping requires the original names and cannot be done by positions. Fields can be renamed using an alias (with the as keyword), reordered, and projected.

    Java

    1. StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
    2. // Person is a POJO with fields "name" and "age"
    3. DataStream<Person> stream = ...;
    4. // convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
    5. Table table = tableEnv.fromDataStream(stream, $("age").as("myAge"), $("name").as("myName"));
    6. // convert DataStream into Table with projected field "name" (name-based)
    7. Table table = tableEnv.fromDataStream(stream, $("name"));
    8. // convert DataStream into Table with projected and renamed field "myName" (name-based)
    9. Table table = tableEnv.fromDataStream(stream, $("name").as("myName"));

    Scala

    1. // get a TableEnvironment
    2. val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
    3. // Person is a POJO with field names "name" and "age"
    4. val stream: DataStream[Person] = ...
    5. // convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
    6. val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as "myName")
    7. // convert DataStream into Table with projected field "name" (name-based)
    8. val table: Table = tableEnv.fromDataStream(stream, $"name")
    9. // convert DataStream into Table with projected and renamed field "myName" (name-based)
    10. val table: Table = tableEnv.fromDataStream(stream, $"name" as "myName")

    Python

    Custom POJO classes are currently not supported in PyFlink.

    Row

    The Row data type supports an arbitrary number of fields and fields with null values. Field names can be specified via a RowTypeInfo or when converting a Row DataStream into a Table. The row type supports mapping of fields by position and by name. Fields can be renamed by providing names for all fields (mapping based on position) or selected individually for projection/ordering/renaming (mapping based on name).

    Java

    1. StreamTableEnvironment tableEnv = ...;
    2. // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
    3. DataStream<Row> stream = ...;
    4. // Convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
    5. Table table = tableEnv.fromDataStream(stream, $("myName"), $("myAge"));
    6. // Convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
    7. Table table = tableEnv.fromDataStream(stream, $("name").as("myName"), $("age").as("myAge"));
    8. // Convert DataStream into Table with projected field "name" (name-based)
    9. Table table = tableEnv.fromDataStream(stream, $("name"));
    10. // Convert DataStream into Table with projected and renamed field "myName" (name-based)
    11. Table table = tableEnv.fromDataStream(stream, $("name").as("myName"));

    Python

    1. from pyflink.table.expressions import col
    2. t_env = ...;
    3. # DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
    4. stream = ...
    5. # Convert DataStream into Table with renamed field names "my_name", "my_age" (position-based)
    6. table = t_env.from_data_stream(stream, col('my_name'), col('my_age'))
    7. # Convert DataStream into Table with renamed fields "my_name", "my_age" (name-based)
    8. table = t_env.from_data_stream(stream, col('name').alias('my_name'), col('age').alias('my_age'))
    9. # Convert DataStream into Table with projected field "name" (name-based)
    10. table = t_env.from_data_stream(stream, col('name'))
    11. # Convert DataStream into Table with projected and renamed field "my_name" (name-based)
    12. table = t_env.from_data_stream(stream, col('name').alias('my_name'))