扩展资源框架

扩展资源框架做了什么

扩展资源(External Resource)框架主要做了以下两件事:

  • 根据你的配置,在 Flink 从底层资源管理系统中申请资源时,设置与扩展资源相关的请求字段

  • 为算子提供使用这些资源所需要的信息

当 Flink 部署在资源管理系统(Kubernetes、Yarn)上时,扩展资源框架将确保分配的 Pod、Container 包含所需的扩展资源。目前,许多资源管理系统都支持扩展资源。 例如,Kubernetes 从 v1.10 开始通过 机制支持 GPU、FPGA 等资源调度,Yarn 从 2.10 和 3.1 开始支持 GPU 和 FPGA 的调度。 在 Standalone 模式下,由用户负责确保扩展资源的可用性。

扩展资源框架向算子提供扩展资源相关信息,这些信息由你配置的扩展资源 Driver 生成,包含了使用扩展资源所需要的基本属性。

启用扩展资源框架

为了启用扩展资源框架来使用扩展资源,你需要:

  • 为该扩展资源准备扩展资源框架的插件

  • 为该扩展资源设置相关的配置

  • 在你的算子中,从 来获取扩展资源的信息并使用这些资源

你需要为使用的扩展资源准备插件,并将其放入 Flink 发行版的 plugins/ 文件夹中, 参看 。 Flink 提供了第一方的 GPU 资源插件。你同样可以为你所使用的扩展资源实现自定义插件。

首先,你需要使用分隔符“;”将所有使用的扩展资源类型的资源名称添加到扩展资源列表(配置键“external-resources”)中,例如,“external-resources: gpu;fpga”定义了两个扩展资源“gpu”和“fpga”。 只有此处定义了扩展资源名称(<resource_name>),相应的资源才会在扩展资源框架中生效。

对于每个扩展资源,有以下配置选项。下面的所有配置选项中的 <resource_name> 对应于扩展资源列表中列出的名称:

  • 数量external.<resource_name>.amount):需要从外部系统请求的扩展资源的数量。

  • Yarn 中的扩展资源配置键external-resource.<resource_name>.yarn.config-key):可选配置。如果配置该项,扩展资源框架将把这个键添加到 Yarn 的容器请求的资源配置中,该键对应的值将被设置为external-resource.<resource_name>.amount

  • Kubernetes 中的扩展资源配置键external-resource.<resource_name>.kubernetes.config-key):可选配置。 如果配置该项,扩展资源框架将添加 resources.limits.<config-key>resources.requests.<config-key> 到 TaskManager 的主容器配置中,对应的值将被设置为 external-resource.<resource_name>.amount

  • Driver 工厂类external-resource.<resource_name>.driver-factory.class):可选配置。定义由 <resource_name> 标识的扩展资源对应的工厂类名。如果配置该项,该工厂类将被用于实例化扩展资源框架中所需要的 drivers。 如果没有配置,扩展资源依然会在其他配置正确时在存在于 TaskManager,只是算子在这种情况下无法从 RuntimeContext 中拿到该资源的信息。

  • Driver 自定义参数external-resource.<resource_name>.param.<param>):可选配置。由 <resource_name> 标识的扩展资源的自定义配置选项的命名模式。只有遵循此模式的配置才会传递到该扩展资源的工厂类。

示例配置,该配置定义两个扩展资源:

为了使用扩展资源,算子需要从 RuntimeContext 获取 ExternalResourceInfo 集合。 ExternalResourceInfo 包含了使用扩展资源所需的信息,可以使用 getProperty 检索这些信息。 其中具体包含哪些属性以及如何使用这些属性取决于特定的扩展资源插件。

算子可以通过 getExternalResourceInfos(String resourceName)RuntimeContextFunctionContext 中获取特定扩展资源的 ExternalResourceInfo。 此处的 resourceName 应与在扩展资源列表中定义的名称相同。具体用法如下:

