LangChain Mastery · Lesson 33 of 33
Interview: LangChain Production Scenarios
Q1: How would you add observability to a LangChain app already in production?
Answer:
Observability for LangChain apps has three layers: tracing (what happened), metrics (how it's performing), and alerting (when something breaks).
import logging
import time
import json
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from langchain_core.runnables import RunnableConfig
# Module logger used by the structured-logging callback; records are JSON strings.
logger = logging.getLogger(name="langchain_obs")
class ProductionObservabilityCallback(BaseCallbackHandler):
    """Emit structured logs for every LLM call and tool invocation.

    Every record is a single JSON object logged at INFO on the module
    ``logger``. It contains ``service``, ``env``, ``event`` and ``ts``
    (``time.time()``) plus event-specific fields. LLM start times are kept
    per ``run_id`` so ``llm_end`` / ``llm_error`` can report ``latency_ms``.

    Args:
        service_name: Written to the ``service`` field of every record.
        environment: Written to the ``env`` field of every record.
    """

    def __init__(self, service_name: str, environment: str):
        self.service = service_name
        self.env = environment
        # str(run_id) -> start time. Entries are removed on llm_end AND on
        # llm_error, so failed calls do not accumulate in a long-lived handler.
        self._call_starts: dict = {}

    def _log(self, event: str, data: dict) -> None:
        """Serialize the common fields plus ``data`` into one JSON log line."""
        logger.info(json.dumps({
            "service": self.service,
            "env": self.env,
            "event": event,
            "ts": time.time(),
            **data,
        }))

    def _pop_latency_ms(self, run_id) -> int:
        """Remove the run's start time and return elapsed ms (0 if it was never recorded)."""
        start = self._call_starts.pop(str(run_id), time.time())
        return round((time.time() - start) * 1000)

    @staticmethod
    def _optional_run_id(kwargs: dict):
        """Return ``str(run_id)`` from callback kwargs, or None when absent."""
        run_id = kwargs.get("run_id")
        return None if run_id is None else str(run_id)

    def on_llm_start(self, serialized: dict, prompts: list, run_id, **kwargs) -> None:
        self._call_starts[str(run_id)] = time.time()
        self._log("llm_start", {
            # `serialized` may be None depending on the langchain_core version.
            "model": (serialized or {}).get("kwargs", {}).get("model_name"),
            # Rough estimate: ~4 characters per token.
            "prompt_tokens_estimate": sum(len(p) // 4 for p in prompts),
            "run_id": str(run_id),
        })

    def on_llm_end(self, response: LLMResult, run_id, **kwargs) -> None:
        # LLMResult.llm_output is optional; some providers leave it None, and
        # token_usage may be missing or None as well.
        usage = (response.llm_output or {}).get("token_usage") or {}
        self._log("llm_end", {
            "latency_ms": self._pop_latency_ms(run_id),
            "prompt_tokens": usage.get("prompt_tokens"),
            "completion_tokens": usage.get("completion_tokens"),
            "run_id": str(run_id),
        })

    def on_llm_error(self, error: Exception, run_id, **kwargs) -> None:
        self._log("llm_error", {
            # Popping here also prevents the start-time entry from leaking.
            "latency_ms": self._pop_latency_ms(run_id),
            "error_type": type(error).__name__,
            "error_msg": str(error)[:200],
            "run_id": str(run_id),
        })

    def on_tool_start(self, serialized: dict, input_str: str, **kwargs) -> None:
        self._log("tool_call", {
            "tool": (serialized or {}).get("name"),
            "input_chars": len(input_str),
            "run_id": self._optional_run_id(kwargs),
        })

    def on_tool_error(self, error: Exception, **kwargs) -> None:
        self._log("tool_error", {
            "error_type": type(error).__name__,
            "error_msg": str(error)[:200],
            "run_id": self._optional_run_id(kwargs),
        })
# Attach at request time
def invoke_with_observability(chain, inputs: dict, session_id: str) -> dict:
    """Invoke ``chain`` with a fresh structured-logging callback attached.

    Each call builds a new ProductionObservabilityCallback (service
    "clinical-pharmacist", environment "production"). The callback and
    ``{"session_id": session_id}`` metadata are passed via the run config.

    Returns whatever ``chain.invoke`` returns.
    """
    handler = ProductionObservabilityCallback(
        service_name="clinical-pharmacist",
        environment="production",
    )
    run_config = RunnableConfig(
        callbacks=[handler],
        metadata={"session_id": session_id},
    )
    return chain.invoke(inputs, config=run_config)
# Metrics to expose (as Prometheus counters/histograms):
- `llm_calls_total` — by model, status (success/error)
- `llm_latency_ms` — histogram by model
- `llm_tokens_total` — input + output token counts
- `llm_cost_usd` — computed from token counts × price
- `tool_calls_total` — by tool name, status
LangSmith in parallel: set `LANGCHAIN_TRACING_V2=true` (with `LANGCHAIN_API_KEY` configured) for full trace replay. Structured logs feed real-time metrics dashboards; LangSmith gives step-by-step debugging of individual runs.
Q2: Your LLM cost is 10x higher than expected after a traffic spike. How do you diagnose and fix it?
Answer:
The $40K-instead-of-$4K problem. Work through the layers:
# Step 1: Identify which component is expensive
class CostBreakdownCallback(BaseCallbackHandler):
    """Record token usage and latency per LLM call, then summarize by model.

    Attach an instance to the requests under investigation and call
    ``breakdown()`` to see where tokens are going.
    """

    def __init__(self):
        self.calls: list[dict] = []
        # In-flight runs keyed by str(run_id); popped on end *or* error so
        # failed calls do not leave entries behind.
        self._start: dict = {}
        self._model: dict = {}

    def on_llm_start(self, serialized, prompts, run_id, **kwargs):
        key = str(run_id)
        self._start[key] = time.time()
        # `serialized` may be None depending on the langchain_core version.
        self._model[key] = (serialized or {}).get("kwargs", {}).get("model_name", "unknown")

    def on_llm_end(self, response, run_id, **kwargs):
        key = str(run_id)
        # llm_output is optional on LLMResult; token_usage may be absent or None.
        usage = (response.llm_output or {}).get("token_usage") or {}
        started = self._start.pop(key, time.time())
        self.calls.append({
            "model": self._model.pop(key, "?"),
            "latency_ms": round((time.time() - started) * 1000),
            # `or 0` also covers keys that are present with a None value,
            # which would otherwise break the sums in breakdown().
            "prompt_tokens": usage.get("prompt_tokens") or 0,
            "completion_tokens": usage.get("completion_tokens") or 0,
        })

    def on_llm_error(self, error, run_id, **kwargs):
        """Drop bookkeeping for a failed run (no usage is recorded for it)."""
        key = str(run_id)
        self._start.pop(key, None)
        self._model.pop(key, None)

    def breakdown(self) -> dict:
        """Summarize recorded calls.

        Returns:
            ``{}`` if nothing was recorded. Otherwise a dict with
            ``total_llm_calls``, ``total_input_tokens``,
            ``total_output_tokens`` and ``by_model``. ``by_model`` maps each
            model name to ``calls`` / ``input_tokens`` / ``output_tokens``.
        """
        if not self.calls:
            return {}
        by_model: dict = {}
        for call in self.calls:
            stats = by_model.setdefault(
                call["model"], {"calls": 0, "input_tokens": 0, "output_tokens": 0}
            )
            stats["calls"] += 1
            stats["input_tokens"] += call["prompt_tokens"]
            stats["output_tokens"] += call["completion_tokens"]
        return {
            "total_llm_calls": len(self.calls),
            "total_input_tokens": sum(c["prompt_tokens"] for c in self.calls),
            "total_output_tokens": sum(c["completion_tokens"] for c in self.calls),
            "by_model": by_model,
        }
# Step 2: Common causes and fixes
# Diagnosis table: cause id -> the symptom to look for in the per-call
# breakdown, and the corresponding fix.
COST_EXPLOSION_CAUSES = {
    "retry_with_full_context": {
        "symptom": "Calls per request >> 1, prompt_tokens same each call",
        "fix": "Cap retries at 3, trim context on retry",
    },
    "context_window_bloat": {
        "symptom": "prompt_tokens growing each turn in a conversation",
        # The summary-based memory class is ConversationSummaryBufferMemory
        # (there is no "SummaryBufferMemory"); trim_messages is the
        # non-deprecated way to window history.
        "fix": (
            "Window the history (trim_messages, or ConversationBufferWindowMemory(k=6)) "
            "or summarize older turns (ConversationSummaryBufferMemory)"
        ),
    },
    "wrong_model_routing": {
        "symptom": "All requests using gpt-4o when gpt-4o-mini would suffice",
        "fix": "Add complexity classifier, route simple queries to cheap model",
    },
    "cache_miss": {
        "symptom": "Identical queries hitting LLM every time",
        "fix": "Add exact-match cache with Redis (SHA256 key on prompt)",
    },
    "retrieval_overhead": {
        "symptom": "Many small retrievals each triggering an LLM call",
        "fix": "Batch retrievals, cache retrieval results",
    },
}
# Step 3: Add model router based on query complexity
from langchain_openai import ChatOpenAI
# Two deterministic (temperature=0) tiers for the router:
# gpt-4o-mini for simple questions, gpt-4o for everything else.
CHEAP_MODEL = ChatOpenAI(temperature=0, model="gpt-4o-mini")
POWERFUL_MODEL = ChatOpenAI(temperature=0, model="gpt-4o")
def route_by_complexity(
    question: str,
    *,
    max_simple_words: int = 30,
    complex_terms: tuple[str, ...] = ("interaction", "mechanism", "pharmacokinetics", "compare"),
    cheap_model=None,
    powerful_model=None,
):
    """Pick a model for ``question`` based on its length and vocabulary.

    A question is "simple" when it has fewer than ``max_simple_words``
    whitespace-separated words and contains none of ``complex_terms``
    (case-insensitive substring match).

    Args:
        question: The user's question.
        max_simple_words: Word-count threshold (exclusive) for "simple".
        complex_terms: Lowercase substrings that force the powerful model.
        cheap_model: Model returned for simple questions; defaults to the
            module-level ``CHEAP_MODEL``.
        powerful_model: Model returned otherwise; defaults to the
            module-level ``POWERFUL_MODEL``.

    Returns:
        The selected model object.
    """
    lowered = question.lower()  # computed once, not once per term
    word_count = len(question.split())
    is_complex = any(term in lowered for term in complex_terms)
    if word_count < max_simple_words and not is_complex:
        return CHEAP_MODEL if cheap_model is None else cheap_model  # ~15x cheaper
    return POWERFUL_MODEL if powerful_model is None else powerful_model
# Step 4: Add exact-match cache for repeated queries
import hashlib
import os
import redis
# The Redis URL comes from REDIS_URL when it is set. The localhost default
# keeps local development unchanged. decode_responses=True makes reads
# return str rather than bytes.
cache = redis.from_url(
    os.environ.get("REDIS_URL", "redis://localhost:6379"),
    decode_responses=True,
)
def cached_invoke(chain, inputs: dict, ttl: int = 3600, *, namespace: str = "") -> str:
    """Return the chain's answer for ``inputs``, reading/writing the Redis ``cache``.

    The key is ``"llm:"`` (or ``"llm:<namespace>:"``) plus the SHA-256 of
    ``str(sorted(inputs.items()))``. Set ``namespace`` per chain: without it,
    two different chains called with the same inputs share one entry.

    Args:
        chain: Runnable to call on a cache miss.
        inputs: Chain input dict; its items form the cache key.
        ttl: Expiry of a newly written entry, in seconds.
        namespace: Optional per-chain key segment. The empty default keeps
            the original key format.

    Returns:
        The cached or freshly generated answer text.
    """
    prefix = f"llm:{namespace}:" if namespace else "llm:"
    key = prefix + hashlib.sha256(str(sorted(inputs.items())).encode()).hexdigest()
    cached = cache.get(key)
    # Compare with None: an empty-string answer is still a cache hit.
    if cached is not None:
        return cached
    result = chain.invoke(inputs)
    # Redis stores strings. A chain ending in a chat model returns a message
    # object, so cache (and return) its text content.
    if not isinstance(result, str):
        result = getattr(result, "content", result)
    cache.setex(key, ttl, result)
    return result
# Q3: A chain that takes 8 seconds needs to respond in under 2 seconds. How do you approach this?
Answer:
Profile first, then optimize the bottleneck:
import asyncio
# Step 1: Profile each stage
async def profile_chain_stages(question: str) -> dict:
    """Time retrieval, context formatting and the LLM call for one question.

    Uses a module-level ``vectorstore`` (defined elsewhere).

    Returns:
        Integer millisecond timings: ``retrieval_ms``, ``formatting_ms``,
        ``llm_ms`` and ``total_ms``.
    """
    # Build the retriever and model before starting the clock, so each stage
    # measures the call itself rather than object construction.
    retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
    model = ChatOpenAI(model="gpt-4o", temperature=0)
    timings = {}
    # perf_counter is monotonic and intended for measuring intervals.
    t0 = time.perf_counter()

    # Stage 1: Retrieval. Awaited, so the event loop is not blocked.
    docs = await retriever.ainvoke(question)
    t1 = time.perf_counter()
    timings["retrieval_ms"] = round((t1 - t0) * 1000)

    # Stage 2: Context formatting
    context = "\n\n".join(d.page_content for d in docs)
    t2 = time.perf_counter()
    timings["formatting_ms"] = round((t2 - t1) * 1000)

    # Stage 3: LLM call
    await model.ainvoke(f"Context: {context}\n\nQ: {question}")
    t3 = time.perf_counter()
    timings["llm_ms"] = round((t3 - t2) * 1000)

    timings["total_ms"] = round((t3 - t0) * 1000)
    return timings
# Typical breakdown:
# {'retrieval_ms': 800, 'formatting_ms': 5, 'llm_ms': 6200, 'total_ms': 8005}
# → LLM is the bottleneck (6200 / 8005 ≈ 77% of time)
# Fix 1: Cheaper model for simple queries (2-3x faster + cheaper)
from langchain_openai import ChatOpenAI
fast_model = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # ~2s vs ~6s for gpt-4o
# Fix 2: Reduce context size (fewer prompt tokens = less time spent processing
# the prompt before the first output token)
retriever_small = vectorstore.as_retriever(search_kwargs={"k": 2})  # 4 docs → 2 docs
# Also: truncate each doc to 300 chars instead of full chunk
# Fix 3: Parallelize independent retrieval + formatting
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
# Every branch receives the same input. The retrievers take a query string,
# so invoke this chain with the question string itself.
parallel_chain = RunnableParallel(
    drug_info=drug_info_retriever | format_docs,
    interaction_info=interaction_retriever | format_docs,
    # Forward the question as well. Without this key the answer step only
    # receives the two formatted contexts and never sees what was asked.
    question=RunnablePassthrough(),
) | combine_and_answer_chain
# Both retrievals happen simultaneously
# Fix 4: Stream the response (user sees output in under 1 second)
async def stream_fast_response(question: str):
    """Print the chain's answer to stdout incrementally as it is generated.

    Streams from a module-level ``rag_chain`` (defined elsewhere), invoked
    with ``{"question": question}``.
    """
    async for chunk in rag_chain.astream({"question": question}):
        # If the chain ends in a chat model with no output parser, chunks are
        # message objects: print their text, not their repr. Plain-string
        # chunks are printed unchanged.
        text = chunk if isinstance(chunk, str) else getattr(chunk, "content", chunk)
        print(text, end="", flush=True)
        # User reads while generation continues
    # Terminate the streamed line once generation finishes.
    print()
    # Perceived latency: ~500ms to first token
# Fix 5: Response cache (instant for exact repeats)
# InMemoryCache is an exact-match cache, not a semantic one: only an identical
# prompt with the same model settings is served from it. Answering *similar*
# queries from cache requires a semantic cache backed by embeddings.
from langchain_community.cache import InMemoryCache
from langchain.globals import set_llm_cache
set_llm_cache(InMemoryCache())  # Exact match cache — first call normal, identical repeats instant
# Latency optimization priority:
| Optimization | Effort | Impact |
|---|---|---|
| Stream responses | Low | High (perceived latency drops 80%) |
| Exact-match cache | Low | High (repeated queries: 0ms) |
| Switch to gpt-4o-mini for simple queries | Low | Medium (2-3x faster, 15x cheaper) |
| Reduce k (fewer retrieved docs) | Low | Medium (less context = faster response) |
| Parallelize retrievals | Medium | Medium (if multiple sources) |
| Semantic cache | Medium | Medium (similar queries: instant) |
Q4: How do you isolate multiple tenants in a shared LangChain deployment?
Answer:
Multi-tenant isolation requires isolation at three layers: data, conversation state, and rate limits.
from dataclasses import dataclass, field
from collections import defaultdict
import time
@dataclass
class TenantConfig:
    """Settings for one tenant of the shared RAG deployment.

    Attributes:
        tenant_id: Tenant identifier; also used to derive the default
            collection name.
        model: Chat model name for this tenant.
        max_tokens_per_minute: Per-minute token budget for this tenant.
        vector_collection: Vector store collection name. Left blank, it
            becomes ``"tenant_<tenant_id>"``.
        allowed_tools: Tool names for this tenant (not enforced in this class).
    """

    tenant_id: str
    model: str = "gpt-4o-mini"
    max_tokens_per_minute: int = 10_000
    vector_collection: str = ""
    allowed_tools: list[str] = field(default_factory=list)

    def __post_init__(self):
        # Give each tenant its own namespace unless the caller chose one.
        self.vector_collection = self.vector_collection or f"tenant_{self.tenant_id}"
class TenantRateLimitError(Exception):
    """Raised when a tenant's per-minute token budget would be exceeded."""


class MultiTenantRAGPlatform:
    """Isolated RAG pipeline per tenant.

    Isolation layers:
      * data: one Chroma collection and persist directory per tenant;
      * conversation state: history keyed by ``"<tenant_id>:<session_id>"``;
      * rate limits: a fixed 60-second token window per tenant.

    All state lives in in-process dicts with no locking.
    """

    # Messages fed back into the prompt (and retained) per session: 5 turns.
    _HISTORY_MESSAGES = 10

    def __init__(self, embeddings, base_vectorstore_path: str):
        self.embeddings = embeddings
        self.base_path = base_vectorstore_path
        self._tenant_stores: dict = {}
        self._tenant_histories: dict = defaultdict(list)
        self._token_usage: dict = defaultdict(lambda: {"tokens": 0, "window_start": time.time()})

    def _get_vectorstore(self, tenant: "TenantConfig"):
        """Return the tenant's own Chroma store, creating it on first use.

        Raises:
            ValueError: if ``tenant.tenant_id`` is empty or could resolve
                outside ``base_path`` when used as a directory name.
        """
        tenant_id = tenant.tenant_id
        # tenant_id becomes a directory name. "..", "." or path separators
        # would point at a shared or another tenant's directory and break
        # data isolation.
        if not tenant_id or tenant_id in (".", "..") or "/" in tenant_id or "\\" in tenant_id:
            raise ValueError(f"Invalid tenant_id for storage path: {tenant_id!r}")
        if tenant_id not in self._tenant_stores:
            self._tenant_stores[tenant_id] = Chroma(
                collection_name=tenant.vector_collection,
                embedding_function=self.embeddings,
                persist_directory=f"{self.base_path}/{tenant_id}",
            )
        return self._tenant_stores[tenant_id]

    def _check_rate_limit(self, tenant: "TenantConfig", estimated_tokens: int) -> None:
        """Reserve ``estimated_tokens`` from the tenant's current 60 s window.

        Raises:
            TenantRateLimitError: if the reservation would exceed
                ``tenant.max_tokens_per_minute``. Nothing is reserved then.
        """
        usage = self._token_usage[tenant.tenant_id]
        now = time.time()
        # Start a new window once 60 seconds have passed.
        if now - usage["window_start"] > 60:
            usage["tokens"] = 0
            usage["window_start"] = now
        if usage["tokens"] + estimated_tokens > tenant.max_tokens_per_minute:
            raise TenantRateLimitError(f"Rate limit exceeded for tenant {tenant.tenant_id}")
        usage["tokens"] += estimated_tokens

    def query(self, tenant: "TenantConfig", session_id: str, question: str) -> str:
        """Answer ``question`` using only this tenant's data and session history.

        Raises:
            TenantRateLimitError: if the tenant's token budget is exhausted.
            ValueError: if the tenant id is unsafe as a storage path.
        """
        from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

        # Rate limiting. Rough estimate: question words + 500 for context/answer.
        estimated_tokens = len(question.split()) + 500
        self._check_rate_limit(tenant, estimated_tokens)

        # Tenant-specific retriever (isolated data) and model.
        vectorstore = self._get_vectorstore(tenant)
        retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
        model = ChatOpenAI(model=tenant.model, temperature=0)

        # Tenant-specific session history (isolated per tenant+session).
        history_key = f"{tenant.tenant_id}:{session_id}"
        history = self._tenant_histories[history_key][-self._HISTORY_MESSAGES:]

        prompt = ChatPromptTemplate.from_messages([
            # {context} must appear in the template, otherwise the retrieved
            # documents are silently dropped from the request.
            ("system", "Answer using the retrieved context only.\n\nContext:\n{context}"),
            # Past turns go in via a placeholder so their text is inserted
            # verbatim. As template tuples, any "{" or "}" in an earlier
            # question or answer would be parsed as a template variable.
            MessagesPlaceholder("history"),
            ("human", "{question}"),
        ])
        context_docs = retriever.invoke(question)
        context = "\n\n".join(d.page_content for d in context_docs)
        response = (prompt | model).invoke({
            "question": question,
            "context": context,
            "history": [(msg["role"], msg["content"]) for msg in history],
        })
        answer = response.content

        # Update session history, retaining only what the prompt ever reads
        # so long-running sessions do not grow without bound.
        session = self._tenant_histories[history_key]
        session.extend([
            {"role": "human", "content": question},
            {"role": "assistant", "content": answer},
        ])
        del session[:-self._HISTORY_MESSAGES]
        return answer
# Q5: Design a production clinical AI assistant platform. Walk through the architecture.
Answer:
A production clinical AI platform needs: reliable retrieval, audit trails, safety filters, cost control, and regulatory compliance awareness.
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
import logging
import time
# Logger for the clinical platform's audit records (JSON strings).
# NOTE(review): this rebinds the module-level name `logger`; if these snippets
# share one module, earlier code that calls `logger` at runtime will also log
# to "clinical_platform".
logger = logging.getLogger(name="clinical_platform")
# --- Layer 1: Knowledge Base ---
# Chunked, embedded clinical guidelines, drug monographs, interaction databases
# Metadata: drug_name, category, source, version, effective_date
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# A single persistent Chroma collection stored under ./clinical_kb.
knowledge_base = Chroma(
    persist_directory="./clinical_kb",
    embedding_function=embeddings,
    collection_name="clinical_knowledge",
)
# --- Layer 2: Safety Filter ---
BLOCKED_PATTERNS = [
    "how to overdose", "lethal dose of", "maximum fatal dose",
    "suicide", "harm patient",
]


def safety_check(query: str) -> None:
    """Reject a query before it reaches the LLM.

    Matching against BLOCKED_PATTERNS ignores case and collapses whitespace,
    so "How   to\\nOVERDOSE" is caught like "how to overdose".

    Raises:
        PermissionError: if the normalized query contains a blocked pattern.
        ValueError: if the query is longer than 2000 characters, or is empty
            or whitespace-only.
    """
    # Collapse whitespace runs and case-fold so spacing or capitalization
    # tricks cannot slip past the literal substring match.
    normalized = " ".join(query.split()).casefold()
    for pattern in BLOCKED_PATTERNS:
        if pattern in normalized:
            raise PermissionError(f"Query blocked by safety filter: '{pattern}'")
    if len(query) > 2000:
        raise ValueError("Query exceeds maximum length of 2000 characters")
    if not normalized:
        raise ValueError("Query is empty")
# --- Layer 3: Tools ---
# NOTE: @tool builds each tool's name, argument schema and description from
# the function name, signature and docstring. Those are what the agent sees,
# so treat them as behavior, not just documentation.
@tool
def search_clinical_knowledge(query: str) -> str:
    """Search clinical guidelines, drug monographs, and pharmacology references."""
    hits = knowledge_base.similarity_search(query, k=4)
    if hits:
        # Prefix each passage with its source so answers can cite it.
        passages = [
            f"[{hit.metadata.get('source', 'reference')}]: {hit.page_content}"
            for hit in hits
        ]
        return "\n\n".join(passages)
    return "No relevant clinical information found. Please consult primary references."
@tool
def check_drug_interaction(drug_a: str, drug_b: str) -> str:
    """Check for clinically significant interactions between two drugs."""
    # Delegate to the knowledge-base search with an interaction-focused query.
    interaction_query = f"{drug_a} {drug_b} drug interaction severity mechanism"
    return search_clinical_knowledge.invoke(interaction_query)
# --- Layer 4: Agent ---
clinical_tools = [search_clinical_knowledge, check_drug_interaction]
clinical_prompt = ChatPromptTemplate.from_messages([
    ("system",
     "You are a clinical decision support assistant for qualified healthcare professionals. "
     "Use your tools to retrieve accurate drug information. "
     "Always include: mechanism, clinical significance, and recommended action. "
     "End every response with: 'Verify with Lexicomp/Micromedex and clinical judgment before prescribing.' "
     "Never provide information outside your retrieved context."),
    ("placeholder", "{chat_history}"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])
agent = create_tool_calling_agent(
    ChatOpenAI(model="gpt-4o", temperature=0),
    clinical_tools,
    clinical_prompt,
)
executor = AgentExecutor(
    agent=agent,
    tools=clinical_tools,
    # Hard limits on tool-call loops and wall-clock time (seconds).
    max_iterations=6,
    max_execution_time=25.0,
    handle_parsing_errors=True,
    # Runnable-based agents (what create_tool_calling_agent produces) only
    # support "force". It returns a fixed "agent stopped" answer when a limit
    # above is hit. "generate" raises ValueError at exactly that moment, so
    # the limits would surface as errors instead of a graceful stop.
    early_stopping_method="force",
    # Keep (action, observation) pairs so the handler can audit tool usage.
    return_intermediate_steps=True,
    verbose=False,
)
# --- Layer 5: Request handler with full guardrails ---
def clinical_query(
    question: str,
    session_id: str,
    user_id: str,
    user_role: str = "pharmacist",
) -> dict:
    """Answer one clinical question behind the safety gate, with audit logging.

    Flow: ``safety_check(question)``, then ``executor.invoke``, then one JSON
    audit log line. The invoke is tagged ``["clinical", user_role]`` and
    carries the session/user ids as metadata.

    Args:
        question: Free-text query.
        session_id: Conversation id, echoed in logs and in every response.
        user_id: Caller id recorded in the audit logs.
        user_role: Added to the run tags (default ``"pharmacist"``).

    Returns:
        A dict that always contains ``answer``, ``session_id``, ``success`` and
        ``disclaimer``. Success also adds ``latency_ms`` and
        ``tools_consulted``; a safety block adds ``blocked=True``. Exceptions
        are not propagated, and exception text is written only to the logs.
        Any PermissionError, including one raised inside the agent, is
        reported as a block.
    """
    disclaimer = (
        "For qualified healthcare professional use only. "
        "Always verify with primary clinical references."
    )
    start = time.time()
    try:
        # Safety gate: runs before any LLM call.
        safety_check(question)
        # Invoke with audit metadata
        from langchain_core.runnables import RunnableConfig
        result = executor.invoke(
            {"input": question, "chat_history": []},
            config=RunnableConfig(
                tags=["clinical", user_role],
                metadata={"session_id": session_id, "user_id": user_id},
            ),
        )
        latency_ms = round((time.time() - start) * 1000)
        # (action, observation) pairs recorded by the agent run.
        tool_calls = result.get("intermediate_steps", [])
        logger.info(json.dumps({
            "event": "clinical_query_success",
            "session_id": session_id,
            "user_id": user_id,
            "user_role": user_role,
            "latency_ms": latency_ms,
            "tool_calls": len(tool_calls),
            "tools_used": [action.tool for action, _ in tool_calls],
        }))
        return {
            "answer": result["output"],
            "session_id": session_id,
            "latency_ms": latency_ms,
            "tools_consulted": len(tool_calls),
            "success": True,
            "disclaimer": disclaimer,
        }
    except PermissionError as e:
        logger.warning(json.dumps({
            "event": "query_blocked",
            "reason": str(e),
            "user_id": user_id,
            "session_id": session_id,
        }))
        return {
            "answer": "This query cannot be processed.",
            "session_id": session_id,
            "success": False,
            "blocked": True,
            "disclaimer": disclaimer,
        }
    except Exception as e:
        # logger.exception also records the traceback for later diagnosis;
        # the caller only ever sees the generic message below.
        logger.exception(json.dumps({
            "event": "query_error",
            "error_type": type(e).__name__,
            "error": str(e),
            "session_id": session_id,
            "user_id": user_id,
            "latency_ms": round((time.time() - start) * 1000),
        }))
        return {
            "answer": "Unable to process this query. Please consult clinical references directly.",
            "session_id": session_id,
            "success": False,
            "disclaimer": disclaimer,
        }
# Architecture checklist:
| Requirement | Implementation |
|---|---|
| Audit trail | Structured JSON logs with session_id, user_id, tools_used |
| Safety filter | Pattern matching before any LLM call |
| Data isolation | Chroma collection per tenant/department |
| Cost control | Max iterations cap, cheap model for simple queries, caching |
| Latency SLA | 25s hard timeout, stream for perceived speed |
| Regulatory | Disclaimer appended to all responses, no unsourced claims |
| Fallback | Graceful error messages, never expose raw exceptions |
| Observability | LangSmith tracing + structured logs + metrics |