Learnixo
Back to blog
AI Systemsintermediate

Writing Node Functions

Master the LangGraph node function signature — reading state, returning partial updates, calling LLMs inside nodes, error handling, and complete real-world examples.

Asma Hafeez KhanMay 15, 20267 min read
LangGraphAI AgentsNode FunctionsLLMPython
Share:𝕏

Writing Node Functions

In LangGraph, a node is a Python function. It is the basic unit of work in your agent. Every node receives the current state, does something useful (call an LLM, query a database, run a calculation, call a tool), and returns a dictionary with only the state keys it wants to update.

Getting nodes right is the foundation of every working LangGraph agent.


The Node Signature

Every node function follows the same pattern:

Python
def my_node(state: MyState) -> dict:
    # Read from state
    # Do work
    # Return only changed keys
    return {"some_key": new_value}
  • One argument: always the state object (typed with your TypedDict or Pydantic model)
  • Return a dict: include only the keys you changed — missing keys are left unchanged
  • Any Python is allowed: call external APIs, query databases, run tools, import libraries

Reading from State

Access state fields like a regular Python dict:

Python
class AgentState(TypedDict):
    query: str
    history: list[str]
    result: str
    retry_count: int

def my_node(state: AgentState) -> dict:
    # Direct access (raises KeyError if missing)
    query = state["query"]

    # Safe access with default (use when field may be None or missing)
    history = state.get("history", [])
    retry = state.get("retry_count", 0)

    return {"result": f"Processed: {query}"}

When you define your state with TypedDict, your IDE will show autocomplete and type errors for typos like state["queyr"].


Returning Partial Updates

Nodes return only the keys they changed. LangGraph merges the returned dict into the full state:

Python
class PipelineState(TypedDict):
    query: str
    docs: list[str]
    answer: str
    formatted: str
    metadata: dict

def retrieve_node(state: PipelineState) -> dict:
    docs = fetch_documents(state["query"])
    # Only update 'docs'  query, answer, formatted, metadata unchanged
    return {"docs": docs}

def generate_node(state: PipelineState) -> dict:
    answer = call_llm(state["query"], state["docs"])
    # Only update 'answer'
    return {"answer": answer}

def format_node(state: PipelineState) -> dict:
    formatted = f"**Answer:** {state['answer']}"
    meta = {"length": len(state["answer"]), "doc_count": len(state["docs"])}
    # Update two keys
    return {"formatted": formatted, "metadata": meta}

Calling an LLM Inside a Node

The most common pattern: call an LLM and store its output in state.

Python
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from typing import TypedDict, Optional

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

class SummarizationState(TypedDict):
    text: str
    summary: Optional[str]
    key_points: Optional[list[str]]

def summarize_node(state: SummarizationState) -> dict:
    """Summarize a long text."""
    messages = [
        SystemMessage(content="You are a precise summarization assistant."),
        HumanMessage(content=f"Summarize this text in 3-4 sentences:\n\n{state['text']}"),
    ]
    response = llm.invoke(messages)
    return {"summary": response.content}

def extract_key_points(state: SummarizationState) -> dict:
    """Extract key points from the summary."""
    messages = [
        SystemMessage(content="Extract the 3 most important points. Return as a JSON list of strings."),
        HumanMessage(content=state["summary"]),
    ]
    response = llm.invoke(messages)

    import json
    try:
        points = json.loads(response.content)
    except json.JSONDecodeError:
        # Fallback: split by newline if JSON fails
        points = [line.strip("- ").strip() for line in response.content.split("\n") if line.strip()]

    return {"key_points": points}

Node Error Handling

Nodes should handle errors gracefully and update state to reflect problems:

Python
class RobustState(TypedDict):
    query: str
    result: Optional[str]
    error: Optional[str]
    retry_count: int

def safe_llm_node(state: RobustState) -> dict:
    """LLM node with error handling and retry tracking."""
    try:
        response = llm.invoke(state["query"])
        return {
            "result": response.content,
            "error": None,  # clear any previous error
        }
    except Exception as e:
        error_msg = f"LLM call failed: {type(e).__name__}: {str(e)}"
        print(f"[error] {error_msg}")
        return {
            "result": None,
            "error": error_msg,
            "retry_count": state.get("retry_count", 0) + 1,
        }

def route_after_llm(state: RobustState) -> str:
    """Route to retry or finish based on error state."""
    if state.get("error") and state.get("retry_count", 0) < 3:
        return "safe_llm_node"   # retry
    elif state.get("error"):
        return "error_handler"   # too many retries
    return "done"

Complete Example: Three-Node RAG Pipeline

Here is a full retrieve → generate → format pipeline with real LangGraph wiring:

Python
import os
import json
from typing import TypedDict, Optional, Annotated
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

os.environ["OPENAI_API_KEY"] = "sk-..."

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

class RAGState(TypedDict):
    query: str
    retrieved_docs: Optional[list[str]]
    raw_answer: Optional[str]
    formatted_answer: Optional[str]
    source_count: int
    messages: Annotated[list[BaseMessage], add_messages]

# ── Node 1: Retrieve ──────────────────────────────────────────────────────────

