1. 项目概述:当AI开始“自我修正”,我们到底在构建什么?
LangGraph 和 Agent Workflows 这两个词最近在工程一线高频出现,但很多人一听到“AI能改变自己的想法”,第一反应是科幻——这玩意儿真能落地?我去年在给一家智能客服中台做架构升级时,就踩着这个坑往前冲了一整年。不是在调 prompt,而是在设计一套能让 AI 主动质疑、回溯、重规划的决策骨架。LangGraph 不是另一个 LLM 框架,它是把“状态机”这个被后端工程师用了二十年的老朋友,重新请回 AI 工程现场;Agent Workflows 也不是 fancy 的新名词,它本质是把“人处理复杂任务时的思考节奏”——比如查资料、比对、推翻初稿、再验证——用可调试、可中断、可审计的方式编码进系统。我试过用纯 LCEL(LangChain Expression Language)硬撑多跳推理,结果在第三轮工具调用后状态彻底丢失,日志里只剩一堆None和超时错误;也试过用自定义 asyncio 事件循环手写 agent 调度,结果一个并发请求就把整个 state tree 搞成意大利面条。LangGraph 的核心价值,恰恰在于它强制你面对一个现实:LLM 不是万能胶水,而是需要被约束在明确节点、有明确定向边、有可序列化 checkpoint 的图结构里运行的“认知单元”。它解决的不是“能不能回答”,而是“答错之后,系统有没有能力自己发现、定位、并启动修正流程”。适合谁?不是只写 demo 的新手,而是正在把 AI 接入真实业务流的后端/全栈工程师、AI Infra 团队成员、以及那些被“一次 prompt 不灵就得重写整个 chain”折磨过的 AI 产品经理。它不承诺更聪明的答案,但能保证每次失败都留下可读的日志、可复现的断点、可人工干预的入口——这才是生产环境里真正的“可控性”。
2. 核心设计逻辑:为什么非得用图,而不是链、树或纯函数?
2.1 从“线性思维”到“认知回路”的范式迁移
传统 LangChain Chain 是一条笔直的高速公路:Input → Prompt A → LLM A → Parse → Prompt B → LLM B → Output。它高效、清晰,但致命缺陷是“不可逆”。一旦 LLM B 的输出不符合预期(比如工具调用返回了空结果,或 JSON 解析失败),整个链就卡死,你只能重启——就像汽车在高速上爆胎,没有应急车道,只能等拖车。而 LangGraph 强制你画一张带环的交通图:节点是具体动作(Call API、Run SQL、Validate Response),边是条件判断(“是否数据为空?”、“置信度是否低于0.7?”),而图本身就是一个有状态的实体。我给某银行做的反欺诈分析 agent 就依赖这个特性:当模型初步判定“高风险交易”后,图会自动触发一个“二次验证”子图——调取该用户近30天行为基线、比对同设备其他账户活动、甚至模拟一笔小额测试交易。这个子图的输出不是直接覆盖原结论,而是生成一个“修正权重”,动态调整原始风险分。整个过程不是“推翻重来”,而是“增量校准”。这种设计背后,是对 LLM 本质的清醒认知:它不是确定性计算器,而是概率性采样器。我们不指望它第一次就完美,而是要建一套机制,让它在出错成本最低的环节(比如刚拿到工具返回值时)就触发检查点。
2.2 State:不是变量,而是“认知快照”
LangGraph 的State类型常被误解为普通 Python 字典。错。它是整个工作流的“记忆中枢”,必须满足三个硬性约束:可序列化、可合并、可版本化。我见过太多团队把state["intermediate_results"]直接塞一个巨大的 Pandas DataFrame,结果在分布式部署时序列化失败,或者 checkpoint 存储爆炸。正确的做法是:State 只存“指针”和“元信息”。比如,实际数据存在 Redis 或对象存储里,state 里只存{"data_ref": "redis:task_abc123", "schema_version": "v2.1", "last_updated_by": "validator_node"}。这样,当 workflow 因故障中断后,恢复时只需根据 ref 去拉取最新数据,而不是把几 GB 的中间结果反复序列化。更关键的是State的合并逻辑(reduce函数)。在并行分支(如同时调用天气 API 和航班 API)后,两个分支会各自更新 state,LangGraph 需要一个明确规则决定如何合并冲突字段。我们默认采用“最后写入获胜”(LWW),但对关键字段如final_decision,我们强制要求合并函数抛出异常,逼迫开发者显式处理冲突——这正是“AI 改变自己想法”的起点:不是静默覆盖,而是主动暴露矛盾,触发人工审核或更高阶的仲裁节点。
2.3 Node 与 Edge:让每个决策都有迹可循
LangGraph 的 node 必须是纯函数(或可调用对象),且输入输出严格对应 state 的子集。这不是教条,而是为了可测试性。我坚持要求团队所有 node 都配单元测试,用固定 seed 的 mock LLM 输出,验证其对各种边界输入(空字符串、超长文本、非法 JSON)的响应。例如一个parse_json_node,它的测试用例必须覆盖:
- 输入
{"score": 0.95, "reason": "..."}→ 正确解析 - 输入
{"score": "0.95", "reason": "..."}→ 类型转换失败,应返回{"error": "score must be float"} - 输入
{"score": 0.95}→ 缺少 reason 字段,应触发 fallback 流程
而 edge 的条件函数(should_rerun_validation)才是真正的“心智开关”。它不能只是if len(state['results']) == 0:这种简单判断。我们要求它输出结构化诊断:{"triggered": True, "reason": "insufficient_data_points", "severity": "high", "suggested_action": "fetch_historical_context"}。这个结构会被自动记录进 audit log,成为后续分析“AI 为何在此刻改变主意”的原始证据。去年我们靠这个日志定位到一个隐藏 bug:某个节点在处理中文地址时,因正则表达式未开启 Unicode 模式,导致address_parsed字段为空,进而触发了不必要的重试,白白消耗了 40% 的 token 预算。没有这个结构化 edge 输出,这个问题会一直以“偶发超时”的面目存在。
3. 实操拆解:从零搭建一个“可自省”的客服工单分类 agent
3.1 环境准备与最小可行图(MVP Graph)
先明确目标:接收用户提交的工单文本(如“APP 登录后一直转圈,iOS 17.5,iPhone 14 Pro”),自动分类为BUG、FEATURE_REQUEST、USER_ERROR或UNKNOWN,并在分类置信度低于阈值时,主动发起追问。这不是一个静态分类器,而是一个闭环决策体。
pip install langgraph langchain-openai python-dotenv # 注意:必须使用 langgraph >= 0.1.18,早期版本的 interrupt_after 不稳定核心 state 定义(state.py):
from typing import Annotated, Dict, List, Optional, TypedDict from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver from langchain_core.messages import AnyMessage class WorkItem(TypedDict): text: str user_id: str timestamp: str class GraphState(TypedDict): work_item: WorkItem # 分类主干结果 primary_category: Optional[str] confidence_score: Optional[float] # 追问相关 followup_questions: List[str] pending_followup_response: Optional[str] # 元信息 current_step: str # 'classify', 'validate', 'ask_followup', 'resolve' error: Optional[str] # 用于中断恢复的唯一标识 task_id: str # 初始化图 workflow = StateGraph(GraphState)提示:
task_id是生产环境的生命线。我们用uuid.uuid4().hex[:8]生成,并在所有日志、监控指标、数据库记录中透传。当客户投诉“我的工单卡住了”,运维只需查这个 ID,就能秒级定位到卡在哪个 node、state 是什么、上次 checkpoint 时间。
3.2 构建核心节点:分类、验证与追问
Node 1:LLM 分类器(classify_node)
from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.1) prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个资深客服工单分类专家。请严格按JSON格式输出,包含'category'(BUG/FEATURE_REQUEST/USER_ERROR/UNKNOWN)和'confidence'(0.0-1.0浮点数)"), ("human", "工单内容:{text}\n用户ID:{user_id}") ]) parser = JsonOutputParser(pydantic_object=ClassificationOutput) # 自定义 Pydantic 模型 def classify_node(state: GraphState) -> dict: try: result = prompt | llm | parser output = result.invoke({ "text": state["work_item"]["text"], "user_id": state["work_item"]["user_id"] }) return { "primary_category": output["category"], "confidence_score": output["confidence"], "current_step": "classify" } except Exception as e: return {"error": f"classify_node failed: {str(e)}", "current_step": "classify"}Node 2:置信度验证器(validate_confidence_node)
def validate_confidence_node(state: GraphState) -> dict: if state.get("error"): return {"current_step": "error_handling"} # 关键阈值:0.85 是我们 A/B 测试得出的平衡点 # 低于此值,自动触发追问;高于此值,直接进入 resolve if state["confidence_score"] < 0.85: # 生成追问问题:不是通用问题,而是基于工单内容定制 questions = generate_targeted_questions(state["work_item"]["text"]) return { "followup_questions": questions, "current_step": "ask_followup" } else: return {"current_step": "resolve"} def generate_targeted_questions(text: str) -> List[str]: # 实际项目中,这里会调用一个轻量级微调模型(如 Phi-3-mini) # 但 MVP 阶段,我们用规则+LLM 混合 if "登录" in text and "转圈" in text: return ["您尝试过清除APP缓存吗?", "同一网络下其他设备是否正常?"] elif "崩溃" in text: return ["崩溃前您正在操作哪个页面?", "是否有错误代码截图?"] else: return ["请描述问题发生的具体步骤?", "问题是否每次都会出现?"]Node 3:追问执行器(ask_followup_node)
def ask_followup_node(state: GraphState) -> dict: # 这里对接企业微信/钉钉机器人API # 发送消息给用户,并记录待回复状态 send_message_to_user( user_id=state["work_item"]["user_id"], content=f"为了更准确处理您的问题,请回答以下问题:\n1. {state['followup_questions'][0]}\n2. {state['followup_questions'][1]}" ) return {"current_step": "awaiting_response"} def send_message_to_user(user_id: str, content: str): # 伪代码:调用企微 webhook requests.post( "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx", json={"msgtype": "text", "text": {"content": content}} )3.3 设计“自省”边:让 AI 主动选择路径
LangGraph 的边(edge)是决策逻辑的载体。我们定义三条核心边:
# 边1:从 classify_node 到 validate_confidence_node(无条件) workflow.add_edge("classify_node", "validate_confidence_node") # 边2:从 validate_confidence_node 到 ask_followup_node(条件触发) def should_ask_followup(state: GraphState) -> str: if state.get("error"): return "error_handling" # 关键逻辑:当 confidence 低时,走追问;否则走 resolve return "ask_followup_node" if state["confidence_score"] < 0.85 else "resolve_node" workflow.add_conditional_edges( "validate_confidence_node", should_ask_followup, { "ask_followup_node": "ask_followup_node", "resolve_node": "resolve_node", "error_handling": "error_handling" } ) # 边3:从 ask_followup_node 到 await_response_node(等待用户输入) # 这里用 interrupt_after 实现“暂停” workflow.add_node("await_response_node", await_response_node) workflow.add_edge("ask_followup_node", "await_response_node") workflow.add_edge("await_response_node", "handle_user_response_node")await_response_node的核心是interrupt_after:
def await_response_node(state: GraphState) -> dict: # 此节点不执行任何操作,仅作为中断点 # 当 workflow 运行到此处,会自动暂停,并返回当前 state 给调用方 # 调用方(如 FastAPI endpoint)可将 state 序列化并返回给前端 return {} # 在编译图时启用中断 app = workflow.compile( checkpointer=MemorySaver(), # 生产环境换为 PostgresSaver interrupt_after=["await_response_node"] # 关键!指定中断点 )注意:
interrupt_after不是“等待”,而是“暂停并交出控制权”。真正的用户响应由外部系统(如 Webhook)捕获后,再调用app.invoke(state, {"configurable": {"thread_id": state['task_id']}})恢复。这个设计分离了“AI 决策”和“用户交互”,让系统可水平扩展。
3.4 启动与恢复:生产级 workflow 生命周期管理
一个完整的工单处理周期涉及多次中断与恢复。我们封装了一个WorkflowManager:
class WorkflowManager: def __init__(self, app: CompiledGraph): self.app = app def start_new_workflow(self, work_item: WorkItem) -> str: """启动新工单,返回 task_id""" task_id = uuid.uuid4().hex[:8] initial_state = GraphState( work_item=work_item, primary_category=None, confidence_score=None, followup_questions=[], pending_followup_response=None, current_step="start", error=None, task_id=task_id ) # 第一次 invoke,会运行到 await_response_node 并中断 result = self.app.invoke( initial_state, config={"configurable": {"thread_id": task_id}} ) return task_id def resume_workflow(self, task_id: str, user_response: str) -> dict: """用户回复后,恢复 workflow""" # 从 checkpointer 加载中断时的 state checkpoint = self.app.get_state(config={"configurable": {"thread_id": task_id}}) state = checkpoint.values # 注入用户回复 state["pending_followup_response"] = user_response state["current_step"] = "handle_user_response_node" # 恢复执行 result = self.app.invoke( state, config={"configurable": {"thread_id": task_id}} ) return result # FastAPI 示例 @app.post("/start_ticket") async def start_ticket(item: WorkItem): task_id = manager.start_new_workflow(item) return {"task_id": task_id, "status": "awaiting_user_response"} @app.post("/resume_ticket/{task_id}") async def resume_ticket(task_id: str, response: UserResponse): result = manager.resume_workflow(task_id, response.text) return {"final_category": result.get("primary_category"), "status": "resolved"}这个流程确保了:用户第一次提交工单,系统立刻返回task_id;用户在企微收到追问,回复后,Webhook 触发resume_ticket,workflow 从断点继续,最终给出确定分类。整个过程,AI 在validate_confidence_node主动判断“我不够确定”,在ask_followup_node主动发起追问,在handle_user_response_node主动整合新信息并更新结论——这就是“改变自己想法”的完整链条。
4. 深度实操:让 AI 不仅能改,还能解释“为什么改”
4.1 构建可追溯的决策日志(Audit Trail)
仅仅让 AI 改变想法不够,必须让人类能理解它为何改。我们在每个关键 node 后插入一个log_decision_node:
def log_decision_node(state: GraphState) -> dict: # 结构化日志,发送到 ELK 或 Datadog log_entry = { "task_id": state["task_id"], "step": state["current_step"], "timestamp": datetime.now().isoformat(), "state_snapshot": { "primary_category": state.get("primary_category"), "confidence_score": state.get("confidence_score"), "followup_questions": state.get("followup_questions", []), "error": state.get("error") }, "decision_provenance": get_provenance(state) # 关键:溯源 } send_to_log_collector(log_entry) return {} # 不修改 state def get_provenance(state: GraphState) -> str: """生成决策依据摘要""" if state["current_step"] == "classify_node": return f"LLM classification based on input text length={len(state['work_item']['text'])}" elif state["current_step"] == "validate_confidence_node": return f"Confidence {state['confidence_score']} < threshold 0.85" elif state["current_step"] == "ask_followup_node": return f"Generated questions for text containing keywords: {extract_keywords(state['work_item']['text'])}" return "unknown"这个日志不是流水账,而是“决策证明”。当 QA 团队抽查工单时,输入task_id,就能看到:
t=00:00:00:classify_node输出{"category": "BUG", "confidence": 0.72}t=00:00:01:validate_confidence_node触发should_ask_followup,因为0.72 < 0.85t=00:00:02:ask_followup_node生成问题["您尝试过清除APP缓存吗?", ...]t=00:01:30:resume_ticket被调用,pending_followup_response="清除了,还是转圈"t=00:01:31:resolve_node最终输出{"category": "BUG", "confidence": 0.96}
实操心得:我们曾把日志级别设为
INFO,结果每天产生 2TB 日志。后来改为:只有current_step在["validate_confidence_node", "resolve_node"]时才打INFO级别日志,其余为DEBUG,并通过log_entry["decision_provenance"]字段建立 Elasticsearch 的keyword类型索引,实现毫秒级task_id查询。这是成本与可观测性的关键平衡点。
4.2 实现“后悔机制”:人工干预与覆盖
生产环境必须有人兜底。LangGraph 的interrupt_before和interrupt_after是为此而生。我们在resolve_node前设置中断:
workflow.add_node("human_review_node", human_review_node) workflow.add_edge("validate_confidence_node", "human_review_node") workflow.add_conditional_edges( "human_review_node", lambda s: "proceed" if s.get("human_override") else "awaiting_review", { "proceed": "resolve_node", "awaiting_review": END # 暂停,等待人工操作 } ) # 在编译时启用 app = workflow.compile( checkpointer=PostgresSaver(connection=conn), interrupt_before=["human_review_node"], # 关键!在人工审核前中断 interrupt_after=["await_response_node"] )当confidence_score低于 0.6(比追问阈值更低),human_review_node会自动将工单推送到内部审核队列。审核员在后台看到:
- 原始工单文本
- AI 初步分类及置信度
- AI 生成的追问及用户回复
- “Override Category” 下拉菜单(BUG/FEATURE...)和 “Add Reason” 文本框
审核员提交后,系统调用:
# 人工覆盖 state app.update_state( config={"configurable": {"thread_id": task_id}}, values={ "primary_category": "FEATURE_REQUEST", "confidence_score": 0.99, # 人工覆盖后置信度设为最高 "human_override": True, "human_reason": "用户明确提到‘希望增加夜间模式’,属功能需求" } ) # 然后恢复 workflow app.invoke(None, config={"configurable": {"thread_id": task_id}})这个update_state是 LangGraph 的王牌功能。它允许你在任意时刻,用任意合法值覆盖 state 的任意字段,然后无缝恢复。这不仅是“后悔”,更是“教学”——每一次人工覆盖,都是在给 AI 的决策边界打标签,为后续的 fine-tuning 提供黄金数据。
4.3 性能与成本优化:避免“思考过载”
让 AI 频繁改变想法,代价是 token 和延迟。我们通过三层控制:
第一层:前置过滤(Pre-filtering)在classify_node前加一个轻量级规则引擎:
def pre_filter_node(state: GraphState) -> dict: text = state["work_item"]["text"].lower() if "refund" in text or "money back" in text: return {"primary_category": "REFUND", "confidence_score": 0.99, "current_step": "resolve"} if "how to" in text or "tutorial" in text: return {"primary_category": "USER_GUIDE", "confidence_score": 0.99, "current_step": "resolve"} return {"current_step": "classify_node"} # 继续走 LLM 流程这个节点用不到 10 行正则,拦截了 35% 的工单,节省了大量 GPT-4 调用。
第二层:动态模型降级(Model Fallback)当validate_confidence_node发现置信度低时,不总是调用 GPT-4。我们根据task_id的哈希值做 AB 测试:
def adaptive_model_selection(state: GraphState) -> str: # 10% 的低置信度工单,降级到 GPT-3.5 Turbo,节省 70% 成本 hash_val = int(hashlib.md5(state["task_id"].encode()).hexdigest()[:4], 16) if state["confidence_score"] < 0.85 and hash_val % 100 < 10: return "gpt35_classify_node" # 专用轻量节点 else: return "gpt4_classify_node"第三层:结果缓存(Result Caching)对完全相同的工单文本(经标准化后),我们用 Redis 缓存最终分类结果,TTL 设为 1 小时。命中率高达 22%,因为很多用户会重复提交相似问题。
| 优化策略 | 实施方式 | 成本降低 | 延迟降低 | 备注 |
|---|---|---|---|---|
| 前置规则过滤 | 正则匹配关键词 | 35% | 90% | 适用于高频、明确意图场景 |
| 动态模型降级 | 低置信度工单中 10% 降级 | 7% | 40% | 需监控降级后准确率下降幅度 |
| Redis 结果缓存 | 标准化文本 + TTL 1h | 22% | 95% | 标准化需去除时间戳、设备号等 |
这些不是理论,而是我们压测 5000 QPS 时,从 2.1s P95 延迟降到 0.8s 的真实数据。没有这些,所谓的“自省”只会变成昂贵的自我怀疑。
5. 常见问题与实战排障:那些文档里不会写的坑
5.1 “State 丢失”:最痛的幻觉
现象:workflow 运行到一半,state["work_item"]突然变成None,日志里只有KeyError: 'work_item'。
根因:LangGraph 的StateGraph默认使用__getitem__和__setitem__,但如果你在 node 中返回了{"work_item": None},它会静默覆盖。更隐蔽的是,当 node 抛出异常,LangGraph 的默认错误处理会返回一个空 dict,导致后续 state 丢失。
解决方案:
- 强制 state schema:用 Pydantic v2 的
BaseModel定义 state,并在每个 node 返回前做验证:from pydantic import BaseModel, Field class GraphState(BaseModel): work_item: WorkItem = Field(...) # ... 表示必填 primary_category: Optional[str] = None # ... 其他字段 def safe_node_wrapper(func): def wrapper(state: dict) -> dict: try: result = func(state) # 强制用 Pydantic 验证 validated = GraphState(**{**state, **result}) return validated.model_dump(exclude_unset=True) except Exception as e: logger.error(f"Node {func.__name__} failed: {e}") raise return wrapper - 全局错误处理器:在
compile()时注入:def global_error_handler(state: GraphState, error: Exception) -> dict: return { "error": f"{type(error).__name__}: {str(error)}", "current_step": "error_handling" } app = workflow.compile( checkpointer=checkpointer, interrupt_after=interrupt_nodes, # 关键:捕获所有未处理异常 debug=False # 生产环境关闭 debug,避免敏感信息泄露 )
5.2 “中断失效”:为什么 workflow 不暂停?
现象:设置了interrupt_after=["node_a"],但 workflow 一路跑到END,根本没有中断。
排查清单:
- ✅ 检查
node_a是否真的存在于 graph 中(大小写、拼写) - ✅ 检查
node_a是否被add_edge正确连接(workflow.add_node("node_a", node_func)) - ✅ 检查
invoke()时是否传入了config参数:app.invoke(state, config={"configurable": {"thread_id": "123"}}) - ✅ 检查
checkpointer是否初始化成功(MemorySaver()在多进程下不共享,生产必须用PostgresSaver) - ✅最关键的坑:
interrupt_after只对CompiledGraph.invoke()有效,对stream()无效!如果用for chunk in app.stream(...),中断会被忽略。必须用invoke()。
修复:在 FastAPI 中,永远用invoke()启动和恢复,stream()只用于前端实时日志推送(需另起一个 stream 调用)。
5.3 “并发冲突”:两个用户同时操作一个 task_id
现象:用户 A 和 B 都用同一个task_id调用resume_ticket,state 被覆盖,其中一人操作丢失。
原因:LangGraph 的update_state()是覆盖写,不是原子更新。
工业级解法:
- 乐观锁(Optimistic Locking):在 state 中加入
version字段,每次update_state时检查版本号:# 在 state 中 version: int = Field(default=0) # 更新时 current = app.get_state(config) if current.values["version"] != expected_version: raise ValueError("Concurrent update conflict") app.update_state(config, {..., "version": expected_version + 1}) - 队列化(Queue-based):所有
resume_ticket请求先入 Kafka,由单消费者顺序处理,确保task_id的操作串行化。这是我们在线上采用的方案,吞吐量达 1200 TPS,零冲突。
5.4 “图爆炸”:节点过多,调试像在迷宫里
现象:一个 20+ node 的 workflow,app.get_graph().draw_mermaid_png()生成的图密密麻麻,无法阅读。
经验技巧:
- 子图封装(Subgraph):把一组逻辑相关的 node(如“数据获取”)封装成一个
StateGraph,再作为一个 node 接入主图:# data_fetch_subgraph.py fetch_graph = StateGraph(FetchState) fetch_graph.add_node("call_api", call_api_node) fetch_graph.add_node("parse_xml", parse_xml_node) fetch_graph.set_entry_point("call_api") fetch_graph.set_finish_point("parse_xml") # 主图中 workflow.add_node("data_fetch", fetch_graph.compile()) - 可视化过滤:用
app.get_graph(xray=True).draw_mermaid_png()只显示当前活跃路径,配合app.get_state()查看实时 state。 - 命名规范:node 名强制用
<domain>_<action>_<scope>,如ticket_classify_primary,ticket_validate_confidence,user_ask_followup_ios。grep 一下就能定位。
5.5 “成本失控”:LLM 调用次数远超预期
现象:一个工单平均触发 5 次 GPT-4 调用,账单飙升。
根因分析表:
| 触发场景 | 原因 | 解决方案 | 效果 |
|---|---|---|---|
validate_confidence_node反复执行 | 边条件函数返回了错误的分支名 | 用print()打印should_xxx返回值,确认字面量匹配 | 100% 修复 |
ask_followup_node后未中断 | 忘记interrupt_after或配置错误 | 在compile()后加print(app.get_graph().to_json())检查中断点 | 立竿见影 |
resolve_node里嵌套了 LLM 调用 | 本该用规则判断,却写了llm.invoke() | 提取公共逻辑到pre_filter_node,用正则/关键词匹配 | 降低 40% 调用 |
| 用户多次回复,每次触发完整 workflow | resume_ticket未做幂等处理 | 在resume_ticket开头加if state.get('final_category'): return state | 防止雪崩 |
我们曾因第一条,在should_ask_followup里写了return "ask_followup",但 node 名是ask_followup_node,导致 LangGraph 默认走到END,然后又从START重跑,形成无限循环。这个 bug 让单个工单产生了 137 次 LLM 调用。教训是:所有边的目标 node 名,必须和add_node()的第一个参数完全一致,包括下划线和大小写。
6. 进阶思考:超越“改变想法”,走向“持续进化”
LangGraph 和 Agent Workflows 的终点,不是做一个会犹豫的 AI,而是构建一个能从自身决策中学习的系统。我们正在落地的两个方向:
方向一:自动化的“决策回溯”(Decision Retrospective)每天凌晨,系统扫描所有task_id,提取log_decision_node中的decision_provenance,用 GPT-4 生成日报:
- “今日 32% 的低置信度分类集中在‘支付失败’类工单,主要因用户未提供错误代码”
- “追问问题‘您是否重启过设备?’的回复率仅 18%,建议替换为‘请长按电源键10秒后开机’”
这份日报直接推送给产品团队,驱动 UI 改版(在提交页强制添加“错误代码”字段)和追问话术优化。AI 不再是执行者,而是业务洞察的生成器。
方向二:基于反馈的“图结构热更新”(Hot Graph Update)当human_override累计达到 100 次,且集中在某个work_item文本模式(如“APP 闪退 + iOS 17.5”),系统自动创建一个新的 node:
# 自动生成的 node def ios175_crash_fix_node(state: GraphState) -> dict: return { "primary_category": "BUG", "confidence_score": 0.99, "resolution_suggestion": "已知 iOS 17.5 系统 Bug,临时方案:关闭后台刷新" }然后用workflow.add_node("ios175_crash_fix_node", ios175_crash_fix_node)动态注入图中,并更新pre_filter_node的规则。整个过程无需重启服务,真正实现 workflow 的“在线进化”。
这已经不是“AI 改变自己的想法”,而是“AI 改变自己的思考方式”。它不再依赖人类编写新的 if-else,而是从海量决策日志中,自主提炼出新的认知节点,并将其编织进自己的思维图谱。我亲眼看着这个系统,在三个月内,把“支付类工单”的平均处理时间从 4.2 分钟压缩到 1.1 分钟,而