AI Systems · Intermediate
RAG Ingestion Pipeline
Build a production document ingestion pipeline: loading, parsing, chunking, embedding, and indexing. Handle updates, deletions, and incremental ingestion at scale.
Asma Hafeez Khan · May 16, 2026 · 6 min read
RAG · Ingestion · Pipeline · Document Processing · Production
Ingestion Pipeline Overview
A RAG ingestion pipeline converts raw documents into searchable vector embeddings. It runs once over the full corpus, then incrementally for new or updated documents:
Raw Documents (PDFs, Word, HTML, text)
↓
Document Loader (extract text + metadata)
↓
Text Cleaner (normalize, remove artifacts)
↓
Chunker (split into retrieval-sized pieces)
↓
Embedder (convert chunks to vectors)
↓
Vector Store (index for fast retrieval)
Document Loaders
Python
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional
import hashlib
@dataclass
class RawDocument:
    """A source document after loading and before it is split into chunks.

    When ``content_hash`` is not supplied, it is derived from ``content``:
    the first 16 hex characters of its SHA-256 digest. Two documents with
    identical text therefore share a hash.
    """

    id: str
    content: str
    source_path: str
    title: str
    document_type: str  # "pdf", "docx", "html", "txt"
    metadata: dict = field(default_factory=dict)  # fresh dict per instance
    content_hash: str = ""  # computed in __post_init__ when left empty

    def __post_init__(self):
        # Respect a caller-supplied hash; otherwise fingerprint the text.
        if self.content_hash:
            return
        digest = hashlib.sha256(self.content.encode())
        self.content_hash = digest.hexdigest()[:16]
