使用工作队列进行精细的并行处理
在这个例子中,当每个pod被创建时,它会从一个任务队列中获取一个工作单元,处理它,然后重复,直到到达队列的尾部。
下面是这个示例的步骤概述:
启动存储服务用于保存工作队列。 在这个例子中,我们使用 Redis 来存储工作项。 在上一个例子中,我们使用了 RabbitMQ。 在这个例子中,由于 AMQP 不能为客户端提供一个良好的方法来检测一个有限长度的工作队列是否为空, 我们使用了 Redis 和一个自定义的工作队列客户端库。 在实践中,你可能会设置一个类似于 Redis 的存储库,并将其同时用于多项任务或其他事务的工作队列。
创建一个队列,然后向其中填充消息。 每个消息表示一个将要被处理的工作任务。 在这个例子中,消息是一个我们将用于进行长度计算的整数。
启动一个 Job 对队列中的任务进行处理。这个 Job 启动了若干个 Pod 。 每个 Pod 从消息队列中取出一个工作任务,处理它,然后重复,直到到达队列的尾部。
你必须拥有一个 Kubernetes 的集群,同时你的 Kubernetes 集群必须带有 kubectl 命令行工具。 建议在至少有两个节点的集群上运行本教程,且这些节点不作为控制平面主机。 如果你还没有集群,你可以通过 Minikube 构建一个你自己的集群,或者你可以使用下面任意一个 Kubernetes 工具构建:
Your Kubernetes server must be at or later than version v1.8. To check the version, enter .
熟悉基本的、非并行的 。
启动 Redis
对于这个例子,为了简单起见,我们将启动一个单实例的 Redis。 了解如何部署一个可伸缩、高可用的 Redis 例子,请查看
你也可以直接下载如下文件:
现在,让我们往队列里添加一些“任务”。在这个例子中,我们的任务是一些将被打印出来的字符串。
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt
现在按回车键,启动 redis 命令行界面,然后创建一个存在若干个工作项的列表。
# redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"
因此,这个键为 job2
的列表就是我们的工作队列。
注意:如果你还没有正确地配置 Kube DNS,你可能需要将上面的第一步改为 redis-cli -h $REDIS_SERVICE_HOST
。
创建镜像
现在我们已经准备好创建一个我们要运行的镜像
我们会使用一个带有 redis 客户端的 python 工作程序从消息队列中读出消息。
这里提供了一个简单的 Redis 工作队列客户端库,叫 rediswq.py ()。
Job 中每个 Pod 内的 “工作程序” 使用工作队列客户端库获取工作。如下:
application/job/redis/worker.py
#!/usr/bin/env python
import time
import rediswq
host="redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")
q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " + q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
item = q.lease(lease_secs=10, block=True, timeout=2)
if item is not None:
itemstr = item.decode("utf-8")
print("Working on " + itemstr)
time.sleep(10) # Put your actual work here instead of sleep.
q.complete(item)
else:
print("Waiting for work")
print("Queue empty, exiting")
你也可以下载 、 rediswq.py 和 。然后构建镜像:
对于 Docker Hub,请先用你的用户名给镜像打上标签, 然后使用下面的命令 push 你的镜像到仓库。请将 <username>
替换为你自己的 Hub 用户名。
docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2
你需要将镜像 push 到一个公共仓库或者 。
docker tag job-wq-2 gcr.io/<project>/job-wq-2
gcloud docker -- push gcr.io/<project>/job-wq-2
这是 job 定义:
application/job/redis/job.yaml
kind: Job
metadata:
name: job-wq-2
spec:
parallelism: 2
template:
name: job-wq-2
spec:
containers:
- name: c
image: gcr.io/myproject/job-wq-2
restartPolicy: OnFailure
请确保将 job 模板中的 gcr.io/myproject
更改为你自己的路径。
在这个例子中,每个 pod 处理了队列中的多个项目,直到队列中没有项目时便退出。 因为是由工作程序自行检测工作队列是否为空,并且 Job 控制器不知道工作队列的存在, 这依赖于工作程序在完成工作时发出信号。 工作程序以成功退出的形式发出信号表示工作队列已经为空。 所以,只要有任意一个工作程序成功退出,控制器就知道工作已经完成了,所有的 Pod 将很快会退出。 因此,我们将 Job 的完成计数(Completion Count)设置为 1 。 尽管如此,Job 控制器还是会等待其它 Pod 完成。
运行 Job
现在运行这个 Job :
稍等片刻,然后检查这个 Job。
kubectl describe jobs/job-wq-2
Name: job-wq-2
Namespace: default
Selector: controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
Labels: controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
job-name=job-wq-2
Annotations: <none>
Parallelism: 2
Completions: <unset>
Start Time: Mon, 11 Jan 2016 17:07:59 -0800
Pods Statuses: 1 Running / 0 Succeeded / 0 Failed
Pod Template:
Labels: controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
job-name=job-wq-2
Containers:
c:
Image: gcr.io/exampleproject/job-wq-2
Port:
Environment: <none>
Mounts: <none>
Volumes: <none>
Events:
FirstSeen LastSeen Count From SubobjectPath Type Reason Message
--------- -------- ----- ---- ------------- -------- ------ -------
33s 33s 1 {job-controller } Normal SuccessfulCreate Created pod: job-wq-2-lglf8
查看日志:
你可以看到,其中的一个 pod 处理了若干个工作单元。
如果你不方便运行一个队列服务或者修改你的容器用于运行一个工作队列,你可以考虑其它的 Job 模式。
如果你有持续的后台处理业务,那么可以考虑使用 ReplicaSet
来运行你的后台业务, 和运行一个类似 的后台处理库。
最后修改 May 13, 2021 at 11:25 AM PST: [zh]Resync tasks files[10] (df00b046c)