Learnixo

Scenario Based Questions · Lesson 4 of 13

Scenario: Knowledge Base Is Out of Date

The Scenario

Your company updated its data retention policy three weeks ago. A user asks the RAG chatbot "how long do we keep patient records?" and receives the old answer: "7 years." The correct answer after the policy update is "10 years to comply with the new regulation."

This is a compliance risk, not just a product bug. In regulated industries — healthcare, finance, pharmaceuticals — stale knowledge bases can expose the company to liability.

The root cause is almost always the same: embeddings were computed once during initial setup, and there is no pipeline to update them when source documents change.

Why Staleness Happens

Batch ingestion with no update trigger. Most teams run a one-time script to ingest documents. Once that script runs, the knowledge base is frozen.

No deletion on update. When a document is updated, a new version is often added, but the old chunks are not removed. Retrieval now returns both old and new information, and the LLM either hallucinates or produces contradictory answers.

No version tracking. There is no record of which chunks correspond to which document version, making targeted updates impossible.

The Solution: Event-Driven Ingestion

Instead of batch ingestion, trigger re-ingestion whenever a document changes. This requires:

  1. A document store with change detection (SharePoint, Blob Storage with event grid, or a CMS)
  2. An event handler that re-processes changed documents
  3. Chunk deletion before re-insertion to avoid stale duplicates
  4. Version metadata on every chunk

Document Registry

Start with a registry that tracks document state:

Python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import hashlib

@dataclass
class DocumentRecord:
    doc_id: str
    source_url: str
    content_hash: str          # SHA-256 of raw content
    version: int               # incremented on each update
    last_ingested_at: datetime
    chunk_ids: list[str]       # IDs of all chunks in the vector store
    is_active: bool = True

class DocumentRegistry:
    def __init__(self, db_connection):
        self.db = db_connection

    def get(self, doc_id: str) -> Optional[DocumentRecord]:
        row = self.db.execute(
            "SELECT * FROM document_registry WHERE doc_id = ?", (doc_id,)
        ).fetchone()
        return DocumentRecord(**row) if row else None

    def upsert(self, record: DocumentRecord):
        self.db.execute("""
            INSERT INTO document_registry
                (doc_id, source_url, content_hash, version, last_ingested_at, chunk_ids, is_active)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(doc_id) DO UPDATE SET
                content_hash = excluded.content_hash,
                version = excluded.version,
                last_ingested_at = excluded.last_ingested_at,
                chunk_ids = excluded.chunk_ids
        """, (
            record.doc_id,
            record.source_url,
            record.content_hash,
            record.version,
            record.last_ingested_at.isoformat(),
            ",".join(record.chunk_ids),
            record.is_active,
        ))
        self.db.commit()

    def compute_hash(self, content: str) -> str:
        return hashlib.sha256(content.encode()).hexdigest()

    def has_changed(self, doc_id: str, new_content: str) -> bool:
        record = self.get(doc_id)
        if not record:
            return True  # new document
        return record.content_hash != self.compute_hash(new_content)

The Document Update Handler

This is the core of the solution. When a document changes, the handler:

  1. Deletes all old chunks from the vector store
  2. Parses and chunks the new version
  3. Embeds the new chunks
  4. Inserts them with updated metadata
  5. Updates the registry
Python
import logging
from openai import AzureOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter

logger = logging.getLogger("ingestion.handler")

client = AzureOpenAI(
    azure_endpoint="https://your-resource.openai.azure.com",
    api_version="2024-02-01",
)

