栗吆儿 發表於 2025-8-16 22:24:00

一文速通Python并行计算:14 Python异步编程-协程的管理和调度

<h1 id="一文速通-python-并行计算14-python-异步编程-协程的管理和调度">一文速通 Python 并行计算:14 Python 异步编程-协程的管理和调度</h1>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202504/2591203-20250416003327388-1352068275.png"></p>
<h1 id="摘要"><strong>摘要:</strong></h1>
<p>Python 异步编程基于 async/await 构建协程,运行在事件循环中。协程生成 Task,遇到 await 时挂起,I/O 完成触发回调恢复运行,通过事件循环非阻塞调度并发任务,实现单线程高并发。</p>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202503/2591203-20250321010016257-450656136.png"></p>
<blockquote>
<p><strong>关于我们更多介绍可以查看云文档:</strong>Freak 嵌入式工作室云文档<strong>,或者访问我们的 wiki:****https://github.com/leezisheng/Doc/wik</strong></p>
</blockquote>
<h1 id="原文链接"><strong>原文链接:</strong></h1>
<p><strong>FreakStudio的博客</strong></p>
<h1 id="往期推荐"><strong>往期推荐:</strong></h1>
<p><strong>学嵌入式的你,还不会面向对象??!</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:00 面向对象设计方法导论</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:01 面向对象编程的基本概念</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:02 类和对象的 Python 实现-使用 Python 创建类</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:03 类和对象的 Python 实现-为自定义类添加属性</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:04 类和对象的Python实现-为自定义类添加方法</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:05 类和对象的Python实现-PyCharm代码标签</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:06 类和对象的Python实现-自定义类的数据封装</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:07 类和对象的Python实现-类型注解</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:08 类和对象的Python实现-@property装饰器</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:09 类和对象的Python实现-类之间的关系</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:10 类和对象的Python实现-类的继承和里氏替换原则</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:11 类和对象的Python实现-子类调用父类方法</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:12 类和对象的Python实现-Python使用logging模块输出程序运行日志</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:13 类和对象的Python实现-可视化阅读代码神器Sourcetrail的安装使用</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:全网最适合入门的面向对象编程教程:14 类和对象的Python实现-类的静态方法和类方法</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:15 类和对象的 Python 实现-__slots__魔法方法</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:16 类和对象的Python实现-多态、方法重写与开闭原则</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:17 类和对象的Python实现-鸭子类型与“file-like object“</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:18 类和对象的Python实现-多重继承与PyQtGraph串口数据绘制曲线图</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:19 类和对象的 Python 实现-使用 PyCharm 自动生成文件注释和函数注释</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:20 类和对象的Python实现-组合关系的实现与CSV文件保存</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:21 类和对象的Python实现-多文件的组织:模块module和包package</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:22 类和对象的Python实现-异常和语法错误</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:23 类和对象的Python实现-抛出异常</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:24 类和对象的Python实现-异常的捕获与处理</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:25 类和对象的Python实现-Python判断输入数据类型</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:26 类和对象的Python实现-上下文管理器和with语句</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:27 类和对象的Python实现-Python中异常层级与自定义异常类的实现</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:28 类和对象的Python实现-Python编程原则、哲学和规范大汇总</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:29 类和对象的Python实现-断言与防御性编程和help函数的使用</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:30 Python的内置数据类型-object根类</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:31 Python的内置数据类型-对象Object和类型Type</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:32 Python的内置数据类型-类Class和实例Instance</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:33 Python的内置数据类型-对象Object和类型Type的关系</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:34 Python的内置数据类型-Python常用复合数据类型:元组和命名元组</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:35 Python的内置数据类型-文档字符串和__doc__属性</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:36 Python的内置数据类型-字典</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:37 Python常用复合数据类型-列表和列表推导式</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:38 Python常用复合数据类型-使用列表实现堆栈、队列和双端队列</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:39 Python常用复合数据类型-集合</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:40 Python常用复合数据类型-枚举和enum模块的使用</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:41 Python常用复合数据类型-队列(FIFO、LIFO、优先级队列、双端队列和环形队列)</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:42 Python常用复合数据类型-collections容器数据类型</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:43 Python常用复合数据类型-扩展内置数据类型</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:44 Python内置函数与魔法方法-重写内置类型的魔法方法</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:45 Python实现常见数据结构-链表、树、哈希表、图和堆</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:46 Python函数方法与接口-函数与事件驱动框架</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:47 Python函数方法与接口-回调函数Callback</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:48 Python函数方法与接口-位置参数、默认参数、可变参数和关键字参数</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:49 Python函数方法与接口-函数与方法的区别和lamda匿名函数</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:50 Python函数方法与接口-接口和抽象基类</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:51 Python函数方法与接口-使用Zope实现接口</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:52 Python函数方法与接口-Protocol协议与接口</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:53 Python字符串与序列化-字符串与字符编码</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:54 Python字符串与序列化-字符串格式化与format方法</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:55 Python字符串与序列化-字节序列类型和可变字节字符串</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:56 Python字符串与序列化-正则表达式和re模块应用</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:57 Python字符串与序列化-序列化与反序列化</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:58 Python字符串与序列化-序列化Web对象的定义与实现</strong></p>
<p><strong>全网最适合入门的面向对象编程教程:59 Python并行与并发-并行与并发和线程与进程</strong></p>
<p><strong>一文速通Python并行计算:00 并行计算的基本概念</strong></p>
<p><strong>一文速通Python并行计算:01 Python多线程编程-基本概念、切换流程、GIL锁机制和生产者与消费者模型</strong></p>
<p><strong>一文速通Python并行计算:02 Python多线程编程-threading模块、线程的创建和查询与守护线程</strong></p>
<p><strong>一文速通Python并行计算:03 Python多线程编程-多线程同步(上)—基于互斥锁、递归锁和信号量</strong></p>
<p><strong>一文速通Python并行计算:04 Python多线程编程-多线程同步(下)—基于条件变量、事件和屏障</strong></p>
<p><strong>一文速通Python并行计算:05 Python多线程编程-线程的定时运行</strong></p>
<p><strong>一文速通Python并行计算:06 Python多线程编程-基于队列进行通信</strong></p>
<p><strong>一文速通Python并行计算:07 Python多线程编程-线程池的使用和多线程的性能评估</strong></p>
<p><strong>一文速通Python并行计算:08 Python多进程编程-进程的创建命名、获取ID、守护进程的创建和终止进程</strong></p>
<p><strong>一文速通Python并行计算:09 Python多进程编程-进程之间的数据同步-基于互斥锁、递归锁、信号量、条件变量、事件和屏障</strong></p>
<p><strong>一文速通Python并行计算:10 Python多进程编程-进程之间的数据共享-基于共享内存和数据管理器</strong></p>
<p><strong>一文速通Python并行计算:11 Python多进程编程-进程之间的数据安全传输-基于队列和管道</strong></p>
<p><strong>一文速通Python并行计算:12 Python多进程编程-进程池Pool</strong></p>
<p><strong>一文速通Python并行计算:13 Python异步编程-基本概念与事件循环和回调机制</strong></p>
<h1 id="更多精彩内容可看"><strong>更多精彩内容可看:</strong></h1>
<p><strong>C语言一点五编程实战:纯 C 的模块化×继承×多态框架</strong></p>
<p><strong>给你的 Python 加加速:一文速通 Python 并行计算</strong></p>
<p><strong>一文搞懂 CM3 单片机调试原理</strong></p>
<p><strong>肝了半个月,嵌入式技术栈大汇总出炉</strong></p>
<p><strong>电子计算机类比赛的“武林秘籍”</strong></p>
<p><strong>一个MicroPython的开源项目集锦:awesome-micropython,包含各个方面的Micropython工具库</strong></p>
<p><strong>Avnet ZUBoard 1CG开发板—深度学习新选择</strong></p>
<p><strong>工程师不要迷信开源代码,还要注重基本功</strong></p>
<p><strong>什么?配色个性化的电机驱动模块?!!</strong></p>
<p><strong>什么?XIAO主控新出三款扩展板!</strong></p>
<p><strong>手把手教你实现Arduino发布第三方库</strong></p>
<p><strong>万字长文手把手教你实现MicroPython/Python发布第三方库</strong></p>
<p><strong>一文速通电子设计大赛,电子人必看的获奖秘籍</strong></p>
<p><strong>一文速通光电设计大赛,电子人必看!</strong></p>
<p><strong>工科比赛“无脑”操作指南:知识学习硬件选购→代码调试→报告撰写的保姆级路线图</strong></p>
<p><strong>单场会议拍摄收费6000+?拍摄技巧和步骤都在这里</strong></p>
<p><strong>0基础如何冲击大唐杯国奖?学姐的的备赛心得都在这里</strong></p>
<p><strong>爆肝整理长文】大学生竞赛速通指南:选题 × 组队 × 路演 48 小时备赛搞定</strong></p>
<p><strong>当代大学生竞赛乱象:从“内卷”到“祖传项目”的生存指南</strong></p>
<p><strong>女大学生摆摊亏损5000元踩点实录:成都哪里人最多、最容易赚到钱?我告诉你!</strong></p>
<p><strong>用拍立得在成都网红打卡点赚钱:一份超实用地摊级旅游副业教程</strong></p>
<p><strong>成都印象:一个电子女孩亲手做了点浪漫</strong></p>
<p><strong>普通继电器 vs 磁保持继电器 vs MOS管:工作原理与电路设计全解析</strong></p>
<p><strong>告别TP4056!国产SY3501D单芯片搞定充放电+升压,仅需7个元件!附开源PCB文件</strong></p>
<p><strong>POB面向老板编程—现实驱动的新型编程范式</strong></p>
<h1 id="文档获取"><strong>文档获取:</strong></h1>
<p>可访问如下链接进行对文档下载:</p>
<p><strong>https://github.com/leezisheng/Doc</strong></p>
<p>该文档是一份关于 <strong>并行计算</strong> 和 <strong>Python 并发编程</strong> 的学习指南,内容涵盖了并行计算的基本概念、Python 多线程编程、多进程编程以及协程编程的核心知识点:</p>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202503/2591203-20250321010038855-257589202.png"></p>
<h1 id="正文"><strong>正文</strong></h1>
<p>在上文提到的例子中,我们看到当一个程序变得很大而且复杂时,将其划分为子程序,每一部分实现特定的任务是个不错的方案。子程序不能单独执行,只能在主程序的请求下执行,主程序负责协调使用各个子程序。<strong>协程就是子程序的泛化。和子程序一样的事,协程只负责计算任务的一步;和子程序不一样的是,协程没有主程序来进行调度。这是因为协程通过管道连接在一起,没有监视函数负责顺序调用它们。</strong></p>
<p>对于子程序来说,调用是通过栈实现的,子程序调用总是一个入口,一次返回,调用顺序是明确的。比如 A 调用 B,B 在执行过程中又调用了 C,C 执行完毕返回,B 执行完毕返回,最后是 A 执行完毕。协程看上去也是子程序,但<strong>执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。从心理学的角度看,协程类似于人类的“任务切换”能力,我们可以暂时将注意力从一个任务转移到另一个,然后再回来继续未完成的任务。</strong></p>
<p>或者说,<strong>在协程中,执行点可以被挂起,可以被从之前挂起的点恢复执行。</strong>通过协程池就可以插入到计算中:运行第一个任务,直到它返回(yield)执行权,然后运行下一个,这样顺着执行下去。<strong>这种插入的控制组件就是前文介绍的事件循环。它持续追踪所有的协程并执行它们。</strong></p>
<p>协程的另外一些重要特性如下:</p>
<ul>
<li>协程可以有多个入口点,并可以 yield 多次;</li>
<li>协程可以将执行权交给其他协程</li>
</ul>
<p>yield 表示协程在此暂停,并且将执行权交给其他协程。因为协程<strong>可以将值与控制权一起传递给另一个协程</strong>,所以“yield 一个值”就表示将值传给下一个执行的协程。</p>
<p>协程与传统的线程或进程相比,有几个关键区别:</p>
<p><strong>(1)轻量级:</strong>协程通常是用户态的,子程序切换(函数)不是线程切换,由程序自身控制,切换开销比系统线程小得多。</p>
<p><strong>(2)非抢占式:</strong>协程的切换是协作式的,即协程需要显式地 yield 来让出控制权。</p>
<p><strong>(3)更好的控制:</strong>协程提供了更细粒度的控制,如何以及何时切换是由程序员或协程库决定的。</p>
<p>协程可以处理 IO 密集型程序的效率问题,但是处理 CPU 密集型不是它的长处,如要充分发挥 CPU 利用率可以结合多进程 + 协程。</p>
<h1 id="1使用-asyncio-管理协程">1.使用 Asyncio 管理协程</h1>
<p>Python3.x 提供了如下方式实现协程:</p>
<ul>
<li><code>asyncio + yield</code> from (python3.4+) :</li>
</ul>
<p><code>asyncio</code> 是 Python3.4 版本引入的标准库,直接内置了对异步 IO 的支持。<code>asyncio</code> 的异步操作,需要在 <code>coroutine</code> 中通过 <code>yield from</code> 完成。</p>
<pre><code class="language-python">import asyncio

