一、LangGraph概述

LangGraph 受到许多正在塑造智能体未来的公司信任,包括 Klarna、Uber、J.P. Morgan 等。它是一个底层的编排框架和运行时,用来构建、管理和部署长期运行、带状态的智能体。LangGraph 非常偏底层,重点关注智能体编排所需的底层能力,例如:持久化执行、流式输出、人在回路中参与,以及更多相关能力。

官方源码地址:https://github.com/langchain-ai/langgraph

官方文档地址:https://docs.langchain.com/oss/python/langgraph/overview

1.1 什么是LangGraph

LangGraph 是用图的方式,把一个复杂的工作流拆分成多个步骤,并让这些步骤共享状态、按规则流转

LangGraph 是一个低级编排框架和运行时环境,用于构建、管理和部署长期运行的有效状态智能体(agent)。核心理念是将 Agent 工作流建模为图(Graph),其中:

  • 节点(Nodes):代表计算单元,可以是 LLM 调用、工具执行或任何自定义逻辑

  • 边(Edges):定义节点之间的转换逻辑,决定执行流程

  • 状态(state):在整个图执行过程中共享和传递的数据

image-20260613144146661

LangGraph 提供了构建生产级智能体应用的核心能力:

  • 持久化执行:构建能够从故障中恢复并长时间运行的智能体

  • 人际协作:在任何时刻检查和修改智能体状态

  • 记忆管理:支持短期工作记忆和跨会话的长期记忆

  • 流式处理:专为流式工作流设计

  • 生产级部署:为有状态、长期运行的工作流提供可扩展的基础设施

1.2 与 LangChain 的区别

LangGraph 与 langchain 的高级抽象不同,它提供了更细粒度的控制,让开发者能够精确控制智能体的执行流程,适合需要定制化编排的复杂应用场景。

LangChain 更像是把一些 AI 能力串起来;

LangGraph 更适合做 有状态、可分支、可循环、多步骤 的复杂流程。

一句话记:

简单调用模型,用 LangChain 就够;复杂流程控制,用 LangGraph 更合适。

以下表格以更细粒度的方式展示了两者的区别:

特性

LangGraph

LangChain

抽象级别

低级,提供细粒度控制

高级,开箱即用

状态管理

内置状态机和检查点

需要自行管理状态

执行模型

基于图的并行执行

线性链式执行

持久化

原生支持

需要额外实现

适用场景

复杂、有状态的智能体应用开发

简单的链式调用

正是因为LangGraph和LangChain上述的区别,使得LangGraph在以下场景下,有了更加广泛的应用:

  • 复杂的多智能体系统

  • 需要长期记忆的应用

  • 需要人工审核的工作流

  • 后台处理任务和实时交互

  • 需要精细控制的定制化智能体编排

二、入门案例

一个最基础的 LangGraph 程序通常由 5 步组成。

  1. 定义 State

  2. 定义 Node 函数

  3. 创建 StateGraph

  4. 添加节点和边

  5. 编译并调用 graph

2.1 环境安装

  1. 创建环境

 uv venv langgraph_venv --python 3.12
 source langgraph_venv/bin/activate

 uv pip install langgraph==1.0.5
  1. 确认是否成功

 uv pip show langgraph

2.2 案例

比如一个 AI 问答系统,不是简单地问大模型一句话就结束,而是可能要:

  1. 接收用户问题

  2. 去知识库查资料

  3. 去网络搜索资料

  4. 把两边结果交给大模型总结

  5. 输出最终答案

在 LangGraph 里:

Node 节点:每一个具体步骤,比如“知识库检索”是一个节点,“联网搜索”是一个节点,“总结回答”也是一个节点。

Edge 边:步骤之间怎么走,比如先从开始走到“知识库检索”和“联网搜索”,再走到“总结回答”。

State 状态:整个流程里共享的数据,比如用户问题、知识库结果、搜索结果、最终答案,都放在 State 里传来传去。

flowchart TD
    A[用户问题] --> B[知识库检索]
    A[用户问题] --> C[联网搜索]

    B --> D[结果汇总生成最终答案]
    C --> D[结果汇总生成最终答案]
    D --> F[结束]

具体代码如下:

# 使用 LangGraph 构建一个应用,
from typing import TypedDict
from langgraph.graph import StateGraph
from langgraph.constants import START, END


# 1. 定义状态
class MyAgentState(TypedDict):
    query: str
    rag_search: str
    web_search: str
    llm_answer: str


# 2. 定义节点
def rag_node(state: MyAgentState):
    # 1. 从state中获取query
    query = state["query"]
    # 2.模拟rag检索
    rag_search = f"根据用户问题: {query},从知识库中检索到的相关内容"
    # 3. 将rag_search添加到state中
    return {"rag_search": rag_search}


def web_search_node(state: MyAgentState):
    # 1. 从state中获取query
    query = state["query"]
    # 2.模拟web检索
    web_search = f"根据用户问题: {query},从互联网上检索到的相关内容"
    # 3. 将web_search添加到state中
    return {"web_search": web_search}


def llm_node(state: MyAgentState):
    # 1. 从state中获取query
    rag_search = state["rag_search"]
    web_search = state["web_search"]

    # 2. 模拟llm调用,基于两路检索,生成最终答案
    llm_answer = f"根据用户问题:,根据 rag_search{rag_search} 和 web_search{web_search} 生成的答案"
    # 3. 将llm_answer添加到state中
    return {"llm_answer": llm_answer}


# 3. 构建builder,将节点和边添加到builder中
builder = StateGraph(state_schema=MyAgentState)
# 添加节点
builder.add_node(rag_node)  # add_node 底层会将节点名称设置为函数名
builder.add_node(web_search_node)
builder.add_node(llm_node)
# 添加边 开始 -> 知识库检索 -> 最终回答 -> 结束
builder.add_edge(START, "rag_node")  # 此处添加边时,需要传入节点名称,当前为函数名
builder.add_edge(START, "web_search_node")
builder.add_edge("rag_node", "llm_node")
builder.add_edge("web_search_node", "llm_node")
# 添加END节点的连接
builder.add_edge("llm_node", END)
# 4. 编译图
graph = builder.compile()
# 5. 调用图
res = graph.invoke({"query": "如何使用langgraph"})
print(res)


# 编译好之后,可以通过mermaid查看图的结构。 uv add grandalf
graph_structure = graph.get_graph()
res = graph_structure.draw_ascii()
print(res)

编译好之后,可以通过mermaid查看图的结构。

运行结果

三、状态

需要通过 LangGraph 构建一个应用程序时,第一步就是定义 State ,因为 State 代表了整个图的结果目标,和图当中节点需要修改的数据目标。

3.1 状态的定义

3.3.1 定义状态类

