360免费做网站电话,免费社区建站系统,苏州网站建设推荐q479185700霸屏,化妆品网站设计#x1f3e1; 个人主页#xff1a;IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 #x1f6a9; 私聊博主#xff1a;加入大数据技术讨论群聊#xff0c;获取更多大数据资料。 #x1f514; 博主个人B栈地址#xff1a;豹哥教你大数据的个人空间-豹… 个人主页IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 私聊博主加入大数据技术讨论群聊获取更多大数据资料。 博主个人B栈地址豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 PythonOperator可以调用Python函数由于Python基本可以调用任何类型的任务如果实在找不到合适的Operator将任务转为Python函数使用PythonOperator即可。
关于PythonOperator常用参数如下更多参数可以查看官网airflow.operators.python — Airflow Documentation
python_callable(python callable)调用的python函数op_kwargs(dict)调用python函数对应的 **args 参数dict格式使用参照案例。op_args(list)调用python函数对应的 *args 参数多个封装到一个tuple中list格式使用参照案例。
PythonOperator调度案例
import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator# python中 * 关键字参数允许你传入0个或任意个参数这些可变参数在函数调用时自动组装为一个tuple。
# python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。
def print__hello1(*a,**b):print(a)print(b)print(hello airflow1)# 返回的值只会打印到日志中return{sss1:xxx1}def print__hello2(random_base):print(random_base)print(hello airflow2)# 返回的值只会打印到日志中return{sss2:xxx2}default_args {owner:maliu,start_date:datetime(2021, 10, 1),retries: 1, # 失败重试次数retry_delay: timedelta(minutes5) # 失败重试间隔
}dag DAG(dag_id execute_pythoncode,default_argsdefault_args,schedule_intervaltimedelta(minutes1)
)firstPythonOperator(task_idfirst,#填写 print__hello1 方法时不要加上“()”python_callableprint__hello1,# op_args 对应 print_hello1 方法中的a参数op_args[1,2,3,hello,world],# op_kwargs 对应 print__hello1 方法中的b参数op_kwargs{id:1,name:zs,age:18},dag dag
)secondPythonOperator(task_idsecond,#填写 print__hello2 方法时不要加上“()”python_callableprint__hello2,# random_base 参数对应 print_hello2 方法中参数“random_base”op_kwargs{random_base:random.randint(0,9)},dagdag
)first second