embedding-strategies
SKILL.md
Embedding Strategies
Guide to selecting and optimizing embedding models for vector search applications.
When to Use This Skill
- Choosing embedding models for RAG
- Optimizing chunking strategies
- Fine-tuning embeddings for domains
- Comparing embedding model performance
- Reducing embedding dimensions
- Handling multilingual content
Core Concepts
1. Embedding Model Comparison (2026)
| Model | Dimensions | Max Tokens | Best For |
|---|---|---|---|
| voyage-3-large | 1024 | 32000 | Claude apps (Anthropic recommended) |
| voyage-3 | 1024 | 32000 | Claude apps, cost-effective |
| voyage-code-3 | 1024 | 32000 | Code search |
| voyage-finance-2 | 1024 | 32000 | Financial documents |
| voyage-law-2 | 1024 | 16000 | Legal documents |
| text-embedding-3-large | 3072 | 8191 | OpenAI apps, high accuracy |
| text-embedding-3-small | 1536 | 8191 | OpenAI apps, cost-effective |
| bge-large-en-v1.5 | 1024 | 512 | Open source, local deployment |
| all-MiniLM-L6-v2 | 384 | 256 | Fast, lightweight |
| multilingual-e5-large | 1024 | 512 | Multi-language |
2. Embedding Pipeline
Document → Chunking → Preprocessing → Embedding Model → Vector
               ↓             ↓                ↓
       [Overlap, Size] [Clean, Normalize] [API/Local]
Templates
Template 1: Voyage AI Embeddings (Recommended for Claude)
from langchain_voyageai import VoyageAIEmbeddings
from typing import List
import os
# Shared Voyage AI embedding client (the provider Anthropic recommends for Claude).
# The API key is looked up in the VOYAGE_API_KEY environment variable.
embeddings = VoyageAIEmbeddings(
    voyage_api_key=os.getenv("VOYAGE_API_KEY"),
    model="voyage-3-large",
)
def get_embeddings(texts: List[str]) -> List[List[float]]:
    """Embed a batch of texts with the module-level Voyage AI client.

    Args:
        texts: Strings to embed.

    Returns:
        Whatever ``embeddings.embed_documents`` returns for ``texts``.
    """
    vectors = embeddings.embed_documents(texts)
    return vectors
def get_query_embedding(query: str) -> List[float]:
    """Embed a single search query with the module-level Voyage AI client.

    Args:
        query: The query string.

    Returns:
        Whatever ``embeddings.embed_query`` returns for ``query``.
    """
    query_vector = embeddings.embed_query(query)
    return query_vector
