Learnixo
Back to blog
AI Systems · Advanced

Integrating RAG with LLMs

How retrieval-augmented generation works end-to-end: embedding documents, querying vector stores, assembling context, and building production-grade RAG pipelines.

Asma Hafeez Khan · May 16, 2026 · 7 min read
LLM · RAG · Vector Search · Embeddings · Production
Share:𝕏

Why RAG Changes LLM Behavior

Without RAG, an LLM answers from parametric memory (knowledge encoded in weights). This knowledge is frozen at training cutoff and can't be updated without retraining. RAG (Retrieval-Augmented Generation) adds a retrieval step that provides relevant documents as context, allowing the model to answer from current, domain-specific, or private knowledge.

RAG changes the answer quality in two ways:

  1. Grounding: The model answers from retrieved text rather than from memory — reducing hallucination
  2. Scope extension: The model can answer questions about documents that didn't exist at training time

End-to-End RAG Pipeline

Python
from dataclasses import dataclass
from typing import Optional
import numpy as np
from openai import OpenAI

# Module-level OpenAI client used by the pipeline code below for both
# embedding and chat-completion requests. It is constructed with no explicit
# arguments, so credentials are not set here.
# NOTE(review): presumably the SDK picks the API key up from the environment — confirm.
client = OpenAI()

@dataclass
class Document:
    """A single source document that can be embedded, stored, and retrieved."""

    id: str  # Identifier for the document (used as the record id when ingesting into Chroma)
    title: str  # Human-readable title; shown in prompts and source listings
    content: str  # Body text of the document
    metadata: dict  # Extra key/value attributes; merged into the stored Chroma metadata

@dataclass
class RetrievedContext:
    """Result of one retrieval call: the matched documents plus their scores."""

    documents: list[Document]  # Retrieved documents, best match first
    scores: list[float]  # Similarity score for each entry in `documents` (same order)
    query: str  # The query text that produced this result

class SimpleRAGPipeline:
    """End-to-end RAG pipeline with an in-memory vector store.

    Documents are embedded with the configured embedding model and kept in a
    NumPy matrix (one row per document). Queries are embedded with the same
    model, scored against every stored row, and the best matches are passed
    to a chat model as grounding context.

    Args:
        embedding_model: Name of the embedding model used for both documents
            and queries.
        chat_model: Name of the chat-completion model used by ``generate``.
    """

    # Grounding instruction used when the caller does not supply a system prompt.
    DEFAULT_SYSTEM_PROMPT = (
        "You are a helpful assistant. Answer questions using ONLY the provided documents.\n"
        "If the documents don't contain the answer, say \"The provided documents don't address this.\"\n"
        "Cite documents by number when making factual claims."
    )

    def __init__(
        self,
        embedding_model: str = "text-embedding-3-small",
        chat_model: str = "gpt-4o",
    ):
        self.embedding_model = embedding_model
        self.chat_model = chat_model
        self.documents: list[Document] = []
        # Stays None until the first non-empty add_documents() call.
        self.embeddings: Optional[np.ndarray] = None

    def add_documents(self, documents: list[Document]) -> None:
        """Embed ``documents`` and append them to the in-memory store.

        The title and content of each document are embedded together.
        An empty list is a no-op, so no embedding request is made.
        """
        if not documents:
            return

        texts = [f"{doc.title}\n{doc.content}" for doc in documents]

        # One batched request for all new documents.
        response = client.embeddings.create(
            model=self.embedding_model,
            input=texts,
        )
        new_embeddings = np.array([e.embedding for e in response.data])

        self.documents.extend(documents)
        if self.embeddings is None:
            self.embeddings = new_embeddings
        else:
            self.embeddings = np.vstack([self.embeddings, new_embeddings])

    def retrieve(self, query: str, top_k: int = 5) -> RetrievedContext:
        """Return the ``top_k`` stored documents most similar to ``query``.

        If nothing has been added yet, or ``top_k`` is not positive, an empty
        ``RetrievedContext`` is returned without calling the embedding API.
        Results are ordered best match first.
        """
        if self.embeddings is None or top_k <= 0:
            return RetrievedContext(documents=[], scores=[], query=query)

        # Embed the query with the same model as the documents.
        response = client.embeddings.create(
            model=self.embedding_model,
            input=[query],
        )
        query_embedding = np.array(response.data[0].embedding)

        # Dot product of every stored row with the query vector.
        # This equals cosine similarity only if the embeddings are unit vectors
        # (unit vectors -> dot product = cosine similarity).
        # NOTE(review): assumes the embedding model returns normalized vectors — confirm.
        similarities = self.embeddings @ query_embedding

        # Highest score first; slicing tolerates top_k > number of documents.
        top_indices = np.argsort(similarities)[::-1][:top_k]

        return RetrievedContext(
            documents=[self.documents[i] for i in top_indices],
            scores=[float(similarities[i]) for i in top_indices],
            query=query,
        )

    def generate(
        self,
        query: str,
        context: RetrievedContext,
        system_prompt: Optional[str] = None,
    ) -> str:
        """Generate an answer to ``query`` grounded in ``context``.

        Args:
            query: The user question.
            context: Retrieved documents; each is rendered as a numbered block
                so the model can cite it by number.
            system_prompt: Optional replacement for the default grounding
                instruction. ``None`` or an empty string selects the default.

        Returns:
            The model's answer text (empty string if the model returned no
            text content).
        """
        docs_text = "\n\n".join(
            f"[Document {i+1}: {doc.title}]\n{doc.content}"
            for i, doc in enumerate(context.documents)
        )

        system = system_prompt or self.DEFAULT_SYSTEM_PROMPT

        messages = [
            {"role": "system", "content": system},
            {
                "role": "user",
                "content": f"Documents:\n{docs_text}\n\nQuestion: {query}",
            },
        ]

        response = client.chat.completions.create(
            model=self.chat_model,
            messages=messages,
            temperature=0,
        )
        answer = response.choices[0].message.content
        return answer if answer is not None else ""

    def query(self, question: str, top_k: int = 5) -> dict:
        """Full RAG query: retrieve, then generate.

        Returns:
            A dict with the ``question``, the generated ``answer``, and
            ``sources``: one ``{"title", "score"}`` entry per retrieved document.
        """
        context = self.retrieve(question, top_k=top_k)
        answer = self.generate(question, context)

        return {
            "question": question,
            "answer": answer,
            "sources": [
                {"title": doc.title, "score": score}
                for doc, score in zip(context.documents, context.scores)
            ],
        }

