Skip to content

Messaging Backends

MessagingBackend

Source code in melange/backends/interfaces.py
class MessagingBackend:
    def __init__(self) -> None:
        self._finalizer = weakref.finalize(self, self.close_connection)

    def declare_topic(self, topic_name: str) -> TopicWrapper:
        """
        Gets or creates a topic.

        Args:
            topic_name: The name of the topic to create

        Returns:
            An object that represents a topic. The type of the object
            is only relevant inside the context of the backend, so what you
            return as a topic will be passed in next calls to the backend
            where a topic is required
        """
        raise NotImplementedError

    def get_queue(self, queue_name: str) -> QueueWrapper:
        """
        Gets the queue with the name `queue_name`. Does not perform creation.

        Args:
            queue_name: the name of the queue to retrieve

        Returns:
            A `Queue` object that represents the created the queue
        """
        raise NotImplementedError

    def declare_queue(
        self,
        queue_name: str,
        *topics_to_bind: TopicWrapper,
        dead_letter_queue_name: Optional[str] = None,
        **kwargs: Any
    ) -> Tuple[QueueWrapper, Optional[QueueWrapper]]:
        """
        Gets or creates a queue.

        Args:
            queue_name: the name of the queue to create
            *topics_to_bind: if provided, creates all these topics and subscribes
                the created queue to them
            dead_letter_queue_name: if provided, create a dead letter queue attached to
                the created `queue_name`.
            **kwargs:

        Returns:
            A tuple with the created queue and the dead letter queue (if applies)
        """

        raise NotImplementedError

    def retrieve_messages(self, queue: QueueWrapper, **kwargs: Any) -> List[Message]:
        """
        Retrieves a list of available messages from the queue.

        Args:
            queue: the queue object
            **kwargs: Other parameters/options required by the backend

        Returns:
            A list of available messages from the queue
        """
        raise NotImplementedError

    def yield_messages(self, queue: QueueWrapper, **kwargs: Any) -> Iterable[Message]:
        """
        Yields available messages from the queue.

        Args:
            queue: the queue object
            **kwargs: Other parameters/options required by the backend

        Returns:
            An iterable which will poll the queue upon requesting more messages
        """
        raise NotImplementedError

    def publish_to_topic(
        self,
        message: Message,
        topic: TopicWrapper,
        extra_attributes: Optional[Dict] = None,
    ) -> None:
        """
        Publishes a message content and the manifest to the topic

        Args:
            message: the message to send
            topic: the topic to send the message to
            extra_attributes: extra properties that might be required for the backend
        """
        raise NotImplementedError

    def publish_to_queue(
        self, message: Message, queue: QueueWrapper, **kwargs: Any
    ) -> None:
        raise NotImplementedError

    def acknowledge(self, message: Message) -> None:
        """
        Acknowledges a message so that it won't be redelivered by
        the messaging infrastructure in the future
        """
        raise NotImplementedError

    def close_connection(self) -> None:
        """
        Override this function if you want to use some finalizer code
         to shutdown your backend in a clean way
        """
        pass

    def delete_queue(self, queue: QueueWrapper) -> None:
        """
        Deletes the queue
        """
        raise NotImplementedError

    def delete_topic(self, topic: TopicWrapper) -> None:
        """
        Deletes the topic
        """
        raise NotImplementedError

acknowledge(self, message)

Acknowledges a message so that it won't be redelivered by the messaging infrastructure in the future

Source code in melange/backends/interfaces.py
def acknowledge(self, message: Message) -> None:
    """
    Acknowledges a message so that it won't be redelivered by
    the messaging infrastructure in the future
    """
    raise NotImplementedError

close_connection(self)

Override this function if you want to use some finalizer code to shutdown your backend in a clean way

Source code in melange/backends/interfaces.py
def close_connection(self) -> None:
    """
    Override this function if you want to use some finalizer code
     to shutdown your backend in a clean way
    """
    pass

declare_queue(self, queue_name, *topics_to_bind, *, dead_letter_queue_name=None, **kwargs)

Gets or creates a queue.

Parameters:

Name Type Description Default
queue_name str

the name of the queue to create

required
*topics_to_bind TopicWrapper

if provided, creates all these topics and subscribes the created queue to them

()
dead_letter_queue_name Optional[str]

if provided, create a dead letter queue attached to the created queue_name.

None
**kwargs Any {}

Returns:

Type Description
Tuple[melange.models.QueueWrapper, Optional[melange.models.QueueWrapper]]

