Row-based Operations
Performs a map operation with a Python general scalar function or vectorized scalar function. The output is flattened if the output type is a composite type.
The function can also take a Row object (containing all the columns of the input table) as input.
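from pyflink.common import Row
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                               DataTypes.FIELD("data", DataTypes.STRING())]))
def func2(data: Row) -> Row:
    return Row(data.id, data.data * 2)
# specify the function without the input columns
table.map(func2).alias('id', 'data').to_pandas()
# result is
#    id        data
# 0   1        HiHi
# 1   2  HelloHello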
The map operation also supports vectorized scalar functions. In this case, the input and output types of the function are pandas.DataFrame instead of Row.
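A minimal sketch of the vectorized variant, assuming a table with id and data columns as in the func2 example. The helper names `double_data` and `build_pandas_map_udf` are hypothetical. The per-batch logic is kept in a plain pandas function so it can be checked without a Flink runtime, and PyFlink is imported only when the UDF is built.

```python
import pandas as pd


def double_data(batch: pd.DataFrame) -> pd.DataFrame:
    """Per-batch logic of a vectorized map function (hypothetical helper).

    `batch` holds a chunk of input rows, one pandas.Series per column.
    The returned DataFrame has one row per input row.
    """
    return pd.concat([batch["id"], batch["data"] * 2], axis=1)


def build_pandas_map_udf():
    """Wrap double_data as a pandas (vectorized) scalar UDF.

    PyFlink is imported lazily so double_data can be tested on its own.
    """
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    return udf(double_data,
               result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                          DataTypes.FIELD("data", DataTypes.STRING())]),
               func_type="pandas")


if __name__ == "__main__":
    # Check the batch logic locally on a small DataFrame.
    sample = pd.DataFrame({"id": [1, 2],
                           "data": pd.Series(["Hi", "Hello"], dtype=object)})
    print(double_data(sample))
```

With PyFlink available, `table.map(build_pandas_map_udf()).alias('id', 'data').to_pandas()` is expected to return the same id/data rows as the Row-based func2; the difference is that the function is called once per batch of rows rather than once per row.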
Performs a flat_map operation with a Python table function.
from pyflink.common import Row
from pyflink.table.udf import udtf
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 'data'])
@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x: Row) -> Row:
    for s in x.data.split(","):
        yield x.id, s
# use split in `flat_map`
table.flat_map(split).to_pandas()
# result is
#    f0     f1
# 0   1     Hi
# 1   1  Flink
# 2   2  Hello
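A table function may yield zero, one, or several rows for each input row. The sketch below, with hypothetical names `split_non_empty`, `simulate_flat_map`, and `build_split_udtf`, skips empty tokens so that an input row with empty data produces no output at all. The generator is plain Python and can be exercised without Flink; `simulate_flat_map` only imitates flat_map by chaining the generator over a list of rows, and PyFlink is imported only when the table function is built.

```python
from typing import Iterable, Iterator, List, Tuple


def split_non_empty(id_: int, data: str) -> Iterator[Tuple[int, str]]:
    """Yield (id, token) for every non-empty comma-separated token."""
    for token in data.split(","):
        if token:  # empty tokens are skipped, so a row may yield nothing
            yield id_, token


def simulate_flat_map(rows: Iterable[Tuple[int, str]]) -> List[Tuple[int, str]]:
    """Plain-Python stand-in for flat_map: concatenate what each input row yields."""
    return [out for id_, data in rows for out in split_non_empty(id_, data)]


def build_split_udtf():
    """Wrap split_non_empty as a PyFlink table function (imported lazily)."""
    from pyflink.table import DataTypes
    from pyflink.table.udf import udtf

    @udtf(result_types=[DataTypes.BIGINT(), DataTypes.STRING()])
    def split_row(x):
        # x is a Row holding all columns of the input table
        yield from split_non_empty(x.id, x.data)

    return split_row
```

On the table from the example above, `table.flat_map(build_split_udtf()).to_pandas()` should return the same three rows as split, since neither input contains an empty token.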
Performs an aggregate operation with a Python general aggregate function or vectorized aggregate function.
from pyflink.common import Row
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import AggregateFunction, udaf
class CountAndSumAggregateFunction(AggregateFunction):
    def get_value(self, accumulator):
        return Row(accumulator[0], accumulator[1])
    def create_accumulator(self):
        # accumulator holds [count, sum of column b]
        return Row(0, 0)
    def accumulate(self, accumulator, row):
        accumulator[0] += 1
        accumulator[1] += row.b
    def retract(self, accumulator, row):
        accumulator[0] -= 1
        accumulator[1] -= row.b
    def merge(self, accumulator, accumulators):
        for other_acc in accumulators:
            accumulator[0] += other_acc[0]
            accumulator[1] += other_acc[1]
    def get_accumulator_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.BIGINT()),
             DataTypes.FIELD("b", DataTypes.BIGINT())])
    def get_result_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.BIGINT()),
             DataTypes.FIELD("b", DataTypes.BIGINT())])
function = CountAndSumAggregateFunction()
agg = udaf(function,
           result_type=function.get_result_type(),
           accumulator_type=function.get_accumulator_type(),
           name=str(function.__class__.__name__))
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
result = t.group_by(col('a')) \
    .aggregate(agg.alias("c", "d")) \
    .select(col('a'), col('c'), col('d'))
result.to_pandas()
# the result is
#    a  c  d
# 0  1  2  5
# 1  2  1  1
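The result table follows directly from the accumulator methods. The plain-Python sketch below replays create_accumulator, accumulate, and merge by hand for the three input rows, using a list in place of Row. It only simulates the calls PyFlink makes and says nothing about how the runtime schedules them; `InputRow` and `run_group_by` are hypothetical names.

```python
from collections import namedtuple
from typing import Dict, Iterable, List, Tuple

InputRow = namedtuple("InputRow", ["a", "b"])


def create_accumulator() -> List[int]:
    return [0, 0]  # [count, sum of b]


def accumulate(acc: List[int], row: InputRow) -> None:
    acc[0] += 1
    acc[1] += row.b


def merge(acc: List[int], others: Iterable[List[int]]) -> None:
    # Combine partial accumulators, e.g. ones built from different partitions.
    for other in others:
        acc[0] += other[0]
        acc[1] += other[1]


def run_group_by(rows: Iterable[InputRow]) -> Dict[int, Tuple[int, int]]:
    """Group rows on column a and return the (count, sum) value for each key."""
    accs: Dict[int, List[int]] = {}
    for row in rows:
        if row.a not in accs:
            accs[row.a] = create_accumulator()
        accumulate(accs[row.a], row)
    return {key: (acc[0], acc[1]) for key, acc in sorted(accs.items())}


if __name__ == "__main__":
    data = [InputRow(1, 2), InputRow(2, 1), InputRow(1, 3)]
    print(run_group_by(data))  # {1: (2, 5), 2: (1, 1)}
```

Splitting group 1 across two partial accumulators and merging them also gives [2, 5], which is why merge only has to add the counts and sums.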
# aggregate with a Python vectorized aggregate function
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
pandas_udaf = udaf(lambda pdf: (pdf.b.mean(), pdf.b.max()),
                   result_type=DataTypes.ROW(
                       [DataTypes.FIELD("a", DataTypes.FLOAT()),
                        DataTypes.FIELD("b", DataTypes.INT())]),
                   func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
    .select(col('a'), col('b')).to_pandas()
# the result is
#      a  b
# 0  2.0  3
Note: Similar to the map operation, if you specify the aggregate function without input columns in the aggregate operation, it takes a Row or a pandas.DataFrame as input, containing all the columns of the input table including the grouping keys.
Note: You have to close the aggregate operation with a select statement, and that select statement must not contain aggregate functions. In addition, the output of aggregate is flattened if it is a composite type.
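As the note says, a vectorized aggregate function called without input columns receives a pandas.DataFrame holding every column, grouping keys included. The pandas-only sketch below imitates that for a group_by on column a by iterating over DataFrame.groupby, whose group frames also keep column a. `mean_and_max` is a hypothetical name for the body of the pandas_udaf lambda in the example above, and `simulate_grouped_udaf` is a local stand-in for the Flink runtime, not PyFlink itself.

```python
import pandas as pd


def mean_and_max(pdf: pd.DataFrame):
    # Same computation as the pandas_udaf lambda: mean and max of column b.
    return pdf["b"].mean(), pdf["b"].max()


def simulate_grouped_udaf(pdf: pd.DataFrame, key: str) -> dict:
    """Call mean_and_max once per group, as a group-by aggregate would."""
    results = {}
    for group_key, group in pdf.groupby(key):
        # Each group frame still contains the grouping column `key`.
        results[group_key] = mean_and_max(group)
    return results


if __name__ == "__main__":
    t = pd.DataFrame({"a": [1, 2, 1], "b": [2, 1, 3]})
    print(mean_and_max(t))                 # whole table, as in t.aggregate(...)
    print(simulate_grouped_udaf(t, "a"))   # one call per value of a
```

Over the whole table this reproduces the single result row above (mean 2.0, max 3); per group it gives (2.5, 3) for a = 1 and (1.0, 1) for a = 2.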
Similar to GroupBy Aggregation, FlatAggregate groups the inputs on the grouping keys. Unlike an AggregateFunction, a TableAggregateFunction can return 0, 1, or more records for a grouping key. As with aggregate, you have to close the flat_aggregate with a select statement, and the select statement must not contain aggregate functions.
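A minimal sketch of a TableAggregateFunction that emits up to the two largest values of column a per group, so a group with one row produces one record and a group with more rows produces two. The class name Top2 and the helper functions are hypothetical. The accumulator logic is kept in plain functions so it can be checked without Flink, and PyFlink is imported only when the function is built. The overridden methods (create_accumulator, accumulate, emit_value, get_accumulator_type, get_result_type) are those of pyflink.table.udf.TableAggregateFunction.

```python
from typing import Iterator, List, Optional


def top2_create() -> List[Optional[int]]:
    return [None, None]  # [largest, second largest]


def top2_accumulate(acc: List[Optional[int]], value: Optional[int]) -> None:
    if value is None:
        return
    if acc[0] is None or value > acc[0]:
        acc[1] = acc[0]
        acc[0] = value
    elif acc[1] is None or value > acc[1]:
        acc[1] = value


def top2_emit(acc: List[Optional[int]]) -> Iterator[int]:
    # Emit 0, 1, or 2 values depending on how many have been seen.
    for value in acc:
        if value is not None:
            yield value


def build_top2_udtaf():
    """Wrap the helpers as a PyFlink table aggregate function (imported lazily)."""
    from pyflink.common import Row
    from pyflink.table import DataTypes
    from pyflink.table.udf import TableAggregateFunction, udtaf

    class Top2(TableAggregateFunction):
        def create_accumulator(self):
            return top2_create()

        def accumulate(self, accumulator, row):
            # row holds all columns of the input table; column a is ranked
            top2_accumulate(accumulator, row.a)

        def emit_value(self, accumulator):
            for value in top2_emit(accumulator):
                yield Row(value)

        def get_accumulator_type(self):
            return DataTypes.ARRAY(DataTypes.BIGINT())

        def get_result_type(self):
            return DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())])

    return udtaf(Top2())
```

Assuming a table t with columns a and b, the call would be closed with a select as required above, for example `t.group_by(col('b')).flat_aggregate(build_top2_udtaf()).select(col('*'))`.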