airflow-dag

Apache Airflow DAGs for Construction Pipelines

Overview

Apache Airflow orchestrates complex data pipelines. This skill builds Airflow DAGs for construction ETL processes, from BIM extraction to cost reporting.

Python Implementation

from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from enum import Enum


class TaskStatus(Enum):
    """Task execution status."""
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class DAGTask:
    """Single task in DAG."""
    task_id: str
    operator: str
    params: Dict[str, Any]
    upstream: List[str]
    downstream: List[str]


@dataclass
class DAGConfig:
    """DAG configuration."""
    dag_id: str
    schedule: str
    start_date: datetime
    catchup: bool
    default_args: Dict[str, Any]
    tags: List[str]


class ConstructionDAGBuilder:
    """Build Airflow DAGs for construction pipelines."""

    # Default DAG arguments (the generated template below hardcodes a matching subset)
    DEFAULT_ARGS = {
        'owner': 'ddc',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'execution_timeout': timedelta(hours=2)
    }

    def __init__(self, dag_id: str,
                 schedule: str = '@daily',
                 tags: Optional[List[str]] = None):
        self.dag_id = dag_id
        self.schedule = schedule
        self.tags = tags or ['construction', 'ddc']
        self.tasks: Dict[str, DAGTask] = {}

    def add_bash_task(self, task_id: str,
                      command: str,
                      upstream: Optional[List[str]] = None) -> str:
        """Add bash command task."""
        self.tasks[task_id] = DAGTask(
            task_id=task_id,
            operator='BashOperator',
            params={'bash_command': command},
            upstream=upstream or [],
            downstream=[]
        )
        self._update_downstream(task_id, upstream)
        return task_id

    def add_python_task(self, task_id: str,
                        python_callable: str,
                        op_kwargs: Optional[Dict] = None,
                        upstream: Optional[List[str]] = None) -> str:
        """Add Python callable task."""
        self.tasks[task_id] = DAGTask(
            task_id=task_id,
            operator='PythonOperator',
            params={
                'python_callable': python_callable,
                'op_kwargs': op_kwargs or {}
            },
            upstream=upstream or [],
            downstream=[]
        )
        self._update_downstream(task_id, upstream)
        return task_id

    def add_sensor_task(self, task_id: str,
                        filepath: str,
                        upstream: Optional[List[str]] = None) -> str:
        """Add file sensor task."""
        self.tasks[task_id] = DAGTask(
            task_id=task_id,
            operator='FileSensor',
            params={
                'filepath': filepath,
                'poke_interval': 300,
                'timeout': 3600
            },
            upstream=upstream or [],
            downstream=[]
        )
        self._update_downstream(task_id, upstream)
        return task_id

    def add_branch_task(self, task_id: str,
                        python_callable: str,
                        upstream: Optional[List[str]] = None) -> str:
        """Add branching task."""
        self.tasks[task_id] = DAGTask(
            task_id=task_id,
            operator='BranchPythonOperator',
            params={'python_callable': python_callable},
            upstream=upstream or [],
            downstream=[]
        )
        self._update_downstream(task_id, upstream)
        return task_id

    def _update_downstream(self, task_id: str, upstream: Optional[List[str]]):
        """Update downstream references."""
        if upstream:
            for up_task in upstream:
                if up_task in self.tasks:
                    self.tasks[up_task].downstream.append(task_id)

    def generate_dag_code(self) -> str:
        """Generate Airflow DAG Python code."""

        code = '''
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'ddc',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

'''
        code += f'''
with DAG(
    dag_id='{self.dag_id}',
    default_args=default_args,
    schedule_interval='{self.schedule}',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags={self.tags}
) as dag:

'''
        # Generate task definitions (blank line between tasks for readability)
        for task in self.tasks.values():
            code += self._generate_task_code(task)
            code += '\n\n'

        # Generate dependencies
        code += '\n    # Task dependencies\n'
        for task_id, task in self.tasks.items():
            if task.upstream:
                for upstream in task.upstream:
                    code += f"    {upstream} >> {task_id}\n"

        return code

    def _generate_task_code(self, task: DAGTask) -> str:
        """Generate code for single task."""

        if task.operator == 'BashOperator':
            # repr() quotes and escapes the command, so embedded quotes
            # cannot break the generated Python source
            return f'''    {task.task_id} = BashOperator(
        task_id='{task.task_id}',
        bash_command={task.params['bash_command']!r}
    )'''

        elif task.operator == 'PythonOperator':
            # repr() keeps Python literals (True/None) valid in generated
            # code, unlike json.dumps(), which would emit true/null
            kwargs = repr(task.params.get('op_kwargs', {}))
            return f'''    {task.task_id} = PythonOperator(
        task_id='{task.task_id}',
        python_callable={task.params['python_callable']},
        op_kwargs={kwargs}
    )'''

        elif task.operator == 'FileSensor':
            return f'''    {task.task_id} = FileSensor(
        task_id='{task.task_id}',
        filepath='{task.params["filepath"]}',
        poke_interval={task.params['poke_interval']},
        timeout={task.params['timeout']}
    )'''

        elif task.operator == 'BranchPythonOperator':
            return f'''    {task.task_id} = BranchPythonOperator(
        task_id='{task.task_id}',
        python_callable={task.params['python_callable']}
    )'''

        return ''

    def save_dag(self, output_path: str):
        """Save DAG to file."""
        code = self.generate_dag_code()
        with open(output_path, 'w') as f:
            f.write(code)
        return output_path


class ConstructionPipelineTemplates:
    """Pre-built construction pipeline templates."""

    @staticmethod
    def bim_validation_pipeline(dag_id: str = 'bim_validation') -> ConstructionDAGBuilder:
        """Create BIM validation pipeline."""
        builder = ConstructionDAGBuilder(dag_id, schedule='@daily',
                                         tags=['bim', 'validation'])

        # Wait for file
        builder.add_sensor_task('wait_for_model', '/data/input/*.ifc')

        # Convert to Excel
        builder.add_bash_task(
            'convert_ifc',
            'IfcExporter.exe /data/input/*.ifc bbox',
            upstream=['wait_for_model']
        )

        # Validate data
        builder.add_python_task(
            'validate_data',
            'validate_bim_data',
            {'rules_file': '/config/validation_rules.xlsx'},
            upstream=['convert_ifc']
        )

        # Branch based on validation
        builder.add_branch_task(
            'check_validation',
            'check_validation_result',
            upstream=['validate_data']
        )

        # Success path
        builder.add_python_task(
            'generate_report',
            'generate_validation_report',
            upstream=['check_validation']
        )

        # Failure path
        builder.add_python_task(
            'send_alert',
            'send_validation_alert',
            upstream=['check_validation']
        )

        return builder

    @staticmethod
    def cost_estimation_pipeline(dag_id: str = 'cost_estimation') -> ConstructionDAGBuilder:
        """Create cost estimation pipeline."""
        builder = ConstructionDAGBuilder(dag_id, schedule='@weekly',
                                         tags=['cost', 'estimation'])

        # Extract BIM data
        builder.add_bash_task('extract_bim', 'RvtExporter.exe /data/model.rvt complete bbox')

        # Generate QTO
        builder.add_python_task(
            'generate_qto',
            'generate_quantity_takeoff',
            upstream=['extract_bim']
        )

        # Match with cost database
        builder.add_python_task(
            'match_costs',
            'match_cwicr_costs',
            upstream=['generate_qto']
        )

        # Calculate estimate
        builder.add_python_task(
            'calculate_estimate',
            'calculate_project_estimate',
            upstream=['match_costs']
        )

        # Generate report
        builder.add_python_task(
            'create_report',
            'create_cost_report',
            upstream=['calculate_estimate']
        )

        return builder

    @staticmethod
    def batch_conversion_pipeline(dag_id: str = 'batch_convert') -> ConstructionDAGBuilder:
        """Create batch CAD conversion pipeline."""
        builder = ConstructionDAGBuilder(dag_id, schedule='0 2 * * *',  # 2 AM daily
                                         tags=['conversion', 'batch'])

        # Scan for new files
        builder.add_python_task('scan_files', 'scan_input_folder')

        # Convert Revit files
        builder.add_bash_task(
            'convert_rvt',
            'for f in /data/input/*.rvt; do RvtExporter.exe "$f" standard; done',
            upstream=['scan_files']
        )

        # Convert IFC files
        builder.add_bash_task(
            'convert_ifc',
            'for f in /data/input/*.ifc; do IfcExporter.exe "$f"; done',
            upstream=['scan_files']
        )

        # Convert DWG files
        builder.add_bash_task(
            'convert_dwg',
            'for f in /data/input/*.dwg; do DwgExporter.exe "$f"; done',
            upstream=['scan_files']
        )

        # Consolidate results
        builder.add_python_task(
            'consolidate',
            'consolidate_conversion_results',
            upstream=['convert_rvt', 'convert_ifc', 'convert_dwg']
        )

        # Archive input files
        builder.add_python_task(
            'archive',
            'archive_processed_files',
            upstream=['consolidate']
        )

        return builder

Quick Start

# Create custom pipeline
builder = ConstructionDAGBuilder('my_pipeline', schedule='@daily')

# Add tasks
builder.add_bash_task('convert', 'RvtExporter.exe model.rvt')
builder.add_python_task('analyze', 'analyze_data', upstream=['convert'])
builder.add_python_task('report', 'create_report', upstream=['analyze'])

# Generate DAG code
code = builder.generate_dag_code()
print(code)

# Save to file
builder.save_dag('/airflow/dags/my_pipeline.py')
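
Before deploying, the generated file can be import-checked with Airflow's DagBag. A minimal sketch, assuming Airflow 2.x is installed and /airflow/dags is the configured dags folder:

from airflow.models import DagBag

# Parse the dags folder and fail fast on import errors
dag_bag = DagBag(dag_folder='/airflow/dags', include_examples=False)
assert not dag_bag.import_errors, dag_bag.import_errors
assert 'my_pipeline' in dag_bag.dag_ids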

Pipeline Templates

1. BIM Validation

templates = ConstructionPipelineTemplates()
validation_dag = templates.bim_validation_pipeline()
validation_dag.save_dag('/airflow/dags/bim_validation.py')
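
The generated DAG references callables such as validate_bim_data and check_validation_result by name, so matching functions must be defined in (or imported into) the DAG file. A minimal sketch of the pair, assuming validate_data pushes a pass/fail flag via XCom (PythonOperator auto-pushes its return value); the function bodies here are hypothetical:

def validate_bim_data(rules_file, **_):
    # Hypothetical: apply the rules in rules_file and report pass/fail
    return True

def check_validation_result(ti=None, **_):
    # Pull the flag pushed by validate_data and pick a branch by task_id
    passed = ti.xcom_pull(task_ids='validate_data')
    return 'generate_report' if passed else 'send_alert'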

2. Cost Estimation

cost_dag = templates.cost_estimation_pipeline()
cost_dag.save_dag('/airflow/dags/cost_estimation.py')
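
This template likewise expects callables like match_cwicr_costs to be importable from the DAG file. A hedged sketch of one, assuming pandas is available; the file paths and the shared 'code' join column are hypothetical:

import pandas as pd

def match_cwicr_costs(**_):
    # Hypothetical paths and join key - adapt to the real QTO/cost schema
    qto = pd.read_csv('/data/output/qto.csv')
    costs = pd.read_csv('/data/reference/cwicr_costs.csv')
    matched = qto.merge(costs, on='code', how='left')
    matched['total_cost'] = matched['quantity'] * matched['unit_cost']
    matched.to_csv('/data/output/matched_costs.csv', index=False)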

3. Batch Conversion

batch_dag = templates.batch_conversion_pipeline()
batch_dag.save_dag('/airflow/dags/batch_convert.py')
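
For reference, the fan-out/fan-in wiring that generate_dag_code emits for this template (inside the with DAG(...) block) is:

    # Task dependencies
    scan_files >> convert_rvt
    scan_files >> convert_ifc
    scan_files >> convert_dwg
    convert_rvt >> consolidate
    convert_ifc >> consolidate
    convert_dwg >> consolidate
    consolidate >> archive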