A tuple with the created queue and the dead letter queue (if applies)

Source code in melange/backends/interfaces.py
def declare_queue(
    self,
    queue_name: str,
    *topics_to_bind: TopicWrapper,
    dead_letter_queue_name: Optional[str] = None,
    **kwargs: Any
) -> Tuple[QueueWrapper, Optional[QueueWrapper]]:
    """
    Gets or creates a queue.

    Args:
        queue_name: the name of the queue to create
        *topics_to_bind: if provided, creates all these topics and subscribes
            the created queue to them
        dead_letter_queue_name: if provided, create a dead letter queue attached to
            the created `queue_name`.
        **kwargs:

    Returns:
        A tuple with the created queue and the dead letter queue (if applies)
    """

    raise NotImplementedError

declare_topic(self, topic_name)

Gets or creates a topic.

Parameters:

Name Type Description Default
topic_name str

The name of the topic to create

required

Returns:

Type Description
TopicWrapper

An object that represents a topic. The type of the object is only relevant inside the context of the backend, so what you return as a topic will be passed in next calls to the backend where a topic is required

Source code in melange/backends/interfaces.py
def declare_topic(self, topic_name: str) -> TopicWrapper:
    """
    Gets or creates a topic.

    Args:
        topic_name: The name of the topic to create

    Returns:
        An object that represents a topic. The type of the object
        is only relevant inside the context of the backend, so what you
        return as a topic will be passed in next calls to the backend
        where a topic is required
    """
    raise NotImplementedError

delete_queue(self, queue)

Deletes the queue

Source code in melange/backends/interfaces.py
def delete_queue(self, queue: QueueWrapper) -> None:
    """
    Deletes the queue
    """
    raise NotImplementedError

delete_topic(self, topic)

Deletes the topic

Source code in melange/backends/interfaces.py
def delete_topic(self, topic: TopicWrapper) -> None:
    """
    Deletes the topic
    """
    raise NotImplementedError

get_queue(self, queue_name)

Gets the queue with the name queue_name. Does not perform creation.

Parameters:

Name Type Description Default
queue_name str

the name of the queue to retrieve

required

Returns:

Type Description
QueueWrapper

A Queue object that represents the created the queue

Source code in melange/backends/interfaces.py
def get_queue(self, queue_name: str) -> QueueWrapper:
    """
    Gets the queue with the name `queue_name`. Does not perform creation.

    Args:
        queue_name: the name of the queue to retrieve

    Returns:
        A `Queue` object that represents the created the queue
    """
    raise NotImplementedError

publish_to_topic(self, message, topic, extra_attributes=None)

Publishes a message content and the manifest to the topic

Parameters:

Name Type Description Default
message Message

the message to send

required
topic TopicWrapper

the topic to send the message to

required
extra_attributes Optional[Dict]

extra properties that might be required for the backend

None
Source code in melange/backends/interfaces.py
def publish_to_topic(
    self,
    message: Message,
    topic: TopicWrapper,
    extra_attributes: Optional[Dict] = None,
) -> None:
    """
    Publishes a message content and the manifest to the topic

    Args:
        message: the message to send
        topic: the topic to send the message to
        extra_attributes: extra properties that might be required for the backend
    """
    raise NotImplementedError

retrieve_messages(self, queue, **kwargs)

Retrieves a list of available messages from the queue.

Parameters:

Name Type Description Default
queue QueueWrapper

the queue object

required
**kwargs Any

Other parameters/options required by the backend

{}

Returns:

Type Description
List[melange.models.Message]

A list of available messages from the queue

Source code in melange/backends/interfaces.py
def retrieve_messages(self, queue: QueueWrapper, **kwargs: Any) -> List[Message]:
    """
    Retrieves a list of available messages from the queue.

    Args:
        queue: the queue object
        **kwargs: Other parameters/options required by the backend

    Returns:
        A list of available messages from the queue
    """
    raise NotImplementedError

yield_messages(self, queue, **kwargs)

Yields available messages from the queue.

Parameters:

Name Type Description Default
queue QueueWrapper

the queue object

required
**kwargs Any

Other parameters/options required by the backend

{}

Returns:

Type Description
Iterable[melange.models.Message]

An iterable which will poll the queue upon requesting more messages

Source code in melange/backends/interfaces.py
def yield_messages(self, queue: QueueWrapper, **kwargs: Any) -> Iterable[Message]:
    """
    Yields available messages from the queue.

    Args:
        queue: the queue object
        **kwargs: Other parameters/options required by the backend

    Returns:
        An iterable which will poll the queue upon requesting more messages
    """
    raise NotImplementedError