def load_text_file(path: str) -> RawDocument:
    """Load a plain-text (or Markdown) file as a RawDocument.

    The file is decoded as UTF-8. A leading byte-order mark, which some
    editors prepend, is stripped so it does not leak into the content hash,
    the embeddings, or a first-line Markdown heading.

    Args:
        path: Path to the file.

    Returns:
        A RawDocument built as follows:
        - ``id`` is the file stem.
        - ``title`` is the stem with ``-``/``_`` replaced by spaces, then
          title-cased.
        - ``document_type`` is always ``"txt"``, including for ``.md`` files.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    file_path = Path(path)
    # "utf-8-sig" decodes BOM-less UTF-8 identically but drops a leading BOM.
    # With plain "utf-8" the BOM would stay in the text as "\ufeff", and a
    # first-line "# Heading" would then not match the "^#{1,3}\s+" header
    # regex used by the section chunker.
    with open(path, encoding="utf-8-sig") as f:
        content = f.read()
    return RawDocument(
        id=file_path.stem,
        content=content,
        source_path=path,
        title=file_path.stem.replace("-", " ").replace("_", " ").title(),
        document_type="txt",
    )
def load_directory(directory: str, extensions: Optional[list[str]] = None) -> list[RawDocument]:
    """Recursively load every supported document under *directory*.

    Args:
        directory: Root directory to search (all subdirectories included).
        extensions: File extensions to load. Matching is case-insensitive,
            and entries may be given with or without the leading dot.
            Defaults to ``.txt``, ``.md``, ``.pdf`` and ``.docx``.
            Extensions without a loader below are skipped silently.

    Returns:
        The loaded documents, in sorted path order.

        Loaders key a document by its file stem, so different files can
        produce the same id. Examples are ``a/notes.md`` vs ``b/notes.md``,
        or ``x.txt`` vs ``x.md``. When that happens, every later duplicate
        gets its path relative to *directory* (POSIX form) as its id
        instead. Otherwise their chunk ids would collide and overwrite each
        other in the vector store.

    Files that fail to load are reported with ``print`` and skipped.
    """
    if extensions is None:
        extensions = [".txt", ".md", ".pdf", ".docx"]
    # path.suffix is always dot-prefixed, so normalise the wanted set the same way.
    wanted = {
        ext.lower() if ext.startswith(".") else f".{ext.lower()}"
        for ext in extensions
    }
    root = Path(directory)
    documents = []
    seen_ids = set()
    # rglob order is filesystem-dependent; sort so the result, and which file
    # keeps the plain stem id on a collision, is reproducible between runs.
    for path in sorted(root.rglob("*")):
        suffix = path.suffix.lower()
        if suffix not in wanted or not path.is_file():
            continue
        try:
            if suffix in (".txt", ".md"):
                doc = load_text_file(str(path))
            elif suffix == ".pdf":
                # NOTE(review): load_pdf is not defined in the code shown here;
                # without it every PDF fails with NameError (reported below).
                doc = load_pdf(str(path))
            elif suffix == ".docx":
                # NOTE(review): load_docx is likewise not defined in the code shown here.
                doc = load_docx(str(path))
            else:
                continue
        except Exception as e:
            # Best-effort: one unreadable file must not abort the whole load.
            print(f"Failed to load {path}: {e}")
            continue
        if doc.id in seen_ids:
            doc.id = path.relative_to(root).as_posix()
        seen_ids.add(doc.id)
        documents.append(doc)
    return documents
# Chunking Strategies
Python
from typing import Iterator
import re
@dataclass
class Chunk:
    """One retrieval unit produced by a chunker, ready to be embedded.

    Every instance gets its own fresh ``metadata`` dict. ``token_count``
    stays ``None`` unless the chunker that built the chunk fills it in.
    """

    id: str  # unique chunk id, e.g. "<document_id>_chunk_<n>"
    content: str  # text that will be embedded
    document_id: str  # id of the RawDocument this chunk came from
    chunk_index: int  # position of the chunk within its document
    title: str
    metadata: dict = field(default_factory=dict)
    token_count: Optional[int] = None
def estimate_tokens(text: str) -> int:
    """Return a cheap token-count estimate for *text*.

    Uses the rule of thumb of roughly four characters per token for English
    text. The division is floored, so strings shorter than four characters
    estimate to 0.
    """
    chars_per_token = 4
    return len(text) // chars_per_token
def chunk_fixed_size(
    document: RawDocument,
    chunk_size: int = 512,  # Target size per chunk, in words (≈ tokens)
    overlap: int = 50,  # Words shared between adjacent chunks
) -> list[Chunk]:
    """Split *document* into fixed-size, overlapping windows of words.

    Sizes are counted in whitespace-separated words, a rough stand-in for
    tokens (1 word ≈ 1 token). Each window starts ``chunk_size - overlap``
    words after the previous one. The last window is the first one that
    reaches the end of the text, so no chunk is merely a subset of the
    chunk before it.

    Args:
        document: Document to split.
        chunk_size: Words per chunk; must be positive.
        overlap: Words shared by consecutive chunks; must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        Chunks in document order, with ids ``"<document.id>_chunk_<n>"``.
        ``token_count`` is the character-based ``estimate_tokens`` value,
        not the word count. An empty or whitespace-only document yields
        an empty list.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range. With
            such values the window would never advance.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, "
            f"got overlap={overlap}, chunk_size={chunk_size}"
        )

    words = document.content.split()
    step = chunk_size - overlap
    chunks = []
    start = 0
    while start < len(words):
        end = start + chunk_size
        chunk_text = " ".join(words[start:end])
        chunk_idx = len(chunks)
        chunks.append(Chunk(
            id=f"{document.id}_chunk_{chunk_idx}",
            content=chunk_text,
            document_id=document.id,
            chunk_index=chunk_idx,
            title=document.title,
            metadata={**document.metadata, "source_path": document.source_path},
            token_count=estimate_tokens(chunk_text),
        ))
        if end >= len(words):
            # This window already covers the tail. Another step would only
            # emit the overlap region again as a redundant chunk.
            break
        start += step
    return chunks
