I recently read this interesting Reliable Django Signals article which left me inspired to have a play with building something similar. I wanted to build a simple event-driven architecture where:

  • Publishers can publish an event with a given payload
  • One or many subscribers can subscribe to an event
  • Events are asynchronous (as opposed to the synchronous observer pattern)

I took the opportunity to try out django-tasks and blinker for this. Here’s what I came up with.

import dataclasses
import functools
import pkgutil
from typing import Protocol

import blinker
from django_tasks import task


def publish(name: str, /, *, payload: dict[str, object]) -> None:
    """
    Publish the event `name` asynchronously.

    One `_execute_event` task is enqueued per receiver currently connected to
    the signal called `name`; each task is given the receiver's identifier,
    the event name and the unchanged `payload`. Nothing is enqueued when the
    signal has no receivers.
    """
    # NOTE(review): the payload is handed to the task backend as-is, so it
    # presumably has to be serialisable by that backend - confirm.
    subscribers = _get_namespace().signal(name).receivers_for(blinker.ANY)
    for subscriber in subscribers:
        _execute_event.enqueue(
            receiver_identifier=_get_receiver_identifier(subscriber),
            name=name,
            payload=payload,
        )


def subscribe(name: str, /, *, receiver: "Receiver") -> None:
    """
    Subscribe a `receiver` callable for the given event `name`.

    The receiver is connected to the signal called `name` in the shared
    namespace, so a later `publish` of that name enqueues a task for it.

    NOTE: blinker's `connect` holds receivers by weak reference by default,
    so pass a long-lived callable (e.g. a module-level function).
    """
    # `Receiver` is declared further down this module. The annotation is
    # quoted so the name is not evaluated at definition time, which would
    # raise `NameError` on import when annotations are evaluated eagerly.
    signal = _get_namespace().signal(name)
    signal.connect(receiver)


@dataclasses.dataclass(frozen=True, kw_only=True)
class Event:
    """
    The event object a receiver is called with.
    """

    name: str
    payload: dict[str, object]


class Receiver(Protocol):
    """
    Structural type for callables that can handle an event.

    A receiver exposes `__qualname__` and is called with a single
    positional-only `Event`, returning nothing.
    """

    # Read together with `__module__` to build the receiver's identifier.
    __qualname__: str

    def __call__(self, event: Event, /) -> None:
        ...


# Private


@functools.cache
def _get_namespace() -> blinker.Namespace:
    """
    Return the blinker namespace that holds every event signal.

    The result is cached, so all callers share one namespace instance.
    """
    namespace = blinker.Namespace()
    return namespace


@task(queue_name="events")
def _execute_event(
    *, receiver_identifier: str, name: str, payload: dict[str, object]
) -> None:
    """
    Task body: resolve the receiver from its identifier and call it with an
    `Event` built from `name` and `payload`.
    """
    handler = _get_receiver_callable(receiver_identifier)
    event = Event(name=name, payload=payload)
    handler(event)


def _get_receiver_identifier(receiver: Receiver, /) -> str:
    """
    Return the identifier `<module>:<qualified name>` for the given receiver.
    """
    module_name = receiver.__module__
    qualified_name = receiver.__qualname__
    return f"{module_name}:{qualified_name}"


def _get_receiver_callable(receiver_identifier: str, /) -> Receiver:
    """
    Resolve an identifier back into the receiver object it names.

    The identifier is passed straight to `pkgutil.resolve_name`.
    """
    # The annotated local narrows the resolved object to `Receiver` for type
    # checkers before it is returned.
    resolved: Receiver = pkgutil.resolve_name(receiver_identifier)
    return resolved

I also configured the following Django settings for django-tasks. Specifically, setting up the dedicated "events" queue.

  INSTALLED_APPS = [
      ...,
+     "django_tasks",
+     "django_tasks.backends.database",
  ]

+ TASKS = {
+     "default": {
+         "BACKEND": "django_tasks.backends.database.DatabaseBackend",
+         "QUEUES": ["events"],
+     }
+ }

To subscribe to events I could then use the following.

from django import apps

import events


class Config(apps.AppConfig):
    """
    Example app configuration that registers the app's event receivers.
    """

    ...

    def ready(self) -> None:
        # Connect `user_created` as a receiver of the "user.created" event.
        events.subscribe("user.created", receiver=user_created)


def user_created(event: events.Event, /) -> None:
    """
    Example receiver for the "user.created" event (body elided).
    """
    ...

And the following to publish events.

import events


def create_user(...) -> None:
    # Illustrative pseudo-code: the parameters and user creation are elided.
    user = ...
    # Publish "user.created" with the new user's id as the payload.
    events.publish("user.created", payload={"user_id": user.id})

Thanks to Haki for the original article, which is well worth the read.