Integrating RAG with LLMs
How retrieval-augmented generation works end-to-end: embedding documents, querying vector stores, assembling context, and building production-grade RAG pipelines.
Why RAG Changes LLM Behavior
Without RAG, an LLM answers from parametric memory (knowledge encoded in weights). This knowledge is frozen at training cutoff and can't be updated without retraining. RAG (Retrieval-Augmented Generation) adds a retrieval step that provides relevant documents as context, allowing the model to answer from current, domain-specific, or private knowledge.
RAG changes answer quality in two ways:
- Grounding: The model answers from retrieved text rather than from memory — reducing hallucination
- Scope extension: The model can answer questions about documents that didn't exist at training time
End-to-End RAG Pipeline
from dataclasses import dataclass
from typing import Optional
import numpy as np
from openai import OpenAI
# Module-level OpenAI client; the embedding and chat-completion calls in this
# file all go through this single instance.
client = OpenAI()
@dataclass
class Document:
    """A piece of source text together with the fields used to index it."""

    id: str  # identifier supplied by the caller
    title: str  # short label for the document
    content: str  # body text
    metadata: dict  # free-form additional attributes
@dataclass
class RetrievedContext:
    """The outcome of one retrieval: matched documents, their scores, and the query."""

    documents: list[Document]  # retrieved documents
    scores: list[float]  # one score per entry in `documents`
    query: str  # the query text that produced this result
class SimpleRAGPipeline:
    """End-to-end RAG pipeline with an in-memory vector store.

    Documents are embedded through the module-level ``client`` and stored as
    rows of a NumPy matrix. Retrieval is a brute-force dot product against
    that matrix; generation sends the retrieved text to a chat model.
    """

    # Used by ``generate`` when the caller does not supply a system prompt.
    _DEFAULT_SYSTEM_PROMPT = (
        "You are a helpful assistant. Answer questions using ONLY the provided documents.\n"
        "If the documents don't contain the answer, say \"The provided documents don't address this.\"\n"
        "Cite documents by number when making factual claims."
    )

    def __init__(
        self,
        embedding_model: str = "text-embedding-3-small",
        chat_model: str = "gpt-4o",
    ):
        """Create an empty pipeline.

        Args:
            embedding_model: Model name used to embed both documents and queries.
            chat_model: Model name used by ``generate``.
        """
        self.embedding_model = embedding_model
        self.chat_model = chat_model
        self.documents: list[Document] = []
        # Row i is the embedding of self.documents[i]; None until the first add.
        self.embeddings: Optional[np.ndarray] = None

    def add_documents(self, documents: list[Document]) -> None:
        """Embed ``documents`` in one batch and append them to the store.

        An empty list is a no-op, so no embedding request is issued for it.
        """
        if not documents:
            return

        texts = [f"{doc.title}\n{doc.content}" for doc in documents]
        response = client.embeddings.create(
            model=self.embedding_model,
            input=texts,
        )
        new_embeddings = np.array([item.embedding for item in response.data])

        # State is only touched after the request succeeded, which keeps the
        # document list and the embedding rows aligned.
        self.documents.extend(documents)
        if self.embeddings is None:
            self.embeddings = new_embeddings
        else:
            self.embeddings = np.vstack([self.embeddings, new_embeddings])

    def retrieve(self, query: str, top_k: int = 5) -> RetrievedContext:
        """Retrieve the ``top_k`` stored documents most similar to ``query``.

        Returns an empty context, without calling the embedding API, when no
        documents have been added or ``top_k`` is not positive.
        """
        if self.embeddings is None or top_k <= 0:
            return RetrievedContext(documents=[], scores=[], query=query)

        response = client.embeddings.create(
            model=self.embedding_model,
            input=[query],
        )
        query_embedding = np.array(response.data[0].embedding)

        # Dot product as the similarity score. This equals cosine similarity
        # only when the embeddings are unit vectors, which is assumed of the
        # embedding model here.
        similarities = self.embeddings @ query_embedding
        top_indices = np.argsort(similarities)[::-1][:top_k]

        return RetrievedContext(
            documents=[self.documents[i] for i in top_indices],
            scores=[float(similarities[i]) for i in top_indices],
            query=query,
        )

    def generate(
        self,
        query: str,
        context: RetrievedContext,
        system_prompt: Optional[str] = None,
    ) -> str:
        """Generate an answer to ``query`` from the retrieved ``context``.

        Args:
            query: The user's question.
            context: Retrieval result whose documents are placed in the prompt.
            system_prompt: Overrides the default grounding instructions; a
                falsy value (``None`` or ``""``) selects the default.

        Returns:
            The model's reply text, or ``""`` if the reply has no text content.
        """
        docs_text = "\n\n".join(
            f"[Document {i+1}: {doc.title}]\n{doc.content}"
            for i, doc in enumerate(context.documents)
        )
        system = system_prompt or self._DEFAULT_SYSTEM_PROMPT

        messages = [
            {"role": "system", "content": system},
            {
                "role": "user",
                "content": f"Documents:\n{docs_text}\n\nQuestion: {query}",
            },
        ]
        response = client.chat.completions.create(
            model=self.chat_model,
            messages=messages,
            temperature=0,
        )
        return response.choices[0].message.content or ""

    def query(self, question: str, top_k: int = 5) -> dict:
        """Run the full RAG flow: retrieve, then generate.

        Returns:
            A dict with the ``question``, the generated ``answer``, and
            ``sources`` (title and score of each retrieved document).
        """
        context = self.retrieve(question, top_k=top_k)
        answer = self.generate(question, context)
        sources = [
            {"title": doc.title, "score": score}
            for doc, score in zip(context.documents, context.scores)
        ]
        return {
            "question": question,
            "answer": answer,
            "sources": sources,
        }