Production RAG with Chroma

Using a persistent vector store instead of in-memory arrays:

Python
import chromadb
from chromadb.utils import embedding_functions

# Initialize Chroma client
# A persistent client is used so the vector store lives under ./chroma_db
# instead of in in-memory arrays.
chroma_client = chromadb.PersistentClient(path="./chroma_db")

# Embedding function handed to the collection below; it uses the same
# embedding model name as the in-memory pipeline's default.
# NOTE(review): "your-api-key" is a placeholder literal — load the real key
# from configuration or the environment rather than committing it to source.
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key="your-api-key",
    model_name="text-embedding-3-small",
)

# Create or get collection
# "hnsw:space": "cosine" requests cosine as the distance space for the index;
# the query helper later converts those distances to similarities with 1 - distance.
collection = chroma_client.get_or_create_collection(
    name="clinical_documents",
    embedding_function=openai_ef,
    metadata={"hnsw:space": "cosine"},
)


def ingest_documents(documents: list[Document]) -> None:
    """Ingest documents into the module-level Chroma collection.

    Each document is stored under its ``id`` with its ``content`` as the text
    and a metadata dict made of its ``title`` plus its own ``metadata``.
    Keys in ``doc.metadata`` win on collision, so a ``"title"`` entry there
    replaces the document's title in the stored metadata.

    An empty list is a no-op: the store is not called.
    """
    if not documents:
        # NOTE(review): Chroma is expected to reject an add() with no ids, so
        # skip the call instead of forwarding an empty batch — confirm.
        print("Ingested 0 documents")
        return

    collection.add(
        ids=[doc.id for doc in documents],
        documents=[doc.content for doc in documents],
        metadatas=[{"title": doc.title, **doc.metadata} for doc in documents],
    )
    print(f"Ingested {len(documents)} documents")