State Schema可以通过三种方式定义:

  • TypedDict(推荐方式): TypedDict是Python提供的一种类型提示工具,用于为字典(Dict)的键和值指定精确的类型信息。状态类继承TypedDict,定义键的类型和reducer函数(具体参看3.1.4 reducer章节内容)即可;
    代码示例如下:

    from typing import TypeDict
    class MyStateFull(TypeDict):
      rag_result: str
      web_search_result: str
      final_answer: str
      query: str
  • Pydantic BaseModel:Pydantic 提供运行时数据校验,并支持静态类型检查工具进行类型推导。状态类通过继承Pydantic的BaseModel,定义键的类型和reducer函数
    代码示例如下:

    from pydantic import BaseModel
    class MyStateFull(BaseModel):
      rag_result: str
      web_search_result: str
      final_answer: str
      query: str
  • Dataclass: dataclass是Python标准库中的一个装饰器,用于自动生成常见特殊方法(如__init__、__repr__、__eq__等),从而简化主要用作数据容器的类的定义。状态类通过dataclass装饰器装饰后,定义键的类型和reducer函数即可。
    代码实例如下:

    from dataclass import dataclass
    @dataclass
    class MyStateFull():
      rag_result: str
      web_search_result: str
      final_answer: str
      query: str

    3.1.2 输入输出数据隔离

    ​ 在LangGraph 当中,可以精细管理输入到图中的状态键有哪些,以及输出的状态键有哪些。这是通过初始化StateGraph时,分别指定三个参数:state_schema、input_schema 和 output_schema 来实现的。
    ​ 其具体作用,可以参考如下图示:

    image-20260614152135078
  1. state_schema
    这是图的完整内部状态,包含了所有节点可能读写的字段,必须指定,不能为空。
    特点:

  • 是图的"全局状态空间";

  • 所有节点都可以访问和写入这个 schema 中的任何字段。

  1. input_schema
    定义图接受什么输入,是 state_schema 的子集
    特点:

  • 可选参数,如果不指定,默认等于 state_schema;

  • 限制图的输入接口,只能传入这些字段;

  • 是 state_schema 的子集或相等。

  1. output_schema
    定义图返回什么输出,是 state_schema 的子集
    特点:

  • 可选参数,如果不指定,默认等于 state_schema;

  • 限制图的输出接口,只返回这些字段;

  • 是 state_schema 的子集或相等。

代码如下所示:

from typing import TypedDict
from langgraph.constants import START, END
from langgraph.graph import StateGraph
class MyStateFull(TypedDict):
    rag_result:str
    web_search_result:str
    final_answer:str
    query:str
    a_new_key:str

class InputSchema(TypedDict):
    query:str

class OutputSchema(TypedDict):
    final_answer:str

graph = StateGraph(state_schema=MyStateFull,input_schema=InputSchema,output_schema=OutputSchema)

def rag_search_node(state:MyStateFull):
    print(state)
    query = state["query"]
    rag_result = f"关于{query}的rag_result"
    return {"rag_result":rag_result,"a_new_key":"a_new_key_value"}

def web_search_node(state:MyStateFull):
    print(state)
    query = state["query"]
    web_search_result = f"关于{query}的web_search_result"
    return {"web_search_result":web_search_result}

def final_answer_node(state:MyStateFull):
    print('在final_answer_node当中的state',state)
    rag_result = state["rag_result"]
    web_search_result = state["web_search_result"]
    final_answer = f"LLM基于{rag_result}和{web_search_result}的最终回复"
    return {"final_answer":final_answer}

graph.add_node(rag_search_node)
graph.add_node(web_search_node)
graph.add_node(final_answer_node)
graph.add_edge(START, "rag_search_node")
graph.add_edge(START,"web_search_node")
graph.add_edge("rag_search_node", "final_answer_node")
graph.add_edge("web_search_node","final_answer_node")
graph.add_edge("final_answer_node", END)
compiled_graph = graph.compile()
res = compiled_graph.invoke({"query":"如何使用LangGraph","a_new_key":"a_new_key_value"})
print('最终结果为',res)

3.1.3 节点间数据隔离

LangGraph当中节点所接收的状态,还可以为一个非全局状态的私有状态。

示例代码如下:

from typing import TypedDict
from pydantic import BaseModel
from langgraph.constants import START, END
from langgraph.graph import StateGraph
class MyState(TypedDict):
    query:str
    final_answer:str

class SearchState(TypedDict):
    rag_result:str
    web_search_result:str

class InputSchema(TypedDict):
    query:str

class OutputSchema(TypedDict):
    final_answer:str

graph = StateGraph(state_schema=MyState)

def rag_search_node(state:MyState):
    print(state)
    query = state["query"]
    rag_result = f"关于{query}的rag_result"
    return {"rag_result":rag_result,"a_new_key":"a_new_key_value"}

def web_search_node(state:MyState):
    query = state["query"]
    web_search_result = f"关于{query}的web_search_result"
    return {"web_search_result":web_search_result}

def final_answer_node(state:SearchState):
    print('在final_answer_node当中的state',state)
    rag_result = state["rag_result"]
    web_search_result = state["web_search_result"]
    final_answer = f"LLM基于{rag_result},和{web_search_result}的最终回复"
    return {"final_answer":final_answer}

graph.add_node(rag_search_node)
graph.add_node(web_search_node)
graph.add_node(final_answer_node)
graph.add_edge(START, "rag_search_node")
graph.add_edge(START,"web_search_node")
graph.add_edge("rag_search_node", "final_answer_node")
graph.add_edge("web_search_node","final_answer_node")
graph.add_edge("final_answer_node", END)
compiled_graph = graph.compile()
res = compiled_graph.invoke({"query":"如何使用LangGraph"})
print('最终结果为',res)

3.1.4 Reducer函数

Reducer用于进行当前增量状态(节点输出的状态)和全局状态的合并。State中的每个键都有其独立的reducer函数。每个node的返回值中的每个key与全局state_schema中对应的key进行合并更新,具体更新逻辑取决于每个key指定的reducer函数

Reducer常用函数有以下几种:

  • 默认行为:未指定Reducer时使用覆盖更新

  • 内置reducer函数:例如langgraph.graph.messages当中的add_messages函数

  • 自定义Reducer:支持用户自定义合并逻辑

如果未明确指定reducer函数,则默认对该键的更新是覆盖行为。以下为默认覆盖行为:

"""
LangGraph Reducer函数演示 - 默认Reducer(覆盖更新)
"""

from typing import List
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

# 1. 默认Reducer(覆盖更新)
class DefaultReducerState(TypedDict):
    foo: int
    bar: List[str]
def node_default_1(state: DefaultReducerState) -> dict:
    return {"foo": 2}
def node_default_2(state: DefaultReducerState) -> dict:
    return {"bar": ["bye"]}
def run_demo():
    print("1. 默认Reducer(覆盖更新)演示:")
    builder = StateGraph(DefaultReducerState)
    builder.add_node("node1", node_default_1)
    builder.add_node("node2", node_default_2)
    builder.add_edge(START, "node1")
    builder.add_edge("node1", "node2")
    builder.add_edge("node2", END)
    graph = builder.compile()
    result = graph.invoke({"foo": 1, "bar": ["hi"]})
    print(f"初始状态: {{'foo': 1, 'bar': ['hi']}}")
    print(f"执行结果: {result}\n")

if __name__ == "__main__":
run_demo()

可以自定义Reducer函数,用于合并操作。自定义Reducer函数接收两个参数,用以将其合并,最终返回一个结果。

例如此处定义一个add_message函数,用于将全局状态当中的message和节点的message进行合并操作:

