Row-based Operations

    Performs a map operation with a python general scalar function or vectorized scalar function. The output will be flattened if the output type is a composite type.

    It also supports taking a Row object (containing all the columns of the input table) as input.

    from pyflink.common import Row
    from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
    from pyflink.table.udf import udf

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                    DataTypes.FIELD("data", DataTypes.STRING())]))
    def func2(data: Row) -> Row:
        return Row(data.id, data.data * 2)

    # specify the function without the input columns
    table.map(func2).alias('id', 'data').to_pandas()
    # result is
    #    id        data
    # 0   1        HiHi
    # 1   2  HelloHello

    It also supports using a vectorized scalar function in the map operation. Note that in this case the input type and output type must be pandas.DataFrame instead of Row.
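    Below is a minimal sketch of such a vectorized map, reusing the table from the example above (the function name pandas_func is illustrative, not from the original text):

    import pandas as pd

    # vectorized scalar function: both input and output are pandas.DataFrame
    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                    DataTypes.FIELD("data", DataTypes.STRING())]),
         func_type="pandas")
    def pandas_func(df: pd.DataFrame) -> pd.DataFrame:
        df['data'] = df['data'] * 2  # column-wise operation on the whole batch
        return df

    table.map(pandas_func).alias('id', 'data').to_pandas()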

    Performs a flat_map operation with a python table function.

    from pyflink.common import Row
    from pyflink.table.udf import udtf
    from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 'data'])

    @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
    def split(x: Row) -> Row:
        for s in x.data.split(","):
            yield x.id, s

    # use split in `flat_map`
    table.flat_map(split).to_pandas()
    # result is
    #    f0     f1
    # 0   1     Hi
    # 1   1  Flink
    # 2   2  Hello

    Performs an aggregate operation with a python general aggregate function or vectorized aggregate function.

    from pyflink.common import Row
    from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col
    from pyflink.table.udf import AggregateFunction, udaf

    class CountAndSumAggregateFunction(AggregateFunction):

        def get_value(self, accumulator):
            return Row(accumulator[0], accumulator[1])

        def create_accumulator(self):
            return Row(0, 0)

        def accumulate(self, accumulator, row):
            accumulator[0] += 1
            accumulator[1] += row.b

        def retract(self, accumulator, row):
            accumulator[0] -= 1
            accumulator[1] -= row.b

        def merge(self, accumulator, accumulators):
            for other_acc in accumulators:
                accumulator[0] += other_acc[0]
                accumulator[1] += other_acc[1]

        def get_accumulator_type(self):
            return DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.BIGINT()),
                 DataTypes.FIELD("b", DataTypes.BIGINT())])

        def get_result_type(self):
            return DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.BIGINT()),
                 DataTypes.FIELD("b", DataTypes.BIGINT())])

    function = CountAndSumAggregateFunction()
    agg = udaf(function,
               result_type=function.get_result_type(),
               accumulator_type=function.get_accumulator_type(),
               name=str(function.__class__.__name__))

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
    result = t.group_by(col('a')) \
        .aggregate(agg.alias("c", "d")) \
        .select(col('a'), col('c'), col('d'))
    result.to_pandas()
    # the result is
    #    a  c  d
    # 0  1  2  5
    # 1  2  1  1

    # aggregate with a python vectorized aggregate function
    env_settings = EnvironmentSettings.in_batch_mode()
    table_env = TableEnvironment.create(env_settings)
    t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
    pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                       result_type=DataTypes.ROW(
                           [DataTypes.FIELD("a", DataTypes.FLOAT()),
                            DataTypes.FIELD("b", DataTypes.INT())]),
                       func_type="pandas")
    t.aggregate(pandas_udaf.alias("a", "b")) \
        .select(col('a'), col('b')).to_pandas()
    # the result is
    #      a  b
    # 0  2.0  3

    Note Similar to the map operation, if you specify the aggregate function without the input columns in the aggregate operation, it will take a Row or pandas.DataFrame as input, containing all the columns of the input table including the grouping keys.

    Note You have to close the aggregate with a select statement, and the select statement should not contain aggregate functions. Besides, the output of aggregate will be flattened if it is a composite type.

    Similar to GroupBy Aggregation, FlatAggregate groups the inputs on the grouping keys. Different from an AggregateFunction, a TableAggregateFunction can return 0, 1, or more records for a grouping key. Similar to aggregate, you have to close the flat_aggregate with a select statement, and the select statement should not contain aggregate functions.
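    As a minimal, illustrative sketch (the Top2 function and the sample data below are assumptions for demonstration; it emits the two largest values of column a per grouping key):

    from pyflink.common import Row
    from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col
    from pyflink.table.udf import TableAggregateFunction, udtaf

    class Top2(TableAggregateFunction):

        def emit_value(self, accumulator):
            # a table aggregate function may emit 0, 1, or more records per key
            yield Row(accumulator[0])
            if accumulator[1] is not None:
                yield Row(accumulator[1])

        def create_accumulator(self):
            return [None, None]

        def accumulate(self, accumulator, row):
            if accumulator[0] is None or row.a > accumulator[0]:
                accumulator[1] = accumulator[0]
                accumulator[0] = row.a
            elif accumulator[1] is None or row.a > accumulator[1]:
                accumulator[1] = row.a

        def get_accumulator_type(self):
            return DataTypes.ARRAY(DataTypes.BIGINT())

        def get_result_type(self):
            return DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())])

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    t = table_env.from_elements([(1, 'Hi'), (3, 'Hi'), (5, 'Hello'), (7, 'Hi')], ['a', 'b'])
    # close the flat_aggregate with a select that contains no aggregate functions
    t.group_by(col('b')) \
        .flat_aggregate(udtaf(Top2())) \
        .select(col('a')) \
        .to_pandas()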