ingesting-data

SKILL.md

Data Ingestion Patterns

This skill provides patterns for getting data INTO systems from external sources.

When to Use This Skill

  • Importing CSV, JSON, Parquet, or Excel files
  • Loading data from S3, GCS, or Azure Blob storage
  • Consuming REST/GraphQL API feeds
  • Building ETL/ELT pipelines
  • Database migration and CDC (Change Data Capture)
  • Streaming data ingestion from Kafka/Kinesis

Ingestion Pattern Decision Tree

What is your data source?
├── Cloud Storage (S3, GCS, Azure) → See references/cloud-storage.md
├── Files (CSV, JSON, Parquet) → See references/file-formats.md
├── REST/GraphQL APIs → See references/api-feeds.md
├── Streaming (Kafka, Kinesis) → See references/streaming-sources.md
├── Legacy Database → See references/database-migration.md
└── Need full ETL framework → See references/etl-tools.md

Quick Start by Language

Python (Recommended for ETL)

dlt (data load tool) - Modern Python ETL:

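import dlt
import requests

# Define a source
@dlt.source
def github_source(repo: str):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        # Fetches only the first page; follow the Link header to paginate
        response = requests.get(f"https://api.github.com/repos/{repo}/issues", timeout=30)
        response.raise_for_status()  # surface HTTP errors instead of loading an error payload
        yield response.json()
    return issues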

# Load to destination
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="postgres",  # or duckdb, bigquery, snowflake
    dataset_name="github_data"
)

load_info = pipeline.run(github_source("owner/repo"))
print(load_info)

Polars for file processing (typically faster than pandas on large files):

import polars as pl

# Read CSV with schema inference
df = pl.read_csv("data.csv")

# Read Parquet (columnar, efficient)
df = pl.read_parquet("s3://bucket/data.parquet")

# Read JSON lines
df = pl.read_ndjson("events.jsonl")

# Write to database (the default engine needs SQLAlchemy installed)
df.write_database(
    table_name="events",
    connection="postgresql://user:pass@localhost/db",
    if_table_exists="append"
)

TypeScript/Node.js

S3 ingestion:

import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { parse } from "csv-parse/sync";

const s3 = new S3Client({ region: "us-east-1" });

// `db` and `eventsTable` are assumed to come from your ORM setup (e.g. Drizzle)
async function ingestFromS3(bucket: string, key: string) {
  const response = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
  const body = await response.Body?.transformToString();
  if (body === undefined) {
    throw new Error(`Empty response body for s3://${bucket}/${key}`);
  }

  // Parse CSV
  const records = parse(body, { columns: true, skip_empty_lines: true });

  // Insert to database
  await db.insert(eventsTable).values(records);
}

Webhook receiver (push-based alternative to API polling):

import { Hono } from "hono";

// Webhook receiver for real-time ingestion
const app = new Hono();

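app.post("/webhooks/stripe", async (c) => {
  // Read the raw body first: signatures are computed over the exact bytes received
  const rawBody = await c.req.text();

  // Validate webhook signature
  const signature = c.req.header("stripe-signature");
  // ... validation logic (respond with 400 if the signature is missing or invalid)

  const event = JSON.parse(rawBody);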

  // Ingest event
  await db.insert(stripeEventsTable).values({
    eventId: event.id,
    type: event.type,
    data: event.data,
    receivedAt: new Date()
  });

  return c.json({ received: true });
});

Rust

High-performance file ingestion:

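use std::io::Cursor;

use aws_sdk_s3::Client;
use polars::prelude::*;

// Boxed error type so `?` can propagate both AWS SDK and Polars errors
type BoxError = Box<dyn std::error::Error + Send + Sync>;

async fn ingest_parquet(client: &Client, bucket: &str, key: &str) -> Result<DataFrame, BoxError> {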
    // Download from S3
    let resp = client.get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;

    let bytes = resp.body.collect().await?.into_bytes();

    // Parse with Polars
    let df = ParquetReader::new(Cursor::new(bytes))
        .finish()?;

    Ok(df)
}

Go

CSV ingestion from S3:

package main

import (
    "context"
    "encoding/csv"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func ingestCSV(ctx context.Context, client *s3.Client, bucket, key string) error {
    resp, err := client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: &bucket,
        Key:    &key,
    })
    if err != nil {
        return err
    }
    defer resp.Body.Close()

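    // ReadAll buffers the whole file in memory; for large files, loop over reader.Read() instead
    reader := csv.NewReader(resp.Body)
    records, err := reader.ReadAll()
    if err != nil {
        return err
    }

    // Batch insert to database (batchInsert is assumed to be defined elsewhere)
    return batchInsert(ctx, records)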
}

Ingestion Patterns

1. Batch Ingestion (Files/Storage)

For periodic bulk loads:

Source → Extract → Transform → Load → Validate
  ↓         ↓          ↓         ↓        ↓
 S3      Download   Clean/Map  Insert   Count check

Key considerations:

  • Use chunked reading for large files (>100MB)
  • Implement idempotency with checksums
  • Track file processing state
  • Handle partial failures
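
The considerations above can be sketched with the standard library alone. This is a minimal illustration, not a reference implementation: the `processed_files` and `events` tables, the `id`/`payload` columns, and the function names are all assumptions made for the example, and SQLite stands in for whatever target database is in use.

```python
import csv
import hashlib
import sqlite3
from itertools import islice
from pathlib import Path
from typing import Iterator


def file_checksum(path: Path, block_size: int = 1 << 20) -> str:
    """Hash the file in fixed-size blocks so large files never sit fully in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for block in iter(lambda: fh.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def read_chunks(path: Path, chunk_rows: int = 10_000) -> Iterator[list[dict]]:
    """Yield the CSV as lists of at most chunk_rows row dicts."""
    with path.open(newline="") as fh:
        reader = csv.DictReader(fh)
        while chunk := list(islice(reader, chunk_rows)):
            yield chunk


def ingest_file(conn: sqlite3.Connection, path: Path, chunk_rows: int = 10_000) -> int:
    """Load the file once; return rows inserted (0 if this content was already loaded)."""
    # Hypothetical state and target tables, created here to keep the sketch self-contained
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_files (checksum TEXT PRIMARY KEY, name TEXT)"
    )
    conn.execute("CREATE TABLE IF NOT EXISTS events (id TEXT, payload TEXT)")

    checksum = file_checksum(path)
    already_loaded = conn.execute(
        "SELECT 1 FROM processed_files WHERE checksum = ?", (checksum,)
    ).fetchone()
    if already_loaded:
        return 0  # same bytes seen before: skipping keeps re-runs idempotent

    inserted = 0
    with conn:  # one transaction: a mid-file failure rolls back rows and state together
        for chunk in read_chunks(path, chunk_rows):
            conn.executemany(
                "INSERT INTO events (id, payload) VALUES (:id, :payload)", chunk
            )
            inserted += len(chunk)
        conn.execute(
            "INSERT INTO processed_files VALUES (?, ?)", (checksum, path.name)
        )
    return inserted
```

Recording the checksum in the same transaction as the rows is one way to handle partial failures: either the whole file is loaded and marked as processed, or nothing is.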

2. Streaming Ingestion (Real-time)

For continuous data flow:

Source → Buffer → Process → Load → Ack
  ↓        ↓         ↓        ↓      ↓
Kafka   In-memory  Transform  DB   Commit offset

Key considerations:

  • At-least-once vs exactly-once semantics
  • Backpressure handling
  • Dead letter queues for failures
  • Checkpoint management
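
A broker-free sketch of at-least-once processing may help make these points concrete. Everything here is illustrative: `Message`, `StreamIngester`, and the `load` callback are hypothetical names, and a real Kafka or Kinesis client would supply its own message type and offset-commit call.

```python
import json
from dataclasses import dataclass, field
from typing import Callable, Iterable


@dataclass
class Message:
    offset: int
    value: bytes


@dataclass
class StreamIngester:
    """Buffers messages, loads them in batches, and checkpoints only after a successful load."""

    load: Callable[[list[dict]], None]  # hypothetical sink, e.g. a bulk database insert
    batch_size: int = 100
    committed_offset: int = -1  # checkpoint: last offset known to be handled
    dead_letters: list[Message] = field(default_factory=list)
    _buffer: list[dict] = field(default_factory=list)
    _last_seen: int = -1

    def handle(self, msg: Message) -> None:
        try:
            self._buffer.append(json.loads(msg.value))
        except ValueError:
            # Malformed input goes to the dead letter list instead of blocking the stream
            self.dead_letters.append(msg)
        self._last_seen = msg.offset
        if len(self._buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self.load(self._buffer)  # raises on failure, so the commit below is skipped
            self._buffer = []
        self.committed_offset = self._last_seen  # ack only after the load succeeded

    def run(self, messages: Iterable[Message]) -> None:
        for msg in messages:
            self.handle(msg)
        self.flush()
```

Because the offset advances only after `load` returns, a crash between load and commit causes the batch to be redelivered on restart. That is at-least-once delivery, so the sink should tolerate duplicates (for example through an upsert on a unique key). The fixed `batch_size` bounds the in-memory buffer, which is a simple form of backpressure.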

3. API Polling (Feeds)

For external API data:

Schedule → Fetch → Dedupe → Load → Update cursor
   ↓         ↓        ↓       ↓         ↓
 Cron     API call  By ID   Insert   Last timestamp

Key considerations:

  • Rate limiting (honor HTTP 429 responses and Retry-After headers)
  • Incremental loading (cursors, timestamps)
  • API pagination handling
  • Retry of transient failures with exponential backoff
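
The polling loop above could look roughly like the following. It is a sketch under stated assumptions: `fetch_page` is a hypothetical callable wrapping the real HTTP request, the page shape (`items`, `next`, `id`, `updated_at`) is invented for the example, and `TransientError` stands in for whatever retryable error the HTTP client raises.

```python
import random
import time
from typing import Callable, Optional


class TransientError(Exception):
    """Stand-in for a retryable failure such as HTTP 429 or 503."""


def with_backoff(call: Callable[[], dict], max_attempts: int = 5,
                 base_delay: float = 0.5, sleep: Callable[[float], None] = time.sleep) -> dict:
    """Retry call() on TransientError, waiting longer after each failed attempt."""
    for attempt in range(max_attempts):
        try:
            return call()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # retries exhausted: let the caller decide what to do
            # Exponential backoff with full jitter: wait up to base_delay * 2^attempt seconds
            sleep(random.uniform(0, base_delay * 2 ** attempt))
    raise RuntimeError("max_attempts must be at least 1")


def poll_once(fetch_page: Callable[[str, Optional[str]], dict], cursor: str,
              seen_ids: set, sleep: Callable[[float], None] = time.sleep):
    """Fetch every page since cursor; return (new_items, new_cursor)."""
    new_items = []
    token = None  # pagination token; None requests the first page
    while True:
        page = with_backoff(lambda: fetch_page(cursor, token), sleep=sleep)
        for item in page["items"]:
            if item["id"] not in seen_ids:  # dedupe by ID across pages and runs
                seen_ids.add(item["id"])
                new_items.append(item)
        token = page.get("next")
        if token is None:
            break
    # Advance the cursor to the newest timestamp seen (ISO-8601 strings sort correctly)
    new_cursor = max([cursor] + [item["updated_at"] for item in new_items])
    return new_items, new_cursor
```

Persisting `new_cursor` only after the items are loaded keeps a failed run from skipping data on the next schedule tick.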

4. Change Data Capture (CDC)

For database replication:

Source DB → Capture changes → Transform → Target DB
    ↓             ↓               ↓            ↓
 Postgres    Debezium/WAL      Map schema   Insert/Update

Key considerations:

  • Initial snapshot + streaming changes
  • Schema evolution handling
  • Ordering guarantees
  • Conflict resolution
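
To illustrate the snapshot-plus-changes flow and the ordering concern, here is a small in-memory sketch. The `Change` and `Replica` types are hypothetical; the single-letter op codes are loosely modeled on Debezium's create/update/delete events, and a real pipeline would read them from the connector's output rather than construct them by hand.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class Change:
    lsn: int  # log position from the source; assumed to increase monotonically
    op: str  # "c" (create), "u" (update), or "d" (delete)
    key: int
    row: Optional[dict] = None  # full row image for "c"/"u", None for "d"


class Replica:
    """Target table seeded from a snapshot, then kept current by applying changes."""

    def __init__(self, snapshot: dict[int, dict], snapshot_lsn: int):
        self.rows = {key: dict(row) for key, row in snapshot.items()}
        self.applied_lsn = snapshot_lsn  # changes at or below this are already reflected

    def apply(self, change: Change) -> bool:
        """Apply one change; return False if it was stale or a replayed duplicate."""
        if change.lsn <= self.applied_lsn:
            return False
        if change.op in ("c", "u"):
            self.rows[change.key] = dict(change.row or {})  # upsert with the new row image
        elif change.op == "d":
            self.rows.pop(change.key, None)
        else:
            raise ValueError(f"unknown op: {change.op!r}")
        self.applied_lsn = change.lsn
        return True

    def apply_all(self, changes: Iterable[Change]) -> int:
        """Apply changes in log order; return how many took effect."""
        return sum(self.apply(c) for c in sorted(changes, key=lambda c: c.lsn))
```

Tracking `applied_lsn` makes replays harmless: changes that overlap the initial snapshot, or that are redelivered after a restart, are skipped rather than applied twice.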

Library Recommendations

| Use Case | Python | TypeScript | Rust | Go |
|---|---|---|---|---|
| ETL Framework | dlt, Meltano, Dagster | - | - | - |
| Cloud Storage | boto3, gcsfs, adlfs | @aws-sdk/, @google-cloud/ | aws-sdk-s3, object_store | aws-sdk-go-v2 |
| File Processing | polars, pandas, pyarrow | papaparse, xlsx, parquetjs | polars-rs, arrow-rs | encoding/csv, parquet-go |
| Streaming | confluent-kafka, aiokafka | kafkajs | rdkafka-rs | franz-go, sarama |
| CDC | Debezium, pg_logical | - | - | - |

Reference Documentation

  • references/cloud-storage.md - S3, GCS, Azure Blob patterns
  • references/file-formats.md - CSV, JSON, Parquet, Excel handling
  • references/api-feeds.md - REST polling, webhooks, GraphQL subscriptions
  • references/streaming-sources.md - Kafka, Kinesis, Pub/Sub
  • references/database-migration.md - Schema migration, CDC patterns
  • references/etl-tools.md - dlt, Meltano, Airbyte, Fivetran

Scripts

  • scripts/validate_csv_schema.py - Validate CSV against expected schema
  • scripts/test_s3_connection.py - Test S3 bucket connectivity
  • scripts/generate_dlt_pipeline.py - Generate dlt pipeline scaffold

Chaining with Database Skills

After ingestion, chain to the appropriate database skill:

| Destination | Chain to Skill |
|---|---|
| PostgreSQL, MySQL | databases-relational |
| MongoDB, DynamoDB | databases-document |
| Qdrant, Pinecone | databases-vector (after embedding) |
| ClickHouse, TimescaleDB | databases-timeseries |
| Neo4j | databases-graph |

For vector databases, chain through ai-data-engineering for embedding:

ingesting-data → ai-data-engineering → databases-vector