顔秌 發表於 2026-4-15 01:09:00

企业微信机器人与 DeepAgents 集成实践

<h2 id="前言">前言</h2>
<p>企业微信机器人以前通常采用 Webhook 回调方式接收消息,但这种方式存在延迟较高、需要公网服务器等局限性。随着OpenClaw爆火,企业微信机器人也支持 WebSocket 长连接方式。本文介绍一种基于 WebSocket 长连接的企业微信机器人实现方案,并集成 DeepAgents 框架实现智能对话能力。</p>
<h2 id="技术栈">技术栈</h2>
<ul>
<li><strong>企业微信 WebSocket SDK</strong>: <code>wecom-aibot-python-sdk</code> - 官方提供的 WebSocket 连接库</li>
<li><strong>FastAPI</strong>: 现代异步 Web 框架,用于托管服务和 MCP 服务器</li>
<li><strong>DeepAgents</strong>: 智能体框架,用于构建具备工具调用能力的 AI 助手</li>
<li><strong>LangChain</strong>: 提供 LLM 集成和工具加载能力</li>
<li><strong>MCP (Model Context Protocol)</strong>: 标准化的工具调用协议</li>
</ul>
<h2 id="项目结构">项目结构</h2>
<pre><code>qywx-bot/
├── main.py               # FastAPI 主入口
├── pyproject.toml         # 项目依赖配置
├── conf/
│   └── config.toml      # 应用配置
├── pkg/
│   ├── config/            # 配置管理模块
│   ├── log/               # 日志模块
│   └── qywx/            # 企业微信客户端
└── ai_agent/
    ├── ai_agent.py      # DeepAgents 集成
    └── mcp_servers/       # MCP 工具服务器
</code></pre>
<h2 id="安装依赖">安装依赖</h2>
<pre><code class="language-shell">uv add fastapi deepagents langchain-openai langchain-mcp-adapters wecom-aibot-python-sdk uvicorn
</code></pre>
<h2 id="核心实现">核心实现</h2>
<h3 id="1-配置管理">1. 配置管理</h3>
<p>使用 TOML 格式管理配置,支持多环境切换:</p>
<pre><code class="language-toml">
host = "127.0.0.1"
port = 8000
env = "dev"


bot_id = "your-bot-id"
secret = "your-bot-secret"
bot_name = "智能助手"
</code></pre>
<h3 id="2-企业微信-websocket-客户端">2. 企业微信 WebSocket 客户端</h3>
<p>通过 WebSocket 长连接接收企业微信消息,实现低延迟实时交互:</p>
<pre><code class="language-python">class QywxClient:
    async def start(self):
      self.ws_client = WSClient(
            WSClientOptions(
                bot_id=cfg.qywx_bot_id,
                secret=cfg.qywx_secret,
                logger=self.logger,
            )
      )
      
      # 注册事件处理器
      self.ws_client.on("authenticated", self._on_authenticated)
      self.ws_client.on("event.enter_chat", self._on_event_enter_chat)
      self.ws_client.on("message.text", self._on_message_text)
      
      await self.ws_client.connect()
</code></pre>
<h3 id="3-deepagents-集成">3. DeepAgents 集成</h3>
<p>构建具备工具调用能力的智能体,通过 MCP 协议加载工具:</p>
<pre><code class="language-python">class AIAgent:
    async def _create_root_agent(self, session: ClientSession):
      tools = await load_mcp_tools(session)
      return create_deep_agent(
            model=self.model,
            tools=tools,
            system_prompt=f"你是一个企业微信机器人,名字叫{cfg.qywx_bot_name}",
      )
</code></pre>
<h3 id="4-流式输出处理">4. 流式输出处理</h3>
<p>实现企业微信流式消息回复,提升用户体验:</p>
<pre><code class="language-python">async def _on_message_text(self, frame: WsFrameHeaders):
    stream_id = generate_req_id('stream')
    await self.ws_client.reply_stream(frame, stream_id, "思考中...", False)
   
    async for chunk in aiops.invoke(content):
      await self.ws_client.reply_stream(frame, stream_id, str(chunk), False)
   
    await self.ws_client.reply_stream(frame, stream_id, "", True)
