Advanced RAG Patterns
Beyond basic RAG: RAPTOR hierarchical indexing, SELF-RAG with retrieval decisions, iterative retrieval, adaptive context assembly, and reasoning over retrieved content.
Beyond Basic RAG
Basic RAG (embed → retrieve → generate) works well but breaks down on:
- Multi-hop questions requiring information from multiple disconnected documents
- Questions at different levels of abstraction (specific fact vs general principle)
- Questions where retrieval itself needs to be iterative and adaptive
- Cases where the model should decide whether retrieval is even needed
Advanced RAG patterns address these limitations.
RAPTOR: Hierarchical Document Indexing
RAPTOR (Recursive Abstractive Processing for Tree-Organized Retrieval) builds a tree of document summaries at increasing levels of abstraction:
from openai import OpenAI
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import normalize
# Shared module-level client, constructed with no explicit arguments; the
# functions in this file call it for chat completions and embeddings.
client = OpenAI()
def summarize_cluster(texts: list[str], level: int) -> str:
    """Ask the chat model for an abstractive summary of one cluster.

    Args:
        texts: Texts belonging to the cluster. Only the first five are sent
            to the model.
        level: Level of the inputs being summarized. ``0`` means raw document
            chunks (a 150-word detailed summary is requested); any other
            value means existing summaries (a 100-word high-level overview
            is requested).

    Returns:
        The message content of the first completion choice.
    """
    is_leaf_level = level == 0
    source_kind = "document chunks" if is_leaf_level else "summaries"
    word_budget = 150 if is_leaf_level else 100
    summary_kind = "detailed summary" if is_leaf_level else "high-level overview"

    # Cap the prompt at the first five texts so large clusters stay small.
    combined = "\n\n".join(texts[:5])
    prompt = (
        f"Summarize these {source_kind} into a coherent overview.\n"
        f"Write a {word_budget} word {summary_kind}.\n"
        "Content to summarize:\n"
        f"{combined}"
    )

    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=200,
        temperature=0,
    )
    return completion.choices[0].message.content
