Skip to content

Message de-duplication

Distributed architectures are hard, complex and come with a good deal of burdens, but they are required to achieve levels of scalability and isolation harder to achieve on monolithic architectures. One of this issues is the possibility of of the same message being received twice by the listeners. Network failures, application crashes, etc... can cause this issue which, if not though or left undealt can cause your system to be out of sync and run in an inconsistent state. This is why you need to take measures.

One of this measures is to, simply, write your listeners to be idempotent. This means that it does not matter how many times a listener is called, the result will be the same and it won't impact or leave the system into an inconsistent state.

However, sometimes writing idempotent code is just not possible. You require message deduplication to account for this and ensure that a message won't be sent twice. You could use Amazon SQS FIFO Queues which they say they provide this message deduplication, though not only FIFO queues are more expensive than standard ones, but exactly-once delivery is just impossible. In Melange we have accounted for this with a cache interface that you can supply to the ConsumerHandler (like a Redis cache) that will control that no message is delivered twice to the same consumer.

In Melange we provide a RedisCache class that you could use to perform this message deduplication. However we do not want to tie the library to any specific technology, so as long as you comply with the DeduplicationCache interface it will work just fine.

Info

The cache for message deduplication is completely optional, but on a production environment having some kind of cache to handle deduplication is encouraged.

This is the DeduplicationCache specification:

Source code in melange/infrastructure/cache.py
class DeduplicationCache(Protocol):
    def store(self, key: str, value: Any, expire: Optional[int] = None) -> None:
        """
        Stores a key into the cache
        Args:
            key:
            value:
            expire: expiration time in seconds
        """
        raise NotImplementedError

    def get(self, key: str) -> Any:
        """
        Retrieves the value stored under a key
        Args:
            key: the key to fetch in the cache

        Returns:
            The value stored under that key, or None
        """
        raise NotImplementedError

    def __contains__(self, key: str) -> bool:
        """
        Checks whether a certain key is present in the store

        Args:
            key: the key to check

        Returns:
            `True` if the value is present, `False` otherwise.
        """
        raise NotImplementedError

__contains__(self, key) special

Checks whether a certain key is present in the store

Parameters:

Name Type Description Default
key str

the key to check

required

Returns:

Type Description
bool

True if the value is present, False otherwise.

Source code in melange/infrastructure/cache.py
def __contains__(self, key: str) -> bool:
    """
    Checks whether a certain key is present in the store

    Args:
        key: the key to check

    Returns:
        `True` if the value is present, `False` otherwise.
    """
    raise NotImplementedError

get(self, key)

Retrieves the value stored under a key

Parameters:

Name Type Description Default
key str

the key to fetch in the cache

required

Returns:

Type Description
Any

The value stored under that key, or None

Source code in melange/infrastructure/cache.py
def get(self, key: str) -> Any:
    """
    Retrieves the value stored under a key
    Args:
        key: the key to fetch in the cache

    Returns:
        The value stored under that key, or None
    """
    raise NotImplementedError

store(self, key, value, expire=None)

Stores a key into the cache

Parameters:

Name Type Description Default
key str required
value Any required
expire Optional[int]

expiration time in seconds

None
Source code in melange/infrastructure/cache.py
def store(self, key: str, value: Any, expire: Optional[int] = None) -> None:
    """
    Stores a key into the cache
    Args:
        key:
        value:
        expire: expiration time in seconds
    """
    raise NotImplementedError
Back to top