Skip to content

Messaging Backend Factory

MessagingBackendFactory

Class used to create queues and topics

Source code in melange/backends/factory.py
class MessagingBackendFactory:
    """
    Factory used to create queues and topics through a messaging backend.

    Every operation is delegated to the backend given at construction time or,
    when none is given, to the default backend returned by
    ``BackendManager().get_default_backend()``.
    """

    def __init__(self, backend: Optional[MessagingBackend] = None) -> None:
        """
        Stores the backend that will be used to declare queues and topics.

        Args:
            backend: the messaging backend to delegate to. When ``None``, the
                default backend of the ``BackendManager`` is used instead.
        """
        # Compare against None explicitly: a backend object that happens to be
        # falsy (e.g. it defines __len__ or __bool__) must still be honoured
        # rather than silently swapped for the default backend.
        if backend is None:
            backend = BackendManager().get_default_backend()
        self._backend = backend

    def init_queue(
        self,
        queue_name: str,
        topic_names_to_subscribe: Optional[List[str]] = None,
        dlq_name: Optional[str] = None,
        **kwargs: Any
    ) -> QueueWrapper:
        """
        Creates a queue if it does not exist, subscribes it to the topics, and creates a
        dead letter queue.

        Args:
            queue_name: name of the queue to create
            topic_names_to_subscribe: list of topic names to subscribe this queue to.
                If the topics do not exist they will be created. ``None`` (the
                default) and an empty list both mean "no topics".
            dlq_name: name of the dead letter queue to create and attach to the main
                queue. Forwarded to the backend as ``dead_letter_queue_name``.
            **kwargs: any other arguments required by your messaging backend;
                forwarded unchanged to the backend's ``declare_queue``.

        Returns:
            The created queue wrapper

        """
        # Each topic is declared through init_topic before the queue is declared,
        # and the resulting topic wrappers are passed on positionally.
        topics = [self.init_topic(name) for name in topic_names_to_subscribe or []]

        # declare_queue returns a two-element result; only the first element
        # (the main queue) is handed back to the caller.
        queue, _ = self._backend.declare_queue(
            queue_name, *topics, dead_letter_queue_name=dlq_name, **kwargs
        )

        return queue

    def init_topic(self, topic_name: str) -> TopicWrapper:
        """
        Creates a topic if it does not exist.

        Args:
            topic_name: the name of the topic to create

        Returns:
            The created topic wrapper

        """
        return self._backend.declare_topic(topic_name)

init_queue(self, queue_name, topic_names_to_subscribe=None, dlq_name=None, **kwargs)

Creates a queue if it does not exist, subscribes it to the topics, and creates a dead letter queue.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `queue_name` | `str` | name of the queue to create | required |
| `topic_names_to_subscribe` | `Optional[List[str]]` | list of topic names to subscribe this queue to. If the topics do not exist they will be created. | `None` |
| `dlq_name` | `Optional[str]` | name of the dead letter queue to create and attach to the main queue | `None` |
| `**kwargs` | `Any` | any other arguments required by your messaging backend | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `QueueWrapper` | The created queue wrapper |

Source code in melange/backends/factory.py
def init_queue(
    self,
    queue_name: str,
    topic_names_to_subscribe: Optional[List[str]] = None,
    dlq_name: Optional[str] = None,
    **kwargs: Any
) -> QueueWrapper:
    """
    Declares a queue through the backend, subscribed to the given topics and
    with the given dead letter queue name attached.

    Args:
        queue_name: name of the queue to create
        topic_names_to_subscribe: names of the topics this queue is subscribed
            to. Each one is declared via ``init_topic`` first. ``None`` or an
            empty list means no topics.
        dlq_name: name of the dead letter queue, forwarded to the backend as
            ``dead_letter_queue_name``
        **kwargs: any other arguments required by your messaging backend

    Returns:
        The created queue wrapper

    """
    # Declare every requested topic up front, keeping the caller's order.
    declared_topics = []
    for name in topic_names_to_subscribe or []:
        declared_topics.append(self.init_topic(name))

    declared = self._backend.declare_queue(
        queue_name, *declared_topics, dead_letter_queue_name=dlq_name, **kwargs
    )

    # The backend hands back two values; only the first one is returned.
    main_queue, _dead_letter_queue = declared
    return main_queue

init_topic(self, topic_name)

Creates a topic if it does not exist.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `topic_name` | `str` | the name of the topic to create | required |

Returns:

| Type | Description |
| --- | --- |
| `TopicWrapper` | The created topic wrapper |

Source code in melange/backends/factory.py
def init_topic(self, topic_name: str) -> TopicWrapper:
    """
    Declares a topic through the backend's ``declare_topic``.

    Args:
        topic_name: the name of the topic to create

    Returns:
        The created topic wrapper

    """
    backend = self._backend
    topic = backend.declare_topic(topic_name)
    return topic
Back to top