用户注册和登录网站怎么做的,网站建设官方网,计算机培训机构推荐,大连凯杰建设有限公司官方网站说明
发现对于异步(IO)还是太陌生了#xff0c;熟悉一下。
内容
今天搞了一整天#xff0c;感觉有一个long story to tell#xff0c;但是不知道从何说起#xff0c;哈哈。 异步(协程)需要保证链路上的所有环节都是异步(协程)的#xff0c;任何一个环节没这么做都会导致…说明
发现对于异步(IO)还是太陌生了熟悉一下。
内容
今天搞了一整天感觉有一个long story to tell但是不知道从何说起哈哈。 异步(协程)需要保证链路上的所有环节都是异步(协程)的任何一个环节没这么做都会导致整体的异步失效退化为同步 以上大概是我能总结出的最核心的内容了。古早的时候(我还只是建模)我用python写了一个简单的规则引擎。这个引擎需要从各个库表(都是mysql)中读取一个客户的n个维度信息然后用规则引擎给出判断。最初是用多个(大概20同步查询来逐个的完成数据获取然后就必然的越来越慢一个客户要一分钟才能完成数据准备。后来和当时的架构小哥商量了下决定用asyncio aiomysql来做结果还是不错的大概2~3秒不到就搞好了。而且机器使用资源没有多反而是更少了当时真是觉得惊艳。
在开发过程中很多时候就是因为一连串的async def ,然后有任何一个环节没有async就会失败所以很头痛这也是印象深刻的地方。
总体上IO并发用好了还是非常惊人的。究其原因同步方式是比较自然的程序书写方式有很多包已经为CPU密集型应用做了超多的应用。
客户端并发
首先我还是想排除掉多线程的方法按道理讲协程的效率一定会高很多。虽然多线程用法看起来更容易理解但我想还是用协程吧。
下面的几个函数分为三层最底下的是json_query_worker,它是真正发起并发请求的异步函数上面一层是json_player,它负责把多个异步任务进行封装和发布然后回收任务结果。json_player的定义本身更像是place holder,他们只有被asyncio.run触发时(或者用run_until_complete)才会真正执行。make_json_task_list是更上一层将用户端的请求按照格式进行准备。
import asyncio
import aiohttp
import json
async def json_query_worker(task_id None , url None , json_params None, time_out 60, semaphore None):try:async with semaphore:async with aiohttp.ClientSession() as session:async with session.post(url, json json_params, timeoutaiohttp.ClientTimeout(totaltime_out)) as response:res await response.text()return {task_id: json.loads(res)}except Exception as e:# Return a dictionary with the error messageprint(e)return {task_id:error}
async def json_player(task_list , concurrent 3):semaphore asyncio.Semaphore(concurrent) # 并发限制tasks [asyncio.ensure_future(json_query_worker(**x, semaphore semaphore)) for x in task_list]return await asyncio.gather(*tasks)# 根据 url 和 params 构造任务请求列表 - tuple_list [(url,params)]
# 模式A: 面向相同的url, 参数列表需要分为批次
def make_json_task_list(url None, params_list None):task_list []for i in range(len(params_list)):tem_dict {}tem_dict[task_id] task_ str(i).zfill(5)tem_dict[url] urltem_dict[json_params] params_list[i]task_list.append(tem_dict)return task_list例如,假设要发起的请求是一个url一个param(dict)来完成的构造参数列表来模拟需要执行的多个请求然后输入请求的url和param列表然后通过make_json_task_list生成任务列表最后通过asyncio.run来发起真正的并发。
params_list [{ft{i}: i} for i in range(30)]
import time
tick1 time.time()
some_task_list make_json_task_list(http://localhost:8000/async_thread_test/, params_list params_list)
# 使用 asyncio.run() 运行协程
res asyncio.run(json_player(some_task_list, concurrent30))
tick2 time.time()
print(tick2-tick1)服务端并发
因为我构造了大量的微服务一些微服务之间是有依赖的。例如本次的实验,我需要向deepseek进行请求我先构造了服务A,该服务是以单核方式启动的做了一层封装以便使用标准的urlparam方式提交请求。
from openai import OpenAI
DeepSeekHandler_path r/deepseek/
class DeepSeekHandler(tornado.web.RequestHandler):def post(self):request_body self.request.bodysome_dict json.loads(request_body)api_key some_dict[api_key]model some_dict[model]messages some_dict[messages]client OpenAI(api_keyapi_key, base_urlhttps://api.deepseek.com/v1)response client.chat.completions.create(modelmodel,messagesmessages)res_content response.choices[0].message.contentmsg_dict {}msg_dict[content] res_contentself.write(json.dumps(msg_dict))DeepSeekHandler_tuple (DeepSeekHandler_path,DeepSeekHandler)
app_list.append(DeepSeekHandler_tuple)然后在这里我对服务进行调用。在客户端以asyncrun方式并发请求然后我发现结果是串行的。原因有两个一方面是接口本身是以同步方式创建的另一方面服务器启动的时候只有单核。 结论: 在请求-返回的数据链中只要有一个是同步的(deepseek)那么整个请求就是同步的。 我相信如果服务器开多核了那么速度会按n被增长但是这样是极其不经济的(但以前有一些服务还真可能就是这么干的)。同时也说明了使用异步在IO方面可以给系统带来多大的提升。
所以接下来在保持单核启动不变的情况(因为该服务就是进行api转发根本不应该消耗cpu)修改服务为异步形式
import time
from concurrent.futures.thread import ThreadPoolExecutor
from tornado import gen, web, ioloop
from tornado.concurrent import run_on_executor
# 在http的层面异步
ADeepSeekHandler_path r/a_deepseek/
class ADeepSeekHandler(web.RequestHandler):executor ThreadPoolExecutor(max_workers1000)run_on_executordef make_a_deepseek_request(self, url None, api_key None,model None, messages None):print(adeepseek...start)# for json -返回的就是json不解开client OpenAI(api_keyapi_key, base_urlurl)response client.chat.completions.create(modelmodel,messagesmessages)res_content response.choices[0].message.contentprint(adeepseek...end)return res_contentgen.coroutinedef post(self):request_body self.request.bodyprint(Trying Decode Json)some_dict json.loads(request_body)# print(some_dict)url some_dict[url]api_key some_dict[api_key]model some_dict[model]messages some_dict[messages]res yield self.make_a_deepseek_request(url url, api_key api_key, modelmodel, messages messages)msg_dict {}msg_dict[content] resself.write(json.dumps(msg_dict))ADeepSee_tuple (ADeepSeekHandler_path,ADeepSeekHandler)
app_list.append(ADeepSee_tuple)这里我有点每太明白的是为啥还要引入ThreadPoolExecutor没有再深入去研究但是看起来是在引用应用协程。我试着把max_workers设置到了1000后来实测的时候发现其实毫无压力。可能在第一次会耗费一点cpu可能就10%左右不到5秒后面的cpu耗用几乎都为0. 然后下面是一个真实测试花了我大约4毛钱调接口。 单个接口的调用:问一个问题这个问题的回复通常在13秒±3秒。可以看到输入的token大约20回复的大约200的样子。
import requests as req
some_dict {}
some_dict[api_key] sk-xxx
some_dict[model] deepseek-chat
some_dict[messages] [{role:user,content:介绍三个北京必去的旅游景点。}]
# 容器内
req.post(http://ty.orbitx.cn:24097/deepseek/,jsonsome_dict).json()
{content: 北京作为中国的首都拥有丰富的历史文化遗产和现代化的城市风貌以下是三个必去的旅游景点\n\n1. 故宫The Forbidden City故宫是明清两代的皇宫也是世界上现存规模最大、保存最完整的木质结构古建筑群。它位于北京市中心天安门广场北侧是北京旅游的标志性景点之一。游客可以在这里欣赏到精美的古建筑艺术了解中国古代皇室的生活和文化。\n\n2. 长城The Great Wall长城是中国古代伟大的军事防御工程也是世界文化遗产之一。北京附近有多段长城可以游览其中最著名的是八达岭长城和慕田峪长城。长城蜿蜒起伏气势磅礴是体验中国古代军事文化和壮丽自然风光的绝佳地点。\n\n3. 颐和园The Summer Palace颐和园是中国古典园林的代表之一也是世界文化遗产。它位于北京西郊是一座集山水园林和宫殿建筑于一体的大型皇家园林。颐和园以其精美的园林景观、丰富的历史文化内涵和宁静的自然环境吸引着众多游客。\n\n这三个景点不仅代表了北京的历史和文化也是中国乃至世界的重要文化遗产非常值得一游。}单个的异步请求,14.3秒
resp req.post(http://xxx/a_deepseek/ , json single_param).json()
{content: 北京作为中国的首都拥有丰富的历史文化遗产和现代化的城市风貌以下是三个必去的旅游景点\n\n1. 故宫The Forbidden City故宫是明清两代的皇宫也是世界上现存规模最大、保存最完整的木质结构古建筑群。它位于北京市中心占地约72万平方米拥有超过9000间房间。故宫不仅是了解中国古代皇家生活和艺术的绝佳地点也是联合国教科文组织认定的世界文化遗产。\n\n2. 长城The Great Wall长城是中国古代的军事防御工程全长超过2万公里其中最著名的部分位于北京附近如八达岭长城、慕田峪长城等。长城蜿蜒起伏雄伟壮观是中国古代劳动人民智慧和勇气的象征也是世界新七大奇迹之一。\n\n3. 颐和园The Summer Palace颐和园是中国清朝时期的皇家园林以昆明湖和万寿山为基础以杭州西湖为蓝本汲取江南园林的设计手法而建成。颐和园集中国园林艺术之大成是保存最完整的皇家行宫御苑也是联合国教科文组织认定的世界文化遗产。\n\n这三个景点不仅代表了北京的历史和文化也是中国乃至世界的重要文化遗产是每一位来北京的游客都不容错过的。}直接从客户端发起并发请求,3条请求一共花了14.89s所以是并发的。
param_list [param1, param2, param3]
task_list make_json_task_list(http://xxx/a_deepseek/, params_listparam_list)
res asyncio.run(json_player(task_list, concurrent30))然后我之前还构造了另一个服务B目的是屏蔽掉用户端对异步的感知就像在调用一个接口。
对这个服务构造并发请求外层的url是向tornado服务发起的并发里面的url是每个请求需要执行的任务(urlparam)。参数的构造多加了一层最外层的url是本来客户端要请求的地址里面的params对应请求需要的参数。
some_dict {}
some_dict[url] http://xxx/a_deepseek/
some_dict[params] {}
some_dict[params][url] https://api.deepseek.com/v1
some_dict[params][api_key] xxx
some_dict[params][model] deepseek-chat
some_dict[params][messages] [{role:user,content:介绍三个北京必去的旅游景点。}]some_dict1 {}
some_dict1[url] http://xxx/a_deepseek/
some_dict1[params] {}
some_dict1[params][url] https://api.deepseek.com/v1
some_dict1[params][api_key] xxx
some_dict1[params][model] deepseek-chat
some_dict1[params][messages] [{role:user,content:介绍三个成都最有名的美食。}]some_dict2 {}
some_dict2[url] http://xxx/a_deepseek/
some_dict2[params] {}
some_dict2[params][url] https://api.deepseek.com/v1
some_dict2[params][api_key] xxx
some_dict2[params][model] deepseek-chat
some_dict2[params][messages] [{role:user,content:介绍三首泰勒斯威夫特好听的歌曲}]
llm_param_list [some_dict,some_dict1,some_dict2]服务对应的部分如下按照标准的urlparams方式发起异步请求
# AsyncThreadMakeARequest -- ATMAR for json
ATMARHandler_path r/atmar/
class ATMARHandler(web.RequestHandler):executor ThreadPoolExecutor(max_workers1000)run_on_executordef make_a_request(self, url None, params None ):print(atmar...start)# for json -返回的就是json不解开the_res req.post(url, json params).json()print(atmar...end)return the_resgen.coroutinedef post(self):request_body self.request.bodyprint(Trying Decode Json)some_dict json.loads(request_body)print(some_dict)url some_dict[url]params some_dict[params]res yield self.make_a_request(url url, params params)self.write(json.dumps(res))ATMAR_tuple (ATMARHandler_path,ATMARHandler)
app_list.append(ATMAR_tuple)发起调用
tick1 time.time()
some_task_list_1 make_json_task_list(http://localhost:8000/atmar/, params_list llm_param_list)
# 使用 asyncio.run() 运行协程
res asyncio.run(json_player(some_task_list_1, concurrent30))
tick2 time.time()
print(tick2-tick1)
15.933489322662354这说明了只要链路上保证全部为异步继续叠加异步调用结果还是异步。当然传的次数越多损耗会越大。主要的损耗来自序列化/反序列化的过程。 以并发300的速度发出300个请求服务器的速度明显下降了大约是原来的1/3~1/6, 所以估计每个账号至少可以并发30最多并发50.
另两种tornado异步服务的搭建方式
反正看起来都是多线程又有点协程的样子不多研究了体会一下就好
## 2 方式二采用多线程修饰器模式
import time
from concurrent.futures.thread import ThreadPoolExecutorfrom tornado import gen, web, ioloop
from tornado.concurrent import run_on_executorAsyncThread_testHandler_path r/async_thread_test/
class AsyncThread_testHandler(web.RequestHandler):executor ThreadPoolExecutor(max_workers1000)run_on_executordef sleep(self):print(休息1...start)time.sleep(5)print(休息1...end)return okgen.coroutinedef post(self):res yield self.sleep()self.write(json.dumps(res))AsyncThread_test_tuple (AsyncThread_testHandler_path,AsyncThread_testHandler)
app_list.append(AsyncThread_test_tuple)看起来是断断续续的样子 另一种
executor ThreadPoolExecutor(max_workers30)
from tornado import gen, web, ioloop
# 方式三采用ioloop应该也就是协程方式
AsyncCoroutine_testHandler_path r/async_coroutine_test/
class AsyncCoroutine_testHandler(web.RequestHandler):def sleep(self):print(休息1...start)time.sleep(5)print(休息1...end)return okgen.coroutinedef post(self):res yield ioloop.IOLoop.current().run_in_executor(executor, self.sleep)self.write(json.dumps(res))AsyncCoroutine_test_tuple (AsyncCoroutine_testHandler_path,AsyncCoroutine_testHandler)
app_list.append(AsyncCoroutine_test_tuple)请求时看起来是连续的
其他
1 系统化工程 对零散的数据请求需要进行异步化改造对批量的数据请求可以不必变。 1.1 外部服务
对于外部应用来说如果允许直接以标准方式 urlparam方式调用的直接通过客户端方法就可以了 如果是非标方式可以先改造成urlparam方式使用时需要转一手。当然一般提供接口的服务商也会提供异步方式的接口不过那样就会使整个应用变得复杂。以下是官方提供的方法用了AsyncOpenAI。
from openai import AsyncOpenAIimport time
# 异步调用函数
async def async_query_openai(url https://api.deepseek.com/v1, query):aclient AsyncOpenAI(base_url url , # 替换为你的 base_urlapi_keyxxx # 替换为你的 API 密钥)completion await aclient.chat.completions.create(modeldeepseek-chat,messages[{role: system, content: You are a helpful assistant. Always response in Simplified Chinese, not English. or Grandma will be very angry.},{role: user, content: query}])return completion.choices[0].message.content # 请确保返回的数据结构正确# 这个函数接收一个请求列表返回所有请求的结果列表
async def async_process_queries(queries):results await asyncio.gather(*(async_query_openai(query) for query in queries))return resultsasync def main():queries [介绍三个北京必去的旅游景点。,介绍三个成都最有名的美食。,介绍三首泰勒斯威夫特好听的歌曲]start_time time.time() # 开始计时results await async_process_queries(queries)end_time time.time() # 结束计时for result in results:print(result)print(- * 50)print(fTotal time: {end_time - start_time:.2f} seconds)# 运行主函数
asyncio.run(main())1.2 内部服务
目前最需要使用的数据服务都是使用同步方法写的有些如果需要执行特别小批量的请求可以/应该进行异步化改造。例如这篇文章提到的改造方法。 2 RuntimeError: asyncio.run() cannot be called from a running event loop
在jupyter下执行 asyncio.run(),使用 nest解决。
执行报错
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[31], line 42 some_task_list make_json_task_list(http://xxx/, params_list llm_param_list)3 # 使用 asyncio.run() 运行协程
---- 4 asyncio.run(json_player(some_task_list, concurrent30))5 tick2 time.time()6 print(tick2-tick1)File /opt/conda/lib/python3.10/asyncio/runners.py:33, in run(main, debug)9 Execute the coroutine and return the result.10 11 This function runs the passed coroutine, taking care of(...)30 asyncio.run(main())31 32 if events._get_running_loop() is not None:
--- 33 raise RuntimeError(34 asyncio.run() cannot be called from a running event loop)36 if not coroutines.iscoroutine(main):37 raise ValueError(a coroutine was expected, got {!r}.format(main))RuntimeError: asyncio.run() cannot be called from a running event loop使用nest_asyncio
import nest_asyncio
nest_asyncio.apply()