Engineering AI Pipelines
Production-ready patterns for AI/ML data pipelines: generating embeddings, selecting vector databases, building RAG systems, and monitoring LLM usage. Covers local and cloud-based approaches with cost and performance considerations.
Table of Contents
- Quick Comparison
- When to Use Which?
- Pipeline Selection Matrix
- Detailed Reference Guides
- Code Examples
- Best Practices
- Related Skills
Quick Comparison
Embedding Approaches
| Approach | Provider | Dimensions | Cost | Speed | Best For |
|---|---|---|---|---|---|
| OpenAI 3-small | OpenAI | 1536 | $0.00002/1K tokens | ⚡⚡⚡ Fast | Production, convenience |
| OpenAI 3-large | OpenAI | 3072 | $0.00013/1K tokens | ⚡⚡ | Higher quality needs |
| MiniLM-L6 | Local (sentence-transformers) | 384 | Free | ⚡⚡ | Privacy, offline, cost control |
| MPNet-base | Local (sentence-transformers) | 768 | Free | ⚡⚡ | Better quality than MiniLM |
| OpenAI ada-002 | OpenAI | 1536 | $0.00010/1K tokens | ⚡⚡⚡ | Legacy compatibility |
Vector Databases
| Feature | LanceDB | pgvector | DuckDB | Pinecone/Weaviate |
|---|---|---|---|---|
| Deployment | Embedded file | PostgreSQL | Embedded | Managed service |
| Scale | Millions | Tens of millions | <100K | Billions |
| Index Types | IVF_PQ, HNSW | HNSW, IVFFlat | None | HNSW, IVF |
| Cloud Native | ✅ S3/GCS/Azure | Via RDS | ❌ | ✅ |
| ACID | ✅ | ✅ | ✅ | Varies |
| Cost | Free | Postgres cost | Free | Usage-based |
| Best For | RAG apps, prototyping | Production + existing PG | Quick experiments | Enterprise scale |
When to Use Which?
Embedding Generation
Choose OpenAI API when:
- You need production reliability without managing infrastructure
- Cost is acceptable ($0.02-0.13 per 1M tokens)
- You don't have GPU resources for local inference
- Token limits (8K) fit your use case
- You want state-of-the-art quality without tuning
Choose Local Models (sentence-transformers) when:
- Data privacy is critical (healthcare, finance)
- You need offline/air-gapped operation
- You have high volume (millions of documents)
- You want to avoid API rate limits
- You have GPU resources available
Choose OpenAI 3-small vs 3-large:
- Use 3-small for most use cases (cost-effective, good quality)
- Use 3-large when you need maximum retrieval accuracy
Vector Storage
Choose LanceDB when:
- Building RAG applications or prototypes
- Need embedded deployment (no server management)
- Working with multi-modal data (text + images + vectors)
- Want Arrow-native integration with Polars/PyArrow
- Need cloud storage (S3/GCS) without complex setup
Choose pgvector when:
- Already using PostgreSQL for transactional data
- Need ACID guarantees with vector search
- Complex joins between vectors and relational data
- Cannot tolerate separate vector DB infrastructure
Choose DuckDB when:
- Quick experiments or small datasets (<100K vectors)
- Already using DuckDB for analytics
- Don't need approximate nearest neighbor (ANN) indexing
- Simple cosine similarity is sufficient (see the brute-force sketch below)
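For the DuckDB path, a minimal brute-force similarity sketch (no ANN index), assuming a recent DuckDB release that ships list_cosine_similarity; the table layout and query vector are illustrative:
import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE docs (id INTEGER, text VARCHAR, embedding DOUBLE[])")
con.execute("""
    INSERT INTO docs VALUES
        (1, 'Machine learning basics', [0.1, 0.2, 0.3]),
        (2, 'Deep learning intro', [0.2, 0.1, 0.4])
""")

# Brute-force scan: score every row, return the highest cosine similarity first
query_vec = [0.15, 0.2, 0.35]
rows = con.execute("""
    SELECT id, text,
           list_cosine_similarity(embedding, ?::DOUBLE[]) AS score
    FROM docs
    ORDER BY score DESC
    LIMIT 5
""", [query_vec]).fetchall()
print(rows)
Every row is scanned on each query, which is why this approach only makes sense for small tables.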
Choose Managed Services (Pinecone, Weaviate) when:
- Billions of vectors
- Need managed infrastructure
- Enterprise SLA requirements
- Hybrid search (vector + keyword) out of the box
RAG Architecture
Simple RAG (single stage):
Query → Embedding → Vector Search → Context → LLM → Answer
- Best for: Quick prototypes, well-structured documents
- Tools: LanceDB + OpenAI
Advanced RAG (multi-stage):
Query → Embedding → Vector Search → Re-ranking → Context → LLM → Answer
- Best for: Production systems requiring high accuracy
- Add: Cross-encoder re-ranking, metadata filtering
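A minimal re-ranking sketch for that extra stage, assuming the sentence-transformers CrossEncoder class with the public ms-marco-MiniLM-L-6-v2 checkpoint; the candidates format (dicts with a text field) mirrors the retrieval code further down and is illustrative:
from sentence_transformers import CrossEncoder

# A cross-encoder scores (query, passage) pairs jointly: slower than bi-encoder
# retrieval but usually more accurate, so apply it only to the top-k candidates
# returned by the vector search.
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(query: str, candidates: list[dict], top_n: int = 5) -> list[dict]:
    """Re-order vector-search hits by cross-encoder relevance score."""
    pairs = [(query, doc["text"]) for doc in candidates]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, _ in ranked[:top_n]]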
Hybrid RAG:
Query → [Vector Search + Keyword/BM25] → Fusion → Context → LLM → Answer
- Best for: Domain-specific terminology, acronyms
- Tools: LanceDB hybrid search or specialized services
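The fusion step is commonly reciprocal rank fusion (RRF). The sketch below is a plain-Python illustration that merges ranked lists of document ids; how the keyword ranking is produced (LanceDB hybrid search, a BM25 library, etc.) is left open:
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge several ranked lists of document ids into one fused ranking.

    Each document scores sum(1 / (k + rank)) over the lists it appears in;
    k=60 is the value commonly used in the RRF literature.
    """
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# Example: ids from vector search and from BM25 keyword search
fused = reciprocal_rank_fusion([["d3", "d1", "d7"], ["d1", "d9", "d3"]])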
Pipeline Selection Matrix
| Use Case | Embedding | Vector DB | RAG Pattern | Monitoring |
|---|---|---|---|---|
| Internal knowledge base | OpenAI 3-small | LanceDB | Simple RAG | Basic cost tracking |
| Customer-facing chatbot | OpenAI 3-large | pgvector | Advanced (re-ranking) | Full observability |
| Healthcare (HIPAA) | Local MiniLM | LanceDB (on-prem) | Simple RAG | Local logging only |
| Financial documents | OpenAI 3-small | pgvector | Hybrid RAG | Cost + audit logs |
| Research paper search | Local MPNet | LanceDB | Advanced RAG | Basic metrics |
| High-volume (>10M docs) | OpenAI 3-small | Managed (Pinecone) | Simple RAG | Full observability |
| Multi-modal (text + images) | CLIP/Local | LanceDB | Simple RAG | Basic tracking |
Detailed Reference Guides
Core Patterns
- references/embeddings.md - OpenAI API, sentence-transformers, batch processing, model selection
- references/vector-stores.md - LanceDB, pgvector, DuckDB comparison and selection
- references/rag-pipelines.md - Chunking strategies, context assembly, retrieval patterns
- references/monitoring.md - Cost tracking, retry logic, OpenTelemetry, quality evaluation
Code Examples
OpenAI Embeddings
import openai
import polars as pl
import tiktoken
class OpenAIEmbeddingPipeline:
    def __init__(self, model: str = "text-embedding-3-small"):
        self.client = openai.OpenAI()
        self.model = model
        self.encoding = tiktoken.encoding_for_model(model)

    def generate_embeddings(self, texts: list[str]) -> list[list[float]]:
        """Generate embeddings in batches of 100."""
        all_embeddings = []
        batch_size = 100
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            response = self.client.embeddings.create(
                model=self.model,
                input=batch
            )
            all_embeddings.extend([e.embedding for e in response.data])
        return all_embeddings

    def process_dataframe(self, df: pl.DataFrame, text_col: str) -> pl.DataFrame:
        """Add embeddings column to Polars DataFrame."""
        texts = df[text_col].to_list()
        embeddings = self.generate_embeddings(texts)
        return df.with_columns(pl.Series("embedding", embeddings))
# Usage
pipeline = OpenAIEmbeddingPipeline()
df = pl.DataFrame({"text": ["Hello world", "Goodbye world"]})
df_with_embeddings = pipeline.process_dataframe(df, "text")
Local Embeddings
from sentence_transformers import SentenceTransformer
import polars as pl
model = SentenceTransformer('all-MiniLM-L6-v2') # 384 dimensions
texts = ["Hello world", "Goodbye world"]
embeddings = model.encode(texts, show_progress_bar=True)
df = pl.DataFrame({
    "text": texts,
    "embedding": embeddings.tolist()
})
Vector Search with LanceDB
import lancedb
import polars as pl
# Connect and create table
db = lancedb.connect("./.lancedb")
df = pl.DataFrame({
    "id": [1, 2, 3],
    "text": ["Machine learning basics", "Deep learning intro", "Neural networks"],
    "embedding": [[0.1] * 384, [0.2] * 384, [0.3] * 384]
})
table = db.create_table("documents", df.to_arrow())
# Create index for faster search (IVF_PQ training needs a realistically sized table, not just a few rows)
table.create_index(
    vector_column_name="embedding",
    metric="cosine",
    index_type="IVF_PQ",
    num_partitions=256,
    num_sub_vectors=96
)
# Search with filtering
query_embedding = [0.15] * 384
results = (
    table.search(query_embedding)
    .where("id > 1")
    .limit(5)
    .to_pandas()
)
Complete RAG Pipeline
import lancedb
from sentence_transformers import SentenceTransformer
import openai
class RAGPipeline:
    def __init__(self, db_path: str, embedding_model: str = "all-MiniLM-L6-v2"):
        self.embedder = SentenceTransformer(embedding_model)
        self.db = lancedb.connect(db_path)
        self.table = self.db.open_table("documents")
        self.openai_client = openai.OpenAI()

    def retrieve(self, query: str, k: int = 5) -> list[dict]:
        """Retrieve relevant documents."""
        query_embedding = self.embedder.encode([query])[0].tolist()
        results = self.table.search(query_embedding).limit(k).to_pandas()
        return results.to_dict('records')

    def generate(self, query: str, context_docs: list[dict]) -> str:
        """Generate answer with LLM."""
        # Number the chunks so the model can cite them as [1], [2], ...
        context_text = "\n\n---\n\n".join(
            f"[{i}] {doc['text']}" for i, doc in enumerate(context_docs, start=1)
        )
        response = self.openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "Answer based on the provided context. Cite sources using [1], [2], etc."},
                {"role": "user", "content": f"Context:\n{context_text}\n\nQuestion: {query}\n\nAnswer:"}
            ],
            temperature=0.1
        )
        return response.choices[0].message.content

    def run(self, query: str, k: int = 5) -> dict:
        """Full RAG pipeline."""
        docs = self.retrieve(query, k)
        answer = self.generate(query, docs)
        return {
            "answer": answer,
            "sources": [{"id": d['id'], "text": d['text'][:200]} for d in docs],
            "source_count": len(docs)
        }
# Usage
rag = RAGPipeline("./.lancedb")
result = rag.run("What is machine learning?")
print(result["answer"])
LLM Monitoring
import duckdb
from datetime import datetime
import time
class LLMMonitor:
    def __init__(self, db_path: str = "llm_monitoring.db"):
        self.conn = duckdb.connect(db_path)
        self._init_tables()

    def _init_tables(self):
        self.conn.sql("""
            CREATE TABLE IF NOT EXISTS llm_calls (
                call_id VARCHAR PRIMARY KEY,
                timestamp TIMESTAMP,
                model VARCHAR,
                prompt_tokens INTEGER,
                completion_tokens INTEGER,
                total_tokens INTEGER,
                cost_usd DOUBLE,
                latency_ms INTEGER,
                status VARCHAR
            )
        """)

    def log_call(self, model: str, prompt_tokens: int, completion_tokens: int, latency_ms: int):
        # Approximate cost per 1K tokens (update with current pricing)
        cost_per_1k = {
            "gpt-4o": 0.0025,
            "gpt-4o-mini": 0.00015,
            "text-embedding-3-small": 0.00002
        }.get(model, 0.001)
        total_tokens = prompt_tokens + completion_tokens
        cost = total_tokens / 1000 * cost_per_1k
        call_id = f"{datetime.now().timestamp():.6f}"
        self.conn.execute("""
            INSERT INTO llm_calls VALUES (?, NOW(), ?, ?, ?, ?, ?, ?, ?)
        """, [call_id, model, prompt_tokens, completion_tokens, total_tokens, cost, latency_ms, "success"])

    def get_usage_stats(self, days: int = 7) -> dict:
        result = self.conn.sql(f"""
            SELECT
                COUNT(*) as total_calls,
                SUM(total_tokens) as total_tokens,
                SUM(cost_usd) as total_cost,
                AVG(latency_ms) as avg_latency
            FROM llm_calls
            WHERE timestamp > NOW() - INTERVAL '{days} days'
        """).fetchone()
        return {
            'total_calls': result[0],
            'total_tokens': result[1],
            'total_cost_usd': result[2],
            'avg_latency_ms': result[3]
        }
# Usage with retry pattern
import openai
from tenacity import retry, stop_after_attempt, wait_exponential
monitor = LLMMonitor()
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def monitored_llm_call(prompt: str, model: str = "gpt-4o") -> str:
    client = openai.OpenAI()
    start = time.time()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    latency_ms = int((time.time() - start) * 1000)
    usage = response.usage
    monitor.log_call(model, usage.prompt_tokens, usage.completion_tokens, latency_ms)
    return response.choices[0].message.content
Best Practices
Embeddings
- ✅ Batch API calls - Send many texts per request instead of one call per document (the example above uses batches of 100)
- ✅ Chunk long texts - Split on token limits, respect natural boundaries
- ✅ Cache embeddings - Don't regenerate identical text (hash-based cache; see the sketch after this list)
- ✅ Normalize embeddings - Most similarity metrics expect normalized vectors
- ✅ Use appropriate model - Trade off quality vs cost/latency
- ✅ Count tokens before sending - Use tiktoken to avoid truncation
- ❌ Don't embed rich text (HTML, markdown) - Extract plain text first
- ❌ Don't mix embedding models in same index - Different dimensions break search
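A minimal sketch of two of these habits, token-aware truncation with tiktoken and a hash-based embedding cache; the function names, the in-memory dict cache, and the 8,000-token budget are illustrative:
import hashlib
import tiktoken

enc = tiktoken.get_encoding("cl100k_base")   # tokenizer used by the 3-series embedding models
_cache: dict[str, list[float]] = {}          # illustrative in-memory cache; use disk/Redis in practice

def truncate_to_tokens(text: str, max_tokens: int = 8000) -> str:
    """Count tokens before sending; truncate rather than let the API reject the call."""
    tokens = enc.encode(text)
    return enc.decode(tokens[:max_tokens]) if len(tokens) > max_tokens else text

def embed_cached(text: str, embed_fn) -> list[float]:
    """Only pay for embeddings of text we have not seen before (hash-based cache)."""
    key = hashlib.sha256(text.encode("utf-8")).hexdigest()
    if key not in _cache:
        _cache[key] = embed_fn(truncate_to_tokens(text))
    return _cache[key]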
Vector Storage
- ✅ Build index after bulk load - Not incremental for best performance
- ✅ Use metadata filtering - Filter on attributes before vector search
- ✅ Choose right index type - IVF_PQ for disk efficiency, HNSW for recall
- ✅ Normalize for cosine - Ensure embeddings are normalized if using cosine metric (see the sketch after this list)
- ✅ Include source metadata - Need attribution for RAG answers
- ❌ Don't store raw vectors without context - metadata essential
- ❌ Don't rebuild indexes frequently - expensive operation
- ❌ Don't use DuckDB for >100K vectors - no ANN indexing
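A minimal normalization sketch with NumPy, assuming embeddings arrive as a 2-D float array; the helper name is illustrative:
import numpy as np

def l2_normalize(embeddings: np.ndarray) -> np.ndarray:
    """Scale each row to unit length so cosine similarity reduces to a dot product."""
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    return embeddings / np.clip(norms, 1e-12, None)  # guard against zero vectors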
RAG Pipelines
- ✅ Chunk intelligently - Don't split mid-sentence; respect semantic boundaries (see the sketch after this list)
- ✅ Include metadata - Filter on date, source, author before vector search
- ✅ Add source citations - Include [1], [2] references in answers
- ✅ Track token budget - Don't exceed LLM context window
- ✅ Handle "I don't know" - Train model to admit when context insufficient
- ❌ Don't retrieve too many chunks - quality degrades with noise
- ❌ Don't skip re-ranking for production - cross-encoder improves accuracy
- ❌ Don't put PII in vector DB without encryption
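A minimal sentence-boundary chunking sketch; the regex splitter and the 500-token budget are illustrative stand-ins for a proper sentence tokenizer and your model's actual limits:
import re
import tiktoken

enc = tiktoken.get_encoding("cl100k_base")

def chunk_by_sentences(text: str, max_tokens: int = 500) -> list[str]:
    """Greedily pack whole sentences into chunks that stay under a token budget."""
    sentences = re.split(r"(?<=[.!?])\s+", text)
    chunks, current = [], ""
    for sentence in sentences:
        candidate = f"{current} {sentence}".strip()
        if current and len(enc.encode(candidate)) > max_tokens:
            chunks.append(current)   # close the current chunk at a sentence boundary
            current = sentence
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks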
Monitoring
- ✅ Log every call - Success and failures; model, tokens, latency
- ✅ Hash prompts - Enable deduplication and caching (see the sketch after this list)
- ✅ Track costs by model - Different pricing tiers add up
- ✅ Set up alerts - Error rate > 5%, latency > 30s, cost spike
- ✅ Implement retry logic - Exponential backoff for rate limits
- ✅ Use structured logging - JSON format for easy parsing
- ❌ Don't log full prompts by default - PII and cost concerns
- ❌ Don't ignore cache hit rate - Measure and optimize
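A minimal sketch combining prompt hashing with structured JSON logging; the logger name and field names are illustrative:
import hashlib
import json
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("llm")

def log_llm_call(prompt: str, model: str, total_tokens: int, latency_ms: int) -> None:
    """Log a hash of the prompt (not the prompt itself) plus usage, as one JSON line."""
    logger.info(json.dumps({
        "prompt_sha256": hashlib.sha256(prompt.encode("utf-8")).hexdigest(),
        "model": model,
        "total_tokens": total_tokens,
        "latency_ms": latency_ms,
    }))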
Related Skills
- @designing-data-storage - File formats (Parquet, Lance) and lakehouse formats for storing embeddings
- @building-data-pipelines - Polars, DuckDB for data processing in pipelines
- @orchestrating-data-pipelines - Scheduling and batch processing for embedding generation
- @assuring-data-pipelines - Data validation, quality checks, and general pipeline monitoring (OpenTelemetry) for ML pipelines