def chunk_by_sections(document: RawDocument) -> list[Chunk]:
    """Chunk a Markdown document at its level 1-3 section headings.

    How the text is divided:
    - A line starting with ``#``, ``##`` or ``###`` plus whitespace starts a
      new section. Deeper headings stay inside the current section.
    - Text before the first heading forms a section named after
      ``document.title``.
    - Lines inside fenced code blocks (opened by ``` or ~~~) are never
      treated as headings, so a ``# comment`` in a code sample does not
      split the section.
    - Lines are split with ``str.splitlines``, so CRLF input does not leave
      ``\\r`` in headings.

    Sections are not size-limited: a long section becomes one long chunk.
    NOTE(review): such a chunk may exceed the embedding model's input limit.

    Returns:
        One Chunk per non-empty section, with the heading prepended to the
        content. Ids are ``"<document.id>_section_<n>"``, where ``n`` is the
        section's position among all sections. Skipped empty sections
        therefore leave gaps in ``chunk_index``. ``token_count`` is not set.
    """
    sections = []  # (heading, body) pairs in document order
    header = document.title
    body_lines = []
    fence = None  # opening fence marker while inside a fenced code block

    for line in document.content.splitlines():
        stripped = line.lstrip()
        if fence is not None:
            # Inside a code block: nothing is a heading until the closing fence.
            if stripped.startswith(fence):
                fence = None
        elif stripped.startswith(("```", "~~~")):
            fence = stripped[:3]
        elif re.match(r"^#{1,3}\s+", line):
            if body_lines:
                sections.append((header, "\n".join(body_lines).strip()))
            header = re.sub(r"^#+\s+", "", line).strip()
            body_lines = []
            continue
        body_lines.append(line)
    if body_lines:
        sections.append((header, "\n".join(body_lines).strip()))

    chunks = []
    for idx, (header, body) in enumerate(sections):
        if not body:
            continue
        chunks.append(Chunk(
            id=f"{document.id}_section_{idx}",
            # The heading is included in the text so it is embedded with the body.
            content=f"{header}\n\n{body}",
            document_id=document.id,
            chunk_index=idx,
            title=f"{document.title} — {header}",
            metadata={
                **document.metadata,
                "source_path": document.source_path,
                "section": header,
            },
        ))
    return chunks
def chunk_with_context(
    document: RawDocument,
    chunk_size: int = 512,
    context_prefix_size: int = 100,  # Characters from previous chunk
) -> list[Chunk]:
    """Fixed-size chunks, each prefixed with the tail of the previous chunk.

    The document is split with ``chunk_fixed_size(..., overlap=0)``. Every
    chunk after the first then gets the last ``context_prefix_size``
    characters of the preceding chunk, prepended as ``"[...<tail>] "``.
    This gives the embedding some context from the preceding text.

    Args:
        document: Document to split.
        chunk_size: Words per base chunk (passed to ``chunk_fixed_size``).
        context_prefix_size: Characters of the previous chunk to prepend.
            Values <= 0 disable the prefix.

    Returns:
        Chunks with the same ids and indexes as the base chunks, with
        ``token_count`` re-estimated on the prefixed content.
    """
    base_chunks = chunk_fixed_size(document, chunk_size=chunk_size, overlap=0)
    context_chunks = []
    previous = None
    for chunk in base_chunks:
        content = chunk.content
        # Guard > 0: the slice [-0:] would return the *whole* previous chunk.
        if previous is not None and context_prefix_size > 0:
            prefix = previous.content[-context_prefix_size:].strip()
            if prefix:
                content = f"[...{prefix}] {chunk.content}"
        context_chunks.append(Chunk(
            id=chunk.id,
            content=content,
            document_id=chunk.document_id,
            chunk_index=chunk.chunk_index,
            title=chunk.title,
            metadata=dict(chunk.metadata),
            token_count=estimate_tokens(content),
        ))
        previous = chunk
    return context_chunks