def build_raptor_tree(
    leaf_chunks: list[str],
    leaf_embeddings: np.ndarray,
    max_levels: int = 3,
    n_clusters: int = 10,
) -> dict:
    """Build a RAPTOR hierarchical index by repeated clustering and summarizing.

    Level 0: Original document chunks (leaves)
    Level 1: Summaries of clusters of level-0 chunks
    Level 2: Summaries of clusters of level-1 summaries
    ...

    At query time, retrieve from all levels and let the LLM pick what's relevant.

    Args:
        leaf_chunks: The original document chunks.
        leaf_embeddings: One embedding row per entry of ``leaf_chunks``.
        max_levels: Maximum number of summary levels to build above the leaves.
        n_clusters: Upper bound on the number of clusters formed at each level.

    Returns:
        A dict mapping level number to ``{"texts": ..., "embeddings": ...}``.
        Level 0 holds the inputs unchanged; each higher level holds the
        summaries of the level below and their embeddings.
    """
    tree = {0: {"texts": leaf_chunks, "embeddings": leaf_embeddings}}
    current_texts = leaf_chunks
    current_embeddings = leaf_embeddings

    for level in range(1, max_levels + 1):
        # Stop only when there is nothing left to merge. Comparing against
        # n_clusters here would end the build after the first summary level,
        # because a level never produces more than n_clusters summaries.
        if len(current_texts) < 2:
            break

        # Cluster the current level. Using at most half as many clusters as
        # items guarantees every level is strictly smaller than the one below.
        n_k = max(1, min(n_clusters, len(current_texts) // 2))
        kmeans = KMeans(n_clusters=n_k, random_state=42, n_init=10)
        labels = kmeans.fit_predict(normalize(current_embeddings))

        # Group texts by cluster label in a single pass.
        clusters: list[list[str]] = [[] for _ in range(n_k)]
        for text, label in zip(current_texts, labels):
            clusters[label].append(text)

        # Generate one summary per non-empty cluster, in cluster-id order.
        # `level - 1` is the level of the inputs being summarized.
        summaries = [
            summarize_cluster(members, level=level - 1)
            for members in clusters
            if members
        ]

        # Embed the summaries so this level is searchable too.
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=summaries,
        )
        summary_embeddings = np.array([e.embedding for e in response.data])

        tree[level] = {"texts": summaries, "embeddings": summary_embeddings}
        current_texts = summaries
        current_embeddings = summary_embeddings

    return tree
def raptor_retrieve(
    query_embedding: np.ndarray,
    tree: dict,
    top_k_per_level: int = 3,
) -> list[dict]:
    """Retrieve the best matches from every level of the RAPTOR tree.

    Args:
        query_embedding: Embedding of the query (array or plain sequence).
        tree: Mapping of level to ``{"texts": ..., "embeddings": ...}``, with
            one embedding row per text.
        top_k_per_level: Maximum number of hits taken from each level.

    Returns:
        A list of ``{"level", "text", "similarity"}`` dicts gathered from all
        levels, sorted by cosine similarity in descending order.
    """
    query = np.asarray(query_embedding, dtype=float)
    query_norm = np.linalg.norm(query)

    all_results = []
    for level, level_data in tree.items():
        texts = level_data["texts"]
        if not len(texts):
            continue  # nothing to rank at this level
        embeddings = np.asarray(level_data["embeddings"], dtype=float)

        # True cosine similarity: divide the dot product by both norms so
        # scores stay comparable across levels even when some embeddings are
        # not unit length. Zero-norm vectors score 0 instead of NaN.
        dots = embeddings @ query
        norms = np.linalg.norm(embeddings, axis=1) * query_norm
        similarities = np.divide(
            dots, norms, out=np.zeros_like(dots), where=norms > 0
        )

        top_indices = np.argsort(-similarities)[:top_k_per_level]
        for idx in top_indices:
            all_results.append({
                "level": level,
                "text": texts[idx],
                "similarity": float(similarities[idx]),
            })

    # Sort by similarity across all levels
    return sorted(all_results, key=lambda x: x["similarity"], reverse=True)


# SELF-RAG: Adaptive Retrieval Decisions
SELF-RAG (Asai et al., 2023) trains a model to decide when to retrieve, evaluate retrieved documents, and critique its own responses:
def self_rag_pipeline(
    query: str,
    retriever,
    generator_model: str = "gpt-4o",
) -> dict:
    """Run a simplified, prompt-driven SELF-RAG pipeline.

    The model decides whether retrieval is needed, evaluates the relevance of
    each retrieved document, and critiques the generated response.

    Args:
        query: The user question.
        retriever: Object exposing ``retrieve(query_embedding, top_k=...)``
            that yields dicts with a ``"content"`` key.
        generator_model: Chat model used for the retrieval decision and for
            answer generation.

    Returns:
        Without retrieval: ``{"retrieved": False, "response", "reason"}``.
        With retrieval: ``{"retrieved": True, "relevant_docs_used",
        "response", "faithful", "critique"}``.
    """
    import json  # function-scope import; json is not among the file-level imports

    # Step 1: Should we retrieve?
    decision_prompt = (
        "Decide if external retrieval is needed to answer this question.\n"
        'Return JSON: {"retrieve": true/false, "reason": "why"}\n'
        "Retrieve if: the question requires specific facts, recent events, domain-specific data.\n"
        "Don't retrieve if: the question is about general knowledge you're confident about."
    )
    retrieve_decision = client.chat.completions.create(
        model=generator_model,
        messages=[
            {"role": "system", "content": decision_prompt},
            {"role": "user", "content": f"Question: {query}"},
        ],
        response_format={"type": "json_object"},
        temperature=0,
    )
    decision = json.loads(retrieve_decision.choices[0].message.content)

    # A missing "retrieve" key falls through to retrieval.
    if not decision.get("retrieve", True):
        # Generate without retrieval
        response = client.chat.completions.create(
            model=generator_model,
            messages=[{"role": "user", "content": query}],
            temperature=0,
        ).choices[0].message.content
        return {
            "retrieved": False,
            "response": response,
            # The model may omit "reason"; do not fail the whole call on it.
            "reason": decision.get("reason", ""),
        }

    # Step 2: Retrieve documents (reuse the shared module-level client)
    emb = client.embeddings.create(model="text-embedding-3-small", input=[query])
    query_embedding = emb.data[0].embedding
    candidates = retriever.retrieve(query_embedding, top_k=5)

    # Step 3: Evaluate retrieved document relevance, one LLM call per document
    relevant_docs = []
    for doc in candidates:
        relevance_prompt = (
            f'Is this document relevant to answering: "{query}"?\n'
            f"Document: {doc['content'][:500]}\n"
            'Return JSON: {"relevant": true/false}'
        )
        relevance_check = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": relevance_prompt}],
            response_format={"type": "json_object"},
            temperature=0,
        )
        rel_result = json.loads(relevance_check.choices[0].message.content)
        if rel_result.get("relevant", False):
            relevant_docs.append(doc)

    # Step 4: Generate response
    if relevant_docs:
        context = "\n\n".join(d["content"] for d in relevant_docs)
    else:
        context = "No relevant documents found."
    response = client.chat.completions.create(
        model=generator_model,
        messages=[
            {
                "role": "system",
                "content": "Answer based on the provided context. If context is insufficient, say so.",
            },
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
        ],
        temperature=0,
    ).choices[0].message.content

    # Step 5: Critique the response.
    # NOTE(review): only the first 1000 characters of the context are shown to
    # the critic, while the answer was generated from the full context.
    critique_prompt = (
        "Does this response faithfully use only the provided context?\n"
        "Context used:\n"
        f"{context[:1000]}\n"
        "Response:\n"
        f"{response}\n"
        'Return JSON: {"faithful": true/false, "issue": "describe any issues"}'
    )
    critique = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": critique_prompt}],
        response_format={"type": "json_object"},
        temperature=0,
    )
    critique_result = json.loads(critique.choices[0].message.content)

    return {
        "retrieved": True,
        "relevant_docs_used": len(relevant_docs),
        "response": response,
        "faithful": critique_result.get("faithful", True),
        "critique": critique_result.get("issue", ""),
    }


