External Resource Framework

What the external resource framework does

In general, the external resource framework does two things:

  • Set the corresponding fields of the resource requests (for requesting resources from the underlying system) with respect to your configuration.

  • Provide operators with the information needed for using the resources.

When deployed on resource management systems (Kubernetes / Yarn), the external resource framework will ensure that the allocated pod/container will contain the desired external resources. Currently, many resource management systems support external resources. For example, Kubernetes supports GPU, FPGA, etc. through its mechanism since v1.10, and Yarn supports GPU and FPGA resources since 2.10 and 3.1. In Standalone mode, the user has to ensure that the external resources are available.

The external resource framework will provide the corresponding information to operators. The external resource information, which contains the basic properties needed for using the resources, is generated by the configured external resource drivers.

Enable the external resource framework for your workload

To enable an external resource with the external resource framework, you need to:

  • Prepare the external resource plugin.

  • Set configurations for the external resource.

  • Get the external resource information from and use it in your operators.

You need to prepare the external resource plugin and put it into the plugins/ folder of your Flink distribution, see . Apache Flink provides a first-party plugin for GPU resources. You can also .

First, you need to add resource names for all the external resource types to the external resource list (with the configuration key ‘external-resources’) with delimiter “;”, e.g. “external-resources: gpu;fpga” for two external resources “gpu” and “fpga”. Only the <resource_name> defined here will go into effect in the external resource framework.

For each external resource, you could configure the below options. The <resource_name> in all the below configuration options corresponds to the name listed in the external resource list:

  • Amount (external.<resource_name>.amount): This is the quantity of the external resource that should be requested from the external system.

  • Config key in Yarn (external-resource.<resource_name>.yarn.config-key): optional. If configured, the external resource framework will add this key to the resource profile of container requests for Yarn. The value will be set to the value of external-resource.<resource_name>.amount.

  • Config key in Kubernetes (external-resource.<resource_name>.kubernetes.config-key): optional. If configured, external resource framework will add resources.limits.<config-key> and resources.requests.<config-key> to the main container spec of TaskManager and set the value to the value of external-resource.<resource_name>.amount.

  • Driver Factory (external-resource.<resource_name>.driver-factory.class): optional. Defines the factory class name for the external resource identified by <resource_name>. If configured, the factory will be used to instantiate drivers in the external resource framework. If not configured, the requested resource will still exist in the TaskManager as long as the relevant options are configured. However, the operator will not get any information of the resource from RuntimeContext in that case.

  • Driver Parameters (external-resource.<resource_name>.param.<param>): optional. The naming pattern of custom config options for the external resource specified by <resource_name>. Only the configurations that follow this pattern will be passed into the driver factory of that external resource.

An example configuration that specifies two external resources:

To use the resources, operators need to get the ExternalResourceInfo set from the RuntimeContext. ExternalResourceInfo wraps the information needed for using the resource, which can be retrieved with getProperty. What properties are available and how to access the resource with the properties depends on the specific plugin.

Operators can get the ExternalResourceInfo set of a specific external resource from RuntimeContext or FunctionContext by getExternalResourceInfos(String resourceName). The resourceName here should have the same value as the name configured in the external resource list. It can be used as follows:

Java

  1. public class ExternalResourceMapFunction extends RichMapFunction<String, String> {
  2. private static final String RESOURCE_NAME = "foo";
  3. @Override
  4. public String map(String value) {
  5. Set<ExternalResourceInfo> externalResourceInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);
  6. externalResourceInfos.iterator().forEachRemaining(externalResourceInfo ->
  7. addresses.add(externalResourceInfo.getProperty("address").get()));
  8. // map function with addresses.
  9. // ...
  10. }

Each ExternalResourceInfo contains one or more properties with keys representing the different dimensions of the resource. You could get all valid keys by ExternalResourceInfo#getKeys.

Implement a plugin for your custom resource type

To implement a plugin for your custom resource type, you need to:

  • Add your own external resource driver by implementing the org.apache.flink.api.common.externalresource.ExternalResourceDriver interface.

  • Add a driver factory, which instantiates the driver, by implementing the org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory.

  • Add a service entry. Create a file META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory which contains the class name of your driver factory class (see the docs for more details).

For example, to implement a plugin for external resource named “FPGA”, you need to implement FPGADriver and FPGADriverFactory first:

Java

  1. public class FPGADriver implements ExternalResourceDriver {
  2. @Override
  3. public Set<FPGAInfo> retrieveResourceInfo(long amount) {
  4. // return the information set of "FPGA"
  5. }
  6. }
  7. public class FPGADriverFactory implements ExternalResourceDriverFactory {
  8. @Override
  9. public ExternalResourceDriver createExternalResourceDriver(Configuration config) {
  10. return new FPGADriver();
  11. }
  12. }
  13. // Also implement FPGAInfo which contains basic properties of "FPGA" resource.
  14. public class FPGAInfo implements ExternalResourceInfo {
  15. @Override
  16. public Optional<String> getProperty(String key) {
  17. // return the property with the given key.
  18. }
  19. public Collection<String> getKeys() {
  20. // return all property keys.
  21. }
  22. }

Scala

