Learnixo

LangChain Mastery · Lesson 33 of 33

Interview: LangChain Production Scenarios

Q1: How would you add observability to a LangChain app already in production?

Answer:

Observability for LangChain apps has three layers: tracing (what happened), metrics (how it's performing), and alerting (when something breaks).

Python
import logging
import time
import json
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from langchain_core.runnables import RunnableConfig

logger = logging.getLogger("langchain_obs")


class ProductionObservabilityCallback(BaseCallbackHandler):
    """Emit structured logs for every LLM call and tool invocation."""

    def __init__(self, service_name: str, environment: str):
        self.service = service_name
        self.env = environment
        self._call_starts: dict = {}

    def _log(self, event: str, data: dict) -> None:
        logger.info(json.dumps({
            "service": self.service,
            "env": self.env,
            "event": event,
            "ts": time.time(),
            **data,
        }))

    def on_llm_start(self, serialized: dict, prompts: list, run_id, **kwargs) -> None:
        self._call_starts[str(run_id)] = time.time()
        self._log("llm_start", {
            "model": serialized.get("kwargs", {}).get("model_name"),
            "prompt_tokens_estimate": sum(len(p) // 4 for p in prompts),
            "run_id": str(run_id),
        })

    def on_llm_end(self, response: LLMResult, run_id, **kwargs) -> None:
        start = self._call_starts.pop(str(run_id), time.time())
        usage = response.llm_output.get("token_usage", {})
        self._log("llm_end", {
            "latency_ms": round((time.time() - start) * 1000),
            "prompt_tokens": usage.get("prompt_tokens"),
            "completion_tokens": usage.get("completion_tokens"),
            "run_id": str(run_id),
        })

    def on_llm_error(self, error: Exception, run_id, **kwargs) -> None:
        self._log("llm_error", {
            "error_type": type(error).__name__,
            "error_msg": str(error)[:200],
            "run_id": str(run_id),
        })

    def on_tool_start(self, serialized: dict, input_str: str, **kwargs) -> None:
        self._log("tool_call", {"tool": serialized.get("name"), "input_chars": len(input_str)})

    def on_tool_error(self, error: Exception, **kwargs) -> None:
        self._log("tool_error", {"error_type": type(error).__name__, "error_msg": str(error)[:200]})


# Attach at request time
def invoke_with_observability(chain, inputs: dict, session_id: str) -> dict:
    callback = ProductionObservabilityCallback(
        service_name="clinical-pharmacist",
        environment="production",
    )
    return chain.invoke(
        inputs,
        config=RunnableConfig(
            callbacks=[callback],
            metadata={"session_id": session_id},
        ),
    )

Metrics to expose (as Prometheus counters/histograms):

  • llm_calls_total — by model, status (success/error)
  • llm_latency_ms — histogram by model
  • llm_tokens_total — input + output token counts
  • llm_cost_usd — computed from token counts × price
  • tool_calls_total — by tool name, status

LangSmith in parallel: Enable LANGCHAIN_TRACING_V2=true for full trace replay. Structured logs give real-time metrics dashboards; LangSmith gives step-by-step debugging.


Q2: Your LLM cost is 10x higher than expected after a traffic spike. How do you diagnose and fix it?

Answer:

The $40K-instead-of-$4K problem. Work through the layers:

Python
# Step 1: Identify which component is expensive
class CostBreakdownCallback(BaseCallbackHandler):
    def __init__(self):
        self.calls: list[dict] = []
        self._start: dict = {}
        self._model: dict = {}

    def on_llm_start(self, serialized, prompts, run_id, **kwargs):
        self._start[str(run_id)] = time.time()
        self._model[str(run_id)] = serialized.get("kwargs", {}).get("model_name", "unknown")

    def on_llm_end(self, response, run_id, **kwargs):
        usage = response.llm_output.get("token_usage", {})
        self.calls.append({
            "model": self._model.pop(str(run_id), "?"),
            "latency_ms": round((time.time() - self._start.pop(str(run_id), time.time())) * 1000),
            "prompt_tokens": usage.get("prompt_tokens", 0),
            "completion_tokens": usage.get("completion_tokens", 0),
        })

    def breakdown(self) -> dict:
        if not self.calls:
            return {}
        total_input = sum(c["prompt_tokens"] for c in self.calls)
        total_output = sum(c["completion_tokens"] for c in self.calls)
        by_model = {}
        for call in self.calls:
            m = call["model"]
            by_model.setdefault(m, {"calls": 0, "input_tokens": 0, "output_tokens": 0})
            by_model[m]["calls"] += 1
            by_model[m]["input_tokens"] += call["prompt_tokens"]
            by_model[m]["output_tokens"] += call["completion_tokens"]
        return {
            "total_llm_calls": len(self.calls),
            "total_input_tokens": total_input,
            "total_output_tokens": total_output,
            "by_model": by_model,
        }


# Step 2: Common causes and fixes
COST_EXPLOSION_CAUSES = {
    "retry_with_full_context": {
        "symptom": "Calls per request >> 1, prompt_tokens same each call",
        "fix": "Cap retries at 3, trim context on retry",
    },
    "context_window_bloat": {
        "symptom": "prompt_tokens growing each turn in a conversation",
        "fix": "Switch to ConversationBufferWindowMemory(k=6) or SummaryBufferMemory",
    },
    "wrong_model_routing": {
        "symptom": "All requests using gpt-4o when gpt-4o-mini would suffice",
        "fix": "Add complexity classifier, route simple queries to cheap model",
    },
    "cache_miss": {
        "symptom": "Identical queries hitting LLM every time",
        "fix": "Add exact-match cache with Redis (SHA256 key on prompt)",
    },
    "retrieval_overhead": {
        "symptom": "Many small retrievals each triggering an LLM call",
        "fix": "Batch retrievals, cache retrieval results",
    },
}

# Step 3: Add model router based on query complexity
from langchain_openai import ChatOpenAI

CHEAP_MODEL = ChatOpenAI(model="gpt-4o-mini", temperature=0)
POWERFUL_MODEL = ChatOpenAI(model="gpt-4o", temperature=0)

def route_by_complexity(question: str):
    tokens = len(question.split())
    has_complex_terms = any(w in question.lower() for w in ["interaction", "mechanism", "pharmacokinetics", "compare"])
    
    if tokens < 30 and not has_complex_terms:
        return CHEAP_MODEL    # ~15x cheaper
    return POWERFUL_MODEL

# Step 4: Add exact-match cache for repeated queries
import hashlib
import redis

cache = redis.from_url("redis://localhost:6379", decode_responses=True)

def cached_invoke(chain, inputs: dict, ttl: int = 3600) -> str:
    key = "llm:" + hashlib.sha256(str(sorted(inputs.items())).encode()).hexdigest()
    cached = cache.get(key)
    if cached:
        return cached
    result = chain.invoke(inputs)
    cache.setex(key, ttl, result)
    return result

Q3: A chain that takes 8 seconds needs to respond in under 2 seconds. How do you approach this?

Answer:

Profile first, then optimize the bottleneck:

Python
import asyncio

# Step 1: Profile each stage
async def profile_chain_stages(question: str) -> dict:
    timings = {}
    
    t0 = time.time()
    # Stage 1: Retrieval
    retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
    docs = retriever.invoke(question)
    timings["retrieval_ms"] = round((time.time() - t0) * 1000)
    
    t1 = time.time()
    # Stage 2: Context formatting
    context = "\n\n".join(d.page_content for d in docs)
    timings["formatting_ms"] = round((time.time() - t1) * 1000)
    
    t2 = time.time()
    # Stage 3: LLM call
    model = ChatOpenAI(model="gpt-4o", temperature=0)
    response = await model.ainvoke(f"Context: {context}\n\nQ: {question}")
    timings["llm_ms"] = round((time.time() - t2) * 1000)
    
    timings["total_ms"] = round((time.time() - t0) * 1000)
    return timings

# Typical breakdown:
# {'retrieval_ms': 800, 'formatting_ms': 5, 'llm_ms': 6200, 'total_ms': 8005}
#  LLM is the bottleneck (78% of time)


# Fix 1: Cheaper model for simple queries (2-3x faster + cheaper)
from langchain_openai import ChatOpenAI
fast_model = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # ~2s vs ~6s for gpt-4o


# Fix 2: Reduce context size (fewer tokens = faster generation)
retriever_small = vectorstore.as_retriever(search_kwargs={"k": 2})  # 4 docs  2 docs
# Also: truncate each doc to 300 chars instead of full chunk


# Fix 3: Parallelize independent retrieval + formatting
from langchain_core.runnables import RunnableParallel

parallel_chain = RunnableParallel(
    drug_info=drug_info_retriever | format_docs,
    interaction_info=interaction_retriever | format_docs,
) | combine_and_answer_chain
# Both retrievals happen simultaneously


# Fix 4: Stream the response (user sees output in under 1 second)
async def stream_fast_response(question: str):
    async for chunk in rag_chain.astream({"question": question}):
        print(chunk, end="", flush=True)
        # User reads while generation continues
        # Perceived latency: ~500ms to first token


# Fix 5: Semantic cache (instant for repeated/similar queries)
from langchain_community.cache import InMemoryCache
from langchain.globals import set_llm_cache

set_llm_cache(InMemoryCache())  # Exact match cache  first call normal, subsequent instant

Latency optimization priority:

| Optimization | Effort | Impact | |---|---|---| | Stream responses | Low | High (perceived latency drops 80%) | | Exact-match cache | Low | High (repeated queries: 0ms) | | Switch to gpt-4o-mini for simple queries | Low | Medium (2-3x faster, 15x cheaper) | | Reduce k (fewer retrieved docs) | Low | Medium (less context = faster generation) | | Parallelize retrievals | Medium | Medium (if multiple sources) | | Semantic cache | Medium | Medium (similar queries: instant) |


Q4: How do you isolate multiple tenants in a shared LangChain deployment?

Answer:

Multi-tenant isolation requires isolation at three layers: data, conversation state, and rate limits.

Python
from dataclasses import dataclass, field
from collections import defaultdict
import time

@dataclass
class TenantConfig:
    tenant_id: str
    model: str = "gpt-4o-mini"         # Different tenants can have different models
    max_tokens_per_minute: int = 10_000
    vector_collection: str = ""          # Tenant-specific vector store namespace
    allowed_tools: list[str] = field(default_factory=list)

    def __post_init__(self):
        if not self.vector_collection:
            self.vector_collection = f"tenant_{self.tenant_id}"


class MultiTenantRAGPlatform:
    """Isolated RAG pipeline per tenant."""

    def __init__(self, embeddings, base_vectorstore_path: str):
        self.embeddings = embeddings
        self.base_path = base_vectorstore_path
        self._tenant_stores: dict = {}
        self._tenant_histories: dict = defaultdict(list)
        self._token_usage: dict = defaultdict(lambda: {"tokens": 0, "window_start": time.time()})

    def _get_vectorstore(self, tenant: TenantConfig):
        """Each tenant has their own isolated vector collection."""
        if tenant.tenant_id not in self._tenant_stores:
            self._tenant_stores[tenant.tenant_id] = Chroma(
                collection_name=tenant.vector_collection,
                embedding_function=self.embeddings,
                persist_directory=f"{self.base_path}/{tenant.tenant_id}",
            )
        return self._tenant_stores[tenant.tenant_id]

    def _check_rate_limit(self, tenant: TenantConfig, estimated_tokens: int) -> None:
        usage = self._token_usage[tenant.tenant_id]
        # Reset window if 60 seconds have passed
        if time.time() - usage["window_start"] > 60:
            usage["tokens"] = 0
            usage["window_start"] = time.time()
        
        if usage["tokens"] + estimated_tokens > tenant.max_tokens_per_minute:
            raise Exception(f"Rate limit exceeded for tenant {tenant.tenant_id}")
        
        usage["tokens"] += estimated_tokens

    def query(self, tenant: TenantConfig, session_id: str, question: str) -> str:
        # Rate limiting
        estimated_tokens = len(question.split()) + 500   # Rough estimate
        self._check_rate_limit(tenant, estimated_tokens)
        
        # Tenant-specific retriever (isolated data)
        vectorstore = self._get_vectorstore(tenant)
        retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
        
        # Tenant-specific model
        model = ChatOpenAI(model=tenant.model, temperature=0)
        
        # Tenant-specific session history (isolated per tenant+session)
        history_key = f"{tenant.tenant_id}:{session_id}"
        history = self._tenant_histories[history_key][-10:]  # Last 5 turns
        
        # Build and invoke chain
        from langchain_core.prompts import ChatPromptTemplate
        prompt = ChatPromptTemplate.from_messages([
            ("system", "Answer using the retrieved context only."),
            *[(msg["role"], msg["content"]) for msg in history],
            ("human", "{question}"),
        ])
        
        context_docs = retriever.invoke(question)
        context = "\n\n".join(d.page_content for d in context_docs)
        
        chain = prompt | model
        response = chain.invoke({"question": question, "context": context})
        answer = response.content
        
        # Update session history
        self._tenant_histories[history_key].extend([
            {"role": "human", "content": question},
            {"role": "assistant", "content": answer},
        ])
        
        return answer

Q5: Design a production clinical AI assistant platform. Walk through the architecture.

Answer:

A production clinical AI platform needs: reliable retrieval, audit trails, safety filters, cost control, and regulatory compliance awareness.

Python
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
import logging
import time

logger = logging.getLogger("clinical_platform")


# --- Layer 1: Knowledge Base ---
# Chunked, embedded clinical guidelines, drug monographs, interaction databases
# Metadata: drug_name, category, source, version, effective_date

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
knowledge_base = Chroma(
    collection_name="clinical_knowledge",
    embedding_function=embeddings,
    persist_directory="./clinical_kb",
)


# --- Layer 2: Safety Filter ---
BLOCKED_PATTERNS = [
    "how to overdose", "lethal dose of", "maximum fatal dose",
    "suicide", "harm patient",
]

def safety_check(query: str) -> None:
    lower = query.lower()
    for pattern in BLOCKED_PATTERNS:
        if pattern in lower:
            raise PermissionError(f"Query blocked by safety filter: '{pattern}'")
    if len(query) > 2000:
        raise ValueError("Query exceeds maximum length of 2000 characters")


# --- Layer 3: Tools ---
@tool
def search_clinical_knowledge(query: str) -> str:
    """Search clinical guidelines, drug monographs, and pharmacology references."""
    docs = knowledge_base.similarity_search(query, k=4)
    if not docs:
        return "No relevant clinical information found. Please consult primary references."
    return "\n\n".join(
        f"[{doc.metadata.get('source', 'reference')}]: {doc.page_content}"
        for doc in docs
    )

@tool
def check_drug_interaction(drug_a: str, drug_b: str) -> str:
    """Check for clinically significant interactions between two drugs."""
    query = f"{drug_a} {drug_b} drug interaction severity mechanism"
    return search_clinical_knowledge.invoke(query)


# --- Layer 4: Agent ---
clinical_tools = [search_clinical_knowledge, check_drug_interaction]

clinical_prompt = ChatPromptTemplate.from_messages([
    ("system",
     "You are a clinical decision support assistant for qualified healthcare professionals. "
     "Use your tools to retrieve accurate drug information. "
     "Always include: mechanism, clinical significance, and recommended action. "
     "End every response with: 'Verify with Lexicomp/Micromedex and clinical judgment before prescribing.' "
     "Never provide information outside your retrieved context."),
    ("placeholder", "{chat_history}"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(
    ChatOpenAI(model="gpt-4o", temperature=0),
    clinical_tools,
    clinical_prompt,
)

executor = AgentExecutor(
    agent=agent,
    tools=clinical_tools,
    max_iterations=6,
    max_execution_time=25.0,
    handle_parsing_errors=True,
    early_stopping_method="generate",
    return_intermediate_steps=True,
    verbose=False,
)


# --- Layer 5: Request handler with full guardrails ---
def clinical_query(
    question: str,
    session_id: str,
    user_id: str,
    user_role: str = "pharmacist",
) -> dict:
    start = time.time()
    
    try:
        # Safety gate
        safety_check(question)
        
        # Invoke with audit metadata
        from langchain_core.runnables import RunnableConfig
        result = executor.invoke(
            {"input": question, "chat_history": []},
            config=RunnableConfig(
                tags=["clinical", user_role],
                metadata={"session_id": session_id, "user_id": user_id},
            ),
        )
        
        latency_ms = round((time.time() - start) * 1000)
        tool_calls = result.get("intermediate_steps", [])
        
        logger.info(json.dumps({
            "event": "clinical_query_success",
            "session_id": session_id,
            "user_id": user_id,
            "user_role": user_role,
            "latency_ms": latency_ms,
            "tool_calls": len(tool_calls),
            "tools_used": [a.tool for a, _ in tool_calls],
        }))
        
        return {
            "answer": result["output"],
            "session_id": session_id,
            "latency_ms": latency_ms,
            "tools_consulted": len(tool_calls),
            "success": True,
            "disclaimer": "For qualified healthcare professional use only. Always verify with primary clinical references.",
        }
    
    except PermissionError as e:
        logger.warning(json.dumps({"event": "query_blocked", "reason": str(e), "user_id": user_id}))
        return {"answer": "This query cannot be processed.", "success": False, "blocked": True}
    
    except Exception as e:
        logger.error(json.dumps({"event": "query_error", "error": str(e), "session_id": session_id}))
        return {
            "answer": "Unable to process this query. Please consult clinical references directly.",
            "success": False,
        }

Architecture checklist:

| Requirement | Implementation | |---|---| | Audit trail | Structured JSON logs with session_id, user_id, tools_used | | Safety filter | Pattern matching before any LLM call | | Data isolation | Chroma collection per tenant/department | | Cost control | Max iterations cap, cheap model for simple queries, caching | | Latency SLA | 25s hard timeout, stream for perceived speed | | Regulatory | Disclaimer appended to all responses, no unsourced claims | | Fallback | Graceful error messages, never expose raw exceptions | | Observability | LangSmith tracing + structured logs + metrics |