# Iterative Retrieval
For complex multi-hop questions, retrieve iteratively — each answer informs the next retrieval:
def iterative_retrieval(
    original_query: str,
    retriever,
    embedding_fn,
    max_iterations: int = 3,
    top_k: int = 3,
) -> dict:
    """Iteratively retrieve and reason until the question can be answered.

    Each iteration uses the accumulated context to decide what to retrieve
    next.

    Args:
        original_query: The user question.
        retriever: Object exposing ``retrieve(query_embedding, top_k=...)``
            that yields dicts with a ``"content"`` key.
        embedding_fn: Callable mapping a query string to its embedding.
        max_iterations: Maximum number of retrieval rounds.
        top_k: Number of documents requested per round.

    Returns:
        ``{"answer", "iterations", "retrieval_log", "total_docs_used"}``.
    """
    import json  # function-scope import; json is not among the file-level imports

    accumulated_context = []
    seen_content = set()  # contents already collected, for O(1) dedup
    retrieval_log = []
    current_query = original_query

    for iteration in range(max_iterations):
        # Retrieve for current (refined) query
        query_emb = embedding_fn(current_query)
        docs = retriever.retrieve(query_emb, top_k=top_k)

        # Add to accumulated context (dedup by content, including duplicates
        # that appear within the same batch)
        new_docs = []
        for doc in docs:
            if doc["content"] not in seen_content:
                seen_content.add(doc["content"])
                new_docs.append(doc)
        accumulated_context.extend(new_docs)
        retrieval_log.append({
            "iteration": iteration,
            "query": current_query,
            "docs_retrieved": len(new_docs),
        })

        # On the last round the decision would be discarded anyway, so skip
        # the extra LLM call.
        if iteration == max_iterations - 1:
            break

        # Ask: do we have enough context? What else do we need?
        context_text = "\n\n".join(d["content"] for d in accumulated_context)
        decision_prompt = (
            "Given this context and original question, determine if you can answer or need more information.\n"
            f"Original question: {original_query}\n"
            "Current context:\n"
            f"{context_text[:3000]}\n"
            "Return JSON:\n"
            "{\n"
            '"can_answer": true/false,\n'
            '"missing_information": "what\'s still needed (if can_answer is false)",\n'
            '"refined_query": "more specific query to find missing info (if can_answer is false)"\n'
            "}"
        )
        decision_response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": decision_prompt}],
            response_format={"type": "json_object"},
            temperature=0,
        )
        decision = json.loads(decision_response.choices[0].message.content)
        if decision.get("can_answer", False):
            break
        # Fall back to the original question when the model returns a
        # missing, null, or empty refined query.
        current_query = decision.get("refined_query") or original_query

    # Final answer generation
    final_context = "\n\n".join(d["content"] for d in accumulated_context)
    final_answer = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Answer using only the provided context."},
            {
                "role": "user",
                "content": f"Context:\n{final_context}\n\nQuestion: {original_query}",
            },
        ],
        temperature=0,
    ).choices[0].message.content

    return {
        "answer": final_answer,
        "iterations": len(retrieval_log),
        "retrieval_log": retrieval_log,
        "total_docs_used": len(accumulated_context),
    }


# Contextual Compression
Use an LLM to extract only the relevant portions from retrieved documents:
def contextual_compress(
query: str,
document: str,
compression_ratio: float = 0.3, # Aim for 30% of original length
) -> str | None:
"""
Extract only the query-relevant sentences from a document.
Returns None if the document has no relevant content.
"""
target_words = int(len(document.split()) * compression_ratio)
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "user",
"content": f"""Extract only the sentences from this document that are directly relevant to: "{query}"
Document:
{document[:2000]}
Rules:
- Include only sentences that directly answer or inform the question
- If no sentences are relevant, return "IRRELEVANT"
- Keep extracted sentences in their original form
- Target around {target_words} words""",
}
],
max_tokens=target_words * 2,
temperature=0,
).choices[0].message.content.strip()
if response == "IRRELEVANT":
return None
return response
def compress_retrieved_documents(
    query: str,
    documents: list[dict],
) -> list[dict]:
    """Run contextual compression over each retrieved document.

    Documents for which compression yields nothing are dropped. Each kept
    document is a copy of the input dict with ``"content"`` replaced by the
    compressed text, plus ``"original_length"`` and ``"compressed_length"``
    character counts.
    """
    kept = []
    for original in documents:
        excerpt = contextual_compress(query, original["content"])
        if not excerpt:
            continue  # nothing relevant survived compression
        slimmed = dict(original)
        slimmed.update(
            content=excerpt,
            original_length=len(original["content"]),
            compressed_length=len(excerpt),
        )
        kept.append(slimmed)
    return kept
# Benefit: reduces context window usage by 50-70% while keeping relevant information
# Tradeoff: adds LLM calls (one per document) — use gpt-4o-mini to keep cost low
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.