Skip to content

Publishers

Publishers, as implied by the name, publish messages to a message broker, which are then propagated/stored into a queue for consumers/subscribers to process.

You can publish messages to queues or topics.

Publishing to a queue

Publishing a message to a queue makes this message available to a single consumer (that's the concept of a queue after all). To do that, build an instance of the QueuePublisher class and call the publish method with your message:

from simple_cqrs.domain_event import DomainEvent

from melange.backends import LocalSQSBackend
from melange import QueuePublisher
from melange.serializers import PickleSerializer
from melange.serializers import SerializerRegistry


class MyTestMessage:
    def __init__(self, message: str) -> None:
        self.message = message


if __name__ == "__main__":
    serializer_settings = {
        "serializers": {"pickle": PickleSerializer},
        "serializer_bindings": {DomainEvent: "pickle"},
    }

    serializer_registry = SerializerRegistry(serializer_settings)

    backend = LocalSQSBackend(host="localhost", port=9324)
    publisher = QueuePublisher(serializer_registry, backend)
    message = MyTestMessage("Hello World!")
    publisher.publish("melangetutorial-queue", message)
    print("Message sent successfully!")

The QueuePublisher requires a backend and the serializer registry as constructor parameters. The serializer registry is necessary to properly serialize and send the message to the messaging backend.

Tip

In a production project where you would have a proper dependency injection framework in place (e.g. pinject), you could instantiate the Publisher once and provide that instance through your application

Publishing to a topic

Publishing a message to a topic works exactly the same way as publishing to a queue, but it will work with the Fanout pattern to distribute the message to several subscribers of that topic. To do that, build an instance of the TopicPublisher class and call the publish method with your message:

from melange.backends import LocalSQSBackend
from melange.examples.shared import serializer_registry
from melange import TopicPublisher
from melange.serializers import PickleSerializer


class MyTestMessage:
    def __init__(self, message: str) -> None:
        self.message = message


if __name__ == "__main__":
    backend = LocalSQSBackend(host="localhost", port=9324)
    serializer = PickleSerializer()
    publisher = TopicPublisher(serializer_registry, backend)
    message = MyTestMessage("Hello World!")
    publisher.publish("melangetutorial-topic", message)
    print("Message sent successfully!")

As you can appreciate it works exactly the same way as publishing to a queue, the only difference happens behind the scenes.

Back to top