使用工作队列进行精细的并行处理
在这个例子中,当每个 Pod 被创建时,它会从一个任务队列中获取一个工作单元,处理它,然后重复,直到到达队列的尾部。
下面是这个示例的步骤概述:
启动存储服务用于保存工作队列。 在这个例子中,我们使用 Redis 来存储工作项。 在上一个例子中,我们使用了 RabbitMQ。 在这个例子中,由于 AMQP 不能为客户端提供一个良好的方法来检测一个有限长度的工作队列是否为空, 我们使用了 Redis 和一个自定义的工作队列客户端库。 在实践中,你可能会设置一个类似于 Redis 的存储库,并将其同时用于多项任务或其他事务的工作队列。
创建一个队列,然后向其中填充消息。 每个消息表示一个将要被处理的工作任务。 在这个例子中,消息是一个我们将用于进行长度计算的整数。
启动一个 Job 对队列中的任务进行处理。这个 Job 启动了若干个 Pod。 每个 Pod 从消息队列中取出一个工作任务,处理它,然后重复,直到到达队列的尾部。
你必须拥有一个 Kubernetes 的集群,同时你的 Kubernetes 集群必须带有 kubectl 命令行工具。 建议在至少有两个节点的集群上运行本教程,且这些节点不作为控制平面主机。 如果你还没有集群,你可以通过 构建一个你自己的集群,或者你可以使用下面任意一个 Kubernetes 工具构建:
熟悉基本的、非并行的 Job。
启动 Redis
对于这个例子,为了简单起见,我们将启动一个单实例的 Redis。 了解如何部署一个可伸缩、高可用的 Redis 例子,请查看 Redis 示例
你也可以直接下载如下文件:
现在,让我们往队列里添加一些 “任务”。在这个例子中,我们的任务是一些将被打印出来的字符串。
输出类似于:
Hit enter for command prompt
现在按回车键,启动 Redis 命令行界面,然后创建一个存在若干个工作项的列表。
# redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"
因此,这个键为 job2
的列表就是我们的工作队列。
注意:如果你还没有正确地配置 Kube DNS,你可能需要将上面的第一步改为 redis-cli -h $REDIS_SERVICE_HOST
。
创建镜像
现在我们已经准备好创建一个我们要运行的镜像。
我们会使用一个带有 Redis 客户端的 Python 工作程序从消息队列中读出消息。
这里提供了一个简单的 Redis 工作队列客户端库,名为 rediswq.py ()。
Job 中每个 Pod 内的 “工作程序” 使用工作队列客户端库获取工作。具体如下:
application/job/redis/worker.py
#!/usr/bin/env python
import time
host="redis"
# 如果你未在运行 Kube-DNS,请取消下面两行的注释
# import os
# host = os.getenv("REDIS_SERVICE_HOST")
q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " + q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
item = q.lease(lease_secs=10, block=True, timeout=2)
if item is not None:
itemstr = item.decode("utf-8")
print("Working on " + itemstr)
time.sleep(10) # 将你的实际工作放在此处来取代 sleep
q.complete(item)
else:
print("Waiting for work")
print("Queue empty, exiting")
你也可以下载 、 rediswq.py 和 文件。然后构建镜像:
docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2
你需要将镜像 push 到一个公共仓库或者 。
如果你使用的是 Google Container Registry, 请先用你的 project ID 给你的镜像打上标签,然后 push 到 GCR 。请将 <project>
替换为你自己的 project ID。
docker tag job-wq-2 gcr.io/<project>/job-wq-2
gcloud docker -- push gcr.io/<project>/job-wq-2
这是 Job 定义:
application/job/redis/job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: job-wq-2
parallelism: 2
template:
metadata:
name: job-wq-2
spec:
- name: c
image: gcr.io/myproject/job-wq-2
restartPolicy: OnFailure
请确保将 Job 模板中的 gcr.io/myproject
更改为你自己的路径。
在这个例子中,每个 Pod 处理了队列中的多个项目,直到队列中没有项目时便退出。 因为是由工作程序自行检测工作队列是否为空,并且 Job 控制器不知道工作队列的存在, 这依赖于工作程序在完成工作时发出信号。 工作程序以成功退出的形式发出信号表示工作队列已经为空。 所以,只要有任意一个工作程序成功退出,控制器就知道工作已经完成了,所有的 Pod 将很快会退出。 因此,我们将 Job 的完成计数(Completion Count)设置为 1。 尽管如此,Job 控制器还是会等待其它 Pod 完成。
运行 Job
现在运行这个 Job:
稍等片刻,然后检查这个 Job。
kubectl describe jobs/job-wq-2
Name: job-wq-2
Namespace: default
Selector: controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
Labels: controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
job-name=job-wq-2
Annotations: <none>
Parallelism: 2
Completions: <unset>
Start Time: Mon, 11 Jan 2016 17:07:59 -0800
Pods Statuses: 1 Running / 0 Succeeded / 0 Failed
Pod Template:
Labels: controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
job-name=job-wq-2
Containers:
c:
Image: gcr.io/exampleproject/job-wq-2
Port:
Environment: <none>
Mounts: <none>
Volumes: <none>
Events:
FirstSeen LastSeen Count From SubobjectPath Type Reason Message
--------- -------- ----- ---- ------------- -------- ------ -------
33s 33s 1 {job-controller } Normal SuccessfulCreate Created pod: job-wq-2-lglf8
你可以等待 Job 成功,等待时长有超时限制:
# 状况名称的检查不区分大小写
kubectl wait --for=condition=complete --timeout=300s job/job-wq-2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
你可以看到,其中的一个 Pod 处理了若干个工作单元。
如果你有持续的后台处理业务,那么可以考虑使用 ReplicaSet
来运行你的后台业务, 和运行一个类似 的后台处理库。