使用索引作业完成静态工作分配下的并行处理
在此示例中,你将运行一个使用多个并行工作进程的 Kubernetes Job。 每个 worker 都是在自己的 Pod 中运行的不同容器。 Pod 具有控制平面自动设置的索引编号(index number), 这些编号使得每个 Pod 能识别出要处理整个任务的哪个部分。
Pod 索引在注解 batch.kubernetes.io/job-completion-index
中呈现,具体表示为一个十进制值字符串。 为了让容器化的任务进程获得此索引,你可以使用 机制发布注解的值。为方便起见, 控制平面自动设置 Downward API 以在 JOB_COMPLETION_INDEX
环境变量中公开索引。
以下是此示例中步骤的概述:
- 定义使用带索引完成信息的 Job 清单。 Downward API 使你可以将 Pod 索引注解作为环境变量或文件传递给容器。
- 根据该清单启动一个带索引(
Indexed
)的 Job。
你应该已经熟悉 的基本的、非并行的用法。
你必须拥有一个 Kubernetes 的集群,同时你的 Kubernetes 集群必须带有 kubectl 命令行工具。 建议在至少有两个节点的集群上运行本教程,且这些节点不作为控制平面主机。 如果你还没有集群,你可以通过 Minikube 构建一个你自己的集群,或者你可以使用下面任意一个 Kubernetes 工具构建:
你的 Kubernetes 服务器版本必须不低于版本 v1.21. 要获知版本信息,请输入 kubectl version
.
要从工作程序访问工作项,你有几个选项:
- 读取
JOB_COMPLETION_INDEX
环境变量。Job 控制器 自动将此变量链接到包含完成索引的注解。 - 读取包含完整索引的文件。
- 假设你无法修改程序,你可以使用脚本包装它, 该脚本使用上述任意方法读取索引并将其转换为程序可以用作输入的内容。
你将使用 容器镜像中的 rev
工具。
由于这只是一个例子,每个 Pod 只做一小部分工作(反转一个短字符串)。 例如,在实际工作负载中,你可能会创建一个表示基于场景数据制作 60 秒视频任务的 Job 。 此视频渲染 Job 中的每个工作项都将渲染该视频剪辑的特定帧。 索引完成意味着 Job 中的每个 Pod 都知道通过从剪辑开始计算帧数,来确定渲染和发布哪一帧。
这是一个使用 Indexed
完成模式的示例 Job 清单:
apiVersion: batch/v1
kind: Job
metadata:
name: 'indexed-job'
spec:
completions: 5
parallelism: 3
completionMode: Indexed
template:
spec:
restartPolicy: Never
initContainers:
- name: 'input'
image: 'docker.io/library/bash'
- "bash"
- "-c"
- |
echo ${items[$JOB_COMPLETION_INDEX]} > /input/data.txt
volumeMounts:
- mountPath: /input
name: input
containers:
- name: 'worker'
image: 'docker.io/library/busybox'
command:
- "rev"
- "/input/data.txt"
volumeMounts:
- mountPath: /input
name: input
volumes:
- name: input
emptyDir: {}
在上面的示例中,你使用 Job 控制器为所有容器设置的内置 JOB_COMPLETION_INDEX
环境变量。 Init 容器 将索引映射到一个静态值,并将其写入一个文件,该文件通过 与运行 worker 的容器共享。或者,你可以 通过 Downward API 定义自己的环境变量 将索引发布到容器。你还可以选择从 加载值列表。
或者也可以直接 使用 Downward API 将注解值作为卷文件传递, 如下例所示:
apiVersion: batch/v1
kind: Job
metadata:
name: 'indexed-job'
spec:
parallelism: 3
completionMode: Indexed
spec:
restartPolicy: Never
containers:
- name: 'worker'
image: 'docker.io/library/busybox'
command:
- "rev"
- "/input/data.txt"
volumeMounts:
- mountPath: /input
name: input
volumes:
- name: input
downwardAPI:
items:
- path: "data.txt"
fieldRef:
fieldPath: metadata.annotations['batch.kubernetes.io/job-completion-index']
当你创建此 Job 时,控制平面会创建一系列 Pod,你指定的每个索引都会运行一个 Pod。 .spec.parallelism
的值决定了一次可以运行多少个 Pod, 而 .spec.completions
决定了 Job 总共创建了多少个 Pod。
因为 .spec.parallelism
小于 .spec.completions
, 所以控制平面在启动更多 Pod 之前,将等待第一批的某些 Pod 完成。
你可以等待 Job 成功,等待时间可以设置超时限制:
# 状况名称的检查不区分大小写
kubectl wait --for=condition=complete --timeout=300s job/indexed-job
现在,描述 Job 并检查它是否成功。
kubectl describe jobs/indexed-job
输出类似于:
在此示例中,你使用每个索引的自定义值运行 Job。 你可以检查其中一个 Pod 的输出:
输出类似于: