Publishers
QueuePublisher
Source code in melange/publishers.py
class QueuePublisher:
def __init__(
self,
serializer_registry: SerializerRegistry,
backend: Optional[MessagingBackend] = None,
) -> None:
self._backend = backend or BackendManager().get_default_backend()
self._serializer_registry = serializer_registry
def publish(self, queue_name: str, data: Any, **kwargs: Any) -> None:
"""
Publishes data to a queue
Args:
queue_name: The queue to send the message to.
data: The data to send to this queue. It will be serialized before sending to the
queue using the serializers.
**kwargs: Any extra attributes. They will be passed to the backend upon publish.
"""
serializer = self._serializer_registry.find_serializer_for(data)
content = serializer.serialize(data)
manifest = serializer.manifest(data)
queue = self._backend.get_queue(queue_name)
self._backend.publish_to_queue(
Message.create(content, manifest, serializer.identifier()), queue, **kwargs
)
publish(self, queue_name, data, **kwargs)
Publishes data to a queue
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queue_name |
str |
The queue to send the message to. |
required |
data |
Any |
The data to send to this queue. It will be serialized before sending to the queue using the serializers. |
required |
**kwargs |
Any |
Any extra attributes. They will be passed to the backend upon publish. |
{} |
Source code in melange/publishers.py
def publish(self, queue_name: str, data: Any, **kwargs: Any) -> None:
"""
Publishes data to a queue
Args:
queue_name: The queue to send the message to.
data: The data to send to this queue. It will be serialized before sending to the
queue using the serializers.
**kwargs: Any extra attributes. They will be passed to the backend upon publish.
"""
serializer = self._serializer_registry.find_serializer_for(data)
content = serializer.serialize(data)
manifest = serializer.manifest(data)
queue = self._backend.get_queue(queue_name)
self._backend.publish_to_queue(
Message.create(content, manifest, serializer.identifier()), queue, **kwargs
)
TopicPublisher
Source code in melange/publishers.py
class TopicPublisher:
def __init__(
self,
serializer_registry: SerializerRegistry,
backend: Optional[MessagingBackend] = None,
) -> None:
self._backend = backend or BackendManager().get_default_backend()
self._serializer_registry = serializer_registry
def publish(self, topic_name: str, data: Any, **extra_attributes: Any) -> None:
"""
Publishes data to a topic
Args:
topic_name: The topic to send the message to.
data: The data to send to this topic. It will be serialized before sending to the
topic using the serializers.
**extra_attributes: Any extra attributes. They will be passed to the backend upon publishing.
"""
topic = self._backend.declare_topic(topic_name)
serializer = self._serializer_registry.find_serializer_for(data)
content = serializer.serialize(data)
manifest = serializer.manifest(data)
self._backend.publish_to_topic(
Message.create(content, manifest, serializer.identifier()),
topic,
extra_attributes=extra_attributes,
)
publish(self, topic_name, data, **extra_attributes)
Publishes data to a topic
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic_name |
str |
The topic to send the message to. |
required |
data |
Any |
The data to send to this topic. It will be serialized before sending to the topic using the serializers. |
required |
**extra_attributes |
Any |
Any extra attributes. They will be passed to the backend upon publishing. |
{} |
Source code in melange/publishers.py
def publish(self, topic_name: str, data: Any, **extra_attributes: Any) -> None:
"""
Publishes data to a topic
Args:
topic_name: The topic to send the message to.
data: The data to send to this topic. It will be serialized before sending to the
topic using the serializers.
**extra_attributes: Any extra attributes. They will be passed to the backend upon publishing.
"""
topic = self._backend.declare_topic(topic_name)
serializer = self._serializer_registry.find_serializer_for(data)
content = serializer.serialize(data)
manifest = serializer.manifest(data)
self._backend.publish_to_topic(
Message.create(content, manifest, serializer.identifier()),
topic,
extra_attributes=extra_attributes,
)