@asyncio.coroutine
def test(i):
    print('test_1', i)
    r = yield from asyncio.sleep(1)
    print('test_2', i)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks =
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
</code></pre>
<p><code>asyncio.coroutine</code> 使用装饰器,定义了一个协程。所谓装饰器是给现有的模块增添新的小功能的函数,它可以对原函数进行功能扩展,而且还不需要修改原函数的内容,也不需要修改原函数的调用。</p>
<p>使用 <code>@asyncio.coroutine</code> 定义协程的通用方法如下:</p>
<pre><code class="language-bash">import asyncio

@asyncio.coroutine
def coroutine_function(function_arguments):
    _# DO_SOMETHING_
</code></pre>
<p>以上代码将 <code>test(i)</code> 定义为一个协程,然后就把这个协程放到事件循环中执行。<code>test(i)</code> 首先执行打印操作,这里用 <code>asyncio.sleep(1)</code> 模拟一个耗时 1 秒的 IO 操作,<code>asyncio.sleep()</code> 本身也是一个协程,这里使用 <code>yield from</code> 语法调用 <code>asyncio.sleep()</code>,但注意线程不会等待 <code>asyncio.sleep()</code>,而是直接中断并执行下一个消息循环。当 <code>asyncio.sleep()</code> 返回时,线程就可以从 <code>yield from</code> 拿到返回值(此处是 <code>None</code>),然后接着执行下一行语句。由此实现异步执行。</p>
<ul>
<li><code>asyncio + async/await </code>(python3.5+):</li>
</ul>
<p>为了简化并更好地标识异步 IO,从 Python3.5 开始引入了新的语法 async 和 await,可以让 coroutine 的代码更简洁易读。请注意,async 和 await 是 coroutine 的新语法,使用新语法只需要做两步简单的替换:</p>
<ul>
<li>把 @asyncio.coroutine 替换为 async;</li>
<li>把 yield from 替换为 await,即让出当前的执行权,等待的对象有结果返回时,再重新获得可以被继续执行的权利,只有可等待对象才能被 await。</li>
</ul>
<p><strong>注意,包含 @asyncio.coroutine 装饰器的将从 Python3.11 中删除,因此 asyncio 模块没有 @asyncio.coroutine 装饰符。</strong></p>
<pre><code class="language-python">import asyncio
async def test(i):
    print('test_1', i)
    await asyncio.sleep(1)
    print('test_2', i)

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    _# 在python3.8后直接把协程对象传给asyncio.wait()是不行的,_
    _# 必须封装成tasks对象传入_
    tasks =
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
</code></pre>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222143826-799507894.png"></p>
<p>除了 <code>await</code> 方法,<code>asyncio</code> 提供了<strong>asyncio.run()</strong>来执行协程:</p>
<p><code>run()</code> 函数接收一个协程对象,在执行时,总会<strong>创建一个新的事件循环,并在结束后关闭循环。<strong><strong>理想情况下,</strong></strong>run() ****函数应当被作为程序的总入口,并且只会被调用一次。</strong>如果同一线程中还有其它事件循环在运行,则此方法不能被调用。</p>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222153831-1097253441.png"></p>
<h1 id="2使用-asyncio-控制任务">2.使用 Asyncio 控制任务</h1>
<p><code>Asyncio</code> 是用来处理事件循环中的异步进程和并发任务执行的。<strong>它还提供了 <strong><strong>asyncio.Task()</strong></strong> 类,可以在任务中使用协程。</strong>它的作用是,在同一事件循环中,<strong>运行某一个任务的同时可以并发地运行多个任务。当协程被包在任务中,它会自动将任务和事件循环连接起来,当事件循环启动的时候,任务自动运行。</strong>这样就提供了一个可以自动驱动协程的机制。</p>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222200862-426518909.png"></p>
<h2 id="21-asynciocreate_task">2.1 asyncio.create_task()</h2>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222205006-1026761286.png"></p>
<p>下面的例子中我们使用 <code>asyncio.create_task()</code> 创建 Task,<code>create_task()</code> 会把一个协程打包成一个任务(<code>Task</code>),并立即排入日程准备执行,函数返回值是打包完成的 <code>Task </code> 对象。</p>
<pre><code class="language-python">import asyncio
import time
async def foo(n):
    await asyncio.sleep(n)

