Skip to content

Message Dispatchers and Consumers

Message Dispatcher and Consumers are the counterpart of the publishers. Consumers attach themselves to a Message Dispatcher, and the message dispatchers start the consuming loop and receive the messages from a queue, forwarding them to a consumer that accepts that message.

Consumers

Consumers are very simple, since they only receive a Message as a parameter and then they do some kind of processing to send it down to the lower layers of your application (something like a REST view/controller). To mplement a consumer with Melange one of the approaches is to subclass the Consumer class and implement the process method, and, optionally, the accepts method.

Example (from examples/payment_service/consumer_draft.py):

from typing import Any, Optional

from melange import Consumer
from melange.examples.common.commands import DoPayment
from melange.examples.payment_service.events import OrderResponse
from melange.examples.payment_service.service import PaymentService


class PaymentConsumer(Consumer):
    def __init__(self, payment_service: PaymentService):
        super().__init__()
        self.payment_service = payment_service

    def process(self, message: Any, **kwargs: Any) -> None:
        if isinstance(message, OrderResponse):
            self.payment_service.process(message)
        elif isinstance(message, DoPayment):
            self.payment_service.do_payment(message)

    def accepts(self, manifest: Optional[str]) -> bool:
        return manifest in ["OrderResponse", "DoPayment"]

There is a variation of the Consumer, the SingleDispatchConsumer consumer. It relies on the singledispatch library to implement method overloading on the process function, in order to achieve a richer accepts and process methods. This has proven to make the development of complex consumers faster and cleaner.

The same PaymentConsumer as above, but implemented by subclassing SingleDispatchConsumer (from examples/payment_service/consumer.py):

from melange import SingleDispatchConsumer, consumer
from melange.examples.common.commands import DoPayment
from melange.examples.payment_service.events import OrderResponse
from melange.examples.payment_service.service import PaymentService


class PaymentConsumer(SingleDispatchConsumer):
    def __init__(self, payment_service: PaymentService):
        super().__init__()
        self.payment_service = payment_service

    @consumer
    def consume_order_response(self, event: OrderResponse) -> None:
        self.payment_service.process(event)

    @consumer
    def consume_do_payment(self, command: DoPayment) -> None:
        self.payment_service.do_payment(command)

For a consumer to be able to receive messages it requires to be attached to a MessageDispatcher

Message Dispatcher

As summarized on top of this article, the MessageDispatcher component/class is the responsible to:

  1. Start the polling loop to get new messages through the MessagingBackend.
  2. Deserialize the message with the appropriate Serializer.
  3. Pass the message to the consumers that accept it for further processing.
  4. Acknowledge the message.

There is a variation of the MessageDispatcher called SimpleMessageDispatcher which is essentially the same as the former, but when you have only one consumer.

This is the specification of the MessageDispatcher class:

The MessageDispatcher is responsible to start the message consumption loop to retrieve the available messages from the queue and dispatch them to the consumers.

Source code in melange/message_dispatcher.py
class MessageDispatcher:
    """
    The `MessageDispatcher` is responsible to start the message consumption loop
    to retrieve the available messages from the queue and dispatch them to the
    consumers.
    """

    def __init__(
        self,
        serializer_registry: SerializerRegistry,
        cache: Optional[DeduplicationCache] = None,
        backend: Optional[MessagingBackend] = None,
        always_ack: bool = False,
    ) -> None:
        self._consumers: List[Consumer] = []
        self.serializer_registry = serializer_registry
        self._backend = backend or BackendManager().get_default_backend()
        self.cache: DeduplicationCache = cache or NullCache()
        self.always_ack = always_ack

    def attach_consumer(self, consumer: Consumer) -> None:
        """
        Attaches a consumer to the dispatcher, so that it can receive messages
        Args:
            consumer: the consumer to attach
        """
        if consumer not in self._consumers:
            self._consumers.append(consumer)

    def unattach_consumer(self, consumer: Consumer) -> None:
        """
        Unattaches the consumer from the dispatcher, so that it will not receive messages anymore
        Args:
            consumer: the consumer to unattach
        """
        if consumer in self._consumers:
            self._consumers.remove(consumer)

    def consume_loop(
        self,
        queue_name: str,
        on_exception: Optional[Callable[[Exception], None]] = None,
        after_consume: Optional[Callable[[], None]] = None,
    ) -> None:
        """
        Starts the consumption loop on the queue `queue_name`
        Args:
            queue_name: The queue to poll for new messages
            on_exception: If there is any exception, the exception
                will be passed to this callback
            after_consume: After consuming a batch of events,
                invoke this callback
        """
        self.consume_event(queue_name, on_exception, after_consume)

    def consume_event(
        self,
        queue_name: str,
        on_exception: Optional[Callable[[Exception], None]] = None,
        after_consume: Optional[Callable[[], None]] = None,
    ) -> None:
        """
        Consumes events on the queue `queue_name`
        """
        event_queue = self._backend.get_queue(queue_name)

        messages = self._backend.yield_messages(event_queue)
        for message in messages:
            try:
                self._dispatch_message(message)
            except Exception as e:
                logger.exception(e)
                if on_exception:
                    on_exception(e)
            finally:
                if after_consume:
                    after_consume()

    def _get_consumers(self, message_data: Any) -> List[Consumer]:
        return [
            consumer for consumer in self._consumers if consumer.accepts(message_data)
        ]

    def _dispatch_message(self, message: Message) -> None:
        manifest = message.get_message_manifest()

        # If the message cannot be deserialized, just ignore it.
        # ACK it anyway to avoid hanging on the same message over an over again
        try:
            if message.serializer_id is not None:
                message_data = self.serializer_registry.deserialize_with_serializerid(
                    message.content, message.serializer_id, manifest=manifest
                )
            else:
                # TODO: If no serializerid is supplied, at least we need to grab some
                # clue in the message to know which serializer must be used.
                # for now, rely on the default.
                message_data = self.serializer_registry.deserialize_with_class(
                    message.content, object, manifest=manifest
                )
        except SerializationError as e:
            logger.error(e)
            self._backend.acknowledge(message)
            return

        consumers = self._get_consumers(message_data)

        successful = 0
        for consumer in consumers:
            try:
                # Store into the cache
                message_key = (
                    f"{get_fully_qualified_name(consumer)}.{message.message_id}"
                )

                if message_key in self.cache:
                    logger.info("detected a duplicated message, ignoring")
                else:
                    consumer.process(message_data, message_id=message.message_id)
                    successful += 1
                    self.cache.store(message_key, message_key)
            except Exception as e:
                logger.exception(e)

        if self.always_ack or successful == len(consumers):
            self._backend.acknowledge(message)

