slimta.celeryqueue

This module is available in the python-slimta-celeryqueue extension package.

Implements the Queue interface on top of celery, a distributed task queue system. When a message is enqueued, a delivery task is queued up in celery. Celery workers will then pick up the task and attempt delivery, retrying and bouncing in the same manner as Queue.

A celery.Celery object must be given to CeleryQueue, and a task will be registered to attempt delivery of Envelope objects. It may be desirable to configure celery workers to use gevent, since slimta Relay objects are expected to support it.
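The wiring described above might look like the following minimal sketch. The helper name build_celery_queue, the application name 'slimta_delivery', and the broker_url argument are illustrative assumptions, not part of slimta; the Relay instance is assumed to be created elsewhere and passed in.

```python
def build_celery_queue(relay, broker_url, suffix=None, backoff=None):
    """Create a Celery app and wrap it in a CeleryQueue (illustrative helper)."""
    # Imported inside the function so this sketch can be loaded even where
    # celery or python-slimta-celeryqueue is not installed.
    from celery import Celery
    from slimta.celeryqueue import CeleryQueue

    # 'slimta_delivery' is a hypothetical application name.
    celery_app = Celery('slimta_delivery', broker=broker_url)

    # CeleryQueue registers its delivery task on the given Celery object.
    return CeleryQueue(celery_app, relay, suffix=suffix, backoff=backoff)
```

The returned object can then be passed wherever a Queue is expected.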

class slimta.celeryqueue.CeleryQueue(celery, relay[, suffix=None[, backoff=None[, bounce_factory=None]]])

Bases: object

Instantiates a new object that can be used wherever a Queue is expected.

Parameters:
  • celery – celery.Celery object to register delivery task with.
  • relay – Relay instance to attempt delivery with.
  • suffix – If given, the task registered in the Celery object will have its name suffixed with an underscore and this string.
  • backoff – Function that, given an Envelope and number of delivery attempts, will return the number of seconds before the next attempt. If it returns None, the message will be permanently failed. The default backoff function simply returns None and messages are never retried.
  • bounce_factory – Function that produces a Bounce or Envelope object given the same parameters as the Bounce constructor. If the function returns None, no bounce is delivered. By default, a new Bounce is created in every case.
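As a sketch of the backoff contract described above, the function below retries with exponentially growing delays and then gives up by returning None. The name exponential_backoff, the base and max_attempts defaults, and the assumption that the attempt count starts at 1 after the first failed delivery are all illustrative rather than taken from slimta.

```python
def exponential_backoff(envelope, attempts, base=60, max_attempts=5):
    """Return seconds to wait before the next attempt, or None to fail.

    envelope is accepted to match the documented signature but is not
    inspected here. attempts is assumed to count deliveries tried so far.
    """
    if attempts > max_attempts:
        # Returning None tells the queue to fail the message permanently.
        return None
    # Delays of 60, 120, 240, ... seconds for attempts 1, 2, 3, ...
    return base * 2 ** (attempts - 1)
```

A function like this would be passed as the backoff argument; a different schedule can be bound ahead of time with functools.partial.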
add_policy(policy)

Adds a QueuePolicy to be executed before messages are persisted to storage.

Parameters: policy – QueuePolicy object to execute.
flush()

The flush() method from Queue is not available to CeleryQueue objects.

Raises: NotImplementedError