李联广 發表於 2026-4-16 09:00:00

【从0到1构建一个ClaudeAgent】并发-后台任务

<p>有些操作很慢,Agent 不能干等着。例如<strong>长时间编译/构建</strong>:<code>make</code>, <code>mvn compile</code>, <code>gradle build</code> 或 <strong>大数据处理</strong>:<code>hadoop</code>, <code>spark-submit</code> 等的一些工作</p>
<h2 id="java实现代码">Java实现代码</h2>
<pre><code class="language-java">public class BackgroundTasksSystem {
    // --- 配置 ---
    private static final Path WORKDIR = Paths.get(System.getProperty("user.dir"));
    private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
   
    // --- 后台任务管理器 ---
    static class BackgroundManager {
      // 任务存储
      private final Map&lt;String, TaskInfo&gt; tasks = new ConcurrentHashMap&lt;&gt;();
      // 通知队列
      private final Queue&lt;TaskNotification&gt; notificationQueue = new ConcurrentLinkedQueue&lt;&gt;();
      // 任务 ID 生成器
      private final AtomicInteger taskIdCounter = new AtomicInteger(1);
      // 锁
      private final Object lock = new Object();
      
      static class TaskInfo {
            String taskId;
            String status;// running, completed, timeout, error
            String result;
            String command;
            long startTime;
            Thread thread;// 关联的执行线程
      }
      
      static class TaskNotification {
            String taskId;
            String status;
            String command;
            String result;
      }
      
      /**
         * 启动后台任务
         * 立即返回任务 ID,不等待命令完成
         */
      public String run(String command) {
            String taskId = "task_" + taskIdCounter.getAndIncrement();
            
            TaskInfo task = new TaskInfo(taskId, command);
            tasks.put(taskId, task);
            
            // 创建并启动后台线程
            Thread thread = new Thread(() -&gt; executeTask(task), "BackgroundTask-" + taskId);
            thread.setDaemon(true);
            task.thread = thread;
            thread.start();// 立即返回,不阻塞
            
            return String.format("Background task %s started: %s",
                taskId, command.substring(0, Math.min(command.length(), 80)));
      }
      
      /**
         * 线程目标:执行子进程,捕获输出,推送结果到队列
         */
      private void executeTask(TaskInfo task) {
            String output;
            String status;
            
            try {
                ProcessBuilder pb = new ProcessBuilder("bash", "-c", task.command);
                pb.directory(WORKDIR.toFile());
                pb.redirectErrorStream(true);
               
                Process process = pb.start();
                boolean finished = process.waitFor(300, TimeUnit.SECONDS);// 5分钟超时
               
                if (!finished) {
                  process.destroy();
                  output = "Error: Timeout (300s)";
                  status = "timeout";
                } else {
                  output = new String(process.getInputStream().readAllBytes()).trim();
                  status = "completed";
                }
            } catch (Exception e) {
                output = "Error: " + e.getMessage();
                status = "error";
            }
            
            // 更新任务状态
            task.status = status;
            task.result = output.isEmpty() ? "(no output)" :
                        output.substring(0, Math.min(output.length(), 50000));
            
            // 添加通知到队列
            synchronized (lock) {
                notificationQueue.offer(new TaskNotification(
                  task.taskId,
                  status,
                  task.command.substring(0, Math.min(task.command.length(), 80)),
                  task.result.substring(0, Math.min(task.result.length(), 500))
                ));
            }
      }
      
      /**
         * 检查任务状态
         * 如果指定 taskId,检查单个任务;否则列出所有任务
         */
      public String check(String taskId) {
            if (taskId != null &amp;&amp; !taskId.isEmpty()) {
                TaskInfo task = tasks.get(taskId);
                if (task == null) {
                  return "Error: Unknown task " + taskId;
                }
                return String.format("[%s] %s\n%s",
                  task.status,
                  task.command.substring(0, Math.min(task.command.length(), 60)),
                  task.result != null ? task.result : "(running)");
            } else {
                StringBuilder sb = new StringBuilder();
                for (Map.Entry&lt;String, TaskInfo&gt; entry : tasks.entrySet()) {
                  TaskInfo task = entry.getValue();
                  sb.append(String.format("%s: [%s] %s\n",
                        task.taskId,
                        task.status,
                        task.command.substring(0, Math.min(task.command.length(), 60))));
                }
                return sb.length() &gt; 0 ? sb.toString().trim() : "No background tasks.";
            }
      }
      
      /**
         * 清空通知队列并返回所有待处理的通知
         */
      public List&lt;TaskNotification&gt; drainNotifications() {
            synchronized (lock) {
                List&lt;TaskNotification&gt; notifications = new ArrayList&lt;&gt;();
                while (!notificationQueue.isEmpty()) {
                  notifications.add(notificationQueue.poll());
                }
                return notifications;
            }
      }
      
      /**
         * 获取所有任务
         */
      public Map&lt;String, TaskInfo&gt; getAllTasks() {
            return new HashMap&lt;&gt;(tasks);
      }
    }
   
    // 初始化后台管理器
    private static final BackgroundManager BG_MANAGER = new BackgroundManager();
   
    // --- 工具枚举 ---
    public enum ToolType {
      BASH("bash", "Run a shell command (blocking)."),
      READ_FILE("read_file", "Read file contents."),
      WRITE_FILE("write_file", "Write content to file."),
      EDIT_FILE("edit_file", "Replace exact text in file."),
      BACKGROUND_RUN("background_run", "Run command in background thread. Returns task_id immediately."),// 新增
      CHECK_BACKGROUND("check_background", "Check background task status. Omit task_id to list all.");// 新增
      public final String name;
      public final String description;
      ToolType(String name, String description) { this.name = name; this.description = description; }
    }

    // --- 工具处理器映射 ---
    private static final Map&lt;String, ToolExecutor&gt; TOOL_HANDLERS = new HashMap&lt;&gt;();
   
    static {
      // ... 省略基础工具注册
      
      // 后台任务工具
      TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args -&gt; {
            String command = (String) args.get("command");
            return BG_MANAGER.run(command);
      });
      
      TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args -&gt; {
            String taskId = (String) args.get("task_id");
            return BG_MANAGER.check(taskId);
      });
    }
   
    // ... 省略相同的工具实现
   
    // --- Agent 主循环(集成后台任务通知)---
    public static void agentLoop(List&lt;Map&lt;String, Object&gt;&gt; messages) {
      while (true) {
            try {
                // 在 LLM 调用前检查后台通知
                List&lt;BackgroundManager.TaskNotification&gt; notifications = BG_MANAGER.drainNotifications();
               
                if (!notifications.isEmpty() &amp;&amp; !messages.isEmpty()) {
                  StringBuilder notifText = new StringBuilder();
                  notifText.append("&lt;background-results&gt;\n");
                  
                  for (BackgroundManager.TaskNotification notif : notifications) {
                        notifText.append(String.format(" %s: %s\n",
                            notif.taskId, notif.status, notif.result));
                  }
                  
                  notifText.append("&lt;/background-results&gt;");
                  
                  messages.add(Map.of(
                        "role", "user",
                        "content", notifText.toString()
                  ));
                  
                  messages.add(Map.of(
                        "role", "assistant",
                        "content", "Noted background results."
                  ));
                  // 异步结果注入:将后台任务结果插入到对话中
                  // 结构化格式:用XML标签包裹,便于LLM解析
                }
               
                // 显示当前活动任务
                Map&lt;String, BackgroundManager.TaskInfo&gt; activeTasks = BG_MANAGER.getAllTasks();
                int runningTasks = (int) activeTasks.values().stream()
                  .filter(t -&gt; "running".equals(t.status))
                  .count();
               
                if (runningTasks &gt; 0) {
                  System.out.printf("\n", runningTasks);
                }
               
                // ... 省略相同的 LLM 调用和工具执行逻辑
               
            } catch (Exception e) {
                System.err.println("Error in agent loop: " + e.getMessage());
                e.printStackTrace();
                return;
            }
      }
    }
}
</code></pre>
<p>这段代码引入了<strong>后台任务系统</strong>,解决了 Agent 在执行长时间任务时的<strong>阻塞问题</strong></p>
<p><strong>关键洞察</strong>:Agent 可以在命令执行时继续工作,而不是被阻塞。</p>
<h2 id="异步任务处理架构">异步任务处理架构</h2>
<p><strong>核心思想</strong>:从同步阻塞的任务执行升级为<strong>异步非阻塞的并发处理</strong>,让Agent能够<strong>同时处理多个耗时任务</strong>,实现"并行计算"能力,大幅提升效率和响应性。</p>
<pre><code class="language-java">// 后台任务管理器 - 异步执行引擎
static class BackgroundManager {
    // 任务存储
    private final Map&lt;String, TaskInfo&gt; tasks = new ConcurrentHashMap&lt;&gt;();
    // 通知队列
    private final Queue&lt;TaskNotification&gt; notificationQueue = new ConcurrentLinkedQueue&lt;&gt;();
    // 任务 ID 生成器
    private final AtomicInteger taskIdCounter = new AtomicInteger(1);
    // 并发安全:使用线程安全集合
    // 异步通信:通过队列传递任务结果
    // 唯一标识:自动生成任务ID
}
</code></pre>
<ul>
<li><strong>解耦执行</strong>:任务提交和执行分离,立即返回控制权</li>
<li><strong>并发管理</strong>:多个后台任务可以同时运行</li>
<li><strong>结果异步收集</strong>:通过队列机制收集完成的任务结果</li>
<li><strong>线程安全</strong>:使用并发集合确保多线程安全</li>
</ul>
<h2 id="任务信息结构设计">任务信息结构设计</h2>
<pre><code class="language-java">// 任务信息实体
static class TaskInfo {
    String taskId;      // 唯一标识
    String status;      // 状态:running, completed, timeout, error
    String result;      // 执行结果
    String command;       // 执行的命令
    long startTime;       // 开始时间
    Thread thread;      // 关联的执行线程
    // 完整状态跟踪:从启动到完成的全生命周期
    // 线程关联:可以控制或监控执行线程
    // 时间戳:支持超时和性能分析
}