async def main():
    task1 = asyncio.create_task(foo(1))
    task2 = asyncio.create_task(foo(2))
    t1 = time.time()
    print('hello')
    await task1
    await task2
    print('coroutine')
    t2 = time.time()
    print('cost:', t2 - t1)

asyncio.run(main())
</code></pre>
<p>如下为运行结果:</p>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222211532-613172051.png"></p>
<p>当使用 <code>create_task() </code> 时,创建的任务立即被加入到事件循环中,并不会阻塞当前的程序,所以上述程序在打印出 <code>hello</code> 后只需等待 2 秒就打印出 <code>coroutine</code>。</p>
<h2 id="22-asynciogather">2.2 asyncio.gather()</h2>
<p><code>asyncio.gather()</code> 函数允许调用者将多个可等待对象组合在一起。分组后,可等待对象可以并发执行、等待和取消。它是一个有用的实用函数,可用于分组和执行多个协程或多个任务。</p>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222216259-1606004006.png"></p>
<p>从功能上看,<code>asyncio.wait</code> 和 <code>asyncio.gather</code> 实现的效果是相同的,都是把所有 <code>Task </code> 任务结果收集起来。但不同的是,<code>asyncio.wait</code> 会返回两个值:<code>done</code> 和 <code>pending</code>,<code>done</code> 为已完成的协程 <code>Task</code>,<code>pending </code> 为超时未完成的协程 <code>Task</code>,需通过 <code>future.result</code> 调用 <code>Task</code> 的 <code>result</code>;而 <strong>asyncio.gather<strong><strong>返回的是所有已完成</strong></strong>Task<strong><strong>的</strong></strong>result****,不需要再进行调用或其他操作,就可以得到全部结果。</strong></p>
<pre><code class="language-python">import asyncio