# One dedicated client per specialised Voyage model (code, finance, legal).
code_embeddings = VoyageAIEmbeddings(
    model="voyage-code-3",
)
finance_embeddings = VoyageAIEmbeddings(
    model="voyage-finance-2",
)
legal_embeddings = VoyageAIEmbeddings(
    model="voyage-law-2",
)
Template 2: OpenAI Embeddings
from openai import OpenAI
from typing import List
import numpy as np
# Module-level OpenAI client, constructed with no explicit arguments;
# get_embeddings below sends its requests through it.
# NOTE(review): presumably reads credentials from the environment — confirm.
client = OpenAI()
def get_embeddings(
    texts: List[str],
    model: str = "text-embedding-3-small",
    dimensions: int = None,
    batch_size: int = 100,
) -> List[List[float]]:
    """Get embeddings from OpenAI with optional dimension reduction.

    Args:
        texts: Strings to embed. An empty list returns ``[]`` without
            touching the API client.
        model: Embedding model name passed to the API.
        dimensions: Optional reduced output size (Matryoshka). ``None`` or
            ``0`` leaves the request without a ``dimensions`` field.
        batch_size: Maximum number of texts sent per request.

    Returns:
        One embedding per input text, in input order.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    # These options are identical for every batch, so build them once.
    request_options = {"model": model}
    if dimensions:
        # Matryoshka dimensionality reduction
        request_options["dimensions"] = dimensions

    all_embeddings: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        response = client.embeddings.create(
            input=texts[start:start + batch_size],
            **request_options,
        )
        all_embeddings.extend(item.embedding for item in response.data)
    return all_embeddings
def get_embedding(text: str, **kwargs) -> List[float]:
    """Embed one text by sending it through ``get_embeddings`` as a one-item batch.

    Any extra keyword arguments are forwarded unchanged to ``get_embeddings``.
    """
    batch_result = get_embeddings([text], **kwargs)
    return batch_result[0]
# Matryoshka-style dimension reduction
def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]:
    """Embed ``text`` with ``text-embedding-3-small`` at a reduced output size.

    Args:
        text: The string to embed.
        dimensions: Requested embedding size, forwarded to ``get_embedding``.
    """
    options = {"model": "text-embedding-3-small", "dimensions": dimensions}
    return get_embedding(text, **options)
Template 3: Local Embeddings with Sentence Transformers
from sentence_transformers import SentenceTransformer
from typing import List, Optional
import numpy as np
class LocalEmbedder:
    """Embeds text with a locally loaded sentence-transformers model."""

    def __init__(
        self,
        model_name: str = "BAAI/bge-large-en-v1.5",
        device: str = "cuda"
    ):
        # The name is kept because embed_query inspects it to pick a prefix.
        self.model_name = model_name
        self.model = SentenceTransformer(model_name, device=device)

    def embed(
        self,
        texts: List[str],
        normalize: bool = True,
        show_progress: bool = False
    ) -> np.ndarray:
        """Encode ``texts`` and return the model's output.

        Args:
            texts: Strings to encode.
            normalize: Forwarded as ``normalize_embeddings``.
            show_progress: Forwarded as ``show_progress_bar``.
        """
        return self.model.encode(
            texts,
            convert_to_numpy=True,
            show_progress_bar=show_progress,
            normalize_embeddings=normalize,
        )

    def embed_query(self, query: str) -> np.ndarray:
        """Embed a single query, adding a retrieval prefix for BGE models."""
        # Model names containing "bge" get the retrieval instruction prepended.
        is_bge = "bge" in self.model_name.lower()
        prompt = (
            f"Represent this sentence for searching relevant passages: {query}"
            if is_bge
            else query
        )
        vectors = self.embed([prompt])
        return vectors[0]

    def embed_documents(self, documents: List[str]) -> np.ndarray:
        """Embed documents for indexing, using ``embed`` with its defaults."""
        return self.embed(documents)
# E5 model with instructions
class E5Embedder:
    """Embedder for E5 models, which expect a role prefix on every input.

    Queries are prefixed with ``"query: "`` and documents with
    ``"passage: "`` before being passed to ``SentenceTransformer.encode``.

    NOTE(review): vectors are returned as the model produces them; unlike
    ``LocalEmbedder`` no ``normalize_embeddings`` flag is passed — confirm
    whether callers normalise before cosine search.
    """

    def __init__(self, model_name: str = "intfloat/multilingual-e5-large"):
        self.model = SentenceTransformer(model_name)

    def embed_query(self, query: str) -> np.ndarray:
        """E5 requires 'query:' prefix for queries."""
        return self.model.encode(f"query: {query}")

    def embed_document(self, document: str) -> np.ndarray:
        """E5 requires 'passage:' prefix for documents."""
        return self.model.encode(f"passage: {document}")

    def embed_documents(self, documents: List[str]) -> np.ndarray:
        """Embed many documents in one ``encode`` call.

        Batch counterpart of ``embed_document`` (same ``"passage: "``
        prefix), mirroring ``LocalEmbedder.embed_documents`` so both
        embedders can be used interchangeably for indexing.
        """
        return self.model.encode([f"passage: {document}" for document in documents])
Template 4: Chunking Strategies
from typing import List, Tuple
import re
def chunk_by_tokens(
    text: str,
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    tokenizer=None
) -> List[str]:
    """Chunk text into windows of at most ``chunk_size`` tokens.

    Consecutive windows share ``chunk_overlap`` tokens. The last window
    always ends at the final token; no extra chunk consisting only of
    already-covered overlap tokens is produced.

    Args:
        text: Text to split.
        chunk_size: Maximum tokens per chunk; must be positive.
        chunk_overlap: Tokens shared by neighbouring chunks; must satisfy
            ``0 <= chunk_overlap < chunk_size``.
        tokenizer: Object with ``encode(str)`` and ``decode(tokens)``.
            When ``None``, tiktoken's ``cl100k_base`` encoding is used.

    Returns:
        Decoded chunk strings in document order (``[]`` for empty text).

    Raises:
        ValueError: If the size/overlap combination is invalid (the window
            would never advance).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size")

    if tokenizer is None:
        # Imported lazily so callers supplying a tokenizer don't need tiktoken.
        import tiktoken
        tokenizer = tiktoken.get_encoding("cl100k_base")

    tokens = tokenizer.encode(text)
    stride = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), stride):
        end = start + chunk_size
        chunks.append(tokenizer.decode(tokens[start:end]))
        if end >= len(tokens):
            # This window reached the end; a further one would only repeat overlap.
            break
    return chunks
