annotating-task-lineage

Originally from astronomer/agents

Annotating Task Lineage with Inlets and Outlets

This skill guides you through adding manual lineage annotations to Airflow tasks using inlets and outlets.

When to Use This Approach

| Scenario | Use Inlets/Outlets? |
|---|---|
| Operator has OpenLineage methods | No, modify the OpenLineage method directly |
| Operator has no built-in OpenLineage extractor | Yes |
| Simple table-level lineage is sufficient | Yes |
| Quick lineage setup without custom code | Yes |
| Need column-level lineage | No, use OpenLineage methods or a custom extractor |
| Complex extraction logic needed | No, use OpenLineage methods or a custom extractor |

Supported Types for Inlets/Outlets

OpenLineage Datasets (recommended)

from openlineage.client.event_v2 import Dataset

source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)
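
An OpenLineage dataset is identified by its namespace plus its name, so a single URI has to be split into those two parts. The sketch below shows one way to do that with the standard library. `split_dataset_uri` is a hypothetical helper, not part of OpenLineage or Airflow, and the `scheme://host/path` layout is an assumption made for illustration.

```python
from urllib.parse import urlsplit


def split_dataset_uri(uri: str) -> tuple[str, str]:
    """Split 'scheme://authority/path' into (namespace, name).

    Hypothetical helper: assumes the namespace is 'scheme://authority'
    and the name is the remaining path without its leading slash.
    """
    parts = urlsplit(uri)
    if not parts.scheme or not parts.netloc:
        raise ValueError(f"expected scheme://authority/path, got {uri!r}")
    namespace = f"{parts.scheme}://{parts.netloc}"
    name = parts.path.lstrip("/")
    return namespace, name


namespace, name = split_dataset_uri("s3://my-bucket/exports/orders.parquet")
# The pair can then be passed on, e.g. Dataset(namespace=namespace, name=name)
```

Here `namespace` is `s3://my-bucket` and `name` is `exports/orders.parquet`, the same split used for the S3 dataset in Basic Usage.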

Airflow Assets (Airflow 3+)

from airflow.sdk import Asset

orders_asset = Asset(uri="s3://my-bucket/data/orders")

Airflow Datasets (Airflow 2.4+)

from airflow.datasets import Dataset

orders_dataset = Dataset(uri="s3://my-bucket/data/orders")
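
Because the import path differs between Airflow 2.4+ and Airflow 3+, DAG code that must run on both can pick whichever class is available. This is a minimal sketch of that pattern: `AssetLike` and `make_asset` are hypothetical names, and the `None` fallback is there only so the snippet also runs where Airflow is not installed.

```python
try:
    # Airflow 3+: assets live in the task SDK.
    from airflow.sdk import Asset as AssetLike
except ImportError:
    try:
        # Airflow 2.4+: the same concept is called a Dataset.
        from airflow.datasets import Dataset as AssetLike
    except ImportError:
        # Assumption for this sketch: no Airflow available at all.
        AssetLike = None


def make_asset(uri: str):
    """Build an Asset (Airflow 3+) or Dataset (Airflow 2.4+) from a URI."""
    if AssetLike is None:
        raise RuntimeError("Airflow is not installed")
    return AssetLike(uri=uri)
```

With Airflow installed, `make_asset("s3://my-bucket/data/orders")` returns an object that can go straight into `inlets` or `outlets`.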

Basic Usage

Setting Inlets and Outlets on Operators

from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum

source_table = Dataset(namespace="snowflake://account", name="raw.orders")
target_table = Dataset(namespace="snowflake://account", name="staging.orders_clean")
output_file = Dataset(namespace="s3://my-bucket", name="exports/orders.parquet")

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:

    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],
        outlets=[target_table],
    )

    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],
        outlets=[output_file],
    )

    transform >> export
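
The two tasks above are linked in the lineage graph because `export_to_s3` reads the same dataset that `transform_orders` writes. The sketch below models that matching with the standard library only, assuming datasets are compared by their exact namespace and name. `DatasetKey` and `find_dataset_links` are hypothetical names, not Airflow or OpenLineage API.

```python
from typing import NamedTuple


class DatasetKey(NamedTuple):
    """Stand-in for a dataset: identity is the (namespace, name) pair."""
    namespace: str
    name: str


def find_dataset_links(tasks):
    """Return (producer, dataset, consumer) for every outlet reused as an inlet.

    tasks maps task_id -> (inlets, outlets).
    """
    links = []
    for producer, (_, outlets) in tasks.items():
        for consumer, (inlets, _) in tasks.items():
            if producer == consumer:
                continue
            for dataset in outlets:
                if dataset in inlets:
                    links.append((producer, dataset, consumer))
    return links


source = DatasetKey("snowflake://account", "raw.orders")
target = DatasetKey("snowflake://account", "staging.orders_clean")
output = DatasetKey("s3://my-bucket", "exports/orders.parquet")

tasks = {
    "transform_orders": ([source], [target]),
    "export_to_s3": ([target], [output]),
}
links = find_dataset_links(tasks)
```

`links` holds a single entry connecting `transform_orders` to `export_to_s3` through `staging.orders_clean`. A typo in either the namespace or the name on one side would leave the list empty, which is why reusing the same dataset object for the outlet and the downstream inlet is the safer habit.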

Multiple Inputs and Outputs

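from airflow.operators.python import PythonOperator
from openlineage.client.event_v2 import Dataset


def build_aggregates():
    # Placeholder callable for this example; the real aggregation logic goes here.
    ...
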

customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")

daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")

aggregate_task = PythonOperator(
    task_id="build_daily_aggregates",
    python_callable=build_aggregates,
    inlets=[customers, orders, products],
    outlets=[daily_summary, customer_metrics],
)
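
Inlets and outlets describe the task as a whole, so the resulting lineage can only say that each outlet may depend on each inlet. The sketch below illustrates that dataset-level view for the task above using the standard library. `table_level_edges` is a hypothetical helper and plain strings stand in for the dataset objects.

```python
from itertools import product


def table_level_edges(inlets, outlets):
    """Pair every inlet with every outlet: the dataset-level view of one task."""
    return list(product(inlets, outlets))


inlets = ["public.customers", "public.orders", "public.products"]
outlets = ["analytics.daily_summary", "analytics.customer_metrics"]
edges = table_level_edges(inlets, outlets)
```

Three inlets and two outlets give six possible dependencies, with nothing to tell which columns or which inputs actually feed a given output. When that distinction matters, the table in "When to Use This Approach" points to OpenLineage methods or a custom extractor instead.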

Custom Operators

Option 1: Implement OpenLineage Methods (recommended)

from airflow.models import BaseOperator

class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

    def get_openlineage_facets_on_complete(self, task_instance):
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
            outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
        )

Option 2: Set Inlets/Outlets Dynamically

from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset

class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        self.inlets = [Dataset(namespace="warehouse://db", name=self.source_table)]
        self.outlets = [Dataset(namespace="warehouse://db", name=self.target_table)]