async def foo():
    return 'foo'
async def bar():
    raise RuntimeError('fake runtime error')
async def main():
    task1 = asyncio.create_task(foo())
    task2 = asyncio.create_task(bar())

    _# return_exceptions=True_
    _# 如果return_exceptions为True,_
    _# 异常会和成功的结果一样处理,并聚合至结果列表_
    results = await asyncio.gather(task1, task2, return_exceptions=True)
    print(results)
    _# 返回结果的顺序和传参顺序一致_
    _# isinstance() 函数来判断一个对象是否是一个已知的类型_
    _# isinstance(object, classinfo)_
    assert isinstance(results, RuntimeError)
    try:
      _# 如果return_exceptions为False(默认)_
      _# 所引发的首个异常会立即传播给等待gather()的任务_
      results = await asyncio.gather(task1, task2, return_exceptions=False)
      _# 此处打印并不会被执行, results 也未被赋值_
      print(results)

    except RuntimeError as runtime_err:
      _# 捕获异常并打印: fake runtime error_
      print(runtime_err)

asyncio.run(main())
</code></pre>
<p>执行结果如下:</p>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222223006-1753753680.png"></p>
<h2 id="23-asynciowait">2.3 asyncio.wait()</h2>
<p><code>asyncio.wait() </code> 函数可用于<strong>等待一组异步任务完成</strong>,回想一下,<code>asyncio </code> 任务是包装协程的 <code>asyncio.Task</code> 类的一个实例。它允许独立调度和执行协程,<code>Task</code> 实例提供任务句柄以查询状态和获取结果。<strong>wait()****函数允许我们等待一组任务完成。等待调用可以配置为等待不同的条件,例如所有任务完成、第一个任务完成以及第一个任务因错误而失败。</strong></p>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222226567-337233584.png"></p>
<p><code>asyncio.wait </code> 最常见的写法是:<code>await asyncio.wait(task_list)</code>,表示运行直到所有给定的协程都完成。常见写法为:</p>
<pre><code class="language-bash">...
_# create many tasks_
tasks =
</code></pre>
<p>示例代码如下:</p>
<pre><code class="language-python">import asyncio

