durable-task-python
SKILL.md
Durable Task Python SDK with Durable Task Scheduler
Build fault-tolerant, stateful workflows in Python applications using the Durable Task SDK connected to Azure Durable Task Scheduler.
Quick Start
Required Packages
pip install durabletask durabletask-azuremanaged azure-identity
Or add to requirements.txt:
durabletask
durabletask-azuremanaged
azure-identity
Minimal Worker + Client Setup
import os
from azure.identity import DefaultAzureCredential
from durabletask import task
from durabletask.client import OrchestrationStatus
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
# Activity function
def hello(ctx: task.ActivityContext, name: str) -> str:
return f"Hello {name}!"
# Orchestrator function
def my_orchestration(ctx: task.OrchestrationContext, input: str):
result = yield ctx.call_activity(hello, input=input)
return result
# Configuration - defaults to local emulator
taskhub = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
secure_channel = endpoint != "http://localhost:8080"
credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential()
# Start worker and run orchestration
with DurableTaskSchedulerWorker(
host_address=endpoint,
secure_channel=secure_channel,
taskhub=taskhub,
token_credential=credential
) as worker:
worker.add_orchestrator(my_orchestration)
worker.add_activity(hello)
worker.start()
# Create client and schedule orchestration
dts_client = DurableTaskSchedulerClient(
host_address=endpoint,
secure_channel=secure_channel,
taskhub=taskhub,
token_credential=credential
)
instance_id = dts_client.schedule_new_orchestration(my_orchestration, input="World")
state = dts_client.wait_for_orchestration_completion(instance_id, timeout=60)
if state and state.runtime_status == OrchestrationStatus.COMPLETED:
print(f"Result: {state.serialized_output}")
Pattern Selection Guide
| Pattern | Use When |
|---|---|
| Function Chaining | Sequential steps where each depends on the previous |
| Fan-Out/Fan-In | Parallel processing with aggregated results |
| Human Interaction | Workflow pauses for external input/approval |
| Durable Entities | Stateful objects with operations (counters, accounts) |
| Sub-Orchestrations | Reusable workflow components or version isolation |
| Eternal Orchestrations | Long-running background processes with continue_as_new |
| Monitoring | Periodic polling with configurable timeouts |
See references/patterns.md for detailed implementations.
Orchestration Structure
Basic Orchestrator
def my_orchestration(ctx: task.OrchestrationContext, input: str):
"""Orchestrator function - MUST be deterministic"""
# Call activities sequentially
step1 = yield ctx.call_activity(step1_activity, input=input)
step2 = yield ctx.call_activity(step2_activity, input=step1)
return step2
Basic Activity
def my_activity(ctx: task.ActivityContext, input: str) -> str:
"""Activity function - can have side effects, I/O, non-determinism"""
# Perform actual work here
print(f"Processing: {input}")
return f"Processed: {input}"
Registering with Worker
with DurableTaskSchedulerWorker(...) as worker:
worker.add_orchestrator(my_orchestration)
worker.add_activity(step1_activity)
worker.add_activity(step2_activity)
worker.start()
Critical Rules
Orchestration Determinism
Orchestrations replay from history - all code MUST be deterministic. When an orchestration resumes, it replays all previous code to rebuild state. Non-deterministic code produces different results on replay, causing failures.
NEVER do inside orchestrations:
datetime.now(),datetime.utcnow()→ Usectx.current_utc_datetimeuuid.uuid4()→ Usectx.new_uuid()random.random()→ Pass random values from activities- Direct I/O, HTTP calls, database access → Move to activities
time.sleep(),asyncio.sleep()→ Usectx.create_timer()- Environment variables that may change → Pass as input or use activities
- Global mutable state → Pass state through activity results
ALWAYS use:
yield ctx.call_activity()- Call activitiesyield ctx.call_sub_orchestrator()- Call sub-orchestrationsyield ctx.create_timer()- Durable delaysyield ctx.wait_for_external_event()- Wait for eventsctx.current_utc_datetime- Current timectx.new_uuid()- Generate GUIDsctx.set_custom_status()- Set status
Non-Determinism Patterns (WRONG vs CORRECT)
Getting Current Time
# WRONG - datetime.now() returns different value on replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
current_time = datetime.now() # Non-deterministic!
if current_time.hour < 12:
yield ctx.call_activity(morning_activity)
# CORRECT - ctx.current_utc_datetime is replayed consistently
def good_orchestration(ctx: task.OrchestrationContext, _):
current_time = ctx.current_utc_datetime # Deterministic
if current_time.hour < 12:
yield ctx.call_activity(morning_activity)
Generating UUIDs/Random Values
# WRONG - uuid4() generates different value on replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
order_id = str(uuid.uuid4()) # Non-deterministic!
yield ctx.call_activity(create_order, input=order_id)
# CORRECT - ctx.new_uuid() replays the same value
def good_orchestration(ctx: task.OrchestrationContext, _):
order_id = str(ctx.new_uuid()) # Deterministic
yield ctx.call_activity(create_order, input=order_id)
Random Numbers
# WRONG - random produces different values on replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
delay = random.randint(1, 10) # Non-deterministic!
yield ctx.create_timer(timedelta(seconds=delay))
# CORRECT - generate random in activity, pass to orchestrator
def get_random_delay(ctx: task.ActivityContext, _) -> int:
return random.randint(1, 10) # OK in activity
def good_orchestration(ctx: task.OrchestrationContext, _):
delay = yield ctx.call_activity(get_random_delay) # Deterministic
yield ctx.create_timer(timedelta(seconds=delay))
Sleeping/Delays
# WRONG - time.sleep blocks and doesn't persist
def bad_orchestration(ctx: task.OrchestrationContext, _):
yield ctx.call_activity(step1)
time.sleep(60) # Non-durable! Lost on restart
yield ctx.call_activity(step2)
# CORRECT - ctx.create_timer is durable
def good_orchestration(ctx: task.OrchestrationContext, _):
yield ctx.call_activity(step1)
yield ctx.create_timer(timedelta(seconds=60)) # Durable timer
yield ctx.call_activity(step2)
HTTP Calls and I/O
# WRONG - HTTP call in orchestrator is non-deterministic
def bad_orchestration(ctx: task.OrchestrationContext, url: str):
import requests
response = requests.get(url) # Non-deterministic!
return response.json()
# CORRECT - move I/O to activity
def fetch_data(ctx: task.ActivityContext, url: str) -> dict:
import requests
response = requests.get(url) # OK in activity
return response.json()
def good_orchestration(ctx: task.OrchestrationContext, url: str):
data = yield ctx.call_activity(fetch_data, input=url) # Deterministic
return data
Database Access
# WRONG - database query in orchestrator
def bad_orchestration(ctx: task.OrchestrationContext, user_id: str):
import sqlite3
conn = sqlite3.connect('db.sqlite') # Non-deterministic!
cursor = conn.execute("SELECT * FROM users WHERE id=?", (user_id,))
user = cursor.fetchone()
# ...
# CORRECT - database access in activity
def get_user(ctx: task.ActivityContext, user_id: str) -> dict:
import sqlite3
conn = sqlite3.connect('db.sqlite') # OK in activity
cursor = conn.execute("SELECT * FROM users WHERE id=?", (user_id,))
return dict(cursor.fetchone())
def good_orchestration(ctx: task.OrchestrationContext, user_id: str):
user = yield ctx.call_activity(get_user, input=user_id)
# ...
Environment Variables
# WRONG - env var might change between replays
def bad_orchestration(ctx: task.OrchestrationContext, _):
api_endpoint = os.getenv("API_ENDPOINT") # Could change!
yield ctx.call_activity(call_api, input=api_endpoint)
# CORRECT - pass config as input or read in activity
def good_orchestration(ctx: task.OrchestrationContext, config: dict):
api_endpoint = config["api_endpoint"] # From input, deterministic
yield ctx.call_activity(call_api, input=api_endpoint)
# ALSO CORRECT - read env var in activity
def call_api(ctx: task.ActivityContext, _) -> str:
api_endpoint = os.getenv("API_ENDPOINT") # OK in activity
# make the call...
Conditional Logic Based on External State
# WRONG - file existence can change between replays
def bad_orchestration(ctx: task.OrchestrationContext, path: str):
if os.path.exists(path): # Non-deterministic!
yield ctx.call_activity(process_file, input=path)
# CORRECT - check in activity
def check_file_exists(ctx: task.ActivityContext, path: str) -> bool:
return os.path.exists(path) # OK in activity
def good_orchestration(ctx: task.OrchestrationContext, path: str):
exists = yield ctx.call_activity(check_file_exists, input=path)
if exists: # Deterministic - based on activity result
yield ctx.call_activity(process_file, input=path)
Dictionary/Set Iteration Order
# POTENTIALLY WRONG - dict iteration order may vary (Python < 3.7)
def risky_orchestration(ctx: task.OrchestrationContext, items: dict):
for key in items: # Order might not be guaranteed
yield ctx.call_activity(process, input=key)
# CORRECT - use sorted keys for deterministic order
def good_orchestration(ctx: task.OrchestrationContext, items: dict):
for key in sorted(items.keys()): # Guaranteed order
yield ctx.call_activity(process, input=key)
Thread-Local or Global State
# WRONG - global state can change
counter = 0
def bad_orchestration(ctx: task.OrchestrationContext, _):
global counter
counter += 1 # Non-deterministic across replays!
yield ctx.call_activity(process, input=counter)
# CORRECT - pass state through orchestration input/output
def good_orchestration(ctx: task.OrchestrationContext, counter: int):
counter += 1 # Local variable, deterministic
yield ctx.call_activity(process, input=counter)
# If continuing, pass counter forward
ctx.continue_as_new(counter)
Using yield
In Python, orchestrator functions use yield to await durable operations:
# CORRECT - use yield
result = yield ctx.call_activity(my_activity, input="data")
# WRONG - will not work
result = ctx.call_activity(my_activity, input="data") # Missing yield!
Error Handling
def orchestrator_with_error_handling(ctx: task.OrchestrationContext, input: str):
try:
result = yield ctx.call_activity(risky_activity, input=input)
return result
except task.TaskFailedError as e:
# Activity failed - implement compensation
ctx.set_custom_status({"error": str(e)})
yield ctx.call_activity(compensation_activity, input=input)
return "Compensated"
Retry Policies
from durabletask.task import RetryPolicy
retry_policy = RetryPolicy(
first_retry_interval=5, # seconds
max_number_of_attempts=3,
backoff_coefficient=2.0,
max_retry_interval=60, # seconds
retry_timeout=300 # seconds
)
def orchestrator(ctx: task.OrchestrationContext, _):
result = yield ctx.call_activity(
unreliable_activity,
input="data",
retry_policy=retry_policy
)
return result
Working with Custom Types
The SDK supports dataclasses, namedtuples, and custom classes:
from dataclasses import dataclass
@dataclass
class Order:
product: str
quantity: int
cost: float
def process_order(ctx: task.ActivityContext, order: Order) -> str:
return f"Processed {order.quantity}x {order.product}"
def order_workflow(ctx: task.OrchestrationContext, order: Order):
result = yield ctx.call_activity(process_order, input=order)
return result
Connection & Authentication
Local Emulator (Default)
# No authentication required
taskhub = "default"
endpoint = "http://localhost:8080"
credential = None
secure_channel = False
Azure with DefaultAzureCredential
from azure.identity import DefaultAzureCredential
taskhub = "my-taskhub"
endpoint = "https://my-scheduler.region.durabletask.io"
credential = DefaultAzureCredential()
secure_channel = True
Authentication Helper
def get_connection_config():
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
taskhub = os.getenv("TASKHUB", "default")
is_local = endpoint == "http://localhost:8080"
return {
"host_address": endpoint,
"taskhub": taskhub,
"secure_channel": not is_local,
"token_credential": None if is_local else DefaultAzureCredential()
}
config = get_connection_config()
worker = DurableTaskSchedulerWorker(**config)
client = DurableTaskSchedulerClient(**config)
Local Development with Emulator
# Pull and run the emulator
docker pull mcr.microsoft.com/dts/dts-emulator:latest
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
# Dashboard available at http://localhost:8082
Client Operations
# Schedule new orchestration
instance_id = client.schedule_new_orchestration(my_orchestration, input="data")
# Schedule with custom instance ID
instance_id = client.schedule_new_orchestration(
my_orchestration,
input="data",
instance_id="my-custom-id"
)
# Wait for completion
state = client.wait_for_orchestration_completion(instance_id, timeout=60)
# Get current status
state = client.get_orchestration_state(instance_id)
# Raise external event
client.raise_orchestration_event(instance_id, "approval_received", data=approval_data)
# Terminate orchestration
client.terminate_orchestration(instance_id, output="User cancelled")
# Suspend/Resume
client.suspend_orchestration(instance_id)
client.resume_orchestration(instance_id)
References
- patterns.md - Detailed pattern implementations (Fan-Out/Fan-In, Human Interaction, Entities, Sub-Orchestrations)
- setup.md - Azure Durable Task Scheduler provisioning and deployment
Weekly Installs
14
Repository
azure-samples/d…chedulerGitHub Stars
48
First Seen
Feb 22, 2026
Security Audits
Installed on
codex14
cline13
gemini-cli13
github-copilot13
kimi-cli13
cursor13