长清网站建设费用,重庆网站设计公司网站制作,浏阳网站定制,可以搜索附近手机的软件在上一篇Luigi的线性调度文章中#xff08;Luigi任务调度框架学习1#xff1a;线性调用流程#xff09;#xff0c;我们知道Task运行的时候#xff1a; 每个任务是否完成有两次判定#xff0c;即#xff1a;进行判定(未完成) 》运行def run(self)函数 》进行判定(完成) …在上一篇Luigi的线性调度文章中Luigi任务调度框架学习1线性调用流程我们知道Task运行的时候 每个任务是否完成有两次判定即进行判定(未完成) 》运行def run(self)函数 》进行判定(完成) 》运行后续的Task如果第一次判定就完成则不会执行当前Task的def run(self)函数 但是Luigi支持的判定条件只有文件与SQL在有些情况下尤其是定时任务我们希望它梳理并运行整个拓扑而不是根据判定去决定是否运行因此本文来解决这个问题让我们能够直接运行Task 文章目录 解决方法问题解析1. Luigi多Task多次运行效果2. 无限调用Task示例程序 解决方法
博主写了一个工具类
import luigi
import osclass NeverStopLuigi(luigi.Task):无限执行Task的工具类never_stop luigi.BoolParameter(True)def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self._out_put_file self.__class__.__name__ # 用当前类名作为无限重启确认的文件名if self.never_stop:outputs luigi.task.flatten(self.output())for out in outputs:if isinstance(out, luigi.LocalTarget) and out.exists():os.remove(self.output().path)def run(self):with open(self._out_put_file, w) as f:f.write(.)def output(self):return luigi.LocalTarget(self._out_put_file)之后程序只需要继承NeverStopLuigi类后覆写两个函数def requires(self)与def run(self)即可运行
class MyTask(NeverStopLuigi):def requires(self): # 1. 用于定义前置程序print(f运行MyTask的 requires程序)return []def run(self): # 2. 这个Task运行的主程序super(MyTask, self).run() # 调用NeverStopLuigi的run函数print(f运行MyTask的 主程序 )在这个工具类中主要内容有
依靠初始化Task时删除掉检测的文件来实现首次检测output不通过调用run函数先写文件实现二次检测output通过
至于用于检测的文件名用最不容易重名的类名self.__class__.__name__来作为中间的文件名这里是为了文件能够循环执行而不用uuid这种随意生成的文件最后无法删除其实是可以在最后一个Task通过获取前面的所有output删除的但没有直接继承的写法简洁明了
问题解析
1. Luigi多Task多次运行效果
import luigiclass PreClass(luigi.Task):out_file pre_class.textdef requires(self): # 所需的前置函数也可以不写默认为空print(f运行PreClass的 requires程序)return []def output(self): # 完成的依据print(f运行PreClass的 output程序)return luigi.LocalTarget(self.out_file) # 判断这一部分是否完成会去查看是否有out_file文件pre_class.textdef run(self): # 这一部分执行的主函数print(f运行PreClass的 主程序 )with open(self.out_file, a) as file:file.write(啊哈哈哈)class Run1Class(luigi.Task):out_file run_1_class.text# 这里不覆写 output是默认空def requires(self): # 所需的前置函数print(f运行Run1Class的 requires程序)return [PreClass()]def output(self): # 完成的依据print(f运行Run1Class的 output程序)return luigi.LocalTarget(self.out_file) # 判断这一部分是否完成会去查看是否有out_file文件pre_class.textdef run(self): # 这一部分执行的主函数print(f运行Run1Class 主程序 )with open(self.out_file, a) as file:file.write(啊哈哈哈)class Run2Class(luigi.Task):out_file run_2_class.text# 这里不覆写 output是默认空def requires(self): # 所需的前置函数print(f运行Run2Class的 requires程序)return [Run1Class()]def run(self): # 这一部分执行的主函数print(f运行Run2Class 主程序 )with open(self.out_file, a) as file:file.write(啊哈哈哈)if __name__ __main__:luigi.build([Run2Class()], local_schedulerTrue) # 只写入最后输入的数据即可
最后的输出效果如下 Luigi Execution Summary Scheduled 3 tasks of which:
* 3 ran successfully:- 1 PreClass()- 1 Run1Class()- 1 Run2Class()This progress looks :) because there were no failed tasks or missing dependencies Luigi Execution Summary 运行Run2Class的 requires程序
运行Run1Class的 output程序
运行Run1Class的 requires程序
运行PreClass的 output程序
运行PreClass的 requires程序
运行PreClass的 requires程序
运行PreClass的 主程序
运行Run1Class的 requires程序
运行PreClass的 output程序
运行Run1Class 主程序
运行Run2Class的 requires程序
运行Run1Class的 output程序
运行Run2Class 主程序 调度顺序如上所示
2. 无限调用Task示例程序
import luigi
import osclass NeverStopLuigi(luigi.Task):工具类never_stop luigi.BoolParameter(True)def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self._out_put_file self.__class__.__name__ # 用当前类名作为无限重启确认的文件名if self.never_stop:outputs luigi.task.flatten(self.output())for out in outputs:if isinstance(out, luigi.LocalTarget) and out.exists():os.remove(self.output().path)def run(self):with open(self._out_put_file, w) as f:f.write(.)def output(self):return luigi.LocalTarget(self._out_put_file)class PreClass(NeverStopLuigi):def requires(self): # 所需的前置函数也可以不写默认为空print(f运行PreClass的 requires程序)return []def run(self): # 这一部分执行的主函数super(PreClass, self).run()print(f运行PreClass的 主程序 )class Run1Class(NeverStopLuigi):def requires(self): # 所需的前置函数print(f运行Run1Class的 requires程序)return [PreClass()]def run(self): # 这一部分执行的主函数super(Run1Class, self).run()print(f运行Run1Class 主程序 )class Run2Class(NeverStopLuigi):def requires(self): # 所需的前置函数print(f运行Run2Class的 requires程序)return [Run1Class()]def run(self): # 这一部分执行的主函数super(Run2Class, self).run()print(f运行Run2Class 主程序 )if __name__ __main__:luigi.build([Run2Class()], local_schedulerTrue) # 只写入最后输入的数据即可
如上程序不论运行多少次都会得到
运行Run2Class的 requires程序
运行Run1Class的 requires程序
运行PreClass的 requires程序
运行PreClass的 requires程序
运行PreClass的 主程序
运行Run1Class的 requires程序
运行Run1Class 主程序
运行Run2Class的 requires程序
运行Run2Class 主程序