当前位置: 首页 > news >正文

怎么去投诉做网站的公司最新任免名单最新

怎么去投诉做网站的公司,最新任免名单最新,网络股权设计培训课程,手机数据线东莞网站建设Airflow 入门及使用什么是 Airflow#xff1f;Airflow 是一个使用 Python 语言编写的 Data Pipeline 调度和监控工作流的平台。Airflow 是通过 DAG(Directed acyclic graph 有向无环图)来管理任务流程的任务调度工具#xff0c;不需要知道业务数据的具体内容#xff0c;设置…Airflow 入门及使用什么是 AirflowAirflow 是一个使用 Python 语言编写的 Data Pipeline 调度和监控工作流的平台。Airflow 是通过 DAG(Directed acyclic graph 有向无环图)来管理任务流程的任务调度工具不需要知道业务数据的具体内容设置任务的依赖关系即可实现任务调度。这个平台拥有和 Hive、Presto、MySQL、HDFS、Postgres 等数据源之间交互的能力并且提供了钩子(hook)使其拥有很好地扩展性。除了使用命令行该工具还提供了一个 WebUI 可以可视化的查看依赖关系、监控进度、触发任务等。Airflow 的架构在一个可扩展的生产环境中Airflow 含有以下组件元数据库这个数据库存储有关任务状态的信息。调度器Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。执行器Executor 是一个消息队列进程它被绑定到调度器中用于确定实际执行每个任务计划的工作进程。有不同类型的执行器每个执行器都使用一个指定工作进程的类来执行任务。例如LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。Workers这些是实际执行任务逻辑的进程由正在使用的执行器确定。Airflow 解决哪些问题通常在一个运维系统数据分析系统或测试系统等大型系统中我们会有各种各样的依赖需求。包括但不限于时间依赖任务需要等待某一个时间点触发。外部系统依赖任务依赖外部系统需要调用接口去访问。任务间依赖任务 A 需要在任务 B 完成后启动两个任务互相间会产生影响。资源环境依赖任务消耗资源非常多 或者只能在特定的机器上执行。crontab 可以很好地处理定时执行任务的需求但仅能管理时间上的依赖。Airflow 是一种 WMS即它将任务以及它们的依赖看作代码按照那些计划规范任务执行并在实际工作进程之间分发需执行的任务。Airflow 提供了一个用于显示当前活动任务和过去任务状态的优秀 UI并允许用户手动管理任务的执行和状态。Airflow 中的工作流是具有方向性依赖的任务集合。具体说就是 Airflow 的核心概念 DAG(有向无环图)—— 来表现工作流。DAG 中的每个节点都是一个任务DAG 中的边表示的是任务之间的依赖(强制为有向无环因此不会出现循环依赖从而导致无限执行循环)。Airflow 在 ETL 上的实践ETL是英文 ExtractTransformLoad 的缩写用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。ETL 一词较常用在数据仓库Airflow 在解决 ETL 任务各种依赖问题上的能力恰恰是我们所需要的。在现阶段的实践中我们使用 Airflow 来同步各个数据源数据到数仓同时定时执行一些批处理任务及带有数据依赖、资源依赖关系的计算脚本。本文立意于科普介绍故在后面的用例中只介绍了 BashOperatorPythonOperator这俩个最为易用且在我们日常使用中最为常见的 Operator。Airflow 同时也具有不错的集群扩展能力可使用 CeleryExecuter 以及多个 Pool 来提高任务并发度。Airflow在 CeleryExecuter 下可以使用不同的用户启动 Worker不同的 Worker 监听不同的 Queue这样可以解决用户权限依赖问题。Worker 也可以启动在多个不同的机器上解决机器依赖的问题。Airflow 可以为任意一个 Task 指定一个抽象的 Pool每个 Pool 可以指定一个 Slot 数。每当一个 Task 启动时就占用一个 Slot当 Slot 数占满时其余的任务就处于等待状态。这样就解决了资源依赖问题。Airflow 安装及初始化假设你已经安装好了 Python 及配置好了其包管理工具 pip。pip install apache-airflow# 初始化数据库airflow initdb# 上面的命令默认在家目录下创建 Airflow 文件夹和相关配置文件# 也可以使用以下命令来指定目录export AIRFLOW_HOME{yourpath}/airflow# 配置数据库# vim airflow/airflow.cfg# 修改 sql_alchemy_conn# 守护进程运行 webserver默认端口为8080也可以通过-p来指定端口airflow webserver -D# 守护进程运行调度器airflow scheduler -D定义第一个 DAG在 AIRFLOW_HOME 目录下新建 DAGs 文件夹后面的所有 DAG 文件都要存储在这个目录。新建 demo.py语句含义见注释。from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.utils.dates import days_agofrom airflow.operators.bash_operator import BashOperatorfrom airflow.operators.python_operator import PythonOperatorfrom airflow.operators.dummy_operator import DummyOperatordef default_options():default_args  {owner: airflow,  # 拥有者名称start_date: days_ago(1),  # 第一次开始执行的时间为 UTC 时间(注意不要设置为当前时间)retries: 1# 失败重试次数retry_delay: timedelta(seconds5)  # 失败重试间隔}return default_args# 定义 DAGdef test1(dag):t  echo hallo world# operator 支持多种类型 这里使用 BashOperatortask  BashOperator(task_idtest1,  # task_idbash_commandt,  # 指定要执行的命令dagdag  # 指定归属的 DAG)return taskdef hello_world_1():current_time  str(datetime.today())print(hello world at {}.format(current_time))def test2(dag):# PythonOperatortask  PythonOperator(task_idtest2,python_callablehello_world_1,  # 指定要执行的函数dagdag)return taskdef test3(dag):# DummyOperatortask  DummyOperator(task_idtest3,dagdag)return taskwith DAG(test_task,  # dag_iddefault_argsdefault_options(),  # 指定默认参数schedule_intervalonce  # 执行周期) as d:task1  test1(d)task2  test2(d)task3  test3(d)task1  task2  task3  # 指定执行顺序写完后执行 python $AIRFLOW_HOME/dags/demo.py 检查是否有错误如果命令行没有报错就表示没问题。Web UI打开 localhost:8080。主视图Airflow 的 WebUI 是其任务调度可视化的体现可以在这个 WebUI 上监控几乎所有任务调度运行的实时及历史数据。一些命令如 Trigger、Clear 均可以在 WebUI 上完成一些全局参数也可以在主页面导航栏 Admin 下配置。点击 dag_name进入任务预览任务图视图任务树视图其他常用命令# 测试任务格式airflow test dag_id task_id execution_timeairflow test test_task test1 2019-09-10# 查看生效的 DAGsairflow list_dags -sd $AIRFLOW_HOME/dags# 开始运行任务(同 web 界面点 trigger 按钮)airflow trigger_dag test_task# 暂停任务airflow pause dag_id# 取消暂停等同于在 web 管理界面打开 off 按钮airflow unpause dag_id# 查看 task 列表airflow list_tasks dag_id  查看task列表# 清空任务状态airflow clear dag_id# 运行taskairflow run dag_id task_id execution_dateAirflow 核心原理分析概念及发展JOB最上层的工作。分为 SchedulerJob、BackfillJob 和 LocalTaskJob。SchedulerJob 由 Scheduler 创建BackfillJob 由 Backfill 创建LocalTaskJob 由前面两种 Job 创建。DAG有向无环图用来表示工作流。DAG Run工作流实例表示某个工作流的一次运行(状态)。Task任务工作流的基本组成部分。TaskInstance任务实例表示某个任务的一次运行(状态)。在早期版本 Airflow 中DAG 执行主要有两种完全独立的执行途径SchedulerJob 和 BackfillJob。在一次较大的重构中增加了 DagRun 方式以跟踪 DAG 的执行状态。结构关系图DagRun 执行流程描述DagRuns 表示某个时间点 DAG 的状态(也称为 DagInstances)。要运行 DAG 或管理 DAG 的执行必须首先创建一个 DagRun 实例。但是仅创建 DagRun 不足以实际运行 DAG(就像创建 TaskInstance 与实际运行任务并不一样)。因此需要一种机制来实现上述流程。结构相当简单维护一组要执行的 DagRuns 集合并循环遍历该集合直到所有 DagRuns 成功或失败为止。基本的 DagRuns 循环如下所示刷新 DAGs收集新的 DagRuns执行 DagRuns(包括更新 DagRuns 的状态为成功或失败)唤醒 executor/心跳检查Scheduler 的调度逻辑调度器实际上就是一个 airflow.jobs.SchedulerJob 实例 Job 持续运行 run 方法。job.run() 在开始时将自身的信息加入到 Job 表中并维护状态和心跳预期能够正常结束将结束时间也更新到表中。但是实际上往往因为异常中断导致结束时间为空。不管是如何进行的退出SchedulerJob 退出时会关闭所有子进程。这里主要介绍下 Scheduler 的调度逻辑遍历 DAGs 路径下的所有 DAG 文件启动一定数量的进程(进程池)并且给每个进程指派一个 DAG 文件。每个 DagFileProcessor 解析分配给它的 DAG 文件并根据解析结果在DB中创建 DagRuns 和 TaskInstance。在 scheduler_loop 中检查与活动 DagRun 关联的 TaskInstance 的状态解析 TaskInstance 之间的任何依赖标识需要被执行的 TaskInstance然后将它们添加至 executor 队列将新排列的 TaskInstance 状态更新为QUEUED状态。每个可用的 executor 从队列中取一个 TaskInstance然后开始执行它将此 TaskInstance 的数据库记录更新为SCHEDULED。当一个 TaskInstance 完成运行关联的 executor 就会报告到队列并更新数据库中的 TaskInstance 的状态(例如“完成”、“失败”等)。一旦所有的 DAG 处理完毕后就会进行下一轮循环处理。这里还有一个细节就是上一轮的某个 DAG 的处理时间可能很长导致到下一轮处理的时候这个 DAG 还没有处理完成。Airflow 的处理逻辑是在这一轮不为这个 DAG 创建进程这样就不会阻塞进程去处理其余 DAG。文档原文Enumerate the all the files in the DAG directory.Start a configurable number of processes and for each one, assign a DAG file to process.In each child process, parse the DAG file, create the necessary DagRuns given the state of the DAGs task instances, and for all the task instances that should run, create a TaskInstance (with the SCHEDULED state) in the ORM.Back in the main scheduler process, query the ORM for task instances in the SCHEDULED state. If any are found, send them to the executor and set the task instance state to QUEUED.If any of the child processes have finished, create another process to work on the next file in the series, provided that the number of running processes is less than the configured limit.Once a process has been launched for all of the files in the DAG directory, the cycle is repeated. If the process to parse a particular DAG file is still running when the files turn comes up in the next cycle, a new process is not launched and a process for the next file in the series is launched instead. This way, a DAG file that takes a long time to parse does not necessarily block the processing of other DAGs.Scheduler 模块代码结构DagFileProcessor 在子进程中解析 DAG 定义文件。对于发现的 DAG检查 DagRun 和 TaskInstance 的状态。如果有 TaskInstance 可以运行将状态标记为 SCHEDULED。为每个 DAG 文件分配一个进程同时在 DagFileProcessorManager 中保存有 DAG 和 processor 的映射表。在 DAG 没有被任何 processor 处理的时候才会给它创建新的处理进程。DagFileProcessorManager 控制 DagFileProcessors 如何启动。它追踪哪些文件应该被处理并且确保一旦有一个 DagFileProcessor 完成解析下一个 DAG 文件应该得到处理。并且控制 DagFileProcessors 的数量。SchedulerJob 通过 Agent 获取 manager 的 DAG 定义文件解析结果并且将 SCHEDULED 状态的 TaskInstance 发送给 executor 执行。DagFileProcessorAgent 作为一个采集代理scheduler 可以借助 Agent 获取 manager 获取到的 DAG 解析结果并且可以控制manager的行为。核心类分析Dagmethodfollowing_schedule() 计算当前 DAG 的下一次调度时间previous_schedule() 计算当前 DAG 的上一次调度时间get_dagrun() 返回给定执行日期的 dagrun(如果存在)create_dagrun() 创建一个包括与此 DAG 相关任务的 dagrunckear() 清除指定日期范围内与当前 DAG 相关的一组任务实例run() 实例化为 BackfillJob 同时调用 job.run()DagRunmodelID_PREFIX  scheduled__ID_FORMAT_PREFIX  ID_PREFIX  {0}id  Column(Integer, primary_keyTrue)dag_id  Column(String(ID_LEN))execution_date  Column(UtcDateTime, defaulttimezone.utcnow)start_date  Column(UtcDateTime, defaulttimezone.utcnow)end_date  Column(UtcDateTime)_state  Column(state, String(50), defaultState.RUNNING)run_id  Column(String(ID_LEN))external_trigger  Column(Boolean, defaultTrue)conf  Column(PickleType)methodget_dag() 返回与当前 DagRun 相关的 Dagget_task_instances() 返回与当前 DagRun 的所有 TaskInstancesupdate_state() 根据 TaskInstances 的状态确定 DagRun 的总体状态get_latest_runs() 返回每个 Dag 的最新一次 DagRunTaskInstancemodel__tablename__  task_instancetask_id  Column(String(ID_LEN), primary_keyTrue)dag_id  Column(String(ID_LEN), primary_keyTrue)execution_date  Column(UtcDateTime, primary_keyTrue)start_date  Column(UtcDateTime)end_date  Column(UtcDateTime)duration  Column(Float)state  Column(String(20))_try_number  Column(try_number, Integer, default0)max_tries  Column(Integer)hostname  Column(String(1000))unixname  Column(String(1000))job_id  Column(Integer)pool  Column(String(50), nullableFalse)queue  Column(String(256))priority_weight  Column(Integer)operator  Column(String(1000))queued_dttm  Column(UtcDateTime)pid  Column(Integer)executor_config  Column(PickleType(picklerdill))methodget_dagrun() 返回当前 TaskInstance 的 DagRunrun() TaskInstance runget_template_context() 通过 Jinja2 模板获取上下文xcom_push() 创建一个 XCom 可用于 task 发送参数xcom_pull() 创建一个 XCom 可用于 task 接收参数SchedulerJobdef _execute(self):The actual scheduler loop. The main steps in the loop are:#. Harvest DAG parsing results through DagFileProcessorAgent#. Find and queue executable tasks#. Change task instance state in DB#. Queue tasks in executor#. Heartbeat executor#. Execute queued tasks in executor ake_aware(execution_date,self.task.dag.timezone)self.processor_agent  DagFileProcessorAgent()  # 通过检查当前 processor 数量来控制进程个数self.executor.start()# Start after resetting orphaned tasks to avoid stressing out DB.self.processor_agent.start()  # 在解析 DAG 文件时只会对最近修改过的文件进行解析execute_start_time  timezone.utcnow()# For the execute duration, parse and schedule DAGswhile (timezone.utcnow() - execute_start_time).total_seconds() self.run_duration or self.run_duration # Starting Loop...self.processor_agent.heartbeat()  # 控制 DagFileProcessor 解析 DAG 文件的速度# Harvesting DAG parsing resultssimple_dags  self.processor_agent.harvest_simple_dags()if len(simple_dags)  0:self._execute_task_instances()...# Call heartbeatsself.executor.heartbeat()# heartbeat() 中根据 parallelism 得出当前可用的 slots 数量# 决定 execute_async 多少个 task# Process events from the executorself._process_executor_events(simple_dag_bag)# Ran scheduling loop for all tasks done...# Stop any processorsself.processor_agent.terminate()# Verify that all files were processed, and if so, deactivate DAGs that# havent been touched by the scheduler as they likely have been# deleted....self.executor.end()methodcreate_dag_run() 根据调度周期检查是否需要为 DAG 创建新的 DagRun。如果已调度则返回 DagRun否则返回 Noneprocess_file() 解析 DAG 定义文件_execute_task_instances() 尝试执行调度器调度过的 TaskInstancesThere are three steps:Pick TaskInstances by priority with the constraint that they are in the expected states and that we do exceed max_active_runs or pool limits.Change the state for the TaskInstances above atomically.Enqueue the TaskInstances in the executor.reduce_in_chunks() 用来进行小的分批处理总结本文在第一部分着重介绍了 Airflow 的理念、使用场景及其一般架构。提供了相对简单易懂的安装及操作命令并附带了一个使用案例用来介绍代码如何编排以及 WebUI 的使用。在第二部分开篇介绍了 Airflow 任务创建、调度和管理的一些基础概念以及 Airflow 版本迭代的一些重要变化。Airflow 目前还是处于快速开发中当前版本有很多遗留问题版本升级也不是向后兼容的变动很大。Scheduler 毫无疑问是整个 Airflow 的核心模块逻辑结构复杂。本文从 Scheduler 模块的主要逻辑入手分析了控制循环和代码结构重点分析了从 dag.py 代码文件到可调度执行的 TaskInstances 所经历的阶段以及介绍了并发控制的实现和性能优化。最后结合源码介绍了 Airflow 核心类的模型定义和主要方法以了解各个类所扮演的角色及其实现的功能。参考https://zhuanlan.zhihu.com/p/90282578
http://www.pierceye.com/news/328375/