Java

  1. public class ExternalResourceMapFunction extends RichMapFunction<String, String> {
  2. private static final String RESOURCE_NAME = "foo";
  3. @Override
  4. public String map(String value) {
  5. Set<ExternalResourceInfo> externalResourceInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);
  6. externalResourceInfos.iterator().forEachRemaining(externalResourceInfo ->
  7. addresses.add(externalResourceInfo.getProperty("address").get()));
  8. // map function with addresses.
  9. // ...
  10. }
  11. }

中包含一个或多个键-值对,其键值表示资源的不同维度。你可以通过 ExternalResourceInfo#getKeys 获取所有的键。

为你所使用的扩展资源实现自定义插件

要为你所使用的扩展资源实现自定义插件,你需要:

  • 添加你自定义的扩展资源 Driver ,该 Driver 需要实现 org.apache.flink.api.common.externalresource.ExternalResourceDriver 接口。

  • 添加用来实例化 Driver 的工厂类,该工厂类需要实现 org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory 接口。

  • 添加服务入口。创建 META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory 文件,其中包含了 Driver 对应工厂类的类名(更多细节请参看 )。

例如,要为名为“FPGA”的扩展资源实现插件,你首先需要实现 FPGADriverFPGADriverFactory

Java

  1. public class FPGADriver implements ExternalResourceDriver {
  2. @Override
  3. public Set<FPGAInfo> retrieveResourceInfo(long amount) {
  4. // return the information set of "FPGA"
  5. }
  6. }
  7. public class FPGADriverFactory implements ExternalResourceDriverFactory {
  8. @Override
  9. public ExternalResourceDriver createExternalResourceDriver(Configuration config) {
  10. return new FPGADriver();
  11. }
  12. }
  13. // Also implement FPGAInfo which contains basic properties of "FPGA" resource.
  14. public class FPGAInfo implements ExternalResourceInfo {
  15. @Override
  16. public Optional<String> getProperty(String key) {
  17. // return the property with the given key.
  18. }
  19. public Collection<String> getKeys() {
  20. // return all property keys.
  21. }
  22. }

Scala

META-INF/services/ 中创建名为 org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory 的文件,向其中写入工厂类名,如 。

之后,将 FPGADriverFPGADriverFactoryMETA-INF/services/ 和所有外部依赖打入 jar 包。在你的 Flink 发行版的 plugins/ 文件夹中创建一个名为“fpga”的文件夹,将打好的 jar 包放入其中。 更多细节请查看 Flink Plugin

已支持的扩展资源插件

目前,Flink提供 GPU 资源插件。

我们为 GPU 提供了第一方插件。该插件利用一个脚本来发现 GPU 设备的索引,该索引可通过“index”从 ExternalResourceInfo 中获取。我们提供了一个默认脚本,可以用来发现 NVIDIA GPU。您还可以提供自定义脚本。

我们提供了 一个示例程序 ,展示了如何在 Flink 中使用 GPU 资源来做矩阵-向量乘法。

要使 GPU 资源可访问,根据您的环境,需要满足以下先决条件:

  • 对于 Standalone 模式,集群管理员应确保已安装 NVIDIA 驱动程序,并且集群中所有节点上的 GPU 资源都是可访问的。

  • 对于 Yarn 上部署,管理员需要配置 Yarn 集群使其。 请注意,所需的 Hadoop 版本是 2.10+ 和 3.1+。

  • 对于 Kubernetes 上部署,管理员需要保证 NVIDIA GPU 的 Device Plugin 已在集群上安装。 请注意,所需的 Kubernetes 版本是 1.10+。目前,Kubernetes只支持 NVIDIA GPU 和 AMD GPU。Flink 只提供了 NVIDIA GPU 的脚本,但你可以提供支持 AMD GPU 的自定义脚本,参看 。

启用扩展资源框架中所述,要使用 GPU 资源,还需要执行两项操作:

  • 在算子中获取 GPU 资源的信息,其中包含键为“index”的 GPU 索引。