from langgraph.graph import StateGraph
from langchain_core.messages import ToolMessage,HumanMessage,AIMessage,BaseMessage
from typing import Annotated, TypedDict,List
from langgraph.constants import START

# 自定义Reducer
def add_message(message_list_left:list,message_list_right:list):
    print("="*20)
    print("正在执行add_message")
    print('左边的',message_list_left)
    print('右边的',message_list_right)
    print("="*20)

    return message_list_left + message_list_right

class MyAgent(TypedDict):

    # 在初始使用自定义的reducer
    messages:Annotated[List[BaseMessage],add_message]

def tool_node(state:MyAgent):
    return {"messages":[ToolMessage(content="来自tool_node的内容",tool_call_id="xxx")]}

def llm_node(state:MyAgent):
    return {"messages":[AIMessage(content="来自llm_node的内容")]}

builder = StateGraph(state_schema=MyAgent)

builder.add_node(tool_node)
builder.add_node(llm_node)
builder.add_edge(START,"tool_node")
builder.add_edge("tool_node","llm_node")

graph = builder.compile()
graph.invoke({"messages":[HumanMessage(content="你好")]})

3.2 状态的存储

3.2.1 实际场景中的问题

在前面的例子当中,我们看到,每次用户invoke时,LangGraph都会初始化一个空状态,然后将用户传入的初始状态合并进来,再继续往下执行。

这在一些一次性、简单任务过程中,没有什么问题。但是对于一些复杂任务,就会出现一些问题,考虑以下两个场景

  1. 场景一:在agent的一个会话里,需要在多次调用当中保持上下文

    image-20260615191644881

在这个场景下,我们定义的状态为messages列表,第一次调用过程中,图会在messages列表当中写入值。而第二次调用,又会初始化一个新的messages列表,导致缺失了第一次调用的上下文。

  1. 场景二:图执行过程当中报错,不想重复已执行完节点,想要实现断点续传

    image-20260615191623348

在这个场景下,我们定义的状态为messages列表,第一次调用过程中,图会在messages列表当中写入值。而第二次调用,又会初始化一个新的messages列表,导致缺失了第一次调用的上下文。

3.2.2 状态的存储

前面所涉及到的问题,解决方案都是将第一次调用的结果,存储起来,然后在第二次调用时,从存储的结果当中取出状态,而不是让 LangGraph 去初始化空状态。

由于同一个 agent 回村在多个用户或者单用户多个会话等场景,要实现状态存储,并且后续能够找到正确的状态,就需要在状态当中做一个 id 映射,写入时指定一个 id ,读取时读取指定 id 的状态。

首先需要将状态存储器来,LangGraph 为我们提供了一个存储状态的方式: Checkpointer。即检查点,他可以存储图执行过程中所产生的状态,具体存储的位置,有内存、数据库等方式。checkpointer所有的相关实例化类都来自于 langgraph.checkpointer 包。

LangGraph 同样做了前述的 id 隔离, 这个 id 在 langgraph 当中就称为 thread_id。需要注意:Thread_id 与实际编码当中的线程没有任何关系,只是一种表示隔离状态空间的方式。

实际编码进行隔离状态的步骤,如下所示:

  1. 创建 checkpointer 实例

  2. 图编译时,传入 checkpointer 实例

  3. 图调用时,传递 thread_id 来保证上下文空间,就传入相同的 thread_id ,需要开辟新的上下文空间,就传入新的 thread_id

对于前面所讲到的场景一,要想在一个会话中保持上下文,只需要在多次调用时,传入同一个thread_id即可;要想启动一个新会话,就传入一个新的thread_id。这也就是所谓“Agent短期记忆”。

示例代码如下所示,重点关注内容参看带序号注释:

def scenario_1_demo():
    # 1、create_agent底层就是通过LangGraph构建的图结构,此处直接通过create_agent来举一个例子
    from langchain.agents import create_agent
    from langchain.tools import tool
    from langchain_openai import ChatOpenAI

    # 1、引入checkpointer
    from langgraph.checkpoint.memory import InMemorySaver

    llm = ChatOpenAI(model="gpt-4o-mini")

    # 2、定义checkpointer实例
    checkpointer = InMemorySaver()
    @tool
    def weather_tool(city:str,date:str)->str:
        """查天气的工具"""
        return f'{city}在{date}的天气是晴朗的'
    
    # 3、构建Agent时引入checkpointer
    agent = create_agent(
        model=llm,
        tools=[weather_tool],
        checkpointer=checkpointer,
    )

    # 4、用户第一次调用
    user1_res = agent.invoke({"messages":"北京2026-01-14天气怎么样"},config={"configurable":{"thread_id":"user_1_session_1"}})
    print('第一次调用',user1_res['messages'][-1],end="\n\n==============\n\n")
    
    # 5、用户在同一个会话当中,第二次调用
    user1_res_2 = agent.invoke({"messages":"适合出去玩吗"},config={"configurable":{"thread_id":"user_1_session_1"}})
    print('第二次调用',user1_res_2['messages'][-1],end="\n\n==============\n\n")

3.2.3 从状态中恢复执行

checkpointer是通过InMemorySaver进行实例化的,这个实例会将状态全部存储到内存当中,这仍然不能解决场景二的问题,场景二涉及的问题不再是多次调用之间,保持上下文不中断,而是如何进行故障恢复

在因为故障导致进程退出后,内存状态空间当中的内容,将全部丢失。在实际生产环境下,需要使用数据库做持久化状态存储。

下面代码当中,Node2节点表示在实际执行过程中,因为某些原因导致的错误。如果将相关的状态存储到数据库中,在第一次执行时,node2前面节点的状态,会写入到数据库中,示例代码如下(关键点参见带序号注释):

def scenario_2_demo():
    """
    场景二:如何从checkpointer中恢复状态
    """
    from langgraph.graph import StateGraph
    from typing import TypedDict
    from langgraph.graph import START
    # 1、引入sqlite3包和SqliteSaver类
    import sqlite3
    from langgraph.checkpoint.sqlite import SqliteSaver
    # 2、构建Connection对象
    conn = sqlite3.connect(database="./sqlite_data/langgraph_sqlite", check_same_thread=False)
    # 3、通过connection对象构建checkpointer实例
    memory = SqliteSaver(conn)
    class MyState(TypedDict):
        key_1: str
        key_2: str
        key_3: str

    def node_1(state: MyState) -> MyState:
        # 4、打印node_1初始状态
        print('node_1初始状态为',state)
        return {"key_1":"value_1"}
    
    def node_2(state: MyState) -> MyState:
        # 5、此处模拟生产环境下代码的bug或者其他不可预见情况,导致的报错
        raise Exception("node_2节点执行报错")
        return {"key_2":"value_2"}
    
    def node_3(state: MyState) -> MyState:
        return {"key_3":"value_3"}

    # 添加节点,添加边
    builder = StateGraph(MyState)
    builder.add_node(node_1)
    builder.add_node(node_2)
    builder.add_node(node_3)
    builder.add_edge(START, "node_1")
    builder.add_edge("node_1", "node_2")
    builder.add_edge("node_1", "node_3")
    config = {
        "configurable":{
            "thread_id":"123"
        }
    }
    # 6、compile时传入checkpointer实例
    graph = graph.compile(checkpointer=memory)
    # 7、调用时传入config
    graph.invoke({},config=config)