# Production RAG with Chroma
Using a persistent vector store instead of in-memory arrays:
import os

import chromadb
from chromadb.utils import embedding_functions

# Initialize Chroma client; data is persisted under ./chroma_db
chroma_client = chromadb.PersistentClient(path="./chroma_db")

# Read the key from the environment rather than hard-coding a secret in
# source. Raises KeyError at startup if OPENAI_API_KEY is not set.
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key=os.environ["OPENAI_API_KEY"],
    model_name="text-embedding-3-small",
)

# Create or get collection, configured for cosine distance
collection = chroma_client.get_or_create_collection(
    name="clinical_documents",
    embedding_function=openai_ef,
    metadata={"hnsw:space": "cosine"},
)
def ingest_documents(documents: list[Document]) -> None:
    """Ingest documents into the module-level Chroma ``collection``.

    Each document's ``content`` is stored as the text, and its title is merged
    into the stored metadata together with ``doc.metadata``. An empty list is
    a no-op, so no empty batch is sent to the store.
    """
    if not documents:
        return

    collection.add(
        ids=[doc.id for doc in documents],
        documents=[doc.content for doc in documents],
        # doc.metadata is unpacked last, so a "title" key in it overrides doc.title.
        metadatas=[{"title": doc.title, **doc.metadata} for doc in documents],
    )
    print(f"Ingested {len(documents)} documents")
def retrieve_from_chroma(
    query: str,
    top_k: int = 5,
    filter_metadata: dict = None,
) -> list[dict]:
    """Look up the entries closest to ``query`` in the Chroma ``collection``.

    Args:
        query: Text to search for.
        top_k: Number of results requested from the collection.
        filter_metadata: Optional ``where`` filter passed through to Chroma.

    Returns:
        One dict per hit with ``content``, ``title``, ``metadata`` and
        ``score`` (computed as ``1 - distance``).
    """
    response = collection.query(
        query_texts=[query],
        n_results=top_k,
        where=filter_metadata,  # e.g., {"category": "drug_interaction"}
        include=["documents", "metadatas", "distances"],
    )

    # A single query text was sent, so each field holds one result list.
    texts = response["documents"][0]
    metas = response["metadatas"][0]
    distances = response["distances"][0]

    return [
        {
            "content": text,
            "title": meta.get("title", ""),
            "metadata": meta,
            "score": 1 - distance,  # Convert distance to similarity
        }
        for text, meta, distance in zip(texts, metas, distances)
    ]


