Task 生命周期

    是 Flink 流式计算引擎中所有不同 task 子类的基础。本文会深入讲解 StreamTask 生命周期的不同阶段,并阐述每个阶段的主要方法。

    因为 task 是算子并行实例的执行实体,所以它的生命周期跟算子的生命周期紧密联系在一起。因此,在深入介绍 StreamTask 生命周期之前,先简要介绍一下代表算子生命周期的基本方法。这些方法按调用的先后顺序如下所示。考虑到算子可能是用户自定义函数(UDF),因此我们在每个算子下也展示(以缩进的方式)了 UDF 生命周期中调用的各个方法。AbstractUdfStreamOperator 是所有执行 UDF 的算子的基类,如果算子继承了 AbstractUdfStreamOperator,那么这些方法都是可用的。

    简而言之,在算子初始化时调用 setup() 来初始化算子的特定设置,比如 RuntimeContext 和指标收集的数据结构。在这之后,算子通过 initializeState() 初始化状态,算子的所有初始化工作在 open() 方法中执行,比如在继承 AbstractUdfStreamOperator 的情况下,初始化用户自定义函数。

    当所有初始化都完成之后,算子开始处理流入的数据。流入的数据可以分为三种类型:用户数据、watermark 和 checkpoint barriers。每种类型的数据都有单独的方法来处理。用户数据通过 processElement() 方法来处理,watermark 通过 processWatermark() 来处理,checkpoint barriers 会调用(异步)snapshotState() 方法触发 checkpoint。对于每个流入的数据,根据其类型调用上述相应的方法。注意,processElement() 方法也是用户自定义函数逻辑执行的地方,比如用户自定义 MapFunction 里的 map() 方法。

    最后,在算子正常无故障的情况下(比如,如果流式数据是有限的,并且最后一个数据已经到达),会调用 finish() 方法结束算子并进行必要的清理工作(比如刷新所有缓冲数据,或发送处理结束的标记数据)。在这之后会调用 close() 方法来释放算子持有的资源(比如算子数据持有的本地内存)。

    Checkpoints: 算子的 snapshotState() 方法是在收到 checkpoint barrier 后异步调用的。Checkpoint 在处理阶段执行,即算子打开之后,结束之前的这个阶段。这个方法的职责是存储算子的当前状态到一个特定的状态后端,当作业失败后恢复执行时会从这个后端恢复状态数据。下面我们简要描述了 Flink 的 checkpoint 机制,如果想了解更多 Flink checkpoint 相关的原理,可以读一读 。

    Task 生命周期

    在上文对算子主要阶段的简介之后,本节将详细介绍 task 在集群执行期间是如何调用相关方法的。这里所说的阶段主要包含在 StreamTask 类的 invoke() 方法里。本文档后续内容将分成两个子章节,一节描述了 task 在正常无故障情况下的执行阶段(请参考),另一节(稍微简短的部分)描述了 task 取消之后的执行阶段(请参考中断执行),不管是手动取消还是其他原因(比如执行期间遇到异常)导致的取消。

    Task 在没有中断的情况下执行到结束的阶段如下所示:

    1. TASK::setInitialState
    2. create basic utils (config, etc) and load the chain of operators
    3. setup-operators
    4. task-specific-init
    5. initialize-operator-states
    6. open-operators
    7. finish-operators
    8. wait for the final checkponit completed (if enabled)
    9. close-operators
    10. task-specific-cleanup
    11. common-cleanup

    如上所示,在恢复 task 配置和初始化一些重要的运行时参数之后,task 的下一步是读取 task 级别的初始状态。这一步在 setInitialState() 方法里完成,在下面两种情况尤其重要:

    1. 当 Task 从 恢复的时候。

    如果 task 是第一次执行的话,它的初始状态为空。

    在恢复初始状态之后,task 进入到 invoke() 方法。在这里,首先调用 setup() 方法来初始化本地计算涉及到的每个算子,然后调用本地的 方法来做特定 task 的初始化。这里所说的特定 task,取决于 task 的类型 (SourceTaskOneInputStreamTaskTwoInputStreamTask 等)。这一步可能会有所不同,但无论如何这是获取 task 范围内所需资源的地方。例如,OneInputStreamTask,代表期望一个单一输入流的 task,初始化与本地任务相关输入流的不同分区位置的连接。

    现在 task 里的所有算子都已经被初始化了,每个算子里的 open()方法也通过 StreamTaskopenAllOperators() 方法调用了。这个方法执行所有操作的初始化,比如在定时器服务里注册获取到的定时器。一个 task 可能会执行多个算子,即一个算子消费它之前算子的输出数据流。在这种情况下,open() 方法从最后一个算子调用到第一个算子,即最后一个算子的输出刚好也是整个 task 的输出。这样做是为了当第一个算子开始处理 task 的输入数据流时,所有下游算子已经准备接收它的输出数据了。

    现在 task 可以恢复执行,算子可以开始处理新输入的数据。在这里,特定 task 的 run() 方法会被调用。这个方法会一直运行直到没有更多输入数据进来(有限的数据流)或者 task 被取消了(人为的或其他的原因)。这里也是算子定义的 processElement() 方法和 processWatermark() 方法执行的地方。

    在运行到完成的情况下,即没有更多的输入数据要处理,从run()方法退出后,task 进入关闭阶段。首先定时器服务停止注册任何新的定时器(比如从正在执行的定时器里注册),清理掉所有还未启动的定时器,并等待当前执行中的定时器运行结束。然后通过调用 finishAllOperators() 方法调用每个算子的 finish() 方法来通知所有参与计算的算子。然后所有缓存的输出数据会刷出去以便下游 task 处理。 如果开启了部分任务结束后继续 checkpoint 的功能,任务将 等待下一个 checkpoint 结束 来保证使用两阶段提交的算子可能最终提交所有的记录。 最终 task 通过调用每个算子的 close() 方法来尝试清理掉算子持有的所有资源。与我们之前提到的开启算子不同是,开启时从后往前依次调用 open();而关闭时刚好相反,从前往后依次调用 close()

    最后,当所有算子都已经关闭,所有资源都已被释放时,task 关掉它的定时器服务,进行特定 task 的清理操作,例如清理掉所有内部缓存,然后进行常规的 task 清理操作,包括关闭所有的输出管道,清理所有输出缓存等。

    中断执行

    在前面的章节,我们描述的是运行直到完成的 task 生命周期。在任意时间点取消 task 的话,正常的执行过程会被中断,从这个时候开始只会进行以下操作,关闭定时器服务、执行特定 task 的清理、执行所有算子的关闭,执行常规 task 的清理。