在 node2 节点问题处理完成后,想要从故障中恢复,需要以下两点:

  1. invoke 时传递 None 作为初始参数

  2. 传入相同的 thread_id

满足以上两点,即可让图从断点开始,以第一次调用的历史状态,继续往下执行,示例代码如下(关键点参见带序号注释):

def scenario_2_demo():
    """
    场景一:如何从checkpointer中恢复状态
    """
    from langgraph.graph import StateGraph
    from typing import TypedDict
    from langgraph.graph import START
    
    import sqlite3
    from langgraph.checkpoint.sqlite import SqliteSaver
    
    conn = sqlite3.connect(database="./sqlite_data/langgraph_sqlite", check_same_thread=False)
    
    memory = SqliteSaver(conn)
    class MyState(TypedDict):
        key_1: str
        key_2: str
        key_3: str

    def node_1(state: MyState) -> MyState:
        
        print('node_1初始状态为',state)
        return {"key_1":"value_1"}
    
    def node_2(state: MyState) -> MyState:
        # 1、修复错误,注释掉报错代码
        # raise Exception("node_2节点执行报错")
        return {"key_2":"value_2"}
    
    def node_3(state: MyState) -> MyState:
        return {"key_3":"value_3"}

    # 添加节点,添加边
    builder = StateGraph(MyState)
    builder.add_node(node_1)
    builder.add_node(node_2)
    builder.add_node(node_3)
    builder.add_edge(START, "node_1")
    builder.add_edge("node_1", "node_2")
    builder.add_edge("node_1", "node_3")
    config = {
        "configurable":{
            "thread_id":"123"
        }
    }
    
    graph = builder.compile(checkpointer=memory)
    # 2、调用时传入相同的config,注意,从故障中恢复,此处传递None即可
    graph.invoke(None,config=config)

3.3 获取历史状态

构造 checkpointer 之后,LangGraph 可以保证在多次调用之间,保证状态的连续,或者是通过状态进行恢复。不过,要想在深入了解状态,就需要知道状态的保存时机和如何获取到所有的历史状态。

3.3.1 LangGraph 底层运行算法

在了解如何获取历史状态之前,首选我们需要了解一下 LangGraph 底层算法——Pregel。

Pregel 是 LangGraph 底层的一个类,用于管理 LangGraph 应用程序运行时行为。也就是说,整个图结构,从开始到结束的迭代执行过程,是由 Pregel 控制管理的

在这个类当中,有两大重要组件:

  1. Actors
    Actors 及前面的 Node , 对应在 LangGraph 当中的类为 PregelNode, 其订阅某些通道,从通道中读取数据,或写入数据,PregelNode 也实现了 LangChai 的Runnable 接口。

  2. Channels
    Chanels 用于 actors 之间通信,每个通道都有一个值类型、一个更新类型、一个更新函数(该函数用于接受一系列更新并修改存储的值)

正是因为这种设计,在 LangGraph 中通过边和节点构建的图,不是一个传统的有向无环图 (DAG)。在传统的有向无环图下,节点的执行顺序,完全边的连接,某个节点只有在和它连接的所有的上游节点都执行完成了之后才会执行。

而在LangGraph图当中,节点的执行来自于一个Pregel的超步(SuperStep)。而一个超步的执行逻辑,分为如下三个过程:

  1. Plan阶段:确定在此步骤中要执行哪些Actors。例如,在第一步中,选择订阅特殊输入通道的Actors;在后续步骤中,选择订阅上一步骤中更新的通道的参与者

  2. Execute阶段:并行执行所有选定的Actors,直到所有参与者完成、其中一个失败或达到超时时间

  3. Update阶段:用本步骤中Actors所写的值更新channels。

由以上三个步骤组合而成的超步会重复执行,直到没有Actors被选中执行,或者达到最大步骤数为止。

以如下的图为例:

image-20260616100146403

图中展示了builder.compile()内部逻辑:将逻辑图,转换成Pregel实例,而节点和边,分别转换成了Actors和Channels实例。

针对于Actors实例,每个Actor实例都有其订阅和写入的Channels,如下图所示:

image-20260616100553079

整个图的执行流程,可以分为如下几个Step,Step索引从0开始:

整个执行流程如下:

首先在step为-1时,__START__节点执行(图中未标注),将相关状态写入到branch:to:a当中;在Step0当中,节点a执行,将状态写入到branch:to:b和branch:to:c;在Step1当中,节点b和c执行,将状态写入到branch:to:b_2和branch:to:d当中;在Step2,节点b_2和节点d执行,b_2节点再次将状态写入到branch:to:d当中。在Step3当中,节点d再次执行,由于没有新的状态更新,图执行过程结束。

示例代码如下所示:

import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    """
    状态类型定义
    """
    aggregate: Annotated[list, operator.add]

def a(state: State,config):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State,config):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def b_2(state: State,config):
    print(f'Adding "B_2" to {state["aggregate"]}')
    return {"aggregate": ["B_2"]}

def c(state: State,config):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State,config):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}

builder = StateGraph(State)

builder.add_node("a", a)
builder.add_node("b", b)
builder.add_node("b_2", b_2)
builder.add_node("c", c)
builder.add_node("d", d)

builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)

graph = builder.compile()
# 1、查看当前图的channels
print('当前图的channels为',graph.channels,end="\n\n")
# 2、查看当前图的节点
print('当前图的节点为',graph.nodes,end="\n\n")

# 3、查看a节点的trigger(当前节点的订阅)和writers
print('当前图的节点a的triggers为',graph.nodes['a'].triggers,end="\n\n")
print('当前图的节点a的writers为',graph.nodes['a'].writers,end="\n\n")

# 4、查看d节点的trigger和writers
print('当前图的节点d的triggers为',graph.nodes['d'].triggers,end="\n\n")
print('当前图的节点d的writers为',graph.nodes['d'].writers,end="\n\n")
# 5、执行图,查看执行结果
output_state = graph.invoke({"aggregate": []}) 
print('执行图后的状态为',output_state,end="\n\n")

3.3.2 获取图执行的历史状态

理解了Pregel和Step的概念之后,现在可以来学习如何获取图执行的历史状态,以及如何看懂历史状态当中所存储的内容。

构建好的graph实例,可以通过get_state() / get_state_history()方法,传入想要获取的历史状态的thread_id,即可拿到图执行过程中的历史状态,get_state()方法获取到的是最近一个时间步的状态,get_state_history()获取到的是图执行当中所有时间步的历史状态,其输出值为一个迭代器,按照时间步倒序排列。

历史状态由StateSnapShot实例表示,也即状态快照,其中包含的信息如下:

名称

类型

描述

values

dict[str, Any]

Any

next

tuple[str, …]

本超步中每个任务要执行的节点名称

config

RunnableConfig

用于获取此快照的配置。

metadata

CheckpointMetadata

与此快照相关联的元数据

parent_config

RunnableConfig

用于获取父快照(如果有的话)的配置。

interrupts