attach_consumer(self, consumer)

Attaches a consumer to the dispatcher, so that it can receive messages

Parameters:

Name Type Description Default
consumer Consumer

the consumer to attach

required
Source code in melange/message_dispatcher.py
def attach_consumer(self, consumer: Consumer) -> None:
    """
    Attaches a consumer to the dispatcher, so that it can receive messages
    Args:
        consumer: the consumer to attach
    """
    if consumer not in self._consumers:
        self._consumers.append(consumer)

consume_event(self, queue_name, on_exception=None, after_consume=None)

Consumes events on the queue queue_name

Source code in melange/message_dispatcher.py
def consume_event(
    self,
    queue_name: str,
    on_exception: Optional[Callable[[Exception], None]] = None,
    after_consume: Optional[Callable[[], None]] = None,
) -> None:
    """
    Consumes events on the queue `queue_name`
    """
    event_queue = self._backend.get_queue(queue_name)

    messages = self._backend.yield_messages(event_queue)
    for message in messages:
        try:
            self._dispatch_message(message)
        except Exception as e:
            logger.exception(e)
            if on_exception:
                on_exception(e)
        finally:
            if after_consume:
                after_consume()

consume_loop(self, queue_name, on_exception=None, after_consume=None)

Starts the consumption loop on the queue queue_name

Parameters:

Name Type Description Default
queue_name str

The queue to poll for new messages

required
on_exception Optional[Callable[[Exception], NoneType]]

If there is any exception, the exception will be passed to this callback

None
after_consume Optional[Callable[[], NoneType]]

After consuming a batch of events, invoke this callback

None
Source code in melange/message_dispatcher.py
def consume_loop(
    self,
    queue_name: str,
    on_exception: Optional[Callable[[Exception], None]] = None,
    after_consume: Optional[Callable[[], None]] = None,
) -> None:
    """
    Starts the consumption loop on the queue `queue_name`
    Args:
        queue_name: The queue to poll for new messages
        on_exception: If there is any exception, the exception
            will be passed to this callback
        after_consume: After consuming a batch of events,
            invoke this callback
    """
    self.consume_event(queue_name, on_exception, after_consume)

unattach_consumer(self, consumer)

Unattaches the consumer from the dispatcher, so that it will not receive messages anymore

Parameters:

Name Type Description Default
consumer Consumer

the consumer to unattach

required
Source code in melange/message_dispatcher.py
def unattach_consumer(self, consumer: Consumer) -> None:
    """
    Unattaches the consumer from the dispatcher, so that it will not receive messages anymore
    Args:
        consumer: the consumer to unattach
    """
    if consumer in self._consumers:
        self._consumers.remove(consumer)

Note

Unless the always_ack is set to True, a message will only be acknowleged if it's been correcly processed by all consumers that accept the message. Unless message deduplication is in place, if a consumers fails the same message is going to be reprocessed again by all the consumers, which can lead to issues. Either use only one consumer per MessageDispatcher, make your consumers idempotent, or set a DeduplicationCache when instantiating the MessageDispatcher.

Back to top