医生为你 發表於 2025-9-3 00:55:00

[LangGraph]Human-in-the-loop示例之人工干预shell命令执行

<h2 id="前言">前言</h2>
<p>看langgraph官方文档感觉human in the loop貌似还挺简单的,但实际上手时,那文档看得我云里雾里的。更详细的Guides和Reference,恕我能力有限,悲摧的也没看懂。作为试验,我想做一个功能:本地执行shell命令,每次执行前都要用户确认。左看官方文档, 右去西天请ChatGPT老祖。ChatGPT说得头头是道,Copilot也反复调试,但就是不能用。就这。。。看来碰到新东西AI就十分拉胯。最终,认真看了半天文档,没借助GPT,总算捣鼓出来一个最简版。</p>
<blockquote>
<p>自从AI能力越来越强,大多时候自己更习惯直接让AI帮忙解决问题,越来越懒得看文档。自己找饭吃的能力还是得留着,不能光靠AI喂饭。</p>
</blockquote>
<h2 id="运行效果">运行效果</h2>
<p>试验嘛,交互就是命令行了。效果大概这样</p>
<pre><code>$ python custom_workflow.py
AI助手已启动,输入 'quit'、'exit' 或 'q' 退出程序
User: 今天的日期是什么
Assistant: 今天的日期是 2025-09-03。
User: 合肥的天气怎么样
Assistant: 合肥的天气总是阳光明媚!
User: 查看下本地内存占用
Assistant: Do you approve executing this command: free -h? Please answer 'yes' or 'no'.
User: yes
Assistant: 当前本地内存占用情况如下

total      used      free      sharedbuff/cache   available
内存:          62Gi      10Gi      46Gi       157Mi       6.5Gi      52Gi
交换:         3.8Gi          0B       3.8Gi

User: disk呢?
Assistant: Do you approve executing this command: df -h? Please answer 'yes' or 'no'.
User: yes
Assistant: 当前磁盘使用情况如下:

文件系统               大小已用可用 已用% 挂载点
udev                      32G   0   32G    0% /dev
tmpfs                  6.3G1.8M6.3G    1% /run
/dev/mapper/debian-root234G   29G194G   13% /
tmpfs                     32G   37M   32G    1% /dev/shm
efivarfs               128K   40K   84K   32% /sys/firmware/efi/efivars
tmpfs                  5.0M   12K5.0M    1% /run/lock
tmpfs                  1.0M   01.0M    0% /run/credentials/systemd-journald.service
tmpfs                     32G   49M   32G    1% /tmp
/dev/nvme1n1p1         989M256M666M   28% /boot
/dev/mapper/debian-home676G196G446G   31% /home
/dev/nvme0n1p1         300M   39M262M   13% /boot/efi
tmpfs                  6.3G4.1M6.3G    1% /run/user/1000

User: 非常好
Assistant: 谢谢!如果您有其他问题或需要进一步的帮助,请随时告诉我。😊
User: quit
Goodbye!
</code></pre>
<h2 id="code">Code</h2>
<p>注释写得够详细的了,具体可以直接看注释。LLM用的是阿里千问,注意替换成自己的。</p>
<p>checkpointer用的是内存,在生产环境,可以把checkpointer换成sqlite、postgres、redis等。</p>
<p>log就是个写日志文件的模块,不输出到控制台,之前调试的时候用来发给LLM做诊断,比较简单就不贴了。</p>
<p>在python命令行交互程序中,最好引用下<code>readline</code>模块,不然输入中文会碰到退格键没法正常用的问题,而且方向键也没法用。</p>
<pre><code class="language-python">"""
Human in the loop 示例, 每当AI需要执行shell命令时, 都需要经过用户确认
"""

import os
import readline# 引入readlint模块以增强命令行输入体验。Linux环境的Python标准库内置
from datetime import datetime
import subprocess
import traceback

from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph, MessagesState
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.types import Command, interrupt
from langgraph.prebuilt import create_react_agent

# 自定义一个简单的文件型日志记录器
from log import logger


