Connectors

    Note: For general connector information and common configuration, please refer to the corresponding Java/Scala documentation.

    Since Flink is a Java/Scala-based project, implementations of both connectors and formats are available as jars that need to be specified as job dependencies.
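
    For example, in recent PyFlink versions the dependency jars can be passed through the pipeline.jars configuration option. The sketch below assumes this option; the jar path is a placeholder that must be adapted to your setup:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # Register the connector/format jars with the job: a semicolon-separated
    # list of file URLs (placeholder path, adapt to your environment).
    t_env.get_config().set(
        "pipeline.jars",
        "file:///path/to/flink-sql-connector-kafka.jar")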

    Below is a complete example of how to use a Kafka source/sink and the JSON format in PyFlink.

    source_ddl = """
        CREATE TABLE source_table(
            a VARCHAR,
            b INT
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'source_topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'test_3',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
        """

    sink_ddl = """
        CREATE TABLE sink_table(
            a VARCHAR
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'sink_topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'format' = 'json'
        )
        """

    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)

    t_env.sql_query("SELECT a FROM source_table") \
        .execute_insert("sink_table").wait()


    Some data sources and sinks are built into Flink and are available out of the box. These predefined data sources include reading from Pandas DataFrames and ingesting data from collections; the predefined data sinks support writing to Pandas DataFrames.

    import numpy as np
    import pandas as pd
    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col

    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # Create a PyFlink Table from a Pandas DataFrame
    pdf = pd.DataFrame(np.random.rand(1000, 2))
    table = t_env.from_pandas(pdf, ["a", "b"]).filter(col("a") > 0.5)
    # Convert the PyFlink Table back to a Pandas DataFrame
    pdf = table.to_pandas()

    from_elements()

    from_elements() is used to create a table from a collection of elements. The element types must be acceptable atomic types or acceptable composite types.
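
    For instance, a minimal, illustrative call might look like the following; the single element and the column names are assumptions chosen here to match the result shown below:

    # Build a one-row table with columns a and b
    # from an in-memory collection of tuples
    table = t_env.from_elements([(1, 'Hi')], ['a', 'b'])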

    The above call returns a Table like:

    +----+-------+
    | a  | b     |
    +====+=======+
    | 1  | Hi    |
    +----+-------+