Learnixo
Back to blog
AI Systems · Intermediate

RAG Ingestion Pipeline

Build a production document ingestion pipeline: loading, parsing, chunking, embedding, and indexing. Handle updates, deletions, and incremental ingestion at scale.

Asma Hafeez Khan · May 16, 2026 · 6 min read
RAG · Ingestion · Pipeline · Document Processing · Production
Share:𝕏

Ingestion Pipeline Overview

A RAG ingestion pipeline converts raw documents into searchable vector embeddings. The pipeline runs once initially, then incrementally for new or updated documents:

Raw Documents (PDFs, Word, HTML, text)
    ↓
Document Loader (extract text + metadata)
    ↓
Text Cleaner (normalize, remove artifacts)
    ↓
Chunker (split into retrieval-sized pieces)
    ↓
Embedder (convert chunks to vectors)
    ↓
Vector Store (index for fast retrieval)

Document Loaders

Python
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional
import hashlib

@dataclass
class RawDocument:
    """A document as produced by a loader, prior to any chunking.

    ``content_hash`` is derived from ``content`` automatically whenever the
    caller leaves it empty.
    """
    id: str                  # document identifier (load_text_file uses the file stem)
    content: str             # full text of the document
    source_path: str         # path the document was loaded from
    title: str               # display title
    document_type: str       # "pdf", "docx", "html", "txt"
    metadata: dict = field(default_factory=dict)  # extra attributes, fresh dict per instance
    content_hash: str = ""   # first 16 hex chars of SHA-256(content) unless supplied

    def __post_init__(self):
        # A hash supplied by the caller is kept as-is.
        if self.content_hash:
            return
        digest = hashlib.sha256(self.content.encode())
        self.content_hash = digest.hexdigest()[:16]


def load_text_file(path: str) -> RawDocument:
    """Read a UTF-8 text file and wrap it in a RawDocument.

    The file stem serves as both the document id and, with dashes and
    underscores turned into spaces and title-cased, the display title.
    ``document_type`` is always "txt", including for ``.md`` files that
    load_directory routes through this function.
    """
    stem = Path(path).stem
    # "my-file_name" -> "My File Name"
    display_title = stem.replace("-", " ").replace("_", " ").title()

    with open(path, encoding="utf-8") as handle:
        text = handle.read()

    return RawDocument(
        id=stem,
        content=text,
        source_path=path,
        title=display_title,
        document_type="txt",
    )


def load_directory(directory: str, extensions: Optional[list[str]] = None) -> list[RawDocument]:
    """Recursively load every supported document under ``directory``.

    Args:
        directory: Root folder; all nested folders are searched as well.
        extensions: File suffixes (with leading dot) to include. Matching is
            case-insensitive. Defaults to .txt, .md, .pdf and .docx.

    Returns:
        The documents that loaded successfully. Files that fail to load are
        reported on stdout and skipped, so one bad file does not abort the
        whole run. Requested extensions with no loader are silently ignored.
    """
    if extensions is None:
        extensions = [".txt", ".md", ".pdf", ".docx"]
    # File suffixes are lowercased before comparison, so the requested
    # extensions must be too; otherwise ".TXT" would never match anything.
    wanted = {ext.lower() for ext in extensions}

    documents = []
    for path in Path(directory).rglob("*"):
        suffix = path.suffix.lower()
        if suffix not in wanted or not path.is_file():
            continue
        try:
            if suffix in (".txt", ".md"):
                doc = load_text_file(str(path))
            elif suffix == ".pdf":
                doc = load_pdf(str(path))
            elif suffix == ".docx":
                doc = load_docx(str(path))
            else:
                # Extension was requested but no loader handles it.
                continue
        except Exception as e:
            # Deliberately broad: loading is best-effort per file.
            print(f"Failed to load {path}: {e}")
        else:
            documents.append(doc)

    return documents

Chunking Strategies

Python
from typing import Iterator
import re