# Chunking Strategies
How you split documents dramatically affects retrieval quality:
from typing import Iterator
def chunk_by_sentences(
    text: str,
    chunk_size: int = 500,
    overlap: int = 100,
) -> list[str]:
    """Split text into overlapping chunks made of whole sentences.

    Sizes are measured in whitespace-separated words, used here as a rough
    stand-in for token counts.

    Args:
        text: Text to split.
        chunk_size: Word budget per chunk. A chunk may exceed it when a single
            sentence, or the carried overlap plus the next sentence, is longer
            than the budget.
        overlap: Target number of words repeated from the end of one chunk at
            the start of the next. ``0`` disables overlap.

    Returns:
        The chunks in document order; an empty list for empty or
        whitespace-only text.
    """
    import re

    # Split on sentence boundaries, dropping empty pieces so that blank input
    # produces no chunks.
    sentences = [s for s in re.split(r'(?<=[.!?])\s+', text.strip()) if s]

    chunks: list[str] = []
    current: list[tuple[str, int]] = []  # (sentence, word count) pairs
    current_size = 0

    for sentence in sentences:
        sentence_size = len(sentence.split())

        if current and current_size + sentence_size > chunk_size:
            chunks.append(" ".join(s for s, _ in current))

            # Carry trailing sentences until the overlap target is met. The
            # first sentence of the chunk is never carried, so the next chunk
            # always drops something and chunks cannot grow without bound.
            carried: list[tuple[str, int]] = []
            carried_size = 0
            for item in reversed(current[1:]):
                if carried_size >= overlap:
                    break
                carried.append(item)
                carried_size += item[1]
            carried.reverse()

            current = carried
            current_size = carried_size

        current.append((sentence, sentence_size))
        current_size += sentence_size

    if current:
        chunks.append(" ".join(s for s, _ in current))
    return chunks
def chunk_by_markdown_sections(text: str) -> list[dict]:
    """Split a markdown document at level 1-3 header boundaries.

    Lines inside fenced code blocks (delimited by ``` or ~~~) are never
    treated as headers, so a ``# comment`` in a code sample stays part of its
    section. Sections whose content is empty or whitespace-only are skipped.

    Returns:
        A list of ``{"header": ..., "content": ...}`` dicts in document order.
        Text before the first header gets an empty ``header``.
    """
    import re

    header_re = re.compile(r'^(#{1,3})\s+(.+)$')

    def append_section(out: list[dict], header: str, lines: list[str]) -> None:
        # Record a section only when it carries some non-whitespace text.
        content = "\n".join(lines)
        if content.strip():
            out.append({"header": header, "content": content})

    sections: list[dict] = []
    current_header = ""
    current_content: list[str] = []
    in_fence = False

    for line in text.split("\n"):
        # A fence line toggles code-block state and belongs to the content.
        if line.lstrip().startswith(("```", "~~~")):
            in_fence = not in_fence
            current_content.append(line)
            continue

        header_match = None if in_fence else header_re.match(line)
        if header_match:
            append_section(sections, current_header, current_content)
            current_header = header_match.group(2).strip()
            current_content = []
        else:
            current_content.append(line)

    append_section(sections, current_header, current_content)
    return sections


