FastAPI异步方法中调用同步方法
<h2 id="前言">前言</h2><p>在异步方法中调用同步方法,会直接阻塞整个事件循环,导致应用在执行同步方法期间无法处理其他任何并发请求,从而拖垮整个服务的性能。</p>
<p>为了解决这个问题,核心思路是将同步方法交给外部线程池去执行。</p>
<h2 id="方法1-使用-to_thread">方法1, 使用 to_thread</h2>
<p>Python 3.9 后可以使用 <code>asyncio.to_thread</code> 方法,将同步函数跑在独立的线程中,并返回一个协程供 <code>await</code></p>
<pre><code class="language-python">import asyncio
import time
from fastapi import FastAPI
app = FastAPI()
def sync_task(name: str):
time.sleep(2)
return f"Hello {name}, sync task done!"
@app.get("/async-call")
async def async_endpoint():
result = await asyncio.to_thread(sync_task, "World")
return {"message": result}
</code></pre>
<h2 id="方法2-直接定义同步路由">方法2, 直接定义同步路由</h2>
<p>FastAPI支持定义同步路由,FastAPI会自动在一个外部线程池中运行该函数。不过出于代码整体设计的考虑,个人不建议这么做。</p>
<h2 id="方法3-使用-run_in_threadpool">方法3, 使用 run_in_threadpool</h2>
<p>FastAPI 基于 Starlette, 而 Starlette 提供一个工具函数 <code>run_in_threadpool</code>,这种方式类似于 <code>asyncio.to_thread</code>,在某些老版本的 FastAPI 或特定的 contextvars 传递场景下更常用。</p>
<pre><code class="language-python">from fastapi.concurrency import run_in_threadpool
@app.get("/method3")
async def starlette_endpoint():
result = await run_in_threadpool(sync_task, "Starlette")
return {"message": result}
</code></pre>
<h2 id="方法4-使用进程池">方法4, 使用进程池</h2>
<p>对于CPU密集型任务,应该使用多进程<code>ProcessPoolExecutor</code>来操作</p>
<pre><code class="language-python">import concurrent.futures
import math
from fastapi import FastAPI
app = FastAPI()
# 创建一个全局进程池
executor = concurrent.futures.ProcessPoolExecutor()
def cpu_intensive_calculation(n: int):
# 模拟重度 CPU 计算
return sum(math.isqrt(i) for i in range(n))
@app.get("/cpu-bound-task")
async def cpu_task():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, cpu_intensive_calculation, 10**7)
return {"result": result}
</code></pre>
</div>
<div id="MySignature" role="contentinfo">
<p>本文来自博客园,作者:花酒锄作田,转载请注明原文链接:https://www.cnblogs.com/XY-Heruo/p/19448294</p><br><br>
来源:https://www.cnblogs.com/XY-Heruo/p/19448294
頁:
[1]