
Celery Workflows

This guide covers Maybern’s custom Celery wrappers for building complex workflows and dynamic queue routing.

Task Execution Flow

Our wrappers optimize task execution by storing task payloads in Redis rather than in the queue message itself; only a cache key travels through the broker (see the sketch after this list). This approach solves:
  • Message size limits in SQS
  • Memory consumption on workers
  • Serialization/deserialization overhead
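A minimal sketch of the pattern, assuming a plain redis-py client — the helper names and TTL here are illustrative, not the actual wrapper internals:

import json
import uuid

import redis

r = redis.Redis()

def stash_task_data(task_data: dict) -> str:
    """Store the full payload in Redis and return the cache key.

    Only this short key travels through the broker, keeping
    messages well under SQS's 256 KB limit.
    """
    task_redis_id = f"celery-task:{uuid.uuid4()}"
    r.set(task_redis_id, json.dumps(task_data), ex=60 * 60)  # illustrative 1h TTL
    return task_redis_id

def load_task_data(task_redis_id: str) -> dict:
    """Worker side: fetch and deserialize the payload by key."""
    raw = r.get(task_redis_id)
    if raw is None:
        raise RuntimeError(f"cache miss for {task_redis_id}")
    return json.loads(raw)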

Sync Workflows

For sequential multi-stage workflows, use CelerySyncWorkflow:
from server.apps.celery.public.celery_group import (
    CeleryAsyncTaskGroup,  # used in Stage 2 below; import path assumed
    CelerySyncWorkflow,
    CeleryTask,
)
from server.apps.celery.public.dataclasses.celery_task_definition import (
    CompleteWorkflowCeleryTaskDefinition,
    CustomCeleryTaskDefinition,
)
from server.apps.celery.public.enums import CeleryTaskName  # import path assumed


# Create a workflow with sequential stages
workflow = CelerySyncWorkflow(
    ctx=ctx,
    workflow_id=workflow_id,
    to_chain=[
        # Stage 1: Single task
        CeleryTask(
            ctx=ctx,
            workflow_id=workflow_id,
            task_def=CustomCeleryTaskDefinition(
                task_name=CeleryTaskName.PREPARE_DATA,
                task_data=prepare_data,
            ),
        ),
        # Stage 2: Parallel tasks
        CeleryAsyncTaskGroup(
            ctx=ctx,
            workflow_id=workflow_id,
            to_group=[
                CeleryTask(...),
                CeleryTask(...),
                CeleryTask(...),
            ],
        ),
        # Stage 3: Final processing
        CeleryTask(
            ctx=ctx,
            workflow_id=workflow_id,
            task_def=CustomCeleryTaskDefinition(
                task_name=CeleryTaskName.FINALIZE,
                task_data=finalize_data,
            ),
        ),
    ],
    completion_task=CompleteWorkflowCeleryTaskDefinition(msg="Done"),
)

# Start the workflow
workflow.process()

How Sync Workflows Execute

  1. Each CeleryAsyncTaskGroup is serialized and stored in Redis
  2. Cache keys are stored in order
  3. Sync workflow task executes first group
  4. After completion, recursively executes next group
  5. Final task is always the completion task (see the sketch below)
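Conceptually, the recursion looks like the following runnable sketch. It elides the Redis serialization and real worker parallelism, modeling each stage as a list of callables; all names here are illustrative, not the wrapper's actual API:

from typing import Callable

# Illustrative stand-in: each "stage" is a list of tasks run together,
# like a CeleryAsyncTaskGroup.
Stage = list[Callable[[], None]]

def run_next_stage(stages: list[Stage], completion: Callable[[], None]) -> None:
    """Execute stages in order, recursing only after each stage completes."""
    if not stages:
        completion()  # the completion task always runs last
        return
    current, remaining = stages[0], stages[1:]
    for task in current:  # in production these run in parallel on workers
        task()
    # Recurse: the next stage starts only once the whole current stage is done.
    run_next_stage(remaining, completion)

# Usage: three stages mirroring the example above
run_next_stage(
    [[lambda: print("prepare")],
     [lambda: print("a"), lambda: print("b"), lambda: print("c")],
     [lambda: print("finalize")]],
    completion=lambda: print("Done"),
)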

Dynamic Priority Queue Routing

Tasks are automatically routed to different queues based on request source:
from server.apps.core.dataclasses.request_ctx import RequestCtx, RequestSource

# Frontend request - uses high priority queue
frontend_ctx = RequestCtx(
    customer_id=customer_id,
    user_id=user_id,
    request_source=RequestSource.FRONTEND,  # Determines queue priority
)

# Background job - uses default queue
background_ctx = RequestCtx(
    customer_id=customer_id,
    user_id=user_id,
    request_source=RequestSource.BACKGROUND,
)

Queue Mapping

QUEUE_PRIORITY_MAPPING = {
    "frontend": {
        CeleryQueueName.CORE_COMPUTATION: CeleryQueueName.CORE_COMPUTATION_HIGH_PRIORITY,
        # Add more overrides as needed
    },
    # Other request sources use default queues
}
No code changes required! Existing workflows automatically benefit from priority routing based on the request context.

How Routing Works

  1. @celery_task decorator registers the default queue
  2. When task is sent, route_task_dynamically is called
  3. Router fetches context from Redis
  4. Based on request_source, selects appropriate queue
  5. Task is sent to the selected queue (sketched below)
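A runnable sketch of the selection logic, with a stand-in registry and context — the task name, Ctx class, and dictionary shapes are assumptions; only QUEUE_PRIORITY_MAPPING mirrors the snippet above:

from dataclasses import dataclass

# Assumed stand-ins: in the real code the registry is populated by
# @celery_task, and the ctx is fetched from Redis via task_redis_id.
DEFAULT_QUEUE_BY_TASK = {"compute_metrics": "core_computation"}
QUEUE_PRIORITY_MAPPING = {
    "frontend": {"core_computation": "core_computation_high_priority"},
}

@dataclass
class Ctx:
    request_source: str  # stand-in for RequestSource

def route_task_dynamically(task_name: str, ctx: Ctx) -> str:
    """Select a queue: use the override for this request source, else the default."""
    default_queue = DEFAULT_QUEUE_BY_TASK[task_name]
    overrides = QUEUE_PRIORITY_MAPPING.get(ctx.request_source, {})
    return overrides.get(default_queue, default_queue)

# Frontend requests get the high-priority queue; background gets the default.
assert route_task_dynamically("compute_metrics", Ctx("frontend")) \
    == "core_computation_high_priority"
assert route_task_dynamically("compute_metrics", Ctx("background")) \
    == "core_computation"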

Worker Configuration

Workers can consume from multiple queues with priority:
# Priority order matters - high priority is checked first
celery -A server worker \
    -Q core_computation_high_priority,core_computation \
    -c 6 \
    -n shared_worker
Benefits:
  • Queue Priority: Workers check queues in order
  • Efficient Resource Usage: All workers can process any task
  • Automatic Load Balancing: Multiple workers compete for tasks

Adding New Priority Queues

1. Add queue to enum

class CeleryQueueName(StrEnum):
    MY_QUEUE = "my_queue"
    MY_QUEUE_HIGH_PRIORITY = "my_queue_high_priority"

2. Update queue mapping

QUEUE_PRIORITY_MAPPING = {
    "frontend": {
        CeleryQueueName.MY_QUEUE: CeleryQueueName.MY_QUEUE_HIGH_PRIORITY,
    },
}

3. Configure SQS (production)

Add queue URLs in the Terraform configuration.

4. Update worker config

celery -A server worker -Q my_queue_high_priority,my_queue

Troubleshooting

Tasks not routing to the expected queue:
  1. Verify the task has the @celery_task decorator
  2. Check RequestSource is set correctly in the context
  3. Review router logs for errors
  4. Ensure the queue mapping includes your task’s queue

Tasks not being consumed:
  1. Verify the worker subscribes to the correct queues
  2. Check SQS permissions and connectivity
  3. Look for errors in worker logs
  4. Ensure the queues exist in the environment

Tasks processing slowly:
  1. Check worker concurrency settings
  2. Verify no blocking operations in tasks
  3. Look for memory issues on workers
  4. Check if tasks are timing out

Error Handling

The router uses fail-fast error handling:
  • Missing task_redis_id → MaybernError
  • Cache misses → MaybernError
  • Invalid task name → MaybernError
  • Missing default queue → MaybernError
All exceptions are logged with full context and sent to Sentry.
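Illustratively, the fail-fast shape looks like this sketch — MaybernError is stubbed here, and the helper and header names are assumptions:

class MaybernError(Exception):
    """Stand-in for the real MaybernError."""

def resolve_task_data(cache, headers: dict) -> str:
    """Fail fast: raise instead of guessing when routing inputs are missing."""
    task_redis_id = headers.get("task_redis_id")
    if task_redis_id is None:
        raise MaybernError("message is missing task_redis_id")
    cached = cache.get(task_redis_id)
    if cached is None:
        raise MaybernError(f"cache miss for key {task_redis_id}")
    return cached

# A cache miss surfaces immediately rather than silently misrouting the task.
try:
    resolve_task_data({}, {"task_redis_id": "missing-key"})
except MaybernError as exc:
    print(exc)  # in production: logged with full context and sent to Sentry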

Next Steps