// 任务通知实体
static class TaskNotification {
    String taskId;
    String status;
    String command;
    String result;
    // 轻量传输:只包含必要信息
    // 结构化:易于解析和处理
    // 结果截断:避免过大的通知消息
}
</code></pre>
<ul>
<li><strong>状态驱动</strong>:明确的任务状态生命周期</li>
<li><strong>结果持久</strong>:任务结果可以多次查询</li>
<li><strong>线程管理</strong>:可以跟踪和控制执行线程</li>
<li><strong>事件驱动</strong>:通过通知机制传递完成事件</li>
</ul>
<h2 id="异步任务启动机制">异步任务启动机制</h2>
<pre><code class="language-java">/**
* 启动后台任务
* 立即返回任务 ID,不等待命令完成
*/
public String run(String command) {
    String taskId = "task_" + taskIdCounter.getAndIncrement();
   
    TaskInfo task = new TaskInfo(taskId, command);
    tasks.put(taskId, task);
   
    // 创建并启动后台线程
    Thread thread = new Thread(() -&gt; executeTask(task), "BackgroundTask-" + taskId);
    thread.setDaemon(true);// 守护线程,不会阻止JVM退出
    task.thread = thread;
    thread.start();// 立即返回,不阻塞调用者
   
    return String.format("Background task %s started: %s",
      taskId, command.substring(0, Math.min(command.length(), 80)));
    // 异步启动:立即返回任务ID,不等待命令完成
    // 守护线程:不会阻止程序正常退出
    // 线程命名:便于调试和监控
}
</code></pre>
<ul>
<li><strong>立即返回</strong>:不阻塞主线程,立即返回控制权</li>
<li><strong>守护线程</strong>:后台任务不会阻止JVM退出</li>
<li><strong>资源管理</strong>:线程自动清理,避免内存泄漏</li>
<li><strong>友好反馈</strong>:返回任务ID和简化的命令描述</li>
</ul>
<h2 id="任务执行与结果收集">任务执行与结果收集</h2>
<pre><code class="language-java">/**
* 线程目标:执行子进程,捕获输出,推送结果到队列
*/
private void executeTask(TaskInfo task) {
    String output;
    String status;
   
    try {
      ProcessBuilder pb = new ProcessBuilder("bash", "-c", task.command);
      pb.directory(WORKDIR.toFile());
      pb.redirectErrorStream(true);
      
      Process process = pb.start();
      boolean finished = process.waitFor(300, TimeUnit.SECONDS);// 5分钟超时
      
      if (!finished) {
            process.destroy();
            output = "Error: Timeout (300s)";
            status = "timeout";
      } else {
            output = new String(process.getInputStream().readAllBytes()).trim();
            status = "completed";
      }
    } catch (Exception e) {
      output = "Error: " + e.getMessage();
      status = "error";
    }
   
    // 更新任务状态
    task.status = status;
    task.result = output.isEmpty() ? "(no output)" :
                  output.substring(0, Math.min(output.length(), 50000));
   
    // 添加通知到队列
    synchronized (lock) {
      notificationQueue.offer(new TaskNotification(
            task.taskId,
            status,
            task.command.substring(0, Math.min(task.command.length(), 80)),
            task.result.substring(0, Math.min(task.result.length(), 500))
      ));
    }
}
</code></pre>
<ul>
<li><strong>超时保护</strong>:防止长时间运行的任务阻塞</li>
<li><strong>异常安全</strong>:全面捕获执行异常</li>
<li><strong>内存管理</strong>:截断大结果,避免内存溢出</li>
<li><strong>事件驱动</strong>:完成后立即通知主线程</li>
</ul>
<h2 id="智能通知注入机制">智能通知注入机制</h2>
<pre><code class="language-java">// 在 LLM 调用前检查后台通知
List&lt;BackgroundManager.TaskNotification&gt; notifications = BG_MANAGER.drainNotifications();

