企业微信机器人与 DeepAgents 集成实践
<h2 id="前言">前言</h2><p>企业微信机器人以前通常采用 Webhook 回调方式接收消息,但这种方式存在延迟较高、需要公网服务器等局限性。随着OpenClaw爆火,企业微信机器人也支持 WebSocket 长连接方式。本文介绍一种基于 WebSocket 长连接的企业微信机器人实现方案,并集成 DeepAgents 框架实现智能对话能力。</p>
<h2 id="技术栈">技术栈</h2>
<ul>
<li><strong>企业微信 WebSocket SDK</strong>: <code>wecom-aibot-python-sdk</code> - 官方提供的 WebSocket 连接库</li>
<li><strong>FastAPI</strong>: 现代异步 Web 框架,用于托管服务和 MCP 服务器</li>
<li><strong>DeepAgents</strong>: 智能体框架,用于构建具备工具调用能力的 AI 助手</li>
<li><strong>LangChain</strong>: 提供 LLM 集成和工具加载能力</li>
<li><strong>MCP (Model Context Protocol)</strong>: 标准化的工具调用协议</li>
</ul>
<h2 id="项目结构">项目结构</h2>
<pre><code>qywx-bot/
├── main.py # FastAPI 主入口
├── pyproject.toml # 项目依赖配置
├── conf/
│ └── config.toml # 应用配置
├── pkg/
│ ├── config/ # 配置管理模块
│ ├── log/ # 日志模块
│ └── qywx/ # 企业微信客户端
└── ai_agent/
├── ai_agent.py # DeepAgents 集成
└── mcp_servers/ # MCP 工具服务器
</code></pre>
<h2 id="安装依赖">安装依赖</h2>
<pre><code class="language-shell">uv add fastapi deepagents langchain-openai langchain-mcp-adapters wecom-aibot-python-sdk uvicorn
</code></pre>
<h2 id="核心实现">核心实现</h2>
<h3 id="1-配置管理">1. 配置管理</h3>
<p>使用 TOML 格式管理配置,支持多环境切换:</p>
<pre><code class="language-toml">
host = "127.0.0.1"
port = 8000
env = "dev"
bot_id = "your-bot-id"
secret = "your-bot-secret"
bot_name = "智能助手"
</code></pre>
<h3 id="2-企业微信-websocket-客户端">2. 企业微信 WebSocket 客户端</h3>
<p>通过 WebSocket 长连接接收企业微信消息,实现低延迟实时交互:</p>
<pre><code class="language-python">class QywxClient:
async def start(self):
self.ws_client = WSClient(
WSClientOptions(
bot_id=cfg.qywx_bot_id,
secret=cfg.qywx_secret,
logger=self.logger,
)
)
# 注册事件处理器
self.ws_client.on("authenticated", self._on_authenticated)
self.ws_client.on("event.enter_chat", self._on_event_enter_chat)
self.ws_client.on("message.text", self._on_message_text)
await self.ws_client.connect()
</code></pre>
<h3 id="3-deepagents-集成">3. DeepAgents 集成</h3>
<p>构建具备工具调用能力的智能体,通过 MCP 协议加载工具:</p>
<pre><code class="language-python">class AIAgent:
async def _create_root_agent(self, session: ClientSession):
tools = await load_mcp_tools(session)
return create_deep_agent(
model=self.model,
tools=tools,
system_prompt=f"你是一个企业微信机器人,名字叫{cfg.qywx_bot_name}",
)
</code></pre>
<h3 id="4-流式输出处理">4. 流式输出处理</h3>
<p>实现企业微信流式消息回复,提升用户体验:</p>
<pre><code class="language-python">async def _on_message_text(self, frame: WsFrameHeaders):
stream_id = generate_req_id('stream')
await self.ws_client.reply_stream(frame, stream_id, "思考中...", False)
async for chunk in aiops.invoke(content):
await self.ws_client.reply_stream(frame, stream_id, str(chunk), False)
await self.ws_client.reply_stream(frame, stream_id, "", True)
</code></pre>
<h3 id="5-fastapi-lifespan-管理">5. FastAPI Lifespan 管理</h3>
<p>正确管理应用生命周期,包括 WebSocket 连接、MCP 服务器和 AI 代理:</p>
<pre><code class="language-python">@asynccontextmanager
async def lifespan(app: FastAPI):
await aiops.start()
await qywx_client.start()
# 挂载 MCP 服务器
mcp_app = datetime_mcp.streamable_http_app()
async with datetime_mcp.session_manager.run():
app.mount("/mcp", mcp_app)
yield
await aiops.shutdown()
await qywx_client.shutdown()
</code></pre>
<h2 id="关键技术点">关键技术点</h2>
<h3 id="mcp-服务器挂载">MCP 服务器挂载</h3>
<p>将 FastMCP 服务器挂载到 FastAPI 时,需注意正确初始化 session manager:</p>
<pre><code class="language-python"># 错误方式:直接挂载会导致 task group 未初始化
app.mount("/mcp", datetime_mcp.streamable_http_app())
# 正确方式:在 lifespan 中启动 session manager
async with datetime_mcp.session_manager.run():
app.mount("/mcp", mcp_app)
yield
</code></pre>
<h3 id="流式消息解析">流式消息解析</h3>
<p>DeepAgents 的 <code>astream()</code> 返回的 chunk 是嵌套字典结构,需正确提取内容:</p>
<pre><code class="language-python">async for chunk in root_agent.astream(input={"messages": }):
if isinstance(chunk, dict):
messages = chunk.get("model", {}).get("messages", [])
for msg in messages:
if hasattr(msg, "content") and msg.content:
yield str(msg.content)
</code></pre>
<h2 id="示例代码">示例代码</h2>
<p>配置模块、日志模块等代码就略过了。MCP Server也略过,之前的文章写过很多遍了,这里就不赘述了。</p>
<h3 id="qywxclient">QywxClient</h3>
<p><code>pkg/qywx/qywx_client.py</code> 内封装了企业微信机器人交互的一些方法。</p>
<pre><code class="language-python">from pkg.config import cfg
from pkg.log import get_logger
from aibot import WSClient, WSClientOptions, generate_req_id, WsFrameHeaders
from ai_agent import aiops
import logging
class QywxClient:
"""企业微信客户端, 通过websockets连接企业微信服务器, 接收消息并处理"""
logger = get_logger("qywx_client", logging.INFO)
def __init__(self) -> None:
self.ws_client: WSClient = None# type: ignore
async def _on_authenticated(self):
"""处理认证成功事件"""
self.logger.info("Authenticated with Qywx server")
async def _on_event_enter_chat(self, frame: WsFrameHeaders):
"""处理用户进入聊天事件"""
self.logger.debug("Received event: enter_chat")
await self.ws_client.reply_welcome(frame, {
"msgtype": "text",
"text": {'content': f'您好!我是智能助手{cfg.qywx_bot_name},有什么可以帮您的吗?'},
})
async def _on_message_text(self, frame: WsFrameHeaders):
"""处理文本消息事件"""
self.logger.debug("Received text message")
msg_id = frame.get("body", {}).get("msgid", "")
user_id = frame.get("body", {}).get("from", {}).get("userid", "")
chattype = frame.get("body", {}).get("chattype", "")
response_url = frame.get("body", {}).get("response_url", "")
content = frame.get('body', {}).get('text', {}).get('content', '')
self.logger.debug(f"Message content: {content}, from user: {user_id}")
stream_id = generate_req_id('stream')
await self.ws_client.reply_stream(frame, stream_id, "小脑瓜努力思考中...", False)
# await asyncio.sleep(2)# 模拟处理时间
# resp = await aiops.ainvoke(input=content)
# await self.ws_client.reply_stream(frame, stream_id, resp, True)
final_text = ""
async for chunk in aiops.astream(input=content):
await self.ws_client.reply_stream(frame, stream_id, chunk, False)
# final_text += chunk
final_text = chunk
await self.ws_client.reply_stream(frame, stream_id, final_text, True)
async def start(self):
self.logger.info("Starting QywxClient...")
if not self.ws_client:
self.ws_client = WSClient(
WSClientOptions(
bot_id=cfg.qywx_bot_id,
secret=cfg.qywx_secret,
logger=self.logger,
)
)
self.ws_client.on("authenticated", self._on_authenticated)
self.ws_client.on("event.enter_chat", self._on_event_enter_chat)
self.ws_client.on("message.text", self._on_message_text)
await self.ws_client.connect()
async def shutdown(self):
self.logger.info("Shutting down QywxClient...")
if self.ws_client and self.ws_client.is_connected:
self.ws_client.disconnect()
</code></pre>
<h3 id="aiagent">AIAgent</h3>
<p>AIAgent 是一个 AI 代理类,用于处理企业微信消息。</p>
<pre><code class="language-python">from deepagents import create_deep_agent
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamable_http_client
from pkg.config import cfg
from pkg.log import get_logger
class AIAgent:
logger = get_logger("ai_agent")
def __init__(self):
self.model: ChatOpenAI = None# type: ignore
self._mcp_session: ClientSession = None# type: ignore
self._mcp_server_url = f"http://127.0.0.1:{cfg.service_port}/mcp/"
async def start(self):
if not self.model:
self.model = ChatOpenAI(
base_url=cfg.agent_base_url,
api_key=cfg.agent_api_key,# type: ignore
model=cfg.agent_model,
)
async def shutdown(self):
self.model = None# type: ignore
async def _create_root_agent(self, session: ClientSession):
tools = await load_mcp_tools(session)
root_agent = create_deep_agent(
model=self.model,
tools=tools,
system_prompt=f"你是一个智能助手,名字叫{cfg.qywx_bot_name}, 可以协助用户处理各种问题,并用温和积极的语气回答问题。回答的格式应该符合markdown规范。",
)
return root_agent
async def astream(self, input: str, thread_id: str = ""):
self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}")
async with streamable_http_client(self._mcp_server_url) as (read, write, get_session_id):
async with ClientSession(read, write) as session:
await session.initialize()
root_agent = await self._create_root_agent(session)
async for chunk in root_agent.astream(
input={"messages": }
):
# 从 chunk 字典中提取 AIMessage 的 content
if isinstance(chunk, dict):
messages = chunk.get("model", {}).get("messages", [])
if not messages:
continue
for msg in messages:
if hasattr(msg, "content") and msg.content:
yield str(msg.content)
elif hasattr(chunk, "content"):
yield str(chunk.content)# type: ignore
else:
yield str(chunk)
async def ainvoke(self, input: str, thread_id: str = ""):
self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}")
async with streamable_http_client(self._mcp_server_url) as (read, write, get_session_id):
async with ClientSession(read, write) as session:
await session.initialize()
root_agent = await self._create_root_agent(session)
resp = await root_agent.ainvoke(
input={"messages": }
)
return resp["messages"][-1].content
</code></pre>
<h3 id="main">main</h3>
<pre><code class="language-python"># main.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn
from contextlib import asynccontextmanager
from pkg.qywx import qywx_client
from pkg.config import cfg
from ai_agent.mcp_servers.datetime_server import mcp as datetime_mcp
from ai_agent import aiops
@asynccontextmanager
async def lifespan(app: FastAPI):
await aiops.start()
await qywx_client.start()
# 先获取 MCP app(这会创建 session_manager)
mcp_app = datetime_mcp.streamable_http_app()
# 在 FastAPI lifespan 中启动 MCP session manager
async with datetime_mcp.session_manager.run():
# 挂载 MCP app
app.mount("/mcp", mcp_app)
yield
await aiops.shutdown()
await qywx_client.shutdown()
app = FastAPI(
lifespan=lifespan
)
if __name__ == "__main__":
uvicorn.run("main:app", host=cfg.service_host, port=cfg.service_port)
</code></pre>
</div>
<div id="MySignature" role="contentinfo">
<p>本文来自博客园,作者:花酒锄作田,转载请注明原文链接:https://www.cnblogs.com/XY-Heruo/p/19868248</p><br><br>
来源:https://www.cnblogs.com/XY-Heruo/p/19868248
頁:
[1]