深度学习

智能体6-LangGraph入门

2025-11-18  本文已影响0人  R7_Perfect

LangGraph:复杂认知工作流的编排引擎

1. 架构哲学:从链式到图式

传统LangChain的链式(Chain)结构在处理以下场景时显露局限:

LangGraph引入 有向无环图(DAG) 模型,通过三大创新突破限制:

当客服Agent需要处理以下场景时,传统链式架构遇到瓶颈:

# 传统线性处理
response = agent.run("我要退货,上周买的鞋子尺寸不对")  
# 隐含需求:1.验证订单 2.检查退货政策 3.生成退货码 4.通知物流

痛点呈现:

2. 核心抽象模型

1d5e5f7f-9b43-4e27-9d62-da635c4dcb04.png

3. 企业级应用案例

客户服务自动化系统

准备

pip install -U langgraph  
pip install langgraph-checkpoint-sqlite
import os
import sqlite3
from typing import TypedDict, Literal, Optional
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tracers import LangChainTracer
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, END

# LangSmith配置
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "default"
tracing_handler = LangChainTracer()


class AgentState(TypedDict):
    """代理状态类型定义
    Attributes:
        user_input: str 用户输入文本
        intent: Optional[Literal["普通问题", "技术问题", "投诉问题"]] 问题分类结果
        response: Optional[str] 生成的响应内容
    """
    user_input: str
    intent: Optional[Literal["普通问题", "技术问题", "投诉问题"]]
    response: Optional[str]


# 初始化大语言模型
llm = ChatOpenAI(
    model='deepseek-chat',
    openai_api_key=os.getenv('DEEPSEEK_API_KEY'),  # 从环境变量获取API密钥
    openai_api_base='https://api.deepseek.com/v1',  # DeepSeek API端点
    temperature=0,  # 控制生成结果的随机性
    callbacks=[tracing_handler]  # 集成 LangSmith
)
output_parser = StrOutputParser()  # 用于解析LLM输出的纯文本结果


def safe_set_intent(value: str) -> Optional[Literal["普通问题", "技术问题", "投诉问题"]]:
    """安全设置问题分类
    Args:
        value: 待验证的分类字符串
    Returns:
        当输入为有效分类时返回原值,否则返回None
    """
    return value if value in ("普通问题", "技术问题", "投诉问题") else None


def classify_intent(state: AgentState) -> AgentState:
    """问题分类节点
    Args:
        state: 当前对话状态
    Returns:
        更新后的状态(包含分类结果)
    """
    prompt = ChatPromptTemplate.from_template(
        "请判断用户问题类型,只返回普通问题/技术问题/投诉问题,不要包含任何格式符号或额外文字。问题内容:{input}"
    )
    chain = prompt | llm | output_parser  # 构建处理链
    intent = chain.invoke({"input": state["user_input"]}).strip()
    return {
        "user_input": state["user_input"],
        "intent": safe_set_intent(intent),  # 安全设置分类
        "response": None
    }


def handle_general(state: AgentState) -> AgentState:
    """普通问题处理节点"""
    prompt = ChatPromptTemplate.from_template(
        "你是一个客服助手,请用纯文本回答以下普通问题,不要包含任何格式符号或标记。问题:{input}"
    )
    chain = prompt | llm | output_parser
    response = chain.invoke({"input": state["user_input"]})
    return {
        "user_input": state["user_input"],
        "intent": state.get("intent"),
        "response": response
    }


def handle_tech(state: AgentState) -> AgentState:
    """技术问题处理节点"""
    prompt = ChatPromptTemplate.from_template(
        "你是一名技术支持工程师,请用纯文本回答以下技术问题,不要包含任何格式符号或标记。技术问题:{input}"
    )
    chain = prompt | llm | output_parser
    response = chain.invoke({"input": state["user_input"]})
    return {
        "user_input": state["user_input"],
        "intent": state.get("intent"),
        "response": response
    }


def handle_complaint(state: AgentState) -> AgentState:
    """投诉问题处理节点"""
    prompt = ChatPromptTemplate.from_template(
        "你是一名投诉处理专员,请用纯文本回答以下投诉,不要包含任何格式符号或标记。投诉内容:{input}"
    )
    chain = prompt | llm | output_parser
    response = chain.invoke({"input": state["user_input"]})
    return {
        "user_input": state["user_input"],
        "intent": state.get("intent"),
        "response": response
    }


def decide_next_step(state: AgentState) -> str:
    """路由决策函数
    Args:
        state: 当前对话状态
    Returns:
        下一节点的名称
    """
    return state.get("intent", "普通问题")  # 默认路由到普通问题


# 构建工作流
workflow = StateGraph(AgentState)

# 注册节点
workflow.add_node("classify", classify_intent)  # 分类节点
workflow.add_node("普通问题", handle_general)  # 注意:节点名称改为中文
workflow.add_node("技术问题", handle_tech)
workflow.add_node("投诉问题", handle_complaint)

# 设置条件路由(修正后的映射关系)
workflow.add_conditional_edges(
    "classify",
    decide_next_step,
    {
        "普通问题": "普通问题",  # 键值统一使用中文
        "技术问题": "技术问题",
        "投诉问题": "投诉问题"
    }
)

# 设置终止边
workflow.add_edge("普通问题", END)
workflow.add_edge("技术问题", END)
workflow.add_edge("投诉问题", END)

# 设置入口点
workflow.set_entry_point("classify")

# 编译时启用检查点
conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(conn)
app = workflow.compile(checkpointer=memory)

if __name__ == "__main__":
    # 测试用例
    test_cases = [
        ("user_001", "你们的工作时间是几点?"),
        ("user_002", "我的账号无法登录"),
        ("user_003", "我要投诉你们的服务态度")
    ]

    for user_id, query in test_cases:
        print(f"\n[会话 {user_id}] 用户提问: {query}")

        # 执行工作流(关键修正)
        result = app.invoke(
            {"user_input": query},
            config={"configurable": {"thread_id": user_id}}
        )

        print(f"问题类型: {result.get('intent')}")
        print(f"系统回复: {result.get('response')}")

        # 验证检查点
        checkpoint = memory.get_tuple({"configurable": {"thread_id": user_id}})
        print(f"检查点状态: {checkpoint}")
b78649d0-bdde-4bd0-81ec-66732da9fec6.png
be521377-b505-4c29-a90a-3f9b269cc722.png
938709d8-0c12-44bd-9813-48f7e3fd580b.png
ab8bc43f-9c46-4773-82b9-7d1c053d1fa8.png

该工作流实现:

4. 核心概念解析

1. 状态流(State Flow)

核心概念:

LangGraph 的核心是基于状态流转的图计算模型,通过 StateGraph 定义数据流。状态是一个类型化的字典(TypedDict),在节点间传递和修改。

代码体现:
class AgentState(TypedDict):  # 定义状态结构
    user_input: str
    intent: Optional[Literal["普通问题", "技术问题", "投诉问题"]]#用户意图,类型为可选的 Literal,可以是 "普通问题"、"技术问题" 或 "投诉问题"。
    response: Optional[str]#系统的响应,类型为可选的 str。

workflow = StateGraph(AgentState)  # 声明状态流

2. 节点(Nodes)

核心概念:

节点是纯函数,接收当前状态,返回更新后的状态。LangGraph 不限制节点内部逻辑(可调用LLM、工具、API等)。

代码体现:
def classify_intent(state: AgentState) -> AgentState:
    # 调用LLM进行分类
    return {"intent": "技术问题"}  # 只更新部分状态

workflow.add_node("classify", classify_intent)  # 注册节点

3. 边(Edges)

核心概念:

边定义节点间的流转路径,分为两种:

固定边:add_edge("node1", "node2")
条件边:基于状态动态路由

代码体现:
# 条件边(根据intent值路由)
workflow.add_conditional_edges(
    "classify",
    decide_next_step,  # 路由决策函数
    {"技术问题": "tech_node", ...}
)

# 固定边(直接跳转)
workflow.add_edge("tech_node", END)

4. 检查点(Checkpoints)

核心概念:

检查点(Checkpoint)是在任务或流程执行过程中自动保存中间状态的一种机制,旨在实现容错恢复、状态追踪和重复计算避免。在 LangGraph 等流程编排框架中,检查点用于持久化每个节点执行后的状态,使得在流程中断或异常退出后,可以从最近的检查点恢复继续执行,而无需重头开始。检查点既可保存在内存中(如 MemorySaver),也可持久化到本地数据库(如 SqliteSaver)或远程存储(如 RedisSaver),以适应不同的开发与部署场景。

代码体现:
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(conn)
app = workflow.compile(checkpointer=memory)
8a0ef651-c8ce-40a4-90d3-658b8a725e59.png

5. 执行模型

核心概念:

LangGraph 采用异步有向图执行模型:

  1. 从 entry_point 开始
  2. 按边条件流转
  3. 直到到达 END

代码体现:

workflow.set_entry_point("classify")  # 设置起点
app.invoke({"user_input": "问题内容"})  # 执行流式
与上面代码的映射
概念 代码示例 设计目的
状态 AgentState 定义数据流动的规范格式
节点 handle_tech / classify_intent 模块化业务逻辑
条件路由 decide_next_step函数 实现动态流程控制
持久化 SqliteSaver 支持长周期对话状态管理
架构优势对比
能力项 传统Agent LangGraph Agent
流程动态性 固定顺序 基于上下文的条件分支
状态管理 短期记忆 持久化检查点(Checkpoint)
执行模式 串行 支持并行与异步
错误恢复 整体失败 局部重试与补偿机制

4、LangChain生态的协同进化

1. 技术定位关系

7dc25b3f-1778-4fb4-943e-2244898b078e.png

2. 协同开发模式

开发阶段:
测试阶段:
运维阶段:

3. 性能基准对比

任务类型 纯LangChain LangChain+LangGraph 提升幅度
多步骤推理 12.4s 8.7s 30%
带循环的对话 失败率32% 失败率4% 87%
资源消耗 38GB内存 22GB内存 42%
上一篇 下一篇

猜你喜欢

热点阅读