# 设置API密钥
os.environ["OPENAI_API_KEY"] = ""

# 初始化语言模型
llm = ChatOpenAI(
    model="qwen-plus",
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)


# 定义工具函数

@tool
def get_date() -&gt; str:
    """获取今天的日期。
   
    Returns:
      str: 当前日期,格式为 YYYY-MM-DD
    """
    logger.info("Getting date")
    return datetime.now().strftime("%Y-%m-%d")

@tool
def get_weather(city: str) -&gt; str:
    """获取指定城市的天气信息。
   
    Args:
      city (str): 城市名称
      
    Returns:
      str: 天气信息描述
    """
    logger.info("Getting weather")
    return f"It's always sunny in {city}!"

@tool
def execute_command(command: str) -&gt; str:
    """本地执行shell命令, 每次执行前需要用户确认
   
    Args:
      command (str): 要执行的命令

    Returns:
      str: 命令执行结果或拒绝信息
    """
    # 使用interrupt函数暂停执行并请求用户确认
    # interrupt会将控制权交还给用户,等待用户输入
    decision = interrupt({"query": f"Do you approve executing this command: {command}? Please answer 'yes' or 'no'."})
    logger.info(f"Decision: {decision}")
   
    # 根据用户决策决定是否执行命令
    if decision == "yes":
      logger.info(f"Executing command, {command}")
      try:
            # 执行命令并获取结果
            result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=10)
            result = result.stdout or result.stderr
            return result
      except subprocess.TimeoutExpired:
            return "Command timed out"
      except Exception as e:
            return f"Error executing command: {str(e)}"
    else:
      logger.info("Command execution denied by user")
      return "Command execution denied by user"

# 定义可用工具列表
tools =

# 创建ReAct代理,它可以根据需要自动调用工具
agent = create_react_agent(
    model=llm,
    tools=tools,
    prompt="You are a helpful assistant."
)

# 创建工具节点,用于执行工具调用
tool_node = ToolNode(tools=tools)

# 创建内存检查点保存器,用于保存对话状态
memory = InMemorySaver()

# 配置运行时参数,使用固定的线程ID
config = RunnableConfig(configurable={"thread_id": "1"})

def create_graph() -&gt; CompiledStateGraph:
    """创建并返回工作流图。
   
    Returns:
      CompiledStateGraph: 编译后的工作流图
    """
    # 创建状态图,使用MessagesState作为状态类型
    graph_builder = StateGraph(MessagesState)
   
    # 添加节点
    graph_builder.add_node("agent", agent)# AI代理节点
    graph_builder.add_node("tools", tool_node)# 工具执行节点
   
    # 添加边
    graph_builder.add_edge(START, "agent")# 从开始节点连接到代理节点
    graph_builder.add_edge("tools", "agent")# 从工具节点连接回代理节点
   
    # 添加条件边,根据代理的决策决定下一步
    graph_builder.add_conditional_edges(
      "agent",
      tools_condition,# 条件函数,判断是否需要调用工具
      {"tools": "tools", END: END}# 映射:需要工具时转到工具节点,否则结束
    )

    # 编译图并返回,使用内存保存器来保存状态
    return graph_builder.compile(checkpointer=memory)

def handle_user_decision(user_input: str) -&gt; bool:
    """处理用户对中断的响应。
   
    Args:
      user_input (str): 用户的输入
      
    Returns:
      bool: 如果处理了中断返回True,否则返回False
    """
    # 创建图实例
    graph = create_graph()
   
    # 获取当前状态
    current_state = graph.get_state(config)
   
    # 检查是否有待处理的中断
    if not current_state.next:
      logger.warning("No pending interrupts to handle.")
      return False# 没有待处理的中断
   
    # 根据用户输入决定如何响应中断
    if user_input.lower() == "yes":
      # 用户确认,继续执行
      graph.invoke(Command(resume="yes"), config=config)
    else:
      # 用户拒绝,取消执行
      graph.invoke(Command(resume="no"), config=config)
      
    return True# 处理了中断