if (!notifications.isEmpty() &amp;&amp; !messages.isEmpty()) {
    StringBuilder notifText = new StringBuilder();
    notifText.append("&lt;background-results&gt;\n");
   
    for (BackgroundManager.TaskNotification notif : notifications) {
      notifText.append(String.format(" %s: %s\n",
            notif.taskId, notif.status, notif.result));
    }
   
    notifText.append("&lt;/background-results&gt;");
   
    messages.add(Map.of(
      "role", "user",
      "content", notifText.toString()
    ));
   
    messages.add(Map.of(
      "role", "assistant",
      "content", "Noted background results."
    ));
    // 自动注入:自动将后台结果插入到对话中
    // 结构化格式:XML标签明确标识内容类型
    // 对话完整:添加assistant确认,保持对话结构
    // 时机智能:在LLM调用前插入,确保LLM能看到最新结果
}
</code></pre>
<ul>
<li><strong>自动同步</strong>:后台结果自动同步到主对话</li>
<li><strong>结构化格式</strong>:便于LLM识别和解析</li>
<li><strong>对话集成</strong>:无缝集成到现有对话流</li>
<li><strong>时机优化</strong>:在决策前注入,确保信息及时性</li>
</ul>
<h2 id="工具集成架构">工具集成架构</h2>
<pre><code class="language-java">// 后台任务工具集
public enum ToolType {
    BACKGROUND_RUN("background_run", "Run command in background thread. Returns task_id immediately."),
    CHECK_BACKGROUND("check_background", "Check background task status. Omit task_id to list all.");
    // 异步执行:立即返回,不阻塞
    // 状态查询:支持单个和批量查询
    // 语义清晰:工具名明确表示异步特性
}