对于 GPU 插件,你需要指定的扩展资源框架配置:

  • external-resources:你需要将 GPU 的扩展资源名称(例如“gpu”)加到该列表中。

  • external-resource.<resource_name>.amount:每个 TaskManager 中的 GPU 数量。

  • external-resource.<resource_name>.yarn.config-key:对于 Yarn,GPU 的配置键是 yarn.io/gpu。请注意,Yarn 目前只支持 NVIDIA GPU。

  • external-resource.<resource_name>.kubernetes.config-key:对于 Kubernetes,GPU 的配置键是 <vendor>.com/gpu。 目前,“nvidia”和“amd”是两个支持的 GPU 品牌。请注意,如果你使用 AMD GPU,你需要提供一个自定义的。

  • external-resource.<resource_name>.driver-factory.class:需要设置为 org.apache.flink.externalresource.gpu.GPUDriverFactory。

此外,GPU 插件还有一些专有配置:

  • external-resource.<resource_name>.param.discovery-script.path发现脚本的文件路径。 它既可以是绝对路径,也可以是相对路径,如果定义了“FLINK_HOME”,该路径将相对于“FLINK_HOME”,否则相对于当前目录。如果没有显式配置该项,GPU 插件将使用默认脚本。

  • external-resource.<resource_name>.param.discovery-script.args:传递给发现脚本的参数。对于默认的发现脚本,请参见以获取可用参数。

GPU 插件示例配置:

  1. external-resources: gpu
  2. external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory # 定义 GPU 资源的工厂类。
  3. external-resource.gpu.amount: 2 # 定义每个 TaskManager 的 GPU 数量。
  4. external-resource.gpu.param.discovery-script.path: plugins/external-resource-gpu/nvidia-gpu-discovery.sh
  5. external-resource.gpu.param.discovery-script.args: --enable-coordination # 自定义参数,将被传递到 GPU 的 Driver 中。
  6. external-resource.gpu.yarn.config-key: yarn.io/gpu # for Yarn
  7. external-resource.gpu.kubernetes.config-key: nvidia.com/gpu # for Kubernetes

GPUDriver 利用发现脚本来发现 GPU 资源并生成 GPU 资源信息。

默认脚本

我们为 NVIDIA GPU 提供了一个默认脚本,位于 Flink 发行版的 plugins/external-resource-gpu/nvidia-gpu-discovery.sh。 该脚本通过 nvidia-smi 工具获取当前可见 GPU 的索引。它尝试返回一个 GPU 索引列表,其大小由 external-resource.<resource_name>.amount 指定,如果 GPU 数量不足,则以非零退出。

在 Standalone 模式中,多个 TaskManager 可能位于同一台机器上,并且每个 GPU 设备对所有 TaskManager 都是可见的。 默认脚本提供 GPU 协调模式,在这种模式下,脚本利用文件来同步 GPU 的分配情况,并确保每个GPU设备只能由一个TaskManager进程使用。相关参数为:

  • --enable-coordination-mode:启用 GPU 协调模式。默认情况下不启用。

  • --coordination-file filePath:用于同步 GPU 资源分配状态的文件路径。默认路径为 /var/tmp/flink-gpu-coordination

自定义脚本

你可以提供一个自定义的发现脚本来满足你的特殊需求,例如使用 AMD GPU。请确保自定义脚本的的路径正确配置(external-resource.<resource_name>.param.discovery-script.path)并且 Flink 可以访问。自定义的发现脚本需要:

  • GPUDriver 将 GPU 数量(由 external-resource.<resource_name>.amount 定义)作为第一个参数传递到脚本中。 external-resource.<resource_name>.param.discovery-script.args 中自定义的参数会被附加在后面。

  • 脚本需返回可用 GPU 索引的列表,用逗号分隔。空白的索引将被忽略。

  • 脚本可以通过以非零退出来表示其未正确执行。在这种情况下,算子将不会得到 GPU 资源相关信息。