# Embedding and Indexing
Python
from openai import OpenAI
import chromadb
from chromadb.utils import embedding_functions
# Module-level OpenAI client used by embed_chunks below. It is constructed as
# soon as this module is imported. NOTE(review): the constructor presumably
# reads credentials (e.g. OPENAI_API_KEY) from the environment — confirm.
client = OpenAI()
def embed_chunks(
    chunks: list[Chunk],
    model: str = "text-embedding-3-small",
    batch_size: int = 100,
) -> list[tuple[Chunk, list[float]]]:
    """Embed chunk texts with the module-level OpenAI client, batch by batch.

    Args:
        chunks: Chunks whose ``content`` is embedded.
        model: Embedding model name.
        batch_size: Number of chunks sent per API request; must be >= 1.

    Returns:
        ``(chunk, embedding)`` pairs in the same order as *chunks*.

    Raises:
        ValueError: If ``batch_size`` < 1. A negative value would otherwise
            produce an empty range and silently embed nothing.
        RuntimeError: If a response holds a different number of embeddings
            than inputs in that request.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    results = []
    total = len(chunks)
    for start in range(0, total, batch_size):
        batch = chunks[start:start + batch_size]
        response = client.embeddings.create(
            model=model,
            input=[chunk.content for chunk in batch],
        )
        # Each item carries the position of its input. Sort by it so the
        # vectors line up with `batch` regardless of response order.
        data = sorted(response.data, key=lambda item: item.index)
        if len(data) != len(batch):
            # zip() below would otherwise drop the unmatched chunks silently.
            raise RuntimeError(
                f"Embedding API returned {len(data)} vectors for {len(batch)} inputs"
            )
        results.extend((chunk, item.embedding) for chunk, item in zip(batch, data))
        print(f"Embedded {min(start + batch_size, total)}/{total} chunks")
    return results
def ingest_to_chroma(
    chunk_embeddings: list[tuple[Chunk, list[float]]],
    collection,
    batch_size: int = 100,
) -> None:
    """Upsert embedded chunks into a Chroma collection in batches.

    Each record's metadata is built in two steps:
    1. Every entry of ``chunk.metadata``, with its value converted via
       ``str()``.
    2. The reserved keys ``document_id``, ``title`` and ``chunk_index``, taken
       from the chunk's own fields.

    The reserved keys are written last, so a same-named key in the document
    metadata cannot replace them. ``document_id`` in particular is what the
    update and delete helpers filter on.

    Args:
        chunk_embeddings: ``(chunk, embedding)`` pairs, e.g. from ``embed_chunks``.
        collection: Target collection; only its ``upsert`` method is used.
        batch_size: Records per ``upsert`` call; must be >= 1.

    Raises:
        ValueError: If ``batch_size`` < 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    for start in range(0, len(chunk_embeddings), batch_size):
        batch = chunk_embeddings[start:start + batch_size]
        ids, documents, embeddings, metadatas = [], [], [], []
        for chunk, embedding in batch:
            ids.append(chunk.id)
            documents.append(chunk.content)
            embeddings.append(embedding)
            metadatas.append({
                **{key: str(value) for key, value in chunk.metadata.items()},
                "document_id": chunk.document_id,
                "title": chunk.title,
                "chunk_index": chunk.chunk_index,
            })
        collection.upsert(
            ids=ids,
            documents=documents,
            embeddings=embeddings,
            metadatas=metadatas,
        )
    print(f"Indexed {len(chunk_embeddings)} chunks")
# Full Pipeline Orchestration
Python
from dataclasses import dataclass
@dataclass
class IngestionConfig:
    """Configuration for the ingestion pipeline.

    Validated on construction, so two kinds of mistake fail immediately
    instead of surfacing later in the pipeline:
    - a misspelled strategy, which would otherwise silently fall back to
      fixed-size chunking;
    - impossible sizes, such as an overlap at least as large as the chunk.

    Raises:
        ValueError: On an unknown ``chunking_strategy``, or on sizes the
            selected strategy cannot use.
    """

    chunk_size: int = 512  # words per chunk for "fixed"/"context"; ignored by "sections"
    chunk_overlap: int = 50  # words shared by consecutive chunks; used by "fixed" only
    chunking_strategy: str = "fixed"  # "fixed", "sections", "context"
    embedding_model: str = "text-embedding-3-small"
    batch_size: int = 100  # chunks per embedding request
    # NOTE(review): not read by the pipeline code in this file; the collection
    # object is passed to IngestionPipeline directly.
    collection_name: str = "documents"

    def __post_init__(self):
        strategies = ("fixed", "sections", "context")
        if self.chunking_strategy not in strategies:
            raise ValueError(
                f"chunking_strategy must be one of {strategies}, "
                f"got {self.chunking_strategy!r}"
            )
        if self.chunking_strategy in ("fixed", "context") and self.chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {self.chunk_size}")
        if self.chunking_strategy == "fixed" and not 0 <= self.chunk_overlap < self.chunk_size:
            raise ValueError(
                f"chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size, "
                f"got chunk_overlap={self.chunk_overlap}, chunk_size={self.chunk_size}"
            )
        if self.batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {self.batch_size}")
