Connectors
Note: For general connector information and common configuration, please refer to the corresponding Java/Scala documentation.
Since Flink is a Java/Scala-based project, implementations of both connectors and formats are available as jars that need to be specified as job dependencies.
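One way to specify these dependencies in PyFlink is through the pipeline.jars configuration option. A minimal sketch, assuming the Kafka connector jar has already been downloaded to a local path (the jar path below is a placeholder):
from pyflink.table import EnvironmentSettings, TableEnvironment
# Create a TableEnvironment in streaming mode
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Register the connector jar as a job dependency;
# multiple jars are separated by semicolons
t_env.get_config().get_configuration().set_string(
    "pipeline.jars", "file:///path/to/flink-sql-connector-kafka.jar")
Below is a complete example of how to use a Kafka source/sink and the JSON format in PyFlink.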
source_ddl = """
CREATE TABLE source_table(
a VARCHAR,
b INT
) WITH (
'connector' = 'kafka',
'topic' = 'source_topic',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'test_3',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
sink_ddl = """
CREATE TABLE sink_table(
a VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'sink_topic',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
)
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.sql_query("SELECT a FROM source_table") \
.execute_insert("sink_table").wait()
Predefined Sources and Sinks
Some data sources and sinks are built into Flink and are available out-of-the-box. The predefined data sources include reading from a Pandas DataFrame and ingesting data from collections. The predefined data sinks support writing to a Pandas DataFrame.
import pandas as pd
import numpy as np
from pyflink.table.expressions import col
# Create a Pandas DataFrame with random data
pdf = pd.DataFrame(np.random.rand(1000, 2))
# Create a PyFlink Table from the Pandas DataFrame and filter it
table = t_env.from_pandas(pdf, ["a", "b"]).filter(col("a") > 0.5)
# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()
from_elements()
from_elements() is used to create a table from a collection of elements. The element types must be acceptable atomic types or acceptable composite types.
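For example, the following is a minimal sketch that creates a two-column Table from a single element (the values are illustrative):
# Create a Table from a collection of tuples with column names 'a' and 'b'
table = t_env.from_elements([(1, 'Hi')], ['a', 'b'])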
The above example returns a Table like:
+----+-------+
| a  | b     |
+====+=======+
| 1  | Hi    |
+----+-------+