BackendManager

Source code in melange/backends/backend_manager.py
class BackendManager(metaclass=Singleton):
    def __init__(self) -> None:
        self._backend: Optional[MessagingBackend] = None

    def set_default_backend(
        self,
        backend: MessagingBackend,
    ) -> None:
        """
        Sets the default backend
        Args:
            backend:
        """
        if not isinstance(backend, MessagingBackend):
            raise Exception("Invalid backend supplied")

        self._backend = backend

    def get_default_backend(self) -> MessagingBackend:
        """
        Returns the current default backend
        """
        if not self._backend:
            raise Exception(
                "No backend is registered. Please call 'set_default_backend' prior to getting it"
            )

        return self._backend

get_default_backend(self)

Returns the current default backend

Source code in melange/backends/backend_manager.py
def get_default_backend(self) -> MessagingBackend:
    """
    Returns the current default backend
    """
    if not self._backend:
        raise Exception(
            "No backend is registered. Please call 'set_default_backend' prior to getting it"
        )

    return self._backend

set_default_backend(self, backend)

Sets the default backend

Parameters:

Name Type Description Default
backend MessagingBackend required
Source code in melange/backends/backend_manager.py
def set_default_backend(
    self,
    backend: MessagingBackend,
) -> None:
    """
    Sets the default backend
    Args:
        backend:
    """
    if not isinstance(backend, MessagingBackend):
        raise Exception("Invalid backend supplied")

    self._backend = backend

AWSBackend (BaseSQSBackend)

Backend to use with AWS

Source code in melange/backends/sqs/sqs_backend.py
class AWSBackend(BaseSQSBackend):
    """
    Backend to use with AWS
    """

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)

BaseSQSBackend (MessagingBackend)

Base class for SQS Backends

