超实用 Demo:使用 FastAPI、Celery、RabbitMQ 和 Mo···

发布时间:2025-05-18 20:31:20 作者:益华网络 来源:undefined 浏览量(1) 点赞(1)
摘要:异步任务,是 Web 开发中经常遇到的问题,比如说用户提交了一个请求,虽然这个请求对应的任务非常耗时,但是不能让用户等在这里,通常需要立即返回结果,告诉用户任务已提交。任务可以在后续慢慢完成,完成后再给用户发一个完成的通知。 今天分享一份代码,使用 Celery、RabbitMQ 和 MongoDB 实现一个

异步任务,是 Web 开发中经常遇到的问题,比如说用户提交了一个请求,虽然这个请求对应的任务非常耗时,但是不能让用户等在这里,通常需要立即返回结果,告诉用户任务已提交。任务可以在后续慢慢完成,完成后再给用户发一个完成的通知。

今天分享一份代码,使用 Celery、RabbitMQ 和 MongoDB 实现一个异步任务工作流,你可以修改 task.py 来实现你自己的异步任务。

架构图如下:

其中 Celery 来执行异步任务,RabbitMQ 作为消息队列,MongoDB 存储任务执行结果,FastAPI 提供 Web 接口。

以上所有模块均可使用 Docker 一键部署。

下面为 Demo 使用方法:

1、确保本机已安装 Docker、Git

2、下载源代码:

git clone https://github.com/aarunjith/async-demo.git

3、部署并启动:

cd async-demo

docker compose up --build

4、启动一个异步任务:

$ curl -X POST http://localhost:8080/process

任务会发送到消息队列,同时会立即返回一个任务 id:

❯ curl -X POST http://localhost:8080/process

{"status":"PENDING","id":"a129c666-7b5b-45f7-ba54-9d7b96a1fe58","error":""}%

5、查询任务状态:

curl -X POST http://localhost:8080/check_progress/<task_id>

任务完成后的返回结果如下:

❯ curl -X POST http://localhost:8080/check_progress/a129c666-7b5b-45f7-ba54-9d7b96a1fe58

{"status":"SUCEESS","data":"\"hello\""}%

代码目录结构如下:

其中 app.py 如下:

from fastapi import FastAPI

from celery.result import AsyncResult

from tasks import start_processing

from loguru import logger

from pymongo import MongoClient

import uvicorn

# Lets create a connection to our backend where celery stores the results

client = MongoClient("mongodb://mongodb:27017")

# Default database and collection names that Celery create

db = client[task_results]

coll = db["celery_taskmeta"]

app = FastAPI()

@app.post(/process)

async def process_text_file():

Process endpoint to trigger the start of a process

try:

result = start_processing.delay()

logger.info(fStarted processing the task with id {result.id})

return {

"status": result.state,

id: result.id,

error:

}

except Exception as e:

logger.info(fTask Execution failed: {e})

return {

"status": "FAILURE",

id: None,

error: e

}

@app.post(/check_progress/{task_id})

async def check_async_progress(task_id: str):

Endpoint to check the task progress and fetch the results if the task is

complete.

try:

result = AsyncResult(task_id)

if result.ready():

data = coll.find({_id: task_id})[0]

return {status: SUCEESS, data: data[result]}

else:

return {"status": result.state, "error": }

except Exception as e:

data = coll.find({_id: task_id})[0]

if data:

return {status: SUCEESS, data: data[result]}

return {status: Task ID invalid, error: e}

if __name__ == "__main__":

uvicorn.run("app:app", host=0.0.0.0, port=8080)

如果要实现自己的任务队列,就修改 task.py 来添加自己的异步任务,可以整合到自己的项目中。

二维码

扫一扫,关注我们

声明:本文由【益华网络】编辑上传发布,转载此文章须经作者同意,并请附上出处【益华网络】及本页链接。如内容、图片有任何版权问题,请联系我们进行处理。

感兴趣吗?

欢迎联系我们,我们愿意为您解答任何有关网站疑难问题!

您身边的【网站建设专家】

搜索千万次不如咨询1次

主营项目:网站建设,手机网站,响应式网站,SEO优化,小程序开发,公众号系统,软件开发等

立即咨询 15368564009
在线客服
嘿,我来帮您!