推荐答案
在 FastAPI 中使用 RQ(Redis Queue)可以通过以下步骤实现:
安装依赖: 首先,确保安装了
rq
和redis
库:pip install rq redis
创建任务函数: 定义一个任务函数,这个函数将被放入队列中异步执行:
def background_task(n): import time time.sleep(n) return f"Task completed after {n} seconds"
配置 Redis 连接: 在 FastAPI 应用中配置 Redis 连接:
from redis import Redis from rq import Queue redis_conn = Redis(host='localhost', port=6379) queue = Queue(connection=redis_conn)
将任务加入队列: 在 FastAPI 的路由中,将任务加入队列:
from fastapi import FastAPI app = FastAPI() @app.post("/task/") async def create_task(n: int): job = queue.enqueue(background_task, n) return {"job_id": job.id}
启动 RQ Worker: 在终端中启动 RQ Worker 来处理队列中的任务:
rq worker
查询任务状态: 可以通过
job
对象查询任务的状态:from rq.job import Job @app.get("/task/{job_id}") async def get_task_status(job_id: str): job = Job.fetch(job_id, connection=redis_conn) return {"status": job.get_status(), "result": job.result}
本题详细解读
1. RQ 简介
RQ(Redis Queue)是一个简单的 Python 库,用于将任务放入 Redis 队列中,并由后台工作进程异步执行。它非常适合处理耗时任务,如发送电子邮件、处理图像或执行复杂的计算。
2. FastAPI 与 RQ 的集成
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API。通过将 RQ 与 FastAPI 结合使用,可以实现异步任务处理,从而提高应用的性能和响应速度。
3. 任务队列的工作原理
- 任务函数:定义了一个普通的 Python 函数,该函数将在后台执行。
- 队列:使用 Redis 作为消息代理,将任务放入队列中。
- Worker:RQ Worker 是一个独立的进程,负责从队列中取出任务并执行。
4. 任务状态查询
通过 Job
对象,可以查询任务的执行状态和结果。这对于需要跟踪任务进度的应用场景非常有用。
5. 扩展与优化
- 任务重试:RQ 支持任务重试机制,可以在任务失败时自动重试。
- 任务优先级:可以为任务设置不同的优先级,确保重要任务优先执行。
- 任务超时:可以设置任务的超时时间,防止任务长时间占用资源。
通过以上步骤,你可以在 FastAPI 中轻松集成 RQ,实现异步任务处理。