Tutorial - Getting started
Talk is cheap. show me the code.
- Linus Torvalds -
Event-driven architectures work with the Publish/Subscribe pattern to achieve decoupling. With this pattern, publishers and subscribers do not know about each other while they can exchange information among them. In order to achieve this and communicate effectively a mediator, or better said, a Message Broker is required to transfer messages from publishers to subscribers. Clients can subscribe this broker, waiting for events they are interested in, or publish messages so that the broker can distribute these messages appropriately.
This tutorial assumes that you have basic understanding of the pub/sub
mechanics. If not, there are a whole bunch of resources to get your feet
wet on the topic. Also it's good to have docker
installed since we are
going to spin up local infrastructure to serve as a messaging broker.
Choosing a Messaging Backend
A messaging backend is the infrastructure where your messages are going to be published
and consumed. In this tutorial we are going to use LocalStack
as our messaging backend. Basically spinning up an ElasticMQ (for example with docker-compose
)
in your machine will provide you with a local SQS+SNS for development with boto, which
makes it ideal for testing and for the sake of this tutorial. You could follow the instructions
in the LocalStack project to install it to your local machine. Though the quickest route
is to launch the docker image with melange:
This will start LocalStack in the port 4566
in localhost
, ready to be used.
Creating a queue
Before using a queue you need to create it. Put the following code snippet into a
file called create_queue.py
and execute it to create the queue:
from melange import MessagingBackendFactory
from melange.backends import LocalSQSBackend
backend = LocalSQSBackend(host="localhost", port=9324)
factory = MessagingBackendFactory(backend)
factory.init_queue("melangetutorial-queue")
Publishing messages
Publishing messages to a queue with Melange is easy. Just create an instance of the message
publisher and publish
the message. Put the following code snippet into a file called publish_example.py
:
from simple_cqrs.domain_event import DomainEvent
from melange.backends import LocalSQSBackend
from melange import QueuePublisher
from melange.serializers import PickleSerializer
from melange.serializers import SerializerRegistry
class MyTestMessage:
def __init__(self, message: str) -> None:
self.message = message
if __name__ == "__main__":
serializer_settings = {
"serializers": {"pickle": PickleSerializer},
"serializer_bindings": {DomainEvent: "pickle"},
}
serializer_registry = SerializerRegistry(serializer_settings)
backend = LocalSQSBackend(host="localhost", port=9324)
publisher = QueuePublisher(serializer_registry, backend)
message = MyTestMessage("Hello World!")
publisher.publish("melangetutorial-queue", message)
print("Message sent successfully!")
Once you run this code it will publish a message MyTestMessage
with the contents Hello World
in
the queue melangetutorial-queue
.
You can send anything as long as the SerializerRegistry
can serialize/deserialize
the object. Refer Serialization for further details.
Note
For the sake of this tutorial you can use simple config which just uses the PickleSerializer
to serialize your messages. For production applications however you should probably use another
type of serializer or create your own, since pickle
is considered unsafe and
only works with python consumers. Consider creating your own serializer or use the
JsonSerializer
at the very least once you put Melange in production.
Consuming messages
It's good to publish messages, but they are worth nothing if nobody reads them. Therefore, we need a consumer that reads these messages and reacts to them.
Put the following code snippet in a file called consumer-example.py
and run it:
from simple_cqrs.domain_event import DomainEvent
from melange.backends import LocalSQSBackend
from melange import SingleDispatchConsumer, consumer
from melange.examples.doc_examples.tutorial.publish import MyTestMessage
from melange import SimpleMessageDispatcher
from melange.serializers import PickleSerializer
from melange.serializers import SerializerRegistry
class MyConsumer(SingleDispatchConsumer):
@consumer
def on_my_test_message_received(self, event: MyTestMessage) -> None:
print(event.message)
if __name__ == "__main__":
serializer_settings = {
"serializers": {"pickle": PickleSerializer},
"serializer_bindings": {DomainEvent: "pickle"},
}
serializer_registry = SerializerRegistry(serializer_settings)
backend = LocalSQSBackend(host="localhost", port=9324)
consumer = MyConsumer()
message_dispatcher = SimpleMessageDispatcher(
consumer,
serializer_registry,
backend=backend,
)
print("Consuming...")
message_dispatcher.consume_loop("melangetutorial-queue")
Upon hitting the consume_loop
method, the process will start polling the queue for new
messages. Once it receives a message, as long as the message is of type MyTestMessage
it will
forward this message to the MyConsumer
. If your infrastructure was set correctly, every time
you run the publish_example.py
script you will see a print with the message on the screen where
the consumer is running.
Congratulations! You just run your very first example of a Pub/Sub mechanism with Melange!
Note
It's a good idea to have shared classes (like the MyTestMessage
in the example) in its
own python module (e.g. shared.py
)
Where to go from here
Now that you grasped the basic idea on how you could use Melange, you could go further and read more details about:
To add to that, although the exposed example is quite simple, it serves as the foundation to implement a number of use cases and distributed architectures with microservices. With Melange you can:
- Build a CQRS + Event sourcing architecture, where you publish your events to a queue or topic from the Command side and read those with a consumer from the Read side to create your data projections.
- Build choreography Sagas for long-running processes which can span several transactions.
- Implement microservices which consume messages from a queue to do their job (e.g. an staticstics microservice
that reacts to a
OrderCreated
event and increments a counter to track how many orders your system has).
We have not covered the case of topics. Refer to Publishers for further details.
In addition, Melange is bundled with a consumer that works with a python application. But the consumer can be implemented in any language and any technology that can read messages from your queue (AWS Lambda, Azure functions, a NodeJS app...)