data-ingestion-pipeline
Data Ingestion Pipeline
Extract, validate, and load data from diverse sources into target systems.
Pipeline Architecture
Sources → Extract → Validate → Transform → Stage → Load → Verify
│ │ │ │ │ │ │
│ │ │ │ │ │ └─ Row counts match
│ │ │ │ │ └─ Write to target
│ │ │ │ └─ Staging table/file
│ │ │ └─ Normalize, enrich, deduplicate
│ │ └─ Schema validation, business rules
│ └─ Pull from source
└─ APIs, files, databases, streams
Source Extraction
File-Based Sources
from pathlib import Path
import json
import csv
import yaml
class FileExtractor:
    """Extract records from structured files (JSON, YAML, CSV).

    extract() normalizes every result to a list of dicts so downstream
    stages can treat all file sources uniformly.
    """

    @staticmethod
    def _read_csv(path: Path) -> list[dict]:
        # newline="" as required by the csv module docs; the context manager
        # closes the handle (the original lambda leaked an open file object).
        with path.open(newline="", encoding="utf-8") as fh:
            return list(csv.DictReader(fh))

    # Maps file suffix -> parser callable. Extend by adding entries.
    PARSERS = {
        ".json": lambda p: json.loads(p.read_text()),
        ".yaml": lambda p: yaml.safe_load(p.read_text()),
        ".yml": lambda p: yaml.safe_load(p.read_text()),
        ".csv": lambda p: FileExtractor._read_csv(p),
    }

    def extract(self, path: Path) -> list[dict]:
        """Parse *path* with the parser registered for its suffix.

        Returns:
            The parsed records as a list (a single top-level object is
            wrapped in a one-element list).

        Raises:
            ValueError: when the file suffix has no registered parser.
        """
        parser = self.PARSERS.get(path.suffix)
        if parser is None:
            raise ValueError(f"Unsupported format: {path.suffix}")
        data = parser(path)
        return data if isinstance(data, list) else [data]
API Extraction with Pagination
import httpx
async def extract_paginated(base_url: str, params: dict | None = None) -> list[dict]:
    """Fetch every page from a paginated JSON endpoint and concatenate records.

    Pages are requested with page/per_page query parameters until an empty
    page is returned.

    Args:
        base_url: Endpoint URL to page through.
        params: Extra query parameters merged into every request. Defaults
            to None (the original used a mutable ``{}`` default — a classic
            shared-state bug).

    Returns:
        All records from all pages, in request order.

    Raises:
        httpx.HTTPStatusError: on any non-2xx response.
    """
    base_params = dict(params or {})
    all_records: list[dict] = []
    page = 1
    async with httpx.AsyncClient() as client:
        while True:
            response = await client.get(
                base_url, params={**base_params, "page": page, "per_page": 100}
            )
            response.raise_for_status()
            data = response.json()
            # Endpoints differ: some wrap records in "items"/"results",
            # others return a bare JSON array (which has no .get()).
            if isinstance(data, list):
                records = data
            else:
                records = data.get("items", data.get("results", data))
            if not records:
                break
            all_records.extend(records)
            page += 1
    return all_records
Database Extraction
import asyncpg
async def extract_from_db(dsn: str, query: str, batch_size: int = 1000):
    """Stream query results as dicts without loading the full result set.

    asyncpg server-side cursors are only valid inside a transaction, so one
    is opened for the duration of the scan (the original omitted it and
    raises InterfaceError at runtime).

    Args:
        dsn: PostgreSQL connection string.
        query: SQL to execute.
        batch_size: Rows prefetched per network round trip.

    Yields:
        One dict per row.
    """
    conn = await asyncpg.connect(dsn)
    try:
        async with conn.transaction():
            # cursor(...) yields one Record per iteration; prefetch only
            # controls how many rows are buffered per round trip.
            async for record in conn.cursor(query, prefetch=batch_size):
                yield dict(record)
    finally:
        await conn.close()
Validation
Schema Validation
from dataclasses import dataclass
@dataclass
class ValidationResult:
    """Outcome of a validation pass over a batch of records."""

    # Records that satisfied every check.
    valid: list[dict]
    # (record, error_message) pairs for records that failed.
    invalid: list[tuple[dict, str]]