def graph_invoke(user_input: str):
    """处理用户输入并执行相应操作。
   
    Args:
      user_input (str): 用户输入的文本
    """
    # 首先尝试处理用户对中断的响应
    interrupt_handled = handle_user_decision(user_input)

    # 如果已经处理了中断,则不再继续处理用户输入,而是显示结果
    if interrupt_handled:
      # 获取处理后的状态并显示结果
      graph = create_graph()
      current_state = graph.get_state(config)
      if current_state.values and 'messages' in current_state.values:
            # 显示最新的消息内容
            messages = current_state.values['messages']
            if messages:
                last_message = messages[-1]
                if hasattr(last_message, 'content'):
                  print("Assistant:", last_message.content)
      return

    # 如果没有待处理的中断,则正常处理用户输入
    graph = create_graph()
    resp = graph.invoke({"messages": }, config=config)
   
    logger.debug(f"response: {resp}")
   
    # 检查是否有中断需要处理
    if "__interrupt__" in resp:
      interrupt_data = resp["__interrupt__"]
      interrupt = interrupt_data if interrupt_data else None

      if not interrupt or not hasattr(interrupt, "value"):
            logger.error("Invalid interrupt data")
            return
            
      interrupt_value = interrupt.value
      # 显示中断请求给用户
      print(f"Assistant: {interrupt_value['query']}")
    else:
      # 直接显示AI的响应
      print("Assistant:", resp["messages"][-1].content)
      
    logger.debug(f"Snapshot state: {graph.get_state(config)}")
    logger.debug(f"Snapshot next: {graph.get_state(config).next}")

# 程序入口点
if __name__ == "__main__":
    """主程序循环,处理用户输入并生成响应。"""
   
    print("AI助手已启动,输入 'quit'、'exit' 或 'q' 退出程序")
   
    while True:
      try:
            # 获取用户输入
            user_input = input("User: ").strip()
            logger.info(f"User input: {user_input}")
            
            # 检查退出命令
            if user_input.lower() in ["quit", "exit", "q"]:
                print("Goodbye!")
                break
               
            # 处理用户输入
            graph_invoke(user_input)
            
      except KeyboardInterrupt:
            # 处理Ctrl+C中断
            print("\nGoodbye!")
            break
      except Exception as e:
            # 记录并显示错误信息
            logger.error(f"Error occurred: {traceback.format_exc()}")
            print(f"Error: {traceback.format_exc()}")
            break
</code></pre>
<h2 id="用fastapi实现human-in-the-loop">用fastapi实现human-in-the-loop</h2>
<p>效果</p>
<pre><code># rainux @ debian in ~
$ curl -X POST http://127.0.0.1:8000/chat -H "content-type: application/json" -d '{"thread_id": "11", "message": "yes"}'
{"Ai":"当前内存占用情况如下:\n\n- **总内存**: 62Gi\n- **已使用内存**: 11Gi\n- **空闲内存**: 43Gi\n- **共享内存**: 233Mi\n- **缓存/缓冲区**: 8.7Gi\n- **可用内存**: 50Gi\n\n**交换分区**(Swap):\n- **总交换空间**: 3.8Gi\n- **已使用交换空间**: 0B\n- **空闲交换空间**: 3.8Gi\n\n目前系统内存使用情况较为健康,有充足的可用内存。"}

# rainux @ debian in ~
$ curl -X POST http://127.0.0.1:8000/chat -H "content-type: application/json" -d '{"thread_id": "11", "message": "还有硬
盘"}'
{"Ai":"Do you approve executing this command: df -h? Please answer 'yes' or 'no'."}

