FastAPI 高级特性:同异步路由函数、依赖注入与后台任务
==============================
引言
==
在现代 Web 开发中,高效的性能和简洁的代码结构是至关重要的。FastAPI 作为一个现代、快速(高性能)的 Web 框架,凭借其对异步编程的原生支持和直观的依赖注入系统,迅速成为开发者的热门选择。在本文中,将深入探讨 FastAPI 中的几个核心概念:同异步函数的区别、Depends 依赖以及后台任务。
同步异步路由函数
在fastapi中路由函数可以是同步的也可以是异步的,跟随如下例子一探究竟
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @File: def_async_def.py
# @Desc: { 同步异步路由函数 }
# @Date: 2024/05/21 11:06
import asyncio
import time
import uvicorn
from fastapi import FastAPI
import requests
import httpx
app = FastAPI(description="同步异步路由函数")
async def async_get(url):
async with httpx.AsyncClient() as client:
resp = await client.get(url)
return resp
@app.get("/ping")
async def ping():
return "pong"
@app.get("/async_route_func_demo")
async def async_route_func():
resp = await async_get("https://www.baidu.com")
# resp = requests.get("https://www.baidu.com")
print("resp", resp)
return "async_route_func_demo"
@app.get("/sync_route_func_demo")
def sync_route_func():
resp = requests.get("https://www.baidu.com")
print("resp", resp)
return "sync_route_func_demo"
def main():
uvicorn.run(app)
if __name__ == '__main__':
main()
- 同步路由函数中使用 requests 请求 ping 接口
- 异步路由函数则是使用 httpx 请求 ping 接口
具体有什么区别呢
- 同步函数将会使用线程进行处理
- 异步函数则是以协程方式在事件循环中处理
FastAPI 框架属于异步(asgi)框架,所以能用异步尽量用异步,没有对应的异步库可以考虑用线程池执行同步io操作,为什么不推荐直接使用同步路由函数借助fastapi内置策略使用线程处理,因为在实际处理时,异步兼容同步,而同步不兼容异步,如果数据库采用的是异步操作,则大部分业务逻辑都是异步函数,为了解决几个同步io,这样在最顶层使用同步路由函数则极其不方便,还是采用异步路由函数,然后在同步io操作中使用线程池进行封装,例如读写文件等。
这里还有一个非常重要的一点 不要在异步路由函数中做同步io处理,不然性能将大打折扣。这里通过同异步发送http请求来模拟异步路由使用同步io。
import uvicorn
from fastapi import FastAPI
import requests
import httpx
import aiohttp
app = FastAPI(description="同步异步路由函数")
aio_client = httpx.AsyncClient()
aio_session: aiohttp.ClientSession = None
req_session = requests.Session()
@app.on_event("startup")
async def startup_event():
# 应用启动事件
global aio_session
aio_session = aiohttp.ClientSession()
async def async_get(url):
# async with aio_session.get(url) as resp:
# return resp
resp = await aio_client.get(url)
return resp
@app.get("/ping")
async def ping():
return "pong"
@app.get("/async_route_func_demo")
async def async_route_func():
url = "https://juejin.cn/"
resp = await async_get(url)
# print("resp", resp)
return "async_route_func_demo"
@app.get("/async_route_use_sync_io_demo")
async def async_route_func():
url = "https://juejin.cn/"
resp = req_session.get(url)
# print("resp", resp)
return "async_route_func_demo"
@app.get("/sync_route_func_demo")
def sync_route_func():
url = "https://juejin.cn/"
resp = req_session.get(url)
# print("resp", resp)
return "sync_route_func_demo"
def main():
uvicorn.run(app, log_level="warning")
if __name__ == '__main__':
main()
最后使用wrk来压测下这种情况
# 20线程模拟200并发请求 持续10s
wrk -t20 -d10s -c200 http://127.0.0.1:8000/async_route_func_demo
通过wrk的压测数据可以看出,异步路由函数中使用同步io(requests),并发性能一落千丈,简直惨不忍睹。
- 异步路由(aiohttp):10s 总共处理了 7027 个请求,qps:697.17
- 异步路由(requests):10s 总共处理了 138 个请求,qps:13.66
- 同步路由(requests):10s 总共处理了 5135 个请求,qps:510.23 (隐式线程池处理)
这是一个非常严重的问题一个同步io会堵塞住整个系统,导致其他接口也访问不了。所以再次说明一遍 不要在异步路由函数中做同步io处理,尤其这些异步web框架 fastapi、tornado、sanic 等很容易会因为技术选型、程序员编码问题导致性能大打折扣。异步框架推荐用异步库,没有支持的异步库使用线程池处理同步IO这是最佳的实践。
推荐我另外一篇文章: 同步、异步无障碍:Python异步装饰器指南 可以处理没有异步库的情况。
2024-06-27 内容补充
我一开始不愿相信httpx同是异步框架为啥性能对比aiohttp效果差太多,怀疑是不是我的使用方式不对,于是在stackoverflow发帖寻求帮助,后面才发现是我的测试方法不对,我上面测试只是拿到响应对象就不处理,而 aiohttp 是懒加载方式获取response对象的时候只处理响应头,响应体需要在触发 await resp.text()
、await resp.json()
才会真正请求处理响应体。
大家可以看看相关帖子:stackoverflow.com/questions/7…
aiohttp相关说明:docs.aiohttp.org/en/stable/h…
Httpx issue:github.com/encode/http…
于是我换了下测试方式都拿到响应体数据返回,再看看具体性能情况
import uvicorn
from fastapi import FastAPI
import requests
import httpx
import aiohttp
app = FastAPI(description="同步异步路由函数")
aio_client = httpx.AsyncClient()
aio_session: aiohttp.ClientSession = None
req_session = requests.Session()
@app.on_event("startup")
async def startup_event():
global aio_session
aio_session = aiohttp.ClientSession()
async def async_httpx_get(url):
resp = await aio_client.get(url)
return resp
async def async_aiohttp_get(url):
async with aio_session.get(url) as resp:
return await resp.text()
@app.get("/ping")
async def ping():
return "pong"
@app.get("/async_httpx_get")
async def async_route_func():
url = "https://juejin.cn/"
resp = await async_httpx_get(url)
return resp.text
@app.get("/async_aiohttp_get")
async def async_route_func():
url = "https://juejin.cn/"
resp_text = await async_aiohttp_get(url)
return resp_text
@app.get("/async_route_use_sync_io_demo")
async def async_route_func():
url = "https://juejin.cn/"
resp = req_session.get(url)
return resp.text
@app.get("/sync_route_func_demo")
def sync_route_func():
url = "https://juejin.cn/"
resp = req_session.get(url)
return resp.text
def main():
uvicorn.run(app, log_level="warning")
if __name__ == '__main__':
main()
wrk 测试数据
还是差不多,只是 aiohttp 的 qps 降了一些,因此还是推荐使用 aiohttp 进行 http 请求。我之前用httpx封装的http客户端现在也改成了aiohttp。大家感兴趣可以看这篇:Python 同、异步HTTP客户端封装:性能与简洁性的较量
Depends依赖项
在FastAPI中,Depends
用于处理依赖注入(Dependency Injection)。它的主要作用是让你能够将一些共享的逻辑、组件或者资源(如数据库连接、配置、服务等)注入到路由函数中,而不需要显式地在每个函数中传递这些依赖项。
基本使用
公共分页参数处理
import uvicorn
from fastapi import FastAPI, HTTPException, Query
from typing import Union
from fastapi import Depends, FastAPI
from pydantic import BaseModel, Field
app = FastAPI(description="depends 使用")
class PageModel(BaseModel):
offset: int = Field(0, description="偏移量")
limit: int = Field(10, description="每页大小")
def page_parameters(
curr_page: int = 1, page_size: int = 10
):
offset = (curr_page - 1) * page_size
if offset < 0 or page_size > 1000:
raise HTTPException(status_code=400, detail="Limit must be less than 100")
return PageModel(offset=offset, limit=page_size)
@app.get("/v1/items/")
async def read_items(curr_page: int = Query(1), page_size: int = Query(10)):
page_model = page_parameters(curr_page, page_size)
items = ["item1", "item2", "item3"][page_model.offset:page_model.offset + page_model.limit]
return {"items": items}
@app.get("/v1/users/")
async def read_users(curr_page: int = Query(1), page_size: int = Query(10)):
page_model = page_parameters(curr_page, page_size)
users = ["user1", "user2", "user3"][page_model.offset:page_model.offset + page_model.limit]
return users
@app.get("/v2/items/")
async def read_items(page_model: PageModel = Depends(page_parameters)):
items = ["item1", "item2", "item3"][page_model.offset:page_model.offset + page_model.limit]
return {"items": items}
@app.get("/v2/users/")
async def read_users(page_model: PageModel = Depends(page_parameters)):
users = ["user1", "user2", "user3"][page_model.offset:page_model.offset + page_model.limit]
return users
def main():
uvicorn.run(app)
if __name__ == '__main__':
main()
在这个例子可以看出
- 分页参数处理和验证逻辑 被提取到
page_parameters
依赖函数中。 - 路由函数
/v2/items
和/v2/users
通过Depends
分别注入这些依赖函数的返回值。
不用Depends,在路由函数需要定义分页参数然后传递给 page_parameters
函数处理,而使用Depends在路由定义上简化了然后间接调用处理。整体复用代码逻辑,更简洁了。
还有一些用途示例,大家可以查阅官网:fastapi.tiangolo.com/zh/tutorial…
不知道为啥我对官网的例子没有兴趣,我一般都不想这样代码复用,共享客户端等操作。因为我不想把逻辑代码放到路由函数层处理,而是单独抽离一个逻辑层(service)处理,路由层只做如下操作
- 请求参数校验(pydantic的BaseModel)
- 参数透传调用逻辑层处理
- 响应反参
路径、查询字符串参数转成pydantic的BaseModel
通常我喜欢用Depends把路径参数或者查询字符串参数封装到pydantic的BaseModel 中
class UserQueryIn(BaseModel):
user_id: int = Path(description="用户ID")
name: Optional[str] = Query(default=None, description="姓名")
age: Optional[int] = Query(default=None, description="年龄")
@app.get("/users/{user_id}/path_query_params2", summary="路径参数+查询字符串参数BaseModel的demo")
def with_path_query_params(req_model: UserQueryIn = Depends(UserQueryIn)):
# 业务逻辑处理
# logic_func(req_model)
return req_model.model_dump()
如果直接用BaseModel 的话 fastapi 会认为是body参数,这明显不合理是错误,因此需要通过 fastapi的Depends函数来解决。
配合 contextvars 存储 Request、BackgroundTasks
这里通过全局依赖存储请求对象与后台任务处理器。到后面逻辑层需要用时可以直接通过上下文取出来。不用一层一层传递。
import time
from contextvars import ContextVar
import uvicorn
from fastapi import FastAPI, HTTPException, Query, Path, Request, BackgroundTasks
from typing import Union, Optional
from fastapi import Depends, FastAPI
from pydantic import BaseModel, Field
REQ_CTX: ContextVar[Union[Request, None]] = ContextVar("req_ctx", default=None)
BG_TASK_EXECUTOR: ContextVar[Union[BackgroundTasks, None]] = ContextVar("bg_task_executor", default=None)
async def set_req_and_bg_tasks(req: Request, bg_tasks: BackgroundTasks):
"""全局依赖把请求对象与后台任务处理器存储到上下文变量中"""
REQ_CTX.set(req)
BG_TASK_EXECUTOR.set(bg_tasks)
return req, bg_tasks
app = FastAPI(description="depends 使用", dependencies=[Depends(set_req_and_bg_tasks)])
async def req_bg_tasks_logic(name: str, sleep_seconds: int = 5):
req = REQ_CTX.get()
print(req.url)
ret = f"name {name} sleep {sleep_seconds}s"
def bg_task_demo():
time.sleep(sleep_seconds)
print(ret)
# 模拟添加后台任务
bg_task_executor = BG_TASK_EXECUTOR.get()
bg_task_executor.add_task(bg_task_demo)
return ret
@app.get("/req_bg_tasks_demo")
async def req_bg_tasks_demo(name: str, sleep_seconds: int = 5):
ret = await req_bg_tasks_logic(name, sleep_seconds)
return ret
测试结果如下
http://127.0.0.1:8000/req_bg_tasks_demo?name=hui&sleep_seconds=5
INFO: 127.0.0.1:58067 - "GET /req_bg_tasks_demo?name=hui&sleep_seconds=5 HTTP/1.1" 200 OK
bg_task_demo name hui sleep 5s
后台任务
FastAPI 内置了一个后台任务处理器,可以用于处理一些小型的后台任务。不过,不能直接实例化 BackgroundTasks
对象来使用,而是需要通过路由函数中的 BackgroundTasks
对象来添加后台任务。类似于 Request
对象,BackgroundTasks
对象是隐式地放在路由函数中的,只要在参数中声明对应的类型,它就会被自动注入。上面提到的全局依赖也是通过这种方式来进行存储的。
@app.get("/bg_tasks_demo")
async def bg_task_demo(bg_tasks: BackgroundTasks):
def sync_bg_task_test(name, sleep_seconds: int = 3):
print("sync_bg_task_test running")
time.sleep(sleep_seconds)
print(f"sync_bg_task_test {name} sleep {sleep_seconds}s end")
async def async_bg_task_test(name, sleep_seconds: int = 3):
print("async_bg_task_test running")
await asyncio.sleep(sleep_seconds)
print(f"async_bg_task_test {name} sleep {sleep_seconds}s end")
# 分别添加同步、异步io的后台任务
bg_tasks.add_task(sync_bg_task_test, name="hui-sync", sleep_seconds=1)
bg_tasks.add_task(async_bg_task_test, name="hui-async", sleep_seconds=2)
return "bg_task_demo"
测试结果如下
INFO: 127.0.0.1:58691 - "GET /bg_tasks_demo HTTP/1.1" 200 OK
sync_bg_task_test running
sync_bg_task_test hui-sync sleep 1s end
async_bg_task_test running
async_bg_task_test hui-async sleep 2s end
在接口请求响应完成,后台顺序调度后台任务,兼容同步、异步io,同步io是通过线程池处理,不会堵塞住系统。
这里先展示部分 BackgroundTasks 的源码,后面单独再分析研究下fastapi的一些经典源码。
class BackgroundTasks(BackgroundTask):
def __init__(self, tasks: typing.Optional[typing.Sequence[BackgroundTask]] = None):
self.tasks = list(tasks) if tasks else []
def add_task(
self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs
) -> None:
task = BackgroundTask(func, *args, **kwargs)
self.tasks.append(task)
async def __call__(self) -> None:
for task in self.tasks:
await task()
源代码
Github:FastAPI 实战手册
从FastAPI的安装、请求处理、响应返回等基础知识入手,到中间件、数据库操作、身份认证等核心组件的应用,再到实战项目的开发,以及源码分析,循序渐进地介绍FastAPI的使用,旨在让读者全面掌握这个框架。
原文链接: https://juejin.cn/post/7372135071971885071
文章收集整理于网络,请勿商用,仅供个人学习使用,如有侵权,请联系作者删除,如若转载,请注明出处:http://www.cxyroad.com/17278.html