# Engineering AI Pipelines
Production-ready patterns for AI/ML data pipelines: generating embeddings, selecting vector databases, building RAG systems, and monitoring LLM usage. Covers local and cloud-based approaches with cost and performance considerations.
## Table of Contents
- Quick Comparison
- When to Use Which?
- Pipeline Selection Matrix
- Detailed Reference Guides
- Code Examples
- Best Practices
- Related Skills
## Quick Comparison

### Embedding Approaches
| Approach | Provider | Dimensions | Cost | Speed | Best For |
|---|---|---|---|---|---|
| OpenAI 3-small | OpenAI | 1536 | $0.00002/1K tokens | ⚡⚡⚡ Fast | Production, convenience |
| OpenAI 3-large | OpenAI | 3072 | $0.00013/1K tokens | ⚡⚡ | Higher quality needs |
| MiniLM-L6 | Local (sentence-transformers) | 384 | Free | ⚡⚡ | Privacy, offline, cost control |
| MPNet-base | Local (sentence-transformers) | 768 | Free | ⚡⚡ | Better quality than MiniLM |
| OpenAI ada-002 | OpenAI | 1536 | $0.00010/1K tokens | ⚡⚡⚡ | Legacy compatibility |
### Vector Databases
| Feature | LanceDB | pgvector | DuckDB | Pinecone/Weaviate |
|---|---|---|---|---|
| Deployment | Embedded file | PostgreSQL | Embedded | Managed service |
| Scale | Millions | Tens of millions | <100K | Billions |
| Index Types | IVF_PQ, HNSW | HNSW, IVFFlat | None | HNSW, IVF |
| Cloud Native | ✅ S3/GCS/Azure | Via RDS | ❌ | ✅ |
| ACID | ✅ | ✅ | ✅ | Varies |
| Cost | Free | Postgres cost | Free | Usage-based |
| Best For | RAG apps, prototyping | Production + existing PG | Quick experiments | Enterprise scale |
## When to Use Which?

### Embedding Generation
Choose OpenAI API when:
- You need production reliability without managing infrastructure
- Cost is acceptable ($0.02-0.13 per 1M tokens)
- You don't have GPU resources for local inference
- Token limits (8K) fit your use case
- You want state-of-the-art quality without tuning
Choose Local Models (sentence-transformers) when:
- Data privacy is critical (healthcare, finance)
- You need offline/air-gapped operation
- You have high volume (millions of documents)
- You want to avoid API rate limits
- You have GPU resources available
Choose OpenAI 3-small vs 3-large:
- Use 3-small for most use cases (cost-effective, good quality)
- Use 3-large when you need maximum retrieval accuracy
### Vector Storage
Choose LanceDB when:
- Building RAG applications or prototypes
- Need embedded deployment (no server management)
- Working with multi-modal data (text + images + vectors)
- Want Arrow-native integration with Polars/PyArrow
- Need cloud storage (S3/GCS) without complex setup
Choose pgvector when:
- Already using PostgreSQL for transactional data
- Need ACID guarantees with vector search
- Complex joins between vectors and relational data
- Cannot tolerate separate vector DB infrastructure
Choose DuckDB when:
- Quick experiments or small datasets (<100K vectors)
- Already using DuckDB for analytics
- Don't need approximate nearest neighbor (ANN) indexing
- Simple cosine similarity is sufficient
Choose Managed Services (Pinecone, Weaviate) when:
- Billions of vectors
- Need managed infrastructure
- Enterprise SLA requirements
- Hybrid search (vector + keyword) out of the box
### RAG Architecture
Simple RAG (single stage):
Query → Embedding → Vector Search → Context → LLM → Answer
- Best for: Quick prototypes, well-structured documents
- Tools: LanceDB + OpenAI
Advanced RAG (multi-stage):
Query → Embedding → Vector Search → Re-ranking → Context → LLM → Answer
- Best for: Production systems requiring high accuracy
- Add: Cross-encoder re-ranking, metadata filtering
Hybrid RAG:
Query → [Vector Search + Keyword/BM25] → Fusion → Context → LLM → Answer
- Best for: Domain-specific terminology, acronyms
- Tools: LanceDB hybrid search or specialized services
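The fusion step in the hybrid diagram is commonly implemented with reciprocal rank fusion (RRF). A dependency-free sketch (the doc IDs are illustrative; `k = 60` is the constant from the original RRF paper):

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Fuse multiple ranked lists of doc IDs; docs ranked well in several lists win."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking):
            # 1-based rank; k dampens the influence of top positions
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)

vector_hits = ["d3", "d1", "d2"]   # ranked by cosine similarity
bm25_hits = ["d1", "d4", "d3"]     # ranked by keyword relevance
fused = reciprocal_rank_fusion([vector_hits, bm25_hits])
```

RRF needs only ranks, not scores, so it sidesteps the problem of normalizing cosine similarities against BM25 scores.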
## Pipeline Selection Matrix
| Use Case | Embedding | Vector DB | RAG Pattern | Monitoring |
|---|---|---|---|---|
| Internal knowledge base | OpenAI 3-small | LanceDB | Simple RAG | Basic cost tracking |
| Customer-facing chatbot | OpenAI 3-large | pgvector | Advanced (re-ranking) | Full observability |
| Healthcare (HIPAA) | Local MiniLM | LanceDB (on-prem) | Simple RAG | Local logging only |
| Financial documents | OpenAI 3-small | pgvector | Hybrid RAG | Cost + audit logs |
| Research paper search | Local MPNet | LanceDB | Advanced RAG | Basic metrics |
| High-volume (>10M docs) | OpenAI 3-small | Managed (Pinecone) | Simple RAG | Full observability |
| Multi-modal (text + images) | CLIP/Local | LanceDB | Simple RAG | Basic tracking |
## Detailed Reference Guides

### Core Patterns

- `references/embeddings.md` - OpenAI API, sentence-transformers, batch processing, model selection
- `references/vector-stores.md` - LanceDB, pgvector, DuckDB comparison and selection
- `references/rag-pipelines.md` - Chunking strategies, context assembly, retrieval patterns
- `references/monitoring.md` - Cost tracking, retry logic, OpenTelemetry, quality evaluation
## Code Examples

### OpenAI Embeddings

```python
import openai
import polars as pl
import tiktoken

class OpenAIEmbeddingPipeline:
    def __init__(self, model: str = "text-embedding-3-small"):
        self.client = openai.OpenAI()
        self.model = model
        self.encoding = tiktoken.encoding_for_model(model)

    def count_tokens(self, text: str) -> int:
        """Check inputs against the model's 8K token limit before sending."""
        return len(self.encoding.encode(text))

    def generate_embeddings(self, texts: list[str]) -> list[list[float]]:
        """Generate embeddings in batches of 100."""
        all_embeddings = []
        batch_size = 100
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            response = self.client.embeddings.create(
                model=self.model,
                input=batch
            )
            all_embeddings.extend([e.embedding for e in response.data])
        return all_embeddings

    def process_dataframe(self, df: pl.DataFrame, text_col: str) -> pl.DataFrame:
        """Add an embedding column to a Polars DataFrame."""
        texts = df[text_col].to_list()
        embeddings = self.generate_embeddings(texts)
        return df.with_columns(pl.Series("embedding", embeddings))

# Usage
pipeline = OpenAIEmbeddingPipeline()
df = pl.DataFrame({"text": ["Hello world", "Goodbye world"]})
df_with_embeddings = pipeline.process_dataframe(df, "text")
```
### Local Embeddings

```python
from sentence_transformers import SentenceTransformer
import polars as pl

model = SentenceTransformer('all-MiniLM-L6-v2')  # 384 dimensions
texts = ["Hello world", "Goodbye world"]
embeddings = model.encode(texts, show_progress_bar=True)

df = pl.DataFrame({
    "text": texts,
    "embedding": embeddings.tolist()
})
```
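Cosine similarity assumes unit-length vectors; sentence-transformers can normalize at encode time via `model.encode(texts, normalize_embeddings=True)`, or it can be done manually with NumPy:

```python
import numpy as np

def l2_normalize(vectors: np.ndarray) -> np.ndarray:
    """Scale each row to unit L2 norm so dot product equals cosine similarity."""
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    return vectors / np.clip(norms, 1e-12, None)  # guard against zero vectors

unit = l2_normalize(np.array([[3.0, 4.0], [0.0, 2.0]]))
```

With normalized vectors, a plain dot product, cosine distance, and Euclidean distance all produce the same ranking, which keeps index configuration forgiving.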
### Vector Search with LanceDB

```python
import lancedb
import polars as pl

# Connect and create table
db = lancedb.connect("./.lancedb")
df = pl.DataFrame({
    "id": [1, 2, 3],
    "text": ["Machine learning basics", "Deep learning intro", "Neural networks"],
    "embedding": [[0.1] * 384, [0.2] * 384, [0.3] * 384]
})
table = db.create_table("documents", df.to_arrow())

# Create an ANN index for faster search.
# Note: IVF_PQ trains on the data, so it needs far more rows than this
# three-row demo; skip index creation for tiny tables.
table.create_index(
    vector_column_name="embedding",
    metric="cosine",
    index_type="IVF_PQ",
    num_partitions=256,
    num_sub_vectors=96
)

# Search with metadata filtering
query_embedding = [0.15] * 384
results = (
    table.search(query_embedding)
    .where("id > 1")
    .limit(5)
    .to_pandas()
)
```
### Complete RAG Pipeline

```python
import lancedb
from sentence_transformers import SentenceTransformer
import openai

class RAGPipeline:
    def __init__(self, db_path: str, embedding_model: str = "all-MiniLM-L6-v2"):
        self.embedder = SentenceTransformer(embedding_model)
        self.db = lancedb.connect(db_path)
        self.table = self.db.open_table("documents")
        self.openai_client = openai.OpenAI()

    def retrieve(self, query: str, k: int = 5) -> list[dict]:
        """Retrieve the k most similar documents."""
        query_embedding = self.embedder.encode([query])[0].tolist()
        results = self.table.search(query_embedding).limit(k).to_pandas()
        return results.to_dict('records')

    def generate(self, query: str, context_docs: list[dict]) -> str:
        """Generate an answer with the LLM, grounded in the retrieved context."""
        # Number the chunks so the model's [1], [2] citations map back to sources
        context_text = "\n\n---\n\n".join(
            f"[{i + 1}] {doc['text']}" for i, doc in enumerate(context_docs)
        )
        response = self.openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "Answer based on the provided context. Cite sources using [1], [2], etc."},
                {"role": "user", "content": f"Context:\n{context_text}\n\nQuestion: {query}\n\nAnswer:"}
            ],
            temperature=0.1
        )
        return response.choices[0].message.content

    def run(self, query: str, k: int = 5) -> dict:
        """Full RAG pipeline: retrieve, then generate."""
        docs = self.retrieve(query, k)
        answer = self.generate(query, docs)
        return {
            "answer": answer,
            "sources": [{"id": d['id'], "text": d['text'][:200]} for d in docs],
            "source_count": len(docs)
        }

# Usage
rag = RAGPipeline("./.lancedb")
result = rag.run("What is machine learning?")
print(result["answer"])
```
### LLM Monitoring

```python
import time
import uuid

import duckdb

class LLMMonitor:
    def __init__(self, db_path: str = "llm_monitoring.db"):
        self.conn = duckdb.connect(db_path)
        self._init_tables()

    def _init_tables(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS llm_calls (
                call_id VARCHAR PRIMARY KEY,
                timestamp TIMESTAMP,
                model VARCHAR,
                prompt_tokens INTEGER,
                completion_tokens INTEGER,
                total_tokens INTEGER,
                cost_usd DOUBLE,
                latency_ms INTEGER,
                status VARCHAR
            )
        """)

    def log_call(self, model: str, prompt_tokens: int, completion_tokens: int, latency_ms: int):
        # Approximate cost: blends input/output pricing (update with current rates)
        cost_per_1k = {
            "gpt-4o": 0.0025,
            "gpt-4o-mini": 0.00015,
            "text-embedding-3-small": 0.00002
        }.get(model, 0.001)
        total_tokens = prompt_tokens + completion_tokens
        cost = total_tokens / 1000 * cost_per_1k
        call_id = str(uuid.uuid4())  # collision-safe, unlike a timestamp
        self.conn.execute("""
            INSERT INTO llm_calls VALUES (?, NOW(), ?, ?, ?, ?, ?, ?, ?)
        """, [call_id, model, prompt_tokens, completion_tokens, total_tokens, cost, latency_ms, "success"])

    def get_usage_stats(self, days: int = 7) -> dict:
        result = self.conn.execute(f"""
            SELECT
                COUNT(*) as total_calls,
                SUM(total_tokens) as total_tokens,
                SUM(cost_usd) as total_cost,
                AVG(latency_ms) as avg_latency
            FROM llm_calls
            WHERE timestamp > NOW() - INTERVAL '{days} days'
        """).fetchone()
        return {
            'total_calls': result[0],
            'total_tokens': result[1],
            'total_cost_usd': result[2],
            'avg_latency_ms': result[3]
        }

# Usage with retry pattern
import openai
from tenacity import retry, stop_after_attempt, wait_exponential

monitor = LLMMonitor()

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def monitored_llm_call(prompt: str, model: str = "gpt-4o") -> str:
    client = openai.OpenAI()
    start = time.time()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    latency_ms = int((time.time() - start) * 1000)
    usage = response.usage
    monitor.log_call(model, usage.prompt_tokens, usage.completion_tokens, latency_ms)
    return response.choices[0].message.content
```
## Best Practices

### Embeddings
- ✅ Batch API calls - Send texts in batches (e.g., 100 per request) rather than one at a time
- ✅ Chunk long texts - Split on token limits, respect natural boundaries
- ✅ Cache embeddings - Don't regenerate identical text (hash-based cache)
- ✅ Normalize embeddings - Most similarity metrics expect normalized vectors
- ✅ Use appropriate model - Trade off quality vs cost/latency
- ✅ Count tokens before sending - Use tiktoken to avoid truncation
- ❌ Don't embed rich text (HTML, markdown) - Extract plain text first
- ❌ Don't mix embedding models in same index - Different dimensions break search
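The hash-based cache mentioned above can be sketched in a few lines; the in-memory dict stands in for whatever persistent store you use, and the names are illustrative:

```python
import hashlib

class EmbeddingCache:
    """Skip re-embedding identical text by keying on a content hash."""

    def __init__(self):
        self._store: dict[str, list[float]] = {}
        self.hits = 0

    def _key(self, text: str, model: str) -> str:
        # Include the model name: vectors from different models aren't interchangeable
        return hashlib.sha256(f"{model}:{text}".encode()).hexdigest()

    def get_or_compute(self, text: str, model: str, compute) -> list[float]:
        key = self._key(text, model)
        if key in self._store:
            self.hits += 1
        else:
            self._store[key] = compute(text)
        return self._store[key]

# Usage with a stand-in embedder
cache = EmbeddingCache()
calls = []
fake_embed = lambda t: calls.append(t) or [0.0, 1.0]
cache.get_or_compute("hello", "text-embedding-3-small", fake_embed)
cache.get_or_compute("hello", "text-embedding-3-small", fake_embed)  # cache hit
```

Keying on `model:text` also enforces the "don't mix embedding models" rule: switching models naturally invalidates the cache.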
### Vector Storage

- ✅ Build index after bulk load - Bulk-load data first, then build the index; incremental indexing degrades performance
- ✅ Use metadata filtering - Filter on attributes before vector search
- ✅ Choose right index type - IVF_PQ for disk efficiency, HNSW for recall
- ✅ Normalize for cosine - Ensure embeddings are normalized if using cosine metric
- ✅ Include source metadata - Need attribution for RAG answers
- ❌ Don't store raw vectors without context - metadata essential
- ❌ Don't rebuild indexes frequently - expensive operation
- ❌ Don't use DuckDB for >100K vectors - no ANN indexing
### RAG Pipelines
- ✅ Chunk intelligently - Don't split mid-sentence; respect semantic boundaries
- ✅ Include metadata - Filter on date, source, author before vector search
- ✅ Add source citations - Include [1], [2] references in answers
- ✅ Track token budget - Don't exceed LLM context window
- ✅ Handle "I don't know" - Prompt the model to admit when the context is insufficient
- ❌ Don't retrieve too many chunks - quality degrades with noise
- ❌ Don't skip re-ranking for production - cross-encoder improves accuracy
- ❌ Don't put PII in vector DB without encryption
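The "chunk intelligently" rule above can be sketched as a greedy sentence-boundary chunker; the 500-character budget and the sentence regex are illustrative, and production chunkers usually budget in tokens rather than characters:

```python
import re

def chunk_text(text: str, max_chars: int = 500) -> list[str]:
    """Greedily pack sentences into chunks, never splitting mid-sentence."""
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    chunks, current = [], ""
    for sentence in sentences:
        if current and len(current) + len(sentence) + 1 > max_chars:
            chunks.append(current)
            current = sentence
        else:
            current = f"{current} {sentence}".strip()
    if current:
        chunks.append(current)
    return chunks
```

A common refinement is overlapping chunks (repeating the last sentence or two of each chunk in the next) so context spanning a boundary is still retrievable.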
### Monitoring
- ✅ Log every call - Success and failures; model, tokens, latency
- ✅ Hash prompts - Enable deduplication and caching
- ✅ Track costs by model - Different pricing tiers add up
- ✅ Set up alerts - Error rate > 5%, latency > 30s, cost spike
- ✅ Implement retry logic - Exponential backoff for rate limits
- ✅ Use structured logging - JSON format for easy parsing
- ❌ Don't log full prompts by default - PII and cost concerns
- ❌ Don't ignore cache hit rate - Measure and optimize
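The "hash prompts" and "structured logging" rules combine naturally: emit a JSON record that carries a prompt digest instead of the raw prompt. A sketch (field names are illustrative):

```python
import hashlib
import json

def llm_log_record(model: str, prompt: str, total_tokens: int, latency_ms: int) -> str:
    """Build a JSON log line with a prompt hash instead of the raw prompt (no PII)."""
    return json.dumps({
        "event": "llm_call",
        "model": model,
        "prompt_sha256": hashlib.sha256(prompt.encode()).hexdigest(),
        "total_tokens": total_tokens,
        "latency_ms": latency_ms,
    })

line = llm_log_record("gpt-4o", "What is RAG?", 512, 840)
```

The digest still supports deduplication and cache-hit-rate analysis, since identical prompts hash to the same value.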
## Related Skills

- @designing-data-storage - File formats (Parquet, Lance) and lakehouse formats for storing embeddings
- @building-data-pipelines - Polars, DuckDB for data processing in pipelines
- @orchestrating-data-pipelines - Scheduling and batch processing for embedding generation
- @assuring-data-pipelines - Data validation, quality checks, and general pipeline monitoring with OpenTelemetry