def retrieve_from_chroma(
    query: str,
    top_k: int = 5,
    filter_metadata: dict = None,
) -> list[dict]:
    """Look up the closest stored documents for ``query`` in Chroma.

    Args:
        query: Text to search for.
        top_k: Number of results requested from the collection.
        filter_metadata: Optional metadata filter passed through as ``where``,
            e.g. ``{"category": "drug_interaction"}``.

    Returns:
        One dict per hit with ``content``, ``title`` (empty string when the
        metadata has none), the full ``metadata``, and a ``score`` computed as
        ``1 - distance``.
    """
    response = collection.query(
        query_texts=[query],
        n_results=top_k,
        where=filter_metadata,
        include=["documents", "metadatas", "distances"],
    )

    # Exactly one query text is sent, so the hits are at index 0 of each field.
    hits = zip(
        response["documents"][0],
        response["metadatas"][0],
        response["distances"][0],
    )

    return [
        {
            "content": text,
            "title": meta.get("title", ""),
            "metadata": meta,
            "score": 1 - distance,  # distance -> similarity
        }
        for text, meta, distance in hits
    ]

Chunking Strategies

How you split documents dramatically affects retrieval quality:

Python
from typing import Iterator

def chunk_by_sentences(
    text: str,
    chunk_size: int = 500,
    overlap: int = 100,
) -> list[str]:
    """Split text into overlapping, sentence-aligned chunks.

    Sizes are measured in whitespace-separated words, used here as a rough
    stand-in for tokens.

    Args:
        text: Text to split. Leading/trailing whitespace is ignored.
        chunk_size: Target maximum words per chunk. A single sentence longer
            than this is still emitted whole, as its own chunk.
        overlap: Approximate number of trailing words repeated at the start
            of the next chunk. Whole sentences are carried over until at
            least this many words are covered; ``0`` disables overlap.

    Returns:
        The chunks in document order; an empty list for empty or
        whitespace-only text.
    """
    import re

    # Split on sentence boundaries, dropping empty pieces so that blank input
    # does not produce an empty chunk.
    sentences = [s for s in re.split(r'(?<=[.!?])\s+', text.strip()) if s]

    chunks: list[str] = []
    current: list[str] = []
    current_size = 0

    for sentence in sentences:
        sentence_size = len(sentence.split())

        if current and current_size + sentence_size > chunk_size:
            chunks.append(" ".join(current))

            # Collect trailing sentences of the chunk just emitted until the
            # requested overlap is covered.
            carried: list[str] = []
            carried_size = 0
            if overlap > 0:
                for previous in reversed(current):
                    carried.append(previous)
                    carried_size += len(previous.split())
                    if carried_size >= overlap:
                        break
                carried.reverse()

            # Drop the oldest carried sentences while they would push the new
            # chunk past chunk_size. Because the chunk was flushed for being
            # too large, this always removes at least one sentence when the
            # whole chunk was carried, so every new chunk starts later than
            # the previous one and long sentences are not re-emitted forever.
            budget = chunk_size - sentence_size
            while carried and carried_size > budget:
                carried_size -= len(carried.pop(0).split())

            current, current_size = carried, carried_size

        current.append(sentence)
        current_size += sentence_size

    if current:
        chunks.append(" ".join(current))

    return chunks


def chunk_by_markdown_sections(text: str) -> list[dict]:
    """Split a markdown document at header boundaries — natural semantic units.

    A new section starts at every level 1-3 ATX header (``#``, ``##``,
    ``###``). Lines inside fenced code blocks (delimited by lines starting
    with three backticks or ``~~~``) are never treated as headers, so a
    ``# comment`` in a code sample stays inside its section.

    Args:
        text: Markdown source.

    Returns:
        A list of ``{"header": str, "content": str}`` dicts in document
        order. Text before the first header gets an empty ``header``. A
        header that is immediately followed by another header (no lines in
        between) produces no section.
    """
    import re

    header_pattern = re.compile(r'^(#{1,3})\s+(.+)$')

    sections = []
    current_header = ""
    current_content = []
    in_fence = False

    for line in text.split("\n"):
        # Fence delimiters toggle code-block state and belong to the content.
        if line.lstrip().startswith(("```", "~~~")):
            in_fence = not in_fence
            current_content.append(line)
            continue

        header_match = None if in_fence else header_pattern.match(line)
        if header_match:
            if current_content:
                sections.append({
                    "header": current_header,
                    "content": "\n".join(current_content),
                })
            current_header = header_match.group(2)
            current_content = []
        else:
            current_content.append(line)

    if current_content:
        sections.append({
            "header": current_header,
            "content": "\n".join(current_content),
        })

    return sections

