三分库真凉快 發表於 2026-3-26 09:03:00

用 Microsoft Agent Framework 构建 SubAgent(Multi-Agent)

<p>本文演示如何用 Microsoft Agent Framework 用 Executor + Workflow(DAG)模式实现 SubAgent(子代理)架构。通过示例代码(来自项目的 txt)展示并发 Fan‑Out/Fan‑In 的实现、消息路由与聚合策略,最后讨论最佳实践与权衡。</p>
<p><strong>假定读者等级:</strong> 熟悉 C#、异步编程与基本的 LLM/Agent 概念(如果不熟,先看基础 LLM/Agent 入门)。</p>
<p><strong>为什么要用 SubAgents?</strong></p>
<ul>
<li>将复杂任务拆成职责单一的模块(职责分离)</li>
<li>更易观测、测试与演进(可插拔节点)</li>
<li>支持并行/流水线处理,便于扩展与优化</li>
</ul>
<p><strong>核心思路(简要)</strong></p>
<ul>
<li>Planner/Orchestrator 负责任务拆解与路由</li>
<li>每个 SubAgent 为专责 Executor(或 Tool)</li>
<li>用 Workflow/DAG 串联节点:支持 Fan‑Out(并发)与 Fan‑In(聚合)</li>
<li>明确上下文与记忆隔离,防止“串味”</li>
</ul>
<p><strong>示例架构</strong></p>
<ul>
<li>节点:<code>ConcurrentStartExecutor</code>(广播)、<code>ChatClientAgent</code>(专业子代理)、<code>ConcurrentAggregationExecutor</code>(聚合)</li>
<li>流程:Start → Fan‑Out 到多个 Agent → Agent 各自处理 → Fan‑In 到 Aggregation → 输出结果</li>
</ul>
<p><strong>示例代码</strong></p>
<p><em><strong>初始化 OpenAI client &amp; 两个 ChatClientAgent</strong></em></p>
<pre><code class="language-csharp">
OpenAIClient client = new OpenAIClient(Environment.GetEnvironmentVariable("OPENAI_API_KEY")!);
var chatClient = client.GetChatClient("gpt-4o-mini").AsIChatClient();

ChatClientAgent physicist = new(
    chatClient,
    name: "Physicist",
    instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
    chatClient,
    name: "Chemist",
    instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
</code></pre>
<p><em><strong>构建 Workflow</strong></em></p>
<pre><code class="language-csharp">
var startExecutor = new ConcurrentStartExecutor();
var aggregationExecutor = new ConcurrentAggregationExecutor();

var workflow = new WorkflowBuilder(startExecutor)
    .AddFanOutEdge(startExecutor, new[] { physicist, chemist })
    .AddFanInBarrierEdge(new[] { physicist, chemist }, aggregationExecutor)
    .WithOutputFrom(aggregationExecutor)
    .Build();
</code></pre>
<p><em><strong>流式执行并监听输出</strong></em></p>
<pre><code class="language-csharp">
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, input: "What is temperature?");

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is WorkflowOutputEvent output)
    {
      Console.WriteLine($"Workflow completed with results:\n{output.Data}");
    }
}
</code></pre>
<p><em><strong>ConcurrentStartExecutor(广播用户消息,再发 TurnToken 启动子代理)</strong></em></p>
<pre><code class="language-csharp">

public async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
    await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken: cancellationToken);
    await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken);
}

protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder)
{
    return protocolBuilder.ConfigureRoutes(r =&gt; r.AddHandler&lt;string&gt;(this.HandleAsync))
                        .SendsMessage&lt;ChatMessage&gt;()
                        .SendsMessage&lt;TurnToken&gt;();
}
</code></pre>
<p><em><strong>ConcurrentAggregationExecutor(接收各 Agent 消息并在满足条件时产出聚合结果)</strong></em></p>
<pre><code class="language-csharp">
public override async ValueTask HandleAsync(List&lt;ChatMessage&gt; message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
    this._messages.AddRange(message);

    if (this._messages.Count == 3) // 示例:等待 3 条回复再聚合
    {
      var formattedMessages = string.Join(Environment.NewLine, this._messages.Select(m =&gt; $"{m.AuthorName}: {m.Text}"));
      await context.YieldOutputAsync(formattedMessages, cancellationToken);
    }
}
</code></pre>
<p><strong>逐步说明(快速教程风格)</strong></p>
<ol>
<li>准备
<ul>
<li>环境:.NET 版本与 Microsoft Agent Framework SDK(按项目要求 pin 版本)</li>
<li>配置:设置 <code>OPENAI_API_KEY</code> 环境变量</li>
</ul>
</li>
<li>定义 Agents / Executors
<ul>
<li>每个子代理用一类 Executor 或 ChatClientAgent 实例表示</li>
<li>明确每个代理的 <code>instructions</code>(prompt)与权限(可调用哪些工具)</li>
</ul>
</li>
<li>构建 Workflow(DAG)
<ul>
<li>先构建起点 Executor(start),再 AddNode / AddEdge 或用 Fan‑Out/Fan‑In helper</li>
<li>聚合节点负责合并与输出(并可实现冲突解决策略)</li>
</ul>
</li>
<li>运行与监控
<ul>
<li>支持流式执行(StreamingRun)便于实时观察中间结果</li>
<li>在聚合节点加入超时、重试或部分可用逻辑</li>
</ul>
</li>
<li>测试
<ul>
<li>单元测试 Executors 的消息处理逻辑</li>
<li>集成测试覆盖完整 Workflow 路径</li>
</ul>
</li>
</ol>
<p><strong>最佳实践与权衡</strong></p>
<ul>
<li>推荐:Workflow/DAG 模式(可观测、适合生产)</li>
<li>必须:上下文隔离(避免 prompt 串味)、清晰的路由与错误边界</li>
<li>权衡:
<ul>
<li>更细颗粒的 SubAgents → 更高可组合性,但更复杂的调度/监控</li>
<li>Tool‑based 子代理更简单但失去自治能力</li>
</ul>
</li>
<li>安全:控制外部工具调用权限,防止子代理滥用</li>
</ul>
<p><strong>常见坑</strong></p>
<ul>
<li>把所有逻辑塞进单个 Agent(难以维护)</li>
<li>忽视失败场景(网络、LLM 超时、部分结果不可用)</li>
<li>不做上下文边界与隐私隔离导致数据泄露或“串味”</li>
</ul>
<p><strong>结论 &amp; 下一步</strong></p>
<ul>
<li>SubAgent 模式适合将复杂任务分解为可管理、可观测的模块。对于生产级系统推荐用 Workflow/DAG + Executor 模式。</li>
</ul>
<p><strong>参考与进一步阅读</strong></p>
<ul>
<li>Microsoft Agent Framework 文档(请参考 SDK 官方文档)</li>
</ul><br><br>
来源:https://www.cnblogs.com/chenyishi/p/19774090
頁: [1]
查看完整版本: 用 Microsoft Agent Framework 构建 SubAgent(Multi-Agent)