Scenario: Knowledge Base Is Stale
Your RAG system answers based on outdated documents and new policies are not reflected. Build an event-driven ingestion pipeline with document versioning and chunk deletion.
The Scenario
Your company updated its data retention policy three weeks ago. A user asks the RAG chatbot "how long do we keep patient records?" and receives the old answer: "7 years." The correct answer after the policy update is "10 years to comply with the new regulation."
This is a compliance risk, not just a product bug. In regulated industries — healthcare, finance, pharmaceuticals — stale knowledge bases can expose the company to liability.
The root cause is almost always the same: embeddings were computed once during initial setup, and there is no pipeline to update them when source documents change.
Why Staleness Happens
Batch ingestion with no update trigger. Most teams run a one-time script to ingest documents. Once that script runs, the knowledge base is frozen.
No deletion on update. When a document is updated, a new version is often added, but the old chunks are not removed. Retrieval now returns both old and new information, and the LLM either hallucinates or produces contradictory answers.
No version tracking. There is no record of which chunks correspond to which document version, making targeted updates impossible.
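The "no deletion on update" failure can be reproduced in a few lines. The sketch below is illustrative only: `ToyStore`, its word-overlap scoring, and the chunk IDs are hypothetical stand-ins for a real vector store and embedding model, chosen so the example runs without any external service.

```python
import re


def tokenize(text: str) -> set[str]:
    """Lowercase word set; a crude stand-in for an embedding."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


class ToyStore:
    """Hypothetical in-memory store that ranks chunks by shared words."""

    def __init__(self) -> None:
        self.chunks: dict[str, str] = {}

    def upsert(self, chunk_id: str, text: str) -> None:
        self.chunks[chunk_id] = text

    def delete_doc(self, doc_id: str) -> None:
        # Chunk IDs are assumed to look like "<doc_id>_v<version>_chunk<i>".
        prefix = f"{doc_id}_v"
        self.chunks = {
            cid: text for cid, text in self.chunks.items()
            if not cid.startswith(prefix)
        }

    def search(self, query: str, top_k: int = 2) -> list[str]:
        q = tokenize(query)
        ranked = sorted(
            self.chunks.values(),
            key=lambda text: len(q & tokenize(text)),
            reverse=True,
        )
        return ranked[:top_k]


OLD = "Patient records are kept for 7 years."
NEW = "Patient records are kept for 10 years."
QUERY = "how long do we keep patient records?"

# Append-only ingestion: version 2 is added, version 1 is never removed.
append_only = ToyStore()
append_only.upsert("retention_v1_chunk0", OLD)
append_only.upsert("retention_v2_chunk0", NEW)
stale_results = append_only.search(QUERY)

# Delete-then-insert: old chunks are removed before the new version lands.
replaced = ToyStore()
replaced.upsert("retention_v1_chunk0", OLD)
replaced.delete_doc("retention")
replaced.upsert("retention_v2_chunk0", NEW)
fresh_results = replaced.search(QUERY)

print("append-only:", stale_results)
print("replaced:   ", fresh_results)
```

In the append-only store both the 7-year and the 10-year chunk match the query equally well, so the LLM receives contradictory context. After deletion, only the current policy is retrievable.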
The Solution: Event-Driven Ingestion
Instead of batch ingestion, trigger re-ingestion whenever a document changes. This requires:
- A document store with change detection (SharePoint, Azure Blob Storage with Event Grid, or a CMS)
- An event handler that re-processes changed documents
- Chunk deletion before re-insertion to avoid stale duplicates
- Version metadata on every chunk
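Before wiring up a specific source, it can help to settle on one event shape that every trigger is translated into. The following is a minimal sketch under stated assumptions: `DocumentChangeEvent`, `route_event`, and the `"deleted"` branch are hypothetical names introduced here for illustration, and the handlers in this article cover only creates and updates.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class DocumentChangeEvent:
    """Source-agnostic change notification (hypothetical shape)."""
    doc_id: str
    change_type: str                 # "created", "updated", or "deleted"
    content: Optional[str] = None    # raw text for created/updated events


def route_event(
    event: DocumentChangeEvent,
    on_upsert: Callable[[str, str], None],
    on_delete: Callable[[str], None],
) -> str:
    """Send an event to the matching callback and report what happened."""
    if event.change_type in ("created", "updated"):
        if event.content is None:
            raise ValueError(f"{event.doc_id}: {event.change_type} event has no content")
        on_upsert(event.doc_id, event.content)
        return "ingested"
    if event.change_type == "deleted":
        # Assumption: a deleted source document should lose its chunks too.
        on_delete(event.doc_id)
        return "removed"
    return "ignored"


# Usage with list-backed callbacks standing in for the real pipeline.
ingested: list[tuple[str, str]] = []
removed: list[str] = []

events = [
    DocumentChangeEvent("retention.md", "updated", "Keep patient records for 10 years."),
    DocumentChangeEvent("legacy.md", "deleted"),
    DocumentChangeEvent("retention.md", "renamed"),
]
outcomes = [
    route_event(e, lambda doc_id, text: ingested.append((doc_id, text)), removed.append)
    for e in events
]
print(outcomes)
```

Keeping the routing separate from the ingestion logic means a Blob Storage trigger and a SharePoint webhook can share one update handler.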
Document Registry
Start with a registry that tracks document state:
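from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import hashlib
import json


@dataclass
class DocumentRecord:
    doc_id: str
    source_url: str
    content_hash: str          # SHA-256 of raw content
    version: int               # incremented on each update
    last_ingested_at: datetime
    chunk_ids: list[str]       # IDs of all chunks in the vector store
    is_active: bool = True


class DocumentRegistry:
    def __init__(self, db_connection):
        # Assumes a SQLite-style connection whose rows are addressable by
        # column name (for sqlite3, set row_factory = sqlite3.Row) and a
        # document_registry table with doc_id as its primary key.
        self.db = db_connection

    def _to_record(self, row) -> DocumentRecord:
        # Undo the serialization applied in upsert().
        data = dict(row)
        data["last_ingested_at"] = datetime.fromisoformat(data["last_ingested_at"])
        data["chunk_ids"] = json.loads(data["chunk_ids"])
        data["is_active"] = bool(data["is_active"])
        return DocumentRecord(**data)

    def get(self, doc_id: str) -> Optional[DocumentRecord]:
        row = self.db.execute(
            "SELECT * FROM document_registry WHERE doc_id = ?", (doc_id,)
        ).fetchone()
        return self._to_record(row) if row else None

    def get_all(self) -> list[DocumentRecord]:
        rows = self.db.execute("SELECT * FROM document_registry").fetchall()
        return [self._to_record(row) for row in rows]

    def upsert(self, record: DocumentRecord):
        self.db.execute("""
            INSERT INTO document_registry
                (doc_id, source_url, content_hash, version, last_ingested_at, chunk_ids, is_active)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(doc_id) DO UPDATE SET
                source_url = excluded.source_url,
                content_hash = excluded.content_hash,
                version = excluded.version,
                last_ingested_at = excluded.last_ingested_at,
                chunk_ids = excluded.chunk_ids,
                is_active = excluded.is_active
        """, (
            record.doc_id,
            record.source_url,
            record.content_hash,
            record.version,
            record.last_ingested_at.isoformat(),
            json.dumps(record.chunk_ids),  # JSON survives commas in chunk IDs
            record.is_active,
        ))
        self.db.commit()

    def compute_hash(self, content: str) -> str:
        return hashlib.sha256(content.encode()).hexdigest()

    def has_changed(self, doc_id: str, new_content: str) -> bool:
        record = self.get(doc_id)
        if not record:
            return True  # new document
        return record.content_hash != self.compute_hash(new_content)

The Document Update Handler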
This is the core of the solution. When a document changes, the handler:
- Deletes all old chunks from the vector store
- Parses and chunks the new version
- Embeds the new chunks
- Inserts them with updated metadata
- Updates the registry
import logging
from datetime import datetime

from openai import AzureOpenAI
# Newer LangChain releases also ship this class in the langchain_text_splitters package.
from langchain.text_splitter import RecursiveCharacterTextSplitter

logger = logging.getLogger("ingestion.handler")

# The API key is read from the AZURE_OPENAI_API_KEY environment variable.
client = AzureOpenAI(
    azure_endpoint="https://your-resource.openai.azure.com",
    api_version="2024-02-01",
)


class DocumentUpdateHandler:
    def __init__(self, vector_store, registry: DocumentRegistry):
        self.vector_store = vector_store
        self.registry = registry
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=512,
            chunk_overlap=64,
            separators=["\n\n", "\n", ". ", " "],
        )

    def handle_document_update(self, doc_id: str, new_content: str, metadata: dict):
        """
        Idempotent update handler. Safe to call multiple times for same version.
        """
        if not self.registry.has_changed(doc_id, new_content):
            logger.info(f"Document {doc_id} unchanged, skipping ingestion.")
            return

        logger.info(f"Document {doc_id} changed; starting update pipeline.")

        # Step 1: Delete old chunks
        existing_record = self.registry.get(doc_id)
        if existing_record and existing_record.chunk_ids:
            self._delete_old_chunks(existing_record.chunk_ids)
            logger.info(f"Deleted {len(existing_record.chunk_ids)} old chunks for {doc_id}.")

        # Step 2: Determine new version number
        new_version = (existing_record.version + 1) if existing_record else 1

        # Step 3: Chunk the new content
        raw_chunks = self.splitter.split_text(new_content)
        logger.info(f"Created {len(raw_chunks)} chunks for {doc_id} v{new_version}.")

        # Step 4: Embed and insert new chunks
        new_chunk_ids = []
        for i, chunk_text in enumerate(raw_chunks):
            chunk_id = f"{doc_id}_v{new_version}_chunk{i}"
            embedding = self._embed(chunk_text)
            chunk_metadata = {
                **metadata,
                "doc_id": doc_id,
                "chunk_id": chunk_id,
                "version": new_version,
                "chunk_index": i,
                "total_chunks": len(raw_chunks),
            }
            self.vector_store.upsert(
                id=chunk_id,
                embedding=embedding,
                content=chunk_text,
                metadata=chunk_metadata,
            )
            new_chunk_ids.append(chunk_id)

        # Step 5: Update registry
        self.registry.upsert(DocumentRecord(
            doc_id=doc_id,
            source_url=metadata.get("source_url", ""),
            content_hash=self.registry.compute_hash(new_content),
            version=new_version,
            last_ingested_at=datetime.utcnow(),
            chunk_ids=new_chunk_ids,
        ))
        logger.info(f"Document {doc_id} v{new_version} ingestion complete. {len(new_chunk_ids)} chunks stored.")

    def _delete_old_chunks(self, chunk_ids: list[str]):
        self.vector_store.delete(ids=chunk_ids)

    def _embed(self, text: str) -> list[float]:
        # On Azure OpenAI, `model` is the name of your embedding deployment.
        return client.embeddings.create(
            model="text-embedding-3-large",
            input=text,
        ).data[0].embedding

Event-Driven Trigger: Azure Blob Storage + Event Grid
When a document is uploaded or modified in Azure Blob Storage, Event Grid fires an event that your Azure Function receives:
import logging

import azure.functions as func

logger = logging.getLogger(__name__)

app = func.FunctionApp()


# Note: by default the blob trigger detects changes by polling the container.
# For Event Grid delivery, the trigger also needs source="EventGrid" and an
# Event Grid subscription on the storage account.
@app.blob_trigger(
    arg_name="blob_event",
    path="policies/{name}",
    connection="STORAGE_CONNECTION_STRING",  # name of the app setting
)
def on_document_changed(blob_event: func.InputStream):
    """
    Triggered whenever a file in the 'policies' container is created or modified.
    """
    # InputStream.name includes the container ("policies/<name>"); strip it.
    doc_name = (blob_event.name or "unknown").split("/", 1)[-1]
    logger.info(f"Ingestion triggered for: {doc_name}")

    # Read the document content
    content = blob_event.read().decode("utf-8")

    # Derive metadata from the blob name
    metadata = {
        "source_url": f"https://yourstorage.blob.core.windows.net/policies/{doc_name}",
        "document_name": doc_name,
        "policy_category": infer_category(doc_name),
        "department": infer_department(doc_name),
    }

    # Run update handler
    handler = build_update_handler()  # factory (not shown) that returns DocumentUpdateHandler
    handler.handle_document_update(
        doc_id=doc_name,
        new_content=content,
        metadata=metadata,
    )


def infer_category(filename: str) -> str:
    """Simple heuristic: filename prefix determines category."""
    if filename.startswith("hr_"):
        return "human_resources"
    elif filename.startswith("it_"):
        return "information_technology"
    elif filename.startswith("compliance_"):
        return "compliance"
    return "general"


def infer_department(filename: str) -> str:
    """Simple heuristic: the second underscore-separated token is the department."""
    parts = filename.split("_")
    return parts[1] if len(parts) > 1 else "all"

SharePoint Webhook Trigger
Many enterprises store documents in SharePoint. Here is how to trigger ingestion on SharePoint document changes:
from fastapi import FastAPI, Request
from fastapi.responses import PlainTextResponse
import httpx

app = FastAPI()

SHAREPOINT_TENANT = "yourtenant.sharepoint.com"
SHAREPOINT_SITE = "/sites/PolicyLibrary"


@app.post("/webhooks/sharepoint")
async def sharepoint_webhook(request: Request):
    """
    SharePoint sends a POST when any file in the subscribed library changes.
    """
    # SharePoint sends a validationToken on subscription; it must be echoed
    # back as plain text. Check it before parsing the body, because the
    # handshake request carries no JSON payload.
    validation_token = request.query_params.get("validationToken")
    if validation_token:
        return PlainTextResponse(validation_token)  # handshake

    body = await request.json()

    # Process change notifications
    for notification in body.get("value", []):
        resource = notification.get("resource", "")
        # Assumes the notification payload carries a changeType field;
        # verify this against the subscription type you actually use.
        change_type = notification.get("changeType", "")

        if change_type in ("created", "updated"):
            # Fetch the changed document content
            doc_content = await fetch_sharepoint_document(resource)
            doc_id = extract_doc_id(resource)  # helper not shown

            handler = build_update_handler()
            handler.handle_document_update(
                doc_id=doc_id,
                new_content=doc_content,
                metadata={"source": resource, "change_type": change_type},
            )

    return {"status": "ok"}


async def fetch_sharepoint_document(resource_url: str) -> str:
    """
    Fetch document content through the Microsoft Graph API.
    Requires an OAuth token with Files.Read scope.
    """
    token = await get_sharepoint_token()  # helper not shown
    async with httpx.AsyncClient() as http:
        response = await http.get(
            f"https://graph.microsoft.com/v1.0{resource_url}/content",
            headers={"Authorization": f"Bearer {token}"},
        )
        response.raise_for_status()
        return response.text

Monitoring Freshness
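Track how long ago each document was last ingested and alert when freshness degrades. Because the update handler skips unchanged documents, an old last_ingested_at marks a candidate for re-checking rather than proof that the content is stale: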
from datetime import datetime, timedelta
import logging

logger = logging.getLogger("ingestion.freshness")


def check_knowledge_base_freshness(registry: DocumentRegistry, max_age_days: int = 7):
    """
    Identify documents that have not been re-ingested recently.
    These might be stale if the source changed but the event was missed.
    """
    cutoff = datetime.utcnow() - timedelta(days=max_age_days)
    # Assumes the registry exposes get_all(), returning every DocumentRecord.
    all_docs = registry.get_all()

    stale = [
        doc for doc in all_docs
        if doc.last_ingested_at < cutoff and doc.is_active
    ]

    if stale:
        logger.warning(
            f"{len(stale)} documents not re-ingested in {max_age_days} days. "
            f"Consider forcing a re-check: {[d.doc_id for d in stale[:5]]}..."
        )

    return {
        "total_documents": len(all_docs),
        "stale_count": len(stale),
        "stale_percentage": len(stale) / max(len(all_docs), 1),  # fraction in [0, 1]
        "stale_doc_ids": [d.doc_id for d in stale],
    }

Forced Re-Ingestion for Missed Events
Sometimes events are missed (network issues, downtime). Add a nightly reconciliation job:
import logging

logger = logging.getLogger("ingestion.reconciliation")


async def nightly_reconciliation(registry: DocumentRegistry, handler: DocumentUpdateHandler):
    """
    Walk all source documents and compare content hashes to registry.
    Re-ingest any that have drifted.
    """
    logger.info("Starting nightly knowledge base reconciliation...")

    # fetch_all_source_documents() is not shown; it is assumed to yield
    # objects with .id, .content, and .metadata attributes.
    all_source_docs = await fetch_all_source_documents()
    updated_count = 0

    for doc in all_source_docs:
        if registry.has_changed(doc.id, doc.content):
            handler.handle_document_update(
                doc_id=doc.id,
                new_content=doc.content,
                metadata=doc.metadata,
            )
            updated_count += 1

    logger.info(f"Reconciliation complete. Updated {updated_count} documents.")
    return {"updated": updated_count, "total": len(all_source_docs)}

Summary: Staleness Prevention Architecture
The full architecture to prevent stale knowledge:
Document Source (SharePoint / Blob Storage / CMS)
        │
        │ (event on create/update)
        ▼
Event Trigger (Azure Event Grid / Webhook)
        │
        ▼
Ingestion Function
  ├── Check registry: has content changed?
  │     └── No: skip (idempotent)
  │
  └── Yes:
        ├── Delete old chunks from vector store
        ├── Chunk + embed new content
        ├── Insert new chunks with version metadata
        └── Update registry (hash, version, chunk IDs)
        │
        ▼
Nightly Reconciliation Job (safety net for missed events)
        │
        ▼
Freshness Monitor (alert if doc older than N days)

Key design principles:
- Idempotent: Safe to run the handler multiple times. Hash comparison prevents unnecessary re-processing.
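- Delete-then-insert replacement: Delete old chunks before inserting new ones, so both versions are never live at the same time. This is not strictly atomic: the document has no chunks between the delete and the insert, and a failed embedding call leaves it missing until the next run, so keep that window short and retry on failure.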
- Version tracking: Every chunk carries a version number. Useful for debugging "why did the answer change?"
- Safety net: The nightly reconciliation catches any events the webhook missed.
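If the gap between deleting and re-inserting is a concern, one alternative is a staged swap: write the new version's chunks first, flip an "active version" pointer that queries filter on, then clean up the old chunks. This is a sketch of that idea, not the handler described above; `VersionedStore`, `replace_document`, and the `observe` hook are hypothetical, and a real vector store would need metadata filtering to support the same pattern.

```python
from typing import Callable, Optional


class VersionedStore:
    """Hypothetical in-memory store with one active version per document."""

    def __init__(self) -> None:
        self.chunks: dict[str, dict] = {}        # chunk_id -> chunk record
        self.active_version: dict[str, int] = {}

    def visible_chunks(self, doc_id: str) -> list[str]:
        """Return only the text of chunks belonging to the active version."""
        active = self.active_version.get(doc_id)
        return [
            c["text"] for c in self.chunks.values()
            if c["doc_id"] == doc_id and c["version"] == active
        ]

    def replace_document(
        self,
        doc_id: str,
        new_texts: list[str],
        observe: Optional[Callable[[str], None]] = None,
    ) -> int:
        new_version = self.active_version.get(doc_id, 0) + 1

        # 1. Stage: new chunks exist but are filtered out of queries.
        for i, text in enumerate(new_texts):
            self.chunks[f"{doc_id}_v{new_version}_chunk{i}"] = {
                "doc_id": doc_id, "version": new_version, "text": text,
            }
        if observe:
            observe("staged")

        # 2. Flip: a single assignment switches readers to the new version.
        self.active_version[doc_id] = new_version
        if observe:
            observe("flipped")

        # 3. Clean up: old chunks are already invisible, so this can lag.
        stale_ids = [
            cid for cid, c in self.chunks.items()
            if c["doc_id"] == doc_id and c["version"] != new_version
        ]
        for cid in stale_ids:
            del self.chunks[cid]
        if observe:
            observe("cleaned")
        return new_version


store = VersionedStore()
store.replace_document("retention", ["Keep patient records for 7 years."])

# Record what a query would see at each stage of the second update.
snapshots: dict[str, list[str]] = {}
store.replace_document(
    "retention",
    ["Keep patient records for 10 years."],
    observe=lambda stage: snapshots.update({stage: store.visible_chunks("retention")}),
)
print(snapshots)
```

At every stage a query sees exactly one version, at the cost of briefly storing both and filtering every query by version.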