Skip to main content

Celery App

The Celery app provides async task infrastructure for distributed computation, including custom wrappers that improve on native Celery functionality.

Overview

Celery handles expensive operations asynchronously:
  • Financial calculations (waterfalls, XIRR)
  • PDF generation
  • Event processing
  • Background data updates
See Celery Overview for concepts and Celery Workflows for advanced patterns.

Custom Wrappers

Maybern wraps Celery for several reasons:

Clarity

Simpler API than native Celery.

Performance

Store task data in Redis, not the queue.

Flexibility

Easier to switch task queue libraries.

Observability

Consistent logging and error handling.

Task Definition

from server.apps.celery.public.decorators import simple_celery_task
from server.apps.celery.public.constants import CeleryQueueName, CeleryTaskName

@dataclass
class ProcessEntityTaskData:
    entity_id: MUUID
    options: dict

@simple_celery_task(
    name=CeleryTaskName.PROCESS_ENTITY,
    queue=CeleryQueueName.DEFAULT,
)
def process_entity_task(
    *, 
    ctx: RequestCtx, 
    celery_workflow_id: MUUID,
    task_data: ProcessEntityTaskData,
) -> None:
    process_entity(
        ctx=ctx,
        entity_id=task_data.entity_id,
        options=task_data.options,
    )

Queue Configuration

QueueUse For
DEFAULTGeneral tasks
CORE_COMPUTATIONFinancial calculations
CORE_COMPUTATION_HIGH_PRIORITYUser-initiated calculations
NOTICESPDF generation

Dynamic Queue Routing

Tasks automatically route to priority queues based on request source:
# Frontend requests use high priority
ctx = RequestCtx(
    request_source=RequestSource.FRONTEND,
    # ...
)

# Background jobs use default
ctx = RequestCtx(
    request_source=RequestSource.BACKGROUND,
    # ...
)
No code changes needed - routing is automatic.

Sync Workflows

For multi-stage sequential workflows:
workflow = CelerySyncWorkflow(
    ctx=ctx,
    workflow_id=workflow_id,
    to_chain=[
        CeleryTask(...),
        CeleryAsyncTaskGroup([...]),  # Parallel stage
        CeleryTask(...),
    ],
    completion_task=CompleteWorkflowCeleryTaskDefinition(msg="Done"),
)
workflow.process()

Running Locally

# Start Redis
brew services start redis

# Start workers
just run-celery

Debugging

See Troubleshooting for:
  • Worker not picking up changes
  • Remote debugging with rdb
  • Redis inspection