slimta.queue

Package defining common functionality for SMTP queuing. According to the SMTP RFCs, a server must persistently store a message before telling the client it accepts the message. An SMTP queue is not a traditional queue data structure: instead of handing messages out in arrival order, it retries each message after a certain period of time.

exception slimta.queue.QueueError([*args[, **kwargs]])

Bases: slimta.core.SlimtaError

Base exception for errors in the queue package.

reply

If set, Edge services may use this as the Reply object to return to clients when queuing fails.

class slimta.queue.QueueStorage

Bases: object

Base class to show the interface that classes must implement to be a storage mechanism for Queue.

write(envelope, timestamp)

Writes the given envelope to storage, along with the timestamp of its next delivery attempt. The number of delivery attempts associated with the message should start at zero.

Parameters:
  • envelope – Envelope object to write.
  • timestamp – Timestamp of message’s next delivery attempt.
Returns:

Unique identifier string for the message in queue.

Raises:

QueueError

set_timestamp(id, timestamp)

Sets a new timestamp for the message’s next delivery attempt.

Parameters:
  • id – The unique identifier string for the message.
  • timestamp – The new delivery attempt timestamp.
Raises:

QueueError

increment_attempts(id)

Increments the number of delivery attempts associated with the message.

Parameters: id – The unique identifier string for the message.
Returns: The new number of message delivery attempts.
Raises: QueueError

set_recipients_delivered(id, rcpt_indexes)

New in version 1.1.0.

Marks the given recipients from the original envelope as already-delivered, meaning they will be skipped by future relay attempts.

Parameters:
  • id – The unique identifier string for the message.
  • rcpt_indexes – List of indexes in the original envelope’s recipients list to mark as delivered.
Raises:

QueueError

load()

Loads all queued messages from the storage engine, such that the Queue can be aware of all upcoming delivery attempts.

Returns: Iterable that yields tuples of the form (timestamp, id) for each message in storage.
Raises: QueueError
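
The (timestamp, id) tuples returned by load() sort naturally by next delivery attempt. As a minimal sketch of how a consumer might order them, the example below uses the standard-library heapq module. The helper name pop_due and the sample data are hypothetical, and this is not necessarily how Queue schedules attempts internally.

```python
import heapq


def pop_due(heap, now):
    """Pop and return the ids of all messages whose timestamp is <= now.

    `heap` is a heapq-ordered list of (timestamp, id) tuples, the same
    shape that load() yields. pop_due is a hypothetical helper name.
    """
    due = []
    while heap and heap[0][0] <= now:
        _timestamp, msg_id = heapq.heappop(heap)
        due.append(msg_id)
    return due


# Made-up sample data standing in for the output of load().
loaded = [(30.0, 'c'), (10.0, 'a'), (20.0, 'b')]
heap = list(loaded)
heapq.heapify(heap)
```

Calling pop_due(heap, now) repeatedly with the current time returns ids in order of their scheduled attempt and leaves later messages on the heap.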
get(id)

Returns the Envelope object associated with the given unique identifier string.

The envelope’s recipients should not include those marked as delivered by set_recipients_delivered().

Parameters: id – The unique identifier string of the requested Envelope.
Returns: Tuple with the Envelope object and number of attempts.
Raises: KeyError, QueueError

remove(id)

Removes the Envelope associated with the given unique identifier string from queue storage. This is typically used when the message has been successfully delivered or delivery has permanently failed.

Parameters: id – The unique identifier string of the Envelope to remove.
Raises: QueueError

wait()

If messages are not being delivered from the same process in which they were received, the storage mechanism needs a way to wait until it is notified that a new message has been stored.

Returns: An iterable or generator producing tuples with the timestamp and unique identifier string of a new message in storage. When the iterable or generator is exhausted, wait() is simply called again.
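
To illustrate the contract of wait(), here is a rough sketch of a generator that blocks until at least one notification arrives, yields everything pending, and then finishes so that it can be called again. The class NotifyingStorage and its notify() method are hypothetical, and the standard-library queue.Queue is only an in-process stand-in for whatever cross-process channel a real backend would use.

```python
import queue


class NotifyingStorage:
    """Hypothetical sketch showing only the wait() side of a backend."""

    def __init__(self):
        # Stand-in notification channel; a real backend would use its
        # own mechanism to learn about messages stored by other processes.
        self._notifications = queue.Queue()

    def notify(self, timestamp, msg_id):
        # Hypothetical hook called when a new message has been stored.
        self._notifications.put((timestamp, msg_id))

    def wait(self):
        # Block until the first notification arrives.
        yield self._notifications.get()
        # Then drain anything else already pending without blocking.
        while True:
            try:
                yield self._notifications.get_nowait()
            except queue.Empty:
                return  # exhausted; the caller simply calls wait() again
```

Note that a blocking queue.Queue.get() would stall other greenlets unless gevent's monkey patching is in effect, so treat this only as a picture of the expected return shape.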
get_info()

New in version 0.3.20.

Queries the storage backend for relevant information about the contents of the queue. The result is a dict() containing required keys along with any other custom keys dependent on the particular backend.

Only one key is required in the result:

  • size: The number of messages currently in the queue.
Return type: dict()
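
The interface above can be sketched as a small in-memory class. This is an illustration under stated assumptions, not a shipped backend: MemoryStorage and FakeEnvelope are hypothetical names, a real implementation would subclass QueueStorage and raise QueueError on failure, and wait() is left out on the assumption that messages are delivered from the same process that received them.

```python
import copy
import uuid


class FakeEnvelope:
    """Hypothetical stand-in for an Envelope; only the fields used here."""

    def __init__(self, sender, recipients):
        self.sender = sender
        self.recipients = list(recipients)


class MemoryStorage:
    """Illustrative in-memory storage following the QueueStorage interface.

    Assumption: a real backend would subclass slimta.queue.QueueStorage
    and raise QueueError on failure; both are omitted so this runs alone.
    """

    def __init__(self):
        # Maps message id -> record dict.
        self._records = {}

    def write(self, envelope, timestamp):
        msg_id = uuid.uuid4().hex
        self._records[msg_id] = {
            'envelope': envelope,
            'timestamp': timestamp,
            'attempts': 0,       # delivery attempts start at zero
            'delivered': set(),  # indexes into envelope.recipients
        }
        return msg_id

    def set_timestamp(self, id, timestamp):
        self._records[id]['timestamp'] = timestamp

    def increment_attempts(self, id):
        record = self._records[id]
        record['attempts'] += 1
        return record['attempts']

    def set_recipients_delivered(self, id, rcpt_indexes):
        self._records[id]['delivered'].update(rcpt_indexes)

    def load(self):
        # Copy the items so callers may modify storage while iterating.
        for msg_id, record in list(self._records.items()):
            yield (record['timestamp'], msg_id)

    def get(self, id):
        record = self._records[id]  # raises KeyError for unknown ids
        envelope = copy.copy(record['envelope'])
        # Hide recipients already marked as delivered.
        envelope.recipients = [
            rcpt for i, rcpt in enumerate(record['envelope'].recipients)
            if i not in record['delivered']
        ]
        return envelope, record['attempts']

    def remove(self, id):
        self._records.pop(id, None)

    def get_info(self):
        # 'size' is the only required key.
        return {'size': len(self._records)}
```

Keeping the delivered indexes separate from the stored envelope means the original recipients list is never rewritten, so the indexes passed to set_recipients_delivered() stay valid across attempts.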
class slimta.queue.Queue(store[, relay=None[, backoff=None[, bounce_factory=None[, bounce_queue=None[, store_pool=None[, relay_pool=None]]]]]])

Bases: gevent._greenlet.Greenlet

Manages the queue of Envelope objects waiting for delivery. This is not a standard FIFO queue: a message’s place in the queue depends entirely on the timestamp of its next delivery attempt.

Parameters:
  • store – Object implementing QueueStorage.
  • relay – Relay object used to attempt message deliveries. If this is not given, no deliveries will be attempted on received messages.
  • backoff – Function that, given an Envelope and number of delivery attempts, will return the number of seconds before the next attempt. If it returns None, the message will be permanently failed. The default backoff function simply returns None and messages are never retried.
  • bounce_factory – Function that produces a Bounce object given the same parameters as the Bounce constructor. If the function returns None, no bounce is delivered. By default, a new Bounce is created in every case.
  • bounce_queue – Queue object that will be used for delivering bounce messages. The default is self.
  • store_pool – Number of simultaneous operations performable against the store object. Default is unlimited.
  • relay_pool – Number of simultaneous operations performable against the relay object. Default is unlimited.
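
As an example of the backoff parameter, the sketch below doubles the delay after each failed attempt and gives up after a fixed number of attempts. The function name, the base and max_attempts defaults, and the assumption that attempts counts the attempts already made (starting at 1 after the first failure) are illustrative choices, not values taken from the library.

```python
def exponential_backoff(envelope, attempts, base=60, max_attempts=5):
    """Return seconds until the next attempt, or None to fail permanently.

    Assumes `attempts` is the number of delivery attempts made so far.
    `base` and `max_attempts` are hypothetical tuning knobs.
    """
    if attempts >= max_attempts:
        return None  # permanent failure; no further retries
    # 60s, 120s, 240s, ... for attempts 1, 2, 3, ...
    return base * 2 ** max(attempts - 1, 0)
```

Because the extra arguments have defaults, the function can be passed directly as backoff and still be called with just an envelope and an attempt count.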
add_policy(policy)

Adds a QueuePolicy to be executed before messages are persisted to storage.

Parameters: policy – QueuePolicy object to execute.

enqueue(envelope)

Drops a new message in the queue for delivery. The first delivery attempt is made immediately (depending on relay pool availability). This method is not typically called directly; Edge objects use it when they receive new messages.

Parameters: envelope – Envelope object to enqueue.
Returns: Zipped list of envelopes and their respective queue IDs (or thrown QueueError objects).
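
Since each entry in the result pairs an envelope with either a queue ID or a QueueError, callers usually need to tell the two apart. A minimal sketch of that check follows; split_results is a hypothetical helper, and the QueueError class defined here is only a stand-in so the example runs without the library installed.

```python
class QueueError(Exception):
    """Stand-in for slimta.queue.QueueError so the sketch runs alone."""


def split_results(results):
    """Separate enqueue()-style results into successes and failures.

    `results` is a list of (envelope, outcome) pairs, where outcome is
    either a queue ID string or a QueueError instance.
    """
    queued, failed = [], []
    for envelope, outcome in results:
        if isinstance(outcome, QueueError):
            failed.append((envelope, outcome))
        else:
            queued.append((envelope, outcome))
    return queued, failed
```

An Edge-like caller could then accept the message only when the failed list is empty.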
flush()

Attempts to immediately flush all messages waiting in the queue, regardless of their retry timers.

Warning

This can be a very expensive operation; use with care.

kill()

This method is used by Queue and Queue-like objects to properly end any associated services (such as running Greenlet threads) and close resources.


Sub-Modules: