智能体6-LangGraph入门
2025-11-18 本文已影响0人
R7_Perfect
LangGraph:复杂认知工作流的编排引擎
1. 架构哲学:从链式到图式
传统LangChain的链式(Chain)结构在处理以下场景时显露局限:
- 带循环的对话状态管理
- 多Agent协同决策
- 动态路径选择
LangGraph引入 有向无环图(DAG) 模型,通过三大创新突破限制:
- 状态持久化:支持长时间运行的智能体会话
- 条件分支:实现基于上下文的动态路由
- 并行执行:提升复杂任务的完成效率
当客服Agent需要处理以下场景时,传统链式架构遇到瓶颈:
# 传统线性处理
response = agent.run("我要退货,上周买的鞋子尺寸不对")
# 隐含需求:1.验证订单 2.检查退货政策 3.生成退货码 4.通知物流
痛点呈现:
- 僵化流程:无法动态调整处理步骤
- 状态丢失:多轮交互后无法回溯上下文
- 资源争用:无法并行执行独立任务
2. 核心抽象模型
1d5e5f7f-9b43-4e27-9d62-da635c4dcb04.png
3. 企业级应用案例
客户服务自动化系统
准备
pip install -U langgraph
pip install langgraph-checkpoint-sqlite
import os
import sqlite3
from typing import TypedDict, Literal, Optional
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tracers import LangChainTracer
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, END
# LangSmith配置
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "default"
tracing_handler = LangChainTracer()
class AgentState(TypedDict):
"""代理状态类型定义
Attributes:
user_input: str 用户输入文本
intent: Optional[Literal["普通问题", "技术问题", "投诉问题"]] 问题分类结果
response: Optional[str] 生成的响应内容
"""
user_input: str
intent: Optional[Literal["普通问题", "技术问题", "投诉问题"]]
response: Optional[str]
# 初始化大语言模型
llm = ChatOpenAI(
model='deepseek-chat',
openai_api_key=os.getenv('DEEPSEEK_API_KEY'), # 从环境变量获取API密钥
openai_api_base='https://api.deepseek.com/v1', # DeepSeek API端点
temperature=0, # 控制生成结果的随机性
callbacks=[tracing_handler] # 集成 LangSmith
)
output_parser = StrOutputParser() # 用于解析LLM输出的纯文本结果
def safe_set_intent(value: str) -> Optional[Literal["普通问题", "技术问题", "投诉问题"]]:
"""安全设置问题分类
Args:
value: 待验证的分类字符串
Returns:
当输入为有效分类时返回原值,否则返回None
"""
return value if value in ("普通问题", "技术问题", "投诉问题") else None
def classify_intent(state: AgentState) -> AgentState:
"""问题分类节点
Args:
state: 当前对话状态
Returns:
更新后的状态(包含分类结果)
"""
prompt = ChatPromptTemplate.from_template(
"请判断用户问题类型,只返回普通问题/技术问题/投诉问题,不要包含任何格式符号或额外文字。问题内容:{input}"
)
chain = prompt | llm | output_parser # 构建处理链
intent = chain.invoke({"input": state["user_input"]}).strip()
return {
"user_input": state["user_input"],
"intent": safe_set_intent(intent), # 安全设置分类
"response": None
}
def handle_general(state: AgentState) -> AgentState:
"""普通问题处理节点"""
prompt = ChatPromptTemplate.from_template(
"你是一个客服助手,请用纯文本回答以下普通问题,不要包含任何格式符号或标记。问题:{input}"
)
chain = prompt | llm | output_parser
response = chain.invoke({"input": state["user_input"]})
return {
"user_input": state["user_input"],
"intent": state.get("intent"),
"response": response
}
def handle_tech(state: AgentState) -> AgentState:
"""技术问题处理节点"""
prompt = ChatPromptTemplate.from_template(
"你是一名技术支持工程师,请用纯文本回答以下技术问题,不要包含任何格式符号或标记。技术问题:{input}"
)
chain = prompt | llm | output_parser
response = chain.invoke({"input": state["user_input"]})
return {
"user_input": state["user_input"],
"intent": state.get("intent"),
"response": response
}
def handle_complaint(state: AgentState) -> AgentState:
"""投诉问题处理节点"""
prompt = ChatPromptTemplate.from_template(
"你是一名投诉处理专员,请用纯文本回答以下投诉,不要包含任何格式符号或标记。投诉内容:{input}"
)
chain = prompt | llm | output_parser
response = chain.invoke({"input": state["user_input"]})
return {
"user_input": state["user_input"],
"intent": state.get("intent"),
"response": response
}
def decide_next_step(state: AgentState) -> str:
"""路由决策函数
Args:
state: 当前对话状态
Returns:
下一节点的名称
"""
return state.get("intent", "普通问题") # 默认路由到普通问题
# 构建工作流
workflow = StateGraph(AgentState)
# 注册节点
workflow.add_node("classify", classify_intent) # 分类节点
workflow.add_node("普通问题", handle_general) # 注意:节点名称改为中文
workflow.add_node("技术问题", handle_tech)
workflow.add_node("投诉问题", handle_complaint)
# 设置条件路由(修正后的映射关系)
workflow.add_conditional_edges(
"classify",
decide_next_step,
{
"普通问题": "普通问题", # 键值统一使用中文
"技术问题": "技术问题",
"投诉问题": "投诉问题"
}
)
# 设置终止边
workflow.add_edge("普通问题", END)
workflow.add_edge("技术问题", END)
workflow.add_edge("投诉问题", END)
# 设置入口点
workflow.set_entry_point("classify")
# 编译时启用检查点
conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(conn)
app = workflow.compile(checkpointer=memory)
if __name__ == "__main__":
# 测试用例
test_cases = [
("user_001", "你们的工作时间是几点?"),
("user_002", "我的账号无法登录"),
("user_003", "我要投诉你们的服务态度")
]
for user_id, query in test_cases:
print(f"\n[会话 {user_id}] 用户提问: {query}")
# 执行工作流(关键修正)
result = app.invoke(
{"user_input": query},
config={"configurable": {"thread_id": user_id}}
)
print(f"问题类型: {result.get('intent')}")
print(f"系统回复: {result.get('response')}")
# 验证检查点
checkpoint = memory.get_tuple({"configurable": {"thread_id": user_id}})
print(f"检查点状态: {checkpoint}")
b78649d0-bdde-4bd0-81ec-66732da9fec6.png
be521377-b505-4c29-a90a-3f9b269cc722.png
938709d8-0c12-44bd-9813-48f7e3fd580b.png
ab8bc43f-9c46-4773-82b9-7d1c053d1fa8.png
该工作流实现:
- 智能会话状态管理
- 基于意图的动态路由
- 断点续话能力
4. 核心概念解析
1. 状态流(State Flow)
核心概念:
LangGraph 的核心是基于状态流转的图计算模型,通过 StateGraph 定义数据流。状态是一个类型化的字典(TypedDict),在节点间传递和修改。
代码体现:
class AgentState(TypedDict): # 定义状态结构
user_input: str
intent: Optional[Literal["普通问题", "技术问题", "投诉问题"]]#用户意图,类型为可选的 Literal,可以是 "普通问题"、"技术问题" 或 "投诉问题"。
response: Optional[str]#系统的响应,类型为可选的 str。
workflow = StateGraph(AgentState) # 声明状态流
2. 节点(Nodes)
核心概念:
节点是纯函数,接收当前状态,返回更新后的状态。LangGraph 不限制节点内部逻辑(可调用LLM、工具、API等)。
代码体现:
def classify_intent(state: AgentState) -> AgentState:
# 调用LLM进行分类
return {"intent": "技术问题"} # 只更新部分状态
workflow.add_node("classify", classify_intent) # 注册节点
3. 边(Edges)
核心概念:
边定义节点间的流转路径,分为两种:
固定边:add_edge("node1", "node2")
条件边:基于状态动态路由
代码体现:
# 条件边(根据intent值路由)
workflow.add_conditional_edges(
"classify",
decide_next_step, # 路由决策函数
{"技术问题": "tech_node", ...}
)
# 固定边(直接跳转)
workflow.add_edge("tech_node", END)
4. 检查点(Checkpoints)
核心概念:
检查点(Checkpoint)是在任务或流程执行过程中自动保存中间状态的一种机制,旨在实现容错恢复、状态追踪和重复计算避免。在 LangGraph 等流程编排框架中,检查点用于持久化每个节点执行后的状态,使得在流程中断或异常退出后,可以从最近的检查点恢复继续执行,而无需重头开始。检查点既可保存在内存中(如 MemorySaver),也可持久化到本地数据库(如 SqliteSaver)或远程存储(如 RedisSaver),以适应不同的开发与部署场景。
代码体现:
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver
conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(conn)
app = workflow.compile(checkpointer=memory)
8a0ef651-c8ce-40a4-90d3-658b8a725e59.png
5. 执行模型
核心概念:
LangGraph 采用异步有向图执行模型:
- 从 entry_point 开始
- 按边条件流转
- 直到到达 END
代码体现:
workflow.set_entry_point("classify") # 设置起点
app.invoke({"user_input": "问题内容"}) # 执行流式
与上面代码的映射
| 概念 | 代码示例 | 设计目的 |
|---|---|---|
| 状态 | AgentState |
定义数据流动的规范格式 |
| 节点 |
handle_tech / classify_intent
|
模块化业务逻辑 |
| 条件路由 |
decide_next_step函数 |
实现动态流程控制 |
| 持久化 | SqliteSaver |
支持长周期对话状态管理 |
架构优势对比
| 能力项 | 传统Agent | LangGraph Agent |
|---|---|---|
| 流程动态性 | 固定顺序 | 基于上下文的条件分支 |
| 状态管理 | 短期记忆 | 持久化检查点(Checkpoint) |
| 执行模式 | 串行 | 支持并行与异步 |
| 错误恢复 | 整体失败 | 局部重试与补偿机制 |
4、LangChain生态的协同进化
1. 技术定位关系
7dc25b3f-1778-4fb4-943e-2244898b078e.png
2. 协同开发模式
开发阶段:
- 使用LangGraph设计工作流拓扑
- 通过LangSmith调试单个节点
测试阶段:
- 在LangSmith中创建黄金数据集
- 执行自动化回归测试
运维阶段:
- 实时监控生产环境指标
- 通过检查点回放定位问题
3. 性能基准对比
| 任务类型 | 纯LangChain | LangChain+LangGraph | 提升幅度 |
|---|---|---|---|
| 多步骤推理 | 12.4s | 8.7s | 30% |
| 带循环的对话 | 失败率32% | 失败率4% | 87% |
| 资源消耗 | 38GB内存 | 22GB内存 | 42% |