@dataclass
class Chunk:
    """One piece of a document, in the form handed to the embedder."""
    id: str                # chunk identifier, e.g. "<document id>_chunk_<n>"
    content: str           # text that is embedded and stored
    document_id: str       # id of the document this chunk was cut from
    chunk_index: int       # position assigned by the chunker
    title: str             # title carried along for display and metadata
    metadata: dict = field(default_factory=dict)       # fresh dict per instance
    token_count: Optional[int] = field(default=None)   # None when no estimate was made


def estimate_tokens(text: str, chars_per_token: int = 4) -> int:
    """Return a rough token estimate for ``text``.

    Args:
        text: The text to size.
        chars_per_token: Average characters per token. The default of 4 is
            a rule of thumb for English; pass another ratio for other
            languages or tokenizers.

    Returns:
        ``len(text) // chars_per_token`` (rounded down, so short strings
        may estimate to 0).

    Raises:
        ValueError: If ``chars_per_token`` is not positive.
    """
    if chars_per_token <= 0:
        raise ValueError("chars_per_token must be a positive integer")
    return len(text) // chars_per_token


def chunk_fixed_size(
    document: RawDocument,
    chunk_size: int = 512,      # Target tokens per chunk
    overlap: int = 50,          # Overlap tokens between adjacent chunks
) -> list[Chunk]:
    """Split a document into fixed-size, overlapping chunks.

    Sizes are applied as whitespace-separated word counts (1 word ≈ 1 token,
    a rough approximation), while each chunk's ``token_count`` comes from
    ``estimate_tokens`` on the chunk text, so the two figures can differ.

    Args:
        document: The document to split.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Number of words shared by adjacent chunks; must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        Chunks in document order; an empty list for a document with no words.

    Raises:
        ValueError: If the sizes would make the window stall, move
            backwards, or skip words.
    """
    # A step of zero or less would never reach the end of the document,
    # and a negative overlap would silently drop words between chunks.
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = document.content.split()
    words_per_chunk = chunk_size
    step = words_per_chunk - overlap

    chunks = []
    for chunk_idx, start in enumerate(range(0, len(words), step)):
        chunk_text = " ".join(words[start:start + words_per_chunk])

        chunks.append(Chunk(
            id=f"{document.id}_chunk_{chunk_idx}",
            content=chunk_text,
            document_id=document.id,
            chunk_index=chunk_idx,
            title=document.title,
            metadata={**document.metadata, "source_path": document.source_path},
            token_count=estimate_tokens(chunk_text),
        ))

        # Once a window reaches the last word, any further window would hold
        # only overlap words that this chunk already contains.
        if start + words_per_chunk >= len(words):
            break

    return chunks


def chunk_by_sections(document: RawDocument) -> list[Chunk]:
    """Chunk a markdown document at its level 1-3 headers.

    Text before the first header is filed under the document title. Lines
    inside fenced code blocks (``` or ~~~) are never treated as headers, so a
    ``# comment`` in a code sample does not split a section. Fence detection
    is a simple open/close toggle, not a full CommonMark parser.

    Args:
        document: The document to split.

    Returns:
        One chunk per non-empty section, with the header prepended to the
        content for context. ``chunk_index`` is the section's position among
        all collected sections, so indices may have gaps where an empty
        section was skipped.
    """
    sections = []
    current_header = document.title
    current_lines = []
    in_code_fence = False

    for line in document.content.split("\n"):
        if line.lstrip().startswith(("```", "~~~")):
            # Fence delimiters belong to the section body; just track state.
            in_code_fence = not in_code_fence
            current_lines.append(line)
            continue

        if not in_code_fence and re.match(r'^#{1,3}\s+', line):
            if current_lines:
                sections.append({
                    "header": current_header,
                    "content": "\n".join(current_lines).strip(),
                })
            current_header = re.sub(r'^#+\s+', '', line)
            current_lines = []
        else:
            current_lines.append(line)

    # Flush whatever followed the last header.
    if current_lines:
        sections.append({
            "header": current_header,
            "content": "\n".join(current_lines).strip(),
        })

    chunks = []
    for idx, section in enumerate(sections):
        if not section["content"].strip():
            continue

        # Include header in content for context
        content = f"{section['header']}\n\n{section['content']}"

        chunks.append(Chunk(
            id=f"{document.id}_section_{idx}",
            content=content,
            document_id=document.id,
            chunk_index=idx,
            title=f"{document.title} — {section['header']}",
            # source_path and token_count mirror what chunk_fixed_size records,
            # so every strategy yields the same metadata shape.
            metadata={
                **document.metadata,
                "source_path": document.source_path,
                "section": section["header"],
            },
            token_count=estimate_tokens(content),
        ))

    return chunks