Source code in melange/backends/sqs/sqs_backend.py
class BaseSQSBackend(MessagingBackend):
    """
    Base class for SQS Backends
    """

    def __init__(self, **kwargs: Any) -> None:
        super().__init__()
        self.max_number_of_messages = kwargs.get("max_number_of_messages", 10)
        self.visibility_timeout = kwargs.get("visibility_timeout", 100)
        self.wait_time_seconds = kwargs.get("wait_time_seconds", 10)

        self.extra_settings = kwargs.get("extra_settings", {})
        self.sns_settings = kwargs.get("sns_settings", {})

    def declare_topic(self, topic_name: str) -> TopicWrapper:
        sns = boto3.resource("sns", **self.sns_settings)
        topic = sns.create_topic(Name=topic_name)
        return TopicWrapper(topic)

    def get_queue(self, queue_name: str) -> QueueWrapper:
        sqs_res = boto3.resource("sqs", **self.extra_settings)
        return QueueWrapper(sqs_res.get_queue_by_name(QueueName=queue_name))

    def _subscribe_to_topics(
        self, queue: QueueWrapper, topics_to_bind: Iterable[TopicWrapper], **kwargs: Any
    ) -> None:
        if topics_to_bind:
            statements = []
            for topic in topics_to_bind:
                statement = {
                    "Sid": "Sid{}".format(uuid.uuid4()),
                    "Effect": "Allow",
                    "Principal": "*",
                    "Resource": queue.unwrapped_obj.attributes["QueueArn"],
                    "Action": "sqs:SendMessage",
                    "Condition": {
                        "ArnEquals": {"aws:SourceArn": topic.unwrapped_obj.arn}
                    },
                }

                statements.append(statement)

                subscription = topic.unwrapped_obj.subscribe(
                    Protocol="sqs",
                    Endpoint=queue.unwrapped_obj.attributes[
                        "QueueArn"
                    ],  # , Attributes={"RawMessageDelivery": "true"}
                )

                if kwargs.get("filter_events"):
                    filter_policy = {"manifest": kwargs["filter_events"]}
                else:
                    filter_policy = {}

                subscription.set_attributes(
                    AttributeName="FilterPolicy",
                    AttributeValue=json.dumps(filter_policy),
                )

            policy = {
                "Version": "2012-10-17",
                "Id": "sqspolicy",
                "Statement": statements,
            }

            queue.unwrapped_obj.set_attributes(
                Attributes={"Policy": json.dumps(policy)}
            )

    def declare_queue(
        self,
        queue_name: str,
        *topics_to_bind: TopicWrapper,
        dead_letter_queue_name: Optional[str] = None,
        **kwargs: Any,
    ) -> Tuple[QueueWrapper, Optional[QueueWrapper]]:
        try:
            queue = self.get_queue(queue_name)
        except Exception:
            queue = self._create_queue(queue_name, content_based_deduplication="true")

        self._subscribe_to_topics(queue, topics_to_bind, **kwargs)

        dead_letter_queue: Optional[QueueWrapper] = None
        if dead_letter_queue_name:
            try:
                dead_letter_queue = self.get_queue(dead_letter_queue_name)
            except Exception:
                dead_letter_queue = self._create_queue(
                    dead_letter_queue_name, content_based_deduplication="true"
                )

            redrive_policy = {
                "deadLetterTargetArn": dead_letter_queue.unwrapped_obj.attributes[
                    "QueueArn"
                ],
                "maxReceiveCount": "4",
            }

            queue.unwrapped_obj.set_attributes(
                Attributes={"RedrivePolicy": json.dumps(redrive_policy)}
            )

        return queue, dead_letter_queue

    def _create_queue(self, queue_name: str, **kwargs: Any) -> QueueWrapper:
        sqs_res = boto3.resource("sqs", **self.extra_settings)
        fifo = queue_name.endswith(".fifo")
        attributes = {}
        if fifo:
            attributes["FifoQueue"] = "true"
            attributes["ContentBasedDeduplication"] = (
                "true" if kwargs.get("content_based_deduplication") else "false"
            )
        queue = sqs_res.create_queue(QueueName=queue_name, Attributes=attributes)
        return QueueWrapper(queue)

    def retrieve_messages(self, queue: QueueWrapper, **kwargs: Any) -> List[Message]:
        args = dict(
            MaxNumberOfMessages=self.max_number_of_messages,
            VisibilityTimeout=self.visibility_timeout,
            WaitTimeSeconds=self.wait_time_seconds,
            MessageAttributeNames=["All"],
            AttributeNames=["All"],
        )

        if "attempt_id" in kwargs:
            args["ReceiveRequestAttemptId"] = kwargs["attempt_id"]

        messages = queue.unwrapped_obj.receive_messages(**args)

        # We need to differentiate here whether the message came from SNS or SQS
        return [self._construct_message(message) for message in messages]

    def yield_messages(self, queue: QueueWrapper, **kwargs: Any) -> Iterable[Message]:
        args = dict(
            MaxNumberOfMessages=self.max_number_of_messages,
            VisibilityTimeout=self.visibility_timeout,
            WaitTimeSeconds=self.wait_time_seconds,
            MessageAttributeNames=["All"],
            AttributeNames=["All"],
        )

        while True:
            messages = queue.unwrapped_obj.receive_messages(**args)
            for message_content in messages:
                message = self._construct_message(message_content)
                yield message

    def publish_to_queue(
        self, message: Message, queue: QueueWrapper, **kwargs: Any
    ) -> None:
        message_args: Dict = {}
        message_args["MessageBody"] = json.dumps({"Message": message.content})

        is_fifo = queue.unwrapped_obj.attributes.get("FifoQueue") == "true"
        message_deduplication_id = (
            None
            if not is_fifo
            else kwargs.get("message_deduplication_id", str(uuid.uuid4()))
        )

        if message.manifest:
            message_args["MessageAttributes"] = {
                "manifest": {"DataType": "String", "StringValue": message.manifest},
                "serializer_id": {
                    "DataType": "Number",
                    "StringValue": str(message.serializer_id),
                },
            }

        if kwargs.get("message_group_id"):
            message_args["MessageGroupId"] = kwargs["message_group_id"]

        if message_deduplication_id:
            message_args["MessageDeduplicationId"] = message_deduplication_id

        queue.unwrapped_obj.send_message(**message_args)

    def publish_to_topic(
        self,
        message: Message,
        topic: TopicWrapper,
        extra_attributes: Optional[Dict] = None,
    ) -> None:
        args: Dict = dict(
            Message=message.content,
            MessageAttributes={
                "manifest": {
                    "DataType": "String",
                    "StringValue": message.manifest or "",
                },
                "serializer_id": {
                    "DataType": "Number",
                    "StringValue": str(message.serializer_id),
                },
            },
        )

        if extra_attributes:
            if "subject" in extra_attributes:
                args["Subject"] = extra_attributes["subject"]

            if "message_attributes" in extra_attributes:
                args["MessageAttributes"].update(extra_attributes["message_attributes"])

            if "message_structure" in extra_attributes:
                args["MessageStructure"] = extra_attributes["message_structure"]

        response = topic.unwrapped_obj.publish(**args)

        if "MessageId" not in response:
            raise ConnectionError("Could not send the event to the SNS TOPIC")

    def acknowledge(self, message: Message) -> None:
        message.metadata.delete()

    def close_connection(self) -> None:
        pass

    def delete_queue(self, queue: QueueWrapper) -> None:
        queue.unwrapped_obj.delete()

    def delete_topic(self, topic: TopicWrapper) -> None:
        topic.unwrapped_obj.delete()

    def _construct_message(self, message: Any) -> Message:
        body = message.body
        manifest: Optional[str] = None
        serializer_id: Optional[int] = None
        try:
            message_content = json.loads(body)
            if "Message" in message_content:
                content = message_content["Message"]
                # Does the content have more attributes? If so, it is very likely that the message came from a non-raw
                # SNS redirection
                if "MessageAttributes" in message_content:

                    manifest = (
                        message_content["MessageAttributes"]
                        .get("manifest", {})
                        .get("Value")
                        or ""
                    )
                    serializer_id = (
                        message_content["MessageAttributes"]
                        .get("serializer_id", {})
                        .get("Value")
                    )
            else:
                content = message_content
        except JSONDecodeError:
            content = body

        try:
            manifest = manifest or message.message_attributes.get("manifest", {}).get(
                "StringValue"
            )
            serializer_id = int(
                serializer_id
                or message.message_attributes.get("serializer_id", {}).get(
                    "StringValue"
                )
            )
        except Exception:
            manifest = None
            serializer_id = None

        return Message(message.message_id, content, message, serializer_id, manifest)

