使用 pkgutil 实现动态插件系统
<h2 id="pkgutil-简介">pkgutil 简介</h2><p><code>pkgutil</code> 是 Python 标准库中的一个模块,提供了用于处理 Python 包的工具函数。它的核心功能之一是 <code>iter_modules()</code> 函数,能够动态遍历和发现指定包路径下的所有子模块和子包。这一特性使其成为实现动态插件系统的选择之一。(之前也介绍过借助<code>__init_subclass__()</code>在子类继承时动态注册插件)</p>
<p>与手动遍历文件系统或使用第三方库相比,<code>pkgutil</code> 具有以下优势:</p>
<ul>
<li><strong>标准库原生支持</strong>:无需引入额外依赖</li>
<li><strong>跨平台兼容</strong>:统一处理不同操作系统的路径差异</li>
<li><strong>支持命名空间包</strong>:能够正确处理 PEP 420 定义的命名空间包</li>
<li><strong>与导入系统紧密集成</strong>:返回的模块名可直接用于 <code>importlib.import_module()</code></li>
</ul>
<h2 id="核心函数iter_modules">核心函数:iter_modules</h2>
<p><code>iter_modules()</code> 函数签名如下:</p>
<pre><code class="language-python">pkgutil.iter_modules(path=None, prefix='')
</code></pre>
<ul>
<li><strong>path</strong>:要搜索的路径列表,通常使用包的 <code>__path__</code> 属性</li>
<li><strong>prefix</strong>:返回的模块名前缀,常用于构建完整的模块导入路径</li>
</ul>
<p>该函数返回一个迭代器,每个元素是一个三元组 <code>(module_info_finder, name, ispkg)</code>:</p>
<ul>
<li><code>module_info_finder</code>:查找器对象(Python 3.6+ 为 <code>ModuleInfo</code> 实例)</li>
<li><code>name</code>:模块或包的名称(不含前缀)</li>
<li><code>ispkg</code>:布尔值,表示是否为包(含有 <code>__init__.py</code> 的目录)</li>
</ul>
<h2 id="实现动态插件系统">实现动态插件系统</h2>
<h3 id="设计思路">设计思路</h3>
<p>一个典型的插件系统包含以下组件:</p>
<ol>
<li><strong>协议定义</strong>:使用 <code>typing.Protocol</code> 定义插件必须实现的接口</li>
<li><strong>插件发现</strong>:使用 <code>pkgutil.iter_modules()</code> 自动发现所有插件包</li>
<li><strong>插件加载</strong>:使用 <code>importlib.import_module()</code> 动态导入插件模块</li>
<li><strong>插件验证</strong>:运行时检查插件是否满足协议要求</li>
<li><strong>插件执行</strong>:调用插件方法执行具体任务</li>
</ol>
<h3 id="代码实现">代码实现</h3>
<p>首先定义插件协议:</p>
<pre><code class="language-python"># jobs/base.py
from typing import Protocol, runtime_checkable
@runtime_checkable
class JobProtocol(Protocol):
"""插件协议定义"""
def enabled(self) -> bool:
"""判断插件是否启用"""
...
def run(self) -> bool:
"""执行插件任务"""
...
</code></pre>
<p><code>@runtime_checkable</code> 装饰器使得协议可以在运行时通过 <code>isinstance()</code> 进行检查。</p>
<blockquote>
<p>写稿的时候想起来可以加个<code>_runable()</code>方法, 执行<code>run()</code>方法之前先检查是否满足可执行条件。</p>
</blockquote>
<p>插件加载器实现:</p>
<pre><code class="language-python"># main.py
import logging
import pkgutil
import importlib
from types import ModuleType
from jobs.base import JobProtocol
logger = logging.getLogger(__name__)
def load_jobs(package: str = "jobs") -> list:
"""动态加载指定包下的所有任务插件"""
loaded_jobs: list = []
# 导入目标包
pkg = importlib.import_module(package)
# 遍历子包
for finder, name, ispkg in pkgutil.iter_modules(pkg.__path__, pkg.__name__ + "."):
# 只处理子包
if not ispkg:
continue
# 动态导入模块
module = importlib.import_module(name)
# 获取工厂函数并创建实例
if hasattr(module, "job_factory"):
job = module.job_factory()
# 协议验证
if isinstance(job, JobProtocol) and job.enabled():
loaded_jobs.append(job)
return loaded_jobs
</code></pre>
<p>插件实现示例:</p>
<pre><code class="language-python"># jobs/jobs1/__init__.py
from jobs.base import JobProtocol
from .job import MyJob1
def job_factory() -> JobProtocol:
return MyJob1()
# jobs/jobs1/job.py
class MyJob1:
def enabled(self) -> bool:
return True
def run(self) -> bool:
print(f"{self.__class__.__name__} is running")
return True
</code></pre>
<blockquote>
<p>之所以每个插件放单独的package中,是想着如果插件功能复杂,单个文件的篇幅可能会极长,可以拆分到不同的文件中。每个插件也可以维护单独的配置加载方式。而且可以利用上 pkgutil 返回的 <code>ispkg</code> 。如果插件的功能简单,也可以写成单独的文件。</p>
</blockquote>
<h3 id="项目结构">项目结构</h3>
<pre><code>project/
├── main.py # 主程序入口
├── jobs/
│ ├── __init__.py # 包初始化文件
│ ├── base.py # 协议定义
│ ├── jobs1/ # 插件1
│ │ ├── __init__.py # 导出 job_factory
│ │ └── job.py # 具体实现
│ ├── jobs2/ # 插件2
│ │ ├── __init__.py
│ │ └── job.py
│ └── jobs3/ # 插件3(可禁用)
│ ├── __init__.py
│ └── job.py
</code></pre>
<h3 id="运行结果示例">运行结果示例</h3>
<pre><code>2026-03-01 21:13:26 - INFO - 开始加载任务...
2026-03-01 21:13:26 - INFO - 成功加载任务: jobs.jobs1
2026-03-01 21:13:26 - INFO - 成功加载任务: jobs.jobs2
2026-03-01 21:13:26 - INFO - 任务 jobs.jobs3 已禁用,跳过
2026-03-01 21:13:26 - WARNING - 任务 jobs.jobs4 未实现 JobProtocol 协议(缺少 enabled 或 run 方法)
2026-03-01 21:13:26 - INFO - 共加载 2 个任务
2026-03-01 21:13:26 - INFO - 开始执行任务...
MyJob1 is running
2026-03-01 21:13:26 - INFO - 任务 MyJob1 执行完成,结果: True
MyJob2 is running
2026-03-01 21:13:26 - INFO - 任务 MyJob2 执行完成,结果: True
2026-03-01 21:13:26 - INFO - 所有任务执行完毕
</code></pre>
<h2 id="实际应用注意事项">实际应用注意事项</h2>
<h3 id="包结构规范">包结构规范</h3>
<p>确保插件目录是规范的 Python 包:</p>
<ul>
<li>每个插件包必须包含 <code>__init__.py</code> 文件</li>
<li>父包(<code>jobs/</code>)也应包含 <code>__init__.py</code>,确保 <code>__path__</code> 属性正确设置</li>
<li>虽然 Python 3.3+ 支持命名空间包(无 <code>__init__.py</code>),但显式定义包结构更加健壮</li>
</ul>
<h3 id="错误处理策略">错误处理策略</h3>
<p>动态加载过程中存在多种潜在的失败点,需要逐一处理:</p>
<pre><code class="language-python">try:
pkg = importlib.import_module(package)
except ImportError as e:
logger.error(f"导入包失败: {e}")
return []
for finder, name, ispkg in pkgutil.iter_modules(pkg.__path__, prefix):
try:
module = importlib.import_module(name)
except Exception as e:
logger.error(f"加载模块 {name} 失败: {e}")
continue
if not hasattr(module, "job_factory"):
continue
try:
job = module.job_factory()
except Exception as e:
logger.error(f"实例化插件 {name} 失败: {e}")
continue
</code></pre>
<h3 id="使用-logging-替代-print">使用 logging 替代 print</h3>
<p>生产环境中应使用 <code>logging</code> 模块:</p>
<pre><code class="language-python">import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
</code></pre>
<p>这提供了日志级别控制、时间戳、输出重定向等关键能力。</p>
<h3 id="protocol-运行时检查">Protocol 运行时检查</h3>
<p><code>typing.Protocol</code> 配合 <code>@runtime_checkable</code> 装饰器支持运行时类型检查:</p>
<pre><code class="language-python">from typing import Protocol, runtime_checkable
@runtime_checkable
class JobProtocol(Protocol):
def enabled(self) -> bool: ...
def run(self) -> bool: ...
# 检查实例是否满足协议
if isinstance(job, JobProtocol):
job.run()
</code></pre>
<p>注意:运行时检查仅验证方法是否存在,不验证方法签名。如果参数类型不匹配,运行时仍会报错。</p>
<h3 id="插件隔离与依赖管理">插件隔离与依赖管理</h3>
<ul>
<li><strong>避免循环导入</strong>:插件模块不应导入主程序模块</li>
<li><strong>延迟导入</strong>:插件内部的重量级依赖应在 <code>run()</code> 方法中导入,而非模块顶层</li>
<li><strong>异常隔离</strong>:每个插件的执行应该相互独立,一个插件失败不应影响其他插件</li>
</ul>
<pre><code class="language-python">def run_jobs(jobs: list) -> None:
for job in jobs:
try:
job.run()
except Exception as e:
logger.error(f"任务执行失败: {e}")
# 继续执行其他任务
</code></pre>
<h3 id="插件顺序控制">插件顺序控制</h3>
<p>如果插件执行顺序很重要,可以考虑以下策略:</p>
<ul>
<li>使用插件名称前缀排序(如 <code>jobs/01_init/</code>、<code>jobs/02_process/</code>)</li>
<li>在协议中添加 <code>priority()</code> 方法</li>
<li>在插件元数据中定义依赖关系</li>
</ul>
<h3 id="性能考量">性能考量</h3>
<ul>
<li><code>iter_modules()</code> 遍历文件系统,频繁调用可能影响性能</li>
<li>考虑在程序启动时一次性加载所有插件,后续使用缓存的插件列表</li>
<li>对于大量插件,可以考虑延迟加载(lazy loading)模式</li>
</ul>
<h3 id="安全性考虑">安全性考虑</h3>
<p>动态加载代码存在潜在安全风险:</p>
<ul>
<li>仅从可信路径加载插件</li>
<li>在沙箱环境中运行不受信任的插件</li>
<li>限制插件的文件系统和网络访问权限</li>
</ul>
<h2 id="补充">补充</h2>
<h3 id="代码示例">代码示例</h3>
<ul>
<li><code>main.py</code></li>
</ul>
<pre><code class="language-python">"""动态任务加载器
使用 pkgutil 模块动态发现和加载 jobs 包下的所有任务插件。
"""
import logging
import pkgutil
import importlib
from types import ModuleType
from jobs.base import JobProtocol
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
def load_jobs(package: str = "jobs") -> list:
"""动态加载指定包下的所有任务插件。
遍历 package 下的所有子包,尝试导入每个子包并调用其 job_factory 函数
创建任务实例。只有实现了 JobProtocol 协议且 enabled() 返回 True 的
任务才会被执行。
Args:
package: 要扫描的包名,默认为 "jobs"。
Returns:
成功加载的任务实例列表。
"""
loaded_jobs: list = []
try:
pkg: ModuleType = importlib.import_module(package)
except ImportError as e:
logger.error(f"导入包 {package} 失败: {e}")
return loaded_jobs
# pkg.__path__ 可能是 None(当 package 是命名空间包但没有子包时)
if not hasattr(pkg, "__path__") or pkg.__path__ is None:
logger.warning(f"包 {package} 没有 __path__ 属性,无法遍历子模块")
return loaded_jobs
for finder, name, ispkg in pkgutil.iter_modules(pkg.__path__, pkg.__name__ + "."):
# 只处理子包,跳过模块文件
if not ispkg:
logger.debug(f"跳过模块 {name}(只加载子包)")
continue
try:
module: ModuleType = importlib.import_module(name)
except Exception as e:
logger.error(f"加载任务模块 {name} 失败: {e}")
continue
if not hasattr(module, "job_factory"):
logger.warning(f"模块 {name} 没有 job_factory 函数,跳过")
continue
try:
job: JobProtocol = module.job_factory()
# 使用 Protocol 的运行时检查功能验证协议实现
if not isinstance(job, JobProtocol):
logger.warning(
f"任务 {name} 未实现 JobProtocol 协议(缺少 enabled 或 run 方法)"
)
continue
if not job.enabled():
logger.info(f"任务 {name} 已禁用,跳过")
continue
loaded_jobs.append(job)
logger.info(f"成功加载任务: {name}")
except Exception as e:
logger.error(f"创建任务实例 {name} 失败: {e}")
continue
return loaded_jobs
def run_jobs(jobs: list) -> None:
"""执行所有任务。
Args:
jobs: 要执行的任务实例列表。
"""
for job in jobs:
try:
result = job.run()
logger.info(f"任务 {job.__class__.__name__} 执行完成,结果: {result}")
except Exception as e:
logger.error(f"任务 {job.__class__.__name__} 执行失败: {e}")
def main() -> None:
"""程序入口函数。"""
logger.info("开始加载任务...")
jobs = load_jobs()
logger.info(f"共加载 {len(jobs)} 个任务")
logger.info("开始执行任务...")
run_jobs(jobs)
logger.info("所有任务执行完毕")
if __name__ == "__main__":
main()
</code></pre>
</div>
<div id="MySignature" role="contentinfo">
<p>本文来自博客园,作者:花酒锄作田,转载请注明原文链接:https://www.cnblogs.com/XY-Heruo/p/19656788</p><br><br>
来源:https://www.cnblogs.com/XY-Heruo/p/19656788
頁:
[1]