收费网站怎么免费,望野李梦阳,软件定制开发费用多少云鲸互创团队,山东东营网络科技有限公司fastapi操作异步和同步请求 声明#xff1a;异步“请求” 和 异步“方法调用” 的区别【关键点】 1、同步、异步方法 同步阻塞1.1 仅同步请求的并发测试 1.2 仅异步请求的并发测试 1.3 同步请求 和 异步请求 的并发 2、异步方法阻塞的解决方案2.1 使用线程池执行同步阻塞2.2 … fastapi操作异步和同步请求 声明异步“请求” 和 异步“方法调用” 的区别【关键点】 1、同步、异步方法 同步阻塞1.1 仅同步请求的并发测试 1.2 仅异步请求的并发测试 1.3 同步请求 和 异步请求 的并发 2、异步方法阻塞的解决方案2.1 使用线程池执行同步阻塞2.2 使用await 异步等待2.2.1 异步方法内部使用同步阻塞2.2.2 异步方法内部使用异步阻塞 声明异步“请求” 和 异步“方法调用” 的区别
问题 如果使用uvicorn单进程、单线程启动fastapi服务其中代码里
app.get(/bb)
async def read_item():await long_running_task(4)result await long_running_task(4)return {result: result}这时/bb的单次请求时执行await long_running_task(4)任务先等待4s结束后再次执行long_running_task(4)又是4s等待总共就是8s也就是在方法体内的await是串行的。
那当多次请求/aa时
app.get(/aa)
async def read_item():result await long_running_task()return {result: result}这时fastapi 会把2个异步请求分别转化为task而不是coroutine了并把task提交到事件循环中即每个请求都是1个独立的任务 事件循环会并发地处理这些任务因此每个请求的 await long_running_task(4) 调用会并发执行。 因此每个请求的响应时长是 4 秒而不是顺序累加成 8 秒。
【关键点】 1、 方法体内多个await 在同一个方法体内不管有多少个“await ” 它们都共享同一个任务。 await是并发 or 串行取决于在await执行前是否存在多个已启动的task。【具体详见 异步编程下await的理解】 2、多个请求并发处理当多个请求并发时每个请求会被包装成一个独立的任务并提交到事件循环中。事件循环会并发地处理这些任务因此每个请求的await 调用会并发执行。 1、同步、异步方法 同步阻塞
from fastapi import FastAPI, Query, Request, Response
import uvicorn
import time
import sys
import os
import asynciosys.path.append(os.getcwd())
app FastAPI(titlebbb)
#***************************************1、同步阻塞操作... ***************************************
### def不用async修饰同步方法同步阻塞 ###
app.get(/aa)
def aa():print(aa)time.sleep(3) # 同步阻塞3sprint(aaa)return aaaaaa### 异步方法同步阻塞 ###
app.get(/bb)
async def bb(): 虽然bb用async关键字修饰成了一个异步函数协程但该方法内部使用了同步阻塞所以导致它的异步能力失效变成同步方法print(bb)time.sleep(8) # 是一个同步阻塞操作。它会阻塞当前线程直到指定的秒数过去。print(bbb)return bbbbbb#***************************************2、非阻塞操作... ***************************************
### 异步方法 无阻塞 ###
app.get(/hello)
async def hello(con: str Query(..., description输入字符串)):print(hello)con con **** 好样的return {message: con}### 同步方法 无阻塞 ###
app.get(/hello2)
def hello2(con: str Query(..., description输入字符串)):con con **** 好样的222222return {message: con}if __name__ __main__:uvicorn.run(app,host0.0.0.0,port9351) 1.1 仅同步请求的并发
fastapi 在处理同步请求时会把他们放入底层维护的工作线程池中每个请求由AnyIO worker thread工作线程执行。
测试
/aa 和 /hello2 并发 /aa耗时3.02s /hello2耗时5ms。 结论/aa 的阻塞不会影响 /hello2 的及时返回。 1.2 仅异步请求的并发
fastapi 在处理异步请求时会把他们放入Main Thread的协程event_loop中。如果一个异步方法里存在同步阻塞那他会导致后请求的异步方法也被阻塞 因为所有的异步都在1个event_loop中~~
测试
/bb 和 /hello 并发 /hello要等/bb执行完才会结束总体耗时8s。 结论/bb 的阻塞会影响 /hello 的及时返回。 1.3 同步请求 和 异步请求 的并发
1/aa稍早于/bb 通过调用堆栈可以看到 /aa即使阻塞也不会阻塞/bb的请求两者可同步执行方法体 但是不知道为什么有时候/aa要等到/bb响应结束也才return 也就是/aa的总响应时长不总是3s附近会出现return bbbbbb结束后才收到return aaaaaa导致/aa的响应时长大于8s 执行结果 aa bb aaa bbb
2/bb稍早于/aa 通过调用堆栈可以看到 /bb阻塞会导致/aa不能立即执行而是要等待/bb完全执行完才会进入/aa的方法体 执行结果 bb bbb aa aaa
2、异步方法阻塞的解决方案
2.1 使用线程池执行同步阻塞
from fastapi import FastAPI, Query, Request, Response
import uvicorn
from concurrent.futures import ThreadPoolExecutor
import time
import sys
import os
import asynciosys.path.append(os.getcwd())
app FastAPI(titlebbb)threadpoolThreadPoolExecutor(max_workers3)############## 【ok】放线程池中不会阻塞主线程~ ##################
app.get(/ver2)
async def ver2(request:Request):msgrequest.query_params.get(msg)loopasyncio.get_event_loop() # 拿到主线程的事件循环事件循环可以看做“方法间”的异步task{msg:msg}def handle_task():print(task recieved:, task[msg])resulttask[msg].lower()time.sleep(8)return result# 在主线程的事件循环里等待异步结果因为事件循环是针对“所有方法间”的所以是在主线程里resultawait loop.run_in_executor(threadpool, handle_task) print(task ends:, result, asyncio.get_event_loop)return Response(result)if __name__ __main__:uvicorn.run(app,host0.0.0.0,port9351) 2.2 使用await 异步等待
2.2.1 异步方法内部使用同步阻塞
【方法间】A、B、C、D四个请求在异步执行遵循单线程下的协程异步 【方法内】存在同步阻塞导致遇到阻塞时不会主动让出线程必须等待当前方法整体执行完包括先阻塞结束再执行该方法体内阻塞后的代码才会让出线程。
2.2.2 异步方法内部使用异步阻塞
【方法间】不变 【方法内】异步阻塞当某个请求调用执行到异步调用代码(await)时会主动让出线程失去抢占线程的资格由其他异步方法抢占。 1抢占到线程的协程异步请求下的异步方法继续这种操作即遇到异步阻塞就让出线程 2一旦某个方法异步阻塞结束它就会恢复重新抢占线程的资格 3当某个方法体全部执行完后就独立返回不受其他未完成的方法影响。
from fastapi import FastAPI
import uvicorn
# from concurrent.futures import ThreadPoolExecutor
import time
import sys
import os
import asynciosys.path.append(os.getcwd())
app FastAPI(titlettt)app.get(/a)
async def A(a: int):# 任务1a a10print(fA任务1{str(a)})# 任务2a a1print(fA任务2{str(a)})# 任务3# time.sleep(8) # 同步阻塞8sawait asyncio.sleep(8)print(fA任务38s睡好了)# 任务4a a1print(fA任务4{str(a)})app.get(/b)
async def B(a: int):# 任务1a a20print(fB任务1{str(a)})# 任务2a a1print(fB任务2{str(a)})# 任务3# time.sleep(4) # 同步阻塞4sawait asyncio.sleep(4)print(fB任务34s睡好了)# 任务4a a1print(fB任务4{str(a)})app.get(/c)
async def C(a: int):# 任务1a a30print(fC任务1{str(a)})# 任务2a a1print(fC任务2{str(a)})# 任务3a a1print(fC任务3{str(a)})# 任务4a a1print(fC任务4{str(a)})app.get(/d)
async def D(a: int):# 任务1a a40print(fD任务1{str(a)})# 任务2a a1print(fD任务2{str(a)})# 任务3a a1print(fD任务3{str(a)})# 任务4a a1print(fD任务4{str(a)})if __name__ __main__:uvicorn.run(app,host0.0.0.0,port9351)