data-engineering-ai-ml

AI/ML Data Pipelines

Data engineering patterns for AI/ML workloads: embedding generation, vector databases, retrieval-augmented generation (RAG), LLM output monitoring, and batch inference. Covers LanceDB, pgvector, and OpenAI integrations.

When to Use These Patterns

  • RAG Applications: Building chatbots, semantic search, question-answering
  • LLM Monitoring: Tracking token usage, latency, output quality
  • Embedding Pipelines: Generating and storing vector embeddings for ML models
  • Batch Inference: Large-scale model inference pipelines
  • Feature Stores: Versioned feature data for ML training/serving

Skill Dependencies

  • @data-engineering-core - Polars, DuckDB for data processing
  • @data-engineering-storage-remote-access - Cloud storage for embeddings and models
  • @data-engineering-orchestration - Schedule/batch embedding generation
  • @data-engineering-quality - Validate embedding quality

Detailed Guides

Embeddings

See: @data-engineering-ai-ml/embeddings.md

  • OpenAI embeddings API
  • Sentence Transformers (local models)
  • Batch processing with Polars
  • Chunking strategies for text
  • Token counting with tiktoken
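
The chunking and token-counting bullets combine naturally: count tokens with tiktoken and split on token boundaries. A minimal sketch (the cl100k_base encoding, 512-token window, and 50-token overlap are illustrative assumptions, not recommendations from the guide):

import tiktoken

def chunk_by_tokens(text: str, max_tokens: int = 512, overlap: int = 50) -> list[str]:
    # Encode once, then slide an overlapping window over the token stream
    enc = tiktoken.get_encoding("cl100k_base")
    tokens = enc.encode(text)
    chunks = []
    step = max_tokens - overlap
    for start in range(0, len(tokens), step):
        chunks.append(enc.decode(tokens[start:start + max_tokens]))
    return chunks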

Vector Databases

See: @data-engineering-ai-ml/vector-databases.md

  • LanceDB: Embedded, Arrow-native, scales from local to cloud
  • pgvector: PostgreSQL extension, ACID transactions
  • DuckDB: List type for simple cosine similarity (see the sketch after this list)
  • Indexing strategies (IVF_PQ, HNSW)
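
For quick experiments without a dedicated vector store, DuckDB can score cosine similarity over a LIST column directly in SQL. A minimal sketch, assuming a documents table with id, text, and a FLOAT[] embedding column already loaded, and query_embedding as a NumPy vector:

import duckdb

conn = duckdb.connect("vectors.db")
# Brute-force scan, no ANN index; fine for small corpora
results = conn.execute("""
    SELECT id, text,
           list_cosine_similarity(embedding, ?) AS score
    FROM documents
    ORDER BY score DESC
    LIMIT 5
""", [query_embedding.tolist()]).df()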

RAG Pipelines

See: @data-engineering-ai-ml/rag-pipelines.md

  • Document chunking (by tokens, paragraphs, semantic)
  • Context assembly with token budget (sketched after this list)
  • Retrieval: vector search + metadata filters
  • Prompt construction and LLM invocation
  • Source attribution
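
A minimal sketch of the context-assembly step: greedily pack retrieved chunks into a fixed token budget, best-ranked first (the 3000-token budget and separator are illustrative assumptions):

import tiktoken

def assemble_context(chunks: list[str], budget: int = 3000) -> str:
    # chunks are assumed pre-sorted by retrieval score, best first
    enc = tiktoken.get_encoding("cl100k_base")
    picked, used = [], 0
    for chunk in chunks:
        n = len(enc.encode(chunk))
        if used + n > budget:
            break
        picked.append(chunk)
        used += n
    return "\n\n---\n\n".join(picked)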

LLM Monitoring

See: @data-engineering-ai-ml/monitoring.md

  • Tracking API calls, costs, latencies
  • Prompt caching and deduplication (see the sketch after this list)
  • Error tracking and retry logic
  • Quality evaluation (human + automated)
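
A minimal in-memory sketch of prompt deduplication, assuming the OpenAI v1 Python client; a production version would persist the cache and fold sampling parameters such as temperature into the key:

import hashlib

_cache: dict[str, str] = {}

def cached_completion(client, model: str, prompt: str) -> str:
    # Identical (model, prompt) pairs are served from the cache, not the API
    key = hashlib.sha256(f"{model}:{prompt}".encode()).hexdigest()
    if key not in _cache:
        resp = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        _cache[key] = resp.choices[0].message.content
    return _cache[key]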

Quick Start: Complete RAG Pipeline

import polars as pl
import lancedb
from sentence_transformers import SentenceTransformer

# 1. Generate embeddings
model = SentenceTransformer('all-MiniLM-L6-v2')
df = pl.read_parquet("documents.parquet")
df = df.with_columns([
    pl.Series("embedding", model.encode(df["text"].to_list()).tolist())
])

# 2. Store in LanceDB
db = lancedb.connect("./.lancedb")
table = db.create_table("documents", df.to_arrow())
table.create_index(
    vector_column_name="embedding",
    metric="cosine",
    index_type="IVF_PQ",
    num_partitions=256,
    num_sub_vectors=96
)

# 3. Query
query_embedding = model.encode(["What is RAG?"])[0]
results = (
    table.search(query_embedding)
    .where("category = 'tech'")
    .limit(5)
    .to_pandas()
)

print(f"Top result: {results.iloc[0]['text'][:200]}...")

Common Patterns

Batch Embedding Generation

from sentence_transformers import SentenceTransformer
import polars as pl

model = SentenceTransformer('all-MiniLM-L6-v2')

# Process in batches to avoid OOM
batch_size = 1000
reader = pl.read_csv_batched("large_corpus.csv", batch_size=batch_size)

processed = []
while (batches := reader.next_batches(1)):
    for batch in batches:
        batch_embeddings = model.encode(batch["text"].to_list())
        # Keep each batch paired with its own embeddings
        processed.append(
            batch.with_columns(pl.Series("embedding", batch_embeddings.tolist()))
        )

df = pl.concat(processed)
df.write_parquet("corpus_with_embeddings.parquet")

Vector Search with Filtering

import lancedb

db = lancedb.connect("./.lancedb")
table = db.open_table("documents")

# Hybrid search: vector + metadata filter
results = (
    table.search(query_embedding)
    .where("date >= '2024-01-01' AND category IN ('tech', 'science')")
    .select(["id", "text", "date"])  # Only return needed columns
    .limit(10)
    .to_arrow()
)
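
Note that LanceDB applies the where clause after the ANN search by default, so a selective filter can leave you with fewer than limit rows. A sketch of pre-filtering, assuming a LanceDB version that supports the prefilter flag:

results = (
    table.search(query_embedding)
    .where("category = 'tech'", prefilter=True)  # filter rows before the vector search
    .limit(10)
    .to_arrow()
)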

Cost Monitoring for LLMs

import duckdb
from datetime import datetime

class LLMMonitor:
    def __init__(self, duckdb_path="llm_monitoring.db"):
        self.conn = duckdb.connect(duckdb_path)
        self._init_table()

    def _init_table(self):
        self.conn.sql("""
            CREATE TABLE IF NOT EXISTS calls (
                call_id VARCHAR PRIMARY KEY,
                timestamp TIMESTAMP,
                model VARCHAR,
                prompt_tokens INTEGER,
                completion_tokens INTEGER,
                total_tokens INTEGER,
                cost_usd DOUBLE,
                latency_ms INTEGER
            )
        """)

    def log_call(self, model: str, prompt_tokens: int, completion_tokens: int, latency_ms: int):
        # Approximate cost calculation (update with real pricing)
        cost_per_1k = {
            "gpt-4o": 0.005,
            "text-embedding-3-small": 0.00002
        }.get(model, 0.001)

        total_cost = (prompt_tokens + completion_tokens) / 1000 * cost_per_1k
        call_id = f"{datetime.now().timestamp():.6f}-{model}"

        # execute() binds the ? placeholders to the parameter list
        self.conn.execute("""
            INSERT INTO calls VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, [
            call_id,
            datetime.now().isoformat(),
            model,
            prompt_tokens,
            completion_tokens,
            prompt_tokens + completion_tokens,
            total_cost,
            latency_ms
        ])

monitor = LLMMonitor()
monitor.log_call("gpt-4o", 100, 50, 1200)
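
With calls logged, daily spend per model is a single aggregation away. A usage sketch against the table created above:

daily = monitor.conn.execute("""
    SELECT date_trunc('day', timestamp) AS day,
           model,
           SUM(cost_usd) AS cost_usd,
           SUM(total_tokens) AS tokens
    FROM calls
    GROUP BY 1, 2
    ORDER BY 1, 2
""").df()
print(daily)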

Best Practices

  1. Batch embedding generation - Use batch API calls, not one-by-one
  2. Index after bulk load - Build vector indexes on the full dataset, not incrementally
  3. Use appropriate embedding dimensions - Smaller = cheaper/faster, larger = more accurate
  4. Monitor token usage - Track costs, set quotas
  5. Cache prompts & results - Avoid duplicate API calls
  6. Human evaluation - Automated metrics (cosine similarity) ≠ quality
  7. Don't store embeddings without metadata - you need it for source attribution
  8. Don't use cosine similarity for multi-modal vectors - use L2 or dot product
  9. Don't skip chunking - Very long documents need splitting for embeddings

Performance Tips

  • Embedding models: run locally (e.g. Sentence Transformers) for privacy and offline use, or call the OpenAI API for convenience
  • Vector DB: LanceDB for embedded use; pgvector for transactional; specialized (Pinecone, Weaviate) for scale
  • Index tuning: IVF_PQ for disk efficiency, HNSW for recall
  • Query parameters: Tune nprobes (accuracy) and refine_factor (reranking) based on latency/recall tradeoff
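
Both query knobs are exposed on LanceDB's query builder; the values below are illustrative starting points, not tuned recommendations:

results = (
    table.search(query_embedding)
    .nprobes(20)         # probe more IVF partitions: higher recall, higher latency
    .refine_factor(10)   # re-rank 10x oversampled candidates with exact distances
    .limit(10)
    .to_arrow()
)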
