creating-openlineage-extractors
Creating OpenLineage Extractors
This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that do not have built-in support.
When to Use Each Approach
| Scenario | Approach |
|---|---|
| Operator you own or maintain | OpenLineage Methods (recommended) |
| Third-party operator you cannot modify | Custom Extractor |
| Need column-level lineage | OpenLineage Methods or Custom Extractor |
| Complex extraction logic | OpenLineage Methods or Custom Extractor |
| Simple table-level lineage | Inlets/Outlets (simplest, lowest priority) |
Approach 1: OpenLineage Methods (recommended)
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
def __init__(self, source_table: str, target_table: str, **kwargs):
super().__init__(**kwargs)
self.source_table = source_table
self.target_table = target_table
self._rows_processed = 0
def execute(self, context):
self._rows_processed = self._process_data()
return self._rows_processed
def get_openlineage_facets_on_start(self):
from openlineage.client.event_v2 import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
)
def get_openlineage_facets_on_complete(self, task_instance):
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import output_statistics_output_dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
outputs=[
Dataset(
namespace="postgres://db",
name=self.target_table,
facets={
"outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
rowCount=self._rows_processed
)
},
)
],
)
Approach 2: Custom Extractors
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
class MyOperatorExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["MyCustomOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
source_table = self.operator.source_table
target_table = self.operator.target_table
return OperatorLineage(
inputs=[Dataset(namespace="postgres://mydb:5432", name=f"public.{source_table}")],
outputs=[Dataset(namespace="postgres://mydb:5432", name=f"public.{target_table}")],
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
return None
Registering Extractors
Configuration file:
[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor
Environment variable:
AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'
More from necatiarslan/airflow-vscode-extension
migrating-airflow-2-to-3
Guide for migrating Apache Airflow 2.x projects to Airflow 3.x. Use when the user mentions Airflow 3 migration, upgrade, compatibility issues, breaking changes, or wants to modernize their Airflow codebase.
29airflow-hitl
Use when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator. Requires Airflow 3.1+.
28annotating-task-lineage
Annotate Airflow tasks with data lineage using inlets and outlets. Use when the user wants to add lineage metadata to tasks, specify input/output datasets, or enable lineage tracking for operators without built-in OpenLineage extraction.
28airflow
Manages Apache Airflow operations including listing, running, and debugging DAGs, viewing logs, and checking server status using the VS Code extension tools.
27testing-dags
Complex DAG testing workflows with debugging and fixing cycles. Use for multi-step testing requests like "test this dag and fix it if it fails", "test and debug", "run the pipeline and troubleshoot issues".
27authoring-dags
Workflow and best practices for writing Apache Airflow DAGs. Use when the user wants to create a new DAG, write pipeline code, or asks about DAG patterns and conventions. For testing and debugging DAGs, see the testing-dags skill.
27