async def foo():
    await asyncio.sleep(3)
    return 'foo'
async def bar():
    await asyncio.sleep(1)
    return 'bar'
async def main():
    task1 = asyncio.create_task(foo())
    task2 = asyncio.create_task(bar())
    _# 有一个任务执行完成即返回, 总共耗时 1 秒_
    done, pending = await asyncio.gather(task1, task2, return_exceptions=True)
    _# done 集合里包含打包成 Task 的 bar()_
    print(f'done: {done}')
    _# pendding 集合里包含打包成 Task 的 foo()_
    print(f'pending: {pending}')
    _# 所有任务执行完成后返回, 总共耗时 3 秒_
    done, pending = await asyncio.gather(task1, task2, return_exceptions=True)
    _# done 集合里包含被带打包成 Task 的 foo() 和 bar()_
    print(f'done: {done}')
    _# pending 集合为空_
    print(f'pending: {pending}')
    _# 所有任务执行完成, 但运行时间不能超 2 秒后返回, 总共耗时 2 秒_
    done, pending = await asyncio.gather(task1, task2, return_exceptions=True)
    _# done 集合里包含打包成 Task 的 bar()_
    print(f'done: {done}')
    _# pendding 集合里包含打包成 Task 的 foo()_
    print(f'pending: {pending}')