Hybrid Search: Dense + Sparse

Combining semantic search (embeddings) with keyword search (BM25):

Python
from rank_bm25 import BM25Okapi
import numpy as np

class HybridRetriever:
    """Combines dense (embedding) and sparse (BM25) retrieval.

    Args:
        documents: Document texts. They are lower-cased and split on
            whitespace to build the BM25 index.
        embeddings: Matrix with one embedding row per document, in the same
            order as ``documents``.
    """

    def __init__(self, documents: list[str], embeddings: np.ndarray):
        self.documents = documents
        self.embeddings = embeddings

        # Build BM25 index for keyword search
        tokenized = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized)

    def retrieve(
        self,
        query: str,
        query_embedding: np.ndarray,
        top_k: int = 10,
        alpha: float = 0.5,  # Weight: 0=BM25 only, 1=embedding only
        rrf_k: int = 60,  # Standard RRF constant
    ) -> list[tuple[int, float]]:
        """Hybrid retrieval using weighted Reciprocal Rank Fusion (RRF).

        Every document is ranked once by dense similarity and once by BM25
        score. Its fused score is
        ``alpha / (rrf_k + dense_rank) + (1 - alpha) / (rrf_k + sparse_rank)``
        with ranks starting at 1.

        Args:
            query: Query text; tokenized the same way as the documents
                (lower-cased, whitespace split) for BM25.
            query_embedding: Embedding of the query, compatible with the rows
                of ``self.embeddings``.
            top_k: Maximum number of results to return.
            alpha: Balance between dense and sparse rankings.
            rrf_k: RRF smoothing constant.

        Returns:
            Up to ``top_k`` ``(document_index, fused_score)`` pairs, best
            first. Indices are plain Python ``int`` values.
        """
        # Dense (semantic) ranking, best first. A stable sort keeps tie order
        # deterministic.
        dense_scores = self.embeddings @ query_embedding
        dense_order = np.argsort(-dense_scores, kind="stable")

        # Sparse (BM25) ranking, best first.
        bm25_scores = np.asarray(self.bm25.get_scores(query.lower().split()))
        sparse_order = np.argsort(-bm25_scores, kind="stable")

        # Reciprocal Rank Fusion. tolist() converts NumPy integers to Python
        # ints so the returned indices match the declared return type.
        fused: dict[int, float] = {}
        weighted_rankings = ((alpha, dense_order), (1 - alpha, sparse_order))
        for weight, order in weighted_rankings:
            for rank, doc_idx in enumerate(order.tolist(), start=1):
                fused[doc_idx] = fused.get(doc_idx, 0.0) + weight / (rrf_k + rank)

        # Sort by combined score
        ranked = sorted(fused.items(), key=lambda item: item[1], reverse=True)
        return ranked[:top_k]

Evaluating RAG Pipelines

Python
from dataclasses import dataclass

@dataclass
class RAGEvalCase:
    """One labelled example for evaluating retrieval."""

    question: str  # Query text passed to the retriever
    expected_answer: str  # Reference answer; not read by evaluate_retrieval
    relevant_doc_ids: list[str]  # Ground truth relevant documents


