production-rag-checklist
SKILL.md
Production RAG Checklist
Everything you need to deploy RAG systems with confidence.
Pre-Production Checklist
Data Pipeline
-
Document ingestion automated
- Scheduled updates for dynamic sources
- Change detection for modified documents
- Deletion handling for removed documents
-
Chunking strategy validated
- Chunk sizes tested with retrieval quality
- Overlap tuned for context preservation
- Document-specific splitters for code/tables
-
Metadata enriched
- Source tracking (URL, file path, version)
- Timestamps (created, updated, indexed)
- Document type classification
- Access control tags (if needed)
-
Embedding pipeline robust
- Batch processing for efficiency
- Rate limiting for API-based embeddings
- Fallback for embedding failures
- Version tracking for re-embedding
Vector Store
-
Index configured properly
- Appropriate index type (HNSW, IVF, etc.)
- Parameters tuned (ef_construction, m, nlist)
- Distance metric matches embedding model
-
Scaling planned
- Estimated vector count and growth rate
- Sharding strategy if needed
- Backup and recovery procedures
-
High availability
- Replicas configured
- Failover tested
- Connection pooling enabled
Retrieval Quality
-
Evaluation dataset created
- Minimum 100 query-answer pairs
- Edge cases covered
- Regular updates with new patterns
-
Baseline metrics established
- Recall@5 > 0.8
- MRR > 0.7
- Latency p99 < 500ms
-
Hybrid search configured (if applicable)
- BM25/keyword weight tuned
- Reranker added and tested
Generation Quality
-
Prompt engineering complete
- System prompt tested across scenarios
- Few-shot examples if needed
- Output format specified
-
Guardrails in place
- Hallucination detection
- Toxicity filtering
- PII redaction (if needed)
-
Fallback responses defined
- "I don't know" for low confidence
- Error messages user-friendly
Infrastructure
API Layer
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
app = FastAPI()
class QueryRequest(BaseModel):
query: str
top_k: int = 5
filters: dict = None
class QueryResponse(BaseModel):
answer: str
sources: list
confidence: float
latency_ms: float
@app.post("/query", response_model=QueryResponse)
async def query_rag(request: QueryRequest):
start = time.time()
try:
# Timeout for long queries
result = await asyncio.wait_for(
rag_pipeline.ainvoke(request.query, request.top_k, request.filters),
timeout=30.0
)
return QueryResponse(
answer=result["answer"],
sources=[doc.metadata["source"] for doc in result["documents"]],
confidence=result.get("confidence", 0.0),
latency_ms=(time.time() - start) * 1000
)
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="Query timeout")
except Exception as e:
logger.error(f"RAG error: {e}")
raise HTTPException(status_code=500, detail="Internal error")
Caching Layer
import hashlib
from redis import Redis
class RAGCache:
def __init__(self, redis_url: str, ttl_seconds: int = 3600):
self.redis = Redis.from_url(redis_url)
self.ttl = ttl_seconds
def _hash_query(self, query: str, filters: dict) -> str:
key = f"{query}:{json.dumps(filters, sort_keys=True)}"
return hashlib.sha256(key.encode()).hexdigest()
def get(self, query: str, filters: dict = None) -> dict | None:
key = self._hash_query(query, filters)
cached = self.redis.get(key)
return json.loads(cached) if cached else None
def set(self, query: str, filters: dict, result: dict):
key = self._hash_query(query, filters)
self.redis.setex(key, self.ttl, json.dumps(result))
def invalidate_by_source(self, source: str):
"""Invalidate cache when source document changes."""
# Store source->keys mapping for targeted invalidation
pattern = f"source:{source}:*"
for key in self.redis.scan_iter(pattern):
self.redis.delete(key)
Rate Limiting
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/query")
@limiter.limit("100/minute") # Per IP
async def query_rag(request: QueryRequest):
...
Monitoring
Metrics to Track
from prometheus_client import Counter, Histogram, Gauge
# Request metrics
rag_requests = Counter('rag_requests_total', 'Total RAG requests', ['status'])
rag_latency = Histogram('rag_latency_seconds', 'RAG latency', buckets=[0.1, 0.5, 1, 2, 5, 10])
# Quality metrics
retrieval_count = Histogram('rag_retrieval_count', 'Documents retrieved', buckets=[0, 1, 3, 5, 10])
confidence_score = Histogram('rag_confidence', 'Answer confidence', buckets=[0.1, 0.3, 0.5, 0.7, 0.9])
# System metrics
vector_store_latency = Histogram('vectorstore_latency_seconds', 'Vector store query time')
llm_latency = Histogram('llm_latency_seconds', 'LLM generation time')
cache_hits = Counter('rag_cache_hits_total', 'Cache hit count')
def track_request(func):
async def wrapper(*args, **kwargs):
with rag_latency.time():
try:
result = await func(*args, **kwargs)
rag_requests.labels(status='success').inc()
confidence_score.observe(result.get('confidence', 0))
return result
except Exception as e:
rag_requests.labels(status='error').inc()
raise
return wrapper
Alerting Rules
# Prometheus alerting rules
groups:
- name: rag_alerts
rules:
- alert: RAGHighLatency
expr: histogram_quantile(0.99, rag_latency_seconds) > 5
for: 5m
labels:
severity: warning
annotations:
summary: "RAG p99 latency above 5s"
- alert: RAGHighErrorRate
expr: rate(rag_requests_total{status="error"}[5m]) / rate(rag_requests_total[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "RAG error rate above 5%"
- alert: RAGLowConfidence
expr: histogram_quantile(0.5, rag_confidence) < 0.5
for: 15m
labels:
severity: warning
annotations:
summary: "RAG median confidence below 0.5"
Logging
import structlog
logger = structlog.get_logger()
async def log_rag_request(query: str, result: dict, latency_ms: float):
logger.info(
"rag_request",
query=query[:100], # Truncate for privacy
query_hash=hashlib.sha256(query.encode()).hexdigest()[:8],
num_sources=len(result.get("sources", [])),
confidence=result.get("confidence"),
latency_ms=latency_ms,
cache_hit=result.get("cache_hit", False),
model=result.get("model_used")
)
Security
-
Input validation
- Query length limits
- Injection prevention
- Rate limiting per user
-
Access control
- Document-level permissions
- User authentication
- API key management
-
Data privacy
- PII handling defined
- Data retention policy
- Audit logging enabled
Cost Management
def estimate_monthly_cost(
queries_per_day: int,
avg_tokens_per_query: int = 2000,
embedding_calls_per_day: int = 1000
) -> dict:
"""Estimate monthly RAG costs."""
# LLM costs (GPT-4)
llm_input_cost = 0.03 / 1000 # per token
llm_output_cost = 0.06 / 1000
# Embedding costs (text-embedding-3-small)
embedding_cost = 0.00002 / 1000 # per token
# Vector DB (Pinecone Starter)
vector_db_monthly = 70 # USD
monthly_queries = queries_per_day * 30
monthly_embeddings = embedding_calls_per_day * 30
return {
"llm_cost": monthly_queries * avg_tokens_per_query * (llm_input_cost + llm_output_cost * 0.3),
"embedding_cost": monthly_embeddings * 500 * embedding_cost,
"vector_db_cost": vector_db_monthly,
"estimated_total": "Calculate based on above"
}
Go-Live Checklist
- Load testing passed (target QPS achieved)
- Failover tested
- Rollback procedure documented
- On-call rotation set up
- Runbook created
- User documentation ready
- Feedback collection mechanism in place
Best Practices
- Start with caching - reduces cost and latency significantly
- Monitor quality metrics - not just uptime
- Version everything - embeddings, prompts, indexes
- Plan for reindexing - you will need to rebuild
- Set cost alerts - LLM costs can spike unexpectedly
- Collect user feedback - thumbs up/down on answers
Weekly Installs
16
Repository
latestaiagents/…t-skillsGitHub Stars
2
First Seen
Feb 5, 2026
Security Audits
Installed on
cursor16
gemini-cli16
amp16
cline16
github-copilot16
codex16