def validate_records(records: list[dict], schema: dict) -> ValidationResult:
    """Partition *records* into valid/invalid against *schema*.

    Schema shape: ``{"required": [name, ...],
    "fields": {name: {"type": cls, "max_length": int}}}``. Required fields
    must be present and non-None; per-field rules are only applied when the
    field is present with a non-None value.
    """
    outcome = ValidationResult(valid=[], invalid=[])
    required = schema.get("required", [])
    field_rules = schema.get("fields", {})
    for rec in records:
        # Required-field failures are collected first, matching rule order.
        problems = [
            f"Missing required field: {name}"
            for name in required
            if rec.get(name) is None
        ]
        for name, rules in field_rules.items():
            value = rec.get(name)
            if value is None:
                continue
            if "type" in rules and not isinstance(value, rules["type"]):
                problems.append(f"{name}: expected {rules['type'].__name__}")
            if "max_length" in rules and len(str(value)) > rules["max_length"]:
                problems.append(f"{name}: exceeds max length {rules['max_length']}")
        if problems:
            outcome.invalid.append((rec, "; ".join(problems)))
        else:
            outcome.valid.append(rec)
    return outcome
Business Rule Validation
def apply_business_rules(records: list[dict]) -> ValidationResult:
    """Check domain invariants (organ code, promotion status) per record.

    Records failing any rule land in ``invalid`` with a "; "-joined message;
    the rest land in ``valid``.
    """
    # Allowed value sets hoisted so they are built once, not per record.
    allowed_organs = {"I", "II", "III", "IV", "V", "VI", "VII", "META"}
    allowed_statuses = {"LOCAL", "CANDIDATE", "PUBLIC_PROCESS", "GRADUATED", "ARCHIVED"}
    outcome = ValidationResult(valid=[], invalid=[])
    for rec in records:
        issues = []
        if rec.get("organ") not in allowed_organs:
            issues.append(f"Invalid organ: {rec.get('organ')}")
        if rec.get("status") not in allowed_statuses:
            issues.append(f"Invalid status: {rec.get('status')}")
        if issues:
            outcome.invalid.append((rec, "; ".join(issues)))
        else:
            outcome.valid.append(rec)
    return outcome
Deduplication
def deduplicate(records: list[dict], key_fields: list[str]) -> list[dict]:
    """Drop records whose *key_fields* tuple was already seen (first wins).

    Order of surviving records matches their first appearance.
    """
    observed: set[tuple] = set()
    kept: list[dict] = []
    for rec in records:
        fingerprint = tuple(rec.get(f) for f in key_fields)
        if fingerprint in observed:
            continue
        observed.add(fingerprint)
        kept.append(rec)
    return kept
Merge Strategy
from enum import Enum
class MergeStrategy(str, Enum):
    """How to collapse a group of records sharing the same key."""

    KEEP_FIRST = "keep_first"      # first occurrence wins
    KEEP_LATEST = "keep_latest"    # last occurrence wins
    MERGE_FIELDS = "merge_fields"  # overlay non-None fields, later wins


def merge_duplicates(records: list[dict], key_fields: list[str], strategy: MergeStrategy) -> list[dict]:
    """Group *records* by *key_fields* and collapse each group per *strategy*.

    Args:
        records: Input records, order preserved per first key appearance.
        key_fields: Fields whose tuple of values identifies a duplicate group.
        strategy: One of MergeStrategy (str values also accepted, since the
            enum subclasses str).

    Returns:
        One record per distinct key.

    Raises:
        ValueError: on an unknown strategy. The original fell through all
        branches and silently returned no record for any group.
    """
    groups: dict[tuple, list[dict]] = {}
    for record in records:
        key = tuple(record.get(f) for f in key_fields)
        groups.setdefault(key, []).append(record)
    merged: list[dict] = []
    for group in groups.values():
        if strategy == MergeStrategy.KEEP_FIRST:
            merged.append(group[0])
        elif strategy == MergeStrategy.KEEP_LATEST:
            merged.append(group[-1])
        elif strategy == MergeStrategy.MERGE_FIELDS:
            combined: dict = {}
            for record in group:
                # Later records overwrite earlier ones, but a None value
                # never clobbers a real one.
                combined.update({k: v for k, v in record.items() if v is not None})
            merged.append(combined)
        else:
            # Fail loudly instead of silently dropping every group.
            raise ValueError(f"Unknown merge strategy: {strategy!r}")
    return merged
