Durable Task Python SDK with Durable Task Scheduler

Build fault-tolerant, stateful workflows in Python applications using the Durable Task SDK connected to Azure Durable Task Scheduler.

Quick Start

Required Packages

pip install durabletask durabletask-azuremanaged azure-identity

Or add to requirements.txt:

durabletask
durabletask-azuremanaged
azure-identity

Minimal Worker + Client Setup

import os
from azure.identity import DefaultAzureCredential
from durabletask import task
from durabletask.client import OrchestrationStatus
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker


# Activity function
def hello(ctx: task.ActivityContext, name: str) -> str:
    return f"Hello {name}!"


# Orchestrator function
def my_orchestration(ctx: task.OrchestrationContext, input: str):
    result = yield ctx.call_activity(hello, input=input)
    return result


# Configuration - defaults to local emulator
taskhub = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
secure_channel = endpoint != "http://localhost:8080"
credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential()

# Start worker and run orchestration
with DurableTaskSchedulerWorker(
    host_address=endpoint,
    secure_channel=secure_channel,
    taskhub=taskhub,
    token_credential=credential
) as worker:
    worker.add_orchestrator(my_orchestration)
    worker.add_activity(hello)
    worker.start()

    # Create client and schedule orchestration
    dts_client = DurableTaskSchedulerClient(
        host_address=endpoint,
        secure_channel=secure_channel,
        taskhub=taskhub,
        token_credential=credential
    )
    
    instance_id = dts_client.schedule_new_orchestration(my_orchestration, input="World")
    state = dts_client.wait_for_orchestration_completion(instance_id, timeout=60)
    
    if state and state.runtime_status == OrchestrationStatus.COMPLETED:
        print(f"Result: {state.serialized_output}")

Pattern Selection Guide

| Pattern | Use When |
| --- | --- |
| Function Chaining | Sequential steps where each depends on the previous |
| Fan-Out/Fan-In | Parallel processing with aggregated results |
| Human Interaction | Workflow pauses for external input/approval |
| Durable Entities | Stateful objects with operations (counters, accounts) |
| Sub-Orchestrations | Reusable workflow components or version isolation |
| Eternal Orchestrations | Long-running background processes with continue_as_new |
| Monitoring | Periodic polling with configurable timeouts |

See references/patterns.md for detailed implementations.

Orchestration Structure

Basic Orchestrator

def my_orchestration(ctx: task.OrchestrationContext, input: str):
    """Orchestrator function - MUST be deterministic"""
    # Call activities sequentially
    step1 = yield ctx.call_activity(step1_activity, input=input)
    step2 = yield ctx.call_activity(step2_activity, input=step1)
    return step2

Basic Activity

def my_activity(ctx: task.ActivityContext, input: str) -> str:
    """Activity function - can have side effects, I/O, non-determinism"""
    # Perform actual work here
    print(f"Processing: {input}")
    return f"Processed: {input}"

Registering with Worker

with DurableTaskSchedulerWorker(...) as worker:
    worker.add_orchestrator(my_orchestration)
    worker.add_activity(step1_activity)
    worker.add_activity(step2_activity)
    worker.start()

Critical Rules

Orchestration Determinism

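Orchestrations are replayed from history, so all orchestrator code MUST be deterministic. When an orchestration resumes, the orchestrator function runs again from the top, and steps that already completed are answered from the recorded history to rebuild state. Code that behaves differently on replay no longer matches that history, which causes failures.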
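To make the replay rule concrete, here is a toy model in plain Python. It is only a sketch under simplifying assumptions: `replay`, `stable`, `time_dependent`, and the `clock` dictionary are hypothetical names for illustration, actions are plain strings, and none of this is the SDK's actual replay engine.

```python
def replay(orchestrator, history):
    """Toy replay loop: re-run the generator, answering each yield from history.

    `history` is a list of (action, result) pairs recorded by an earlier run.
    Raises RuntimeError if the orchestrator requests a different action than
    the one recorded at that position.
    """
    gen = orchestrator()
    try:
        action = next(gen)
        for recorded_action, recorded_result in history:
            if action != recorded_action:
                raise RuntimeError(
                    f"non-deterministic: expected {recorded_action!r}, got {action!r}"
                )
            action = gen.send(recorded_result)
    except StopIteration as done:
        return done.value
    return None  # history exhausted; the orchestration is still waiting


def stable():
    # Same sequence of actions on every run, so replay always matches.
    greeting = yield "call:hello"
    shipped = yield "call:ship"
    return f"{greeting}, {shipped}"


clock = {"hour": 9}  # stands in for the wall clock (hypothetical)


def time_dependent():
    # Branches on state outside the history, so the action can change on replay.
    if clock["hour"] < 12:
        result = yield "call:morning"
    else:
        result = yield "call:afternoon"
    return result
```

Replaying `stable` against its own history returns the same result every time. Replaying `time_dependent` against a history recorded while `clock["hour"]` was 9 raises `RuntimeError` once the hour is changed to 15, which is the kind of mismatch the rules below are meant to prevent.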

NEVER do inside orchestrations:

  • datetime.now(), datetime.utcnow() → Use ctx.current_utc_datetime
  • uuid.uuid4() → Use ctx.new_uuid()
  • random.random() → Pass random values from activities
  • Direct I/O, HTTP calls, database access → Move to activities
  • time.sleep(), asyncio.sleep() → Use ctx.create_timer()
  • Environment variables that may change → Pass as input or use activities
  • Global mutable state → Pass state through activity results

ALWAYS use:

  • yield ctx.call_activity() - Call activities
  • yield ctx.call_sub_orchestrator() - Call sub-orchestrations
  • yield ctx.create_timer() - Durable delays
  • yield ctx.wait_for_external_event() - Wait for events
  • ctx.current_utc_datetime - Current time
  • ctx.new_uuid() - Generate GUIDs
  • ctx.set_custom_status() - Set status

Non-Determinism Patterns (WRONG vs CORRECT)

Getting Current Time

from datetime import datetime

# WRONG - datetime.now() returns a different value on each replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
    current_time = datetime.now()  # Non-deterministic!
    if current_time.hour < 12:
        yield ctx.call_activity(morning_activity)

# CORRECT - ctx.current_utc_datetime is replayed consistently
def good_orchestration(ctx: task.OrchestrationContext, _):
    current_time = ctx.current_utc_datetime  # Deterministic
    if current_time.hour < 12:
        yield ctx.call_activity(morning_activity)

Generating UUIDs/Random Values

import uuid

# WRONG - uuid4() generates a different value on each replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
    order_id = str(uuid.uuid4())  # Non-deterministic!
    yield ctx.call_activity(create_order, input=order_id)

# CORRECT - ctx.new_uuid() replays the same value
def good_orchestration(ctx: task.OrchestrationContext, _):
    order_id = str(ctx.new_uuid())  # Deterministic
    yield ctx.call_activity(create_order, input=order_id)

Random Numbers

import random
from datetime import timedelta

# WRONG - random produces different values on replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
    delay = random.randint(1, 10)  # Non-deterministic!
    yield ctx.create_timer(timedelta(seconds=delay))

# CORRECT - generate random in activity, pass to orchestrator
def get_random_delay(ctx: task.ActivityContext, _) -> int:
    return random.randint(1, 10)  # OK in activity

def good_orchestration(ctx: task.OrchestrationContext, _):
    delay = yield ctx.call_activity(get_random_delay)  # Deterministic
    yield ctx.create_timer(timedelta(seconds=delay))

Sleeping/Delays

import time
from datetime import timedelta

# WRONG - time.sleep blocks the worker and is not persisted
def bad_orchestration(ctx: task.OrchestrationContext, _):
    yield ctx.call_activity(step1)
    time.sleep(60)  # Non-durable! Lost on restart
    yield ctx.call_activity(step2)

# CORRECT - ctx.create_timer is durable
def good_orchestration(ctx: task.OrchestrationContext, _):
    yield ctx.call_activity(step1)
    yield ctx.create_timer(timedelta(seconds=60))  # Durable timer
    yield ctx.call_activity(step2)

HTTP Calls and I/O

# WRONG - HTTP call in orchestrator is non-deterministic
def bad_orchestration(ctx: task.OrchestrationContext, url: str):
    import requests
    response = requests.get(url)  # Non-deterministic!
    return response.json()

# CORRECT - move I/O to activity
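def fetch_data(ctx: task.ActivityContext, url: str) -> dict:
    import requests
    response = requests.get(url, timeout=30)  # OK in activity; bound the wait
    response.raise_for_status()  # Surface HTTP errors as an activity failure
    return response.json()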

def good_orchestration(ctx: task.OrchestrationContext, url: str):
    data = yield ctx.call_activity(fetch_data, input=url)  # Deterministic
    return data

Database Access

# WRONG - database query in orchestrator
def bad_orchestration(ctx: task.OrchestrationContext, user_id: str):
    import sqlite3
    conn = sqlite3.connect('db.sqlite')  # Non-deterministic!
    cursor = conn.execute("SELECT * FROM users WHERE id=?", (user_id,))
    user = cursor.fetchone()
    # ...

# CORRECT - database access in activity
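def get_user(ctx: task.ActivityContext, user_id: str) -> dict:
    import sqlite3
    conn = sqlite3.connect('db.sqlite')  # OK in activity
    conn.row_factory = sqlite3.Row  # Rows become mapping-like, so dict(row) works
    try:
        cursor = conn.execute("SELECT * FROM users WHERE id=?", (user_id,))
        row = cursor.fetchone()
        return dict(row) if row is not None else {}  # Empty dict if no match
    finally:
        conn.close()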

def good_orchestration(ctx: task.OrchestrationContext, user_id: str):
    user = yield ctx.call_activity(get_user, input=user_id)
    # ...

Environment Variables

# WRONG - env var might change between replays
def bad_orchestration(ctx: task.OrchestrationContext, _):
    api_endpoint = os.getenv("API_ENDPOINT")  # Could change!
    yield ctx.call_activity(call_api, input=api_endpoint)

# CORRECT - pass config as input or read in activity
def good_orchestration(ctx: task.OrchestrationContext, config: dict):
    api_endpoint = config["api_endpoint"]  # From input, deterministic
    yield ctx.call_activity(call_api, input=api_endpoint)

# ALSO CORRECT - read env var in activity
def call_api(ctx: task.ActivityContext, _) -> str:
    api_endpoint = os.getenv("API_ENDPOINT")  # OK in activity
    # make the call...

Conditional Logic Based on External State

# WRONG - file existence can change between replays
def bad_orchestration(ctx: task.OrchestrationContext, path: str):
    if os.path.exists(path):  # Non-deterministic!
        yield ctx.call_activity(process_file, input=path)

# CORRECT - check in activity
def check_file_exists(ctx: task.ActivityContext, path: str) -> bool:
    return os.path.exists(path)  # OK in activity

def good_orchestration(ctx: task.OrchestrationContext, path: str):
    exists = yield ctx.call_activity(check_file_exists, input=path)
    if exists:  # Deterministic - based on activity result
        yield ctx.call_activity(process_file, input=path)

Dictionary/Set Iteration Order

# POTENTIALLY WRONG - dict order is only guaranteed on Python 3.7+; set order is never guaranteed
def risky_orchestration(ctx: task.OrchestrationContext, items: dict):
    for key in items:  # Order might differ between replays
        yield ctx.call_activity(process, input=key)

# CORRECT - use sorted keys for deterministic order
def good_orchestration(ctx: task.OrchestrationContext, items: dict):
    for key in sorted(items.keys()):  # Guaranteed order
        yield ctx.call_activity(process, input=key)

Thread-Local or Global State

# WRONG - global state can change
counter = 0

def bad_orchestration(ctx: task.OrchestrationContext, _):
    global counter
    counter += 1  # Non-deterministic across replays!
    yield ctx.call_activity(process, input=counter)

# CORRECT - pass state through orchestration input/output
def good_orchestration(ctx: task.OrchestrationContext, counter: int):
    counter += 1  # Local variable, deterministic
    yield ctx.call_activity(process, input=counter)
    # If continuing, pass counter forward
    ctx.continue_as_new(counter)

Using yield

In Python, orchestrator functions use yield to await durable operations:

# CORRECT - use yield
result = yield ctx.call_activity(my_activity, input="data")

# WRONG - without yield, result is a pending task object, not the activity's output
result = ctx.call_activity(my_activity, input="data")  # Missing yield!
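The difference can be seen without the SDK by using stand-ins. The following is a minimal sketch: `FakeTask`, `FakeContext`, `shout`, and `drive` are hypothetical names invented for this illustration, and the driver loop is far simpler than a real worker.

```python
class FakeTask:
    """Stand-in for the pending-task object returned by ctx.call_activity()."""
    def __init__(self, activity, payload):
        self.activity = activity
        self.payload = payload


class FakeContext:
    """Hypothetical minimal context; not the SDK's OrchestrationContext."""
    def call_activity(self, activity, *, input=None):
        return FakeTask(activity, input)


def shout(text):
    return text.upper()


def with_yield(ctx):
    # yield hands the task to the driver and receives the result back.
    result = yield ctx.call_activity(shout, input="data")
    return result


def without_yield(ctx):
    # No yield: nothing runs the task, so result is just the task object.
    result = ctx.call_activity(shout, input="data")
    return result


def drive(generator):
    """Run each yielded task and send its result back into the generator."""
    try:
        pending = next(generator)
        while True:
            pending = generator.send(pending.activity(pending.payload))
    except StopIteration as done:
        return done.value


completed = drive(with_yield(FakeContext()))
never_ran = without_yield(FakeContext())
print(completed)                 # DATA
print(type(never_ran).__name__)  # FakeTask
```

With `yield`, the driver gets a chance to run the activity and pass its output back in. Without it, the function is not even a generator, and the caller receives the unexecuted task object.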

Error Handling

def orchestrator_with_error_handling(ctx: task.OrchestrationContext, input: str):
    try:
        result = yield ctx.call_activity(risky_activity, input=input)
        return result
    except task.TaskFailedError as e:
        # Activity failed - implement compensation
        ctx.set_custom_status({"error": str(e)})
        yield ctx.call_activity(compensation_activity, input=input)
        return "Compensated"

Retry Policies

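from datetime import timedelta
from durabletask.task import RetryPolicy

# Interval arguments are timedelta values, not bare numbers
retry_policy = RetryPolicy(
    first_retry_interval=timedelta(seconds=5),
    max_number_of_attempts=3,
    backoff_coefficient=2.0,
    max_retry_interval=timedelta(seconds=60),
    retry_timeout=timedelta(seconds=300)
)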

def orchestrator(ctx: task.OrchestrationContext, _):
    result = yield ctx.call_activity(
        unreliable_activity, 
        input="data",
        retry_policy=retry_policy
    )
    return result
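To see what a policy like this implies in practice, the waits between attempts can be worked out by hand. The sketch below assumes the usual capped exponential backoff formula (first interval multiplied by the coefficient raised to the retry index, limited by the maximum interval). `retry_delays` is a hypothetical helper, not an SDK function, and it does not model `retry_timeout`.

```python
from datetime import timedelta


def retry_delays(first_retry_interval, max_number_of_attempts,
                 backoff_coefficient=1.0, max_retry_interval=None):
    """Return the wait before each retry under capped exponential backoff."""
    delays = []
    # The first attempt is not a retry, so N attempts allow N - 1 retries.
    for retry in range(max_number_of_attempts - 1):
        delay = first_retry_interval * (backoff_coefficient ** retry)
        if max_retry_interval is not None:
            delay = min(delay, max_retry_interval)
        delays.append(delay)
    return delays


# Values from the policy above: 5 s first interval, 3 attempts, coefficient 2.0
schedule = retry_delays(
    timedelta(seconds=5), 3,
    backoff_coefficient=2.0,
    max_retry_interval=timedelta(seconds=60),
)
print([d.total_seconds() for d in schedule])  # [5.0, 10.0]
```

Under these assumptions, three attempts mean at most two retries, after roughly 5 and 10 seconds. Raising `max_number_of_attempts` to 6 would give waits of 5, 10, 20, 40, and 60 seconds, with the last one capped by `max_retry_interval`.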

Working with Custom Types

The SDK supports dataclasses, namedtuples, and custom classes:

from dataclasses import dataclass

@dataclass
class Order:
    product: str
    quantity: int
    cost: float

def process_order(ctx: task.ActivityContext, order: Order) -> str:
    return f"Processed {order.quantity}x {order.product}"

def order_workflow(ctx: task.OrchestrationContext, order: Order):
    result = yield ctx.call_activity(process_order, input=order)
    return result
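Inputs and outputs cross a serialization boundary between orchestrators and activities, so custom types work best when their fields are plain, JSON-friendly values. The sketch below assumes JSON encoding and shows a round trip for the `Order` dataclass. `to_payload` and `from_payload` are hypothetical helpers for illustration only; the SDK performs its own serialization, and the object it hands to the receiving function may not be the original class.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Order:
    product: str
    quantity: int
    cost: float


def to_payload(order: Order) -> str:
    """Encode the dataclass as a JSON string of its fields."""
    return json.dumps(asdict(order))


def from_payload(payload: str) -> Order:
    """Rebuild an Order from a JSON string produced by to_payload."""
    return Order(**json.loads(payload))


original = Order(product="widget", quantity=3, cost=9.99)
restored = from_payload(to_payload(original))
print(restored == original)  # True
```

Fields that are not JSON-friendly (open connections, file handles, callables) would not survive a round trip like this, which is a reason to keep workflow payloads to simple data.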

Connection & Authentication

Local Emulator (Default)

# No authentication required
taskhub = "default"
endpoint = "http://localhost:8080"
credential = None
secure_channel = False

Azure with DefaultAzureCredential

from azure.identity import DefaultAzureCredential

taskhub = "my-taskhub"
endpoint = "https://my-scheduler.region.durabletask.io"
credential = DefaultAzureCredential()
secure_channel = True

Authentication Helper

def get_connection_config():
    endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
    taskhub = os.getenv("TASKHUB", "default")
    
    is_local = endpoint == "http://localhost:8080"
    
    return {
        "host_address": endpoint,
        "taskhub": taskhub,
        "secure_channel": not is_local,
        "token_credential": None if is_local else DefaultAzureCredential()
    }

config = get_connection_config()
worker = DurableTaskSchedulerWorker(**config)
client = DurableTaskSchedulerClient(**config)
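The helper above decides "local" by exact string comparison, so an endpoint such as `http://localhost:8080/` or `http://127.0.0.1:8080` would be treated as remote and would trigger credential lookup. One possible refinement, sketched here with only the standard library, is to compare the parsed hostname instead. `is_local_endpoint` and `LOCAL_HOSTS` are hypothetical names, not part of the SDK.

```python
from urllib.parse import urlparse

# Hostnames treated as the local emulator (an assumption for this sketch)
LOCAL_HOSTS = {"localhost", "127.0.0.1", "::1"}


def is_local_endpoint(endpoint: str) -> bool:
    """Return True when the endpoint URL points at this machine."""
    host = urlparse(endpoint).hostname  # lowercased; None if no host is present
    return host in LOCAL_HOSTS


print(is_local_endpoint("http://localhost:8080/"))                      # True
print(is_local_endpoint("https://my-scheduler.region.durabletask.io"))  # False
```

The result could replace the `is_local` expression in `get_connection_config()`. Note that the endpoint needs a scheme (`http://` or `https://`) for the hostname to be parsed.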

Local Development with Emulator

# Pull and run the emulator
docker pull mcr.microsoft.com/dts/dts-emulator:latest
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest

# Dashboard available at http://localhost:8082

Client Operations

# Schedule new orchestration
instance_id = client.schedule_new_orchestration(my_orchestration, input="data")

# Schedule with custom instance ID
instance_id = client.schedule_new_orchestration(
    my_orchestration, 
    input="data",
    instance_id="my-custom-id"
)

# Wait for completion
state = client.wait_for_orchestration_completion(instance_id, timeout=60)

# Get current status
state = client.get_orchestration_state(instance_id)

# Raise external event
client.raise_orchestration_event(instance_id, "approval_received", data=approval_data)

# Terminate orchestration
client.terminate_orchestration(instance_id, output="User cancelled")

# Suspend/Resume
client.suspend_orchestration(instance_id)
client.resume_orchestration(instance_id)

References

  • patterns.md - Detailed pattern implementations (Fan-Out/Fan-In, Human Interaction, Entities, Sub-Orchestrations)
  • setup.md - Azure Durable Task Scheduler provisioning and deployment