# rainux @ debian in ~
$ curl -X POST http://127.0.0.1:8000/chat -H "content-type: application/json" -d '{"thread_id": "11", "message": "no"}'
{"Ai":"我无法执行命令来获取硬盘信息,因为该操作需要用户的明确确认。如果你希望查看硬盘使用情况,请确认是否允许执行相关命令
。"}
</code></pre>
<p>code</p>
<pre><code class="language-python">import asyncio
import subprocess
from datetime import datetime
from typing import List

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import BaseTool, tool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import ToolNode, create_react_agent, tools_condition
from langgraph.types import Command, interrupt
from pydantic import BaseModel

from pkg.config import cfg
from pkg.log import logger


class ReqChat(BaseModel):
    thread_id: str
    message: str

def get_llm() -&gt; ChatOpenAI:
    return ChatOpenAI(
      model=cfg.llm_model,
      base_url=cfg.llm_base_url,
      temperature=0.3,
      api_key=cfg.llm_api_key,
    )


# 定义工具函数
def get_tools() -&gt; List:
    @tool
    def get_weather(city: str) -&gt; str:
      """获取指定城市的天气信息。
      
      Args:
            city (str): 城市名称
            
      Returns:
            str: 天气信息描述
      """
      logger.info("Getting weather")
      return f"It's always sunny in {city}!"
   
    @tool
    def get_date() -&gt; str:
      """获取今天的日期。
      
      Returns:
            str: 当前日期,格式为 YYYY-MM-DD
      """
      logger.info("Getting date")
      return datetime.now().strftime("%Y-%m-%d")


    @tool
    async def execute_command(command: str) -&gt; str:
      """本地执行shell命令, 每次执行前需要用户确认
      
      Args:
            command (str): 要执行的命令

      Returns:
            str: 命令执行结果或拒绝信息
      """
      # 使用interrupt函数暂停执行并请求用户确认
      # interrupt会将控制权交还给用户,等待用户输入
      decision = interrupt({"query": f"Do you approve executing this command: {command}? Please answer 'yes' or 'no'."})
      logger.info(f"Decision: {decision}")
      
      # 根据用户决策决定是否执行命令
      if decision == "yes":
            logger.info(f"Executing command, {command}")
            try:
                # 使用asyncio.create_subprocess_shell执行异步命令
                process = await asyncio.create_subprocess_shell(
                  command,
                  stdout=subprocess.PIPE,
                  stderr=subprocess.PIPE
                )
               
                # 设置超时时间为10秒
                stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=10.0)
               
                # 解码输出
                stdout_str = stdout.decode().strip()
                stderr_str = stderr.decode().strip()
               
                # 返回结果
                if stdout_str:
                  return stdout_str
                elif stderr_str:
                  return f"Error: {stderr_str}"
                else:
                  return "Command executed successfully with no output"
                  
            except asyncio.TimeoutError:
                # 如果进程仍在运行,终止它
                if process and not process.returncode:
                  try:
                        process.terminate()
                        await process.wait()
                  except:
                        pass
                return "Command timed out after 10 seconds"
            except Exception as e:
                return f"Error executing command: {str(e)}"
      else:
            logger.info("Command execution denied by user")
            return "Command execution denied by user"
      
    return


memory = InMemorySaver()
def create_graph() -&gt; CompiledStateGraph:
    tools = get_tools()
    agent = create_react_agent(
      model=get_llm(),
      tools=tools,
      prompt="You are a helpful assistant."
    )


    # 创建工具节点,用于执行工具调用
    tool_node = ToolNode(tools=tools)


    # 配置运行时参数,使用固定的线程ID
    # config = RunnableConfig(configurable={"thread_id": thread_id})

    # 创建状态图,使用MessagesState作为状态类型
    graph_builder = StateGraph(MessagesState)
   
    # 添加节点
    graph_builder.add_node("agent", agent)# AI代理节点
    graph_builder.add_node("tools", tool_node)# 工具执行节点
   
    # 添加边
    graph_builder.add_edge(START, "agent")# 从开始节点连接到代理节点
    graph_builder.add_edge("tools", "agent")# 从工具节点连接回代理节点
   
    # 添加条件边,根据代理的决策决定下一步
    graph_builder.add_conditional_edges(
      "agent",
      tools_condition,# 条件函数,判断是否需要调用工具
      {"tools": "tools", END: END}# 映射:需要工具时转到工具节点,否则结束
    )


    # 编译图并返回,使用内存保存器来保存状态
    return graph_builder.compile(checkpointer=memory)