def retrieve_node(state: RAGState) -> dict:
    """Retrieve relevant documents for the query."""
    query = state["query"]

    # In production, use a real retriever:
    # docs = retriever.invoke(query)
    # For this example, simulate retrieval:
    simulated_docs = [
        f"Technical overview: {query} involves several architectural components...",
        f"Implementation guide: To implement {query}, start by setting up...",
        f"Best practices: When working with {query}, always consider...",
    ]

    print(f"[retrieve] Query: '{query[:50]}' | Found: {len(simulated_docs)} docs")
    return {
        "retrieved_docs": simulated_docs,
        "source_count": len(simulated_docs),
        "messages": [HumanMessage(content=query)],
    }

# ── Node 2: Generate ──────────────────────────────────────────────────────────

def generate_node(state: RAGState) -> dict:
    """Generate an answer from retrieved documents."""
    docs = state.get("retrieved_docs") or []
    query = state["query"]

    if not docs:
        return {
            "raw_answer": "No relevant documents found. I cannot answer this question reliably.",
        }

    context = "\n\n".join([f"Document {i+1}:\n{doc}" for i, doc in enumerate(docs)])

    messages = [
        SystemMessage(
            content=(
                "You are a knowledgeable assistant. Use only the provided context to answer "
                "the question. If the context doesn't contain enough information, say so clearly."
            )
        ),
        HumanMessage(
            content=f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"
        ),
    ]

    response = llm.invoke(messages)
    print(f"[generate] Answer length: {len(response.content)} chars")

    return {
        "raw_answer": response.content,
        "messages": [AIMessage(content=response.content)],
    }

# ── Node 3: Format ────────────────────────────────────────────────────────────

def format_node(state: RAGState) -> dict:
    """Format the answer for display."""
    answer = state.get("raw_answer", "No answer generated.")
    source_count = state.get("source_count", 0)
    query = state["query"]

    formatted = (
        f"## Answer\n\n"
        f"**Question:** {query}\n\n"
        f"{answer}\n\n"
        f"---\n"
        f"*Sources consulted: {source_count} document(s)*"
    )

    print(f"[format] Final output: {len(formatted)} chars")
    return {"formatted_answer": formatted}

# ── Graph Construction ────────────────────────────────────────────────────────

builder = StateGraph(RAGState)

builder.add_node("retrieve", retrieve_node)
builder.add_node("generate", generate_node)
builder.add_node("format", format_node)

builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", "format")
builder.add_edge("format", END)

app = builder.compile()

# ── Run the Graph ─────────────────────────────────────────────────────────────

result = app.invoke({
    "query": "What are the main benefits of using LangGraph for AI agents?",
    "retrieved_docs": None,
    "raw_answer": None,
    "formatted_answer": None,
    "source_count": 0,
    "messages": [],
})

print("\n" + result["formatted_answer"])

Async Node Functions

For I/O-heavy operations (API calls, database queries), use async nodes:

Python
import asyncio
import aiohttp
from typing import TypedDict, Optional

class AsyncState(TypedDict):
    url: str
    content: Optional[str]
    summary: Optional[str]

async def fetch_node(state: AsyncState) -> dict:
    """Fetch content from a URL asynchronously."""
    async with aiohttp.ClientSession() as session:
        async with session.get(state["url"]) as response:
            content = await response.text()
    return {"content": content[:5000]}  # First 5000 chars

async def summarize_async_node(state: AsyncState) -> dict:
    """Async LLM call."""
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model="gpt-4o-mini")
    response = await llm.ainvoke(f"Summarize: {state['content']}")
    return {"summary": response.content}

# For async nodes, use astream or ainvoke
async def run():
    builder = StateGraph(AsyncState)
    builder.add_node("fetch", fetch_node)
    builder.add_node("summarize", summarize_async_node)
    builder.add_edge(START, "fetch")
    builder.add_edge("fetch", "summarize")
    builder.add_edge("summarize", END)
    app = builder.compile()

    result = await app.ainvoke({"url": "https://example.com", "content": None, "summary": None})
    print(result["summary"])

asyncio.run(run())

Node Best Practices

Keep nodes focused. Each node should do one thing. "Retrieve documents" is one node; "Generate answer" is another. Mixing concerns makes nodes hard to test and reuse.

Log what each node does. Add a print or log statement showing what the node received and returned. This is invaluable when debugging graph execution.

Use .get() for optional fields. If a field might be None (because an upstream node hasn't set it yet), use state.get("field", default) rather than state["field"].

Test nodes in isolation. Because nodes are plain functions, you can test them without running the full graph:

Python
def test_generate_node():
    state = {
        "query": "What is LangGraph?",
        "retrieved_docs": ["LangGraph is a framework for building stateful agents..."],
        "raw_answer": None,
        "formatted_answer": None,
        "source_count": 1,
        "messages": [],
    }
    result = generate_node(state)
    assert "raw_answer" in result
    assert len(result["raw_answer"]) > 10

test_generate_node()
print("test passed")

Summary

| Aspect | Rule | |---|---| | Signature | def node(state: MyState) -> dict | | Reading state | state["key"] or state.get("key", default) | | Returning updates | Dict with only changed keys | | Mutating state | Never — always return new values | | Error handling | Catch exceptions, update error fields in state | | Async nodes | Use async def and await — pair with ainvoke/astream | | Testing | Call node function directly with a fake state dict |

Node functions are plain Python. That is their strength — they are easy to test, easy to reason about, and composable into any graph topology you can imagine.

Enjoyed this article?

Explore the AI Systems learning path for more.

Found this helpful?

Share:𝕏

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.