Message Dispatchers and Consumers
Message Dispatcher and Consumers are the counterpart of the publishers. Consumers attach themselves to a Message Dispatcher, and the message dispatchers start the consuming loop and receive the messages from a queue, forwarding them to a consumer that accepts that message.
Consumers
Consumers are very simple, since they only receive a Message as a parameter and then
they do some kind of processing to send it down to the lower layers of your application (something like a REST view/controller).
To mplement a consumer with Melange one of the approaches is to subclass the Consumer
class
and implement the process
method, and, optionally, the accepts
method.
Example (from examples/payment_service/consumer_draft.py
):
from typing import Any, Optional
from melange import Consumer
from melange.examples.common.commands import DoPayment
from melange.examples.payment_service.events import OrderResponse
from melange.examples.payment_service.service import PaymentService
class PaymentConsumer(Consumer):
def __init__(self, payment_service: PaymentService):
super().__init__()
self.payment_service = payment_service
def process(self, message: Any, **kwargs: Any) -> None:
if isinstance(message, OrderResponse):
self.payment_service.process(message)
elif isinstance(message, DoPayment):
self.payment_service.do_payment(message)
def accepts(self, manifest: Optional[str]) -> bool:
return manifest in ["OrderResponse", "DoPayment"]
There is a variation of the Consumer
, the SingleDispatchConsumer
consumer. It relies
on the singledispatch
library to implement method overloading on the process
function,
in order to achieve a richer accepts
and process
methods. This has proven to make the development
of complex consumers faster and cleaner.
The same PaymentConsumer
as above, but implemented by subclassing SingleDispatchConsumer
(from examples/payment_service/consumer.py
):
from melange import SingleDispatchConsumer, consumer
from melange.examples.common.commands import DoPayment
from melange.examples.payment_service.events import OrderResponse
from melange.examples.payment_service.service import PaymentService
class PaymentConsumer(SingleDispatchConsumer):
def __init__(self, payment_service: PaymentService):
super().__init__()
self.payment_service = payment_service
@consumer
def consume_order_response(self, event: OrderResponse) -> None:
self.payment_service.process(event)
@consumer
def consume_do_payment(self, command: DoPayment) -> None:
self.payment_service.do_payment(command)
For a consumer to be able to receive messages it requires to be attached to a MessageDispatcher
Message Dispatcher
As summarized on top of this article, the MessageDispatcher
component/class is the responsible to:
- Start the polling loop to get new messages through the
MessagingBackend
. - Deserialize the message with the appropriate
Serializer
. - Pass the message to the consumers that accept it for further processing.
- Acknowledge the message.
There is a variation of the MessageDispatcher
called SimpleMessageDispatcher
which is essentially the same as the former, but when you have only one consumer.
This is the specification of the MessageDispatcher
class:
The MessageDispatcher
is responsible to start the message consumption loop
to retrieve the available messages from the queue and dispatch them to the
consumers.
Source code in melange/message_dispatcher.py
class MessageDispatcher:
"""
The `MessageDispatcher` is responsible to start the message consumption loop
to retrieve the available messages from the queue and dispatch them to the
consumers.
"""
def __init__(
self,
serializer_registry: SerializerRegistry,
cache: Optional[DeduplicationCache] = None,
backend: Optional[MessagingBackend] = None,
always_ack: bool = False,
) -> None:
self._consumers: List[Consumer] = []
self.serializer_registry = serializer_registry
self._backend = backend or BackendManager().get_default_backend()
self.cache: DeduplicationCache = cache or NullCache()
self.always_ack = always_ack
def attach_consumer(self, consumer: Consumer) -> None:
"""
Attaches a consumer to the dispatcher, so that it can receive messages
Args:
consumer: the consumer to attach
"""
if consumer not in self._consumers:
self._consumers.append(consumer)
def unattach_consumer(self, consumer: Consumer) -> None:
"""
Unattaches the consumer from the dispatcher, so that it will not receive messages anymore
Args:
consumer: the consumer to unattach
"""
if consumer in self._consumers:
self._consumers.remove(consumer)
def consume_loop(
self,
queue_name: str,
on_exception: Optional[Callable[[Exception], None]] = None,
after_consume: Optional[Callable[[], None]] = None,
) -> None:
"""
Starts the consumption loop on the queue `queue_name`
Args:
queue_name: The queue to poll for new messages
on_exception: If there is any exception, the exception
will be passed to this callback
after_consume: After consuming a batch of events,
invoke this callback
"""
self.consume_event(queue_name, on_exception, after_consume)
def consume_event(
self,
queue_name: str,
on_exception: Optional[Callable[[Exception], None]] = None,
after_consume: Optional[Callable[[], None]] = None,
) -> None:
"""
Consumes events on the queue `queue_name`
"""
event_queue = self._backend.get_queue(queue_name)
messages = self._backend.yield_messages(event_queue)
for message in messages:
try:
self._dispatch_message(message)
except Exception as e:
logger.exception(e)
if on_exception:
on_exception(e)
finally:
if after_consume:
after_consume()
def _get_consumers(self, message_data: Any) -> List[Consumer]:
return [
consumer for consumer in self._consumers if consumer.accepts(message_data)
]
def _dispatch_message(self, message: Message) -> None:
manifest = message.get_message_manifest()
# If the message cannot be deserialized, just ignore it.
# ACK it anyway to avoid hanging on the same message over an over again
try:
if message.serializer_id is not None:
message_data = self.serializer_registry.deserialize_with_serializerid(
message.content, message.serializer_id, manifest=manifest
)
else:
# TODO: If no serializerid is supplied, at least we need to grab some
# clue in the message to know which serializer must be used.
# for now, rely on the default.
message_data = self.serializer_registry.deserialize_with_class(
message.content, object, manifest=manifest
)
except SerializationError as e:
logger.error(e)
self._backend.acknowledge(message)
return
consumers = self._get_consumers(message_data)
successful = 0
for consumer in consumers:
try:
# Store into the cache
message_key = (
f"{get_fully_qualified_name(consumer)}.{message.message_id}"
)
if message_key in self.cache:
logger.info("detected a duplicated message, ignoring")
else:
consumer.process(message_data, message_id=message.message_id)
successful += 1
self.cache.store(message_key, message_key)
except Exception as e:
logger.exception(e)
if self.always_ack or successful == len(consumers):
self._backend.acknowledge(message)
attach_consumer(self, consumer)
Attaches a consumer to the dispatcher, so that it can receive messages
Parameters:
Name | Type | Description | Default |
---|---|---|---|
consumer |
Consumer |
the consumer to attach |
required |
consume_event(self, queue_name, on_exception=None, after_consume=None)
Consumes events on the queue queue_name
Source code in melange/message_dispatcher.py
def consume_event(
self,
queue_name: str,
on_exception: Optional[Callable[[Exception], None]] = None,
after_consume: Optional[Callable[[], None]] = None,
) -> None:
"""
Consumes events on the queue `queue_name`
"""
event_queue = self._backend.get_queue(queue_name)
messages = self._backend.yield_messages(event_queue)
for message in messages:
try:
self._dispatch_message(message)
except Exception as e:
logger.exception(e)
if on_exception:
on_exception(e)
finally:
if after_consume:
after_consume()
consume_loop(self, queue_name, on_exception=None, after_consume=None)
Starts the consumption loop on the queue queue_name
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queue_name |
str |
The queue to poll for new messages |
required |
on_exception |
Optional[Callable[[Exception], NoneType]] |
If there is any exception, the exception will be passed to this callback |
None |
after_consume |
Optional[Callable[[], NoneType]] |
After consuming a batch of events, invoke this callback |
None |
Source code in melange/message_dispatcher.py
def consume_loop(
self,
queue_name: str,
on_exception: Optional[Callable[[Exception], None]] = None,
after_consume: Optional[Callable[[], None]] = None,
) -> None:
"""
Starts the consumption loop on the queue `queue_name`
Args:
queue_name: The queue to poll for new messages
on_exception: If there is any exception, the exception
will be passed to this callback
after_consume: After consuming a batch of events,
invoke this callback
"""
self.consume_event(queue_name, on_exception, after_consume)
unattach_consumer(self, consumer)
Unattaches the consumer from the dispatcher, so that it will not receive messages anymore
Parameters:
Name | Type | Description | Default |
---|---|---|---|
consumer |
Consumer |
the consumer to unattach |
required |
Source code in melange/message_dispatcher.py
Note
Unless the always_ack
is set to True
, a message will only be acknowleged if
it's been correcly processed by all consumers that accept the message.
Unless message deduplication is in place, if a consumers fails the same
message is going to be reprocessed again by all the consumers, which can lead to issues.
Either use only one consumer per MessageDispatcher
, make your consumers idempotent,
or set a DeduplicationCache
when instantiating the MessageDispatcher
.