Create a file with name in META-INF/services/ and write the factory class name (e.g. your.domain.FPGADriverFactory) to it.

Then, create a jar which includes FPGADriver, FPGADriverFactory, META-INF/services/ and all the external dependencies. Make a directory in plugins/ of your Flink distribution with an arbitrary name, e.g. “fpga”, and put the jar into this directory. See Flink Plugin for more details.

Existing supported external resource plugins

Currently, Flink supports GPUs as external resources.

We provide a first-party plugin for GPU resources. The plugin leverages a discovery script to discover indexes of GPU devices, which can be accessed from the resource information via the property “index”. We provide a default discovery script that can be used to discover NVIDIA GPUs. You can also provide your custom script.

We provide an example which shows how to use the GPUs to do matrix-vector multiplication in Flink.

To make GPU resources accessible, certain prerequisites are needed depending on your environment:

  • For standalone mode, administrators should ensure the NVIDIA driver is installed and GPU resources are accessible on all the nodes in the cluster.

  • For Yarn deployment, administrators should configure the Yarn cluster to enable . Notice the required Hadoop version is 2.10+ or 3.1+.

  • For Kubernetes deployment, administrators should make sure the NVIDIA GPU device plugin is installed. Notice the required version is 1.10+. At the moment, Kubernetes only supports NVIDIA GPU and AMD GPU. Flink only provides discovery script for NVIDIA GPUs, but you can provide a custom discovery script for AMD GPUs yourself, see .

As mentioned in Enable external resources for your workload, you also need to do two things to enable GPU resources:

  • Get the information of GPU resources, which contains the GPU index as property with key “index”, in operators.

For the GPU plugin, you need to specify the common external resource configurations:

  • external-resources: You need to append your resource name (e.g. gpu) for GPU resources to it.

  • external-resource.<resource_name>.amount: The amount of GPU devices per TaskManager.

  • external-resource.<resource_name>.yarn.config-key: For Yarn, the config key of GPU is yarn.io/gpu. Notice that Yarn only supports NVIDIA GPU at the moment.

  • external-resource.<resource_name>.kubernetes.config-key: For Kubernetes, the config key of GPU is <vendor>.com/gpu. Currently, “nvidia” and “amd” are the two supported vendors. Notice that if you use AMD GPUs, you need to provide a discovery script yourself, see .

  • external-resource.<resource_name>.driver-factory.class: Should be set to org.apache.flink.externalresource.gpu.GPUDriverFactory.

In addition, there are some specific configurations for the GPU plugin:

  • external-resource.<resource_name>.param.discovery-script.path: The path of the discovery script. It can either be an absolute path, or a relative path to FLINK_HOME when defined or current directory otherwise. If not explicitly configured, the default script will be used.

  • external-resource.<resource_name>.param.discovery-script.args: The arguments passed to the discovery script. For the default discovery script, see for the available parameters.

An example configuration for GPU resource:

  1. external-resources: gpu
  2. external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory # Define the driver factory class of gpu resource.
  3. external-resource.gpu.amount: 2 # Define the amount of gpu resource per TaskManager.
  4. external-resource.gpu.param.discovery-script.path: plugins/external-resource-gpu/nvidia-gpu-discovery.sh
  5. external-resource.gpu.param.discovery-script.args: --enable-coordination # Define the custom param "discovery-script.args" which will be passed into the gpu driver.
  6. external-resource.gpu.yarn.config-key: yarn.io/gpu # for Yarn
  7. external-resource.gpu.kubernetes.config-key: nvidia.com/gpu # for Kubernetes

The GPUDriver leverages a discovery script to discover GPU resources and generate the GPU resource information.

Default Script

We provide a default discovery script for NVIDIA GPU, located at plugins/external-resource-gpu/nvidia-gpu-discovery.sh of your Flink distribution. The script gets the indexes of visible GPU resources through the nvidia-smi command. It tries to return the required amount (specified by external-resource.<resource_name>.amount) of GPU indexes in a list, and exit with non-zero if the amount cannot be satisfied.

For standalone mode, multiple TaskManagers might be co-located on the same machine, and each GPU device is visible to all the TaskManagers. The default discovery script supports a coordination mode, in which it leverages a coordination file to synchronize the allocation state of GPU devices and ensure each GPU device can only be used by one TaskManager process. The relevant arguments are:

  • --enable-coordination-mode: Enable the coordination mode. By default the coordination mode is disabled.

  • --coordination-file filePath: The path of the coordination file used to synchronize the allocation state of GPU resources. The default path is /var/tmp/flink-gpu-coordination.

Custom Script

You can also provide a discovery script to address your custom requirements, e.g. discovering AMD GPU. Please make sure the path of your custom script is accessible to Flink and configured (external-resource.<resource_name>.param.discovery-script.path) correctly. The contract of the discovery script:

  • GPUDriver passes the amount (specified by ) as the first argument into the script. The user-defined arguments in external-resource.<resource_name>.param.discovery-script.args would be appended after it.

  • The script should return a list of the available GPU indexes, split by a comma. Whitespace only indexes will be ignored.

  • The script can also suggest that the discovery is not properly performed, by exiting with non-zero. In that case, no gpu information will be provided to operators.