asyncio.run(main())
</code></pre>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222232614-1209280312.png"></p>
<h2 id="24-asyncioas_completed">2.4 asyncio.as_completed()</h2>
<p>有时,我们必须在完成一个后台任务后立即开始下面的动作。比如我们爬取一些数据,马上调用机器学习模型进行计算,<code>gather</code> 方法不能满足我们的需求,但是我们可以使用 <code>as_completed</code> 方法。</p>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222236988-454507686.png"></p>
<p>as_completed 不是并发方法,接受 aws 集合,返回一个带有 yield 语句的迭代器。所以我们可以直接遍历每个完成的后台任务,我们可以对每个任务单独处理异常,而不影响其他任务的执行:</p>
<p>示例代码如下:</p>
<pre><code class="language-python">import asyncio

async def foo():
    await asyncio.sleep(2)
    return 'foo'

async def bar():
    await asyncio.sleep(1)
    return 'bar'

async def main():
    for fut in asyncio.as_completed({foo(), bar()}):
      earliest_result = await fut
      _# 会依次打印 bar 和 foo, 因为 bar() 会更早执行完毕_
      print(earliest_result)

asyncio.run(main())
</code></pre>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222242210-1237702578.png"></p>
<p>以上介绍多任务并发时引入了超时的概念,超时也可以被应用在单独的一个任务中,使用 <code>asyncio.wait_for(aw, timeout) </code> 函数,该函数接受一个任务 <code>aw </code> 和超时时间 <code>timeout</code>,如果在限制时间内完成,则会正常返回,否则会被取消并抛出 <code>asyncio.TimeoutError</code> 异常。</p>
<p>为了防止任务被取消,可以使用 <code>asyncio.shield(aw) </code> 进行保护。<code>shield()</code> 会屏蔽外部取消操作,如果外部任务被取消,其内部正在执行的任务不会被取消,在内部看来取消操作并没有发生,由于内部仍正常执行,执行完毕后会触发异常,如果确保程序能忽略异常继续执行,需要在外部使用 <code>try-except</code> 捕获异常。如果在任务内部取消,则会被成功取消。</p>
<p><code>Asyncio</code> 模块为我们提供了 <code>asyncio.Task(coroutine)</code> 方法来处理计算任务,它可以调度协程的执行。任务对协程对象在事件循环的执行负责。如果被包裹的协程要从 <code>future</code>(就是协程的实例化对象)调度,那么任务会被挂起,等待 <code>future</code> 的计算结果。</p>
<p>当 <code>future</code> 计算完成,被包裹的协程将会拿到 future 返回的结果或异常继续执行。另外,需要注意的是,事件循环一次只能运行一个任务,除非还有其它事件循环在不同的线程并行运行,此任务才有可能和其他任务并行。当一个任务在等待 <code>future</code> 执行的期间,事件循环会运行一个新的任务。</p>
<p>在下面的代码中,我们展示了三个可以被 <code>Asyncio.Task()</code> 并发执行的数学函数,在这个例子中,我们定义了三个协程, <code>factorial</code>, <code>fibonacci</code> 和 <code>binomialCoeff </code>,为了能并行执行这三个任务,我们将其放到一个 <code>task</code> 的 <code>list</code> 中得到事件循环然后运行任务,最后,关闭事件循环。</p>
<pre><code class="language-python">import asyncio