class IngestionPipeline:
    """Full document ingestion pipeline: chunk, embed and upsert documents.

    Incremental ingestion: ``process_document`` skips a document only when
    this instance has already ingested the same ``content_hash`` for the
    same document ``id``. Because the id is part of the check:
    - a *different* document that happens to have identical text is still
      indexed;
    - a document reverted to an earlier version is re-ingested.

    Removing a hash from ``processed_hashes`` still forces reprocessing.
    The tracking is held in memory only, so a fresh instance re-embeds every
    document it is given.
    """

    def __init__(self, config: IngestionConfig, vector_store):
        self.config = config
        # Collection the chunks are upserted into via ingest_to_chroma. It is
        # also queried with get(where=...) and delete(ids=...) for pruning.
        self.vector_store = vector_store
        # Hashes of every document version ingested so far.
        self.processed_hashes = set()
        # document id -> content hash of the version most recently ingested.
        self._latest_hash: dict[str, str] = {}

    def _chunk(self, document: RawDocument) -> list[Chunk]:
        """Apply the configured chunking strategy.

        How each strategy uses the size settings:
        - "sections" ignores ``chunk_size`` and ``chunk_overlap``;
        - "context" ignores ``chunk_overlap``;
        - any other value falls back to fixed-size chunking.
        """
        strategy = self.config.chunking_strategy
        if strategy == "sections":
            return chunk_by_sections(document)
        if strategy == "context":
            return chunk_with_context(document, chunk_size=self.config.chunk_size)
        return chunk_fixed_size(
            document,
            chunk_size=self.config.chunk_size,
            overlap=self.config.chunk_overlap,
        )

    def _prune_stale_chunks(self, document_id: str, keep_ids: set) -> int:
        """Delete stored chunks of *document_id* whose ids are not in *keep_ids*; return how many."""
        existing = self.vector_store.get(where={"document_id": document_id})
        stale = [chunk_id for chunk_id in existing["ids"] if chunk_id not in keep_ids]
        if stale:
            self.vector_store.delete(ids=stale)
        return len(stale)

    def process_document(self, document: RawDocument) -> dict:
        """Chunk, embed and upsert one document, then drop its stale chunks.

        Returns:
            One of three status dicts:
            - ``{"status": "processed", "document_id": ..., "n_chunks": ...}``;
            - ``{"status": "skipped", "reason": "already_processed"}``;
            - ``{"status": "skipped", "reason": "no_chunks"}``. In this case
              any chunks stored for an earlier version of the document are
              deleted.
        """
        if (
            self._latest_hash.get(document.id) == document.content_hash
            and document.content_hash in self.processed_hashes
        ):
            return {"status": "skipped", "reason": "already_processed"}

        chunks = self._chunk(document)
        if not chunks:
            # The current version has nothing to index, so nothing from an
            # older version should remain searchable either.
            self._prune_stale_chunks(document.id, keep_ids=set())
            return {"status": "skipped", "reason": "no_chunks"}

        chunk_embeddings = embed_chunks(
            chunks,
            model=self.config.embedding_model,
            batch_size=self.config.batch_size,
        )
        ingest_to_chroma(chunk_embeddings, self.vector_store)
        # Upsert only overwrites matching ids. A new version with fewer chunks,
        # or a different strategy, would otherwise leave old chunks behind.
        self._prune_stale_chunks(document.id, keep_ids={chunk.id for chunk in chunks})

        self.processed_hashes.add(document.content_hash)
        self._latest_hash[document.id] = document.content_hash
        return {
            "status": "processed",
            "document_id": document.id,
            "n_chunks": len(chunks),
        }

    def process_directory(self, directory: str) -> dict:
        """Load every document under *directory* and process each one.

        Returns:
            Counts ``{"processed": n, "skipped": n, "failed": n}``. A document
            counts as failed when ``process_document`` raises; the error is
            printed and the run continues with the next document.
        """
        results = {"processed": 0, "skipped": 0, "failed": 0}
        for doc in load_directory(directory):
            try:
                result = self.process_document(doc)
            except Exception as e:
                # Best-effort batch: one bad document must not stop the run.
                print(f"Failed to process {doc.id}: {e}")
                results["failed"] += 1
                continue
            outcome = "processed" if result["status"] == "processed" else "skipped"
            results[outcome] += 1
        return results