class DocumentUpdateHandler:
    def __init__(self, vector_store, registry: DocumentRegistry):
        self.vector_store = vector_store
        self.registry = registry
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=512,
            chunk_overlap=64,
            separators=["\n\n", "\n", ". ", " "],
        )

    def handle_document_update(self, doc_id: str, new_content: str, metadata: dict):
        """
        Idempotent update handler. Safe to call multiple times for same version.
        """
        if not self.registry.has_changed(doc_id, new_content):
            logger.info(f"Document {doc_id} unchanged, skipping ingestion.")
            return

        logger.info(f"Document {doc_id} changed — starting update pipeline.")

        # Step 1: Delete old chunks
        existing_record = self.registry.get(doc_id)
        if existing_record and existing_record.chunk_ids:
            self._delete_old_chunks(existing_record.chunk_ids)
            logger.info(f"Deleted {len(existing_record.chunk_ids)} old chunks for {doc_id}.")

        # Step 2: Determine new version number
        new_version = (existing_record.version + 1) if existing_record else 1

        # Step 3: Chunk the new content
        raw_chunks = self.splitter.split_text(new_content)
        logger.info(f"Created {len(raw_chunks)} chunks for {doc_id} v{new_version}.")

        # Step 4: Embed and insert new chunks
        new_chunk_ids = []
        for i, chunk_text in enumerate(raw_chunks):
            chunk_id = f"{doc_id}_v{new_version}_chunk{i}"
            embedding = self._embed(chunk_text)

            chunk_metadata = {
                **metadata,
                "doc_id": doc_id,
                "chunk_id": chunk_id,
                "version": new_version,
                "chunk_index": i,
                "total_chunks": len(raw_chunks),
            }

            self.vector_store.upsert(
                id=chunk_id,
                embedding=embedding,
                content=chunk_text,
                metadata=chunk_metadata,
            )
            new_chunk_ids.append(chunk_id)

        # Step 5: Update registry
        self.registry.upsert(DocumentRecord(
            doc_id=doc_id,
            source_url=metadata.get("source_url", ""),
            content_hash=self.registry.compute_hash(new_content),
            version=new_version,
            last_ingested_at=datetime.utcnow(),
            chunk_ids=new_chunk_ids,
        ))

        logger.info(f"Document {doc_id} v{new_version} ingestion complete. {len(new_chunk_ids)} chunks stored.")

    def _delete_old_chunks(self, chunk_ids: list[str]):
        self.vector_store.delete(ids=chunk_ids)

    def _embed(self, text: str) -> list[float]:
        return client.embeddings.create(
            model="text-embedding-3-large",
            input=text,
        ).data[0].embedding

Event-Driven Trigger: Azure Blob Storage + Event Grid

When a document is uploaded or modified in Azure Blob Storage, Event Grid fires an event that your Azure Function receives:

Python
import azure.functions as func
import json
import logging
from azure.storage.blob import BlobServiceClient

logger = logging.getLogger(__name__)

app = func.FunctionApp()

@app.blob_trigger(
    arg_name="blob_event",
    path="policies/{name}",
    connection="STORAGE_CONNECTION_STRING",
)
def on_document_changed(blob_event: func.InputStream, **kwargs):
    """
    Triggered whenever a file in the 'policies' container is created or modified.
    """
    doc_name = kwargs.get("name", "unknown")
    logger.info(f"Ingestion triggered for: {doc_name}")

    # Read the document content
    content = blob_event.read().decode("utf-8")

    # Extract metadata from blob properties
    metadata = {
        "source_url": f"https://yourstorage.blob.core.windows.net/policies/{doc_name}",
        "document_name": doc_name,
        "policy_category": infer_category(doc_name),
        "department": infer_department(doc_name),
    }

    # Run update handler
    handler = build_update_handler()  # factory that returns DocumentUpdateHandler
    handler.handle_document_update(
        doc_id=doc_name,
        new_content=content,
        metadata=metadata,
    )

def infer_category(filename: str) -> str:
    """Simple heuristic: filename prefix determines category."""
    if filename.startswith("hr_"):
        return "human_resources"
    elif filename.startswith("it_"):
        return "information_technology"
    elif filename.startswith("compliance_"):
        return "compliance"
    return "general"

def infer_department(filename: str) -> str:
    parts = filename.split("_")
    return parts[1] if len(parts) > 1 else "all"

SharePoint Webhook Trigger

Many enterprises store documents in SharePoint. Here is how to trigger ingestion on SharePoint document changes:

Python
from fastapi import FastAPI, Request
import httpx

app = FastAPI()

SHAREPOINT_TENANT = "yourtenant.sharepoint.com"
SHAREPOINT_SITE = "/sites/PolicyLibrary"