async def factorial(number):
    f = 1
    for i in range(2, number + 1):
      print("Asyncio.Task: Compute factorial(%s)" % (i))
      await asyncio.sleep(1)
      f *= i
    print("Asyncio.Task - factorial(%s) = %s" % (number, f))

async def fibonacci(number):
    a, b = 0, 1
    for i in range(number):
      print("Asyncio.Task: Compute fibonacci (%s)" % (i))
      await asyncio.sleep(1)
      a, b = b, a + b
    print("Asyncio.Task - fibonacci(%s) = %s" % (number, a))

async def binomialCoeff(n, k):
    result = 1
    for i in range(1, k+1):
      result = result * (n-i+1) / i
      print("Asyncio.Task: Compute binomialCoeff (%s)" % (i))
      await asyncio.sleep(1)
    print("Asyncio.Task - binomialCoeff(%s , %s) = %s" % (n, k, result))


if __name__ == "__main__":
    tasks = [asyncio.Task(factorial(10)),
             asyncio.Task(fibonacci(10)),
             asyncio.Task(binomialCoeff(20, 10))]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
</code></pre>
<p>运行结果如下:</p>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222249247-1580775032.png"></p>
<h1 id="3使用-asyncio-和-futures">3.使用 Asyncio 和 Futures</h1>
<p><code>future </code> 是一个 <code>Python </code> 对象,它包含一个你希望在未来某个时间点获得、但目前还不存在的值。通常,当创建 <code>future </code> 时,它没有任何值,因为它还不存在。在这种状态下,它被认为是不完整的、未解决的或根本没有完成的。然后一旦你得到一个结果,就可以设置 <code>future </code> 的值,这将完成 <code>future</code>。那时,我们可以认为它已经完成,并可从 <code>future </code> 中提取结果。</p>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222254628-906152657.png"></p>
<p>要操作 <code>Asyncio</code> 中的 <code>Future </code>,必须进行以下声明:</p>
<pre><code class="language-bash">import asyncio
future = asyncio.Future()
</code></pre>
<p>基本的方法有:</p>
<table>
<thead>
<tr>
<th><strong>方法</strong></th>
<th><strong>作用</strong></th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>cancel()</strong></td>
<td>取消 future 的执行,调度回调函数</td>
</tr>
<tr>
<td><strong>result()</strong></td>
<td>返回 future 代表的结果</td>
</tr>
<tr>
<td><strong>exception()</strong></td>
<td>返回 future 中的 Exception</td>
</tr>
<tr>
<td><strong>add_done_callback(fn)</strong></td>
<td>添加一个回调函数,当 future 执行的时候会调用这个回调函数</td>
</tr>
<tr>
<td><strong>remove_done_callback(fn)</strong></td>
<td>从“call whten done”列表中移除所有 callback 的实例</td>
</tr>
<tr>
<td><strong>set_result(result)</strong></td>
<td>将 future 标为执行完成,并且设置 result 的值</td>
</tr>
<tr>
<td><strong>set_exception(exception)</strong></td>
<td>将 future 标为执行完成,并设置 Exception</td>
</tr>
</tbody>
</table>
<pre><code class="language-python">import asyncio