tuple[Interrupt, …]

此超步中发生且有待解决的中断。

我们重点需要关注的是,在状态快照中,除了有当前状态值以外,还包含接下来需要执行的节点名称,这也为从故障中恢复提供了有用的信息,我们可以通过状态快照直接知道,从哪一个节点开始继续往下执行。

示例代码如下所示:

import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
connection = sqlite3.connect('checkpointer.db',check_same_thread=False)
checkpointer = SqliteSaver(connection)
class State(TypedDict):
    """
    状态类型定义
    """
    aggregate: Annotated[list, operator.add]

def a(state: State,config):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State,config):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

def b_2(state: State,config):
    print(f'Adding "B_2" to {state["aggregate"]}')
    return {"aggregate": ["B_2"]}

def c(state: State,config):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State,config):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}

builder = StateGraph(State)

builder.add_node("a", a)
builder.add_node("b", b)
builder.add_node("b_2", b_2)
builder.add_node("c", c)
builder.add_node("d", d) 

builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)

graph = builder.compile(checkpointer=checkpointer)
res = graph.invoke({},config={'configurable':{'thread_id':'1'}})

# 1、最终状态
print(res)

# 2、查看历史所有状态
all_states = graph.get_state_history(config={'configurable':{'thread_id':'1'}})
all_states_list = list(all_states)
print("历史所有状态如下:\n")
for state in all_states_list:
    print(state,end="\n"+"="*30+"\n")

print("\n\n最近一次状态如下:\n")
last_state = graph.get_state(config={'configurable':{'thread_id':'1'}})
print(last_state)

四、节点

4.1 节点的输入输出

4.1.1 节点输入

在LangGraph中,一般来讲,节点都是Python函数(可以是同步的,也可以是异步的),它们接受以下参数:

  • state :图的状态,代表了具体的业务数据

  • config: 一个 RunnalbleConfig 对象,包含诸如 thread_id 之类的配置信息,在调用图时,可以传递用户在自定义的其他配置

  • runtime:一个 Runtime 对象,包含运行时 context (可自定义 context, 在调用图时传入即可)以及其他信息,如 store 和 stream_writer 等

以上参数,会在运行过程中,自动被LangGraph运行时注入。

节点定义config和runtime入参如下所示,在运行时,这两个参数LangGraph名称会通过关键字传参方式进行传参,因此两个参数位置可以调换。

具体示例如下:

import time
from typing import TypedDict, Any, List
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime

class MockLLM:
    def invoke(self, prompt: str):
        return f"AI Generated: Answer for '{prompt}'"

class MockDatabase:
    def get_user_info(self, user_id: str):
        return {"id": user_id, "role": "vip" if "vip" in user_id else "standard"}

class CustomerSupportState(TypedDict):
    query: str          # 用户问题
    response: str       # 客服回复
    log: List[str]      # 处理日志

def node_customer_service(state: CustomerSupportState, config: RunnableConfig, runtime: Runtime) -> dict:
    """
    客服节点:展示如何从 config 中获取注入的依赖 (LLM, DB) 并使用 runtime 推送进度。
    """

    user_query = state["query"]
    
    # 1、从runtime当中获取context对象
    llm = runtime.context['llm']
    db = runtime.context['db']
    

    configurable = config.get("configurable", {})
    user_id = configurable.get("user_id", "guest")
    
    print(f"\n[Node] 开始处理,User ID: {user_id}")
    
    # 2、验证依赖是否存在
    if not llm or not db:
        return {"response": "System Error: Dependencies not injected!", "log": ["Error"]}

    # 3、使用db对象查看用户角色
    user_info = db.get_user_info(user_id)
    user_tier = user_info.get("role")
    print(f"[Node] 从 DB 获取用户角色: {user_tier}")

    # 4、使用 runtime.writer 向图外输出自定义数据 ---
    writer = runtime.stream_writer
    writer({"status": "thinking", "message": f"正在调用 LLM 为 {user_tier} 用户生成回复..."})
    time.sleep(0.5)

    # 5、根据用户角色构建不同的 Prompt,并模拟 LLM 调用
    prompt = f"User({user_tier}) asks: {user_query}"
    llm_response = llm.invoke(prompt)
    
    return {
        "response": llm_response,
        "log": [f"Processed by {llm.__class__.__name__}"]
    }


    return None
def run_demo():
    builder = StateGraph(CustomerSupportState)
    builder.add_node("customer_service", node_customer_service)
    builder.add_edge(START, "customer_service")
    builder.add_edge("customer_service", END)
    graph = builder.compile()
    
    # 初始化外部依赖
    my_llm = MockLLM()
    my_db = MockDatabase()
    
    print("==================================================")
    print("场景: 依赖注入演示")
    print("演示目标: 将 LLM 和 DB 对象通过 runtime.context 注入到节点中")
    print("==================================================")
    
    # 准备状态
    initial_state = {"query": "如何升级会员?"}
    
    # user_id为一个配置信息,将其放入到config当中
    config = {
        "configurable": {
            "user_id": "vip_user_999",
        }
    }
    
    print("[System] 开始运行图,并注入依赖对象...")
    
    context = {
        "llm": my_llm,
        "db": my_db
    }
    result = graph.invoke(initial_state,config=config,context=context)
    print(result)
    

if __name__ == "__main__":
    run_demo()

4.1.2 节点输出

节点的输出为当前节点对状态的增量更新,而不能将接收到的整个状态实例返回出去。

LangGraph底层会将所有节点输出的状态,都作为增量状态,并尝试和当前的全局状态做一次合并操作。如果将整个状态都输出,对于没有配置任何reducer的状态键,langgraph底层无法完成合并,会抛出异常;对于配置了reducer的状态键,如果节点输出了不属于该节点更新的状态,也会导致数据产生问题。

以下演示错误返回了整个状态的问题:

from typing import TypedDict
from langgraph.constants import START,END
from langgraph.graph import StateGraph

class MyState(TypedDict):
    query:str
    file_result:str
    web_result:str
    final_answer:str

def query_web(state:MyState)->dict:
    """
    做网络搜索,返回搜索结果
    :return:
    """
    # 1、错误演示:直接return整个state,而非当前节点增量修改的状态
    query = state['query']
    state['web_result'] = f'{query}的网络搜索结果'
    return state

def query_file(state:MyState)->dict:
    """
    做文件搜索,返回搜索结果
    :return:
    """
    query = state['query']
    state['file_result'] = f'{query}的文件搜索结果'
    return state

def answer(state:MyState)->dict:
    """
    返回最终的答案
    :return:
    """
    web_result = state['web_result']
    file_result = state['file_result']
    final_answer = f'LLM基于{web_result},{file_result} 的最终结果'
    state['final_answer'] = final_answer
    return state

graph = StateGraph(state_schema=MyState)
graph.add_node(answer)
graph.add_node(query_web)
graph.add_node(query_file)
graph.add_edge(START,'query_web')
graph.add_edge(START,'query_file')
graph.add_edge('query_web','answer')
graph.add_edge('query_file','answer')
compiled_graph = graph.compile()
res = compiled_graph.invoke({"query":"什么是Langgraph"})
print(res['final_answer'])

