Learnixo

RAG Systems · Lesson 3 of 24

The Full RAG Pipeline: Index → Retrieve → Generate

Pipeline Phases

A RAG system has two distinct phases: offline indexing and online retrieval + generation. The online phase is further split below into retrieval, augmentation, and generation.

OFFLINE (indexing):
  Documents → Chunk → Embed → Store
  Runs once, then on document updates
  
ONLINE (retrieval + generation):
  Query → Embed → Search → Augment → Generate
  Runs for every user request
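
To make the two phases concrete before bringing in real libraries, here is a toy, dependency-free sketch. It assumes a bag-of-words `embed` function as a stand-in for a real embedding model and a plain Python list as the "vector store"; all function names and the sample texts are hypothetical and exist only for illustration.

```python
import math
from collections import Counter

def embed(text: str) -> Counter:
    """Toy embedding: lowercase word counts (stand-in for a real model)."""
    return Counter(text.lower().split())

def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

# OFFLINE: embed every document once and store (text, vector) pairs.
def build_index(documents: list[str]) -> list[tuple[str, Counter]]:
    return [(doc, embed(doc)) for doc in documents]

# ONLINE: embed the query, rank stored vectors, keep the top_k texts.
def search(index: list[tuple[str, Counter]], query: str, top_k: int = 1) -> list[str]:
    query_vec = embed(query)
    ranked = sorted(index, key=lambda item: cosine(query_vec, item[1]), reverse=True)
    return [doc for doc, _ in ranked[:top_k]]

# ONLINE: inject the retrieved text into the prompt sent to the LLM.
def augment(query: str, context: list[str]) -> str:
    joined = "\n".join(context)
    return f"CONTEXT:\n{joined}\n\nQUESTION:\n{query}"

index = build_index([
    "warfarin dose adjustment guidance",
    "apixaban prescribing guidance",
])
question = "warfarin dose adjustment for elderly patients"
top = search(index, question)
prompt = augment(question, top)
print(prompt)
```

The generation step is left out on purpose: in a real system the string returned by `augment` would be sent to an LLM.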

Phase 1: Indexing

Python
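from sentence_transformers import SentenceTransformer
import chromadb

embedder = SentenceTransformer("all-MiniLM-L6-v2")
chroma_client = chromadb.Client()  # in-memory client; data is lost when the process exits
# Chroma defaults to squared L2 distance. Request cosine distance so that
# 1 - distance in the retrieval step is a cosine similarity.
collection = chroma_client.get_or_create_collection(
    "clinical_docs",
    metadata={"hnsw:space": "cosine"},
)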

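def chunk_text(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    # Sizes are counted in whitespace-separated words, not model tokens.
    # all-MiniLM-L6-v2 truncates input longer than 256 word pieces by default,
    # so pick a chunk_size the embedding model can read in full.
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    words = text.split()
    chunks = []
    for i in range(0, len(words), chunk_size - overlap):
        chunks.append(" ".join(words[i:i + chunk_size]))
        if i + chunk_size >= len(words):
            break  # this window already reaches the end of the text
    return chunks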

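def index_document(doc_id: str, text: str, metadata: dict) -> int:
    chunks = chunk_text(text)
    if not chunks:
        return 0  # empty document: nothing to embed or store
    embeddings = embedder.encode(chunks).tolist()

    # upsert (rather than add) so that re-indexing an updated document
    # overwrites the chunks that share an id with the previous version.
    # If the new version has fewer chunks, the old trailing chunks remain
    # and need to be deleted separately.
    collection.upsert(
        ids=[f"{doc_id}_chunk_{i}" for i in range(len(chunks))],
        documents=chunks,
        embeddings=embeddings,
        metadatas=[{**metadata, "chunk_index": i} for i in range(len(chunks))]
    )
    return len(chunks)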

# Usage
n_chunks = index_document(
    doc_id="nice_ng196",
    text="[Full text of NICE guideline NG196...]",
    metadata={"source": "NICE NG196", "topic": "atrial fibrillation", "year": 2021}
)
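
To see what the overlap does, here is a small sketch of the same sliding-window idea with tiny numbers. `sliding_chunks` is a hypothetical stand-in written for this illustration; it works on a word list and stops as soon as a window reaches the end of the text, which avoids a trailing chunk made up only of overlap words.

```python
def sliding_chunks(words: list[str], chunk_size: int, overlap: int) -> list[list[str]]:
    """Split a word list into windows of chunk_size that share `overlap` words."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap  # how far each window advances
    windows = []
    for start in range(0, len(words), step):
        windows.append(words[start:start + chunk_size])
        if start + chunk_size >= len(words):
            break  # this window already reaches the end of the text
    return windows

words = "a b c d e f g h i j".split()
windows = sliding_chunks(words, chunk_size=4, overlap=1)
for window in windows:
    print(" ".join(window))
# a b c d
# d e f g
# g h i j
```

Each window starts on the last word of the previous one, so a sentence that straddles a boundary is still seen whole by at least one chunk when the overlap is large enough.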

Phase 2: Retrieval

Python
def retrieve(query: str, top_k: int = 5, filters: dict | None = None) -> list[dict]:
    query_embedding = embedder.encode([query])[0].tolist()

    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
        # Pass None when there are no filters. Several conditions at once
        # must be wrapped as {"$and": [{...}, {...}]}.
        where=filters or None,
        include=["documents", "metadatas", "distances"]
    )

    return [
        {
            "content": doc,
            "metadata": meta,
            "distance": dist,
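            # 1 - distance is a cosine similarity only if the collection was
            # created with cosine distance; Chroma's default is squared L2.
            "similarity": 1 - dist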
        }
        for doc, meta, dist in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0]
        )
    ]

# Usage
chunks = retrieve(
    query="Warfarin dose adjustment for elderly patients",
    top_k=5,
    filters={"topic": "atrial fibrillation"}
)
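
The `1 - dist` conversion in `retrieve` depends on which distance the collection uses. As a rough sketch, assuming Chroma's documented definitions (cosine distance is 1 minus cosine similarity, and `l2` is the squared Euclidean distance) and assuming unit-length embeddings for the L2 case, the relationship can be checked by hand. The helper names are hypothetical.

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def squared_l2(a: list[float], b: list[float]) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))

def similarity_from_distance(distance: float, space: str) -> float:
    """Convert a vector-store distance into a cosine similarity."""
    if space == "cosine":
        return 1.0 - distance
    if space == "l2":
        # Holds only for unit-length vectors: |a - b|^2 = 2 - 2 * cos(a, b)
        return 1.0 - distance / 2.0
    raise ValueError(f"unsupported space: {space}")

a = [0.6, 0.8]  # unit length
b = [1.0, 0.0]  # unit length
cos = cosine_similarity(a, b)
from_cosine = similarity_from_distance(1.0 - cos, "cosine")
from_l2 = similarity_from_distance(squared_l2(a, b), "l2")
print(cos, from_cosine, from_l2)
```

All three values agree for these vectors, while applying `1 - dist` directly to a squared L2 distance would understate the similarity.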

Phase 3: Augmentation

Python
def build_prompt(query: str, retrieved_chunks: list[dict]) -> str:
    context = "\n\n---\n\n".join(
        f"Source: {c['metadata'].get('source', 'Unknown')}\n{c['content']}"
        for c in retrieved_chunks
    )

    return f"""You are a clinical information assistant.
Answer the question based ONLY on the provided context.
If the answer is not in the context, say "The provided documents do not contain this information."
Always indicate which source you're drawing from.

CONTEXT:
{context}

QUESTION:
{query}

ANSWER:"""

Phase 4: Generation

Python
from anthropic import Anthropic

client = Anthropic()

def generate_answer(query: str, retrieved_chunks: list[dict]) -> dict:
    prompt = build_prompt(query, retrieved_chunks)

    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}]
    )

    return {
        "answer": response.content[0].text,
        "sources": [c["metadata"].get("source") for c in retrieved_chunks],
        "retrieved_chunks": [c["content"] for c in retrieved_chunks]
    }

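def rag_query(user_query: str, filters: dict | None = None) -> dict:
    # filters are forwarded to retrieve() so callers can scope the search
    chunks = retrieve(user_query, top_k=5, filters=filters)
    return generate_answer(user_query, chunks)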

# End-to-end usage
result = rag_query("What is the INR target range for AF patients on Warfarin?")
print(result["answer"])
print("Sources:", result["sources"])

Production Considerations

Document management:
  Track document versions — re-embed when guidelines are updated
  Soft delete: mark chunks as outdated rather than deleting (audit trail)
  Metadata filtering: filter by date, source, patient cohort
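
One way to decide when a document needs re-embedding is to compare a hash of its current text with the hash recorded at the last indexing run. This is a minimal sketch under that assumption; the `registry` dict stands in for whatever persistent store holds document versions, and the function names are hypothetical.

```python
import hashlib

def content_hash(text: str) -> str:
    """Stable fingerprint of the document text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def needs_reindex(doc_id: str, text: str, registry: dict[str, str]) -> bool:
    """True when the document is new or its text changed since last indexing."""
    return registry.get(doc_id) != content_hash(text)

def mark_indexed(doc_id: str, text: str, registry: dict[str, str]) -> None:
    """Record the hash of the version that was just indexed."""
    registry[doc_id] = content_hash(text)

registry: dict[str, str] = {}
is_new = needs_reindex("nice_ng196", "guideline text, first version", registry)
mark_indexed("nice_ng196", "guideline text, first version", registry)
unchanged = needs_reindex("nice_ng196", "guideline text, first version", registry)
updated = needs_reindex("nice_ng196", "guideline text, second version", registry)
print(is_new, unchanged, updated)
```

For soft deletes, the same registry could carry a flag per version, with retrieval filtering on a metadata field that marks chunks as current.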

Caching:
  Cache embeddings for common queries (LRU cache)
  Cache full RAG responses for identical queries (TTL based on document update frequency)
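
Both caching ideas can be sketched with the standard library. The example assumes a hypothetical `embed_query_uncached` function in place of the real embedding call and a small hand-written `TTLCache` class; neither comes from a library, and the TTL value is a placeholder.

```python
import time
from functools import lru_cache

def embed_query_uncached(query: str) -> tuple[float, ...]:
    """Hypothetical stand-in for the real embedding call; returns a fake vector."""
    return (float(len(query)), float(sum(map(ord, query)) % 97))

@lru_cache(maxsize=1024)
def embed_query(query: str) -> tuple[float, ...]:
    # Returns a tuple so cached values cannot be mutated by callers.
    return embed_query_uncached(query)

class TTLCache:
    """Response cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._store = {}

    def get(self, key: str):
        entry = self._store.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at > self._ttl:
            del self._store[key]  # expired: drop it and report a miss
            return None
        return value

    def set(self, key: str, value) -> None:
        self._store[key] = (self._clock(), value)

responses = TTLCache(ttl_seconds=3600)
responses.set("what is the inr target?", {"answer": "cached answer text"})
print(responses.get("what is the inr target?"))
```

Cached responses should also be invalidated when the underlying documents are re-indexed, otherwise the cache can serve answers based on superseded guidance until the TTL runs out.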

Monitoring:
  Log all queries and retrieved chunks for debugging
  Track retrieval quality metrics (user satisfaction, correction rate)
  Alert on low-similarity retrievals (below threshold → may need more documents)
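
The low-similarity alert could look roughly like the sketch below. It assumes each retrieved item is a dict with a `similarity` key, as returned by `retrieve`; the function name and the 0.3 threshold are hypothetical placeholders that would need tuning against real queries.

```python
import logging

logger = logging.getLogger("rag.monitoring")

def check_retrieval_quality(query: str, results: list[dict], min_similarity: float = 0.3) -> bool:
    """Return True if the best hit meets the threshold; log a warning otherwise."""
    best = max((r["similarity"] for r in results), default=None)
    if best is None or best < min_similarity:
        # An empty or weak result set suggests a gap in the knowledge base.
        logger.warning("Low-similarity retrieval: query=%r best=%s", query, best)
        return False
    return True

ok = check_retrieval_quality("inr target range", [{"similarity": 0.82}, {"similarity": 0.41}])
weak = check_retrieval_quality("unrelated question", [{"similarity": 0.12}])
print(ok, weak)
```

The boolean result can also be used to skip the LLM call and return the "not in the documents" message directly.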

Security (clinical):
  Patient-specific RAG: filter retrieved chunks by patient ID
  Never mix patient contexts across requests
  Audit all retrieval operations for PHI compliance
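
For patient-specific retrieval, one option is to build the metadata filter in a single place so the patient ID can never be omitted or overridden. This sketch assumes chunks are stored with a `patient_id` metadata field (a hypothetical field name) and uses Chroma's `$and` operator to combine conditions; `patient_scoped_filter` is an illustrative helper, not a library function.

```python
from typing import Optional

def patient_scoped_filter(patient_id: str, extra: Optional[dict] = None) -> dict:
    """Build a where-filter that always restricts results to one patient."""
    if not patient_id:
        raise ValueError("patient_id is required for patient-specific retrieval")
    conditions = [{"patient_id": patient_id}]
    for key, value in (extra or {}).items():
        if key == "patient_id":
            raise ValueError("patient_id cannot be overridden by caller filters")
        conditions.append({key: value})
    # A single condition is passed as-is; several are combined with $and.
    return conditions[0] if len(conditions) == 1 else {"$and": conditions}

only_patient = patient_scoped_filter("patient-123")
with_topic = patient_scoped_filter("patient-123", {"topic": "atrial fibrillation"})
print(only_patient)
print(with_topic)
```

The returned dict can be passed as the `filters` argument of `retrieve`, and the same call site is a natural place to write the audit log entry.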

Interview Answer

"A RAG pipeline has two phases. Offline indexing: documents are split into overlapping chunks sized to fit the embedding model's input limit, embedded with a sentence transformer, and stored in a vector database with metadata. Online retrieval and generation: the user query is embedded, a similarity search returns the top-k chunks, those chunks are injected into the prompt as context, and the LLM generates a grounded answer citing the retrieved sources. Production considerations include document version management (re-embed on updates), metadata filtering for multi-tenant scenarios (patient-specific context), caching for repeated queries, and monitoring retrieval quality metrics to detect gaps in the knowledge base."