# Hybrid Search: Dense + Sparse
Combining semantic search (embeddings) with keyword search (BM25):
from rank_bm25 import BM25Okapi
import numpy as np
class HybridRetriever:
    """Combines dense (embedding) and sparse (BM25) retrieval."""

    def __init__(self, documents: list[str], embeddings: np.ndarray):
        """Store the corpus and build a BM25 index over it.

        Args:
            documents: Raw document texts.
            embeddings: Document embeddings; row i is assumed to belong to
                ``documents[i]``.
        """
        self.documents = documents
        self.embeddings = embeddings
        # Build BM25 index for keyword search (lower-cased whitespace tokens)
        tokenized = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized)

    def retrieve(
        self,
        query: str,
        query_embedding: np.ndarray,
        top_k: int = 10,
        alpha: float = 0.5,  # Weight: 0=BM25 only, 1=embedding only
    ) -> list[tuple[int, float]]:
        """Hybrid retrieval using weighted Reciprocal Rank Fusion (RRF).

        Every document contributes ``alpha / (k + dense_rank)``. Documents
        with a non-zero BM25 score additionally contribute
        ``(1 - alpha) / (k + sparse_rank)``. Ranks are 1-based. Documents with
        a BM25 score of zero get no sparse contribution, because their
        relative order in the sparse ranking would be arbitrary.

        Args:
            query: Query text, tokenized the same way as the documents.
            query_embedding: Embedding of the query.
            top_k: Maximum number of results to return.
            alpha: Balance between dense (1.0) and sparse (0.0) rankings.

        Returns:
            ``(document index, fused score)`` pairs, best first, with plain
            ``int`` indices.
        """
        rrf_k = 60  # Standard RRF constant

        # Dense (semantic) ranking, best first. A stable sort keeps ties in
        # index order, so results are deterministic.
        dense_scores = self.embeddings @ query_embedding
        dense_order = np.argsort(-dense_scores, kind="stable")

        # Sparse (BM25) ranking, restricted to documents that scored at all.
        bm25_scores = np.asarray(self.bm25.get_scores(query.lower().split()))
        sparse_order = [
            idx
            for idx in np.argsort(-bm25_scores, kind="stable")
            if bm25_scores[idx] != 0
        ]

        doc_scores: dict[int, float] = {}
        for rank, doc_idx in enumerate(dense_order, start=1):
            key = int(doc_idx)
            doc_scores[key] = doc_scores.get(key, 0.0) + alpha / (rrf_k + rank)
        for rank, doc_idx in enumerate(sparse_order, start=1):
            key = int(doc_idx)
            doc_scores[key] = doc_scores.get(key, 0.0) + (1 - alpha) / (rrf_k + rank)

        # Sort by combined score
        ranked = sorted(doc_scores.items(), key=lambda item: item[1], reverse=True)
        return ranked[:top_k]


# Evaluating RAG Pipelines
from dataclasses import dataclass
@dataclass
class RAGEvalCase:
    """A labelled example used to evaluate a RAG pipeline."""

    question: str  # query posed to the retriever
    expected_answer: str  # reference answer for the question
    relevant_doc_ids: list[str]  # Ground truth relevant documents
def evaluate_retrieval(
    retriever,
    eval_cases: list[RAGEvalCase],
    top_k: int = 5,
) -> dict:
    """Compute mean retrieval precision, recall, and MRR over ``eval_cases``.

    Args:
        retriever: Object with ``retrieve(question, top_k=...)``. The result
            may be an iterable of hits or an object exposing the hits as
            ``.documents``. Each hit provides its id as ``hit["id"]`` (dict)
            or ``hit.id`` (attribute).
        eval_cases: Cases supplying ``question`` and ``relevant_doc_ids``.
        top_k: Number of results requested per question.

    Returns:
        ``{"precision_at_k", "recall_at_k", "mrr"}`` averaged over the cases.
        All three are ``0.0`` when ``eval_cases`` is empty. A case with no
        retrieved documents counts as precision 0.
    """
    precisions = []
    recalls = []
    reciprocal_ranks = []

    for case in eval_cases:
        retrieved = retriever.retrieve(case.question, top_k=top_k)
        retrieved_ids = _retrieved_ids(retrieved)
        relevant_set = set(case.relevant_doc_ids)
        relevant_retrieved = set(retrieved_ids) & relevant_set

        # Precision: what fraction of retrieved docs are relevant?
        precision = len(relevant_retrieved) / len(retrieved_ids) if retrieved_ids else 0.0
        # Recall: what fraction of relevant docs were retrieved?
        recall = len(relevant_retrieved) / len(relevant_set) if relevant_set else 0.0
        # Reciprocal rank of the first relevant document (0 if none retrieved)
        reciprocal_rank = next(
            (
                1 / rank
                for rank, doc_id in enumerate(retrieved_ids, 1)
                if doc_id in relevant_set
            ),
            0.0,
        )

        precisions.append(precision)
        recalls.append(recall)
        reciprocal_ranks.append(reciprocal_rank)

    return {
        "precision_at_k": _mean(precisions),
        "recall_at_k": _mean(recalls),
        "mrr": _mean(reciprocal_ranks),
    }


