黄文娟 發表於 2025-6-21 16:19:00

taskiq异步分布式任务管理器 适用fastapi

<h1 id="taskiq-异步分布式任务管理器">taskiq 异步分布式任务管理器</h1>
<p>https://taskiq-python.github.io/<br>
将 taskiq 视为 asyncio celery 实现。它使用几乎相同的模式,但它更加现代和灵活。<br>
它不是任何其他任务管理器的直接替代品。它具有不同的库生态系统和一组不同的功能。此外,它不适用于同步项目。将无法同步发送任务。</p>
<h2 id="1-安装taskiq">1 安装taskiq</h2>
<pre><code class="language-python">pip install taskiq
</code></pre>
<h2 id="2-使用">2 使用</h2>
<p>我这里使用的是fastapi+rabbitmq,所以需要多装一个<code>taskiq-aio-pika</code>包来使用</p>
<pre><code class="language-python">pip install taskiq-aio-pika
</code></pre>
<p>项目路径如下:<br>
<img src="https://raw.githubusercontent.com/ssrheart425/obsidian_image_bed/main/image/202506211603467.png" alt="image.png" loading="lazy"><br>
broker.py:</p>
<pre><code class="language-python">from taskiq_aio_pika import AioPikaBroker

from app.core.config import settings

broker = AioPikaBroker(url="amqp://guest:guest@localhost:5672//") # 此处替换broker_url
</code></pre>
<p><code>这里必须要定义一个worker.py,显式的导入你的tasks和broker。不然会报如下错误:</code></p>
<pre><code class="language-python">task "xxxx" is not found. Maybe you forgot to import it?
</code></pre>
<p>worker.py:</p>
<pre><code class="language-python">from app.tasks.broker import broker

import app.tasks.notify_tasks
</code></pre>
<p>xxx_tasks.py:</p>
<pre><code class="language-python">from app.tasks.broker import broker


@broker.task
async def test_tasks():
        # 现在就可以支持async await使用 例如:
        async with httpx.AsyncClient() as client:
                await client.post("jd.com", json=body)
        return
</code></pre>
<h2 id="3-cli启动命令">3 cli启动命令:</h2>
<pre><code class="language-python">taskiq worker app.tasks.worker:broker
</code></pre><br><br>
来源:https://www.cnblogs.com/ALPACINO6/p/18940274
頁: [1]
查看完整版本: taskiq异步分布式任务管理器 适用fastapi