Writing Node Functions
Master the LangGraph node function signature — reading state, returning partial updates, calling LLMs inside nodes, error handling, and complete real-world examples.
In LangGraph, a node is a Python function. It is the basic unit of work in your agent. Every node receives the current state, does something useful (call an LLM, query a database, run a calculation, call a tool), and returns a dictionary with only the state keys it wants to update.
Getting nodes right is the foundation of every working LangGraph agent.
The Node Signature
Every node function follows the same pattern:
    def my_node(state: MyState) -> dict:
        # Read from state
        # Do work
        # Return only changed keys
        return {"some_key": new_value}

- One required argument: the state object (typed with your TypedDict or Pydantic model). LangGraph also accepts an optional second config argument.
- Return a dict: include only the keys you changed; missing keys are left unchanged
- Any Python is allowed: call external APIs, query databases, run tools, import libraries
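The points above can be sketched with nothing but the standard library. CounterState, increment_node, and the dict-unpacking merge are illustrative assumptions: the merge line only mimics LangGraph's default overwrite behavior for keys without a reducer and is not how the library is implemented.

```python
from typing import TypedDict


class CounterState(TypedDict):
    # Hypothetical state schema used only for this illustration
    count: int
    label: str


def increment_node(state: CounterState) -> dict:
    """Read one field and return only the key that changed."""
    return {"count": state["count"] + 1}


state: CounterState = {"count": 1, "label": "demo"}
update = increment_node(state)

# Rough stand-in for the default merge: returned keys overwrite,
# keys that were not returned keep their previous value.
merged = {**state, **update}
print(update)
print(merged)
```

The node never touches label, yet label is still present after the merge, which is the behavior the "missing keys are left unchanged" rule describes.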
Reading from State
Access state fields like a regular Python dict:
    from typing import TypedDict

    class AgentState(TypedDict):
        query: str
        history: list[str]
        result: str
        retry_count: int

    def my_node(state: AgentState) -> dict:
        # Direct access (raises KeyError if the key is missing)
        query = state["query"]
        # Safe access with a default. The default applies only when the key is
        # missing; a key explicitly set to None still returns None.
        history = state.get("history", [])
        retry = state.get("retry_count", 0)
        return {"result": f"Processed: {query}"}

When you define your state with TypedDict, your IDE will show autocomplete and type errors for typos like state["queyr"].
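The difference between a missing key and a key set to None is easy to overlook, so here is a small standard-library sketch. The read_history helper is hypothetical; it only shows how .get() behaves on a plain dict, which is what a TypedDict state is at runtime.

```python
def read_history(state: dict) -> list:
    """Return the history list, treating both a missing key and None as empty."""
    # state.get("history", []) alone would return None when the key is present
    # but set to None, so fall back with `or` as well.
    return state.get("history") or []


missing = {"query": "hi"}
explicit_none = {"query": "hi", "history": None}

print(missing.get("history", []))        # default used: key is absent
print(explicit_none.get("history", []))  # default ignored: key exists with value None
print(read_history(explicit_none))       # normalized to an empty list
```

This matters whenever the initial state passes None for fields that later nodes are expected to fill in.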
Returning Partial Updates
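Nodes return only the keys they changed. LangGraph merges the returned dict into the full state: by default each returned key overwrites the stored value, while keys annotated with a reducer (such as add_messages) are combined with it. For example: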
    class PipelineState(TypedDict):
        query: str
        docs: list[str]
        answer: str
        formatted: str
        metadata: dict

    # fetch_documents() and call_llm() are placeholders for your own helpers.

    def retrieve_node(state: PipelineState) -> dict:
        docs = fetch_documents(state["query"])
        # Only update 'docs'; query, answer, formatted, metadata unchanged
        return {"docs": docs}

    def generate_node(state: PipelineState) -> dict:
        answer = call_llm(state["query"], state["docs"])
        # Only update 'answer'
        return {"answer": answer}

    def format_node(state: PipelineState) -> dict:
        formatted = f"**Answer:** {state['answer']}"
        meta = {"length": len(state["answer"]), "doc_count": len(state["docs"])}
        # Update two keys
        return {"formatted": formatted, "metadata": meta}

Calling an LLM Inside a Node
The most common pattern: call an LLM and store its output in state.
    import json
    from typing import TypedDict, Optional

    from langchain_openai import ChatOpenAI
    from langchain_core.messages import HumanMessage, SystemMessage

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    class SummarizationState(TypedDict):
        text: str
        summary: Optional[str]
        key_points: Optional[list[str]]

    def summarize_node(state: SummarizationState) -> dict:
        """Summarize a long text."""
        messages = [
            SystemMessage(content="You are a precise summarization assistant."),
            HumanMessage(content=f"Summarize this text in 3-4 sentences:\n\n{state['text']}"),
        ]
        response = llm.invoke(messages)
        return {"summary": response.content}

    def extract_key_points(state: SummarizationState) -> dict:
        """Extract key points from the summary."""
        messages = [
            SystemMessage(content="Extract the 3 most important points. Return as a JSON list of strings."),
            HumanMessage(content=state["summary"]),
        ]
        response = llm.invoke(messages)
        try:
            points = json.loads(response.content)
        except json.JSONDecodeError:
            # Fallback: split by newline if JSON fails
            points = [line.strip("- ").strip() for line in response.content.split("\n") if line.strip()]
        return {"key_points": points}

Node Error Handling
Nodes should handle errors gracefully and update state to reflect problems:
    class RobustState(TypedDict):
        query: str
        result: Optional[str]
        error: Optional[str]
        retry_count: int

    def safe_llm_node(state: RobustState) -> dict:
        """LLM node with error handling and retry tracking."""
        try:
            response = llm.invoke(state["query"])
            return {
                "result": response.content,
                "error": None,  # clear any previous error
            }
        except Exception as e:
            error_msg = f"LLM call failed: {type(e).__name__}: {str(e)}"
            print(f"[error] {error_msg}")
            return {
                "result": None,
                "error": error_msg,
                "retry_count": state.get("retry_count", 0) + 1,
            }

    def route_after_llm(state: RobustState) -> str:
        """Route to retry or finish based on error state."""
        if state.get("error") and state.get("retry_count", 0) < 3:
            return "safe_llm_node"  # retry
        elif state.get("error"):
            return "error_handler"  # too many retries
        return "done"

Complete Example: Three-Node RAG Pipeline
Here is a full retrieve → generate → format pipeline with real LangGraph wiring:
    import os
    from typing import TypedDict, Optional, Annotated

    from langchain_openai import ChatOpenAI
    from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
    from langgraph.graph import StateGraph, START, END
    from langgraph.graph.message import add_messages

    # Placeholder key; in practice, set OPENAI_API_KEY in your environment instead of in code
    os.environ["OPENAI_API_KEY"] = "sk-..."

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    class RAGState(TypedDict):
        query: str
        retrieved_docs: Optional[list[str]]
        raw_answer: Optional[str]
        formatted_answer: Optional[str]
        source_count: int
        messages: Annotated[list[BaseMessage], add_messages]

    # ── Node 1: Retrieve ──────────────────────────────────────────────────────────
    def retrieve_node(state: RAGState) -> dict:
        """Retrieve relevant documents for the query."""
        query = state["query"]
        # In production, use a real retriever:
        # docs = retriever.invoke(query)
        # For this example, simulate retrieval:
        simulated_docs = [
            f"Technical overview: {query} involves several architectural components...",
            f"Implementation guide: To implement {query}, start by setting up...",
            f"Best practices: When working with {query}, always consider...",
        ]
        print(f"[retrieve] Query: '{query[:50]}' | Found: {len(simulated_docs)} docs")
        return {
            "retrieved_docs": simulated_docs,
            "source_count": len(simulated_docs),
            "messages": [HumanMessage(content=query)],
        }

    # ── Node 2: Generate ──────────────────────────────────────────────────────────
    def generate_node(state: RAGState) -> dict:
        """Generate an answer from retrieved documents."""
        docs = state.get("retrieved_docs") or []
        query = state["query"]
        if not docs:
            return {
                "raw_answer": "No relevant documents found. I cannot answer this question reliably.",
            }
        context = "\n\n".join([f"Document {i+1}:\n{doc}" for i, doc in enumerate(docs)])
        messages = [
            SystemMessage(
                content=(
                    "You are a knowledgeable assistant. Use only the provided context to answer "
                    "the question. If the context doesn't contain enough information, say so clearly."
                )
            ),
            HumanMessage(
                content=f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"
            ),
        ]
        response = llm.invoke(messages)
        print(f"[generate] Answer length: {len(response.content)} chars")
        return {
            "raw_answer": response.content,
            "messages": [AIMessage(content=response.content)],
        }

    # ── Node 3: Format ────────────────────────────────────────────────────────────
    def format_node(state: RAGState) -> dict:
        """Format the answer for display."""
        # raw_answer starts as None, so use `or`: .get() with a default would return None here
        answer = state.get("raw_answer") or "No answer generated."
        source_count = state.get("source_count", 0)
        query = state["query"]
        formatted = (
            f"## Answer\n\n"
            f"**Question:** {query}\n\n"
            f"{answer}\n\n"
            f"---\n"
            f"*Sources consulted: {source_count} document(s)*"
        )
        print(f"[format] Final output: {len(formatted)} chars")
        return {"formatted_answer": formatted}

    # ── Graph Construction ────────────────────────────────────────────────────────
    builder = StateGraph(RAGState)
    builder.add_node("retrieve", retrieve_node)
    builder.add_node("generate", generate_node)
    builder.add_node("format", format_node)
    builder.add_edge(START, "retrieve")
    builder.add_edge("retrieve", "generate")
    builder.add_edge("generate", "format")
    builder.add_edge("format", END)
    app = builder.compile()

    # ── Run the Graph ─────────────────────────────────────────────────────────────
    result = app.invoke({
        "query": "What are the main benefits of using LangGraph for AI agents?",
        "retrieved_docs": None,
        "raw_answer": None,
        "formatted_answer": None,
        "source_count": 0,
        "messages": [],
    })
    print("\n" + result["formatted_answer"])

Async Node Functions
For I/O-heavy operations (API calls, database queries), use async nodes:
    import asyncio
    from typing import TypedDict, Optional

    import aiohttp
    from langchain_openai import ChatOpenAI
    from langgraph.graph import StateGraph, START, END

    class AsyncState(TypedDict):
        url: str
        content: Optional[str]
        summary: Optional[str]

    async def fetch_node(state: AsyncState) -> dict:
        """Fetch content from a URL asynchronously."""
        async with aiohttp.ClientSession() as session:
            async with session.get(state["url"]) as response:
                content = await response.text()
        return {"content": content[:5000]}  # First 5000 chars

    async def summarize_async_node(state: AsyncState) -> dict:
        """Async LLM call."""
        llm = ChatOpenAI(model="gpt-4o-mini")
        response = await llm.ainvoke(f"Summarize: {state['content']}")
        return {"summary": response.content}

    # For async nodes, use astream or ainvoke
    async def run():
        builder = StateGraph(AsyncState)
        builder.add_node("fetch", fetch_node)
        builder.add_node("summarize", summarize_async_node)
        builder.add_edge(START, "fetch")
        builder.add_edge("fetch", "summarize")
        builder.add_edge("summarize", END)
        app = builder.compile()
        result = await app.ainvoke({"url": "https://example.com", "content": None, "summary": None})
        print(result["summary"])

    asyncio.run(run())

Node Best Practices
Keep nodes focused. Each node should do one thing. "Retrieve documents" is one node; "Generate answer" is another. Mixing concerns makes nodes hard to test and reuse.
Log what each node does. Add a print or log statement showing what the node received and returned. This is invaluable when debugging graph execution.
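One way to get this logging without repeating it in every node is a small decorator. This is a sketch under assumptions: log_node, uppercase_node, and the "nodes" logger name are hypothetical, it uses only the standard library, and it logs state keys rather than values to avoid dumping large or sensitive content.

```python
import functools
import logging

logger = logging.getLogger("nodes")  # hypothetical logger name


def log_node(func):
    """Wrap a node so that its input keys and returned keys are logged."""
    @functools.wraps(func)
    def wrapper(state):
        logger.info("%s received keys: %s", func.__name__, sorted(state))
        update = func(state)
        logger.info("%s returned keys: %s", func.__name__, sorted(update))
        return update
    return wrapper


@log_node
def uppercase_node(state: dict) -> dict:
    # Illustrative node: uppercases the query and returns a partial update
    return {"result": state["query"].upper()}


logging.basicConfig(level=logging.INFO)
print(uppercase_node({"query": "hello"}))
```

Using the logging module instead of print lets you raise or lower verbosity per environment without editing the nodes themselves.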
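Use .get() for optional fields. If a key might be missing from the state (because an upstream node hasn't set it yet), use state.get("field", default) rather than state["field"]. If the key can be present but set to None, use state.get("field") or default instead, because .get() only falls back to its default when the key is absent.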
Test nodes in isolation. Because nodes are plain functions, you can test them without running the full graph:
    # Note: generate_node calls the real LLM, so this test needs a valid API key.
    def test_generate_node():
        state = {
            "query": "What is LangGraph?",
            "retrieved_docs": ["LangGraph is a framework for building stateful agents..."],
            "raw_answer": None,
            "formatted_answer": None,
            "source_count": 1,
            "messages": [],
        }
        result = generate_node(state)
        assert "raw_answer" in result
        assert len(result["raw_answer"]) > 10

    test_generate_node()
    print("test passed")

Summary
| Aspect | Rule |
|---|---|
| Signature | def node(state: MyState) -> dict |
| Reading state | state["key"] or state.get("key", default) |
| Returning updates | Dict with only changed keys |
| Mutating state | Never — always return new values |
| Error handling | Catch exceptions, update error fields in state |
| Async nodes | Use async def and await — pair with ainvoke/astream |
| Testing | Call node function directly with a fake state dict |
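The "never mutate" row is the one most often broken by accident, so here is a short sketch of the difference. The names bad_append_node and good_append_node are hypothetical, and the example uses plain dicts rather than a compiled graph.

```python
def bad_append_node(state: dict) -> dict:
    # Mutates the list object that the incoming state already holds.
    state["history"].append("step")
    return {"history": state["history"]}


def good_append_node(state: dict) -> dict:
    # Builds a new list and leaves the incoming state untouched.
    return {"history": state["history"] + ["step"]}


before = {"history": ["start"]}
update = good_append_node(before)
print(before["history"])   # input is unchanged
print(update["history"])   # new list carries the extra item

mutated = {"history": ["start"]}
bad_append_node(mutated)
print(mutated["history"])  # input was modified in place
```

Returning a fresh value keeps each node's effect visible in its return value alone, which is what makes the isolated tests shown earlier trustworthy.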
Node functions are plain Python. That is their strength — they are easy to test, easy to reason about, and composable into any graph topology you can imagine.