The Fanout Pattern Explained

With my work at AspirEDU, we use Celery to capture education information on a daily basis. We’ve gone through a few different strategies for designing our Celery task signatures1. The most powerful of these has been the fanout pattern.

The fanout pattern is a single task replacing itself with a variable number of other tasks.

Let’s look at some code. If we have the task signature:

task1.si() | task2.si() | task3.si()

Each task must complete before the next one starts. However, let’s assume that task2 fetches some collection and needs to run another task for each item in it. With the fanout pattern, task2 replaces itself with item1, item2, ..., itemN, where N is the number of items in the collection, and task3 only starts once all of them have finished.

Here’s that same point again, but in code:

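from celery import group

# We schedule this:
(
    task1.si()
    | task2.si()
    | task3.si()
).apply_async()
# But this is effectively what runs once task2 replaces itself
# with a group holding one task per item in the collection:
(
    task1.si()
    | task2.si()
    | group([item1.si(), item2.si(), itemN.si()])
    | task3.si()
).apply_async()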
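To make the splice concrete without a broker, here is a toy model in plain Python. It is a sketch under stated assumptions, not Celery’s implementation: tasks are plain functions looked up by hypothetical names, and a task “replaces itself” by returning the names of the tasks that should run in its place. Unlike a real Celery group, the spliced tasks here run one after another instead of in parallel, but the ordering is the same: task3 only starts after every item task has run.

```python
from typing import Callable, Dict, List, Optional

# Toy model only, not how Celery works internally: a "task" is a plain
# function that may return a list of task names to run in its place.
TaskFn = Callable[[], Optional[List[str]]]


def run_chain(chain: List[str], registry: Dict[str, TaskFn]) -> List[str]:
    """Run tasks left to right and return the order in which they ran.

    When a task returns a non-empty list of names, those tasks are spliced
    in right after it, so the next original link waits for all of them.
    """
    pending = list(chain)
    ran: List[str] = []
    while pending:
        name = pending.pop(0)
        ran.append(name)
        replacement = registry[name]()
        if replacement:  # None or [] means "no replacement"
            pending[0:0] = replacement  # insert ahead of the remaining links
    return ran


collection = ["a", "b", "c"]  # hypothetical data fetched by task2

registry: Dict[str, TaskFn] = {
    "task1": lambda: None,
    "task2": lambda: [f"item_{x}" for x in collection],  # the fanout
    "task3": lambda: None,
}
for x in collection:
    registry[f"item_{x}"] = lambda: None

print(run_chain(["task1", "task2", "task3"], registry))
# ['task1', 'task2', 'item_a', 'item_b', 'item_c', 'task3']
```

Treating an empty list as “no replacement” mirrors the guard used later in this post, where the task skips replace when there is nothing to fan out.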

Keep in mind that you don’t need to replace the task with a group of other tasks. In Celery, you could replace the currently running task with any other task signature, such as a single task or a complex task signature.

Below is a deeper dive into an example and the pattern, but you could probably get away with the above example and reading the docs on Task.replace.

Realistic Taco Bell example problem

A more realistic example: you’re tasked with capturing all the menu options across every Taco Bell2 franchise, then sending an email with the least frequently listed items3. You need to consume data from two API endpoints: one that lists every franchise and one that returns the menu for a given franchise.

Let’s imagine we make a single Celery task responsible for each of these API endpoints.

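from celery import shared_task


@shared_task
def capture_franchises():
    # fetch_franchises() is a hypothetical API client helper
    for franchise in fetch_franchises():
        ...  # Do something, such as saving the franchise


@shared_task
def capture_menu_for_franchise(franchise_id: int):
    # fetch_menu() is a hypothetical API client helper
    for menu_item in fetch_menu(franchise_id).menu_items:
        ...  # Do something, such as saving the menu item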

Now our task for sending the report:

from celery import shared_task
from django.core.mail import send_mail
from .models import MenuItem  # Our imaginary django model

@shared_task
def send_report():
    # least_listed() is an imaginary custom manager method on MenuItem
    least_listed_menu_items = MenuItem.objects.least_listed().values_list('name', flat=True)
    send_mail(
        subject="Least listed items report",
        message=", ".join(least_listed_menu_items),
        recipient_list=["menu_design@taco-bell.better-simple.com"],
        from_email="fanout_pattern@better-simple",
    )
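MenuItem.objects.least_listed() is imaginary, so here is a plain-Python sketch of the counting it would have to do. It assumes the captured menus are available as a dict mapping a franchise id to that franchise’s item names; that shape is hypothetical and not the post’s models. It also shows why footnote 3 holds: an item’s count is only final once every franchise’s menu has been captured.

```python
from collections import Counter
from typing import Dict, List


def least_listed_items(menus: Dict[int, List[str]]) -> List[str]:
    """Return the item names listed by the fewest franchises, sorted.

    `menus` is a hypothetical mapping of franchise id -> item names.
    An item repeated on one menu counts only once for that franchise.
    """
    counts: Counter = Counter()
    for items in menus.values():
        counts.update(set(items))  # one vote per franchise per item
    if not counts:
        return []
    fewest = min(counts.values())
    return sorted(name for name, count in counts.items() if count == fewest)


menus = {
    1: ["Crunchwrap Supreme", "Cheesy Gordita Crunch", "Chicken Quesadilla"],
    2: ["Crunchwrap Supreme", "Cheesy Gordita Crunch"],
    3: ["Crunchwrap Supreme", "Chicken Quesadilla", "Mexican Pizza"],
}
print(least_listed_items(menus))
# ['Mexican Pizza']
```

In a Django project you would more likely push this aggregation into the database query; the sketch only pins down the logic the report depends on.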

The challenge is creating a Celery task signature that runs capture_franchises, then runs capture_menu_for_franchise for every franchise found, and then, once all of those tasks finish, runs the send_report task.

One solution is the fanout pattern.

The fanout pattern in practice

The key to the fanout pattern is the ability to replace a task with another task signature.

In Celery, to replace a task you must create a bound task via @shared_task(bind=True) or @app.task(bind=True)4. This passes the task instance to your function as the self argument, which gives you access to the task request (self.request) and to the replace method (self.replace).

That’s a lot of documentation to take in, so here’s the code that should highlight my point:

from celery import group, shared_task
from .models import Franchise  # Our imaginary django model

@shared_task(bind=True)
def capture_franchises(self):
    for franchise in fetch_franchises():
        ...  # Do something

    # We now have fetched all franchises, let's replace this
    # task with another task signature

    self.replace(
        group([
            capture_menu_for_franchise.si(franchise_id=franchise.id)
            for franchise in Franchise.objects.all()
        ])
    )

You can see that we’re replacing the task capture_franchises with a group of capture_menu_for_franchise tasks via the self.replace() line.

Now we need to chain the report task to the above.

def run_least_listed_report():
    signature = capture_franchises.si() | send_report.si()
    signature.apply_async()

A word on task design

If you’re like me, the above doesn’t sit well with you. The signature definition, capture_franchises.si() | send_report.si(), doesn’t make sense to a reader. The franchises are captured, then the report is sent out? If the menus are also being captured, does that imply capture_franchises captures other data too?

Instead, I think a more appropriate solution would be:

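from celery import group, shared_task
from .models import Franchise  # Our imaginary django model

@shared_task
def capture_franchises():
    for franchise in fetch_franchises():
        ...  # Do something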


@shared_task(bind=True)
def capture_franchises_menu_fanout(self):
    franchise_ids = list(Franchise.objects.all().values_list("id", flat=True))
    if franchise_ids:
        # Celery doesn't handle group([]) with an empty collection well
        # so don't replace if there are no franchises stored
        self.replace(
            group([
                capture_menu_for_franchise.si(franchise_id=franchise_id)
                for franchise_id in franchise_ids
            ])
        )

The new signature would be:

capture_franchises.si() | capture_franchises_menu_fanout.si() | send_report.si()

This is more descriptive, and it’s easier to extend or reproduce for a similar workflow elsewhere.

Recap

The fanout pattern is a great way to implement a dynamic workflow in your Celery task signatures. You can use it to reduce the size of your signatures and to programmatically select the next tasks based on external data.

I hope you found this helpful. If you have questions, don’t hesitate to reach out to me. You can find me on the Fediverse, Django Discord server or via email.

  1. By a Celery task signature, I mean one or more Celery tasks linked together. 

  2. Cheesy gordita crunch and chicken quesadilla. It’s the answer to the question you didn’t know you had. 

  3. We’re using the least frequently listed items because it requires you to search every franchise no matter what. 

  4. I use shared_task since it’s recommended for Django, but you can use app.task if you prefer.