Messaging Backend Factory
MessagingBackendFactory
Class used to create queues and topics
Source code in melange/backends/factory.py
class MessagingBackendFactory:
    """
    Class used to create queues and topics
    """

    def __init__(self, backend: Optional[MessagingBackend] = None) -> None:
        """
        Args:
            backend: the messaging backend used to declare queues and topics.
                When it is `None`, the default backend returned by
                `BackendManager().get_default_backend()` is used.
        """
        # Compare against None explicitly: a caller-supplied backend must be
        # honored even if the object happens to evaluate as falsy.
        if backend is None:
            backend = BackendManager().get_default_backend()
        self._backend = backend

    def init_queue(
        self,
        queue_name: str,
        topic_names_to_subscribe: Optional[List[str]] = None,
        dlq_name: Optional[str] = None,
        **kwargs: Any
    ) -> QueueWrapper:
        """
        Creates a queue if it does not exist, subscribes it to the topics, and creates a
        dead letter queue.

        Args:
            queue_name: name of the queue to create
            topic_names_to_subscribe: list of topic names to subscribe this queue to.
                If the topics do not exist they will be created.
            dlq_name: name of the dead letter queue to create and attach to the main queue
            **kwargs: any other arguments required by your messaging backend

        Returns:
            The created queue wrapper
        """
        topics = [self.init_topic(t) for t in topic_names_to_subscribe or []]
        # The backend returns a (queue, dead letter queue) pair; only the main
        # queue is handed back to the caller.
        queue, _ = self._backend.declare_queue(
            queue_name, *topics, dead_letter_queue_name=dlq_name, **kwargs
        )
        return queue

    def init_topic(self, topic_name: str) -> TopicWrapper:
        """
        Creates a topic if it does not exist.

        Args:
            topic_name: the name of the topic to create

        Returns:
            The created topic wrapper
        """
        return self._backend.declare_topic(topic_name)
init_queue(self, queue_name, topic_names_to_subscribe=None, dlq_name=None, **kwargs)
Creates a queue if it does not exist, subscribes it to the topics, and creates a dead letter queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queue_name` | `str` | name of the queue to create | required |
| `topic_names_to_subscribe` | `Optional[List[str]]` | list of topic names to subscribe this queue to. If the topics do not exist they will be created. | `None` |
| `dlq_name` | `Optional[str]` | name of the dead letter queue to create and attach to the main queue | `None` |
| `**kwargs` | `Any` | any other arguments required by your messaging backend | `{}` |
Returns:
| Type | Description |
|---|---|
| `QueueWrapper` | The created queue wrapper |
Source code in melange/backends/factory.py
def init_queue(
    self,
    queue_name: str,
    topic_names_to_subscribe: Optional[List[str]] = None,
    dlq_name: Optional[str] = None,
    **kwargs: Any
) -> QueueWrapper:
    """
    Declare a queue through the backend, subscribed to the given topics and
    with a dead letter queue attached.

    Every name in `topic_names_to_subscribe` is first passed to `init_topic`;
    the resulting topics are then forwarded to the backend's `declare_queue`.

    Args:
        queue_name: name of the queue to create
        topic_names_to_subscribe: names of the topics this queue is subscribed to;
            a missing or empty value means no subscriptions
        dlq_name: name of the dead letter queue to create and attach to the main queue
        **kwargs: any other arguments required by your messaging backend

    Returns:
        The created queue wrapper
    """
    requested_names = topic_names_to_subscribe or []
    # Declare every topic up front, before the queue itself is declared.
    declared_topics = list(map(self.init_topic, requested_names))
    # declare_queue yields a (queue, dead letter queue) pair; only the main
    # queue is returned.
    main_queue, _dead_letter_queue = self._backend.declare_queue(
        queue_name,
        *declared_topics,
        dead_letter_queue_name=dlq_name,
        **kwargs
    )
    return main_queue
init_topic(self, topic_name)
Creates a topic if it does not exist.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `topic_name` | `str` | the name of the topic to create | required |
Returns:
| Type | Description |
|---|---|
| `TopicWrapper` | The created topic wrapper |