def evaluate_retrieval(
    retriever,
    eval_cases: list[RAGEvalCase],
    top_k: int = 5,
) -> dict:
    """Compute mean retrieval precision, recall, and MRR over ``eval_cases``.

    Args:
        retriever: Object exposing ``retrieve(question, top_k=...)`` that
            returns a sequence of mappings, each with an ``"id"`` key.
        eval_cases: Labelled questions with their relevant document ids.
        top_k: Number of documents requested per question.

    Returns:
        A dict with ``precision_at_k``, ``recall_at_k`` and ``mrr``, each
        averaged over all cases. All three are ``0.0`` when ``eval_cases`` is
        empty. A case whose retriever returns nothing scores ``0.0`` precision
        instead of raising ``ZeroDivisionError``.
    """
    if not eval_cases:
        return {"precision_at_k": 0.0, "recall_at_k": 0.0, "mrr": 0.0}

    precisions = []
    recalls = []
    reciprocal_ranks = []

    for case in eval_cases:
        retrieved = retriever.retrieve(case.question, top_k=top_k)
        retrieved_ids = [doc["id"] for doc in retrieved]
        relevant_set = set(case.relevant_doc_ids)
        relevant_retrieved = set(retrieved_ids) & relevant_set

        # Precision: what fraction of retrieved docs are relevant?
        precision = len(relevant_retrieved) / len(retrieved_ids) if retrieved_ids else 0.0

        # Recall: what fraction of relevant docs were retrieved?
        recall = len(relevant_retrieved) / len(relevant_set) if relevant_set else 0.0

        # Reciprocal rank of the first relevant document (0 if none retrieved).
        reciprocal_rank = next(
            (
                1 / rank
                for rank, doc_id in enumerate(retrieved_ids, 1)
                if doc_id in relevant_set
            ),
            0.0,
        )

        precisions.append(precision)
        recalls.append(recall)
        reciprocal_ranks.append(reciprocal_rank)

    case_count = len(eval_cases)
    return {
        "precision_at_k": sum(precisions) / case_count,
        "recall_at_k": sum(recalls) / case_count,
        "mrr": sum(reciprocal_ranks) / case_count,
    }

RAG Failure Modes and Fixes

Retrieval fails (wrong docs):

  • Symptom: model says "the documents don't address this" when they do
  • Fix: improve chunking, use hybrid search, rerank with a cross-encoder

Context ignored (model uses parametric knowledge):

  • Symptom: model answers from training data, ignoring retrieved docs
  • Fix: stronger grounding instruction, move documents before the question

Context window overflow:

  • Symptom: long documents truncated, model misses key info
  • Fix: reduce chunk size, retrieve fewer documents, summarize before inserting

Lost-in-the-middle:

  • Symptom: model ignores middle documents but uses first and last
  • Fix: put most relevant document first, use fewer documents

Citation hallucination:

  • Symptom: model cites "Document 3" but the claim isn't in Document 3
  • Fix: validate citations programmatically by checking if the claim is in the cited doc
Python
def validate_citations(answer: str, documents: list[dict]) -> list[str]:
    """Check that each citation in the answer is supported by the cited document.

    Citations of the form ``[Doc N]`` or ``[Document N]`` (1-based) are
    recognised. For every occurrence, the claim is the text between the
    previous sentence terminator (``.``, ``!``, ``?``) and the citation, with
    any other citation markers removed. The check is a rough heuristic: the
    claim passes if any of its first three words longer than four characters
    appears in the cited document's content, case-insensitively.

    Args:
        answer: Model answer containing citation markers.
        documents: Documents in the order they were numbered in the prompt;
            each must have a ``"content"`` key.

    Returns:
        Human-readable issue strings, one per problematic citation, in the
        order the citations appear. Empty when nothing was flagged.
    """
    import re

    citation_pattern = re.compile(r'\[Doc(?:ument)?\s*(\d+)\]')
    edge_punctuation = ".,;:()\"'"
    issues = []

    # Iterate over occurrences, not distinct numbers, so a document cited
    # twice has both of its claims checked.
    for match in citation_pattern.finditer(answer):
        citation = match.group(1)
        doc_idx = int(citation) - 1
        # Reject 0 as well: index -1 would silently wrap to the last document.
        if doc_idx < 0 or doc_idx >= len(documents):
            issues.append(f"[Doc {citation}] doesn't exist")
            continue

        doc_content = documents[doc_idx]["content"].lower()

        # Claim = current sentence up to this citation, minus other markers
        # (handles adjacent citations such as "... [Doc 1][Doc 2]").
        sentence_so_far = re.split(r'[.!?]', answer[:match.start()])[-1]
        claim = citation_pattern.sub('', sentence_so_far).strip()

        # Rough check: key words of claim in doc content
        stripped_words = (word.strip(edge_punctuation) for word in claim.split())
        key_words = [word for word in stripped_words if len(word) > 4]
        if not key_words:
            # Nothing distinctive to look for; do not report an unverifiable
            # claim as unsupported.
            continue

        if not any(kw.lower() in doc_content for kw in key_words[:3]):
            issues.append(f"Claim '{claim[:50]}...' not found in [Doc {citation}]")

    return issues

Enjoyed this article?

Explore the AI Systems learning path for more.

Found this helpful?

Share:𝕏

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.