Skip to content

Messaging backends

A messaging backend is a wrapper over your message broker. It exposes several methods that abstract the broker functionality, making it simpler to work with.

Out of the box Melange provides you with three messaging backends: The AWSBackend and the LocalSQSBackend. The RabbitMQBackend is present in the source code, but not actively maintained. Looking for contributors!

Writing your own Messaging Backend

Subclass the MessagingBackend interface and implement all the methods of that class. Here is the documentation of the interface class and all its methods. You need to take in mind, when implementing the methods that return a QueueWrapper or a TopicWrapper, to wrap inside them the real object that represents your queue or topic (in your chosen messaging technology) and unwrap it inside the messaging backend when requiring to perform specific operations. Look at the AWSBackend as a template/example of this wrapping/unwrapping.

MessagingBackend

Source code in melange/backends/interfaces.py
class MessagingBackend:
    def __init__(self) -> None:
        self._finalizer = weakref.finalize(self, self.close_connection)

    def declare_topic(self, topic_name: str) -> TopicWrapper:
        """
        Gets or creates a topic.

        Args:
            topic_name: The name of the topic to create

        Returns:
            An object that represents a topic. The type of the object
            is only relevant inside the context of the backend, so what you
            return as a topic will be passed in next calls to the backend
            where a topic is required
        """
        raise NotImplementedError

    def get_queue(self, queue_name: str) -> QueueWrapper:
        """
        Gets the queue with the name `queue_name`. Does not perform creation.

        Args:
            queue_name: the name of the queue to retrieve

        Returns:
            A `Queue` object that represents the created the queue
        """
        raise NotImplementedError

    def declare_queue(
        self,
        queue_name: str,
        *topics_to_bind: TopicWrapper,
        dead_letter_queue_name: Optional[str] = None,
        **kwargs: Any
    ) -> Tuple[QueueWrapper, Optional[QueueWrapper]]:
        """
        Gets or creates a queue.

        Args:
            queue_name: the name of the queue to create
            *topics_to_bind: if provided, creates all these topics and subscribes
                the created queue to them
            dead_letter_queue_name: if provided, create a dead letter queue attached to
                the created `queue_name`.
            **kwargs:

        Returns:
            A tuple with the created queue and the dead letter queue (if applies)
        """

        raise NotImplementedError

    def retrieve_messages(self, queue: QueueWrapper, **kwargs: Any) -> List[Message]:
        """
        Retrieves a list of available messages from the queue.

        Args:
            queue: the queue object
            **kwargs: Other parameters/options required by the backend

        Returns:
            A list of available messages from the queue
        """
        raise NotImplementedError

    def yield_messages(self, queue: QueueWrapper, **kwargs: Any) -> Iterable[Message]:
        """
        Yields available messages from the queue.

        Args:
            queue: the queue object
            **kwargs: Other parameters/options required by the backend

        Returns:
            An iterable which will poll the queue upon requesting more messages
        """
        raise NotImplementedError

    def publish_to_topic(
        self,
        message: Message,
        topic: TopicWrapper,
        extra_attributes: Optional[Dict] = None,
    ) -> None:
        """
        Publishes a message content and the manifest to the topic

        Args:
            message: the message to send
            topic: the topic to send the message to
            extra_attributes: extra properties that might be required for the backend
        """
        raise NotImplementedError

    def publish_to_queue(
        self, message: Message, queue: QueueWrapper, **kwargs: Any
    ) -> None:
        raise NotImplementedError

    def acknowledge(self, message: Message) -> None:
        """
        Acknowledges a message so that it won't be redelivered by
        the messaging infrastructure in the future
        """
        raise NotImplementedError

    def close_connection(self) -> None:
        """
        Override this function if you want to use some finalizer code
         to shutdown your backend in a clean way
        """
        pass

    def delete_queue(self, queue: QueueWrapper) -> None:
        """
        Deletes the queue
        """
        raise NotImplementedError

    def delete_topic(self, topic: TopicWrapper) -> None:
        """
        Deletes the topic
        """
        raise NotImplementedError

acknowledge(self, message)

Acknowledges a message so that it won't be redelivered by the messaging infrastructure in the future

Source code in melange/backends/interfaces.py
def acknowledge(self, message: Message) -> None:
    """
    Acknowledges a message so that it won't be redelivered by
    the messaging infrastructure in the future
    """
    raise NotImplementedError

close_connection(self)

Override this function if you want to use some finalizer code to shutdown your backend in a clean way

Source code in melange/backends/interfaces.py
def close_connection(self) -> None:
    """
    Override this function if you want to use some finalizer code
     to shutdown your backend in a clean way
    """
    pass

declare_queue(self, queue_name, *topics_to_bind, *, dead_letter_queue_name=None, **kwargs)

Gets or creates a queue.

Parameters:

Name Type Description Default
queue_name str

the name of the queue to create

required
*topics_to_bind TopicWrapper