4.2 特殊节点

START 和 END 节点都是 LangGraph 当中的特殊节点。START 它代表着将用户输入发送到图中的节点。引用此节点的主要目的是确定应首先调用哪些节点。END 节点是一个特殊节点,代表终止节点。当想表示哪些边在完成后没有动作时,会引用这个节点。

START 和 END 节点本质上是一个字符串,如下所示:

END = sys.intern("__end__")
"""The last (maybe virtual) node in graph-style Pregel."""
START = sys.intern("__start__")
"""The first (maybe virtual) node in graph-style Pregel."""

START作为一个特殊节点,在图执行完START节点之后,也会创建一个checkpoint。

4.3 节点缓存

LangGraph支持基于节点输入对节点进行缓存。对于配置了缓存的节点,且缓存结果没有过期,以相同的输入再次调用节点时,可直接从缓存当中读取结果,不需要再进行节点计算。

使用缓存的方法如下:

  1. 编译图时指定缓存存储后端:langgraph.cache包当中提供了内存级别的缓存后端,Redis缓存后端,和Sqlite缓存后端的具体实现,需要构造相关实例,并在图编译时传入。

  2. 为节点指定缓存策略。每个缓存策略需配置:

  • key_func:用于根据节点的输入生成缓存键,默认情况下是使用pickle对输入进行hash运算的结果。

  • ttl:缓存的生存时间(以秒为单位)。如果未指定,缓存将永不过期。

import time
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from langgraph.cache.memory import InMemoryCache

from langgraph.types import CachePolicy
from langgraph.constants import START
from langgraph.checkpoint.memory import InMemorySaver

class State(TypedDict):
    x: int
    result: int

builder = StateGraph(State)
checkpointer = InMemorySaver()

# 1、定义节点:模拟一些有耗时计算的节点
def expensive_node(state: State) -> dict[str, int]:
    # expensive computation
    print(f"expensive_node 被调用")
    time.sleep(5)
    print(f"expensive_node 计算完成")
    return {"result": state["x"] * 2}

# 2、添加节点时,为节点配置缓存策略,这里设置为10秒缓存,缓存键的配置同样使用默认的方式,可以在CachePolicy当中传入key_func配置不同的缓存键生成策略
builder.add_node("expensive_node", expensive_node, cache_policy=CachePolicy(ttl=10))
builder.add_edge(START, "expensive_node")

# 3、图编译时,传入缓存器,此处使用InMemoryCache,也可以使用RedisCache等
graph = builder.compile(cache=InMemoryCache(),checkpointer=checkpointer)

# 4、第一次调用,传入的状态为5
print(graph.invoke({"x": 5},config={"configurable":{"thread_id":"1"}}))

# 5、第二次调用,传入相同的状态,由于缓存策略,会直接从缓存中返回结果,而不会重新调用节点
print(graph.invoke({"x": 5},config={"configurable":{"thread_id":"2"}}))

4.4 节点重试

在很多使用场景中,有些节点由于客观原因限制,导致其执行过程是不稳定的。因此,我们可能希望节点拥有自定义的重试策略,例如在调用API、查询数据库或调用大语言模型(LLM)等情况下。

为节点添加重试策略,需要在add_node中设置retry_policy参数。retry_policy参数接受一个RetryPolicy命名元组对象。RetryPolicy对象有两个属性:max_attempts和retry_on参数,前者定义了总共重试次数,后者定义了对于哪些异常类型进行重试。

示例代码如下:

import random
from typing import Dict, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import RetryPolicy

# 定义状态
class State(TypedDict):
    result: str

# 模拟不稳定的API调用,使用全局变量跟踪尝试次数
attempt_counter = 0

def unstable_api_call(state: State) -> Dict[str, Any]:
    """
    模拟一个不稳定的API调用,有一定概率失败
    """
    global attempt_counter
    attempt_counter += 1
    print(f"尝试调用API,这是第 {attempt_counter} 次尝试")
    
    # 模拟前几次尝试失败,最后一次成功
    if attempt_counter < 3:
        raise Exception(f"模拟API调用失败 (尝试 {attempt_counter})")
    else:
        # 第三次尝试成功
        return {
            "result": f"API调用成功,经过 {attempt_counter} 次尝试"
        }
# 模拟抛出 ValueError 的节点
def value_error_call(state: State) -> Dict[str, Any]:
    """
    模拟抛出 ValueError 的节点(不会被默认重试策略重试)
    """
    print("调用会抛出 ValueError 的节点")
    raise ValueError("模拟 ValueError 异常")

def run_demo():
    print("=== LangGraph 节点重试策略演示 ===\n")
    
    # 重置全局计数器
    global attempt_counter
    attempt_counter = 0
    
    # 演示1: 使用默认重试策略
    print("1. 使用默认重试策略:")
    print("   默认策略会对除特定异常外的所有异常进行重试")
    print("   不会重试的异常包括: ValueError, TypeError, ArithmeticError, ImportError,")
    print("                     LookupError, NameError, SyntaxError, RuntimeError,")
    print("                     ReferenceError, StopIteration, StopAsyncIteration, OSError\n")
    
    builder1 = StateGraph(State)
    
# 添加节点,使用默认重试策略,
#
    builder1.add_node(
        "unstable_call", 
        unstable_api_call, 
        retry_policy=RetryPolicy(max_attempts=5)  # 允许最多5次尝试
    )
    
    builder1.add_edge(START, "unstable_call")
    builder1.add_edge("unstable_call", END)
    
    graph1 = builder1.compile()
    
    print("测试默认重试策略:")
    try:
        result = graph1.invoke({"result": ""})
        print(f"最终结果: {result}\n")
    except Exception as e:
        print(f"最终失败: {type(e).__name__}: {e}\n")
    
    # 演示2: 使用自定义重试策略
    print("2. 使用自定义重试策略:")
    print("   自定义策略只对特定错误进行重试\n")
    
    # 重置全局计数器
    attempt_counter = 0
    # 演示3: 不会重试的异常类型
    print("3. 测试不会重试的异常类型:")
    
    builder3 = StateGraph(State)
    
    # 添加节点,使用默认重试策略
    builder3.add_node(
        "value_error_call", 
        value_error_call, 
        retry_policy=RetryPolicy(max_attempts=3)
    )
    builder3.add_edge(START, "value_error_call")
    builder3.add_edge("value_error_call", END)
    
    graph3 = builder3.compile()
    
    print("测试 ValueError(默认策略不会重试):")
    try:
        result = graph3.invoke({"result": ""})
        print(f"最终结果: {result}\n")
    except Exception as e:
        print(f"最终失败: {type(e).__name__}: {e}\n")

if __name__ == "__main__":
    run_demo()

4.5 图内外数据传递

默认情况下,通过graph.invoke调用图时,仅在整个图的执行过程都结束之后,我们才能够拿到最终的状态,那么如果想要在图的执行过程当中,想要获取到图内部所产生的数据,应该如何实现?

考虑Agent的一个场景:首先,我们希望在LLM输出时,就能够拿到LLM所产生的token,在前端进行展示;其次,如果Agent的流程较长,我们希望能够拿到,当前正在执行的节点或者流程是什么。