</code></pre>
<h3 id="5-fastapi-lifespan-管理">5. FastAPI Lifespan 管理</h3>
<p>正确管理应用生命周期,包括 WebSocket 连接、MCP 服务器和 AI 代理:</p>
<pre><code class="language-python">@asynccontextmanager
async def lifespan(app: FastAPI):
    await aiops.start()
    await qywx_client.start()
   
    # 挂载 MCP 服务器
    mcp_app = datetime_mcp.streamable_http_app()
    async with datetime_mcp.session_manager.run():
      app.mount("/mcp", mcp_app)
      yield
   
    await aiops.shutdown()
    await qywx_client.shutdown()
</code></pre>
<h2 id="关键技术点">关键技术点</h2>
<h3 id="mcp-服务器挂载">MCP 服务器挂载</h3>
<p>将 FastMCP 服务器挂载到 FastAPI 时,需注意正确初始化 session manager:</p>
<pre><code class="language-python"># 错误方式:直接挂载会导致 task group 未初始化
app.mount("/mcp", datetime_mcp.streamable_http_app())

# 正确方式:在 lifespan 中启动 session manager
async with datetime_mcp.session_manager.run():
    app.mount("/mcp", mcp_app)
    yield
</code></pre>
<h3 id="流式消息解析">流式消息解析</h3>
<p>DeepAgents 的 <code>astream()</code> 返回的 chunk 是嵌套字典结构,需正确提取内容:</p>
<pre><code class="language-python">async for chunk in root_agent.astream(input={"messages": }):
    if isinstance(chunk, dict):
      messages = chunk.get("model", {}).get("messages", [])
      for msg in messages:
            if hasattr(msg, "content") and msg.content:
                yield str(msg.content)
</code></pre>
<h2 id="示例代码">示例代码</h2>
<p>配置模块、日志模块等代码就略过了。MCP Server也略过,之前的文章写过很多遍了,这里就不赘述了。</p>
<h3 id="qywxclient">QywxClient</h3>
<p><code>pkg/qywx/qywx_client.py</code> 内封装了企业微信机器人交互的一些方法。</p>
<pre><code class="language-python">from pkg.config import cfg
from pkg.log import get_logger
from aibot import WSClient, WSClientOptions, generate_req_id, WsFrameHeaders
from ai_agent import aiops
import logging

class QywxClient:
    """企业微信客户端, 通过websockets连接企业微信服务器, 接收消息并处理"""
    logger = get_logger("qywx_client", logging.INFO)

    def __init__(self) -&gt; None:
      self.ws_client: WSClient = None# type: ignore

    async def _on_authenticated(self):
      """处理认证成功事件"""
      self.logger.info("Authenticated with Qywx server")

    async def _on_event_enter_chat(self, frame: WsFrameHeaders):
      """处理用户进入聊天事件"""
      self.logger.debug("Received event: enter_chat")
      await self.ws_client.reply_welcome(frame, {
            "msgtype": "text",
            "text": {'content': f'您好!我是智能助手{cfg.qywx_bot_name},有什么可以帮您的吗?'},
      })

    async def _on_message_text(self, frame: WsFrameHeaders):
      """处理文本消息事件"""
      self.logger.debug("Received text message")
      msg_id = frame.get("body", {}).get("msgid", "")
      user_id = frame.get("body", {}).get("from", {}).get("userid", "")
      chattype = frame.get("body", {}).get("chattype", "")
      response_url = frame.get("body", {}).get("response_url", "")
      content = frame.get('body', {}).get('text', {}).get('content', '')
      self.logger.debug(f"Message content: {content}, from user: {user_id}")

      stream_id = generate_req_id('stream')

      await self.ws_client.reply_stream(frame, stream_id, "小脑瓜努力思考中...", False)
      # await asyncio.sleep(2)# 模拟处理时间
      
      # resp = await aiops.ainvoke(input=content)
      # await self.ws_client.reply_stream(frame, stream_id, resp, True)
      final_text = ""
      async for chunk in aiops.astream(input=content):
            await self.ws_client.reply_stream(frame, stream_id, chunk, False)
            # final_text += chunk
            final_text = chunk

      await self.ws_client.reply_stream(frame, stream_id, final_text, True)

    async def start(self):
      self.logger.info("Starting QywxClient...")
      if not self.ws_client:
            self.ws_client = WSClient(
                WSClientOptions(
                  bot_id=cfg.qywx_bot_id,
                  secret=cfg.qywx_secret,
                  logger=self.logger,
                )
            )

      self.ws_client.on("authenticated", self._on_authenticated)
      self.ws_client.on("event.enter_chat", self._on_event_enter_chat)
      self.ws_client.on("message.text", self._on_message_text)

      await self.ws_client.connect()

    async def shutdown(self):
      self.logger.info("Shutting down QywxClient...")
      if self.ws_client and self.ws_client.is_connected:
            self.ws_client.disconnect()