相关文章:

  • 网站后台生成静态页面天津百度推广电话号码
  • 网站单个页面301跳转湖南省建设局网站
  • 潮州网站建设十堰seo招聘
  • 企业网站建设公司公司系统优化的方法
  • 网站开发与sparkwordpress default
  • 品牌网站建设帮你大蝌蚪北京做网站建设的公司排名
  • 中国建设第一平台网站网络网站建设10大指标
  • 书画院网站源码网站主题模板下载不了
  • 邢台制作网站网上申报流程
  • 做网站的困难做的网站有营销效果吗
  • 高端集团网站建设公司做网站开发的有外快嘛
  • 网站服务器防火墙设置惠州网络推广公司哪家好
  • 做网站根据内容生成pdfwordpress自媒体二号
  • 临沂网站开发不会写代码怎么做网站
  • 怎么做加密货币网站wordpress 多域名登陆
  • 做网站的过程做网站公司广州
  • 女人动漫做受网站wordpress如何作页面
  • 做网站导航栏素材图建筑设计网站制作
  • 淘宝的网站建设方案国家为何要求所有网站均须备案
  • 企业网站模板下载哪家公司强温州建设公司网站
  • 网站编辑能在家做wordpress 做的商城
  • 空间信息网站开发公司工程项目质量安全管理体系
  • 网站流量被黑包装回收网站建设
  • 网站拒绝被百度收录成品网站1688特色
  • 深圳住房和建设局网站官网打不开WordPress 斗鱼
  • 纯文本网站连接西宁圆井模板我自己做的网站
  • 职业院校专题建设网站wordpress文章版权投诉
  • 网站改版好吗如何解决旅游网站建设问题
  • 爱站网使用的是什么网站模仿网站页面违法吗
  • 做民宿的网站wordpress 短信平台