_# asyncio 里面有一个类 Future,实例化之后即可得到 future 对象_
_# 然后 asyncio 里面还有一个类 Task,实例化之后即可得到 task 对象(也就是任务)_
_# 这个 Task 是 Future 的子类,所以我们用的基本都是 task 对象,而不是 future 对象_
_# 但 Future 这个类和 asyncio 的实现有着密不可分的关系,所以我们必须单独拿出来说_

future = asyncio.Future()
print(future)_# &lt;Future pending&gt;_
print(future.__class__)_# &lt;class '_asyncio.Future'&gt;_
print(f"future 是否完成: {future.done()}")_# future 是否完成: False_

_# 设置一个值,通过 set_result_
future.set_result("古明地觉")
print(f"future 是否完成: {future.done()}")_# future 是否完成: True_
print(future)_# &lt;Future finished result='古明地觉'&gt;_
print(f"future 的返回值: {future.result()}")_# future 的返回值: 古明地觉_
</code></pre>
<p>可通过调用其类型对象 <code>Future</code> 来创建 <code>future</code>,此时 <code>future</code> 上将没有结果集,因此调用其 <code>done</code> 方法将返回 <code>False</code>。此后用 <code>set_result</code> 方法设置 <code>future</code> 的值,这将把 <code>future</code> 标记为已完成。或者,如果想在 <code>future</code> 中设置一个异常,可调用 <code>set_exception</code>。<strong>(必须在调用<strong><strong>set_result</strong></strong>(设置结果)之后才能调用<strong><strong>result</strong></strong>(获取结果),并且<strong><strong>set_result</strong></strong>只能调用一次,但<strong><strong>result</strong></strong>可以调用多次)</strong></p>
<p>在下面的示例代码中,我们定义了一个函数 <code>make_request</code>,该函数里面创建了一个 <code>future </code> 和一个任务,该任务将在 1 秒后异步设置 <code>future </code> 的结果。然后在主函数中调用 <code>make_request</code>,当调用它时,将立即得到一个没有结果的 <code>future</code>。然后 <code>await future</code> 会让主协程陷入等待,并将执行权交出去。一旦当 <code>future </code> 有值了,那么再恢复 <code>main() </code> 协程,拿到返回值进行处理。</p>
<pre><code class="language-python">import asyncio

async def set_future_value(future):
    await asyncio.sleep(1)
    future.set_result("Hello World")
def make_request():
    future = asyncio.Future()
    _# 创建一个任务来异步设置 future 的值_
    asyncio.create_task(set_future_value(future))
    return future
async def main():
    _# 注意这里的 make_request,它是一个普通的函数,如果在外部直接调用肯定是会报错的_
    _# 因为没有事件循环,在执行 set_future_value 时会报错_
    _# 但如果在协程里面调用是没问题的,因为协程运行时,事件循环已经启动了_
    _# 此时在 make_request 里面,会启动一个任务_
    future = make_request()
    print(f"future 是否完成: {future.done()}")
    _# 阻塞等待,直到 future 有值,什么时候有值呢?_
    _# 显然是当协程 set_future_value 里面执行完 future.set_result 的时候_
    value = await future_# 暂停 main(),直到 future 的值被设置完成_
    print(f"future 是否完成: {future.done()}")
    print(value)
asyncio.run(main())
</code></pre>
<p><img src="https://img2024.cnblogs.com/blog/2591203/202508/2591203-20250816222303207-566171698.png"></p><br><br>
来源:https://www.cnblogs.com/FreakEmbedded/p/19042738
頁: [1]
查看完整版本: 一文速通Python并行计算:14 Python异步编程-协程的管理和调度