def _retrieved_ids(retrieved) -> list:
    """Extract document ids from a retriever result (hit list or ``.documents`` holder)."""
    hits = getattr(retrieved, "documents", retrieved)
    return [hit["id"] if isinstance(hit, dict) else hit.id for hit in hits]


def _mean(values: list) -> float:
    """Arithmetic mean that returns 0.0 for an empty list instead of raising."""
    return sum(values) / len(values) if values else 0.0


# RAG Failure Modes and Fixes
Retrieval fails (wrong docs):
- Symptom: model says "the documents don't address this" when they do
- Fix: improve chunking, use hybrid search, rerank with a cross-encoder
Context ignored (model uses parametric knowledge):
- Symptom: model answers from training data, ignoring retrieved docs
- Fix: stronger grounding instruction, move documents before the question
Context window overflow:
- Symptom: long documents truncated, model misses key info
- Fix: reduce chunk size, retrieve fewer documents, summarize before inserting
Lost-in-the-middle:
- Symptom: model ignores middle documents but uses first and last
- Fix: put most relevant document first, use fewer documents
Citation hallucination:
- Symptom: model cites "Document 3" but the claim isn't in Document 3
- Fix: validate citations programmatically by checking if the claim is in the cited doc
def validate_citations(answer: str, documents: list[dict]) -> list[str]:
    """Check that each citation in the answer is supported by the cited document.

    Citations are markers of the form ``[Doc N]`` or ``[Document N]``, where N
    is 1-based. Every occurrence is checked on its own. The claim is the text
    between the previous sentence terminator and the marker. A claim passes
    when any of its first three words longer than four characters occurs,
    case-insensitively, in the cited document's ``"content"``. Claims with no
    such word are not flagged. This is a rough lexical heuristic, not a
    semantic check.

    Args:
        answer: Generated answer text containing citation markers.
        documents: Dicts with a ``"content"`` key; ``documents[N - 1]`` is the
            document cited as N.

    Returns:
        Issue descriptions in order of appearance; empty if nothing is flagged.
    """
    import re

    citation_re = re.compile(r'\[Doc(?:ument)?\s*(\d+)\]')
    issues: list[str] = []

    for match in citation_re.finditer(answer):
        citation = match.group(1)
        doc_idx = int(citation) - 1
        # Reject 0 as well: index -1 would silently select the last document.
        if not 0 <= doc_idx < len(documents):
            issues.append(f"[Doc {citation}] doesn't exist")
            continue
        doc_content = documents[doc_idx]["content"].lower()

        # Take the sentence fragment right before the marker. Trailing
        # terminators are dropped first so "claim. [Doc 1]" still yields "claim".
        preceding = answer[:match.start()].rstrip().rstrip(".!?")
        claim_match = re.search(r'[^.!?]+$', preceding)
        if not claim_match:
            continue
        # Earlier markers in the same sentence are not part of the claim text.
        claim = citation_re.sub(" ", claim_match.group(0)).strip()

        # Rough check: key words of claim in doc content. \w+ strips any
        # punctuation attached to a word.
        key_words = [w for w in re.findall(r'\w+', claim) if len(w) > 4][:3]
        if key_words and not any(kw.lower() in doc_content for kw in key_words):
            issues.append(f"Claim '{claim[:50]}...' not found in [Doc {citation}]")

    return issues


# Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.