@app.post("/webhooks/sharepoint")
async def sharepoint_webhook(request: Request):
    """
    SharePoint sends a POST when any file in the subscribed library changes.
    """
    body = await request.json()

    # SharePoint sends a validationToken on subscription  must echo it back
    validation_token = request.query_params.get("validationToken")
    if validation_token:
        return validation_token  # handshake

    # Process change notifications
    for notification in body.get("value", []):
        resource = notification.get("resource", "")
        change_type = notification.get("changeType", "")

        if change_type in ("created", "updated"):
            # Fetch the changed document content
            doc_content = await fetch_sharepoint_document(resource)
            doc_id = extract_doc_id(resource)

            handler = build_update_handler()
            handler.handle_document_update(
                doc_id=doc_id,
                new_content=doc_content,
                metadata={"source": resource, "change_type": change_type},
            )

    return {"status": "ok"}

async def fetch_sharepoint_document(resource_url: str) -> str:
    """
    Fetch document content from SharePoint Graph API.
    Requires an OAuth token with Files.Read scope.
    """
    token = await get_sharepoint_token()
    async with httpx.AsyncClient() as http:
        response = await http.get(
            f"https://graph.microsoft.com/v1.0{resource_url}/content",
            headers={"Authorization": f"Bearer {token}"},
        )
        response.raise_for_status()
        return response.text

Monitoring Freshness

Track how stale each document is and alert when freshness degrades:

Python
from datetime import datetime, timedelta
import json

def check_knowledge_base_freshness(registry: DocumentRegistry, max_age_days: int = 7):
    """
    Identify documents that have not been re-ingested recently.
    These might be stale if the source changed but the event was missed.
    """
    cutoff = datetime.utcnow() - timedelta(days=max_age_days)
    all_docs = registry.get_all()

    stale = [
        doc for doc in all_docs
        if doc.last_ingested_at < cutoff and doc.is_active
    ]

    if stale:
        logger.warning(
            f"{len(stale)} documents not re-ingested in {max_age_days} days. "
            f"Consider forcing a re-check: {[d.doc_id for d in stale[:5]]}..."
        )

    return {
        "total_documents": len(all_docs),
        "stale_count": len(stale),
        "stale_percentage": len(stale) / max(len(all_docs), 1),
        "stale_doc_ids": [d.doc_id for d in stale],
    }

Forced Re-Ingestion for Missed Events

Sometimes events are missed (network issues, downtime). Add a nightly reconciliation job:

Python
import asyncio

async def nightly_reconciliation():
    """
    Walk all source documents and compare content hashes to registry.
    Re-ingest any that have drifted.
    """
    logger.info("Starting nightly knowledge base reconciliation...")
    all_source_docs = await fetch_all_source_documents()

    updated_count = 0
    for doc in all_source_docs:
        if registry.has_changed(doc.id, doc.content):
            handler.handle_document_update(
                doc_id=doc.id,
                new_content=doc.content,
                metadata=doc.metadata,
            )
            updated_count += 1

    logger.info(f"Reconciliation complete. Updated {updated_count} documents.")
    return {"updated": updated_count, "total": len(all_source_docs)}

Summary: Staleness Prevention Architecture

The full architecture to prevent stale knowledge:

Document Source (SharePoint / Blob Storage / CMS)
        │
        │ (event on create/update)
        ▼
Event Trigger (Azure Event Grid / Webhook)
        │
        ▼
Ingestion Function
        ├── Check registry: has content changed?
        │     └── No: skip (idempotent)
        │
        └── Yes:
              ├── Delete old chunks from vector store
              ├── Chunk + embed new content
              ├── Insert new chunks with version metadata
              └── Update registry (hash, version, chunk IDs)
        │
        ▼
Nightly Reconciliation Job (safety net for missed events)
        │
        ▼
Freshness Monitor (alert if doc older than N days)

Key design principles:

  • Idempotent: Safe to run the handler multiple times. Hash comparison prevents unnecessary re-processing.
  • Atomic replacement: Delete old chunks before inserting new ones. Never have a window where both versions are live.
  • Version tracking: Every chunk carries a version number. Useful for debugging "why did the answer change?"
  • Safety net: The nightly reconciliation catches any events the webhook missed.