# Usage
# Example run. It is not behind an `if __name__ == "__main__":` guard, so it
# executes whenever this module is imported. It opens a Chroma
# PersistentClient at ./chroma_db and ingests ./clinical_documents/ into the
# "clinical_docs" collection (IngestionConfig.collection_name is not used here).
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection("clinical_docs")
config = IngestionConfig(
    chunking_strategy="sections",
    embedding_model="text-embedding-3-small",
    # The pipeline only passes chunk_size to the "fixed" and "context"
    # strategies, so it has no effect with "sections".
    chunk_size=512,
)
pipeline = IngestionPipeline(config, collection)
# Returns counts: {"processed": ..., "skipped": ..., "failed": ...}
results = pipeline.process_directory("./clinical_documents/")
print(f"Ingestion complete: {results}")
# Handling Document Updates and Deletions
Python
def update_document(
    collection,
    old_document_id: str,
    new_document: RawDocument,
    pipeline: IngestionPipeline,
) -> None:
    """Replace an existing document's chunks with new content.

    The new version is chunked, embedded and upserted *before* any old chunk
    is deleted, and it does not go through ``pipeline.process_document``.
    Deleting first and then calling ``process_document`` could lose the
    document entirely, in either of two ways:
    - the embedding call fails after the delete;
    - ``process_document``'s "already processed" check decides to ingest
      nothing.

    Args:
        collection: Collection holding the old chunks. NOTE(review): presumably
            the same object as ``pipeline.vector_store``, which is where the
            new chunks are written — confirm.
        old_document_id: ``document_id`` metadata value of the chunks to replace.
        new_document: The new version; its id may equal ``old_document_id``.
        pipeline: Supplies the chunking strategy, embedding settings and
            target store.
    """
    # Snapshot the old chunk ids before writing anything new.
    existing = collection.get(where={"document_id": old_document_id})

    # Uses the pipeline's configured strategy (private helper, same module).
    chunks = pipeline._chunk(new_document)
    chunk_embeddings = embed_chunks(
        chunks,
        model=pipeline.config.embedding_model,
        batch_size=pipeline.config.batch_size,
    )
    if chunk_embeddings:
        ingest_to_chroma(chunk_embeddings, pipeline.vector_store)

    # Only now remove the old version. Ids reused by the new version were just
    # overwritten by the upsert, so they are kept.
    new_ids = {chunk.id for chunk in chunks}
    stale_ids = [chunk_id for chunk_id in existing["ids"] if chunk_id not in new_ids]
    if stale_ids:
        collection.delete(ids=stale_ids)
        print(f"Deleted {len(stale_ids)} old chunks for {old_document_id}")

    # Record the hash in the pipeline's processed set, as process_document does.
    pipeline.processed_hashes.add(new_document.content_hash)
    print(f"Ingested updated document: {new_document.id}")
def delete_document(collection, document_id: str) -> int:
    """Delete every chunk whose ``document_id`` metadata equals *document_id*.

    Returns:
        The number of chunks deleted. This is 0 when nothing matched, in
        which case ``collection.delete`` is not called.
    """
    matched_ids = collection.get(where={"document_id": document_id})["ids"]
    if not matched_ids:
        return 0
    collection.delete(ids=matched_ids)
    print(f"Deleted {len(matched_ids)} chunks for {document_id}")
    return len(matched_ids)
# Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.