async def handle_user_decision(req_chat: ReqChat) -&gt; bool:
    """处理用户对中断的响应。
   
    Args:
      user_input (str): 用户的输入
      
    Returns:
      bool: 如果处理了中断返回True,否则返回False
    """
    # 创建图实例
    graph = create_graph()
    config = RunnableConfig(configurable={"thread_id": req_chat.thread_id})
   
    # 获取当前状态
    current_state = await graph.aget_state(config)
   
    # 检查是否有待处理的中断
    if not current_state.next:
      logger.debug("No pending interrupts to handle.")
      return False# 没有待处理的中断
   
    # 根据用户输入决定如何响应中断
    if req_chat.message.lower().strip() == "yes":
      # 用户确认,继续执行
      await graph.ainvoke(Command(resume="yes"), config=config)
    else:
      # 用户拒绝,取消执行
      await graph.ainvoke(Command(resume="no"), config=config)
      
    return True# 处理了中断

async def graph_invoke(req_chat: ReqChat) -&gt; str:
    """处理用户输入并执行相应操作。
   
    Args:
      user_input (str): 用户输入的文本
    """
    # 首先尝试处理用户对中断的响应
    interrupt_handled = await handle_user_decision(req_chat)

    config = RunnableConfig(configurable={"thread_id": req_chat.thread_id})

    # 如果已经处理了中断,则不再继续处理用户输入,而是显示结果
    if interrupt_handled:
      # 获取处理后的状态并显示结果
      graph = create_graph()
      current_state = await graph.aget_state(config)
      if current_state.values and 'messages' in current_state.values:
            # 显示最新的消息内容
            messages = current_state.values['messages']
            if messages:
                last_message = messages[-1]
                if hasattr(last_message, 'content'):
                  # 检查是否是工具消息,如果是,则再返回一条消息(工具执行结果)
                  if hasattr(last_message, 'tool_call_id') and hasattr(last_message, 'name'):
                        # 这是一个工具响应消息,获取下一条AI消息作为结果
                        if len(messages) &gt;= 2:
                            ai_message = messages[-2]
                            if hasattr(ai_message, 'content'):
                              return ai_message.content
                  # print("Assistant:", last_message.content)
                  return last_message.content
      return "check current state failed"

    # 如果没有待处理的中断,则正常处理用户输入
    graph = create_graph()
    resp = await graph.ainvoke({"messages": }, config=config)
   
    logger.debug(f"response: {resp}")
    logger.debug(f"Snapshot state: {graph.get_state(config)}")
    logger.debug(f"Snapshot next: {graph.get_state(config).next}")
   
    # 检查是否有中断需要处理
    if "__interrupt__" in resp:
      interrupt_data = resp["__interrupt__"]
      interrupt = interrupt_data if interrupt_data else None

      if not interrupt or not hasattr(interrupt, "value"):
            logger.error("Invalid interrupt data")
            return "Invalid interrupt data"
            
      interrupt_value = interrupt.value
      # 显示中断请求给用户
      # print(f"Assistant: {interrupt_value['query']}")
      return interrupt_value['query']
    else:
      # 直接显示AI的响应
      # print("Assistant:", resp["messages"][-1].content)
      return resp["messages"][-1].content
      

app = FastAPI()



@app.post("/chat")
async def post_chat(req: ReqChat):
    resp = await graph_invoke(req)
    logger.info(f"{req.thread_id} {req.message} -&gt; {resp}")
    return JSONResponse(content={"Ai": resp})

# 程序入口点
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host=cfg.server_host, port=cfg.server_port)
</code></pre>


</div>
<div id="MySignature" role="contentinfo">
    <p>本文来自博客园,作者:花酒锄作田,转载请注明原文链接:https://www.cnblogs.com/XY-Heruo/p/19071001/human-in-the-loop-of-langgraph</p><br><br>
来源:https://www.cnblogs.com/XY-Heruo/p/19071001/human-in-the-loop-of-langgraph
頁: [1]
查看完整版本: [LangGraph]Human-in-the-loop示例之人工干预shell命令执行