data-engineering-ai-ml
AI/ML Data Pipelines
Data engineering patterns for AI/ML workloads: embedding generation, vector databases, retrieval-augmented generation (RAG), LLM output monitoring, and batch inference. Covers LanceDB, pgvector, and OpenAI integrations.
When to Use These Patterns?
- RAG Applications: Building chatbots, semantic search, question-answering
- LLM Monitoring: Tracking token usage, latency, output quality
- Embedding Pipelines: Generating and storing vector embeddings for ML models
- Batch Inference: Large-scale model inference pipelines
- Feature Stores: Versioned feature data for ML training/serving
Skill Dependencies
- @data-engineering-core - Polars, DuckDB for data processing
- @data-engineering-storage-remote-access - Cloud storage for embeddings and models
- @data-engineering-orchestration - Schedule/batch embedding generation
- @data-engineering-quality - Validate embedding quality
Detailed Guides
Embeddings
See: @data-engineering-ai-ml/embeddings.md
- OpenAI embeddings API
- Sentence Transformers (local models)
- Batch processing with Polars
- Chunking strategies for text
- Token counting with tiktoken
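The chunking bullet above can be sketched as a token-budget chunker with overlap. This is a minimal illustration using whitespace tokens as a stand-in for real tokenization; in practice, use tiktoken's `encoding.encode()`/`decode()` for model-accurate counts. `chunk_text` and its parameters are illustrative names, not from a library.

```python
def chunk_text(text: str, max_tokens: int = 256, overlap: int = 32) -> list[str]:
    """Split text into overlapping chunks of at most max_tokens tokens.

    Whitespace tokens stand in for real tokenizer output here; swap in
    tiktoken for budgets that match the embedding model.
    """
    tokens = text.split()
    if not tokens:
        return []
    chunks = []
    step = max_tokens - overlap  # slide forward, keeping `overlap` tokens of context
    for start in range(0, len(tokens), step):
        chunks.append(" ".join(tokens[start:start + max_tokens]))
        if start + max_tokens >= len(tokens):
            break  # last chunk already covers the tail
    return chunks

chunks = chunk_text("word " * 600, max_tokens=256, overlap=32)
```

Overlap keeps sentences that straddle a chunk boundary retrievable from both sides, at the cost of some duplicated storage.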
Vector Databases
See: @data-engineering-ai-ml/vector-databases.md
- LanceDB: Embedded, Arrow-native, scales from local to cloud
- pgvector: PostgreSQL extension, ACID transactions
- DuckDB: List type for simple cosine similarity
- Indexing strategies (IVF_PQ, HNSW)
RAG Pipelines
See: @data-engineering-ai-ml/rag-pipelines.md
- Document chunking (by tokens, paragraphs, semantic)
- Context assembly with token budget
- Retrieval: vector search + metadata filters
- Prompt construction and LLM invocation
- Source attribution
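The context-assembly step above can be sketched as a greedy packer: take chunks already sorted by relevance and stop when the token budget is exhausted. `assemble_context` and the chunk-dict shape (`text`/`source` keys) are illustrative assumptions, and whitespace counts again stand in for tiktoken.

```python
def assemble_context(chunks: list[dict], token_budget: int = 3000) -> str:
    """Greedily pack retrieved chunks (pre-sorted by relevance) into a budget.

    Prefixes each chunk with its source so the final answer can cite it.
    """
    parts, used = [], 0
    for chunk in chunks:
        n = len(chunk["text"].split())  # replace with tiktoken for real budgets
        if used + n > token_budget:
            break  # budget exhausted; drop remaining (less relevant) chunks
        parts.append(f"[{chunk['source']}] {chunk['text']}")
        used += n
    return "\n\n".join(parts)

retrieved = [
    {"text": "Chunk one text", "source": "doc_a"},
    {"text": "Chunk two text here", "source": "doc_b"},
]
context = assemble_context(retrieved, token_budget=5)
```

The source prefixes are what make the attribution bullet workable: the LLM can be prompted to cite `[doc_a]`-style markers in its answer.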
LLM Monitoring
See: @data-engineering-ai-ml/monitoring.md
- Tracking API calls, costs, latencies
- Prompt caching and deduplication
- Error tracking and retry logic
- Quality evaluation (human + automated)
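The caching/deduplication bullet above can be sketched by hashing the full call signature (model, prompt, parameters) and reusing stored results. `PromptCache` is a hypothetical in-memory helper, and `llm_fn` stands in for a real API client; persist the store in DuckDB or Redis for anything beyond a single process.

```python
import hashlib
import json

class PromptCache:
    """Deduplicate identical LLM calls by hashing (model, prompt, params)."""

    def __init__(self):
        self._store = {}
        self.hits = 0

    def _key(self, model: str, prompt: str, **params) -> str:
        # sort_keys makes the hash stable regardless of param ordering
        payload = json.dumps(
            {"model": model, "prompt": prompt, "params": params}, sort_keys=True
        )
        return hashlib.sha256(payload.encode()).hexdigest()

    def get_or_call(self, llm_fn, model: str, prompt: str, **params):
        key = self._key(model, prompt, **params)
        if key in self._store:
            self.hits += 1  # cache hit: no API call, no cost
            return self._store[key]
        result = llm_fn(model=model, prompt=prompt, **params)
        self._store[key] = result
        return result

calls = []
def fake_llm(model, prompt, **params):
    calls.append(prompt)
    return f"response to {prompt}"

cache = PromptCache()
first = cache.get_or_call(fake_llm, "gpt-4o", "What is RAG?", temperature=0.0)
second = cache.get_or_call(fake_llm, "gpt-4o", "What is RAG?", temperature=0.0)
```

Note that sampling parameters belong in the key: the same prompt at `temperature=0.0` and `temperature=1.0` should not share a cache entry.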
Quick Start: Complete RAG Pipeline
```python
import polars as pl
import lancedb
from sentence_transformers import SentenceTransformer

# 1. Generate embeddings
model = SentenceTransformer('all-MiniLM-L6-v2')
df = pl.read_parquet("documents.parquet")
df = df.with_columns([
    pl.Series("embedding", model.encode(df["text"].to_list()).tolist())
])

# 2. Store in LanceDB
db = lancedb.connect("./.lancedb")
table = db.create_table("documents", df.to_arrow())
table.create_index(
    vector_column_name="embedding",
    metric="cosine",
    index_type="IVF_PQ",
    num_partitions=256,
    num_sub_vectors=96,
)

# 3. Query
query_embedding = model.encode(["What is RAG?"])[0]
results = (
    table.search(query_embedding)
    .where("category = 'tech'")
    .limit(5)
    .to_pandas()
)
print(f"Top result: {results.iloc[0]['text'][:200]}...")
```
Common Patterns
Batch Embedding Generation
```python
from sentence_transformers import SentenceTransformer
import polars as pl

model = SentenceTransformer('all-MiniLM-L6-v2')

# Process in batches to avoid OOM
batch_size = 1000
reader = pl.read_csv_batched("large_corpus.csv", batch_size=batch_size)

frames = []
while (batches := reader.next_batches(1)):
    for batch in batches:
        # Embed only this batch's rows, and pair them with this batch
        batch_embeddings = model.encode(batch["text"].to_list())
        frames.append(
            batch.with_columns(pl.Series("embedding", batch_embeddings.tolist()))
        )

# Write once, after all batches are embedded
pl.concat(frames).write_parquet("corpus_with_embeddings.parquet")
```
Vector Search with Filtering
```python
import lancedb

db = lancedb.connect("./.lancedb")
table = db.open_table("documents")

# Hybrid search: vector + metadata filter
results = (
    table.search(query_embedding)
    .where("date >= '2024-01-01' AND category IN ('tech', 'science')")
    .select(["id", "text", "date"])  # Only return needed columns
    .limit(10)
    .to_arrow()
)
```
Cost Monitoring for LLMs
```python
import duckdb
from datetime import datetime

class LLMMonitor:
    def __init__(self, duckdb_path="llm_monitoring.db"):
        self.conn = duckdb.connect(duckdb_path)
        self._init_table()

    def _init_table(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS calls (
                call_id VARCHAR PRIMARY KEY,
                timestamp TIMESTAMP,
                model VARCHAR,
                prompt_tokens INTEGER,
                completion_tokens INTEGER,
                total_tokens INTEGER,
                cost_usd DOUBLE,
                latency_ms INTEGER
            )
        """)

    def log_call(self, model: str, prompt_tokens: int, completion_tokens: int, latency_ms: int):
        # Approximate cost calculation (update with real pricing)
        cost_per_1k = {
            "gpt-4o": 0.005,
            "text-embedding-3-small": 0.00002,
        }.get(model, 0.001)
        total_cost = (prompt_tokens + completion_tokens) / 1000 * cost_per_1k
        call_id = f"{datetime.now().timestamp():.6f}-{model}"
        # Use execute() for parameterized inserts
        self.conn.execute(
            "INSERT INTO calls VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            [
                call_id,
                datetime.now(),
                model,
                prompt_tokens,
                completion_tokens,
                prompt_tokens + completion_tokens,
                total_cost,
                latency_ms,
            ],
        )

monitor = LLMMonitor()
monitor.log_call("gpt-4o", 100, 50, 1200)
```
Best Practices
- ✅ Batch embedding generation - Use batch API calls, not one-by-one
- ✅ Index after bulk load - Build vector indexes on full dataset, not incremental
- ✅ Use appropriate embedding dimensions - Smaller = cheaper/faster, larger = more accurate
- ✅ Monitor token usage - Track costs, set quotas
- ✅ Cache prompts & results - Avoid duplicate API calls
- ✅ Human evaluation - Automated metrics (cosine similarity) ≠ quality
- ❌ Don't store embeddings without metadata - need source attribution
- ❌ Don't use cosine similarity for multi-modal vectors - use L2 or dot product
- ❌ Don't skip chunking - Very long documents need splitting for embeddings
Performance Tips
- Embedding models: Run locally for privacy/offline, or OpenAI for convenience
- Vector DB: LanceDB for embedded use; pgvector for transactional; specialized (Pinecone, Weaviate) for scale
- Index tuning: IVF_PQ for disk efficiency, HNSW for recall
- Query parameters: Tune nprobes (accuracy) and refine_factor (reranking) based on the latency/recall tradeoff
References
- LanceDB Documentation
- Sentence Transformers
- OpenAI Embeddings Guide
- pgvector Documentation
- @data-engineering-storage-lakehouse - Versioned storage for models/embeddings