查看: 45|回复: 0

LangGraph实战:构建有状态AI工作流引擎

[复制链接]

0

主题

0

回帖

0

积分

积极分子

金币
0
阅读权限
220
精华
0
威望
0
贡献
0
在线时间
0 小时
注册时间
2009-1-30
发表于 2026-4-14 14:02:00 | 显示全部楼层 |阅读模式

一、LangGraph vs 其他AI框架

LangGraph是LangChain团队推出的新一代AI应用开发框架,专门用于构建有状态、多步骤的AI工作流。与LangChain的DAG模型不同,LangGraph采用Cyclic Graph(循环图)结构,支持无限循环和条件分支,可以实现真正的工作流编排。

二、核心概念

StateGraph:状态图,工作流的核心
Node:节点,代表一个步骤或任务
Edge:边,定义节点间的流转关系
State:状态,整个工作流的共享状态
Checkpoint:检查点,支持断点续跑和回溯
ConditionalEdge:条件边,支持动态路由

三、环境搭建

pip install langgraph langchain langchain-openai

import os
os.environ["OPENAI_API_KEY"] = "your-api-key"

四、最简单的工作流

from langgraph.graph import StateGraph, END
from typing import TypedDict

# 定义状态
class GraphState(TypedDict):
    messages: list
    result: str

# 定义节点
def node1(state):
    return {"result": "Step 1 完成 -> " + state.get("result", "")}

def node2(state):
    return {"result": state.get("result", "") + "Step 2 完成"}

# 创建图
workflow = StateGraph(GraphState)
workflow.add_node("step1", node1)
workflow.add_node("step2", node2)

# 定义边
workflow.set_entry_point("step1")
workflow.add_edge("step1", "step2")
workflow.add_edge("step2", END)

# 编译
app = workflow.compile()

# 执行
result = app.invoke({"messages": [], "result": ""})
print(result)

五、有状态工作流:客服机器人

from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")

class客服State(TypedDict):
    query: str
    intent: str
    response: str
    needs_human: bool

# 意图识别节点
def识别意图(state):
    query = state["query"]
    prompt = f"判断用户意图:{query},只返回:咨询/投诉/查询/办理"
    intent = llm.invoke(prompt).content
    return {"intent": intent}

# 分类处理
def处理咨询(state):
    response = llm.invoke(f"专业回答用户问题:{state['query']}").content
    return {"response": response}

def处理投诉(state):
    response = "非常抱歉给您带来不便,我们会尽快处理您的问题。"
    return {"response": response, "needs_human": True}

# 创建工作流
workflow = StateGraph(客服State)
workflow.add_node("intent_classifier", 识别意图)
workflow.add_node("handle_consult", 处理咨询)
workflow.add_node("handle_complaint", 处理投诉)

workflow.set_entry_point("intent_classifier")

# 条件边:根据意图路由
def路由(state):
    intent = state.get("intent", "")
    if "投诉" in intent:
        return "handle_complaint"
    return "handle_consult"

workflow.add_conditional_edges(
    "intent_classifier",
    路由,
    {"handle_consult": "handle_consult", "handle_complaint": "handle_complaint"}
)

workflow.add_edge("handle_consult", END)
workflow.add_edge("handle_complaint", END)

app = workflow.compile()
result = app.invoke({"query": "我要投诉快递太慢", "intent": "", "response": "", "needs_human": False})
print(result)

六、循环工作流:智能迭代

# 循环工作流:代码审查 + 自动修复
class CodeState(TypedDict):
    code: str
    issues: list
    iterations: int

def代码审查(state):
    issues = []
    if "TODO" in state["code"]:
        issues.append("发现TODO待办")
    if len(state["code"]) > 500:
        issues.append("代码过长")
    return {"issues": issues}

def代码修复(state):
    code = state["code"]
    for issue in state.get("issues", []):
        if "TODO" in issue:
            code = code.replace("TODO", "DONE")
    return {"code": code, "iterations": state.get("iterations", 0) + 1}

