
Celery Overview

Celery is a distributed task queue used for asynchronous processing at Maybern. It handles expensive calculations, background jobs, and workflow orchestration.

What is Celery?

Celery allows you to run tasks asynchronously outside the request-response cycle. This is essential for:
  • Expensive calculations (waterfalls, XIRR)
  • PDF generation
  • Email sending
  • Event processing
  • Background data updates

Core Concepts

Native Celery Primitives

Task: a single function that is executed asynchronously by a worker:
from celery import shared_task


@shared_task
def process_entity(ctx, entity_id):
    # Do work
    pass
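
Calling the function directly runs it in-process; native Celery enqueues the call with .delay() or .apply_async(). A minimal sketch, assuming ctx and entity_id are already defined:
# Runs synchronously, in the current process
process_entity(ctx, entity_id)

# Enqueues the task for a worker to execute
process_entity.delay(ctx, entity_id)

# Same, with explicit options (here, delay execution by 10 seconds)
process_entity.apply_async(args=[ctx, entity_id], countdown=10)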

Why Maybern Wraps Celery

We’ve built custom wrappers around Celery for several reasons:

Clarity

Celery’s API can be confusing. Our wrappers provide a cleaner, more intuitive interface.

Performance

Celery puts all task parameters onto the message queue, which doesn't scale for large payloads. We store task data in Redis instead and keep the queued message small.
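
A rough sketch of this pattern, not the actual wrapper implementation (the key format, TTL, and helper names are assumptions): the payload is written to Redis under a generated key, and only that key travels on the queue.
import json
import uuid

import redis

redis_client = redis.Redis()  # assumes a local Redis instance


def enqueue_with_redis_payload(task, payload: dict) -> None:
    # Write the (potentially large) payload to Redis under a generated key...
    key = f"celery-task-data:{uuid.uuid4()}"
    redis_client.set(key, json.dumps(payload), ex=3600)  # expire after one hour
    # ...and put only the small key on the queue.
    task.delay(data_key=key)


def load_payload(data_key: str) -> dict:
    # Inside the task, read the payload back by key.
    return json.loads(redis_client.get(data_key))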

Flexibility

Abstracting Celery makes it easier to switch task queue libraries if needed.

Observability

Our wrappers add consistent logging, error handling, and monitoring.

Task Definition

Define tasks using the @simple_celery_task decorator:
from dataclasses import dataclass

from server.apps.celery.public.constants import CeleryQueueName, CeleryTaskName
from server.apps.celery.public.decorators import simple_celery_task
from server.apps.core.dataclasses.request_ctx import RequestCtx
from server.apps.core.types.muuid import MUUID


@dataclass
class ProcessEntityTaskData:
    """Data required for the process entity task."""
    entity_id: MUUID
    process_options: dict


@simple_celery_task(
    name=CeleryTaskName.PROCESS_ENTITY,
    queue=CeleryQueueName.DEFAULT,
)
def process_entity_task(
    *, 
    ctx: RequestCtx, 
    celery_workflow_id: MUUID,
    task_data: ProcessEntityTaskData,
) -> None:
    """
    Process an entity asynchronously.
    
    Args:
        ctx: Request context
        celery_workflow_id: Workflow ID for tracking
        task_data: Task data with entity ID and options
    """
    process_entity(
        ctx=ctx,
        entity_id=task_data.entity_id,
        options=task_data.process_options,
        workflow_id=celery_workflow_id,
    )
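
Enqueuing the task from application code means building the typed dataclass and handing it to the wrapper's dispatch helper. The call below is illustrative only; enqueue_task and its signature are assumptions (and entity_id is assumed to be in scope), not the wrapper's actual API:
task_data = ProcessEntityTaskData(
    entity_id=entity_id,
    process_options={"recalculate": True},
)
enqueue_task(  # hypothetical dispatch helper
    ctx=ctx,
    task_name=CeleryTaskName.PROCESS_ENTITY,
    task_data=task_data,
)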

Task Parameters

Every task receives:
Parameter            Description
ctx                  Request context with customer, user, feature flags
celery_workflow_id   Unique ID for tracking the workflow
task_data            Typed dataclass with task-specific data

Queue Configuration

Tasks are assigned to queues based on their type:
class CeleryQueueName(StrEnum):
    DEFAULT = "default"
    CORE_COMPUTATION = "core_computation"
    CORE_COMPUTATION_HIGH_PRIORITY = "core_computation_high_priority"
    NOTICES = "notices"
    # ...
Choose the appropriate queue:
Queue                            Use For
DEFAULT                          General tasks, low priority
CORE_COMPUTATION                 Financial calculations
CORE_COMPUTATION_HIGH_PRIORITY   User-initiated calculations
NOTICES                          PDF/notice generation
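
For example, a user-initiated financial calculation would be routed to the high-priority computation queue (the task name below is illustrative, not an existing CeleryTaskName member):
@simple_celery_task(
    name=CeleryTaskName.RECALCULATE_WATERFALL,  # illustrative name
    queue=CeleryQueueName.CORE_COMPUTATION_HIGH_PRIORITY,
)
def recalculate_waterfall_task(*, ctx, celery_workflow_id, task_data) -> None:
    ...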

Running Celery Locally

1. Start Redis

brew services start redis
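
To confirm Redis is running before starting the workers (it should reply PONG):
redis-cli ping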

2. Start Celery workers

just run-celery
Celery workers run in a separate process from the Django server. Make sure both are running for async tasks to work.

Error Handling

Tasks have built-in error handling:
@simple_celery_task(
    name=CeleryTaskName.MY_TASK,
    queue=CeleryQueueName.DEFAULT,
    max_retries=3,
    retry_delay=60,  # seconds
)
def my_task(*, ctx, celery_workflow_id, task_data):
    try:
        # Do work
        pass
    except TransientError:
        # Will be retried
        raise
    except PermanentError:
        # Won't be retried
        raise
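
For reference, max_retries and retry_delay express the same idea as native Celery's retry options. A minimal native sketch, assuming TransientError is an exception class defined by the application:
from celery import shared_task


class TransientError(Exception):
    """A failure worth retrying, e.g. a timeout calling an upstream service."""


@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # seconds between retries
    autoretry_for=(TransientError,),
)
def my_native_task(self, entity_id):
    # Raising TransientError here triggers an automatic retry, up to max_retries.
    ...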

Debugging Tasks

Celery logs appear in the terminal running just run-celery.
Inspect task data in Redis:
redis-cli
> KEYS celery*
For breakpoints in async tasks:
from celery.contrib import rdb
rdb.set_trace()
Then connect with telnet (see Troubleshooting).
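
The worker log prints the host and port of the debugger session; by default celery.contrib.rdb binds to a port starting at 6899 (configurable via the CELERY_RDB_PORT environment variable):
telnet 127.0.0.1 6899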

Next Steps