workflow-canvas
Workflow Canvas
Purpose: Implement Celery canvas patterns for composing complex, distributed task workflows.
Activation Triggers:
- Sequential task execution needed (chains)
- Parallel task processing required (groups)
- Synchronization after parallel work (chords)
- Complex workflow composition
- Task result aggregation
- Error handling in workflows
- Nested or conditional workflows
Key Resources:
- `scripts/test-workflow.sh` - Test workflow execution and validate patterns
- `scripts/validate-canvas.sh` - Validate canvas structure and dependencies
- `scripts/generate-workflow.sh` - Generate workflows from templates
- `templates/` - Complete workflow pattern implementations
- `examples/` - Real-world workflow scenarios with explanations
Canvas Primitives
1. Signatures
Foundation of canvas system - encapsulate task invocation details:
```python
from celery import signature

# Named signature
signature('tasks.add', args=(2, 2), countdown=10)

# Using the task's signature method
add.signature((2, 2), countdown=10)

# Shortcut syntax (most common)
add.s(2, 2)

# Immutable signature (prevents result forwarding)
add.si(2, 2)
```
Use templates/chain-workflow.py for signature examples.
2. Chains
Sequential task execution where each task's result becomes next task's first argument:
```python
from celery import chain

# Explicit chain
chain(add.s(2, 2), add.s(4), add.s(8))()

# Pipe syntax (recommended)
(add.s(2, 2) | add.s(4) | add.s(8))()

# With immutable tasks (independent execution)
(add.si(2, 2) | process.s() | notify.si('done'))()
Key Pattern: Use `.si()` when a task should NOT receive the previous task's result.
See templates/chain-workflow.py for complete patterns.
3. Groups
Execute multiple tasks in parallel; returns a `GroupResult`:
```python
from celery import group

# Parallel execution
group(add.s(i, i) for i in range(10))()

# With result tracking
job = group(process.s(item) for item in batch)
result = job.apply_async()
result.get()  # Wait for all tasks
```
Requirements:
- Tasks must NOT ignore results
- Result backend required
See templates/group-parallel.py for implementation.
4. Chords
Group header + callback that executes after all header tasks complete:
```python
from celery import chord

# Basic chord
chord(add.s(i, i) for i in range(100))(tsum.s()).get()

# With error handling
chord(
    process.s(item) for item in batch
)(aggregate_results.s()).on_error(handle_chord_error.s())
```
Critical Requirements:
- Result backend MUST be enabled
- Set `task_ignore_result=False` explicitly
- Redis 2.2+ for proper operation
Error Handling: If a header task fails, the callback is not executed with partial results; the chord instead fails with a ChordError identifying the failed task's ID and exception.
See templates/chord-pattern.py for complete implementation.
5. Map & Starmap
Built-in tasks for sequence processing (single message, sequential execution):
```python
# Map: one call per element (each element is passed as a single argument)
tsum.map([range(10), range(100)])

# Starmap: unpacks each tuple into positional arguments
add.starmap(zip(range(100), range(100)))
```
Difference from groups: Single task message vs multiple messages.
6. Chunks
Divide large iterables into sized batches:
```python
# Process 100 items in chunks of 10
add.chunks(zip(range(100), range(100)), 10)
```
Returns: Nested lists corresponding to chunk outputs.
Advanced Patterns
Partial Application
```python
# Incomplete signature
partial = add.s(2)

# Complete later (call-time arguments are prepended)
partial.delay(4)  # executes add(4, 2)

# Keyword arguments merge; call-time kwargs take precedence
partial = process.s(timeout=30)
partial.apply_async(kwargs={'timeout': 60})  # uses timeout=60
```
Nested Workflows
Combine primitives for complex logic:
```python
from celery import chain, group, chord

# Chain of chords: pass each callback as the chord body; calling the
# chord (chord(header)(callback)) would execute it immediately instead
# of building a composable signature
workflow = chain(
    chord([task1.s(), task2.s()], callback1.s()),
    chord([task3.s(), task4.s()], callback2.s()),
)

# Groups in chains: a group followed by another task is automatically
# upgraded to a chord
workflow = (
    prepare.s() |
    group(process.s(i) for i in range(10)) |
    finalize.s()
)
```
See templates/nested-workflows.py for production patterns.
Error Callbacks
```python
# Task-level error handling
add.s(2, 2).on_error(log_error.s()).delay()

