Task Lifecycle

    The StreamTask is the base for all different task sub-types in Flink’s streaming engine. This document goes through the different phases in the lifecycle of the StreamTask and describes the main methods representing each of these phases.

    Because the task is the entity that executes a parallel instance of an operator, its lifecycle is tightly integrated with that of an operator. So, we will briefly mention the basic methods representing the lifecycle of an operator before diving into those of the StreamTask itself. The list is presented below in the order that each of the methods is called. Given that an operator can have a user-defined function (UDF), below each of the operator methods we also present (indented) the methods in the lifecycle of the UDF that it calls. These methods are available if your operator extends the AbstractUdfStreamOperator, which is the basic class for all operators that execute UDFs.

    OPERATOR::setup
    OPERATOR::initializeState
    OPERATOR::open
        UDF::open
    OPERATOR::processElement
        UDF::map (or the corresponding method of the function)
    OPERATOR::processWatermark
    OPERATOR::snapshotState (asynchronously, on every checkpoint)
    OPERATOR::finish
    OPERATOR::close

    In a nutshell, setup() is called to initialize some operator-specific machinery, such as its RuntimeContext and its metric collection data structures. After this, initializeState() gives the operator its initial state, and open() executes any operator-specific initialization, such as opening the user-defined function in the case of the AbstractUdfStreamOperator.

    Now that everything is set, the operator is ready to process incoming data. Incoming elements can be one of the following: input elements, watermarks, and checkpoint barriers. Each of them is handled in its own way: elements are processed by the processElement() method, watermarks by processWatermark(), and checkpoint barriers trigger a checkpoint, which invokes (asynchronously) the snapshotState() method described below. For each incoming element, one of these methods is called depending on its type. Note that processElement() is also the place where the UDF’s logic is invoked, e.g. the map() method of your MapFunction.
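
    The dispatch described above can be sketched in plain Java. This is a minimal illustration, not Flink code: the types StreamElement, DataRecord, Watermark, CheckpointBarrier and ToyOperator are hypothetical stand-ins, and snapshotState() is invoked synchronously here only to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

final class ElementDispatchSketch {

    // Hypothetical stand-ins for the three kinds of incoming elements.
    interface StreamElement {}

    static final class DataRecord implements StreamElement {
        final String value;
        DataRecord(String value) { this.value = value; }
    }

    static final class Watermark implements StreamElement {
        final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
    }

    static final class CheckpointBarrier implements StreamElement {
        final long checkpointId;
        CheckpointBarrier(long checkpointId) { this.checkpointId = checkpointId; }
    }

    // Toy operator whose processElement() upper-cases the value,
    // standing in for a map()-style UDF.
    static final class ToyOperator {
        final List<String> log = new ArrayList<>();

        void processElement(DataRecord r) {
            log.add("element:" + r.value.toUpperCase(Locale.ROOT));
        }

        void processWatermark(Watermark w) {
            log.add("watermark:" + w.timestamp);
        }

        // Called synchronously here for simplicity; the text describes it as asynchronous.
        void snapshotState(long checkpointId) {
            log.add("snapshot:" + checkpointId);
        }
    }

    // Routes each incoming element to the handler that matches its type.
    static void dispatch(StreamElement e, ToyOperator op) {
        if (e instanceof DataRecord) {
            op.processElement((DataRecord) e);
        } else if (e instanceof Watermark) {
            op.processWatermark((Watermark) e);
        } else if (e instanceof CheckpointBarrier) {
            op.snapshotState(((CheckpointBarrier) e).checkpointId);
        } else {
            throw new IllegalArgumentException("unknown element type: " + e);
        }
    }

    static List<String> demo() {
        ToyOperator op = new ToyOperator();
        List<StreamElement> input = Arrays.asList(
                new DataRecord("a"), new Watermark(5L),
                new CheckpointBarrier(1L), new DataRecord("b"));
        for (StreamElement e : input) {
            dispatch(e, op);
        }
        return op.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```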

    Finally, in the case of a normal, fault-free termination of the operator (e.g. if the stream is finite and its end is reached), the finish() method is called to perform any final bookkeeping required by the operator’s logic (e.g. flush any buffered data, or emit data to mark the end of processing), and close() is called after that to free any resources held by the operator (e.g. open network connections, I/O streams, or native memory held by the operator’s data).
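
    To make the call order concrete, here is a small self-contained sketch that records each lifecycle call in a trace. It does not use Flink classes: ToyFunction and ToyUdfOperator are hypothetical names, and the assumption that closing the operator also closes its UDF is made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OperatorLifecycleSketch {

    // Hypothetical UDF interface, loosely modelled on a map-style function.
    interface ToyFunction {
        void open();
        String map(String value);
        void close();
    }

    // Hypothetical operator that wraps a UDF and records every lifecycle call.
    static final class ToyUdfOperator {
        private final ToyFunction udf;
        private final List<String> trace;

        ToyUdfOperator(ToyFunction udf, List<String> trace) {
            this.udf = udf;
            this.trace = trace;
        }

        void setup() { trace.add("operator.setup"); }

        void initializeState() { trace.add("operator.initializeState"); }

        // Opening the operator also opens the wrapped UDF.
        void open() {
            trace.add("operator.open");
            udf.open();
        }

        // The UDF logic runs inside processElement().
        String processElement(String value) {
            trace.add("operator.processElement");
            return udf.map(value);
        }

        void finish() { trace.add("operator.finish"); }

        // Assumption: closing the operator also closes its UDF.
        void close() {
            trace.add("operator.close");
            udf.close();
        }
    }

    // Drives one operator through a fault-free lifecycle and returns the call trace.
    static List<String> run(List<String> input) {
        final List<String> trace = new ArrayList<>();
        ToyFunction udf = new ToyFunction() {
            public void open() { trace.add("udf.open"); }
            public String map(String value) { trace.add("udf.map"); return value + "!"; }
            public void close() { trace.add("udf.close"); }
        };
        ToyUdfOperator op = new ToyUdfOperator(udf, trace);
        op.setup();
        op.initializeState();
        op.open();
        for (String value : input) {
            op.processElement(value);
        }
        op.finish();
        op.close();
        return trace;
    }

    public static void main(String[] args) {
        run(Arrays.asList("a", "b")).forEach(System.out::println);
    }
}
```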

    Checkpoints: The snapshotState() method of the operator is called asynchronously to the rest of the methods described above whenever a checkpoint barrier is received. Checkpoints are performed during the processing phase, i.e. after the operator is opened and before it is closed. The responsibility of this method is to store the current state of the operator to the specified state backend, from where it will be retrieved when the job resumes execution after a failure. Below we include a brief description of Flink’s checkpointing mechanism; for a more detailed discussion of the principles around checkpointing in Flink, please read the corresponding documentation.
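
    The store-then-restore idea can be illustrated with a toy example. Everything here is hypothetical (ToyStateBackend, CountingOperator, and the method signatures are not Flink’s API); the sketch only shows that state written by snapshotState() is what a restarted operator receives in initializeState(), so work done after the last checkpoint is not reflected in the restored state.

```java
import java.util.HashMap;
import java.util.Map;

final class SnapshotSketch {

    // Hypothetical state backend: maps checkpoint id -> stored count.
    static final class ToyStateBackend {
        private final Map<Long, Long> snapshots = new HashMap<>();
        private Long latestCheckpointId; // null until the first checkpoint is stored

        void store(long checkpointId, long count) {
            snapshots.put(checkpointId, count);
            latestCheckpointId = checkpointId;
        }

        // Returns the state of the latest checkpoint, or null if there is none.
        Long latest() {
            return latestCheckpointId == null ? null : snapshots.get(latestCheckpointId);
        }
    }

    // Hypothetical operator whose only state is the number of elements seen.
    static final class CountingOperator {
        private long count;

        // Starts from the restored state, or from zero on a first run.
        void initializeState(Long restored) {
            count = (restored == null) ? 0L : restored;
        }

        void processElement(String element) {
            count++;
        }

        // Writes the current state to the backend under the given checkpoint id.
        void snapshotState(long checkpointId, ToyStateBackend backend) {
            backend.store(checkpointId, count);
        }

        long count() {
            return count;
        }
    }

    // Processes two elements, checkpoints, processes a third, then "fails" and recovers.
    static long demo() {
        ToyStateBackend backend = new ToyStateBackend();

        CountingOperator op = new CountingOperator();
        op.initializeState(backend.latest()); // first run: no state yet
        op.processElement("a");
        op.processElement("b");
        op.snapshotState(1L, backend);
        op.processElement("c"); // processed after the checkpoint

        // Simulated failure: a fresh instance restores from the last checkpoint.
        CountingOperator recovered = new CountingOperator();
        recovered.initializeState(backend.latest());
        return recovered.count();
    }

    public static void main(String[] args) {
        System.out.println("restored count = " + demo());
    }
}
```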

    Task Lifecycle

    Following that brief introduction on the operator’s main phases, this section describes in more detail how a task calls the respective methods during its execution on a cluster. The sequence of the phases described here is mainly included in the invoke() method of the StreamTask class. The remainder of this document is split into two subsections: one describing the phases during a regular, fault-free execution of a task, and a shorter one describing the different sequence followed in case the task is cancelled (see Interrupted Execution), either manually or due to some other reason, e.g. an exception thrown during execution.

    The steps a task goes through when executed until completion without being interrupted are illustrated below:

    1. TASK::setInitialState
    2. create basic utils (config, etc) and load the chain of operators
    3. setup-operators
    4. task-specific-init
    5. initialize-operator-states
    6. open-operators
    7. run
    8. finish-operators
    9. close-operators
    10. task-specific-cleanup
    11. common-cleanup

    As shown above, after recovering the task configuration and initializing some important runtime parameters, the very first step for the task is to retrieve its initial, task-wide state. This is done in the setInitialState(), and it is particularly important in two cases:

    1. when the task is recovering from a failure and restarts from the last successful checkpoint
    2. when resuming from a savepoint.

    If it is the first time the task is executed, the initial task state is empty.

    After recovering any initial state, the task goes into its invoke() method. There, it first initializes the operators involved in the local computation by calling the setup() method of each one of them, and then performs its task-specific initialization by calling the local init() method. By task-specific, we mean that this step may differ depending on the type of the task (SourceTask, TwoInputStreamTask, etc.), but in any case, this is where the necessary task-wide resources are acquired. As an example, the OneInputStreamTask, which represents a task that expects to have a single input stream, initializes the connection(s) to the location(s) of the different partitions of the input stream that are relevant to the local task.

    Now that all operators in the task have been initialized, the open() method of each individual operator is called by the openAllOperators() method of the StreamTask. This method performs all the operational initialization, such as registering any retrieved timers with the timer service. A single task may be executing multiple operators with one consuming the output of its predecessor. In this case, the open() method is called from the last operator, i.e. the one whose output is also the output of the task itself, to the first. This is done so that when the first operator starts processing the task’s input, all downstream operators are ready to receive its output.

    Now the task can resume execution and operators can start processing fresh input data. This is the place where the task-specific run() method is called. This method will run until either there is no more input data (finite stream), or the task is cancelled (manually or not). This is where the operator-specific processElement() and processWatermark() methods are called.

    In the case of running till completion, i.e. when there is no more input data to process, the task enters its shutdown process after exiting from the run() method. Initially, the timer service stops registering any new timers (e.g. from fired timers that are being executed), clears all not-yet-started timers, and awaits the completion of currently executing timers. Then finishAllOperators() notifies the operators involved in the computation by calling the finish() method of each operator. After that, any buffered output data is flushed so that it can be processed by the downstream tasks, and finally the task tries to clear all the resources held by the operators by calling the close() method of each one. When opening the different operators, we mentioned that the order is from the last to the first. Closing happens in the opposite manner, from first to last.
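
    The opposite iteration orders for opening and closing a chain of operators can be shown with a short sketch. The operator chain is modelled as a plain list of names ordered from first to last; openAll() and closeAll() are hypothetical helpers, not the StreamTask methods themselves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChainOrderSketch {

    // Opens operators from the last in the chain to the first, so that every
    // downstream operator is ready before its upstream starts emitting.
    static List<String> openAll(List<String> chain) {
        List<String> calls = new ArrayList<>();
        for (int i = chain.size() - 1; i >= 0; i--) {
            calls.add("open:" + chain.get(i));
        }
        return calls;
    }

    // Closes operators in the opposite order, from the first to the last.
    static List<String> closeAll(List<String> chain) {
        List<String> calls = new ArrayList<>();
        for (String name : chain) {
            calls.add("close:" + name);
        }
        return calls;
    }

    public static void main(String[] args) {
        // Chain listed from first to last operator; the names are illustrative.
        List<String> chain = Arrays.asList("source", "map", "sink");
        System.out.println(openAll(chain));
        System.out.println(closeAll(chain));
    }
}
```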

    Finally, when all operators have been closed and all their resources freed, the task shuts down its timer service, performs its task-specific cleanup, e.g. cleans all its internal buffers, and then performs its generic task cleanup, which consists of closing all its output channels and cleaning any output buffers.

    Interrupted Execution

    In the previous sections we described the lifecycle of a task that runs till completion. If the task is cancelled at any point, normal execution is interrupted and the only operations performed from that point on are the timer service shutdown, the task-specific cleanup, the closing of the operators, and the general task cleanup, as described above.
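
    The difference between the two paths can be sketched as a trace of step names. This is a simplified model under stated assumptions: the step labels mirror the list earlier in this document, cancellation is simulated with a flag and a CancellationException thrown during the run step, and the invoke() method below is a hypothetical stand-in rather than the real StreamTask code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;

final class InterruptedExecutionSketch {

    // Returns the sequence of steps performed, with or without cancellation during run.
    static List<String> invoke(boolean cancelDuringRun) {
        List<String> trace = new ArrayList<>();
        boolean completed = false;
        try {
            trace.add("setup-operators");
            trace.add("task-specific-init");
            trace.add("initialize-operator-states");
            trace.add("open-operators");
            trace.add("run");
            if (cancelDuringRun) {
                // Simulated cancellation: normal execution stops here.
                throw new CancellationException("task cancelled");
            }
            // Normal completion: operators are finished, then closed.
            trace.add("finish-operators");
            trace.add("close-operators");
            completed = true;
        } catch (CancellationException e) {
            // Fall through to the cleanup below; finish-operators is skipped.
        }

        trace.add("timer-service-shutdown");
        trace.add("task-specific-cleanup");
        if (!completed) {
            // On the interrupted path the operators still have to be closed.
            trace.add("close-operators");
        }
        trace.add("common-cleanup");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println("normal:    " + invoke(false));
        System.out.println("cancelled: " + invoke(true));
    }
}
```

    In the cancelled trace, finish-operators never appears, while the timer service shutdown, both cleanup steps, and the closing of the operators still run.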