网站建设的论文范文,深圳做营销网站制作,晋江网络推广公司,小程序小游戏开发文章目录 前言发现问题分析问题阻塞#xff1f;阻塞#xff01;附#xff1a;用什么阻塞streamlit进程 前言
从来没想过在接入通义千问的时候还会遇到NotImplementedError。实在难以理解#xff0c;处理过后才明白问题。现在总结后给出结果。 发现问题
我们来看个例子。就… 文章目录 前言发现问题分析问题阻塞阻塞附用什么阻塞streamlit进程 前言
从来没想过在接入通义千问的时候还会遇到NotImplementedError。实在难以理解处理过后才明白问题。现在总结后给出结果。 发现问题
我们来看个例子。就比如这段代码 摘自ranying666/langchain_tongyi_practical中的5-langchain-search-document_loaders.py loader AsyncChromiumLoader([https://dataea.cn/okr-keyresult-checklist/])
html loader.load()
html2text Html2TextTransformer()
docs_transformed html2text.transform_documents(html)如果把这段直接接入streamlit那就直接报错
NotImplementedError
比较离谱的是只有这么一个错误没有其他信息。
分析问题
很难理解为什么是这个错。
随着排查的进行发现问题好像出在AsyncChromiumLoader中。
AsyncChromiumLoader继承自BaseLoader而BaseLoader的load方法中调用的却是lazy_load。先不用看这个方法的具体内容就看这个方法的名字你大概也能猜出来什么问题了
懒加载带来的未能实例化的问题。
简单地说就是streamlit已在前面飞loader.load()还在后面追。终于追不上了就爆炸了。而刚好的是lazy_load中抛出的异常就是这个NotImplementedError。
众所周知streamlit构建网页的过程是单线程的。
所以当我们需要请求内容的时候使用异步请求的AsyncChromiumLoader就会出现这种问题。
那么该怎么办呢
阻塞
怎么办呢阻塞对吧很容易想到。
于是会想当然的这么用
loader AsyncChromiumLoader([https://dataea.cn/okr-keyresult-checklist/])
html loader.load()
while html is None:time.sleep(1)
html2text Html2TextTransformer()
docs_transformed html2text.transform_documents(html)看着很直观检测html是否有返回值。
如果真这么简单的话我也不会把它写在这里♂️。
结果就是还是报错。这又是为什么呢逻辑没问题呀
逻辑是没问题问题出在底层。作为一个异步函数他怎么可能没有返回值呢
我们来回顾一下load方法
# Sub-classes should not implement this method directly. Instead, they should implement the lazy load method.
def load(self) - List[Document]:Load data into Document objects.return list(self.lazy_load())那么lazy_load是怎么回事呢
def lazy_load(self) - Iterator[Document]:A lazy loader for Documents.if type(self).load ! BaseLoader.load:return iter(self.load())raise NotImplementedError(f{self.__class__.__name__} does not implement lazy_load())这里比较有意思的是对实例化的AsyncChromiumLoader对象就是这个self判断AsyncChromiumLoader.load与BaseLoader.load是否一致。
其实这里比较的是地址信息因为子类如果重写了这个load方法那么地址就会被改变。如果不一致的话就会返回一个迭代器这个迭代器就是为了后续过程中无论返回的是否是list都能够迭代。
听起来没问题。
但是懒加载呢
async def aload(self) - List[Document]:Load data into Document objects.return [document async for document in self.alazy_load()]async def alazy_load(self) - AsyncIterator[Document]:A lazy loader for Documents.iterator await run_in_executor(None, self.lazy_load)done object()while True:doc await run_in_executor(None, next, iterator, done) # type: ignore[call-arg, arg-type]if doc is done:breakyield doc # type: ignore[misc]比较神奇的就出现在这里了。alazy_load方法最终给出来的就是一个Document类的迭代器然后最终通过yield给到调用方直到doc在迭代过程中达到了done。
但是呢doc变量的结果是await run_in_executor(None, next, iterator, done)即使run_in_executor返回的是一个迭代器对象最终由await进行处理所以是有返回值的但是返回的是未来需要返回的是asyncio.futures.Future类。这一点完全可以类比Java中的Future对象。
所以最终而言AsyncChromiumLoader.load并不是直到结束才返回值而是在执行的过程中不断地通过yield给出返回值只是在await最终处理为AsyncIterator[Document]类型。
阻塞
为了让streamlit等待异步请求就需要主线程停下来直到请求结束了才能继续执行。
那这回该怎么办呢直接用asyncio与playwright给阻塞掉。
首先我们需要利用asyncio创建一个阻塞事件并设置所有的事件都需要在阻塞事件结束后执行。
其次在执行这个阻塞之间的时候我们依然使用异步请求只不过是所有的事件都在等我们。
于是可以给出代码如下
import asyncio
import platform
from playwright.async_api import async_playwright
# 阻塞事件
async def fetch_page_content(url):async with async_playwright() as p:browser await p.chromium.launch()page await browser.new_page()await page.goto(url)content await page.content()await browser.close()return content
# 阻塞主进程
st.cache_data
def load_documents(url):loop Noneif platform.system() Windows:loop asyncio.ProactorEventLoop()elif platform.system() Linux:loop asyncio.new_event_loop()elif platform.system() Darwin:loop asyncio.SelectorEventLoop()else:return Nonehtml_content loop.run_until_complete(fetch_page_content(url))html2text Html2TextTransformer()document Document(page_contenthtml_content)docs_transformed list(html2text.transform_documents([document]))return docs_transformed其实这里面最核心的就是asyncio下的run_until_complete函数
def run_until_complete(self, future):Run until the Future is done.If the argument is a coroutine, it is wrapped in a Task.WARNING: It would be disastrous to call run_until_complete()with the same coroutine twice -- it would wrap it in twodifferent Tasks and that cant be good.Return the Futures result, or raise its exception.self._check_closed()self._check_running()new_task not futures.isfuture(future)future tasks.ensure_future(future, loopself)if new_task:# An exception is raised if the future didnt complete, so there# is no need to log the destroy pending task messagefuture._log_destroy_pending Falsefuture.add_done_callback(_run_until_complete_cb)try:self.run_forever()except:if new_task and future.done() and not future.cancelled():# The coroutine raised a BaseException. Consume the exception# to not log a warning, the caller doesnt have access to the# local task.future.exception()raisefinally:future.remove_done_callback(_run_until_complete_cb)if not future.done():raise RuntimeError(Event loop stopped before Future completed.)return future.result()这个函数最大的特点就是会创建一个任务并执行。直到任务执行完成或者报错中断之前其他所有任务都得等着这个任务的回调函数。
于是这个函数就阻塞了streamlit的进程直到异步任务完成。
附用什么阻塞streamlit进程
其实这段文字本来应该接在上面这段的。但是这个坑实在太神奇了单独拉出来说明。
这里面还有一个很神奇的坑用什么东西阻塞。
就像上面这段代码针对Windows、Linux、Darwin分别采用了ProactorEventLoop、new_event_loop、SelectorEventLoop阻塞streamlit进程。
如果在Linux平台中使用ProactorEventLoop那么streamlit进程依然不会阻塞因为他们都只能在各自的操作系统中起作用。