def检查是否继续(state):
    if state.get("issues") and state.get("iterations", 0) < 3:
        return "fix"
    return "end"

workflow = StateGraph(CodeState)
workflow.add_node("review", 代码审查)
workflow.add_node("fix", 代码修复)

workflow.set_entry_point("review")
workflow.add_conditional_edges(
    "review",
    检查是否继续,
    {"fix": "fix", "end": END}
)
workflow.add_edge("fix", "review")

app = workflow.compile()
result = app.invoke({"code": "def test(): TODO...", "issues": [], "iterations": 0})
print(result)

七、检查点与断点续跑

# 检查点配置
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()

workflow = StateGraph(GraphState)
workflow.add_node("step1", node1)
workflow.add_node("step2", node2)
workflow.set_entry_point("step1")
workflow.add_edge("step1", "step2")
workflow.add_edge("step2", END)

# 编译时启用检查点
app = workflow.compile(checkpointer=checkpointer)

# 首次执行(会保存检查点)
config = {"configurable": {"thread_id": "user-123"}}
result = app.invoke({"messages": [], "result": "Step 1 -> "}, config)
print("首次执行:", result)

# 断点续跑(从检查点恢复)
result2 = app.invoke({"messages": [], "result": ""}, config)
print("续跑结果:", result2)

八、Human-in-the-Loop

# 人类审核节点
from langgraph.prebuilt import ToolNode

def审核节点(state):
    # 返回特殊信号,等待人类审核
    return {"__interrupt__": True}

def人类决策(state, decision):
    # 接收人类决策后继续执行
    return {"human_decision": decision}

workflow = StateGraph(GraphState)
workflow.add_node("ai_process", lambda s: {"result": "AI处理完成"})
workflow.add_node("human_review", 审核节点)
workflow.add_node("human_decide", lambda s: {"result": "人类批准: " + s.get("human_decision", "")})

workflow.set_entry_point("ai_process")
workflow.add_edge("ai_process", "human_review")
workflow.add_edge("human_review", "human_decide")
workflow.add_edge("human_decide", END)

app = workflow.compile(interrupt_before=["human_decide"])

# 执行到人类审核节点后暂停
result = app.invoke({"messages": [], "result": ""})
print("暂停等待审核:", result)

# 人类审核后继续
app.invoke({"human_decision": "批准"}, config)

九、与Spring Boot集成

@Service
public class LangGraphService {
    @Value("${langgraph.service.url}")
    private String serviceUrl;

    public String runWorkflow(String workflowId, Map<String, Object> input) {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        
        Map<String, Object> body = Map.of(
            "workflow_id", workflowId,
            "input", input,
            "thread_id", UUID.randomUUID().toString()
        );
        
        HttpEntity<Map<String, Object>> entity = new HttpEntity<>(body, headers);
        
        try {
            ResponseEntity response = restTemplate.exchange(
                serviceUrl + "/execute",
                HttpMethod.POST,
                entity,
                Map.class
            );
            return (String) response.getBody().get("result");
        } catch (Exception e) {
            return "Error: " + e.getMessage();
        }
    }
}

十、最佳实践

  1. 状态设计:状态要精简,只存必要数据,避免状态膨胀
  2. 节点粒度:节点保持单一职责,便于调试和维护
  3. 条件边:复杂路由用条件边,清晰表达业务逻辑
  4. 检查点:长流程务必配置检查点,支持断点续跑
  5. Human-in-the-Loop:关键决策点插入人类审核节点

十一、总结

LangGraph是有状态AI工作流的最佳选择。Cyclic Graph支持真正的循环和条件分支,比传统DAG更加强大灵活。配合Spring Boot可快速构建企业级AI应用,如智能客服、代码审查、自动化测试等场景。

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

相关侵权、举报、投诉及建议等,请发 E-mail:qiongdian@foxmail.com

Powered by Discuz! X5.0 © 2001-2026 Discuz! Team.

在本版发帖返回顶部