# Chord error handling
chord(
    header_tasks
)(callback.s()).on_error(handle_error.s())
```
Errback signature: `(request, exc, traceback)`
Execution: synchronous, in the worker.
See templates/error-handling-workflow.py for comprehensive patterns.
Complex Workflow Example
```python
from celery import chain, group, chord

# Data processing pipeline
workflow = chain(
    # Stage 1: Fetch and validate
    fetch_data.s(),
    validate_data.s(),

    # Stage 2: Parallel processing
    group(transform_batch.s(i) for i in range(num_batches)),

    # Stage 3: Aggregate results (pass the callback as the chord body;
    # calling the chord here would execute it immediately)
    chord(
        [aggregate_batch.s(i) for i in range(num_batches)],
        finalize_report.s(),
    ),

    # Stage 4: Notify (immutable, so it ignores the previous result)
    send_notification.si('pipeline_complete'),
)

# Execute with error handling
result = workflow.apply_async(
    link_error=handle_pipeline_error.s()
)
```
See templates/complex-workflow.py for production-ready implementation.
Testing Workflows
Validate Canvas Structure
```bash
# Check workflow composition
./scripts/validate-canvas.sh path/to/workflow.py

# Validates:
# - Result backend enabled
# - Task ignore_result settings
# - Proper signature usage
# - Error callback patterns
```
Test Workflow Execution
```bash
# Run workflow with test data
./scripts/test-workflow.sh workflow_name

# Options:
#   --dry-run    : Validate without execution
#   --verbose    : Show detailed task flow
#   --timeout 30 : Set execution timeout
```
Best Practices
DO:
- Enable result backend for groups/chords
- Use `.si()` for independent tasks in chains
- Implement error callbacks for critical workflows
- Set explicit timeouts for long-running workflows
- Use chunks for large data processing
- Add metadata with stamping API (5.3+)
- Make error callbacks idempotent
DON'T:
- Have tasks wait synchronously for other tasks
- Ignore results for tasks in groups
- Forget to call `super()` in `after_return()` overrides
- Use chords without Redis 2.2+
- Create deeply nested workflows (>3 levels)
- Mix synchronous and async task calls
Configuration Requirements
Celery Config:
```python
# Required for canvas
result_backend = 'redis://localhost:6379/0'
result_extended = True  # Store task args/kwargs

# Recommended
task_track_started = True
result_expires = 3600  # 1 hour

# For large workflows
worker_prefetch_multiplier = 1
task_acks_late = True
```
Debugging
Visualize workflow:
```python
# The result object exposes a DependencyGraph; for chains, walk up
# through result.parent to reach earlier stages.
result = workflow.apply_async()

with open('workflow.dot', 'w') as f:
    result.graph.to_dot(f)

# Convert to image:
# dot -Tpng workflow.dot -o workflow.png
```
Monitor execution:
```python
result = workflow.apply_async()
print(f"Task ID: {result.id}")
print(f"Status: {result.status}")
print(f"Children: {result.children}")
```
Resources
Templates:
- `chain-workflow.py` - Sequential task patterns
- `group-parallel.py` - Parallel execution patterns
- `chord-pattern.py` - Synchronization patterns
- `complex-workflow.py` - Multi-stage pipelines
- `error-handling-workflow.py` - Error callback patterns
- `nested-workflows.py` - Advanced composition
Scripts:
- `test-workflow.sh` - Execute and validate workflows
- `validate-canvas.sh` - Static analysis of canvas patterns
- `generate-workflow.sh` - Generate workflows from templates
Examples:
- `examples/chain-example.md` - Real-world chain scenarios
- `examples/group-example.md` - Parallel processing use cases
- `examples/chord-example.md` - Synchronization patterns
- `examples/complex-workflows.md` - Production workflow architectures
Security Compliance
This skill follows strict security rules:
- All code examples use placeholder values only
- No real API keys, passwords, or secrets
- Environment variable references in all code
- `.gitignore` protection documented
Version: 1.0.0
Celery Compatibility: 5.0+
Required Backend: Redis 2.2+ or compatible result backend