def chunk_by_sentences(
    text: str,
    max_chunk_size: int = 1000,
    min_chunk_size: int = 100
) -> List[str]:
    """Chunk text by sentences, respecting size limits.

    Sentences are packed greedily into chunks joined by single spaces.
    Sizes are measured in characters of the joined chunk.

    Args:
        text: Text to split.
        max_chunk_size: Target upper bound for a chunk. A chunk is closed
            when the next sentence would push it past this bound.
        min_chunk_size: Lower bound that takes priority over the upper
            bound: a chunk shorter than this is not closed early, and a
            short trailing chunk is merged into the previous one. Either
            case may exceed ``max_chunk_size``.

    Returns:
        Chunk strings in document order (``[]`` for empty text).
    """
    try:
        import nltk
        sentences = nltk.sent_tokenize(text)
    except (ImportError, LookupError):
        # nltk (or its punkt data) is unavailable: fall back to a simple
        # split on whitespace that follows sentence-ending punctuation.
        import re
        sentences = [s for s in re.split(r'(?<=[.!?])\s+', text.strip()) if s]

    chunks = []
    current_chunk = []
    current_size = 0
    for sentence in sentences:
        # Count the joining space so current_size equals the joined length.
        added = len(sentence) + (1 if current_chunk else 0)
        if (
            current_chunk
            and current_size + added > max_chunk_size
            and current_size >= min_chunk_size
        ):
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_size = 0
            added = len(sentence)
        current_chunk.append(sentence)
        current_size += added

    if current_chunk:
        tail = " ".join(current_chunk)
        if chunks and len(tail) < min_chunk_size:
            # Avoid a tiny final chunk by folding it into its predecessor.
            chunks[-1] = f"{chunks[-1]} {tail}"
        else:
            chunks.append(tail)
    return chunks
def chunk_by_semantic_sections(
    text: str,
    headers_pattern: str = r'^#{1,3}\s+.+$'
) -> List[Tuple[str, str]]:
    """Chunk markdown into ``(header, body)`` sections.

    Each line matching ``headers_pattern`` starts a new section. Text before
    the first header is returned with an empty header string. A header with
    no body is kept (with an empty body) so its title is not lost; a
    header-less section containing only whitespace is dropped.

    Args:
        text: Markdown source.
        headers_pattern: Regex applied to each line to detect headers.

    Returns:
        ``(header_line, body_text)`` tuples in document order.
    """
    # Compiled once instead of per line; flag kept from the original call.
    header_re = re.compile(headers_pattern, re.MULTILINE)

    chunks = []
    current_header = ""
    current_content = []

    def flush() -> None:
        body = '\n'.join(current_content)
        if current_header or body.strip():
            chunks.append((current_header, body))

    for line in text.split('\n'):
        if header_re.match(line):
            flush()
            current_header = line
            current_content = []
        else:
            current_content.append(line)
    flush()
    return chunks
