Event-driven Applications

    If you’ve done the hands-on exercise, you will recall that it uses a TumblingEventTimeWindow to compute the sum of the tips for each driver during each hour.

    It is reasonably straightforward, and educational, to do the same thing with a KeyedProcessFunction. Let us begin by replacing that windowed computation with this:

    // compute the sum of the tips per hour for each driver
    DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
            .keyBy((TaxiFare fare) -> fare.driverId)
            .process(new PseudoWindow(Time.hours(1)));

    In this code snippet a KeyedProcessFunction called PseudoWindow is being applied to a keyed stream, the result of which is a DataStream<Tuple3<Long, Long, Float>> (the same kind of stream produced by the implementation that uses Flink’s built-in time windows).

    The overall outline of PseudoWindow has this shape:

    // Compute the sum of the tips for each driver in hour-long windows.
    // The keys are driverIds.
    public static class PseudoWindow extends
            KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {

        private final long durationMsec;

        public PseudoWindow(Time duration) {
            this.durationMsec = duration.toMilliseconds();
        }

        // Called once during initialization.
        @Override
        public void open(Configuration conf) {
            . . .
        }

        // Called as each fare arrives to be processed.
        @Override
        public void processElement(
                TaxiFare fare,
                Context ctx,
                Collector<Tuple3<Long, Long, Float>> out) throws Exception {
            . . .
        }

        // Called when the current watermark indicates that a window is now complete.
        @Override
        public void onTimer(long timestamp,
                OnTimerContext context,
                Collector<Tuple3<Long, Long, Float>> out) throws Exception {
            . . .
        }
    }

    Things to be aware of:

    • There are several types of ProcessFunctions – this is a KeyedProcessFunction, but there are also CoProcessFunctions, BroadcastProcessFunctions, etc.

    • A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open and getRuntimeContext methods needed for working with managed keyed state.

    • There are two callbacks to implement: processElement and onTimer. processElement is called with each incoming event; onTimer is called when timers fire. Timers can be either event-time or processing-time timers. Both processElement and onTimer are provided with a context object that can be used to interact with a TimerService (among other things). Both callbacks are also passed a Collector that can be used to emit results.

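    The open() method

    open() is where the function obtains its managed keyed state. As the methods below show, that state is sumOfTips, a MapState<Long, Float> that maps the end timestamp of each window to the running total of tips for that window.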

    The processElement() method

    public void processElement(
            TaxiFare fare,
            Context ctx,
            Collector<Tuple3<Long, Long, Float>> out) throws Exception {

        long eventTime = fare.getEventTime();
        TimerService timerService = ctx.timerService();

        if (eventTime <= timerService.currentWatermark()) {
            // This event is late; its window has already been triggered.
        } else {
            // Round up eventTime to the end of the window containing this event.
            long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);

            // Schedule a callback for when the window has been completed.
            timerService.registerEventTimeTimer(endOfWindow);

            // Add this fare's tip to the running total for that window.
            Float sum = sumOfTips.get(endOfWindow);
            if (sum == null) {
                sum = 0.0F;
            }
            sum += fare.tip;
            sumOfTips.put(endOfWindow, sum);
        }
    }

    Things to consider:

    • What happens with late events? Events that are behind the watermark (i.e., late) are being dropped. If you want to do something better than this, consider using a side output, which is explained in the next section.

    • This example uses a MapState where the keys are timestamps, and sets a Timer for that same timestamp. This is a common pattern; it makes it easy and efficient to look up relevant information when the timer fires.
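
    The rounding step in processElement can be checked with plain arithmetic. The following is a minimal, Flink-free sketch; the class WindowMath and its method are hypothetical names introduced only for illustration, and it assumes non-negative timestamps in epoch milliseconds.

```java
// Hypothetical helper, not part of Flink: reproduces the window-end rounding
// used in processElement so that it can be checked in isolation.
class WindowMath {
    // Returns the last millisecond of the window that contains eventTime.
    // Assumes eventTime >= 0 and durationMsec > 0.
    static long endOfWindow(long eventTime, long durationMsec) {
        return eventTime - (eventTime % durationMsec) + durationMsec - 1;
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L; // 3,600,000 ms
        // 01:30:00 falls in the window [01:00:00.000, 01:59:59.999].
        System.out.println(endOfWindow(90 * 60 * 1000L, hour)); // 7199999
        // An event exactly on the hour belongs to the window that starts there.
        System.out.println(endOfWindow(hour, hour));            // 7199999
        // The last millisecond of the first hour is its own window end.
        System.out.println(endOfWindow(hour - 1, hour));        // 3599999
    }
}
```

    Because the timer is registered for the last millisecond of the window, every event in the same hour maps to the same timer timestamp and the same map key.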

    The onTimer() method

    public void onTimer(
            long timestamp,
            OnTimerContext context,
            Collector<Tuple3<Long, Long, Float>> out) throws Exception {

        long driverId = context.getCurrentKey();
        // Look up the result for the hour that just ended.
        Float sumOfTips = this.sumOfTips.get(timestamp);

        Tuple3<Long, Long, Float> result = Tuple3.of(driverId, timestamp, sumOfTips);
        out.collect(result);
        this.sumOfTips.remove(timestamp);
    }

    Observations:

    • The OnTimerContext context passed in to onTimer can be used to determine the current key.

    • Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at which point onTimer is called. This onTimer method removes the related entry from sumOfTips, which has the effect of making it impossible to accommodate late events. This is the equivalent of setting the allowedLateness to zero when working with Flink’s time windows.
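
    To see how processElement and onTimer interact, it can help to run the same logic outside Flink. The sketch below is a simplified model in plain Java, not Flink code: the class PseudoWindowModel and its advanceWatermark method are hypothetical, a TreeMap per driver stands in for the keyed sumOfTips state, and advancing the watermark by hand stands in for Flink firing event-time timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free model of PseudoWindow.
class PseudoWindowModel {
    private final long durationMsec;
    private long watermark = Long.MIN_VALUE;
    // driverId -> (end of window -> running sum of tips)
    private final Map<Long, TreeMap<Long, Float>> sumOfTips = new HashMap<>();
    // Emitted results, formatted as "driverId,endOfWindow,sum".
    final List<String> results = new ArrayList<>();

    PseudoWindowModel(long durationMsec) {
        this.durationMsec = durationMsec;
    }

    // Mirrors processElement: returns false if the event was dropped as late.
    boolean processElement(long driverId, long eventTime, float tip) {
        if (eventTime <= watermark) {
            return false;
        }
        long endOfWindow = eventTime - (eventTime % durationMsec) + durationMsec - 1;
        sumOfTips.computeIfAbsent(driverId, k -> new TreeMap<>())
                 .merge(endOfWindow, tip, Float::sum);
        return true;
    }

    // Mirrors onTimer: emits and removes every window whose end the watermark has reached.
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        for (Map.Entry<Long, TreeMap<Long, Float>> perDriver : sumOfTips.entrySet()) {
            Map<Long, Float> ready = perDriver.getValue().headMap(watermark, true);
            for (Map.Entry<Long, Float> w : ready.entrySet()) {
                results.add(perDriver.getKey() + "," + w.getKey() + "," + w.getValue());
            }
            ready.clear(); // same effect as sumOfTips.remove(timestamp) in onTimer
        }
    }

    public static void main(String[] args) {
        PseudoWindowModel w = new PseudoWindowModel(3_600_000L);
        w.processElement(7L, 1_000L, 3.0f);
        w.processElement(7L, 2_000L, 2.0f);
        w.advanceWatermark(3_599_999L);     // end of the first hour
        System.out.println(w.results);      // [7,3599999,5.0]
        System.out.println(w.processElement(7L, 3_000L, 1.0f)); // false: late
    }
}
```

    Once the entry for a window has been emitted and removed, an event for that hour has nowhere to go, which is the zero-allowedLateness behavior described above.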

    Performance Considerations

    Flink provides MapState and ListState types that are optimized for RocksDB. Where possible, these should be used instead of a ValueState object holding some sort of collection. The RocksDB state backend can append to ListState without going through (de)serialization, and for MapState, each key/value pair is a separate RocksDB object, so MapState can be efficiently accessed and updated.

    Side Outputs

    There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting:

    • exceptions
    • malformed events
    • late events
    • operational alerts, such as timed-out connections to external services

    You are now in a position to do something with the late events that were ignored in the previous section.

    A side output channel is associated with an OutputTag<T>. These tags have generic types that correspond to the type of the side output’s DataStream, and they have names.

    A static OutputTag<TaxiFare>, here called lateFares, can be referenced both when emitting late events in the processElement method of the PseudoWindow:

    if (eventTime <= timerService.currentWatermark()) {
        // This event is late; its window has already been triggered.
        ctx.output(lateFares, fare);
    } else {
        . . .
    }

    and when accessing the stream from this side output in the main method of the job:

    // compute the sum of the tips per hour for each driver
    SingleOutputStreamOperator<Tuple3<Long, Long, Float>> hourlyTips = fares
            .keyBy((TaxiFare fare) -> fare.driverId)
            .process(new PseudoWindow(Time.hours(1)));

    hourlyTips.getSideOutput(lateFares).print();

    Alternatively, you can use two OutputTags with the same name to refer to the same side output, but if you do, they must have the same type.
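
    The routing itself is simple enough to model without Flink. The following sketch is illustrative only: SideOutputModel is a hypothetical class, its output and getSideOutput methods are stand-ins for ctx.output and getSideOutput rather than the real API, and the tag name "lateFares" is an assumption. It shows late events being collected under a tag while on-time events go to the main output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of side outputs: records emitted under a tag
// name are collected separately from the main output.
class SideOutputModel {
    final List<Long> mainOutput = new ArrayList<>();
    private final Map<String, List<Long>> sideOutputs = new HashMap<>();

    // Stand-in for ctx.output(tag, value).
    void output(String tagName, long eventTime) {
        sideOutputs.computeIfAbsent(tagName, k -> new ArrayList<>()).add(eventTime);
    }

    // Stand-in for getSideOutput(tag); an unused tag yields an empty list.
    List<Long> getSideOutput(String tagName) {
        return sideOutputs.getOrDefault(tagName, new ArrayList<>());
    }

    // Routes one event timestamp the way the late-event check in processElement does.
    void process(long eventTime, long currentWatermark) {
        if (eventTime <= currentWatermark) {
            output("lateFares", eventTime); // assumed tag name
        } else {
            mainOutput.add(eventTime);
        }
    }

    public static void main(String[] args) {
        SideOutputModel m = new SideOutputModel();
        m.process(500L, 1_000L);   // behind the watermark: late
        m.process(1_500L, 1_000L); // ahead of the watermark: on time
        System.out.println(m.mainOutput);                 // [1500]
        System.out.println(m.getSideOutput("lateFares")); // [500]
    }
}
```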

    In this example you have seen how a ProcessFunction can be used to reimplement a straightforward time window. Of course, if Flink’s built-in windowing API meets your needs, by all means, go ahead and use it. But if you find yourself considering doing something contorted with Flink’s windows, don’t be afraid to roll your own.

    Also, ProcessFunctions are useful for many other use cases beyond computing analytics. The hands-on exercise below provides an example of something completely different.

    Another common use case for ProcessFunctions is for expiring stale state. If you think back to the Rides and Fares Exercise, where a RichCoFlatMapFunction is used to compute a simple join, the sample solution assumes that the TaxiRides and TaxiFares are perfectly matched, one-to-one for each rideId. If an event is lost, the other event for the same rideId will be held in state forever. This could instead be implemented as a KeyedCoProcessFunction, and a timer could be used to detect and clear any stale state.
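
    The idea of pairing each buffered event with a timer can be sketched in plain Java. This is a simplified model under stated assumptions, not the exercise solution and not Flink code: ExpiringJoinModel, its method names, and the timeout parameter are hypothetical; a HashMap stands in for keyed state; and it assumes at most one ride and one fare per rideId, as the exercise does.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical, Flink-free model of a one-to-one join that expires unmatched events.
// In Flink this role would be played by a KeyedCoProcessFunction with keyed state
// and an event-time timer per rideId.
class ExpiringJoinModel {
    private final long timeoutMsec;
    // rideId -> time at which the buffered, still-unmatched event should be dropped
    private final Map<Long, Long> pendingExpiry = new HashMap<>();
    int matched = 0;
    int expired = 0;

    ExpiringJoinModel(long timeoutMsec) {
        this.timeoutMsec = timeoutMsec;
    }

    // Called for either a ride or a fare: the first arrival waits,
    // the second arrival for the same rideId completes the pair.
    void onEvent(long rideId, long eventTime) {
        if (pendingExpiry.remove(rideId) != null) {
            matched++;
        } else {
            pendingExpiry.put(rideId, eventTime + timeoutMsec); // "register a timer"
        }
    }

    // Called as the watermark advances; clears state whose timer would have fired.
    void onWatermark(long watermark) {
        Iterator<Map.Entry<Long, Long>> it = pendingExpiry.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() <= watermark) {
                it.remove();
                expired++;
            }
        }
    }

    int pendingCount() {
        return pendingExpiry.size();
    }
}
```

    With a timeout of 1,000 ms, an unmatched event that arrived at time 200 is cleared once the watermark reaches 1,200, so a lost partner no longer pins state indefinitely.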