Staging Pattern
from pathlib import Path
from datetime import datetime
class StagingArea:
    """Filesystem staging area: each batch lands in ``<base>/<batch_id>/``.

    Staging between extract and load enables inspection and rollback before
    data reaches the target system.
    """

    def __init__(self, base_dir: str):
        # Root directory under which per-batch subdirectories are created.
        self.base = Path(base_dir)

    def stage(self, batch_id: str, records: list[dict]) -> Path:
        """Write *records* plus a metadata sidecar; return the batch directory."""
        stage_dir = self.base / batch_id
        stage_dir.mkdir(parents=True, exist_ok=True)
        data_path = stage_dir / "data.json"
        meta_path = stage_dir / "metadata.json"
        # default=str makes datetimes/UUIDs serializable; note it silently
        # stringifies anything unexpected — accepted trade-off for staging.
        data_path.write_text(json.dumps(records, indent=2, default=str))
        # indent=2 so the file format matches what promote() rewrites
        # (the original wrote it compact here and indented there).
        meta_path.write_text(json.dumps({
            "batch_id": batch_id,
            "record_count": len(records),
            "staged_at": datetime.now().isoformat(),
            "status": "staged",
        }, indent=2))
        return stage_dir

    def promote(self, batch_id: str) -> list[dict]:
        """Mark a staged batch as promoted and return its records.

        Raises FileNotFoundError if the batch was never staged.
        """
        stage_dir = self.base / batch_id
        data = json.loads((stage_dir / "data.json").read_text())
        meta = json.loads((stage_dir / "metadata.json").read_text())
        meta["status"] = "promoted"
        meta["promoted_at"] = datetime.now().isoformat()
        (stage_dir / "metadata.json").write_text(json.dumps(meta, indent=2))
        return data
Pipeline Orchestration
class IngestionPipeline:
    """Composable extract → validate → transform → dedupe → load pipeline.

    Each stage is injected, so stages can be tested and reused in isolation.
    """

    def __init__(self, extractor, validator, transformer, loader):
        self.extractor = extractor
        self.validator = validator
        self.transformer = transformer
        self.loader = loader

    async def run(self, source: str) -> dict:
        """Run the full pipeline against *source*; return per-stage counts."""
        raw_records = await self.extractor.extract(source)
        validation = self.validator.validate(raw_records)
        if validation.invalid:
            # Invalid rows are reported, never silently dropped.
            log.warning("validation_failures", count=len(validation.invalid))
        shaped = self.transformer.transform(validation.valid)
        deduped = deduplicate(shaped, key_fields=["id"])
        loaded_count = await self.loader.load(deduped)
        return {
            "extracted": len(raw_records),
            "valid": len(validation.valid),
            "invalid": len(validation.invalid),
            "loaded": loaded_count,
        }
Anti-Patterns
- No validation gate — Always validate before loading; corrupt data is worse than missing data
- Loading directly from source — Stage first; staging enables inspection and rollback
- No deduplication — Sources often contain duplicates; handle at ingestion
- Silent data loss — Log and report every skipped/invalid record
- Monolithic pipeline — Break into composable stages for testing and reuse
- No idempotency — Pipeline re-runs should produce the same result
More from 4444j99/a-i--skills
creative-writing-craft
Craft compelling fiction and creative nonfiction with attention to structure, voice, prose style, and revision. Supports short stories, novel chapters, essays, and hybrid forms. Triggers on creative writing, fiction writing, story craft, prose style, or literary technique requests.
183
skill-creator
Guide for creating effective skills. This skill should be used when users want to create a new skill (or update an existing skill) that extends Claude's capabilities with specialized knowledge, workflows, or tool integrations.
15
freelance-client-ops
Manage freelance and client work professionally—proposals, contracts, scope management, invoicing, and client communication. Covers the business side of creative work. Triggers on freelance, client work, proposals, contracts, pricing, or project scope requests.
14
generative-music-composer
Creates algorithmic music composition systems using procedural generation, Markov chains, L-systems, and neural approaches for ambient, adaptive, and experimental music.
12
generative-art-algorithms
Create algorithmic and generative art using mathematical patterns, noise functions, particle systems, and procedural generation. Covers flow fields, L-systems, fractals, and creative coding foundations. Triggers on generative art, algorithmic art, creative coding, procedural generation, or mathematical visualization requests.
10
interfaith-sacred-geometry
Generate sacred geometry patterns with interfaith symbolism for spiritual visualizations and art. Use when creating visual representations that honor multiple religious traditions, designing meditation aids, building soul journey visualizations, or producing art that bridges sacred traditions through geometric harmony. Triggers on sacred geometry requests, interfaith symbol design, spiritual visualization projects, or multi-tradition sacred art.
8