RAG Evaluation
Measure, monitor, and improve RAG system performance with comprehensive metrics.
When to Use
- Setting up RAG evaluation pipelines
- Comparing retrieval strategies
- Measuring generation quality
- Building regression tests for RAG
- Debugging poor RAG performance
Evaluation Framework
┌────────────────────────────────────────────────────────┐
│                     RAG Evaluation                     │
├────────────────────────────────────────────────────────┤
│                                                        │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐   │
│  │  Retrieval  │   │ Generation  │   │ End-to-End  │   │
│  │   Metrics   │   │   Metrics   │   │   Metrics   │   │
│  ├─────────────┤   ├─────────────┤   ├─────────────┤   │
│  │ • MRR       │   │ • Faithful- │   │ • Answer    │   │
│  │ • Recall@k  │   │   ness      │   │   Correct-  │   │
│  │ • Precision │   │ • Relevance │   │   ness      │   │
│  │ • NDCG      │   │ • Coherence │   │ • Latency   │   │
│  │ • Hit Rate  │   │ • Toxicity  │   │ • Cost      │   │
│  └─────────────┘   └─────────────┘   └─────────────┘   │
│                                                        │
└────────────────────────────────────────────────────────┘
Retrieval Metrics
Implementation
import numpy as np
from typing import List, Dict


def mean_reciprocal_rank(results: List[List[str]], relevant: List[List[str]]) -> float:
    """
    Calculate MRR across queries.
    results: List of ranked document IDs per query
    relevant: List of relevant document IDs per query
    """
    mrr_sum = 0.0
    for res, rel in zip(results, relevant):
        rel_set = set(rel)
        for rank, doc_id in enumerate(res, 1):
            if doc_id in rel_set:
                mrr_sum += 1.0 / rank
                break
    return mrr_sum / len(results)


def recall_at_k(results: List[List[str]], relevant: List[List[str]], k: int) -> float:
    """Calculate Recall@k."""
    recall_sum = 0.0
    for res, rel in zip(results, relevant):
        retrieved_k = set(res[:k])
        relevant_set = set(rel)
        if relevant_set:
            recall_sum += len(retrieved_k & relevant_set) / len(relevant_set)
    return recall_sum / len(results)


def precision_at_k(results: List[List[str]], relevant: List[List[str]], k: int) -> float:
    """Calculate Precision@k."""
    precision_sum = 0.0
    for res, rel in zip(results, relevant):
        retrieved_k = set(res[:k])
        relevant_set = set(rel)
        precision_sum += len(retrieved_k & relevant_set) / k
    return precision_sum / len(results)


def ndcg_at_k(results: List[List[str]], relevant: List[List[str]], k: int) -> float:
    """Calculate NDCG@k."""
    def dcg(scores):
        return sum(s / np.log2(i + 2) for i, s in enumerate(scores))

    ndcg_sum = 0.0
    for res, rel in zip(results, relevant):
        rel_set = set(rel)
        gains = [1 if doc in rel_set else 0 for doc in res[:k]]
        ideal_gains = sorted(gains, reverse=True)
        dcg_val = dcg(gains)
        idcg_val = dcg(ideal_gains)
        ndcg_sum += dcg_val / idcg_val if idcg_val > 0 else 0
    return ndcg_sum / len(results)


def hit_rate(results: List[List[str]], relevant: List[List[str]], k: int) -> float:
    """Calculate Hit Rate (any relevant doc in top-k)."""
    hits = 0
    for res, rel in zip(results, relevant):
        if set(res[:k]) & set(rel):
            hits += 1
    return hits / len(results)
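A quick sanity check on toy data confirms the metrics behave as expected; the document IDs below are made up for illustration.

# Two queries: ranked retrieval output vs. labeled relevant docs (hypothetical IDs)
results = [["d1", "d7", "d3"], ["d9", "d2", "d5"]]
relevant = [["d3"], ["d2", "d8"]]

print(mean_reciprocal_rank(results, relevant))  # (1/3 + 1/2) / 2 ≈ 0.417
print(recall_at_k(results, relevant, k=2))      # (0 + 0.5) / 2 = 0.25
print(precision_at_k(results, relevant, k=2))   # (0 + 0.5) / 2 = 0.25
print(hit_rate(results, relevant, k=2))         # 1 hit in 2 queries = 0.5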
Evaluation Runner
class RetrievalEvaluator:
    def __init__(self, retriever, test_dataset: List[Dict]):
        """
        test_dataset: [{"query": str, "relevant_docs": [str]}]
        """
        self.retriever = retriever
        self.dataset = test_dataset

    def evaluate(self, k_values: List[int] = [1, 3, 5, 10]) -> Dict:
        # Run retrieval for all queries
        results = []
        relevant = []
        for item in self.dataset:
            docs = self.retriever.invoke(item["query"])
            doc_ids = [d.metadata["id"] for d in docs]
            results.append(doc_ids)
            relevant.append(item["relevant_docs"])

        # Calculate metrics
        metrics = {"mrr": mean_reciprocal_rank(results, relevant)}
        for k in k_values:
            metrics[f"recall@{k}"] = recall_at_k(results, relevant, k)
            metrics[f"precision@{k}"] = precision_at_k(results, relevant, k)
            metrics[f"ndcg@{k}"] = ndcg_at_k(results, relevant, k)
            metrics[f"hit_rate@{k}"] = hit_rate(results, relevant, k)
        return metrics
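A minimal usage sketch, assuming a LangChain-style retriever whose returned Documents carry an "id" field in their metadata; the vectorstore, queries, and document IDs below are placeholders.

test_dataset = [
    {"query": "How do I reset my password?", "relevant_docs": ["doc_12", "doc_47"]},
    {"query": "What is the refund policy?", "relevant_docs": ["doc_03"]},
]

retriever = vectorstore.as_retriever(search_kwargs={"k": 10})  # any existing vectorstore
evaluator = RetrievalEvaluator(retriever, test_dataset)
print(evaluator.evaluate(k_values=[1, 3, 5, 10]))
# -> {"mrr": ..., "recall@1": ..., "precision@1": ..., "ndcg@1": ..., "hit_rate@1": ..., ...}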
Generation Metrics with RAGAS
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
    answer_correctness,
)
from datasets import Dataset


def evaluate_with_ragas(test_data: List[Dict]) -> Dict:
    """
    test_data: [{
        "question": str,
        "answer": str,        # Generated answer
        "contexts": [str],    # Retrieved contexts
        "ground_truth": str   # Expected answer (optional)
    }]
    """
    dataset = Dataset.from_list(test_data)

    metrics = [
        faithfulness,        # Is answer grounded in context?
        answer_relevancy,    # Is answer relevant to question?
        context_precision,   # Are retrieved contexts precise?
        context_recall,      # Do contexts cover ground truth?
    ]
    # Add answer_correctness if ground truth available
    if "ground_truth" in test_data[0]:
        metrics.append(answer_correctness)

    result = evaluate(dataset, metrics=metrics)
    # Average only the numeric metric columns; the dataframe also carries the raw text fields
    return result.to_pandas().mean(numeric_only=True).to_dict()
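A sketch of how the helper might be called; the question, answer, and contexts are invented, and RAGAS needs an LLM and embedding backend configured (by default via the OPENAI_API_KEY environment variable).

test_data = [
    {
        "question": "When was the warranty extended?",
        "answer": "The warranty was extended to 3 years in 2023.",
        "contexts": ["In 2023 the standard warranty was extended from 1 to 3 years."],
        "ground_truth": "It was extended to 3 years in 2023.",
    }
]

scores = evaluate_with_ragas(test_data)
print(scores)  # e.g. {"faithfulness": ..., "answer_relevancy": ..., ...} (values illustrative)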
Custom LLM-as-Judge Evaluation
import json

from langchain_openai import ChatOpenAI

FAITHFULNESS_PROMPT = """Rate if the answer is faithful to the context (uses only information from the context).
Context: {context}
Answer: {answer}
Score 1-5:
1: Completely unfaithful, makes up information
2: Mostly unfaithful, significant hallucinations
3: Partially faithful, some unsupported claims
4: Mostly faithful, minor extrapolations
5: Completely faithful, all claims supported
Return JSON: {{"score": N, "reasoning": "..."}}"""

RELEVANCE_PROMPT = """Rate if the answer is relevant to the question.
Question: {question}
Answer: {answer}
Score 1-5:
1: Completely irrelevant
2: Tangentially related
3: Partially answers the question
4: Mostly answers the question
5: Fully and directly answers the question
Return JSON: {{"score": N, "reasoning": "..."}}"""


class LLMJudge:
    def __init__(self, model: str = "gpt-4"):
        self.llm = ChatOpenAI(model=model, temperature=0)

    def judge_faithfulness(self, context: str, answer: str) -> Dict:
        prompt = FAITHFULNESS_PROMPT.format(context=context, answer=answer)
        result = self.llm.invoke(prompt).content
        return json.loads(result)

    def judge_relevance(self, question: str, answer: str) -> Dict:
        prompt = RELEVANCE_PROMPT.format(question=question, answer=answer)
        result = self.llm.invoke(prompt).content
        return json.loads(result)

    def evaluate_batch(self, test_data: List[Dict]) -> Dict:
        faithfulness_scores = []
        relevance_scores = []

        for item in test_data:
            context = "\n".join(item["contexts"])

            faith = self.judge_faithfulness(context, item["answer"])
            faithfulness_scores.append(faith["score"])

            rel = self.judge_relevance(item["question"], item["answer"])
            relevance_scores.append(rel["score"])

        return {
            "avg_faithfulness": np.mean(faithfulness_scores),
            "avg_relevance": np.mean(relevance_scores),
            "faithfulness_scores": faithfulness_scores,
            "relevance_scores": relevance_scores,
        }
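Judging a single RAG output might look like the sketch below; the context, answer, and returned score are illustrative, and production use should add retries for malformed JSON from the judge model.

judge = LLMJudge(model="gpt-4")
verdict = judge.judge_faithfulness(
    context="Our support hours are 9am-5pm, Monday to Friday.",
    answer="Support is available 9am-5pm on weekdays.",
)
print(verdict)  # e.g. {"score": 5, "reasoning": "All claims are supported by the context."}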
End-to-End RAG Evaluation
import time
from dataclasses import dataclass


@dataclass
class RAGEvalResult:
    query: str
    retrieved_docs: List
    generated_answer: str
    latency_ms: float
    retrieval_metrics: Dict
    generation_metrics: Dict
    cost_usd: float


class RAGEvaluator:
    def __init__(self, rag_pipeline, judge: LLMJudge):
        self.pipeline = rag_pipeline
        self.judge = judge

    def evaluate_single(self, query: str, ground_truth: Dict = None) -> RAGEvalResult:
        # Measure latency
        start = time.time()
        result = self.pipeline.invoke(query)
        latency_ms = (time.time() - start) * 1000

        # Retrieval metrics (if ground truth available)
        retrieval_metrics = {}
        if ground_truth and "relevant_docs" in ground_truth:
            retrieved_ids = [d.metadata["id"] for d in result["documents"]]
            relevant_ids = ground_truth["relevant_docs"]
            retrieval_metrics = {
                "recall@5": len(set(retrieved_ids[:5]) & set(relevant_ids)) / len(relevant_ids),
                "precision@5": len(set(retrieved_ids[:5]) & set(relevant_ids)) / 5,
            }

        # Generation metrics
        context = "\n".join([d.page_content for d in result["documents"]])
        generation_metrics = {
            "faithfulness": self.judge.judge_faithfulness(context, result["answer"]),
            "relevance": self.judge.judge_relevance(query, result["answer"]),
        }

        return RAGEvalResult(
            query=query,
            retrieved_docs=result["documents"],
            generated_answer=result["answer"],
            latency_ms=latency_ms,
            retrieval_metrics=retrieval_metrics,
            generation_metrics=generation_metrics,
            cost_usd=estimate_cost(result),  # see the estimate_cost sketch below
        )

    def evaluate_dataset(self, test_dataset: List[Dict]) -> Dict:
        results = []
        for item in test_dataset:
            result = self.evaluate_single(
                item["query"],
                item.get("ground_truth"),
            )
            results.append(result)

        # Aggregate metrics
        return {
            "avg_latency_ms": np.mean([r.latency_ms for r in results]),
            "avg_faithfulness": np.mean([r.generation_metrics["faithfulness"]["score"] for r in results]),
            "avg_relevance": np.mean([r.generation_metrics["relevance"]["score"] for r in results]),
            "total_cost_usd": sum([r.cost_usd for r in results]),
            "detailed_results": results,
        }
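The evaluator above calls estimate_cost, which is not defined in this skill. A minimal sketch, assuming the pipeline result exposes token usage and using illustrative per-1K-token prices (check your provider's current pricing):

def estimate_cost(result: Dict,
                  input_price_per_1k: float = 0.01,
                  output_price_per_1k: float = 0.03) -> float:
    """Rough cost estimate from token usage, if the pipeline reports it."""
    usage = result.get("usage", {})
    input_tokens = usage.get("prompt_tokens", 0)
    output_tokens = usage.get("completion_tokens", 0)
    return (input_tokens / 1000) * input_price_per_1k + (output_tokens / 1000) * output_price_per_1k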
Continuous Evaluation Pipeline
def setup_eval_pipeline(rag_pipeline, test_dataset_path: str):
    """Set up automated evaluation."""
    # Load test dataset
    with open(test_dataset_path) as f:
        test_data = json.load(f)

    evaluator = RAGEvaluator(rag_pipeline, LLMJudge())

    def run_evaluation() -> Dict:
        results = evaluator.evaluate_dataset(test_data)

        # Log to monitoring (log_metrics/alert are hooks into your own stack; see the sketch below)
        log_metrics({
            "rag_latency_p50": np.percentile([r.latency_ms for r in results["detailed_results"]], 50),
            "rag_latency_p99": np.percentile([r.latency_ms for r in results["detailed_results"]], 99),
            "rag_faithfulness": results["avg_faithfulness"],
            "rag_relevance": results["avg_relevance"],
        })

        # Alert on regression
        if results["avg_faithfulness"] < 3.5:
            alert("RAG faithfulness dropped below threshold!")

        return results

    return run_evaluation
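log_metrics and alert are hooks into whatever monitoring and alerting stack you already run; stand-in versions for local experiments might look like this.

def log_metrics(metrics: Dict) -> None:
    # Replace with your metrics backend (Prometheus, Datadog, CloudWatch, ...)
    for name, value in metrics.items():
        print(f"[metric] {name}={value:.3f}")


def alert(message: str) -> None:
    # Replace with a Slack webhook, PagerDuty, email, etc.
    print(f"[ALERT] {message}")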
Best Practices
- Build golden dataset - manually curated query-answer pairs (see the example after this list)
- Test edge cases - ambiguous queries, no-answer scenarios
- Track over time - metrics should improve, not regress
- Separate retrieval vs generation - diagnose issues precisely
- Use multiple judges - LLM judges can disagree
- Monitor production - sample and evaluate live traffic
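One possible shape for a golden dataset file that works with RAGEvaluator.evaluate_dataset above; the queries, document IDs, and answers are invented.

import json

golden = [
    {
        "query": "How do I reset my password?",
        "ground_truth": {
            "relevant_docs": ["doc_12", "doc_47"],
            "answer": "Use the 'Forgot password' link on the login page.",
        },
    },
    # Add edge cases too: ambiguous queries, questions the corpus cannot answer, etc.
]

with open("golden_dataset.json", "w") as f:
    json.dump(golden, f, indent=2)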