菊香满屋 發表於 2019-8-6 09:23:00

asyncio之异步上下文管理器

<h1 id="异步上下文管理器">异步上下文管理器</h1>
<p>前面文章我们提到了上下文管理器,但是这个上下文管理器只适用于同步代码,不能用于异步代码(async def形式),不过不用担心今天我们就来讨论在异步中如何使用上下文管理器。<br>
特别提醒本教程所使用的Python版本为Python3.7。</p>
<h3 id="async-with">async with</h3>
<p>异步上下文管理器。类似于同步上下文管理器,我们知道使用with可以实现一个上下文管理的器,而对于异步上下文管理器其根本表现形式为async with,下面的一段代码告诉你async with是如何运作的。</p>
<pre><code>import asyncio
class AContext:
    def __init__(self):
      print("in init")

    async def __aenter__(self):
      print("in aenter")
    async def __aexit__(self, exc_type, exc_val, exc_tb):
      print("in aexit")

async def main():
    async with AContext() as ac:
      print("in with", ac)

if __name__ == '__main__':
    print("start")
    asyncio.run(main())
</code></pre>
<p>输出内容</p>
<pre><code>start
in init
in aenter
in with None
in aexit
</code></pre>
<p>下面说下async with和with的不同地方<br>
语法上,with实现了enter和exit两个方法,async with实现了类似方法<br>
aenter和aexit在同步的基础上加个a,实际上就是代表asynchronous。<br>
实现上,使用普通的函数就可以实现with,但是async with需要通过异步函数的形式去实现,就像上面的例子一样。</p>
<h3 id="asynccontextmanager">asynccontextmanager</h3>
<p>从Python 3.7开始,有两种方法可以编写异步上下文管理器。一种就是前面提到的魔法函数的实现,另外一种就是contextlib的另外一个模块asynccontextmanager。通过装饰器的方式实现一个异步上下文管理器</p>
<pre><code>import asyncio
from contextlib import asynccontextmanager
from concurrent.futures.thread import ThreadPoolExecutor


class AsyncFile(object):
    def __init__(self, file, loop=None, executor=None):
      if not loop:
            loop = asyncio.get_running_loop()# 获取当前运行事件循环
      if not executor:
            executor = ThreadPoolExecutor(10)# 线程池数量10
      self.file = file
      self.loop = loop
      self.executor = executor
      self.pending = []
      self.result = []

    def write(self, string):
      """
      实现异步写操作
      :param string: 要写的内容
      :return:
      """
      self.pending.append(
            self.loop.run_in_executor(
                self.executor, self.file.write, string,
            )
      )

    def read(self, i):
      """
      实现异步读操作
      :param i:
      :return:
      """
      self.pending.append(
            self.loop.run_in_executor(self.executor, self.file.read, i,)
      )

    def readlines(self):
      self.pending.append(
            self.loop.run_in_executor(self.executor, self.file.readlines, )
      )

@asynccontextmanager
async def async_open(path, mode="w"):
    with open(path, mode=mode) as f:
      loop = asyncio.get_running_loop()
      file = AsyncFile(f, loop=loop)
      try:
            yield file
      finally:
            file.result = await asyncio.gather(*file.pending, loop=loop)
</code></pre>
<p>上面的代码通过使用asyncio中run_in_executor运行一个线程,来使得阻塞操作变为非阻塞操作,达到异步非阻塞的目的。<br>
AsyncFile类提供了一些方法,这些方法将用于将write、read和readlines的调用添加到pending列表中。这些任务通过finally块中的事件循环在ThreadPoolExecutor进行调度。<br>
yield 前半段用来表示_<em>aenter</em>_()<br>
yield 后半段用来表示_<em>aexit</em>_()<br>
使用finally以后可以保证链接资源等使用完之后能够关闭。</p>
<h3 id="运行异步上下文管理器">运行异步上下文管理器</h3>
<p>如果调用前面示例中的异步上下文管理器,则需要使用关键字async with来进行调用。另外带有async with的语句只能在异步函数中使用。</p>
<pre><code>from asynciodemo.asyncwith import async_open
import asyncio
import tempfile
import os


async def main():
    tempdir = tempfile.gettempdir()
    path = os.path.join(tempdir, "run.txt")
    print(f"临时文件目录:{path}")

    async with async_open(path, mode='w') as f:
      f.write("公众号: ")
      f.write("Python")
      f.write("学习")
      f.write("开发")


if __name__ == '__main__':
    asyncio.run(main())
</code></pre>
<p>使用方法和with类似可以通过使用as,然后使用其句柄,唯一需要注意的就是要在异步函数中使用。</p>
<h3 id="同步任务">同步任务</h3>
<p>在之前的一些异步教程里和大家说了关于协程中的几个同步方法,asyncio.wait和asyncio.gather,这里我们可以配合这些方法通过异步上下文管理器来实现同步任务,请看如下代码</p>
<pre><code>import asyncio


# 同步挂起协程

class Sync():
    def __init__(self):
      self.pending = []
      self.finished = None

    def schedule_coro(self, coro, shield=True):
       #如果去掉asyncio.shield,在取消fut函数的时候,就会导致coro协程也出错。
      fut = asyncio.shield(coro) if shield else asyncio.ensure_future(coro)
      self.pending.append(fut)
      return fut

    async def __aenter__(self):
      return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
      # 退出async with的时候,任务列表中的任务进行并发操作。
      self.finished = await asyncio.gather(*self.pending, return_exceptions=True)


async def workload1():
    await asyncio.sleep(2)
    print("These coroutines will be executed return 41")
    return 41


async def workload2():
    await asyncio.sleep(2)
    print("These coroutines will be executed return 42")
    return 42


async def workload3():
    await asyncio.sleep(2)
    print("These coroutines will be executed return 43")
    return 43


async def main():
    async with Sync() as sync:
      # 使用异步上下文可以创建同步协程程序
      sync.schedule_coro(workload1())
      sync.schedule_coro(workload2())
      sync.schedule_coro(workload3())
    print("All scheduled corotines have retuned or throw:", sync.finished)


if __name__ == '__main__':
    asyncio.run(main())

</code></pre>
<p>输出</p>
<pre><code>These coroutines will be executed return 41
These coroutines will be executed return 42
These coroutines will be executed return 43
All scheduled corotines have retuned or throw:
</code></pre>
<p>1.程序是同步的形式并发输出的。<br>
2. schedule_coro的作用是将协程workload1,workload2,workload3添加到任务列表pending,退出async with的时候,任务列表中的任务进行并发操作。</p><br><br>
来源:https://www.cnblogs.com/c-x-a/p/11198901.html
頁: [1]
查看完整版本: asyncio之异步上下文管理器