def chunk_with_context(
    document: RawDocument,
    chunk_size: int = 512,
    context_prefix_size: int = 100,  # Characters from previous chunk
) -> list[Chunk]:
    """Split into fixed-size chunks, each prefixed with the previous chunk's tail.

    The document is first split by ``chunk_fixed_size`` with no overlap. Every
    chunk after the first is then rewritten as ``"[...<tail>] <chunk text>"``,
    where the tail is the last ``context_prefix_size`` characters of the
    previous chunk. The tail is cut by character count and may start mid-word.

    Args:
        document: The document to split.
        chunk_size: Passed through to ``chunk_fixed_size``.
        context_prefix_size: Number of trailing characters to borrow from the
            previous chunk. Zero or a negative value disables the prefix.

    Returns:
        Chunks with the same ids and indices as the underlying fixed-size
        chunks, with ``token_count`` estimated from the prefixed content.
    """
    base_chunks = chunk_fixed_size(document, chunk_size=chunk_size, overlap=0)
    context_chunks = []
    previous_content = None

    for chunk in base_chunks:
        content_with_context = chunk.content
        # Guard the slice: text[-0:] is the WHOLE string, so a size of 0 would
        # otherwise prepend the entire previous chunk instead of nothing.
        if previous_content is not None and context_prefix_size > 0:
            prefix = previous_content[-context_prefix_size:].strip()
            content_with_context = f"[...{prefix}] {chunk.content}"

        context_chunks.append(Chunk(
            id=chunk.id,
            content=content_with_context,
            document_id=chunk.document_id,
            chunk_index=chunk.chunk_index,
            title=chunk.title,
            metadata=chunk.metadata,
            # Re-estimated because the prefix makes the content longer.
            token_count=estimate_tokens(content_with_context),
        ))
        previous_content = chunk.content

    return context_chunks

Embedding and Indexing

Python
from openai import OpenAI
import chromadb
from chromadb.utils import embedding_functions

# Module-level OpenAI client; embed_chunks sends its embedding requests through it.
client = OpenAI()

def embed_chunks(
    chunks: list[Chunk],
    model: str = "text-embedding-3-small",
    batch_size: int = 100,
) -> list[tuple[Chunk, list[float]]]:
    """Embed chunks in batches and pair each chunk with its vector.

    Args:
        chunks: Chunks whose ``content`` is sent to the embeddings endpoint.
        model: Embedding model name passed to the API.
        batch_size: Number of chunks sent per request.

    Returns:
        ``(chunk, embedding)`` tuples in the same order as ``chunks``.
        Progress is printed after every batch.
    """
    total = len(chunks)
    pairs = []

    for start in range(0, total, batch_size):
        window = chunks[start:start + batch_size]

        response = client.embeddings.create(
            model=model,
            input=[item.content for item in window],
        )
        # Order by each item's index field so vectors line up with the inputs.
        ordered = sorted(response.data, key=lambda entry: entry.index)

        pairs.extend(
            (item, entry.embedding) for item, entry in zip(window, ordered)
        )

        done = min(start + batch_size, total)
        print(f"Embedded {done}/{total} chunks")

    return pairs