acknowledge(self, message)

Acknowledges a message so that it won't be redelivered by the messaging infrastructure in the future

Source code in melange/backends/sqs/sqs_backend.py
def acknowledge(self, message: Message) -> None:
    message.metadata.delete()

close_connection(self)

Override this function if you want to use some finalizer code to shutdown your backend in a clean way

Source code in melange/backends/sqs/sqs_backend.py
def close_connection(self) -> None:
    pass

declare_queue(self, queue_name, *topics_to_bind, *, dead_letter_queue_name=None, **kwargs)

Gets or creates a queue.

Parameters:

Name Type Description Default
queue_name str

the name of the queue to create

required
*topics_to_bind TopicWrapper

if provided, creates all these topics and subscribes the created queue to them

()
dead_letter_queue_name Optional[str]

if provided, create a dead letter queue attached to the created queue_name.

None
**kwargs Any {}

Returns:

Type Description
Tuple[melange.models.QueueWrapper, Optional[melange.models.QueueWrapper]]

A tuple with the created queue and the dead letter queue (if applies)

Source code in melange/backends/sqs/sqs_backend.py
def declare_queue(
    self,
    queue_name: str,
    *topics_to_bind: TopicWrapper,
    dead_letter_queue_name: Optional[str] = None,
    **kwargs: Any,
) -> Tuple[QueueWrapper, Optional[QueueWrapper]]:
    try:
        queue = self.get_queue(queue_name)
    except Exception:
        queue = self._create_queue(queue_name, content_based_deduplication="true")

    self._subscribe_to_topics(queue, topics_to_bind, **kwargs)

    dead_letter_queue: Optional[QueueWrapper] = None
    if dead_letter_queue_name:
        try:
            dead_letter_queue = self.get_queue(dead_letter_queue_name)
        except Exception:
            dead_letter_queue = self._create_queue(
                dead_letter_queue_name, content_based_deduplication="true"
            )

        redrive_policy = {
            "deadLetterTargetArn": dead_letter_queue.unwrapped_obj.attributes[
                "QueueArn"
            ],
            "maxReceiveCount": "4",
        }

        queue.unwrapped_obj.set_attributes(
            Attributes={"RedrivePolicy": json.dumps(redrive_policy)}
        )

    return queue, dead_letter_queue

declare_topic(self, topic_name)

Gets or creates a topic.

Parameters:

Name Type Description Default
topic_name str

The name of the topic to create

required

Returns:

Type Description
TopicWrapper

An object that represents a topic. The type of the object is only relevant inside the context of the backend, so what you return as a topic will be passed in next calls to the backend where a topic is required