由于LangGraph实现了langchain_core当中的Runnable接口,其为我们提供了stream和astream方法,也即流式输出方法,通过流式输出,就能解决前面所说到的问题。

Stream方法提供了多种不同的模式,如下表所示:

模式

描述

values

每一个执行后,流式输出完整的状态

updates

图执行过程中,每一步执行后流式输出增量更新。如果在同一个步当中产生了多个增量更新,这些增量更新会分别流式输出。

custom

流式输出节点内部的自定义数据。

messages

在任何调用了LLM的节点当中,流式输出两元组数据:(LLM Token,metadata)

debug

流式输出所有能输出的信息

混合模式

流模式传入列表,在列表当中添加多种不同的模式,可以得到多种流式输出

具体代码如下所示:

import time
from typing import TypedDict, Annotated, List
import operator
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.types import StreamWriter
from langgraph.runtime import Runtime
llm = ChatOpenAI(model="gpt-4o-mini")

# --- 1. 定义状态 ---
class State(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    current_step: str

# --- 2. 定义节点 ---

def node_input(state: State):
    """模拟接收用户输入"""
    return {
        "messages": [HumanMessage(content="请帮我写一段Python代码")],
        "current_step": "input_received"
    }

def node_processing(state: State, runtime: Runtime):
    """
    模拟中间处理过程,并使用 writer 输出自定义流式数据
    这对应 stream_mode="custom"
    """
    steps = ["正在分析意图...", "正在检索知识库...", "正在构建Prompt..."]
    writer = runtime.stream_writer
    for i, step in enumerate(steps):
        time.sleep(0.5) # 模拟耗时操作
        
        # 使用 writer 发送自定义数据 (不影响图的状态)
        # 这些数据只能通过 stream_mode="custom" 接收到
        writer({
            "step_index": i + 1,
            "description": step,
            "timestamp": time.time()
        })
        
    return {"current_step": "processing_complete"}

def node_generation(state: State):
    """
    模拟 LLM 生成过程
    LangGraph 的 stream_mode="messages" 会自动捕获 LLM 的流式输出
    """   
    
    # 调用 LLM
    response = llm.invoke(state["messages"])
    
    return {
        "messages": [response],
        "current_step": "generation_complete"
    }

# --- 3. 构建图 ---
builder = StateGraph(State)
builder.add_node("input", node_input)
builder.add_node("process", node_processing)
builder.add_node("generate", node_generation)

builder.add_edge(START, "input")
builder.add_edge("input", "process")
builder.add_edge("process", "generate")
builder.add_edge("generate", END)

graph = builder.compile()

# --- 4. 演示不同的流式输出模式 ---
def run_demo():
    initial_state = {"messages": [], "current_step": "start"}
    
    print(f"\n{'='*20} 1. Mode: values (输出完整状态) {'='*20}")
    print("描述: 每执行完一个节点,输出当前的完整 State")
    for event in graph.stream(initial_state, stream_mode="values"):
        # event 就是当前的 State 字典
        print(f"State: keys={list(event.keys())}, step={event.get('current_step')}")

    print(f"\n{'='*20} 2. Mode: updates (输出增量更新) {'='*20}")
    print("描述: 每执行完一个节点,输出该节点返回的增量数据")
    for event in graph.stream(initial_state, stream_mode="updates"):
        # event 是一个字典,key是节点名,value是该节点的输出
        handle_event(event)
        print(f"Update: {event}")

    print(f"\n{'='*20} 3. Mode: custom (输出自定义数据) {'='*20}")
    print("描述: 仅输出节点内部通过 writer() 发送的数据")
    for event in graph.stream(initial_state, stream_mode="custom"):
        # event 就是 writer() 中传入的对象
        print(f"Custom Data: {event}")

    print(f"\n{'='*20} 4. Mode: messages (输出 LLM Token) {'='*20}")
    print("描述: 输出 LLM 生成的消息片段 (Token)")
    # 注意:FakeListChatModel 默认是一次性返回,但在 messages 模式下会被包装成 chunk
    for chunk, metadata in graph.stream(initial_state, stream_mode="messages"):
        # chunk 是 BaseMessageChunk 对象
        # metadata 包含 node 信息
        node_name = metadata.get('langgraph_node', 'unknown')
        print(f"[{node_name}] Token: {chunk.content!r}")
        # print(chunk.content,end="")
        time.sleep(0.2)
        

    print(f"\n{'='*20} 5. Mode: debug (调试模式) {'='*20}")
    print("描述: 输出所有详细的执行信息,包括任务调度、输入输出等")
    # debug 模式输出量很大,这里只打印前几条示意
    count = 0
    for event in graph.stream(initial_state, stream_mode="debug"):
        if count < 3:
            print(f"Debug Event: {event['type']} - {event.get('payload', {}).get('name')}")
        count += 1
    print("... (省略后续 debug 信息)")

    print(f"\n{'='*20} 6. Mixed Mode (混合模式) {'='*20}")
    print("描述: 同时获取 updates 和 custom 数据")
    # 返回的是 (mode, data) 元组
    for mode, data in graph.stream(initial_state, stream_mode=["updates", "custom"]):
        if mode == "updates":
            print(f"[Updates] 来自节点 {list(data.keys())[0]}")
        elif mode == "custom":
            print(f"[Custom] {data['description']}")

if __name__ == "__main__":
    run_demo()

4.6 人工审核节点

Agent在完成用户设定的任务时,有时候我们希望用户参与当中部分重要决策过程。LangGraph为此提供了一个非常方便的原语:interrupt。其原理如下图所示:

image-20260616105431933

可以在节点内部任意位置引入 interrupt ,当节点执行的到所在位置时,就会停止执行,并可以得到图内传递出来的,需要用户审核的数据,在用户进行相关审核完成后,有可通过 graph.invoke ,让图继续往下执行

具体示例如下:

"""
LangGraph interrupt 演示:让用户参与关键决策

场景:准备执行一笔“转账”,在真正执行前让用户审核/修改转账信息。
"""

from __future__ import annotations

from typing import Any
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, interrupt

class TransferState(TypedDict):
    recipient: str
    amount: int
    memo: str
    approved: bool
    final_status: str

def review_transfer(state: TransferState) -> dict[str, Any]:
    print("\n[Node] review_transfer:生成待执行的转账请求")
    pending_transfer = {
        "recipient": state["recipient"],
        "amount": state["amount"],
        "memo": state["memo"],
    }

    user_review = interrupt(
        {
            "title": "转账审核",
            "pending_transfer": pending_transfer,
            "instruction": "请返回 bool(是否批准) 或 dict(可改 recipient/amount/memo,并带 approved 字段)。",
        }
    )

    approved = False
    updated_transfer = dict(pending_transfer)
    if isinstance(user_review, bool):
        approved = user_review
    elif isinstance(user_review, dict):
        approved = bool(user_review.get("approved", True))
        for k in ("recipient", "amount", "memo"):
            if k in user_review:
                updated_transfer[k] = user_review[k]

    print(f"[Node] review_transfer:用户审核结果 approved={approved},transfer={updated_transfer}")
    return {
        "approved": approved,
        "recipient": updated_transfer["recipient"],
        "amount": updated_transfer["amount"],
        "memo": updated_transfer["memo"],
    }

def execute_transfer(state: TransferState) -> dict[str, str]:
    if not state["approved"]:
        print("\n[Node] execute_transfer:用户未批准,取消转账")
        return {"final_status": "已取消:用户未批准转账"}

    print("\n[Node] execute_transfer:模拟执行转账")
    recipient = state["recipient"]
    amount = state["amount"]
    memo = state["memo"]
    return {"final_status": f"已转账:收款人={recipient},金额={amount},备注={memo}"}

def build_graph():
    builder = StateGraph(TransferState)
    builder.add_node("review_transfer", review_transfer)
    builder.add_node("execute_transfer", execute_transfer)

    builder.add_edge(START, "review_transfer")
    builder.add_edge("review_transfer", "execute_transfer")
    builder.add_edge("execute_transfer", END)

    return builder.compile(checkpointer=InMemorySaver())

def run_demo():
    graph = build_graph()
    config = {"configurable": {"thread_id": "interrupt-demo-1"}}

    print("==================================================")
    print("场景:转账前人工审核(interrupt)")
    print("第一次 invoke 会停在 interrupt,并把审核数据带出来")
    print("第二次 invoke 使用 Command(resume=...) 把用户决策传回去继续执行")
    print("==================================================")

    initial_state: TransferState = {
        "recipient": "Alice",
        "amount": 100,
        "memo": "午饭AA",
        "approved": False,
        "final_status": "",
    }

    first = graph.invoke(initial_state, config=config)
    interrupt_payload = first["__interrupt__"][0].value
    print("\n[System] 图已暂停,等待用户审核。可给用户展示的数据如下:")
    print(interrupt_payload)

    user_resume_value = {"approved": True, "amount": 80, "memo": "改为80元(实际应付)"}
    final = graph.invoke(Command(resume=user_resume_value), config=config)
    print("\n[System] 已恢复执行,最终结果:")
    print(final["final_status"])

if __name__ == "__main__":
    run_demo()

需要注意的是: 使用 interrupt 时,在第二次调用 graph.invoke(command) 继续执行时,有 interrupt 的函数,会从函数起点往下执行,如果在函数中,有更新数据、调用 API 接口等相关操作时,会造成多次重复执行的情况。因此一定要保证这种类型操作的幂等性,或者是将其封装到一个单独的节点中,避免在恢复时再次执行。

五、边

5.1 边的本质

定义边,本质上是定义了,Pregel当中节点订阅的状态通道和节点执行之后,需要更新的状态通道。

边有几种关键类型:

  • Normal Edges: 普通边。直接从一个节点连接到下一个节点。

  • Conditional Edges: 条件边。调用函数以确定接下来要前往哪个(哪些)节点。

5.2 条件边

条件边的本质是一个路由函数,其根据当前状态动态决定下一个要执行的节点。

from typing import Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

# 定义状态
class GraphState(TypedDict):
    value: int
    step: str

# 定义节点函数
def node_a(state: GraphState) -> dict:
    """节点A"""
    print("执行节点A")
    return {"value": state["value"], "step": "A执行完毕"}

def node_b(state: GraphState) -> dict:
    """节点B"""
    print("执行节点B")
    return {"value": state["value"] * 2, "step": "B执行完毕"}

def node_c(state: GraphState) -> dict:
    """节点C"""
    print("执行节点C")
    return {"value": state["value"] - 1, "step": "C执行完毕"}

# 条件边的路由函数
def route_condition(state: GraphState) -> Literal["node_b", "node_c"]:
    """根据value值决定路由到哪个节点"""
    if state["value"] % 2 == 0:
        return "node_b_alias"  # 偶数路由到节点B
    else:
        return "node_c_alias"  # 奇数路由到节点C

def main():
    """演示条件边"""
    print("=== 条件边演示 ===")
    
    # 创建图
    builder = StateGraph(GraphState)
    
    # 添加节点
    builder.add_node("node_a", node_a)
    builder.add_node("node_b", node_b)
    builder.add_node("node_c", node_c)
    
    # 添加边
    builder.add_edge(START, "node_a")  # 入口点
    
    # 添加条件边
    builder.add_conditional_edges(
        "node_a",  # 源节点
        route_condition,  # 路由函数
        {  # 路由映射
            "node_b_alias": "node_b",
            "node_c_alias": "node_c"
        }
    )
    
    # 从B和C到结束
    builder.add_edge("node_b", END)
    builder.add_edge("node_c", END)
    
    # 编译图
    graph = builder.compile()
    
    # 执行图 - 偶数情况
    print("输入值为偶数:")
    result = graph.invoke({"value": 2})
    print(f"执行结果: {result}")
    
    # 执行图 - 奇数情况
    print("\n输入值为奇数:")
    result = graph.invoke({"value": 1})
    print(f"执行结果: {result}\n")

if __name__ == "__main__":
    main()

5.3 可控循环

通过条件边,我们可以构建带有循环结构的图,例如在典型的REACT模式下,工具调用和大模型总结生成结果形成了一个循环,如下图所示:

image-20260616110322017

需要注意的是,这种带循环的图结构,有一个隐藏的问题:图执行过程当中,可能因为某些原因,导致一直在循环内循环往复执行,因此LangGraph也提供了一个强制使图的执行终止的递归限制参数。

递归限制设定了图在抛出错误之前允许执行的超级步骤数量,默认值25,在graph.invoke的config参数中指定。在经过指定数量的超级步骤后,图还没有自然停止执行时,LangGraph会抛出异常GraphRecursionError。

示例代码如下所示:

from typing import Annotated, Dict, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.errors import GraphRecursionError

class LoopState(TypedDict):
    count: int
    result: str
    max_count: int

def node_a(state: LoopState) -> dict:
    """节点a:处理逻辑并更新计数"""
    print(f"执行节点a,当前计数: {state['count']}")
    return {
        'count': state['count'] + 1,
        'result': f"已处理{state['count']}次"
    }

def node_b(state: LoopState) -> dict:
    """节点b:辅助处理"""
    print(f"执行节点b,当前计数: {state['count']}")
    return {
        'result': f"已处理{state['count']}次 - 辅助处理"
    }

def route(state: LoopState) -> Literal["b", END]:
    """条件路由函数:决定是继续循环还是终止"""
    # 终止条件:当计数达到最大值时终止
    if state['count'] >= state['max_count']:
        print(f"满足终止条件,计数 {state['count']} >= {state['max_count']},返回END")
        return END
    else:
        print(f"未满足终止条件,计数 {state['count']} < {state['max_count']},返回b")
        return "b"

# 创建图
builder = StateGraph(LoopState)

# 添加节点
builder.add_node("a", node_a)
builder.add_node("b", node_b)

# 添加边
builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")

# 编译图
graph = builder.compile()

# 执行图
print("=== 开始执行工作流 ===")
try:
    result = graph.invoke(input={
        'count': 0,
        'result': '',
        'max_count': 300
    }, config={
        'recursion_limit': 6  # 设置递归限制
    })
    print("=== 执行结果 ===")
    print(result)
except GraphRecursionError as e:
print(f"递归错误: {e}")