def ingest_to_chroma(
    chunk_embeddings: list[tuple[Chunk, list[float]]],
    collection,
    batch_size: int = 100,
) -> None:
    """Upsert embedded chunks into a Chroma collection in batches.

    Args:
        chunk_embeddings: ``(chunk, embedding)`` pairs to store.
        collection: Target collection; only its ``upsert`` method is used.
        batch_size: Number of chunks written per ``upsert`` call.

    Each stored metadata record holds ``document_id``, ``title`` and
    ``chunk_index`` plus the chunk's own metadata with every value converted
    to ``str``. Chunk metadata is applied last, so it wins on a key clash.
    """
    total = len(chunk_embeddings)

    for start in range(0, total, batch_size):
        ids, documents, embeddings, metadatas = [], [], [], []

        for chunk, vector in chunk_embeddings[start:start + batch_size]:
            record = {
                "document_id": chunk.document_id,
                "title": chunk.title,
                "chunk_index": chunk.chunk_index,
            }
            record.update((key, str(value)) for key, value in chunk.metadata.items())

            ids.append(chunk.id)
            documents.append(chunk.content)
            embeddings.append(vector)
            metadatas.append(record)

        collection.upsert(
            ids=ids,
            documents=documents,
            embeddings=embeddings,
            metadatas=metadatas,
        )

    print(f"Indexed {total} chunks")

Full Pipeline Orchestration

Python
from dataclasses import dataclass

@dataclass
class IngestionConfig:
    """Settings read by IngestionPipeline when chunking and embedding."""
    chunk_size: int = 512                # used by the "fixed" and "context" strategies
    chunk_overlap: int = 50              # used by the "fixed" strategy only
    chunking_strategy: str = "fixed"     # "fixed", "sections", "context"
    embedding_model: str = "text-embedding-3-small"  # model name passed to embed_chunks
    batch_size: int = 100                # chunks per embedding request
    collection_name: str = "documents"   # NOTE(review): not read by the pipeline code shown here


class IngestionPipeline:
    """Chunk, embed and index documents according to an IngestionConfig.

    Documents are de-duplicated by ``content_hash``. The set of seen hashes
    lives only on this instance, so a new pipeline object starts empty.
    """

    def __init__(self, config: IngestionConfig, vector_store):
        """Store the configuration and the collection that receives the chunks."""
        self.config = config
        self.vector_store = vector_store
        # Content hashes of documents this instance has already ingested.
        self.processed_hashes: set[str] = set()

    def _chunk(self, document: RawDocument) -> list[Chunk]:
        """Apply the configured chunking strategy.

        Any strategy name other than "sections" or "context" falls back to
        fixed-size chunking.
        """
        strategy = self.config.chunking_strategy
        if strategy == "sections":
            return chunk_by_sections(document)
        if strategy == "context":
            return chunk_with_context(
                document,
                chunk_size=self.config.chunk_size,
            )
        return chunk_fixed_size(
            document,
            chunk_size=self.config.chunk_size,
            overlap=self.config.chunk_overlap,
        )

    def process_document(self, document: RawDocument) -> dict:
        """Run one document through chunking, embedding and indexing.

        Returns:
            ``{"status": "skipped", "reason": ...}`` when the content hash was
            already ingested or chunking produced nothing; otherwise
            ``{"status": "processed", "document_id": ..., "n_chunks": ...}``.
            Errors from embedding or indexing propagate to the caller.
        """
        # Skip if already processed (incremental ingestion)
        if document.content_hash in self.processed_hashes:
            return {"status": "skipped", "reason": "already_processed"}

        chunks = self._chunk(document)
        if not chunks:
            return {"status": "skipped", "reason": "no_chunks"}

        chunk_embeddings = embed_chunks(
            chunks,
            model=self.config.embedding_model,
            batch_size=self.config.batch_size,
        )

        # Use the configured batch size for indexing too, so the setting
        # governs both stages instead of only the embedding requests.
        ingest_to_chroma(
            chunk_embeddings,
            self.vector_store,
            batch_size=self.config.batch_size,
        )

        # Record the hash only after indexing succeeded, so a failed run can
        # be retried.
        self.processed_hashes.add(document.content_hash)

        return {
            "status": "processed",
            "document_id": document.id,
            "n_chunks": len(chunks),
        }

    def process_directory(self, directory: str) -> dict:
        """Load and process every document under ``directory``.

        Returns:
            Counts keyed by "processed", "skipped" and "failed". A document
            whose processing raises is reported on stdout and counted as
            failed; the remaining documents are still processed.
        """
        documents = load_directory(directory)
        results = {"processed": 0, "skipped": 0, "failed": 0}

        for doc in documents:
            try:
                result = self.process_document(doc)
            except Exception as e:
                # Best-effort batch run: isolate failures per document.
                print(f"Failed to process {doc.id}: {e}")
                results["failed"] += 1
                continue

            if result["status"] == "processed":
                results["processed"] += 1
            else:
                results["skipped"] += 1

        return results