Source code in melange/backends/sqs/sqs_backend.py
def declare_topic(self, topic_name: str) -> TopicWrapper:
    sns = boto3.resource("sns", **self.sns_settings)
    topic = sns.create_topic(Name=topic_name)
    return TopicWrapper(topic)

delete_queue(self, queue)

Deletes the queue

Source code in melange/backends/sqs/sqs_backend.py
def delete_queue(self, queue: QueueWrapper) -> None:
    queue.unwrapped_obj.delete()

delete_topic(self, topic)

Deletes the topic

Source code in melange/backends/sqs/sqs_backend.py
def delete_topic(self, topic: TopicWrapper) -> None:
    topic.unwrapped_obj.delete()

get_queue(self, queue_name)

Gets the queue with the name queue_name. Does not perform creation.

Parameters:

Name Type Description Default
queue_name str

the name of the queue to retrieve

required

Returns:

Type Description
QueueWrapper

A Queue object that represents the created the queue

Source code in melange/backends/sqs/sqs_backend.py
def get_queue(self, queue_name: str) -> QueueWrapper:
    sqs_res = boto3.resource("sqs", **self.extra_settings)
    return QueueWrapper(sqs_res.get_queue_by_name(QueueName=queue_name))

publish_to_topic(self, message, topic, extra_attributes=None)

Publishes a message content and the manifest to the topic

Parameters:

Name Type Description Default
message Message

the message to send

required
topic TopicWrapper

the topic to send the message to

required
extra_attributes Optional[Dict]

extra properties that might be required for the backend

None
Source code in melange/backends/sqs/sqs_backend.py
def publish_to_topic(
    self,
    message: Message,
    topic: TopicWrapper,
    extra_attributes: Optional[Dict] = None,
) -> None:
    args: Dict = dict(
        Message=message.content,
        MessageAttributes={
            "manifest": {
                "DataType": "String",
                "StringValue": message.manifest or "",
            },
            "serializer_id": {
                "DataType": "Number",
                "StringValue": str(message.serializer_id),
            },
        },
    )

    if extra_attributes:
        if "subject" in extra_attributes:
            args["Subject"] = extra_attributes["subject"]

        if "message_attributes" in extra_attributes:
            args["MessageAttributes"].update(extra_attributes["message_attributes"])

        if "message_structure" in extra_attributes:
            args["MessageStructure"] = extra_attributes["message_structure"]

    response = topic.unwrapped_obj.publish(**args)

    if "MessageId" not in response:
        raise ConnectionError("Could not send the event to the SNS TOPIC")

retrieve_messages(self, queue, **kwargs)

Retrieves a list of available messages from the queue.

Parameters:

Name Type Description Default
queue QueueWrapper

the queue object

required
**kwargs Any

Other parameters/options required by the backend

{}

Returns:

Type Description
List[melange.models.Message]

A list of available messages from the queue

Source code in melange/backends/sqs/sqs_backend.py
def retrieve_messages(self, queue: QueueWrapper, **kwargs: Any) -> List[Message]:
    args = dict(
        MaxNumberOfMessages=self.max_number_of_messages,
        VisibilityTimeout=self.visibility_timeout,
        WaitTimeSeconds=self.wait_time_seconds,
        MessageAttributeNames=["All"],
        AttributeNames=["All"],
    )

    if "attempt_id" in kwargs:
        args["ReceiveRequestAttemptId"] = kwargs["attempt_id"]

    messages = queue.unwrapped_obj.receive_messages(**args)

    # We need to differentiate here whether the message came from SNS or SQS
    return [self._construct_message(message) for message in messages]

yield_messages(self, queue, **kwargs)

Yields available messages from the queue.

Parameters:

Name Type Description Default
queue QueueWrapper

the queue object

required
**kwargs Any

Other parameters/options required by the backend

{}

Returns:

Type Description
Iterable[melange.models.Message]

An iterable which will poll the queue upon requesting more messages

Source code in melange/backends/sqs/sqs_backend.py
def yield_messages(self, queue: QueueWrapper, **kwargs: Any) -> Iterable[Message]:
    args = dict(
        MaxNumberOfMessages=self.max_number_of_messages,
        VisibilityTimeout=self.visibility_timeout,
        WaitTimeSeconds=self.wait_time_seconds,
        MessageAttributeNames=["All"],
        AttributeNames=["All"],
    )

    while True:
        messages = queue.unwrapped_obj.receive_messages(**args)
        for message_content in messages:
            message = self._construct_message(message_content)
            yield message
Back to top