Process Function

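    The ProcessFunction is a low-level stream processing operation, giving access to the basic building blocks of all (acyclic) streaming applications:

    • events (stream elements)
    • state (fault-tolerant, consistent, only on keyed stream)
    • timers (event time and processing time, only on keyed stream)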

    The ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s).

    For fault-tolerant state, the ProcessFunction gives access to Flink’s keyed state, accessible via the RuntimeContext, similar to the way other stateful functions can access keyed state.
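    The following plain-Python model makes the per-key scoping concrete. It is not Flink code and not how Flink stores state internally. The function is invoked once per event and reads and writes a value that is visible only for the current record's key. `ToyValueState` and `run_keyed` are hypothetical names chosen for this sketch.

```python
from typing import Any, Dict, Hashable, Iterable, List, Optional, Tuple


class ToyValueState:
    """Hypothetical stand-in for keyed ValueState: one slot per key."""

    def __init__(self) -> None:
        self._values: Dict[Hashable, Any] = {}
        self.current_key: Optional[Hashable] = None  # set by the "runtime" per record

    def value(self) -> Any:
        # only the slot belonging to the current key is visible
        return self._values.get(self.current_key)

    def update(self, new_value: Any) -> None:
        self._values[self.current_key] = new_value


def run_keyed(events: Iterable[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Invoked once per (key, amount) event; keeps a running sum per key."""
    state = ToyValueState()
    out: List[Tuple[str, int]] = []
    for key, amount in events:
        state.current_key = key                  # scope state to this record's key
        total = (state.value() or 0) + amount    # value() is None on a key's first event
        state.update(total)
        out.append((key, total))                 # a FlatMap-style function may emit 0..n records
    return out


print(run_keyed([("a", 1), ("b", 10), ("a", 2), ("b", 5)]))
# [('a', 1), ('b', 10), ('a', 3), ('b', 15)]
```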

    The timers allow applications to react to changes in processing time and in event time. Every call to the function processElement(...) gets a Context object, which gives access to the element’s event time timestamp and to the TimerService. The TimerService can be used to register callbacks for future event-/processing-time instants. With event-time timers, the onTimer(...) method is called when the current watermark is advanced up to or beyond the timestamp of the timer. With processing-time timers, onTimer(...) is called when wall clock time reaches the specified time. During that call, all state is again scoped to the key with which the timer was created, allowing timers to manipulate keyed state.
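    The firing rule for event-time timers can be modeled outside Flink as a priority queue that is drained whenever the watermark advances. This is a minimal sketch under that assumption, and it ignores timer deduplication. `ToyKeyedTimers` and its methods are hypothetical. The `delay` parameter stands in for whatever offset an application adds to the event timestamp.

```python
import heapq
from typing import Dict, List, Tuple


class ToyKeyedTimers:
    """Hypothetical model: an event-time timer fires once the watermark reaches or
    passes its timestamp, and the callback sees the state of the timer's key."""

    def __init__(self) -> None:
        self.state: Dict[str, int] = {}          # per-key value (here: an event count)
        self._queue: List[Tuple[int, str]] = []  # (timestamp, key), earliest first

    def process_element(self, key: str, event_time: int, delay: int) -> None:
        self.state[key] = self.state.get(key, 0) + 1
        heapq.heappush(self._queue, (event_time + delay, key))  # register a timer

    def advance_watermark(self, watermark: int) -> List[Tuple[str, int, int]]:
        """Returns (key, timer_timestamp, state_of_that_key) for each timer that fires."""
        fired: List[Tuple[str, int, int]] = []
        while self._queue and self._queue[0][0] <= watermark:
            timestamp, key = heapq.heappop(self._queue)
            fired.append((key, timestamp, self.state[key]))  # state scoped to the timer's key
        return fired


op = ToyKeyedTimers()
op.process_element("a", 1_000, 500)    # timer for "a" at 1500
op.process_element("b", 1_200, 500)    # timer for "b" at 1700
op.process_element("b", 1_250, 1_000)  # timer for "b" at 2250
print(op.advance_watermark(1_499))     # []
print(op.advance_watermark(1_700))     # [('a', 1500, 1), ('b', 1700, 2)]
```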

    To realize low-level operations on two inputs, applications can use CoProcessFunction or KeyedCoProcessFunction. These functions are bound to two different inputs and receive separate calls to processElement1(...) and processElement2(...) for records from the first and the second input, respectively.

    Implementing a low level join typically follows this pattern:

    • Create a state object for one input (or both)
    • Update the state upon receiving elements from its input
    • Upon receiving elements from the other input, probe the state and produce the joined result
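    The following plain-Python sketch walks through these three steps. It assumes customer records arrive on the first input and trades on the second. `ToyKeyedJoin`, `process_element1`, and `process_element2` are hypothetical stand-ins for a KeyedCoProcessFunction and its two callbacks. A dict keyed by customer ID plays the role of keyed state.

```python
from typing import Dict, List, Tuple


class ToyKeyedJoin:
    """Hypothetical sketch: keep state for input 1 (customers), probe it from input 2 (trades)."""

    def __init__(self) -> None:
        # step 1: create a state object for one input (customer_id -> latest name)
        self.customer_state: Dict[str, str] = {}

    def process_element1(self, customer_id: str, name: str) -> List[Tuple[str, str, float]]:
        # step 2: update the state upon receiving an element from its own input
        self.customer_state[customer_id] = name
        return []

    def process_element2(self, customer_id: str, amount: float) -> List[Tuple[str, str, float]]:
        # step 3: probe the state with an element from the other input
        name = self.customer_state.get(customer_id)
        if name is None:
            return []  # no customer seen yet: this simple version emits nothing
        return [(customer_id, name, amount)]


join = ToyKeyedJoin()
join.process_element2("c1", 50.0)         # [] : trade arrived before its customer
join.process_element1("c1", "Alice")
print(join.process_element2("c1", 75.0))  # [('c1', 'Alice', 75.0)]
```

    In this simple form the result depends on arrival order. The first trade produces no output because its customer record had not arrived yet.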

    For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.
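    The timer-based variant can be sketched in the same plain-Python style. This is an illustrative model, not Flink code, and it rests on several assumptions. Trades are buffered instead of being probed immediately. Customer updates are kept as a timestamped history. A single watermark passed to `advance_watermark` stands for the operator's combined watermark. All identifiers are hypothetical.

```python
import bisect
from typing import Dict, List, Optional, Tuple


class ToyWatermarkJoin:
    """Hypothetical model of a timer-driven join: trades wait until the watermark
    passes their timestamp, then join the customer version valid at that time."""

    def __init__(self) -> None:
        # keyed state for input 1: customer_id -> sorted [(timestamp, name), ...]
        self.customers: Dict[str, List[Tuple[int, str]]] = {}
        # buffered trades, each standing in for one event-time timer
        self.pending: List[Tuple[int, str, float]] = []  # (timestamp, customer_id, amount)

    def on_customer(self, customer_id: str, ts: int, name: str) -> None:
        bisect.insort(self.customers.setdefault(customer_id, []), (ts, name))

    def on_trade(self, customer_id: str, ts: int, amount: float) -> None:
        # do not probe yet; "register a timer" at the trade's event time instead
        self.pending.append((ts, customer_id, amount))

    def _customer_at(self, customer_id: str, ts: int) -> Optional[str]:
        history = self.customers.get(customer_id, [])
        times = [t for t, _ in history]
        i = bisect.bisect_right(times, ts)  # number of updates at or before ts
        return history[i - 1][1] if i > 0 else None

    def advance_watermark(self, watermark: int) -> List[Tuple[str, int, float, Optional[str]]]:
        # in Flink, a two-input operator's watermark is the minimum of both inputs' watermarks
        due = sorted(t for t in self.pending if t[0] <= watermark)
        self.pending = [t for t in self.pending if t[0] > watermark]
        return [(cid, ts, amount, self._customer_at(cid, ts)) for ts, cid, amount in due]


join = ToyWatermarkJoin()
join.on_trade("c1", 100, 9.5)               # trade arrives before any customer record
join.on_customer("c1", 90, "Alice")         # late in arrival order, earlier in event time
join.on_customer("c1", 200, "Alice Smith")  # newer version, must not affect the trade at t=100
print(join.advance_watermark(150))
# [('c1', 100, 9.5, 'Alice')]
```

    Because the join waits for the watermark, the result no longer depends on whether the trade or the customer record arrived first. This holds as long as both arrive before the watermark passes the trade's timestamp.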

    In the following example a KeyedProcessFunction maintains counts per key, and emits a key/count pair whenever a minute passes (in event time) without an update for that key:

    • The count, key, and last-modification-timestamp are stored in a ValueState, which is implicitly scoped by key.
    • For each record, the KeyedProcessFunction increments the counter and sets the last-modification timestamp
    • The function also schedules a callback one minute into the future (in event time)
    • Upon each callback, it checks the callback’s event time timestamp against the last-modification time of the stored count and emits the key/count if they match (i.e., no further update occurred during that minute)

    Java

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // the source data stream
    DataStream<Tuple2<String, String>> stream = ...;

    // apply the process function onto a keyed stream (the key is the String field f0)
    DataStream<Tuple2<String, Long>> result = stream
        .keyBy(value -> value.f0)
        .process(new CountWithTimeoutFunction());

    /**
     * The data type stored in the state
     */
    public class CountWithTimestamp {

        public String key;
        public long count;
        public long lastModified;
    }

    /**
     * The implementation of the ProcessFunction that maintains the count and timeouts
     */
    public class CountWithTimeoutFunction
            extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {

        /** The state that is maintained by this process function */
        private ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }

        @Override
        public void processElement(
                Tuple2<String, String> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // retrieve the current count
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }

            // update the state's count
            current.count++;

            // set the state's timestamp to the record's assigned event time timestamp
            // (timestamps must be assigned upstream; otherwise ctx.timestamp() returns null)
            current.lastModified = ctx.timestamp();

            // write the state back
            state.update(current);

            // schedule the next timer 60 seconds from the current event time
            ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
        }

        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // get the state for the key that scheduled the timer
            CountWithTimestamp result = state.value();

            // check if this is an outdated timer or the latest timer
            if (timestamp == result.lastModified + 60000) {
                // emit the state on timeout
                out.collect(new Tuple2<String, Long>(result.key, result.count));
            }
        }
    }

    Scala

    import org.apache.flink.api.common.state.ValueState
    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    // the source data stream
    val stream: DataStream[Tuple2[String, String]] = ...

    // apply the process function onto a keyed stream (the key is the String field _1)
    val result: DataStream[Tuple2[String, Long]] = stream
      .keyBy(_._1)
      .process(new CountWithTimeoutFunction())

    /**
     * The data type stored in the state
     */
    case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

    /**
     * The implementation of the ProcessFunction that maintains the count and timeouts
     */
    class CountWithTimeoutFunction extends KeyedProcessFunction[String, (String, String), (String, Long)] {

      /** The state that is maintained by this process function */
      lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
        .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

      override def processElement(
          value: (String, String),
          ctx: KeyedProcessFunction[String, (String, String), (String, Long)]#Context,
          out: Collector[(String, Long)]): Unit = {

        // initialize or retrieve/update the state
        val current: CountWithTimestamp = state.value match {
          case null =>
            CountWithTimestamp(value._1, 1, ctx.timestamp)
          case CountWithTimestamp(key, count, _) =>
            CountWithTimestamp(key, count + 1, ctx.timestamp)
        }

        // write the state back
        state.update(current)

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
      }

      override def onTimer(
          timestamp: Long,
          ctx: KeyedProcessFunction[String, (String, String), (String, Long)]#OnTimerContext,
          out: Collector[(String, Long)]): Unit = {

        state.value match {
          case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
            out.collect((key, count))
          case _ =>
        }
      }
    }

    Python

    import datetime

    from pyflink.common import Row, WatermarkStrategy
    from pyflink.common.typeinfo import Types
    from pyflink.common.watermark_strategy import TimestampAssigner
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
    from pyflink.datastream.state import ValueStateDescriptor
    from pyflink.table import StreamTableEnvironment


    class CountWithTimeoutFunction(KeyedProcessFunction):

        def __init__(self):
            self.state = None

        def open(self, runtime_context: RuntimeContext):
            self.state = runtime_context.get_state(ValueStateDescriptor(
                "my_state", Types.PICKLED_BYTE_ARRAY()))

        def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
            # retrieve the current count
            current = self.state.value()
            if current is None:
                # (key, count, last_modified); the key is field 1, matching key_by below
                current = Row(value[1], 0, 0)

            # update the state's count
            current[1] += 1

            # set the state's timestamp to the record's assigned event time timestamp
            current[2] = ctx.timestamp()

            # write the state back
            self.state.update(current)

            # schedule the next timer 60 seconds from the current event time
            ctx.timer_service().register_event_time_timer(current[2] + 60000)

        def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
            # get the state for the key that scheduled the timer
            result = self.state.value()

            # check if this is an outdated timer or the latest timer
            if timestamp == result[2] + 60000:
                # emit the state on timeout
                yield result[0], result[1]


    class MyTimestampAssigner(TimestampAssigner):

        def __init__(self):
            self.epoch = datetime.datetime.utcfromtimestamp(0)

        def extract_timestamp(self, value, record_timestamp) -> int:
            # field 0 is a TIMESTAMP(3); convert it to epoch milliseconds
            return int((value[0] - self.epoch).total_seconds() * 1000)


    if __name__ == '__main__':
        env = StreamExecutionEnvironment.get_execution_environment()
        t_env = StreamTableEnvironment.create(stream_execution_environment=env)

        t_env.execute_sql("""
            CREATE TABLE my_source (
              a TIMESTAMP(3),
              b VARCHAR,
              c VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'rows-per-second' = '10'
            )
        """)

        stream = t_env.to_append_stream(
            t_env.from_path('my_source'),
            Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
        watermarked_stream = stream.assign_timestamps_and_watermarks(
            WatermarkStrategy.for_monotonous_timestamps()
            .with_timestamp_assigner(MyTimestampAssigner()))

        # apply the process function onto a keyed stream
        watermarked_stream.key_by(lambda value: value[1]) \
            .process(CountWithTimeoutFunction()) \
            .print()
        env.execute()
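    The stale-timer check in onTimer can be exercised without a cluster. The following plain-Python replay is a simplified model. It assumes all records are processed before a single watermark advance. `simulate_count_with_timeout` is a hypothetical helper, not part of PyFlink.

```python
from typing import Dict, List, Set, Tuple

TIMEOUT_MS = 60_000


def simulate_count_with_timeout(
    events: List[Tuple[str, int]], watermark: int
) -> List[Tuple[str, int]]:
    """Replays (key, event_time_ms) records in order, then advances the watermark
    once and returns what onTimer would emit. A plain-Python model, not Flink code."""
    state: Dict[str, Tuple[int, int]] = {}  # key -> (count, last_modified)
    timers: Set[Tuple[str, int]] = set()    # (key, timestamp); duplicates collapse
    for key, ts in events:                  # processElement
        count, _ = state.get(key, (0, 0))
        state[key] = (count + 1, ts)
        timers.add((key, ts + TIMEOUT_MS))
    emitted: List[Tuple[str, int]] = []
    for key, timer_ts in sorted(timers, key=lambda kt: (kt[1], kt[0])):  # onTimer, in time order
        if timer_ts > watermark:
            continue                        # not due yet
        count, last_modified = state[key]
        if timer_ts == last_modified + TIMEOUT_MS:  # outdated timers emit nothing
            emitted.append((key, count))
    return emitted


events = [("a", 0), ("b", 10_000), ("a", 30_000)]
print(simulate_count_with_timeout(events, 100_000))  # [('b', 1), ('a', 2)]
print(simulate_count_with_timeout(events, 80_000))   # [('b', 1)]
```

    Key `a` registers two timers, at 60000 and 90000. Only the timer matching its latest modification emits, so each key reports its count once after a minute without updates.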

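    The KeyedProcessFunction

    The KeyedProcessFunction, as an extension of ProcessFunction, gives access to the key of timers in its onTimer(...) method.

    Java

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
        K key = ctx.getCurrentKey();
        // ...
    }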

    Scala

    override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {
      val key = ctx.getCurrentKey
      // ...
    }

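    Python

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        key = ctx.get_current_key()
        # ...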

    Both types of timers (processing-time and event-time) are internally maintained by the TimerService and enqueued for execution.

    The TimerService deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same key and timestamp, the onTimer() method will be called just once.
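    The deduplication rule amounts to treating registrations as a set of `(key, timestamp)` pairs. A minimal sketch follows, with `registered_timers` as a hypothetical helper. The ordering among equal timestamps is arbitrary here.

```python
from typing import Iterable, List, Tuple


def registered_timers(registrations: Iterable[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Collapses (key, timestamp) registrations to one timer each and returns
    them by timestamp (ties broken by key only to make the output stable)."""
    return sorted(set(registrations), key=lambda kt: (kt[1], kt[0]))


calls = [("a", 1_000), ("a", 1_000), ("a", 1_000), ("b", 1_000), ("a", 2_000)]
timers = registered_timers(calls)
print(timers)       # [('a', 1000), ('b', 1000), ('a', 2000)]
print(len(timers))  # 3 onTimer() calls for 5 registrations
```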

    Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.

    Timers are fault tolerant and checkpointed along with the state of the application. In case of a failure recovery or when starting an application from a savepoint, the timers are restored.

    Timer Coalescing

    Since Flink maintains only one timer per key and timestamp, you can reduce the number of timers by reducing the timer resolution to coalesce them.

    For a timer resolution of 1 second (event or processing time), you can round the target time down to full seconds. Timers will then fire at most 1 second earlier than requested, but never later than the requested millisecond-accurate time. As a result, there is at most one timer per key and second.
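    The rounding arithmetic can be checked in isolation. The sketch below uses a hypothetical `coalesce_down` helper, a hypothetical 5-second timeout, and non-negative millisecond timestamps. That restriction matters because Java's `/` truncates toward zero while Python's `//` floors, and the two only agree for non-negative values.

```python
def coalesce_down(target_ms: int, resolution_ms: int = 1_000) -> int:
    """Rounds a target time down to a multiple of resolution_ms (assumes target_ms >= 0)."""
    return (target_ms // resolution_ms) * resolution_ms


timeout_ms = 5_000                             # hypothetical timeout
now_values = [10_120, 10_480, 10_999, 11_002]  # hypothetical current times (ms)
targets = [now + timeout_ms for now in now_values]
coalesced = [coalesce_down(t) for t in targets]
print(targets)              # [15120, 15480, 15999, 16002]
print(coalesced)            # [15000, 15000, 15000, 16000]
print(len(set(coalesced)))  # 2 timers instead of 4
# each coalesced timer is never later than requested and less than 1 s earlier
assert all(0 <= t - c < 1_000 for t, c in zip(targets, coalesced))
```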

    Java

    // processing-time variant; for event time, start from ctx.timestamp() and use registerEventTimeTimer
    long coalescedTime = ((ctx.timerService().currentProcessingTime() + timeout) / 1000) * 1000;
    ctx.timerService().registerProcessingTimeTimer(coalescedTime);

    Scala

    val coalescedTime = ((ctx.timerService.currentProcessingTime + timeout) / 1000) * 1000
    ctx.timerService.registerProcessingTimeTimer(coalescedTime)

    Python

    coalesced_time = ((ctx.timer_service().current_processing_time() + timeout) // 1000) * 1000
    ctx.timer_service().register_processing_time_timer(coalesced_time)

    Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce these timers with the next watermark by using the current one:
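    The effect of coalescing on the watermark can be modeled for a single key. Every element seen between two watermarks registers `currentWatermark + 1`, so all of them share one timer, which fires as soon as the watermark advances again. The sketch below is plain Python with hypothetical names. The initial watermark of `Long.MIN_VALUE` mirrors Flink's starting value.

```python
from typing import List, Optional, Set, Tuple

LONG_MIN = -(2**63)  # Flink's initial watermark is Long.MIN_VALUE


def coalesce_on_watermark(inputs: List[Tuple[str, Optional[int]]]) -> List[int]:
    """Replays one key's input. ("element", None) registers an event-time timer at
    current_watermark + 1; ("watermark", w) advances the watermark and fires due
    timers. Returns fired timer timestamps in order. Hypothetical model."""
    current_watermark = LONG_MIN
    timers: Set[int] = set()
    fired: List[int] = []
    for kind, value in inputs:
        if kind == "element":
            # every element before the next watermark registers the same timestamp
            timers.add(current_watermark + 1)
        else:
            assert value is not None
            current_watermark = value
            due = sorted(t for t in timers if t <= current_watermark)
            timers.difference_update(due)
            fired.extend(due)
    return fired


inputs = [
    ("watermark", 100),
    ("element", None), ("element", None), ("element", None),  # share one timer at 101
    ("watermark", 200),
    ("element", None),                                        # timer at 201
    ("watermark", 300),
]
print(coalesce_on_watermark(inputs))  # [101, 201]
```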

    Java

    long coalescedTime = ctx.timerService().currentWatermark() + 1;
    ctx.timerService().registerEventTimeTimer(coalescedTime);

    Scala

    val coalescedTime = ctx.timerService.currentWatermark + 1
    ctx.timerService.registerEventTimeTimer(coalescedTime)

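    Python

    coalesced_time = ctx.timer_service().current_watermark() + 1
    ctx.timer_service().register_event_time_timer(coalesced_time)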

    Timers can also be stopped and removed as follows:

    Stopping a processing-time timer:

    Java

    long timestampOfTimerToStop = ...;
    ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);

    Scala

    val timestampOfTimerToStop = ...
    ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)

    Python

    timestamp_of_timer_to_stop = ...
    ctx.timer_service().delete_processing_time_timer(timestamp_of_timer_to_stop)
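    One reason to delete a timer is an inactivity timeout. When a new element arrives, the key's pending timeout is removed and a later one is registered, so only the most recent timer can fire. This is an alternative to the timestamp comparison used in the counting example. The plain-Python sketch below uses hypothetical names. It remembers the timer timestamp in per-key state so the value can be passed to the delete call.

```python
from typing import Dict, List, Set, Tuple


class ToyInactivityTimeout:
    """Hypothetical sketch: each element deletes its key's pending processing-time
    timer and registers a new one, so only the latest timer can fire."""

    def __init__(self, timeout_ms: int) -> None:
        self.timeout_ms = timeout_ms
        self.timers: Set[Tuple[str, int]] = set()  # registered (key, timestamp)
        self.last_timer: Dict[str, int] = {}       # per-key state: the live timer's timestamp

    def process_element(self, key: str, now_ms: int) -> None:
        old = self.last_timer.get(key)
        if old is not None:
            self.timers.discard((key, old))        # like deleteProcessingTimeTimer(old)
        new = now_ms + self.timeout_ms
        self.timers.add((key, new))                # like registerProcessingTimeTimer(new)
        self.last_timer[key] = new

    def advance_time(self, now_ms: int) -> List[Tuple[str, int]]:
        due = sorted((kt for kt in self.timers if kt[1] <= now_ms), key=lambda kt: (kt[1], kt[0]))
        for key, ts in due:
            self.timers.discard((key, ts))
            del self.last_timer[key]               # the timer fired; nothing left to delete
        return due


t = ToyInactivityTimeout(1_000)
t.process_element("a", 0)     # timer at 1000
t.process_element("a", 500)   # deletes 1000, registers 1500
print(t.advance_time(1_200))  # [] (the deleted timer never fires)
print(t.advance_time(1_500))  # [('a', 1500)]
```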

    Stopping an event-time timer:

    Java

    long timestampOfTimerToStop = ...;
    ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);

    Scala

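    val timestampOfTimerToStop = ...
    ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)

    Python

    timestamp_of_timer_to_stop = ...
    ctx.timer_service().delete_event_time_timer(timestamp_of_timer_to_stop)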