// 工具处理器映射
TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args -&gt; {
    String command = (String) args.get("command");
    return BG_MANAGER.run(command);
    // 委托执行:将命令转交给后台管理器
    // 立即返回:不等待任务完成
});

TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args -&gt; {
    String taskId = (String) args.get("task_id");
    return BG_MANAGER.check(taskId);
    // 灵活查询:支持单任务详查和列表概览
});
</code></pre>
<ul>
<li><strong>接口统一</strong>:与同步工具相同的调用方式</li>
<li><strong>异步语义</strong>:工具名明确区分同步/异步</li>
<li><strong>灵活查询</strong>:支持多种查询方式</li>
<li><strong>无缝集成</strong>:与现有工具系统完全兼容</li>
</ul>
<h2 id="架构演进与价值">架构演进与价值</h2>
<p><strong>从 ContextCompactSystem 到 BackgroundTasksSystem 的升级</strong>:</p>
<table>
<thead>
<tr>
<th>维度</th>
<th>ContextCompactSystem</th>
<th>BackgroundTasksSystem</th>
</tr>
</thead>
<tbody>
<tr>
<td>执行模式</td>
<td>同步串行</td>
<td>异步并行</td>
</tr>
<tr>
<td>吞吐量</td>
<td>一次一个任务</td>
<td>并发多个任务</td>
</tr>
<tr>
<td>响应性</td>
<td>阻塞等待</td>
<td>立即响应</td>
</tr>
<tr>
<td>资源利用</td>
<td>单线程</td>
<td>多线程并发</td>
</tr>
<tr>
<td>任务类型</td>
<td>短任务为主</td>
<td>长短任务混合</td>
</tr>
</tbody>
</table>


</div>
<div id="MySignature" role="contentinfo">
    <p>本文来自在线网站:seven的菜鸟成长之路,作者:seven,转载请注明原文链接:www.seven97.top</p><br><br>
来源:https://www.cnblogs.com/sevencoding/p/19821570
頁: [1]
查看完整版本: 【从0到1构建一个ClaudeAgent】并发-后台任务