网站做的比较好的公司,在哪个网站做一照一码,外包推广公司,高端手机网站建设SmartETL框架主要采用了面向对象的设计思想#xff0c;将ETL过程中的处理逻辑抽象为Loader和Processor#xff08;对应loader模块和iterator模块#xff09;#xff0c;所有流程组件需要继承或实现DataProvider#xff08;iter方法#xff09;或JsonIterator#xff08;…SmartETL框架主要采用了面向对象的设计思想将ETL过程中的处理逻辑抽象为Loader和Processor对应loader模块和iterator模块所有流程组件需要继承或实现DataProvideriter方法或JsonIteratoron_data或__process__方法。
例如以下代码实现将论文结构中的摘要和正文拼接为一个字符串字段方便后续对论文建立全文索引。
class ConcatPaperContent(JsonIterator):arxiv html页面数据处理类def on_data(self, data: Any, *args):paper data[paper]content if paper:abstract paper.get(abstract)content f{abstract}\nsections paper.get(sections)for section in sections:content f{section[content]}\ndata[content] contentreturn data然而业务中很多处理逻辑比较简单以往开发时用少数几行代码就可以搞定而在SmartETL框架中则必须实现一个类正如上面的例子所示。虽然SmartETL支持加载外部包的组件只要在sys.path中但如果是需要定制开发则相对繁琐。
此前在过滤组件Filter中考虑到这种情况解决办法是在流程中定义Lambda表达式。例如以下流程定义中filter节点通过Lambda表达式abnormal_time实现过滤publish_time字段值小于当前时间的记录的功能即对于经过filter节点的记录仅当其publish_time字段值大于等于当前时间current时才会输出给后续节点。
nodes:current: util.dates.current_ts(True)abnormal_time: lambda t, currentcurrent: t current filter: Filter(abnormal_time, keypublish_time)为了简化业务代码编写SmartETL新增实现函数式组件即以函数形式提供核心处理逻辑而不需要封装成类。Lambda表达式就是一种特殊的函数。
跟C/C、Java不同Python语言中函数是一等公民即开发者可以直接访问和操作函数支持将函数作为一个对象进行加载、传递和管理这对于开发一些高级功能提高程序扩展性非常方便。
SmartETL函数式组件是指将任意编写的数据处理函数作为ETL流程组件加入到流程处理中。唯一的限制是除了作为Loader组件的函数外框架无法提供输入函数应该以流程数据作为输入参数并将需要向后续流程传递的数据作为输出参数。以下表格说明了函数的参数与节点类型作用的对应关系
节点类型是否支持输入是否要求有输出Loader节点否可通过配置提供是Processor节点是流程数据作为第一个参数均可
为了使用函数对象框架设计了函数式Loader组件Function如下
class Function(DataProvider):函数调用包装器 提供调用函数的结果def __init__(self, function, *args, **kwargs)::param function 函数对象或函数对象的完整限定名如wikidata_filter.util.files.get_linesassert function is not None, function is None!if isinstance(function, str):from wikidata_filter.util.mod_util import load_clsfunction load_cls(function)[0]self.function functionself.args argsself.kwargs kwargsdef iter(self):DataProvider的主要API对提供函数进行调用# 注意使用了组件构造参数res self.function(*self.args, **self.kwargs)if isinstance(res, GeneratorType):for item in res:yield itemelse:yield res类似的框架实现了Function(JsonIterator)。常用的Map组件也支持提供函数对象或函数对象完整限定名。
基于函数式组件对本文开头的示例进行改写代码如下
def concat_paper_content(paper: dict):paper paper or {}abstract paper.get(abstract)content f{abstract}\nsections paper.get(sections)for section in sections:content f{section[content]}\nreturn content在yaml流程中进行引用如下所示
nodes:concat: Map(gestata.arxiv.concat_paper_content)或者
nodes:concat: Function(wikidata_filter.gestata.arxiv.concat_paper_content)流程说明通过yaml流程文件将concat_content函数与Map进行绑定假设该函数定义在wikidata_filter.gestata.arxiv模块中实现对基于paper的处理并将函数调用返回值作为content字段值。
注意为了支持Function使用自定义组件可能在任意sys.path可访问模块需要提供完整的函数对象限定名本示例中包括顶层模块wikidata_filter。
那么Map与Function有什么区别呢主要区别是Map主要是为了支持wikidata_filter.gestata和wikidata_filter.util模块中定义的函数且支持指定要处理的字段通过key参数和目标字段通过target_key参数。
从示例中可以看出使用函数式组件至少有几点好处
代码更简洁只需要实现一个提供核心处理逻辑的函数即可。配置更加灵活通过流程指定输入字段和输出字段可以灵活适配不同业务数据。复用性更好可以通过代码或yaml配置进行复用。
在此前arXiv论文数据处理应用流程中大量采用了函数式组件。具体可查看https://github.com/ictchenbo/SmartETL/blob/main/wikidata_filter/gestata/arxiv.py了解详情。