def recursive_character_splitter(
    text: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    separators: List[str] = None
) -> List[str]:
    """LangChain-style recursive splitter.

    The text is split on the first separator and the pieces are packed into
    chunks of at most ``chunk_size`` characters. Any chunk that is still too
    large (including the final one) is split again with the next separator;
    the empty-string separator means fixed-size character windows.

    Args:
        text: Text to split.
        chunk_size: Maximum characters per chunk; must be positive.
        chunk_overlap: Characters of trailing context carried into the next
            chunk; must satisfy ``0 <= chunk_overlap < chunk_size``.
        separators: Separators tried in order. Defaults to paragraph, line,
            sentence, word, then character level. If the list does not end
            with ``""``, an unsplittable oversized piece is returned as is.

    Returns:
        Chunk strings in document order (``[]`` for empty text).

    Raises:
        ValueError: If ``chunk_size`` / ``chunk_overlap`` are inconsistent.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size")

    separators = separators or ["\n\n", "\n", ". ", " ", ""]

    def split_text(text: str, separators: List[str]) -> List[str]:
        if not text:
            return []

        separator = separators[0]
        remaining_separators = separators[1:]

        if separator == "":
            # Character-level split with overlapping fixed-size windows.
            stride = chunk_size - chunk_overlap
            windows = []
            for start in range(0, len(text), stride):
                windows.append(text[start:start + chunk_size])
                if start + chunk_size >= len(text):
                    # End reached; another window would only repeat overlap.
                    break
            return windows

        chunks = []

        def emit(pieces: List[str]) -> None:
            chunk_text = separator.join(pieces)
            if not chunk_text:
                return
            # Recursively split if still too large
            if len(chunk_text) > chunk_size and remaining_separators:
                chunks.extend(split_text(chunk_text, remaining_separators))
            else:
                chunks.append(chunk_text)

        current_chunk = []
        current_length = 0  # exact length of separator.join(current_chunk)

        for piece in text.split(separator):
            added = len(piece) + (len(separator) if current_chunk else 0)
            if current_chunk and current_length + added > chunk_size:
                emit(current_chunk)

                # Start new chunk with overlap: keep as many trailing pieces
                # as fit within chunk_overlap (separators included).
                overlap_pieces = []
                overlap_length = 0
                for previous in reversed(current_chunk):
                    extra = len(previous) + (len(separator) if overlap_pieces else 0)
                    if overlap_length + extra > chunk_overlap:
                        break
                    overlap_pieces.insert(0, previous)
                    overlap_length += extra
                current_chunk = overlap_pieces
                current_length = overlap_length
                added = len(piece) + (len(separator) if current_chunk else 0)

            current_chunk.append(piece)
            current_length += added

        if current_chunk:
            # The final chunk goes through the same size check as the others.
            emit(current_chunk)
        return chunks

    return split_text(text, separators)
Template 5: Domain-Specific Embedding Pipeline
import re
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class EmbeddedDocument:
    """One embedded chunk of a source document, ready for vector storage."""

    # Chunk identifier; process_documents builds it as "<doc_id>_chunk_<i>".
    id: str
    # Identifier of the document the chunk was cut from.
    document_id: str
    # Position of the chunk within its document.
    chunk_index: int
    # Chunk text that was embedded.
    text: str
    # Embedding vector for the chunk text.
    embedding: List[float]
    # Extra key/value data stored alongside the vector.
    metadata: dict
class DomainEmbeddingPipeline:
"""Pipeline for domain-specific embeddings."""
def __init__(
self,
embedding_model: str = "voyage-3-large",
chunk_size: int = 512,
chunk_overlap: int = 50,
preprocessing_fn=None
):
self.embeddings = VoyageAIEmbeddings(model=embedding_model)
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.preprocess = preprocessing_fn or self._default_preprocess
def _default_preprocess(self, text: str) -> str:
"""Default preprocessing."""
# Remove excessive whitespace
text = re.sub(r'\s+', ' ', text)
# Remove special characters (customize for your domain)
text = re.sub(r'[^\w\s.,!?-]', '', text)
return text.strip()
async def process_documents(
self,
documents: List[dict],
id_field: str = "id",
content_field: str = "content",
metadata_fields: Optional[List[str]] = None
) -> List[EmbeddedDocument]:
"""Process documents for vector storage."""
processed = []
for doc in documents:
content = doc[content_field]
doc_id = doc[id_field]
# Preprocess
cleaned = self.preprocess(content)
# Chunk
chunks = chunk_by_tokens(
cleaned,
self.chunk_size,
self.chunk_overlap
)
# Create embeddings
embeddings = await self.embeddings.aembed_documents(chunks)
# Create records
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
metadata = {"document_id": doc_id, "chunk_index": i}
# Add specified metadata fields
if metadata_fields:
for field in metadata_fields:
if field in doc:
metadata[field] = doc[field]
processed.append(EmbeddedDocument(
id=f"{doc_id}_chunk_{i}",
document_id=doc_id,
chunk_index=i,
text=chunk,
embedding=embedding,
metadata=metadata
))
return processed
# Code-specific pipeline
class CodeEmbeddingPipeline:
    """Specialized pipeline for code embeddings."""

    # Syntax-tree node types that become their own chunk.
    _DEFINITION_TYPES = ('function_definition', 'class_definition', 'method_definition')

    def __init__(self):
        # Use Voyage's code-specific model
        self.embeddings = VoyageAIEmbeddings(model="voyage-code-3")

    def chunk_code(self, code: str, language: str) -> List[dict]:
        """Chunk code by functions/classes using tree-sitter.

        Args:
            code: Source text.
            language: Language name passed to ``tree_sitter_languages.get_parser``.

        Returns:
            One dict per definition found (see ``_extract_nodes``). When
            tree-sitter is not installed, or non-blank code contains no
            definitions, a single ``{"text": code, "type": "module"}``
            chunk is returned so the source is not dropped.
        """
        try:
            import tree_sitter_languages
            parser = tree_sitter_languages.get_parser(language)
            # Parse and slice the same byte buffer: node offsets are bytes.
            source_bytes = code.encode("utf8")
            tree = parser.parse(source_bytes)

            chunks = []
            # Extract function and class definitions
            self._extract_nodes(tree.root_node, source_bytes, chunks)
            if not chunks and code.strip():
                return [{"text": code, "type": "module"}]
            return chunks
        except ImportError:
            # Fallback to simple chunking
            return [{"text": code, "type": "module"}]

    def _extract_nodes(self, node, source_code, chunks: list):
        """Recursively extract function/class definitions.

        Args:
            node: Syntax-tree node exposing ``type``, ``children``,
                ``start_byte``/``end_byte`` and ``start_point``/``end_point``.
            source_code: The parsed source, as ``str`` or UTF-8 ``bytes``.
            chunks: Output list; one dict per definition is appended.
        """
        # Offsets index the UTF-8 bytes, so slice bytes rather than the str
        # (they differ as soon as the source contains non-ASCII characters).
        source_bytes = (
            source_code.encode("utf8") if isinstance(source_code, str) else source_code
        )

        if node.type in self._DEFINITION_TYPES:
            chunks.append({
                "text": source_bytes[node.start_byte:node.end_byte].decode("utf8"),
                "type": node.type,
                "name": self._get_name(node),
                "start_line": node.start_point[0],
                "end_line": node.end_point[0]
            })

        for child in node.children:
            # Pass the bytes down so the source is encoded only once.
            self._extract_nodes(child, source_bytes, chunks)

    def _get_name(self, node) -> str:
        """Return the text of the first 'identifier'/'name' child, else "unknown"."""
        for child in node.children:
            if child.type in ('identifier', 'name'):
                return child.text.decode('utf8')
        return "unknown"

    async def embed_with_context(
        self,
        chunk: str,
        context: str = ""
    ) -> List[float]:
        """Embed code with surrounding context.

        When ``context`` is non-empty it is placed in front of the code in a
        labelled template; otherwise the chunk is embedded as is.
        """
        if context:
            combined = f"Context: {context}\n\nCode:\n{chunk}"
        else:
            combined = chunk

        return await self.embeddings.aembed_query(combined)
Template 6: Embedding Quality Evaluation
import numpy as np
from typing import List, Dict
def evaluate_retrieval_quality(
    queries: List[str],
    relevant_docs: List[List[str]],  # List of relevant doc IDs per query
    retrieved_docs: List[List[str]],  # List of retrieved doc IDs per query
    k: int = 10
) -> Dict[str, float]:
    """Evaluate embedding quality for retrieval.

    Computes precision@k, recall@k, MRR and NDCG@k (binary relevance) for
    each query and averages them.

    Args:
        queries: The evaluated queries (not used in the computation).
        relevant_docs: Relevant document IDs for each query.
        retrieved_docs: Ranked retrieved document IDs for each query.
        k: Cut-off rank.

    Returns:
        Mapping from metric name to its mean over all queries; every metric
        is ``0.0`` when there are no queries.

    Raises:
        ValueError: If ``relevant_docs`` and ``retrieved_docs`` differ in
            length (pairs would otherwise be silently dropped).
    """
    if len(relevant_docs) != len(retrieved_docs):
        raise ValueError(
            "relevant_docs and retrieved_docs must have the same length"
        )

    def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        hits = len(set(retrieved[:k]) & relevant)
        return hits / k if k > 0 else 0

    def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        hits = len(set(retrieved[:k]) & relevant)
        return hits / len(relevant) if relevant else 0

    def mrr(relevant: set, retrieved: List[str]) -> float:
        # Reciprocal rank of the first relevant hit.
        for rank, doc in enumerate(retrieved, start=1):
            if doc in relevant:
                return 1 / rank
        return 0

    def ndcg_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        dcg = sum(
            1 / np.log2(i + 2) if doc in relevant else 0
            for i, doc in enumerate(retrieved[:k])
        )
        ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(relevant), k)))
        return dcg / ideal_dcg if ideal_dcg > 0 else 0

    metrics = {
        f"precision@{k}": [],
        f"recall@{k}": [],
        "mrr": [],
        f"ndcg@{k}": []
    }

    for relevant, retrieved in zip(relevant_docs, retrieved_docs):
        relevant_set = set(relevant)
        metrics[f"precision@{k}"].append(precision_at_k(relevant_set, retrieved, k))
        metrics[f"recall@{k}"].append(recall_at_k(relevant_set, retrieved, k))
        metrics["mrr"].append(mrr(relevant_set, retrieved))
        metrics[f"ndcg@{k}"].append(ndcg_at_k(relevant_set, retrieved, k))

    # np.mean([]) is NaN (with a warning); report 0.0 for an empty evaluation.
    return {
        name: float(np.mean(values)) if values else 0.0
        for name, values in metrics.items()
    }
def compute_embedding_similarity(
    embeddings1: np.ndarray,
    embeddings2: np.ndarray,
    metric: str = "cosine"
) -> np.ndarray:
    """Compute similarity matrix between embedding sets.

    Args:
        embeddings1: Array-like of shape ``(n, d)``; a single ``(d,)``
            vector is treated as one row.
        embeddings2: Array-like of shape ``(m, d)``; same 1-D handling.
        metric: ``"cosine"``, ``"euclidean"`` (negated distance, so larger
            is more similar) or ``"dot"``.

    Returns:
        An ``(n, m)`` matrix of similarities. Under ``"cosine"``, rows with
        zero norm get similarity ``0`` instead of NaN.

    Raises:
        ValueError: If ``metric`` is not one of the supported names.
    """
    left = np.atleast_2d(np.asarray(embeddings1))
    right = np.atleast_2d(np.asarray(embeddings2))

    if metric == "cosine":
        def unit_rows(matrix: np.ndarray) -> np.ndarray:
            norms = np.linalg.norm(matrix, axis=1, keepdims=True)
            # Divide zero rows by 1 so they stay zero rather than becoming NaN.
            return matrix / np.where(norms == 0, 1.0, norms)

        return unit_rows(left) @ unit_rows(right).T
    elif metric == "euclidean":
        from scipy.spatial.distance import cdist
        return -cdist(left, right, metric='euclidean')
    elif metric == "dot":
        return left @ right.T
    else:
        raise ValueError(f"Unknown metric: {metric}")
def compare_embedding_models(
    texts: List[str],
    models: Dict[str, callable],
    queries: List[str],
    relevant_indices: List[List[int]],
    k: int = 5
) -> Dict[str, Dict[str, float]]:
    """Compare multiple embedding models on retrieval quality.

    For each model, the corpus and all queries are embedded, the top-``k``
    corpus entries per query are ranked by cosine similarity, and the
    ranking is scored with ``evaluate_retrieval_quality``.

    Args:
        texts: Corpus; document IDs are the stringified positions in it.
        models: Maps a model name to an embedding function that takes a list
            of strings and returns one vector per string.
        queries: Queries to evaluate.
        relevant_indices: For each query, indices into ``texts`` that are
            relevant.
        k: Number of results retrieved per query.

    Returns:
        Mapping from model name to its metric dictionary.
    """
    # Ground truth does not depend on the model: convert it once.
    relevant_docs = [[str(i) for i in indices] for indices in relevant_indices]

    results = {}
    for model_name, embed_fn in models.items():
        # Embed all texts
        doc_embeddings = np.array(embed_fn(texts))

        retrieved_per_query = []
        if queries:
            # One batched call for all queries instead of one call per query.
            query_embeddings = np.array(embed_fn(queries))
            similarity_matrix = compute_embedding_similarity(
                query_embeddings,
                doc_embeddings,
                metric="cosine"
            )
            for similarities in similarity_matrix:
                # Get top-k indices
                top_k_indices = np.argsort(similarities)[::-1][:k]
                retrieved_per_query.append([str(i) for i in top_k_indices])

        results[model_name] = evaluate_retrieval_quality(
            queries, relevant_docs, retrieved_per_query, k
        )

    return results
Best Practices
Do's
- Match model to use case: Code vs prose vs multilingual
- Chunk thoughtfully: Preserve semantic boundaries
- Normalize embeddings: For cosine similarity search
- Batch requests: More efficient than one-by-one
- Cache embeddings: Avoid recomputing for static content
- Use Voyage AI for Claude apps: Recommended by Anthropic
Don'ts
- Don't ignore token limits: Truncation loses information
- Don't mix embedding models: Incompatible vector spaces
- Don't skip preprocessing: Garbage in, garbage out
- Don't over-chunk: Lose important context
- Don't forget metadata: Essential for filtering and debugging
Resources
Weekly Installs
192
Repository
wshobson/agents
Installed on
claude-code: 153
antigravity: 119
opencode: 118
cursor: 117
gemini-cli: 117
codex: 102