Monitoring and Management Guide
This document describes some of these, as as well as features related to monitoring, like events and broadcast commands.
Workers
celery can also be used to inspect and manage worker nodes (and to some degree tasks).
To list all the commands available do:
or to get help for a specific command do:
Commands
shell: Drop into a Python shell.
The locals will include the celery variable, which is the current app. Also all known tasks will be automatically added to locals (unless the --without-tasks flag is set).
Uses Ipython, bpython, or regular python in that order if installed. You can force an implementation using --force-ipython|-I, --force-bpython|-B, or --force-python|-P.
status: List active nodes in this cluster
result: Show the result of a task
$ celery result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
Note that you can omit the name of the task as long as the task doesn’t use a custom result backend.
purge: Purge messages from all configured task queues.
$ celery purge
警告
There is no undo for this operation, and messages will be permanently deleted!
inspect active: List active tasks
$ celery inspect active
These are all the tasks that are currently being executed.
inspect scheduled: List scheduled ETA tasks
$ celery inspect scheduled
These are tasks reserved by the worker because they have the eta or countdown argument set.
inspect reserved: List reserved tasks
$ celery inspect reserved
This will list all tasks that have been prefetched by the worker, and is currently waiting to be executed (does not include tasks with an eta).
inspect revoked: List history of revoked tasks
inspect registered: List registered tasks
$ celery inspect registered
inspect stats: Show worker statistics (see )
$ celery inspect stats
control enable_events: Enable events
control disable_events: Disable events
$ celery control disable_events
migrate: Migrate tasks from one broker to another (EXPERIMENTAL).
$ celery migrate redis://localhost amqp://localhost
This command will migrate all the tasks on one broker to another. As this command is new and experimental you should be sure to have a backup of the data before proceeding.
注解
All inspect and control commands supports a --timeout argument, This is the number of seconds to wait for responses. You may have to increase this timeout if you’re not getting a response due to latency.
By default the inspect and control commands operates on all workers. You can specify a single, or a list of workers by using the –destination argument:
$ celery inspect -d w1,w2 reserved
$ celery control -d w1,w2 enable_events
Flower is a real-time web based monitor and administration tool for Celery. It is under active development, but is already an essential tool. Being the recommended monitor for Celery, it obsoletes the Django-Admin monitor, celerymon and the ncurses based monitor.
Flower is pronounced like “flow”, but you can also use the botanical version if you prefer.
Real-time monitoring using Celery Events
Remote Control
- View worker status and statistics.
- Shutdown and restart worker instances.
- Control worker pool size and autoscale settings.
- View and modify the queues a worker instance consumes from.
- View currently running tasks
- View scheduled tasks (ETA/countdown)
- View reserved and revoked tasks
- Apply time and rate limits
- Configuration viewer
- Revoke or terminate tasks
HTTP API
OpenID authentication
Screenshots
More :
You can use pip to install Flower:
$ pip install flower
Running the flower command will start a web-server that you can visit:
$ celery flower
The default port is http://localhost:5555, but you can change this using the –port argument:
$ celery flower --port=5555
Broker URL can also be passed through the –broker argument :
$ celery flower --broker=amqp://guest:[email protected]:5672//
or
$ celery flower --broker=redis://guest:[email protected]:6379/0
Then, you can visit flower in your web browser :
$ open http://localhost:5555
celery events: Curses Monitor
2.0 新版功能.
celery events is a simple curses monitor displaying task and worker history. You can inspect the result and traceback of tasks, and it also supports some management commands like rate limiting and shutting down workers. This monitor was started as a proof of concept, and you probably want to use Flower instead.
Starting:
$ celery events
You should see a screen like:
celery events is also used to start snapshot cameras (see :
$ celery events --camera=<camera-class> --frequency=1.0
and it includes a tool to dump events to stdout:
For a complete list of options use --help:
$ celery events --help
To manage a Celery cluster it is important to know how RabbitMQ can be monitored.
RabbitMQ ships with the rabbitmqctl(1) command, with this you can list queues, exchanges, bindings, queue lengths, the memory usage of each queue, as well as manage users, virtual hosts and their permissions.
注解
The default virtual host (“/“) is used in these examples, if you use a custom virtual host you have to add the -p argument to the command, e.g: rabbitmqctl list_queues -p my_vhost …
Finding the number of tasks in a queue:
$ rabbitmqctl list_queues name messages messages_ready \
messages_unacknowledged
Here messages_ready is the number of messages ready for delivery (sent but not received), messages_unacknowledged is the number of messages that has been received by a worker but not acknowledged yet (meaning it is in progress, or has been reserved). messages is the sum of ready and unacknowledged messages.
Finding the number of workers currently consuming from a queue:
Finding the amount of memory allocated to a queue:
$ rabbitmqctl list_queues name memory
Redis
If you’re using Redis as the broker, you can monitor the Celery cluster using the redis-cli(1) command to list lengths of queues.
Inspecting queues
Finding the number of tasks in a queue:
$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
The default queue is named celery. To get all available queues, invoke:
$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*
注解
Queue keys only exists when there are tasks in them, so if a key does not exist it simply means there are no messages in that queue. This is because in Redis a list with no elements in it is automatically removed, and hence it won’t show up in the keys command output, and llen for that list returns 0.
Also, if you’re using Redis for other purposes, the output of the keys command will include unrelated values stored in the database. The recommended way around this is to use a dedicated DATABASE_NUMBER for Celery, you can also use database numbers to separate Celery applications from each other (virtual hosts), but this will not affect the monitoring events used by e.g. Flower as Redis pub/sub commands are global rather than database based.
This is a list of known Munin plug-ins that can be useful when maintaining a Celery cluster.
rabbitmq-munin: Munin plug-ins for RabbitMQ.
celery_tasks: Monitors the number of times each task type has been executed (requires celerymon).
http://exchange.munin-monitoring.org/plugins/celery_tasks-2/details
celery_task_states: Monitors the number of tasks in each state (requires celerymon).
2.1 新版功能.
Even a single worker can produce a huge amount of events, so storing the history of all events on disk may be very expensive.
A sequence of events describes the cluster state in that time period, by taking periodic snapshots of this state you can keep all history, but still only periodically write it to disk.
To take snapshots you need a Camera class, with this you can define what should happen every time the state is captured; You can write it to a database, send it by email or something else entirely.
celery events is then used to take snapshots with the camera, for example if you want to capture state every 2 seconds using the camera myapp.Camera you run celery events with the following arguments:
$ celery events -c myapp.Camera --frequency=2.0
Cameras can be useful if you need to capture events and do something with those events at an interval. For real-time event processing you should use celery.events.Receiver directly, like in .
Here is an example camera, dumping the snapshot to screen:
from pprint import pformat
from celery.events.snapshot import Polaroid
def on_shutter(self, state):
if not state.event_count:
# No new events since last snapshot.
return
print('Workers: {0}'.format(pformat(state.workers, indent=4)))
print('Tasks: {0}'.format(pformat(state.tasks, indent=4)))
print('Total: {0.event_count} events, %s {0.task_count}'.format(
state))
See the API reference for celery.events.state to read more about state objects.
Now you can use this cam with celery events by specifying it with the -c option:
$ celery events -c myapp.DumpCam --frequency=2.0
Or you can use it programmatically like this:
from celery import Celery
from myapp import DumpCam
def main(app, freq=1.0):
state = app.events.State()
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={'*': state.event})
recv.capture(limit=None, timeout=None)
if __name__ == '__main__':
app = Celery(broker='amqp:[email protected]//')
main(app)
To process events in real-time you need the following
An event consumer (this is the Receiver)
A set of handlers called when events come in.
You can have different handlers for each event type, or a catch-all handler can be used (‘*’)
State (optional)
is a convenient in-memory representation of tasks and workers in the cluster that is updated as events come in.
It encapsulates solutions for many common things, like checking if a worker is still alive (by verifying heartbeats), merging event fields together as events come in, making sure timestamps are in sync, and so on.
Combining these you can easily process events in real-time:
注解
The wakeup argument to capture sends a signal to all workers to force them to send a heartbeat. This way you can immediately see workers when the monitor starts.
You can listen to specific events by specifying the handlers:
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(), ))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
my_monitor(app)
This list contains the events sent by the worker, and their arguments.
signature: | task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange, routing_key) |
---|
Sent when a task message is published and the CELERY_SEND_TASK_SENT_EVENT setting is enabled.
task-received
signature: | task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp) |
---|
Sent when the worker receives a task.
task-started
Sent just before the worker executes the task.
task-succeeded
signature: | task-succeeded(uuid, result, runtime, hostname, timestamp) |
---|
Sent if the task executed successfully.
Runtime is the time it took to execute the task using the pool. (Starting from the task is sent to the worker pool, and ending when the pool result handler callback is called).
task-failed
signature: | task-failed(uuid, exception, traceback, hostname, timestamp) |
---|
Sent if the execution of the task failed.
task-revoked
Sent if the task has been revoked (Note that this is likely to be sent by more than one worker).
terminated is set to true if the task process was terminated,
and the signum field set to the signal used.
expired is set to true if the task expired.
task-retried
signature: | task-retried(uuid, exception, traceback, hostname, timestamp) |
---|
Sent if the task failed, but will be retried in the future.
Worker Events
worker-online
signature: | worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys) |
---|
The worker has connected to the broker and is online.
- hostname: Hostname of the worker.
- timestamp: Event timestamp.
- freq: Heartbeat frequency in seconds (float).
- sw_ident: Name of worker software (e.g. py-celery).
- sw_ver: Software version (e.g. 2.2.0).
- sw_sys: Operating System (e.g. Linux, Windows, Darwin).
worker-heartbeat
Sent every minute, if the worker has not sent a heartbeat in 2 minutes, it is considered to be offline.
- hostname: Hostname of the worker.
- timestamp: Event timestamp.
- freq: Heartbeat frequency in seconds (float).
- sw_ident: Name of worker software (e.g. py-celery).
- sw_ver: Software version (e.g. 2.2.0).
- sw_sys: Operating System (e.g. Linux, Windows, Darwin).
- active: Number of currently executing tasks.
- processed: Total number of tasks processed by this worker.
worker-offline
signature: | worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys) |
---|
The worker has disconnected from the broker.