“The Big Instance” Refactor

    Celery can now be instantiated, which means several instances of Celery may exist in the same process space. Also, large parts can be customized without resorting to monkey patching.

    Creating a Celery instance:

    Creating tasks:

        @celery.task
        def add(x, y):
            return x + y

    Creating custom Task subclasses:

        Task = celery.create_task_cls()

        class DebugTask(Task):
            abstract = True

            def on_failure(self, *args, **kwargs):
                # Drop into the debugger when a task using this base fails.
                import pdb
                pdb.set_trace()

        @celery.task(base=DebugTask)
        def add(x, y):
            return x + y

    Starting a worker:

    Getting access to the configuration:

        celery.conf.CELERY_ALWAYS_EAGER = True

    Controlling workers:

        >>> celery.control.inspect().active()
        >>> celery.control.rate_limit(add.name, "100/m")
        >>> celery.control.broadcast("shutdown")
        >>> celery.control.discard_all()

    Other interesting attributes:

    As you can probably see, this really opens up another dimension of customization abilities.

    • celery.task.ping / celery.task.PingTask

      Inferior to the ping remote control command. Will be removed in Celery 2.3.

    • celery.utils.timedelta_seconds

      Use: celery.utils.timeutils.timedelta_seconds()

    • celery.utils.defaultdict

      Use: celery.utils.compat.defaultdict()

    • celery.utils.all

      Use: celery.utils.compat.all()

    • celery.task.apply_async

      Use: app.send_task

    • Use celery.registry.tasks

    • celery.task.base

      • .Task -> {app.create_task_cls}
    • celery.task.sets

    • celery.decorators / celery.task

      • .task -> {app.task}
    • celery.execute

      • .apply_async -> {task.apply_async}
      • .apply -> {task.apply}
      • .send_task -> {app.send_task}
      • .delay_task -> no alternative
    • celery.log

      • .get_default_logger -> {app.log.get_default_logger}
      • .setup_logger -> {app.log.setup_logger}
      • .get_task_logger -> {app.log.get_task_logger}
      • .setup_task_logger -> {app.log.setup_task_logger}
      • .setup_logging_subsystem -> {app.log.setup_logging_subsystem}
      • .redirect_stdouts_to_logger -> {app.log.redirect_stdouts_to_logger}
    • celery.messaging

      • .establish_connection -> {app.broker_connection}
      • .with_connection -> {app.with_connection}
      • .get_consumer_set -> {app.amqp.get_task_consumer}
      • .TaskPublisher -> {app.amqp.TaskPublisher}
      • .TaskConsumer -> {app.amqp.TaskConsumer}
      • .ConsumerSet -> {app.amqp.ConsumerSet}
    • celery.conf.* -> {app.conf}

    • celery.task.control

      • .broadcast -> {app.control.broadcast}
      • .rate_limit -> {app.control.rate_limit}
      • .ping -> {app.control.ping}
      • .revoke -> {app.control.revoke}
      • .discard_all -> {app.control.discard_all}
      • .inspect -> {app.control.inspect}
    • celery.utils.info

      • .humanize_seconds -> celery.utils.timeutils.humanize_seconds
      • .textindent -> celery.utils.textindent
      • .format_broker_info -> {app.amqp.format_broker_info}
      • .format_queues -> {app.amqp.format_queues}
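
    The arrows in the list above map old module-level names to attributes of an app instance. As a rough illustration of how such a mapping can be kept working, the sketch below has a module-level function delegate to a default app. It does not import Celery: `App`, `default_app`, the `send_task` body, and the use of `PendingDeprecationWarning` are all assumptions made for the example, not Celery's actual implementation.

```python
import warnings


class App:
    """Hypothetical stand-in for an app instance (not Celery's class)."""

    def send_task(self, name, args=()):
        # A real app would publish a message; here we just echo the call.
        return (name, tuple(args))


# Assumed process-wide default app that old-style calls are routed to.
default_app = App()


def send_task(name, args=()):
    """Old module-level entry point: warn, then delegate to the default app."""
    warnings.warn(
        "send_task() is an alias; use app.send_task() instead",
        PendingDeprecationWarning,
        stacklevel=2,
    )
    return default_app.send_task(name, args)
```

    Callers of the old name keep working, while new code can call the method on whichever app instance it holds.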

    To be backward compatible, it must be possible to use all the classes/functions without passing an explicit app instance.

    This is achieved by having all app-dependent objects use the default app instance if the app instance is missing.

    The problem with this approach is that the app instance can be lost along the way while everything still appears to work normally. Testing for app instance leaks is hard. The environment variable CELERY_TRACE_APP can help: when it is enabled, celery.app.app_or_default() raises an exception whenever it has to fall back to the default app instance.
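
    The fallback and the tracing switch described above can be sketched in plain Python. This is a minimal illustration, not Celery's source: `App` and `Publisher` are hypothetical, the choice of `RuntimeError` is an assumption, and treating any non-empty value of CELERY_TRACE_APP as "enabled" is also an assumption.

```python
import os


class App:
    """Hypothetical stand-in for an app instance (not Celery's class)."""

    def __init__(self, name):
        self.name = name


# Process-wide fallback used when no explicit app is passed.
default_app = App("default")


def app_or_default(app=None):
    """Return `app`, falling back to `default_app` when it is None.

    With CELERY_TRACE_APP set (assumed: any non-empty value), the
    fallback raises instead, so a lost app instance fails loudly.
    """
    if app is not None:
        return app
    if os.environ.get("CELERY_TRACE_APP"):
        raise RuntimeError("app instance is missing; refusing to use the default app")
    return default_app


class Publisher:
    """Hypothetical app-dependent object that accepts an optional app."""

    def __init__(self, app=None):
        self.app = app_or_default(app)
```

    Without the variable set, `Publisher()` silently picks up the default app, which is exactly the kind of leak that is hard to notice. With it set, the same call raises, pointing at the place where the app was not passed on.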

    • {app}

      • celery.loaders.base.BaseLoader

      • celery.backends.base.BaseBackend

      • {app.TaskSet}

        • celery.task.sets.TaskSet (app.TaskSet)
        • celery.result.TaskSetResult (app.TaskSetResult)
    • {app.AsyncResult}

      • celery.result.BaseAsyncResult / celery.result.AsyncResult
    • celery.bin.worker.WorkerCommand

      • celery.apps.worker.Worker

        • celery.worker.WorkerController

          • celery.worker.consumer.Consumer

            • celery.worker.job.TaskRequest

            • celery.events.EventDispatcher

            • celery.worker.control.ControlDispatch

              • celery.worker.control.registry.Panel
              • celery.pidbox.BroadcastPublisher
            • celery.pidbox.BroadcastConsumer

          • celery.worker.controllers.Mediator

          • celery.beat.EmbeddedService

    • celery.bin.events.EvCommand

      • celery.events.snapshot.evcam

        • celery.events.snapshot.Polaroid
        • celery.events.EventReceiver
      • celery.events.cursesmon.evtop

        • celery.events.EventReceiver
        • celery.events.cursesmon.CursesMonitor
      • celery.events.dumper

        • celery.events.EventReceiver
    • celery.bin.amqp.AMQPAdmin

    • celery.bin.beat.BeatCommand

      • celery.apps.beat.Beat

        • celery.beat.Service