Message Dispatchers and Consumers
Message Dispatcher and Consumers are the counterpart of the publishers. Consumers attach themselves to a Message Dispatcher, and the message dispatchers start the consuming loop and receive the messages from a queue, forwarding them to a consumer that accepts that message.
Consumers
Consumers are very simple, since they only receive a Message as a parameter and then
they do some kind of processing to send it down to the lower layers of your application (something like a REST view/controller).
To mplement a consumer with Melange one of the approaches is to subclass the Consumer class
and implement the process method, and, optionally, the accepts method.
Example (from examples/payment_service/consumer_draft.py):
from typing import Any, Optional
from melange import Consumer
from melange.examples.common.commands import DoPayment
from melange.examples.payment_service.events import OrderResponse
from melange.examples.payment_service.service import PaymentService
class PaymentConsumer(Consumer):
def __init__(self, payment_service: PaymentService):
super().__init__()
self.payment_service = payment_service
def process(self, message: Any, **kwargs: Any) -> None:
if isinstance(message, OrderResponse):
self.payment_service.process(message)
elif isinstance(message, DoPayment):
self.payment_service.do_payment(message)
def accepts(self, manifest: Optional[str]) -> bool:
return manifest in ["OrderResponse", "DoPayment"]
There is a variation of the Consumer, the SingleDispatchConsumer consumer. It relies
on the singledispatch library to implement method overloading on the process function,
in order to achieve a richer accepts and process methods. This has proven to make the development
of complex consumers faster and cleaner.
The same PaymentConsumer as above, but implemented by subclassing SingleDispatchConsumer
(from examples/payment_service/consumer.py):
from melange import SingleDispatchConsumer, consumer
from melange.examples.common.commands import DoPayment
from melange.examples.payment_service.events import OrderResponse
from melange.examples.payment_service.service import PaymentService
class PaymentConsumer(SingleDispatchConsumer):
def __init__(self, payment_service: PaymentService):
super().__init__()
self.payment_service = payment_service
@consumer
def consume_order_response(self, event: OrderResponse) -> None:
self.payment_service.process(event)
@consumer
def consume_do_payment(self, command: DoPayment) -> None:
self.payment_service.do_payment(command)
For a consumer to be able to receive messages it requires to be attached to a MessageDispatcher
Message Dispatcher
As summarized on top of this article, the MessageDispatcher component/class is the responsible to:
- Start the polling loop to get new messages through the
MessagingBackend. - Deserialize the message with the appropriate
Serializer. - Pass the message to the consumers that accept it for further processing.
- Acknowledge the message.
There is a variation of the MessageDispatcher called SimpleMessageDispatcher
which is essentially the same as the former, but when you have only one consumer.
This is the specification of the MessageDispatcher class:
The MessageDispatcher is responsible to start the message consumption loop
to retrieve the available messages from the queue and dispatch them to the
consumers.
Source code in melange/message_dispatcher.py
class MessageDispatcher:
"""
The `MessageDispatcher` is responsible to start the message consumption loop
to retrieve the available messages from the queue and dispatch them to the
consumers.
"""
def __init__(
self,
serializer_registry: SerializerRegistry,
cache: Optional[DeduplicationCache] = None,
backend: Optional[MessagingBackend] = None,
always_ack: bool = False,
) -> None:
self._consumers: List[Consumer] = []
self.serializer_registry = serializer_registry
self._backend = backend or BackendManager().get_default_backend()
self.cache: DeduplicationCache = cache or NullCache()
self.always_ack = always_ack
def attach_consumer(self, consumer: Consumer) -> None:
"""
Attaches a consumer to the dispatcher, so that it can receive messages
Args:
consumer: the consumer to attach
"""
if consumer not in self._consumers:
self._consumers.append(consumer)
def unattach_consumer(self, consumer: Consumer) -> None:
"""
Unattaches the consumer from the dispatcher, so that it will not receive messages anymore
Args:
consumer: the consumer to unattach
"""
if consumer in self._consumers:
self._consumers.remove(consumer)
def consume_loop(
self,
queue_name: str,
on_exception: Optional[Callable[[Exception], None]] = None,
after_consume: Optional[Callable[[], None]] = None,
) -> None:
"""
Starts the consumption loop on the queue `queue_name`
Args:
queue_name: The queue to poll for new messages
on_exception: If there is any exception, the exception
will be passed to this callback
after_consume: After consuming a batch of events,
invoke this callback
"""
self.consume_event(queue_name, on_exception, after_consume)
def consume_event(
self,
queue_name: str,
on_exception: Optional[Callable[[Exception], None]] = None,
after_consume: Optional[Callable[[], None]] = None,
) -> None:
"""
Consumes events on the queue `queue_name`
"""
event_queue = self._backend.get_queue(queue_name)
messages = self._backend.yield_messages(event_queue)
for message in messages:
try:
self._dispatch_message(message)
except Exception as e:
logger.exception(e)
if on_exception:
on_exception(e)
finally:
if after_consume:
after_consume()
def _get_consumers(self, message_data: Any) -> List[Consumer]:
return [
consumer for consumer in self._consumers if consumer.accepts(message_data)
]
def _dispatch_message(self, message: Message) -> None:
manifest = message.get_message_manifest()
# If the message cannot be deserialized, just ignore it.
# ACK it anyway to avoid hanging on the same message over an over again
try:
if message.serializer_id is not None:
message_data = self.serializer_registry.deserialize_with_serializerid(
message.content, message.serializer_id, manifest=manifest
)
else:
# TODO: If no serializerid is supplied, at least we need to grab some
# clue in the message to know which serializer must be used.
# for now, rely on the default.
message_data = self.serializer_registry.deserialize_with_class(
message.content, object, manifest=manifest
)
except SerializationError as e:
logger.error(e)
self._backend.acknowledge(message)
return
consumers = self._get_consumers(message_data)
successful = 0
for consumer in consumers:
try:
# Store into the cache
message_key = (
f"{get_fully_qualified_name(consumer)}.{message.message_id}"
)
if message_key in self.cache:
logger.info("detected a duplicated message, ignoring")
else:
consumer.process(message_data, message_id=message.message_id)
successful += 1
self.cache.store(message_key, message_key)
except Exception as e:
logger.exception(e)
if self.always_ack or successful == len(consumers):
self._backend.acknowledge(message)
attach_consumer(self, consumer)
Attaches a consumer to the dispatcher, so that it can receive messages
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
consumer |
Consumer |
the consumer to attach |
required |
consume_event(self, queue_name, on_exception=None, after_consume=None)
Consumes events on the queue queue_name
Source code in melange/message_dispatcher.py
def consume_event(
self,
queue_name: str,
on_exception: Optional[Callable[[Exception], None]] = None,
after_consume: Optional[Callable[[], None]] = None,
) -> None:
"""
Consumes events on the queue `queue_name`
"""
event_queue = self._backend.get_queue(queue_name)
messages = self._backend.yield_messages(event_queue)
for message in messages:
try:
self._dispatch_message(message)
except Exception as e:
logger.exception(e)
if on_exception:
on_exception(e)
finally:
if after_consume:
after_consume()
consume_loop(self, queue_name, on_exception=None, after_consume=None)
Starts the consumption loop on the queue queue_name
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
queue_name |
str |
The queue to poll for new messages |
required |
on_exception |
Optional[Callable[[Exception], NoneType]] |
If there is any exception, the exception will be passed to this callback |
None |
after_consume |
Optional[Callable[[], NoneType]] |
After consuming a batch of events, invoke this callback |
None |
Source code in melange/message_dispatcher.py
def consume_loop(
self,
queue_name: str,
on_exception: Optional[Callable[[Exception], None]] = None,
after_consume: Optional[Callable[[], None]] = None,
) -> None:
"""
Starts the consumption loop on the queue `queue_name`
Args:
queue_name: The queue to poll for new messages
on_exception: If there is any exception, the exception
will be passed to this callback
after_consume: After consuming a batch of events,
invoke this callback
"""
self.consume_event(queue_name, on_exception, after_consume)
unattach_consumer(self, consumer)
Unattaches the consumer from the dispatcher, so that it will not receive messages anymore
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
consumer |
Consumer |
the consumer to unattach |
required |
Source code in melange/message_dispatcher.py
Note
Unless the always_ack is set to True, a message will only be acknowleged if
it's been correcly processed by all consumers that accept the message.
Unless message deduplication is in place, if a consumers fails the same
message is going to be reprocessed again by all the consumers, which can lead to issues.
Either use only one consumer per MessageDispatcher, make your consumers idempotent,
or set a DeduplicationCache when instantiating the MessageDispatcher.