</code></pre>
<h3 id="aiagent">AIAgent</h3>
<p>AIAgent 是一个 AI 代理类,用于处理企业微信消息。</p>
<pre><code class="language-python">from deepagents import create_deep_agent
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamable_http_client
from pkg.config import cfg
from pkg.log import get_logger

class AIAgent:
    logger = get_logger("ai_agent")
    def __init__(self):
      self.model: ChatOpenAI = None# type: ignore
      self._mcp_session: ClientSession = None# type: ignore
      self._mcp_server_url = f"http://127.0.0.1:{cfg.service_port}/mcp/"

    async def start(self):
      if not self.model:
            self.model = ChatOpenAI(
                base_url=cfg.agent_base_url,
                api_key=cfg.agent_api_key,# type: ignore
                model=cfg.agent_model,
            )
   

    async def shutdown(self):
      self.model = None# type: ignore

    async def _create_root_agent(self, session: ClientSession):
      tools = await load_mcp_tools(session)
      root_agent = create_deep_agent(
            model=self.model,
            tools=tools,
            system_prompt=f"你是一个智能助手,名字叫{cfg.qywx_bot_name}, 可以协助用户处理各种问题,并用温和积极的语气回答问题。回答的格式应该符合markdown规范。",
      )
      return root_agent

    async def astream(self, input: str, thread_id: str = ""):
      self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}")
      async with streamable_http_client(self._mcp_server_url) as (read, write, get_session_id):
            async with ClientSession(read, write) as session:
                await session.initialize()

                root_agent = await self._create_root_agent(session)

                async for chunk in root_agent.astream(
                  input={"messages": }
                ):
                  # 从 chunk 字典中提取 AIMessage 的 content
                  if isinstance(chunk, dict):
                        messages = chunk.get("model", {}).get("messages", [])
                        if not messages:
                            continue
                        for msg in messages:
                            if hasattr(msg, "content") and msg.content:
                              yield str(msg.content)
                  elif hasattr(chunk, "content"):
                        yield str(chunk.content)# type: ignore
                  else:
                        yield str(chunk)

    async def ainvoke(self, input: str, thread_id: str = ""):
      self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}")
      async with streamable_http_client(self._mcp_server_url) as (read, write, get_session_id):
            async with ClientSession(read, write) as session:
                await session.initialize()

                root_agent = await self._create_root_agent(session)
                resp = await root_agent.ainvoke(
                  input={"messages": }
                )
                return resp["messages"][-1].content
               
</code></pre>
<h3 id="main">main</h3>
<pre><code class="language-python"># main.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn
from contextlib import asynccontextmanager
from pkg.qywx import qywx_client
from pkg.config import cfg
from ai_agent.mcp_servers.datetime_server import mcp as datetime_mcp
from ai_agent import aiops

@asynccontextmanager
async def lifespan(app: FastAPI):
    await aiops.start()
    await qywx_client.start()
   
    # 先获取 MCP app(这会创建 session_manager)
    mcp_app = datetime_mcp.streamable_http_app()
   
    # 在 FastAPI lifespan 中启动 MCP session manager
    async with datetime_mcp.session_manager.run():
      # 挂载 MCP app
      app.mount("/mcp", mcp_app)
      yield
   
    await aiops.shutdown()
    await qywx_client.shutdown()


app = FastAPI(
    lifespan=lifespan
)


if __name__ == "__main__":
    uvicorn.run("main:app", host=cfg.service_host, port=cfg.service_port)
</code></pre>


</div>
<div id="MySignature" role="contentinfo">
    <p>本文来自博客园,作者:花酒锄作田,转载请注明原文链接:https://www.cnblogs.com/XY-Heruo/p/19868248</p><br><br>
来源:https://www.cnblogs.com/XY-Heruo/p/19868248
頁: [1]
查看完整版本: 企业微信机器人与 DeepAgents 集成实践