# Usage
# Chroma client pointed at the local ./chroma_db path, and the collection
# that will receive the chunks.
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection("clinical_docs")

# With the "sections" strategy the pipeline calls chunk_by_sections, which
# takes no size argument, so chunk_size has no effect in this configuration.
# collection_name keeps its default; the collection is passed in directly.
config = IngestionConfig(
    chunking_strategy="sections",
    embedding_model="text-embedding-3-small",
    chunk_size=512,
)

# Run the whole directory and print the processed/skipped/failed counts.
pipeline = IngestionPipeline(config, collection)
results = pipeline.process_directory("./clinical_documents/")
print(f"Ingestion complete: {results}")

Handling Document Updates and Deletions

Python
def update_document(
    collection,
    old_document_id: str,
    new_document: RawDocument,
    pipeline: IngestionPipeline,
) -> None:
    """Replace an existing document's chunks with new content.

    Args:
        collection: Collection holding the old chunks; ``get`` and ``delete``
            are called on it.
        old_document_id: ``document_id`` metadata value of the chunks to drop.
        new_document: The new version to ingest.
        pipeline: Pipeline used to chunk, embed and index the new version.

    The old chunks are deleted before the new version is ingested, so the
    two steps are not atomic: if ingestion raises, the old chunks are
    already gone.
    """
    # Delete old chunks
    existing = collection.get(where={"document_id": old_document_id})
    old_ids = existing["ids"]
    if old_ids:
        collection.delete(ids=old_ids)
        print(f"Deleted {len(old_ids)} old chunks for {old_document_id}")

    # The pipeline skips content whose hash it has already seen. After the
    # delete above that skip would leave the document with no chunks at all,
    # so forget the hash and force a fresh ingest.
    pipeline.processed_hashes.discard(new_document.content_hash)

    # Ingest new version, and only report success when it was indexed.
    result = pipeline.process_document(new_document)
    if result["status"] == "processed":
        print(f"Ingested updated document: {new_document.id}")
    else:
        print(f"Updated document {new_document.id} was not ingested: {result.get('reason')}")


def delete_document(collection, document_id: str) -> int:
    """Delete every chunk whose ``document_id`` metadata matches.

    Args:
        collection: Collection to query and delete from.
        document_id: Document whose chunks should be removed.

    Returns:
        The number of chunks deleted; 0 when none matched.
    """
    matches = collection.get(where={"document_id": document_id})
    chunk_ids = matches["ids"]
    if not chunk_ids:
        return 0

    collection.delete(ids=chunk_ids)
    removed = len(chunk_ids)
    print(f"Deleted {removed} chunks for {document_id}")
    return removed

Enjoyed this article?

Explore the AI Systems learning path for more.

Found this helpful?

Share:𝕏

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.