Testing
Any developer worth its salt does some kind of testing over the code they develop. However, testing software that spans several processes/threads (like when you do pub/sub over a queue/topic) can be a daunting task.
Melange offers several utilities to help you test your publishers and consumers (or just making everything synchronous inside the context of the test for the sake of simplicity). Here some examples are presented on how you could potentially use the library in your tests.
Asynchronous testing with threads
Follow the next steps whenever you need to have one or more consumers running on the background of your test:
-
Make sure to create (or ensure that they exist at least) the queues and topics where the message exchange happens.
-
Start the consumer loop in a separate thread, and make sure the thread is stopped upon termination of the test.
-
Call your code that invokes the publishing methods, and have
probes
in place that poll the environment to check whether the consumers have done their work or not before doing any kind of assertion that requires of the consumers' results. -
Bonus: After the test is finished, delete the queue/topic to keep the environment clean.
Full Example:
import os
import threading
from dataclasses import dataclass
from typing import Dict, Optional
import polling
from hamcrest import *
from melange import MessagingBackendFactory
from melange.backends import LocalSQSBackend
from melange import Consumer
from melange import SimpleMessageDispatcher
from melange import QueuePublisher
from melange.serializers import PickleSerializer
from melange.serializers import SerializerRegistry
from tests.probe import Probe
class StateProbe(Probe):
def __init__(self, state: "State") -> None:
self.state = state
def sample(self) -> None:
pass
def can_be_measured(self) -> bool:
self.sample()
return self.state.value_set is not None
def wait(self) -> None:
try:
polling.poll(self.can_be_measured, 1, timeout=60)
except polling.TimeoutException as e:
raise Exception("Timeout!") from e
@dataclass
class State:
value_set: Optional[int] = None
serializer_settings = {
"serializers": {
"pickle": PickleSerializer,
},
"serializer_bindings": {},
"default": "pickle",
}
def test_async_consumer(request):
# We'll use the ElasticMQ as backend since it works like a real SQS queue
backend = LocalSQSBackend(
host=os.environ.get("SQSHOST"), port=os.environ.get("SQSPORT")
)
queue_name = "testqueue"
# Initialize the queue
queue = MessagingBackendFactory(backend).init_queue(queue_name)
# Delete the queue upon finishing the execution of the test
def teardown():
backend.delete_queue(queue)
request.addfinalizer(teardown)
# Create a consumer that, upon receiving a message, will set
# the variable "value set" to later assert that this value
# has, indeed, been set by the consumer that is running on another thread
state = State()
def set_state(message: Dict) -> None:
state.value_set = message["value"]
consumer = Consumer(on_message=set_state)
registry = SerializerRegistry(serializer_settings)
handler = SimpleMessageDispatcher(consumer, registry, backend=backend)
# Start the consumer loop thread to run the consumer loop in the background
threading.Thread(
target=lambda: handler.consume_loop(queue_name), daemon=True
).start()
# Publish a message and...
publisher = QueuePublisher(registry, backend)
publisher.publish(queue_name, {"value": 1})
# ...wait until the value is set
probe = StateProbe(state)
probe.wait()
assert_that(state.value_set, is_(1))
This kind of test has the advantage of being very explicit in the sense that it expresses, through the probe, that this test has some asynchronous processing in the background, and waits for it. It's quite realistic as well, pub/sub is asynchronous in its nature and we work with it in this test.
However the arrangement is complex. It's a trade-off between completeness and complexity that you have to embrace if you want to follow this route.
Tip
Try to abstract away all this arrangement code from the main body of the test to keep it clean and clear, avoiding pollution. Testing frameworks have different techniques to abstract away arrangements (like pytest fixtures).
Synchronous testing with the InMemoryMessagingBackend
Another option is to use the bundled InMemoryMessagingBackend
when instantiating your
publishers and consumers. This will make the entirety of the test synchronous in respect
to the message passing.
from melange.backends import InMemoryMessagingBackend, link_synchronously
from melange import Consumer
from melange import QueuePublisher
from melange.serializers import PickleSerializer
from melange.serializers import SerializerRegistry
serializer_settings = {
"serializers": {
"pickle": PickleSerializer,
},
"serializer_bindings": {},
"default": "pickle",
}
def test_inmemory_messaging_backend():
consumer_1 = Consumer(lambda message: print(f"Hello {message['message']}!"))
consumer_2 = Consumer(lambda message: print(f"Hello {message['message']} 2!"))
backend = InMemoryMessagingBackend()
registry = SerializerRegistry(serializer_settings)
link_synchronously("somequeue", [consumer_1, consumer_2], registry, backend)
registry = SerializerRegistry(serializer_settings)
publisher = QueuePublisher(registry, backend=backend)
publisher.publish("somequeue", {"message": "Mary"})
What the InMemoryMessagingBackend
does, upon
publish, is to store the serialized message in memory and
forward it to the internal consumer dispatcher, so that
the consumers can synchronously receive and process the message.
The link_synchronously
function is a helper which glues everything together. All the
messages sent to the queue or topic with
that name will be dispatched to those consumers (if the consumers accept that message).