RAG Systems · Lesson 3 of 24
The Full RAG Pipeline: Index → Retrieve → Generate
Pipeline Phases
A RAG system has two distinct phases: offline indexing and online retrieval+generation.
OFFLINE (indexing):
Documents → Chunk → Embed → Store
Runs once, then on document updates
ONLINE (retrieval + generation):
Query → Embed → Search → Augment → Generate
Runs for every user request
Phase 1: Indexing
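from sentence_transformers import SentenceTransformer
import chromadb

embedder = SentenceTransformer("all-MiniLM-L6-v2")
chroma_client = chromadb.Client()  # in-memory client; data is lost when the process exits
# Use cosine distance so that 1 - distance can be read as a similarity score
# (Chroma's default distance is L2).
collection = chroma_client.create_collection(
    "clinical_docs", metadata={"hnsw:space": "cosine"}
)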
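def chunk_text(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """Split text into overlapping chunks of chunk_size words (not tokens)."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    # Note: all-MiniLM-L6-v2 truncates long inputs (256 word pieces by default),
    # so a smaller chunk_size may be needed for the whole chunk to be embedded.
    words = text.split()
    chunks = []
    for i in range(0, len(words), chunk_size - overlap):
        chunks.append(" ".join(words[i:i + chunk_size]))
        if i + chunk_size >= len(words):
            break  # the text is fully covered; skip a redundant tail chunk
    return chunks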
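To see how chunk_size and overlap interact, the sketch below computes which word offsets each chunk would cover. It is illustrative only: chunk_spans is a hypothetical helper that is not part of the pipeline, and it assumes the same stride (chunk_size - overlap) as chunk_text.

```python
def chunk_spans(n_words: int, chunk_size: int = 512, overlap: int = 50) -> list[tuple[int, int]]:
    """Return (start, end) word offsets for each chunk; end is exclusive."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap  # how far the window advances each time
    spans = []
    for start in range(0, n_words, step):
        end = min(start + chunk_size, n_words)
        spans.append((start, end))
        if end == n_words:
            break  # the window has reached the end of the text
    return spans


spans = chunk_spans(1000)
print(spans)  # [(0, 512), (462, 974), (924, 1000)]
```

Each chunk starts 462 words after the previous one, so neighbouring chunks share 50 words. The overlap gives a sentence that straddles a boundary a chance to appear intact in at least one chunk.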
def index_document(doc_id: str, text: str, metadata: dict) -> int:
    """Chunk, embed, and store one document; return the number of chunks stored."""
    chunks = chunk_text(text)
    if not chunks:
        return 0  # nothing to index; avoid calling collection.add with empty lists
    embeddings = embedder.encode(chunks).tolist()
    collection.add(
        ids=[f"{doc_id}_chunk_{i}" for i in range(len(chunks))],
        documents=chunks,
        embeddings=embeddings,
        metadatas=[{**metadata, "chunk_index": i} for i in range(len(chunks))]
    )
    return len(chunks)
# Usage
n_chunks = index_document(
    doc_id="nice_ng196",
    text="[Full text of NICE guideline NG196...]",
    metadata={"source": "NICE NG196", "topic": "atrial fibrillation", "year": 2021}
)
Phase 2: Retrieval
def retrieve(query: str, top_k: int = 5, filters: dict | None = None) -> list[dict]:
    """Embed the query and return the top_k closest chunks, nearest first."""
    query_embedding = embedder.encode([query])[0].tolist()
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
        where=filters or None,  # None disables metadata filtering
        include=["documents", "metadatas", "distances"]
    )
    return [
        {
            "content": doc,
            "metadata": meta,
            "distance": dist,
            # Valid only if the collection uses cosine distance;
            # Chroma's default distance is L2.
            "similarity": 1 - dist
        }
        for doc, meta, dist in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0]
        )
    ]
# Usage
chunks = retrieve(
    query="Warfarin dose adjustment for elderly patients",
    top_k=5,
    filters={"topic": "atrial fibrillation"}
)
Phase 3: Augmentation
def build_prompt(query: str, retrieved_chunks: list[dict]) -> str:
    context = "\n\n---\n\n".join(
        f"Source: {c['metadata'].get('source', 'Unknown')}\n{c['content']}"
        for c in retrieved_chunks
    )
    return f"""You are a clinical information assistant.
Answer the question based ONLY on the provided context.
If the answer is not in the context, say "The provided documents do not contain this information."
Always indicate which source you're drawing from.
CONTEXT:
{context}
QUESTION:
{query}
ANSWER:"""
Phase 4: Generation
from anthropic import Anthropic

client = Anthropic()  # reads the API key from the ANTHROPIC_API_KEY environment variable

def generate_answer(query: str, retrieved_chunks: list[dict]) -> dict:
    """Send the augmented prompt to the model and return the answer with its sources."""
    prompt = build_prompt(query, retrieved_chunks)
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}]
    )
    return {
        "answer": response.content[0].text,
        "sources": [c["metadata"].get("source") for c in retrieved_chunks],
        "retrieved_chunks": [c["content"] for c in retrieved_chunks]
    }

def rag_query(user_query: str) -> dict:
    """Run the online phase end to end: retrieve, augment, generate."""
    chunks = retrieve(user_query, top_k=5)
    return generate_answer(user_query, chunks)
# End-to-end usage
result = rag_query("What is the INR target range for AF patients on Warfarin?")
print(result["answer"])
print("Sources:", result["sources"])
Production Considerations
Document management:
- Track document versions and re-embed when guidelines are updated
- Soft delete: mark chunks as outdated rather than deleting them, which preserves an audit trail
- Metadata filtering: filter by date, source, or patient cohort
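One way to combine version tracking with soft deletes is to fingerprint each document's text and flag superseded chunks instead of removing them. The sketch below keeps this state in memory as a stand-in for the vector store; ChunkRecord, DocumentRegistry, and the outdated flag are hypothetical names, and a real deployment would persist the records and re-embed the new text whenever upsert returns True.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class ChunkRecord:
    doc_id: str
    version: int
    chunk_index: int
    outdated: bool = False  # soft-delete flag; the record itself is never removed


class DocumentRegistry:
    def __init__(self) -> None:
        self._hashes: dict[str, str] = {}    # doc_id -> SHA-256 of the indexed text
        self._versions: dict[str, int] = {}  # doc_id -> latest version number
        self.chunks: list[ChunkRecord] = []

    def upsert(self, doc_id: str, text: str, n_chunks: int) -> bool:
        """Register a document; return True if it is new or changed (re-embed needed)."""
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if self._hashes.get(doc_id) == digest:
            return False  # same content as last time; nothing to do
        for record in self.chunks:
            if record.doc_id == doc_id:
                record.outdated = True  # keep old chunks for the audit trail
        version = self._versions.get(doc_id, 0) + 1
        self._hashes[doc_id] = digest
        self._versions[doc_id] = version
        self.chunks.extend(ChunkRecord(doc_id, version, i) for i in range(n_chunks))
        return True

    def active_chunks(self, doc_id: str) -> list[ChunkRecord]:
        return [r for r in self.chunks if r.doc_id == doc_id and not r.outdated]


registry = DocumentRegistry()
first = registry.upsert("nice_ng196", "guideline text, original edition", n_chunks=3)
same = registry.upsert("nice_ng196", "guideline text, original edition", n_chunks=3)
updated = registry.upsert("nice_ng196", "guideline text, revised edition", n_chunks=2)
print(first, same, updated)  # True False True
print(len(registry.chunks), len(registry.active_chunks("nice_ng196")))  # 5 2
```

At query time, the outdated flag would typically be stored as chunk metadata and excluded through a metadata filter, so superseded guidance is never retrieved but remains available for audits.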
Caching:
- Cache embeddings for common queries (LRU cache)
- Cache full RAG responses for identical queries (TTL based on document update frequency)
Monitoring:
- Log all queries and retrieved chunks for debugging
- Track retrieval quality metrics (user satisfaction, correction rate)
- Alert on low-similarity retrievals: results below a set threshold may indicate gaps in the indexed documents
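A low-similarity alert can be as simple as checking the best score of each retrieval against a threshold. The sketch below assumes chunk dictionaries shaped like the ones returned by retrieve (each with a "similarity" key); check_retrieval_quality is a hypothetical helper, and the 0.3 threshold is a placeholder that would need tuning against real queries.

```python
import logging

logger = logging.getLogger("rag.monitoring")


def check_retrieval_quality(
    query: str, retrieved: list[dict], min_similarity: float = 0.3
) -> bool:
    """Return True if the best chunk clears the threshold; log a warning otherwise."""
    # default=-inf treats an empty result list as the worst possible retrieval
    best = max((c["similarity"] for c in retrieved), default=float("-inf"))
    if best < min_similarity:
        logger.warning(
            "Low-similarity retrieval: query=%r best=%.3f threshold=%.3f",
            query, best, min_similarity,
        )
        return False
    return True


ok = check_retrieval_quality("INR target", [{"similarity": 0.82}, {"similarity": 0.41}])
weak = check_retrieval_quality("unrelated topic", [{"similarity": 0.12}])
print(ok, weak)  # True False
```

Counting how often this check fails per topic or source gives a rough signal of where the knowledge base is missing documents.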
Security (clinical):
- Patient-specific RAG: filter retrieved chunks by patient ID
- Never mix patient contexts across requests
- Audit all retrieval operations for PHI compliance
Interview Answer
"A RAG pipeline has two phases. Offline indexing: documents are chunked (512 tokens with overlap), embedded with a sentence transformer, and stored in a vector database with metadata. Online retrieval and generation: the user query is embedded, a similarity search returns top-k chunks, those chunks are injected into the prompt as context, and the LLM generates a grounded answer citing the retrieved sources. Production considerations include: document version management (re-embed on updates), metadata filtering for multi-tenant scenarios (patient-specific context), caching for repeated queries, and monitoring retrieval quality metrics to detect gaps in the knowledge base."