airflow-hitl

Originally from astronomer/agents

Airflow Human-in-the-Loop Operators

Implement human approval gates, form inputs, and human-driven branching in Airflow DAGs using the HITL operators. These deferrable operators pause workflow execution until a human responds via the Airflow UI or REST API.

Implementation Checklist

Execute steps in order. Prefer deferrable HITL operators over custom sensors or polling loops.

CRITICAL: Requires Airflow 3.1+. Not available in Airflow 2.x.

All HITL operators are deferrable and release their worker slot while waiting for input.

UI Location: Browse -> Required Actions in the Airflow UI. Respond via the Required Actions tab of the task instance or via the REST API.
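When responding programmatically instead of through the UI, the request body can be assembled along these lines. This is a minimal sketch, not the documented API: `build_hitl_response` is a hypothetical helper, and the field names `chosen_options` and `params_input` are assumed to match the keys that the operator results in this document expose. The endpoint path is deliberately left out; check the REST API reference for your Airflow version.

```python
import json


def build_hitl_response(chosen_options, params_input=None):
    """Serialize a HITL response body (hypothetical helper, assumed field names)."""
    if not chosen_options:
        raise ValueError("at least one option must be chosen")
    return json.dumps(
        {
            # Assumed key: the option labels the human selected.
            "chosen_options": list(chosen_options),
            # Assumed key: form values keyed by param name.
            "params_input": dict(params_input or {}),
        }
    )


body = build_hitl_response(["Approve"], {"comments": "Looks good"})
```

Keeping the payload construction in one function makes it easy to adjust if the field names differ in your Airflow version.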

Step 1: Choose operator

| Operator | Human action | Outcome |
| --- | --- | --- |
| ApprovalOperator | Approve or reject | Reject causes downstream tasks to be skipped (approval task itself succeeds) |
| HITLOperator | Select option(s) plus form | Returns selections |
| HITLBranchOperator | Select downstream task(s) | Runs selected, skips others |
| HITLEntryOperator | Submit form | Returns form data |

Step 2: Implement operator

ApprovalOperator

from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)

approval_example()
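The downstream task in the example above receives a dictionary. A small helper can turn it into a readable summary. This is a sketch under assumptions: it presumes the result has the `chosen_options` key used above and a `params_input` key holding the `comments` form field; `summarize_decision` is a hypothetical name.

```python
def summarize_decision(result: dict) -> str:
    """Format an approval result as 'Decision' or 'Decision: comments'."""
    decision = ", ".join(result["chosen_options"])
    # The comments field is optional, so fall back to an empty string.
    comments = result.get("params_input", {}).get("comments", "").strip()
    return f"{decision}: {comments}" if comments else decision


summary = summarize_decision(
    {"chosen_options": ["Approve"], "params_input": {"comments": "Looks good"}}
)
```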

HITLOperator

Required parameters: subject and options.

from airflow.providers.standard.operators.hitl import HITLOperator
from airflow.sdk import dag, task, Param
from datetime import timedelta
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def hitl_example():
    hitl = HITLOperator(
        task_id="select_option",
        subject="Select Payment Method",
        body="Choose how to process payment",
        options=["ACH", "Wire", "Check"],
        defaults=["ACH"],
        multiple=False,
        execution_timeout=timedelta(hours=4),
        params={"amount": Param(1000, type="number")},
    )

    @task
    def process(result):
        print(f"Selected: {result['chosen_options']}")
        print(f"Amount: {result['params_input']['amount']}")

    process(hitl.output)

hitl_example()
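With multiple=False the downstream task usually wants one value rather than a list. The sketch below assumes `chosen_options` is still delivered as a list containing a single entry, as the print statement above suggests; `single_choice` is a hypothetical helper.

```python
def single_choice(result: dict) -> str:
    """Return the only selected option, failing loudly on anything else."""
    chosen = result["chosen_options"]
    if len(chosen) != 1:
        raise ValueError(f"expected exactly one option, got {chosen!r}")
    return chosen[0]


method = single_choice({"chosen_options": ["Wire"], "params_input": {"amount": 2500}})
```

Raising on an unexpected count surfaces a misconfigured `multiple` setting early instead of silently processing the wrong payment method.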

HITLBranchOperator

Options can either match downstream task IDs directly or use options_mapping.

from airflow.providers.standard.operators.hitl import HITLBranchOperator
from airflow.sdk import dag, task, chain
from pendulum import datetime

DEPTS = ["marketing", "engineering", "sales"]

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def branch_example():
    branch = HITLBranchOperator(
        task_id="select_dept",
        subject="Select Departments",
        options=[f"Fund {d}" for d in DEPTS],
        options_mapping={f"Fund {d}": d for d in DEPTS},
        multiple=True,
    )

    for dept in DEPTS:
        @task(task_id=dept)
        def handle(dept_name: str = dept):
            print(f"Processing {dept_name}")
        chain(branch, handle())

branch_example()
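To make the role of options_mapping concrete, the following sketch shows how display labels could be translated into downstream task IDs. It illustrates the idea only and is not the operator's actual implementation; `resolve_branches` is a hypothetical function, and the fallback to the label itself is an assumption that mirrors the case where options match task IDs directly.

```python
DEPTS = ["marketing", "engineering", "sales"]
OPTIONS_MAPPING = {f"Fund {d}": d for d in DEPTS}


def resolve_branches(chosen_options, options_mapping):
    """Map each chosen label to a task ID, keeping unmapped labels as-is."""
    return [options_mapping.get(option, option) for option in chosen_options]


to_run = resolve_branches(["Fund sales", "Fund marketing"], OPTIONS_MAPPING)
```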

HITLEntryOperator

from airflow.providers.standard.operators.hitl import HITLEntryOperator
from airflow.sdk import dag, task, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def entry_example():
    entry = HITLEntryOperator(
        task_id="get_input",
        subject="Enter Details",
        body="Provide response",
        params={
            "response": Param("", type="string"),
            "priority": Param("p3", type="string"),
        },
    )

    @task
    def process(result):
        print(f"Response: {result['params_input']['response']}")

    process(entry.output)

entry_example()

Step 3: Optional features

Notifiers

from airflow.sdk import BaseNotifier, Context
from airflow.providers.standard.operators.hitl import HITLOperator

class MyNotifier(BaseNotifier):
    template_fields = ("message",)

    def __init__(self, message=""):
        super().__init__()
        self.message = message

    def notify(self, context: Context):
        # Notify only while the task instance is running.
        if context["ti"].state == "running":
            url = HITLOperator.generate_link_to_ui_from_context(context, base_url="https://airflow.example.com")
            self.log.info(f"Action needed: {url}")

hitl = HITLOperator(..., notifiers=[MyNotifier("{{ task.subject }}")])

Restrict respondents

Format depends on your auth manager:

| Auth Manager | Format | Example |
| --- | --- | --- |
| SimpleAuthManager | Username | ["admin", "manager"] |
| FabAuthManager | Email | ["manager@example.com"] |
| Astro | Astro ID | ["cl1a2b3cd456789ef1gh2ijkl3"] |

hitl = HITLOperator(..., respondents=["manager@example.com"])
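If the same DAG code has to run under different auth managers, the identifier format can be selected in one place. This is a sketch only: `respondents_for`, the `RESPONDENT_FIELD` table, and the user record keys (`username`, `email`, `astro_id`) are hypothetical names chosen for illustration.

```python
# Which field of a user record each auth manager expects (per the table above).
RESPONDENT_FIELD = {
    "SimpleAuthManager": "username",
    "FabAuthManager": "email",
    "Astro": "astro_id",
}


def respondents_for(auth_manager: str, users: list[dict]) -> list[str]:
    """Return respondent identifiers in the format the auth manager expects."""
    try:
        field = RESPONDENT_FIELD[auth_manager]
    except KeyError:
        raise ValueError(f"unknown auth manager: {auth_manager}") from None
    return [user[field] for user in users]


users = [{"username": "manager", "email": "manager@example.com", "astro_id": "cl1a2b3cd456789ef1gh2ijkl3"}]
respondents = respondents_for("FabAuthManager", users)
```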

Timeout behavior

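Set execution_timeout to limit how long the task waits for a response. If defaults is set, the task resolves with those option(s) when the timeout elapses; without defaults, the task fails on timeout.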
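The interaction between the timeout and defaults can be pictured with a few lines of plain Python. This is a sketch of the assumed behavior, not Airflow code: it assumes the task falls back to defaults when the timeout elapses and fails when none are configured. `HITLTimeout` and `resolve_on_timeout` are hypothetical stand-ins.

```python
class HITLTimeout(Exception):
    """Hypothetical stand-in for the failure raised when no response arrives."""


def resolve_on_timeout(defaults):
    """Return the options to use once the timeout has elapsed."""
    if isinstance(defaults, str):
        # A single default such as "Approve" is treated as a one-item list.
        defaults = [defaults]
    if defaults:
        return list(defaults)
    raise HITLTimeout("no response received and no defaults configured")


fallback = resolve_on_timeout(["ACH"])
```

In practice this means an approval gate with defaults="Approve" auto-approves after the timeout, so omit defaults when silence should not count as consent.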
