Celery Workflows
This guide covers Maybern’s custom Celery wrappers for building complex workflows and dynamic queue routing.Task Execution Flow
Our wrappers optimize task execution by storing data in Redis instead of the queue: This approach solves:- Message size limits in SQS
- Memory consumption on workers
- Serialization/deserialization overhead
Sync Workflows
For sequential multi-stage workflows, useCelerySyncWorkflow:
How Sync Workflows Execute
- Each
CeleryAsyncTaskGroupis serialized and stored in Redis - Cache keys are stored in order
- Sync workflow task executes first group
- After completion, recursively executes next group
- Final task is always the completion task
Dynamic Priority Queue Routing
Tasks are automatically routed to different queues based on request source:Queue Mapping
No code changes required! Existing workflows automatically benefit from priority routing based on the request context.
How Routing Works
@celery_taskdecorator registers the default queue- When task is sent,
route_task_dynamicallyis called - Router fetches context from Redis
- Based on
request_source, selects appropriate queue - Task is sent to the selected queue
Worker Configuration
Shared Workers (Recommended)
Workers can consume from multiple queues with priority:- Queue Priority: Workers check queues in order
- Efficient Resource Usage: All workers can process any task
- Automatic Load Balancing: Multiple workers compete for tasks
Adding New Priority Queues
1
Add queue to enum
2
Update queue mapping
3
Configure SQS (production)
Add queue URLs in Terraform configuration.
4
Update worker config
Troubleshooting
Tasks not routing correctly
Tasks not routing correctly
- Verify the task has
@celery_taskdecorator - Check
RequestSourceis set correctly in context - Review router logs for errors
- Ensure queue mapping includes your task’s queue
Worker not processing tasks
Worker not processing tasks
- Verify worker subscribes to correct queues
- Check SQS permissions and connectivity
- Look for errors in worker logs
- Ensure queues exist in the environment
Tasks stuck in queue
Tasks stuck in queue
- Check worker concurrency settings
- Verify no blocking operations in tasks
- Look for memory issues on workers
- Check if tasks are timing out
Error Handling
The router uses fail-fast error handling:- Missing
task_redis_id→MaybernError - Cache misses →
MaybernError - Invalid task name →
MaybernError - Missing default queue →
MaybernError