Windows

    Currently, the window operation is only supported on keyed streams.


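    A windowed program always keys the stream, assigns windows, and applies a window function; the remaining calls (such as a custom trigger or an allowed lateness) are optional. This shows that Flink allows you to customize your windowing logic in many different ways so that it best fits your needs.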

    In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness (see Allowed Lateness). Flink guarantees removal only for time-based windows and not for other types, e.g. global windows. For example, with an event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed lateness of 1 minute, Flink will create a new window for the interval between 12:00 and 12:05 when the first element with a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the 12:06 timestamp.
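    The lifecycle arithmetic in this example can be checked with a few lines of plain Python. This is a toy model rather than PyFlink code: the helper names are hypothetical, timestamps are milliseconds since midnight, and it assumes Flink's convention that a window's end is exclusive, so the last timestamp belonging to the window is end - 1.

```python
from typing import Tuple

MINUTE_MS = 60_000


def at(hour: int, minute: int) -> int:
    """Toy event-time clock: milliseconds since midnight."""
    return (hour * 60 + minute) * MINUTE_MS


def tumbling_window(timestamp: int, size: int) -> Tuple[int, int]:
    """Return the [start, end) interval of the tumbling window holding timestamp."""
    start = timestamp - timestamp % size
    return start, start + size


def cleanup_time(window_end: int, allowed_lateness: int) -> int:
    """Last timestamp inside the window (end is exclusive) plus the allowed lateness."""
    return window_end - 1 + allowed_lateness


def is_removed(watermark: int, window_end: int, allowed_lateness: int) -> bool:
    # The window (contents and metadata) is dropped once the watermark reaches cleanup time.
    return watermark >= cleanup_time(window_end, allowed_lateness)


# The example from the text: 5-minute tumbling windows, 1 minute of allowed lateness.
start, end = tumbling_window(at(12, 3), size=5 * MINUTE_MS)
print(start == at(12, 0), end == at(12, 5))  # True True
# At 12:05 the window is past its end but still within the allowed lateness.
print(is_removed(at(12, 5), end, MINUTE_MS))  # False
# Once the watermark reaches 12:06 the window is gone.
print(is_removed(at(12, 6), end, MINUTE_MS))  # True
```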

    In addition, each window will have a Trigger (see Triggers) and a function (WindowFunction or ProcessWindowFunction) (see Window Functions) attached to it. The function contains the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied. A triggering policy might be something like “when the number of elements in the window is more than 4”, or “when the watermark passes the end of the window”. A trigger can also decide to purge a window’s contents at any time between its creation and removal. Purging in this case only refers to the elements in the window, and not the window metadata. This means that new data can still be added to that window.
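    A count-based policy such as “more than 4 elements”, together with the fact that purging clears the elements but keeps the window, can be modeled with a toy class. ToyWindow, count_trigger, and purge are hypothetical, framework-free stand-ins, not PyFlink's Trigger API.

```python
from typing import List


class ToyWindow:
    """Hypothetical stand-in for one window: metadata plus buffered contents."""

    def __init__(self, start: int, end: int) -> None:
        self.start, self.end = start, end  # metadata, survives a purge
        self.contents: List[int] = []      # elements, cleared by a purge


def count_trigger(window: ToyWindow, threshold: int = 4) -> bool:
    """Fire when the window holds more than `threshold` elements."""
    return len(window.contents) > threshold


def purge(window: ToyWindow) -> None:
    # Purging drops the elements only; the window itself keeps existing.
    window.contents.clear()


w = ToyWindow(0, 5000)
fired = []
for value in range(1, 8):
    w.contents.append(value)
    if count_trigger(w):
        fired.append(sum(w.contents))  # "apply the function" to the current contents
        purge(w)

print(fired)              # [15]
print(w.contents, w.end)  # [6, 7] 5000
```

    After the purge the window still exists, so elements 6 and 7 are added to it.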

    In the following we go into more detail for each of the components above. We start with the required parts (see Keyed Windows and Window Functions) before moving on to the optional ones.

    Keyed Windows

    The first thing to specify is whether your stream should be keyed or not. This has to be done before defining the window. Calling key_by(...) splits your infinite stream into logical keyed streams. If key_by(...) is not called, your stream is not keyed.

    In the case of keyed streams, any attribute of your incoming events can be used as a key. Having a keyed stream allows your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest. All elements with the same key are sent to the same parallel task.
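    A simplified sketch of why all elements with the same key reach the same parallel task: the key is hashed deterministically and the hash is mapped onto one of N tasks. task_for_key is a hypothetical helper using CRC32; Flink's real partitioning (via key groups) is more involved, so treat this only as an illustration of the property.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple


def task_for_key(key: str, parallelism: int) -> int:
    """Map a key to a task index with a deterministic hash (CRC32 here, not Flink's hash)."""
    return zlib.crc32(key.encode("utf-8")) % parallelism


events: List[Tuple[int, str]] = [(1, "hi"), (2, "hello"), (3, "hi"), (4, "hello")]
tasks: Dict[int, List[Tuple[int, str]]] = defaultdict(list)
for event in events:
    # The key is the second field, as in key_by(lambda x: x[1]).
    tasks[task_for_key(event[1], parallelism=4)].append(event)

# All "hi" events end up on one task and all "hello" events on one task.
for task, batch in sorted(tasks.items()):
    print(task, batch)
```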

    After specifying that your stream is keyed, the next step is to define a window assigner, which determines how elements are assigned to windows. This is done by passing the WindowAssigner of your choice to the window(...) call.

    A WindowAssigner is responsible for assigning each incoming element to one or more windows. You can implement a custom window assigner by extending the WindowAssigner class.

    The following example shows how to implement a custom tumbling window assigner.

    from typing import Tuple, Collection

    from pyflink.common.serializer import TypeSerializer
    from pyflink.datastream import WindowAssigner, Trigger
    from pyflink.datastream.window import TimeWindow, TimeWindowSerializer


    class TumblingEventWindowAssigner(WindowAssigner[Tuple, TimeWindow]):

        def __init__(self, size: int, offset: int, is_event_time: bool):
            self._size = size          # window length in milliseconds
            self._offset = offset      # shift of the window boundaries in milliseconds
            self._is_event_time = is_event_time

        def assign_windows(self,
                           element: Tuple,
                           timestamp: int,
                           context: WindowAssigner.WindowAssignerContext) -> Collection[TimeWindow]:
            # Each element belongs to exactly one window: [start, start + size).
            start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._size)
            return [TimeWindow(start, start + self._size)]

        def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
            # EventTimeTrigger is the custom trigger defined in the Triggers section of this page.
            return EventTimeTrigger()

        def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
            return TimeWindowSerializer()

        def is_event_time(self) -> bool:
            # True for event-time windows, False for processing-time windows.
            return self._is_event_time
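    The start-of-window arithmetic behind assign_windows can be reproduced without PyFlink. This sketch assumes that TimeWindow.get_window_start_with_offset subtracts (timestamp - offset) mod size from the timestamp, which is the usual tumbling-window formula; window_start and assign_windows_sketch are hypothetical names, and all values are milliseconds.

```python
from typing import List, Tuple


def window_start(timestamp: int, offset: int, size: int) -> int:
    """Start of the tumbling window containing timestamp (Python's % is never negative here)."""
    return timestamp - (timestamp - offset) % size


def assign_windows_sketch(timestamp: int, size: int, offset: int = 0) -> List[Tuple[int, int]]:
    # A tumbling assigner returns exactly one [start, end) window per element.
    start = window_start(timestamp, offset, size)
    return [(start, start + size)]


HOUR = 3_600_000  # milliseconds
print(assign_windows_sketch(5_400_000, size=HOUR))                      # [(3600000, 7200000)]
print(assign_windows_sketch(5_400_000, size=HOUR, offset=15 * 60_000))  # [(4500000, 8100000)]
```

    With an offset of 15 minutes, hourly windows run from a quarter past the hour to a quarter past the next hour instead of starting on the hour.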

    Window Functions

    After defining the window assigner, we need to specify the computation that we want to perform on each of these windows. This is the responsibility of the window function, which is used to process the elements of each keyed window once the system determines that a window is ready for processing (see Triggers for how Flink determines when a window is ready).

    The window function can be a ProcessWindowFunction or a WindowFunction. Both receive an Iterable over all the elements contained in a window, together with additional metadata about the window to which the elements belong.

    We will look at examples of each of these variants, starting with ProcessWindowFunction.

    A ProcessWindowFunction gets an Iterable containing all the elements of the window, and a Context object with access to time and state information, which enables it to provide more flexibility than other window functions. This comes at the cost of performance and resource consumption, because elements cannot be incrementally aggregated but instead need to be buffered internally until the window is considered ready for processing.
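    The cost difference can be illustrated with a toy comparison: a window evaluated by a ProcessWindowFunction must keep every element until it fires, whereas an incrementally aggregated window keeps only a running accumulator. Both classes are hypothetical illustrations, not Flink internals.

```python
from typing import List


class BufferingWindow:
    """Keeps every element until the window fires (ProcessWindowFunction style)."""

    def __init__(self) -> None:
        self.elements: List[int] = []

    def add(self, value: int) -> None:
        self.elements.append(value)

    def fire(self) -> int:
        # The function only sees the full Iterable at firing time.
        return sum(self.elements)


class IncrementalWindow:
    """Folds each element into a single accumulator as it arrives."""

    def __init__(self) -> None:
        self.acc = 0

    def add(self, value: int) -> None:
        self.acc += value

    def fire(self) -> int:
        return self.acc


buffered, incremental = BufferingWindow(), IncrementalWindow()
for v in range(1, 1001):
    buffered.add(v)
    incremental.add(v)

print(buffered.fire(), incremental.fire())  # 500500 500500
print(len(buffered.elements))               # 1000
```

    Both produce the same result, but the buffering window holds 1000 values in state while the incremental one holds a single integer.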

    The signature of ProcessWindowFunction looks as follows:

    # Interface excerpt as shipped with PyFlink: you subclass it rather than define it yourself.
    class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
        """
        Base interface for functions that are evaluated over keyed (grouped) windows using a context
        for retrieving extra information.
        """

        class Context(ABC, Generic[W2]):
            """
            The context holding window metadata.
            """

            @abstractmethod
            def window(self) -> W2:
                """
                :return: The window that is being evaluated.
                """
                pass

            @abstractmethod
            def current_processing_time(self) -> int:
                """
                :return: The current processing time.
                """
                pass

            @abstractmethod
            def current_watermark(self) -> int:
                """
                :return: The current event-time watermark.
                """
                pass

            @abstractmethod
            def window_state(self) -> KeyedStateStore:
                """
                State accessor for per-key and per-window state.

                .. note::
                    If you use per-window state you have to ensure that you clean it up by implementing
                    :func:`~ProcessWindowFunction.clear`.

                :return: The :class:`KeyedStateStore` used to access per-key and per-window states.
                """
                pass

            @abstractmethod
            def global_state(self) -> KeyedStateStore:
                """
                State accessor for per-key global state.
                """
                pass

        @abstractmethod
        def process(self,
                    key: KEY,
                    context: 'ProcessWindowFunction.Context',
                    elements: Iterable[IN]) -> Iterable[OUT]:
            """
            Evaluates the window and outputs none or several elements.

            :param key: The key for which this window is evaluated.
            :param context: The context in which the window is being evaluated.
            :param elements: The elements in the window being evaluated.
            :return: The iterable object which produces the elements to emit.
            """
            pass

        @abstractmethod
        def clear(self, context: 'ProcessWindowFunction.Context') -> None:
            """
            Deletes any state in the :class:`Context` when the Window expires (the watermark passes its
            max_timestamp + allowed_lateness).

            :param context: The context in which the window is being evaluated.
            """
            pass

    The key parameter is the key that is extracted by the KeySelector (or key-selector function) that was specified for the key_by() invocation. Its type is whatever that selector returns; for example, with key_by(lambda x: x[1]) on (int, str) tuples, the key is the string in the second field.

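    To use a ProcessWindowFunction, subclass it, implement process(), and pass an instance to the process(...) method of the windowed stream, the counterpart of apply(...) for a WindowFunction.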
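    The contract of process() can be illustrated without a Flink cluster. The sketch below is framework-free: Window, FakeContext, and CountWindowFunction are hypothetical stand-ins that only mimic the method shape process(key, context, elements) from the signature above; in a real job you would subclass PyFlink's ProcessWindowFunction instead.

```python
from typing import Iterable, Iterator, NamedTuple


class Window(NamedTuple):
    """Hypothetical stand-in for TimeWindow: a [start, end) interval in ms."""
    start: int
    end: int


class FakeContext:
    """Hypothetical stand-in for ProcessWindowFunction.Context; only window() is modeled."""

    def __init__(self, window: Window) -> None:
        self._window = window

    def window(self) -> Window:
        return self._window


class CountWindowFunction:
    """Same method shape as ProcessWindowFunction.process(key, context, elements)."""

    def process(self, key: str, context: FakeContext,
                elements: Iterable[tuple]) -> Iterator[str]:
        w = context.window()
        count = sum(1 for _ in elements)
        # Emit one record per window evaluation, using the window metadata from the context.
        yield f"key={key} window=[{w.start}, {w.end}) count={count}"


fn = CountWindowFunction()
out = list(fn.process("hi", FakeContext(Window(0, 5000)), [(1, "hi"), (3, "hi")]))
print(out)  # ['key=hi window=[0, 5000) count=2']
```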

    In some places where a ProcessWindowFunction can be used, you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and lacks some advanced features, such as per-window keyed state. This interface will be deprecated at some point.

    The signature of a WindowFunction looks as follows:

    class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
        """
        Base interface for functions that are evaluated over keyed (grouped) windows.
        """

        @abstractmethod
        def apply(self, key: KEY, window: W, inputs: Iterable[IN]) -> Iterable[OUT]:
            """
            Evaluates the window and outputs none or several elements.

            :param key: The key for which this window is evaluated.
            :param window: The window that is being evaluated.
            :param inputs: The elements in the window being evaluated.
            """
            pass
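    A WindowFunction can be defined and used like this:

    from typing import Iterable, Tuple

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import WindowFunction
    from pyflink.datastream.window import TimeWindow

    class SumWindowFunction(WindowFunction[Tuple, Tuple, str, TimeWindow]):
        def apply(self, key: str, window: TimeWindow, inputs: Iterable[Tuple]):
            # Sum the first field of all elements of this key in this window.
            result = 0
            for i in inputs:
                result += i[0]
            return [(key, result)]

    env = StreamExecutionEnvironment.get_execution_environment()
    data_stream = env.from_collection([
        (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
        type_info=Types.TUPLE([Types.INT(), Types.STRING()]))  # type: DataStream
    # Event-time windows need element timestamps and watermarks; assign them first,
    # e.g. with data_stream.assign_timestamps_and_watermarks(...).
    # TumblingEventWindowAssigner is the custom assigner defined earlier on this page.
    data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
        .window(TumblingEventWindowAssigner(size=5000, offset=0, is_event_time=True)) \
        .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))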
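    The result of such a pipeline can be predicted with a plain-Python simulation of the grouping. It rests on two assumptions not stated by the example itself: each element's event timestamp is its first field in seconds, and the windows are 5-second tumbling windows with offset 0. simulate is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

data = [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')]
SIZE_MS = 5000  # assumed window size


def simulate(rows: List[Tuple[int, str]]) -> Dict[Tuple[str, int], int]:
    """Group by (key, window start) and sum the first field, like SumWindowFunction."""
    sums: Dict[Tuple[str, int], int] = defaultdict(int)
    for value, key in rows:
        ts = value * 1000          # assumed: first field = event time in seconds
        start = ts - ts % SIZE_MS  # tumbling window start, offset 0
        sums[(key, start)] += value
    return dict(sums)


for (key, start), total in sorted(simulate(data).items()):
    print(key, start, total)
# hello 0 6
# hello 5000 6
# hi 0 4
# hi 5000 5
```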

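    Triggers

    A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. You can specify a custom trigger using trigger(…).

    The Trigger interface has five methods that allow a trigger to react to different events:

    • on_element() is called for each element that is added to a window.
    • on_event_time() is called when a registered event-time timer fires.
    • on_processing_time() is called when a registered processing-time timer fires.
    • on_merge() is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
    • clear() performs any action needed upon removal of the corresponding window.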

    Two things to notice about the above methods are:

    1. The first three (on_element, on_processing_time and on_event_time) decide how to act on their invocation event by returning a TriggerResult. The action can be one of the following:
       • CONTINUE: do nothing,
       • FIRE: trigger the computation,
       • PURGE: clear the elements in the window, and
       • FIRE_AND_PURGE: trigger the computation and clear the elements in the window afterwards.
    2. Any of these methods can be used to register processing- or event-time timers for future actions.

    Once a trigger determines that a window is ready for processing, it fires, i.e., it returns FIRE or FIRE_AND_PURGE. This is the signal for the window operator to emit the result of the current window. Given a window with a ProcessWindowFunction, all elements are passed to the ProcessWindowFunction.

    When a trigger fires, it can either FIRE or FIRE_AND_PURGE. While FIRE keeps the contents of the window, FIRE_AND_PURGE removes its content. By default, the pre-implemented triggers simply FIRE without purging the window state.
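    The difference only becomes visible when a window fires more than once. The toy operator below mirrors the four TriggerResult values with a Python Enum; Result and run_window are hypothetical stand-ins for illustration, not PyFlink's classes, and the trigger is simplified to return the same action after every batch of elements.

```python
from enum import Enum
from typing import List


class Result(Enum):
    # (fires?, purges?)
    CONTINUE = (False, False)
    FIRE = (True, False)
    PURGE = (False, True)
    FIRE_AND_PURGE = (True, True)

    @property
    def is_fire(self) -> bool:
        return self.value[0]

    @property
    def is_purge(self) -> bool:
        return self.value[1]


def run_window(batches: List[List[int]], action: Result) -> List[int]:
    """Add each batch to the window, apply the trigger action, and collect emitted sums."""
    contents: List[int] = []
    emitted: List[int] = []
    for batch in batches:
        contents.extend(batch)
        if action.is_fire:
            emitted.append(sum(contents))  # the window function sees the current contents
        if action.is_purge:
            contents.clear()               # only the elements go; the window stays
    return emitted


batches = [[1, 2], [3]]
print(run_window(batches, Result.FIRE))            # [3, 6]
print(run_window(batches, Result.FIRE_AND_PURGE))  # [3, 3]
```

    With FIRE the second result includes the elements of the first firing; with FIRE_AND_PURGE it only covers the elements that arrived since.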

    You can implement a custom EventTimeTrigger as follows:

    from typing import Tuple

    from pyflink.datastream import Trigger
    from pyflink.datastream.window import TimeWindow, TriggerResult


    class EventTimeTrigger(Trigger[Tuple, TimeWindow]):

        def on_element(self,
                       element: Tuple,
                       timestamp: int,
                       window: TimeWindow,
                       ctx: 'Trigger.TriggerContext') -> TriggerResult:
            # Ask to be called back once the watermark reaches the end of this window.
            ctx.register_event_time_timer(window.max_timestamp())
            return TriggerResult.CONTINUE

        def on_processing_time(self,
                               time: int,
                               window: TimeWindow,
                               ctx: 'Trigger.TriggerContext') -> TriggerResult:
            # Processing time is ignored by an event-time trigger.
            return TriggerResult.CONTINUE

        def on_event_time(self,
                          time: int,
                          window: TimeWindow,
                          ctx: 'Trigger.TriggerContext') -> TriggerResult:
            # Fire and drop the contents once the watermark reaches the window's last timestamp.
            if time >= window.max_timestamp():
                return TriggerResult.FIRE_AND_PURGE
            else:
                return TriggerResult.CONTINUE

        def on_merge(self,
                     window: TimeWindow,
                     ctx: 'Trigger.OnMergeContext') -> None:
            pass

        def clear(self,
                  window: TimeWindow,
                  ctx: 'Trigger.TriggerContext') -> None:
            # Remove the timer registered in on_element when the window is discarded.
            ctx.delete_event_time_timer(window.max_timestamp())

    Allowed Lateness

    When working with event-time windowing, it can happen that elements arrive late, i.e. the watermark that Flink uses to keep track of the progress of event time is already past the end timestamp of a window to which an element belongs. See the event time documentation, and in particular its part on late elements, for a more thorough treatment of how Flink deals with event time.

    By default, late elements are dropped when the watermark is past the end of the window. However, Flink allows you to specify a maximum allowed lateness for window operators. Allowed lateness specifies by how much time elements can be late before they are dropped, and its default value is 0. Elements that arrive after the watermark has passed the end of the window but before it passes the end of the window plus the allowed lateness are still added to the window. Depending on the trigger used, a late but not dropped element may cause the window to fire again.
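    The rule can be written as a small classification function. This is a sketch assuming millisecond timestamps, an element that belongs to the window in question, and Flink's convention that the window's last timestamp is its end minus one; classify is a hypothetical helper, not a Flink API.

```python
def classify(watermark: int, window_end: int, allowed_lateness: int = 0) -> str:
    """What happens to an element of the window [..., window_end) at the current watermark."""
    max_ts = window_end - 1  # last timestamp inside the window
    if watermark < max_ts:
        return "on time"
    if watermark < max_ts + allowed_lateness:
        # Added to the window; depending on the trigger it may fire again.
        return "late, still added"
    return "dropped"


END = 5000
print(classify(4000, END))  # on time
# With the default lateness of 0, anything behind the watermark is dropped.
print(classify(6000, END))  # dropped
print(classify(6000, END, allowed_lateness=2000))  # late, still added
print(classify(7500, END, allowed_lateness=2000))  # dropped
```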

    In order to make this work, Flink keeps the state of windows until their allowed lateness expires. Once this happens, Flink removes the window and deletes its state, as also described at the beginning of this page.

    By default, the allowed lateness is set to 0. That is, elements that arrive behind the watermark will be dropped. You can specify an allowed lateness like this:

    data_stream.key_by(<key selector>) \
               .window(<window assigner>) \
               .allowed_lateness(<time>) \
               .<windowed transformation>(<window function>)