if provided, creates all these topics and subscribes the created queue to them

()
dead_letter_queue_name Optional[str]

if provided, create a dead letter queue attached to the created queue_name.

None
**kwargs Any {}

Returns:

Type Description
Tuple[melange.models.QueueWrapper, Optional[melange.models.QueueWrapper]]

A tuple with the created queue and the dead letter queue (if applies)

Source code in melange/backends/interfaces.py
def declare_queue(
    self,
    queue_name: str,
    *topics_to_bind: TopicWrapper,
    dead_letter_queue_name: Optional[str] = None,
    **kwargs: Any
) -> Tuple[QueueWrapper, Optional[QueueWrapper]]:
    """
    Gets or creates a queue.

    Args:
        queue_name: the name of the queue to create
        *topics_to_bind: if provided, creates all these topics and subscribes
            the created queue to them
        dead_letter_queue_name: if provided, create a dead letter queue attached to
            the created `queue_name`.
        **kwargs:

    Returns:
        A tuple with the created queue and the dead letter queue (if applies)
    """

    raise NotImplementedError

declare_topic(self, topic_name)

Gets or creates a topic.

Parameters:

Name Type Description Default
topic_name str

The name of the topic to create

required

Returns:

Type Description
TopicWrapper

An object that represents a topic. The type of the object is only relevant inside the context of the backend, so what you return as a topic will be passed in next calls to the backend where a topic is required

Source code in melange/backends/interfaces.py
def declare_topic(self, topic_name: str) -> TopicWrapper:
    """
    Gets or creates a topic.

    Args:
        topic_name: The name of the topic to create

    Returns:
        An object that represents a topic. The type of the object
        is only relevant inside the context of the backend, so what you
        return as a topic will be passed in next calls to the backend
        where a topic is required
    """
    raise NotImplementedError

delete_queue(self, queue)

Deletes the queue

Source code in melange/backends/interfaces.py
def delete_queue(self, queue: QueueWrapper) -> None:
    """
    Deletes the queue
    """
    raise NotImplementedError

delete_topic(self, topic)

Deletes the topic

Source code in melange/backends/interfaces.py
def delete_topic(self, topic: TopicWrapper) -> None:
    """
    Deletes the topic
    """
    raise NotImplementedError

get_queue(self, queue_name)

Gets the queue with the name queue_name. Does not perform creation.

Parameters:

Name Type Description Default
queue_name str

the name of the queue to retrieve

required

Returns:

Type Description
QueueWrapper

A Queue object that represents the created the queue

Source code in melange/backends/interfaces.py
def get_queue(self, queue_name: str) -> QueueWrapper:
    """
    Gets the queue with the name `queue_name`. Does not perform creation.

    Args:
        queue_name: the name of the queue to retrieve

    Returns:
        A `Queue` object that represents the created the queue
    """
    raise NotImplementedError

publish_to_topic(self, message, topic, extra_attributes=None)

Publishes a message content and the manifest to the topic

Parameters:

Name Type Description Default
message Message

the message to send

required
topic TopicWrapper

the topic to send the message to

required
extra_attributes Optional[Dict]

extra properties that might be required for the backend

None
Source code in melange/backends/interfaces.py
def publish_to_topic(
    self,
    message: Message,
    topic: TopicWrapper,
    extra_attributes: Optional[Dict] = None,
) -> None:
    """
    Publishes a message content and the manifest to the topic

    Args:
        message: the message to send
        topic: the topic to send the message to
        extra_attributes: extra properties that might be required for the backend
    """
    raise NotImplementedError

retrieve_messages(self, queue, **kwargs)

Retrieves a list of available messages from the queue.

Parameters:

Name Type Description Default
queue QueueWrapper

the queue object

required
**kwargs Any

Other parameters/options required by the backend

{}

Returns:

Type Description
List[melange.models.Message]

A list of available messages from the queue

Source code in melange/backends/interfaces.py
def retrieve_messages(self, queue: QueueWrapper, **kwargs: Any) -> List[Message]:
    """
    Retrieves a list of available messages from the queue.

    Args:
        queue: the queue object
        **kwargs: Other parameters/options required by the backend

    Returns:
        A list of available messages from the queue
    """
    raise NotImplementedError

yield_messages(self, queue, **kwargs)

Yields available messages from the queue.

Parameters:

Name Type Description Default
queue QueueWrapper

the queue object

required
**kwargs Any

Other parameters/options required by the backend

{}

Returns:

Type Description
Iterable[melange.models.Message]

An iterable which will poll the queue upon requesting more messages

Source code in melange/backends/interfaces.py
def yield_messages(self, queue: QueueWrapper, **kwargs: Any) -> Iterable[Message]:
    """
    Yields available messages from the queue.

    Args:
        queue: the queue object
        **kwargs: Other parameters/options required by the backend

    Returns:
        An iterable which will poll the queue upon requesting more messages
    """
    raise NotImplementedError
Back to top