安丘市建设局网站,网站标题修改,如何做品牌营销策划,ppt模板背景图文章目录 一、DolphinScheduler概述和部署1、DolphinScheduler简介1.1 概述1.2 核心架构 2、DolphinScheduler部署模式2.1 概述2.2 单机模式2.3 伪集群模式2.4 集群模式 3、DolphinScheduler集群模式部署3.1 集群规划与准备3.2 下载与配置部署脚本3.3 初始化数据库3.4 一键部署… 文章目录 一、DolphinScheduler概述和部署1、DolphinScheduler简介1.1 概述1.2 核心架构 2、DolphinScheduler部署模式2.1 概述2.2 单机模式2.3 伪集群模式2.4 集群模式 3、DolphinScheduler集群模式部署3.1 集群规划与准备3.2 下载与配置部署脚本3.3 初始化数据库3.4 一键部署DolphinScheduler3.5 DolphinScheduler启停命令 二、DolphinScheduler操作1、工作流传参1.1 内置参数1.2 参数传递 2、引用依赖资源3、数据源配置4、告警实例配置4.1 邮箱告警实例配置4.2 其他告警 5、其他注意事项 三、Airflow1、Airflow基本概念1.1 概述1.2 名词解释 2、Airflow安装2.1 python环境安装2.2 安装Airflow 3、修改数据库与调度器3.1 修改数据库为mysql3.2 修改执行器 4、部署使用4.1 环境部署启动 4.2 Dag任务操作4.3 配置邮件服务器 四、Azkaban1、Azkaban入门1.1 上传jar包和配置sql1.2 配置Executor Server1.3 配置Web Server 2、Work Flow案例实操2.1 HelloWorld案例2.2 作业依赖案例2.3 自动失败重试案例2.4 手动失败重试案例 3、JavaProcess作业类型案例3.1 概述3.2 案例 4、条件工作流案例4.1 概述4.2 运行时参数案例4.3 预定义宏案例 5、邮箱告警6、Azkaban多Executor模式注意事项 一、DolphinScheduler概述和部署 官网https://dolphinscheduler.apache.org/ 1、DolphinScheduler简介
1.1 概述
Apache DolphinScheduler是一个分布式、易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系使调度系统在数据处理流程中开箱即用
1.2 核心架构
DolphinScheduler的主要角色如下
MasterServer采用分布式无中心设计理念MasterServer主要负责 DAG 任务切分、任务提交、任务监控并同时监听其它MasterServer和WorkerServer的健康状态WorkerServer也采用分布式无中心设计理念WorkerServer主要负责任务的执行和提供日志服务ZooKeeper服务系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错Alert服务提供告警相关服务API接口层主要负责处理前端UI层的请求UI系统的前端页面提供系统的各种可视化操作界面 2、DolphinScheduler部署模式 https://dolphinscheduler.apache.org/zh-cn/docs/2.0.3/部署指南_menu 2.1 概述
DolphinScheduler支持多种部署模式包括单机模式Standalone、伪集群模式Pseudo-Cluster、集群模式Cluster等
2.2 单机模式
单机模式standalone模式下所有服务均集中于一个StandaloneServer进程中并且其中内置了注册中心Zookeeper和数据库H2。只需配置JDK环境就可一键启动DolphinScheduler快速体验其功能
由于DolphinScheduler的单机模式使用的是内置的ZK和数据库故在集群模式下所做的相关配置在单机模式下并不可见所以需要重新配置必要的配置为创建租户和创建用户
bin/dolphinscheduler-daemon.sh start standalone-server2.3 伪集群模式
伪集群模式Pseudo-Cluster是在单台机器部署 DolphinScheduler 各项服务该模式下master、worker、api server、logger server等服务都只在同一台机器上。Zookeeper和数据库需单独安装并进行相应配置
2.4 集群模式
集群模式Cluster与伪集群模式的区别就是在多台机器部署 DolphinScheduler各项服务并且可以配置多个Master及多个Worker
3、DolphinScheduler集群模式部署
3.1 集群规划与准备
三台节点均需部署JDK1.8并配置相关环境变量需部署数据库支持MySQL5.7或者PostgreSQL8.2.15。如 MySQL 则需要 JDBC Driver 8.0.16需部署Zookeeper3.4.6如果启用 HDFS 文件系统则需要 Hadoop2.6环境三台节点均需安装进程管理工具包psmisc
sudo yum install -y psmisc3.2 下载与配置部署脚本
wget https://archive.apache.org/dist/dolphinscheduler/2.0.3/apache-dolphinscheduler-2.0.3-bin.tar.gz
tar -zxvf apache-dolphinscheduler-2.0.3-bin.tar,gz
修改解压目录下的conf/config目录下的install_config.conf文件不需要修改的可以直接略过
# ---------------------------------------------------------
# INSTALL MACHINE
# ---------------------------------------------------------
# A comma separated list of machine hostname or IP would be installed DolphinScheduler,
# including master, worker, api, alert. If you want to deploy in pseudo-distributed
# mode, just write a pseudo-distributed hostname
# Example for hostnames: ipsds1,ds2,ds3,ds4,ds5, Example for IPs: ips192.168.8.1,192.168.8.2,192.168.8.3,192.168.8.4,192.168.8.5
ipshadoop102,hadoop103,hadoop104
# 将要部署任一 DolphinScheduler 服务的服务器主机名或 ip 列表# Port of SSH protocol, default value is 22. For now we only support same port in all ips machine
# modify it if you use different ssh port
sshPort22# A comma separated list of machine hostname or IP would be installed Master server, it
# must be a subset of configuration ips.
# Example for hostnames: mastersds1,ds2, Example for IPs: masters192.168.8.1,192.168.8.2
mastershadoop102
# master 所在主机名列表必须是 ips 的子集# A comma separated list of machine hostname:workerGroup or IP:workerGroup.All hostname or IP must be a
# subset of configuration ips, And workerGroup have default value as default, but we recommend you declare behind the hosts
# Example for hostnames: workersds1:default,ds2:default,ds3:default, Example for IPs: workers192.168.8.1:default,192.168.8.2:default,192.168.8.3:default
workershadoop102:default,hadoop103:default,hadoop104:default
# worker主机名及队列此处的 ip 必须在 ips 列表中# A comma separated list of machine hostname or IP would be installed Alert server, it
# must be a subset of configuration ips.
# Example for hostname: alertServerds3, Example for IP: alertServer192.168.8.3
alertServerhadoop102
# 告警服务所在服务器主机名
# A comma separated list of machine hostname or IP would be installed API server, it
# must be a subset of configuration ips.
# Example for hostname: apiServersds1, Example for IP: apiServers192.168.8.1
apiServershadoop102
# api服务所在服务器主机名# A comma separated list of machine hostname or IP would be installed Python gateway server, it
# must be a subset of configuration ips.
# Example for hostname: pythonGatewayServersds1, Example for IP: pythonGatewayServers192.168.8.1
# pythonGatewayServersds1
# 不需要的配置项可以保留默认值也可以用 # 注释# The directory to install DolphinScheduler for all machine we config above. It will automatically be created by install.sh script if not exists.
# Do not set this configuration same as the current path (pwd)
installPath/opt/module/dolphinscheduler
# DS 安装路径如果不存在会创建# The user to deploy DolphinScheduler for all machine we config above. For now user must create by yourself before running install.sh
# script. The user needs to have sudo privileges and permissions to operate hdfs. If hdfs is enabled than the root directory needs
# to be created by this user
deployUseratguigu
# 部署用户任务执行服务是以 sudo -u {linux-user} 切换不同 Linux 用户的方式来实现多租户运行作业因此该用户必须有免密的 sudo 权限。# The directory to store local data for all machine we config above. Make sure user deployUser have permissions to read and write this directory.
dataBasedirPath/tmp/dolphinscheduler
# 前文配置的所有节点的本地数据存储路径需要确保部署用户拥有该目录的读写权限# ---------------------------------------------------------
# DolphinScheduler ENV
# ---------------------------------------------------------
# JAVA_HOME, we recommend use same JAVA_HOME in all machine you going to install DolphinScheduler
# and this configuration only support one parameter so far.
javaHome/opt/module/jdk1.8.0_212
# JAVA_HOME 路径# DolphinScheduler API service port, also this is your DolphinScheduler UI components URL port, default value is 12345
apiServerPort12345# ---------------------------------------------------------
# Database
# NOTICE: If database value has special characters, such as .*[]^${}\?|()#, Please add prefix \ for escaping.
# ---------------------------------------------------------
# The type for the metadata database
# Supported values: postgresql, mysql, h2.
# 注意数据库相关配置的 value 必须加引号否则配置无法生效DATABASE_TYPEmysql
# 数据库类型# Spring datasource url, following HOST:PORT/database?parameter format, If you using mysql, you could use jdbc
# string jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicodetruecharacterEncodingUTF-8 as example
# SPRING_DATASOURCE_URL${SPRING_DATASOURCE_URL:-jdbc:h2:mem:dolphinscheduler;MODEMySQL;DB_CLOSE_DELAY-1;DATABASE_TO_LOWERtrue}
SPRING_DATASOURCE_URLjdbc:mysql://hadoop102:3306/dolphinscheduler?useUnicodetruecharacterEncodingUTF-8
# 数据库 URL# Spring datasource username
# SPRING_DATASOURCE_USERNAME${SPRING_DATASOURCE_USERNAME:-sa}
SPRING_DATASOURCE_USERNAMEdolphinscheduler
# 数据库用户名# Spring datasource password
# SPRING_DATASOURCE_PASSWORD${SPRING_DATASOURCE_PASSWORD:-}
SPRING_DATASOURCE_PASSWORDdolphinscheduler
# 数据库密码# ---------------------------------------------------------
# Registry Server
# ---------------------------------------------------------
# Registry Server plugin name, should be a substring of registryPluginDir, DolphinScheduler use this for verifying configuration consistency
registryPluginNamezookeeper
# 注册中心插件名称DS 通过注册中心来确保集群配置的一致性# Registry Server address.
registryServershadoop102:2181,hadoop103:2181,hadoop104:2181
# 注册中心地址即 Zookeeper 集群的地址# Registry Namespace
registryNamespacedolphinscheduler
# DS 在 Zookeeper 的结点名称# ---------------------------------------------------------
# Worker Task Server
# ---------------------------------------------------------
# Worker Task Server plugin dir. DolphinScheduler will find and load the worker task plugin jar package from this dir.
taskPluginDirlib/plugin/task# resource storage type: HDFS, S3, NONE
resourceStorageTypeHDFS
# 资源存储类型# resource store on HDFS/S3 path, resource file will store to this hdfs path, self configuration, please make sure the directory exists on hdfs and has read write permissions. /dolphinscheduler is recommended
resourceUploadPath/dolphinscheduler
# 资源上传路径# if resourceStorageType is HDFSdefaultFS write namenode addressHA, you need to put core-site.xml and hdfs-site.xml in the conf directory.
# if S3write S3 addressHAfor example s3a://dolphinscheduler
# NoteS3 be sure to create the root directory /dolphinscheduler
defaultFShdfs://hadoop102:8020
# 默认文件系统# if resourceStorageType is S3, the following three configuration is required, otherwise please ignore
s3Endpointhttp://192.168.xx.xx:9010
s3AccessKeyxxxxxxxxxx
s3SecretKeyxxxxxxxxxx# resourcemanager port, the default value is 8088 if not specified
resourceManagerHttpAddressPort8088
# yarn RM http 访问端口# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single node, keep this value empty
yarnHaIps
# Yarn RM 高可用 ip若未启用 RM 高可用则将该值置空# if resourcemanager HA is enabled or not use resourcemanager, please keep the default value; If resourcemanager is single node, you only need to replace yarnIp1 to actual resourcemanager hostname
singleYarnIphadoop103
# Yarn RM 主机名若启用了 HA 或未启用 RM保留默认值# who has permission to create directory under HDFS/S3 root path
# Note: if kerberos is enabled, please config hdfsRootUser
hdfsRootUseratguigu
# 拥有 HDFS 根目录操作权限的用户# 下面是如果hdfs开启了验证在操作的
# kerberos config
# whether kerberos starts, if kerberos starts, following four items need to config, otherwise please ignore
kerberosStartUpfalse
# kdc krb5 config file path
krb5ConfPath$installPath/conf/krb5.conf
# keytab username,watch out the sign should followd by \\
keytabUserNamehdfs-mycluster\\ESZ.COM
# username keytab path
keytabPath$installPath/conf/hdfs.headless.keytab
# kerberos expire time, the unit is hour
kerberosExpireTime2# use sudo or not
sudoEnabletrue# worker tenant auto create
workerTenantAutoCreatefalse3.3 初始化数据库
DolphinScheduler 元数据存储在关系型数据库中故需创建相应的数据库和用户
# 创建数据库
CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
# 创建用户
CREATE USER dolphinscheduler% IDENTIFIED BY dolphinscheduler;# 提高密码复杂度或者执行以下命令降低MySQL密码强度级别
set global validate_password_length4;
set global validate_password_policy0;
# 赋予用户相应权限
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO dolphinscheduler%;
flush privileges;# 拷贝MySQL驱动到DolphinScheduler的解压目录下的lib中
cp /opt/software/mysql-connector-java-8.0.16.jar lib/# 执行数据库初始化脚本
# 数据库初始化脚本位于DolphinScheduler解压目录下的script目录中即/opt/software/ds/apache-dolphinscheduler-2.0.3-bin/script/
script/create-dolphinscheduler.sh
3.4 一键部署DolphinScheduler
# 启动zk
zk.sh start
# 一键部署并启动DolphinScheduler
./install.sh
# 查看DolphinScheduler进程
# ApiApplicationServer
# WorkerServer
# MasterServer
# AlertServer
# LoggerServer# ----------
# 访问DolphinScheduler UI
# DolphinScheduler UI地址为http://hadoop102:12345/dolphinscheduler
# 初始用户的用户名为admin密码为dolphinscheduler1233.5 DolphinScheduler启停命令
安装完后得去/opt/module/dolphinscheduler修改或启停
# 一键启停所有服务
./bin/start-all.sh
./bin/stop-all.sh
# 注意同Hadoop的启停脚本进行区分
# 启停 Master
./bin/dolphinscheduler-daemon.sh start master-server
./bin/dolphinscheduler-daemon.sh stop master-server
# 启停 Worker
./bin/dolphinscheduler-daemon.sh start worker-server
./bin/dolphinscheduler-daemon.sh stop worker-server
# 启停 Api
./bin/dolphinscheduler-daemon.sh start api-server
./bin/dolphinscheduler-daemon.sh stop api-server
# 启停 Logger
./bin/dolphinscheduler-daemon.sh start logger-server
./bin/dolphinscheduler-daemon.sh stop logger-server
# 启停 Alert
./bin/dolphinscheduler-daemon.sh start alert-server
./bin/dolphinscheduler-daemon.sh stop alert-server二、DolphinScheduler操作 入门文档可以参考https://dolphinscheduler.apache.org/zh-cn/docs/2.0.3/guide/quick-start 1、工作流传参 https://dolphinscheduler.apache.org/zh-cn/docs/2.0.3/功能介绍_menu/参数_menu DolphinScheduler支持对任务节点进行灵活的传参任务节点可通过${参数名}引用参数值
1.1 内置参数
基础内置参数
变量名参数说明system.biz.date${system.biz.date}定时时间前一天格式为 yyyyMMddsystem.biz.curdate${system.biz.curdate}定时时间格式为 yyyyMMddsystem.datetime${system.datetime}定时时间格式为 yyyyMMddHHmmss
衍生内置参数
可通过衍生内置参数设置任意格式、任意时间的日期。
自定义日期格式可以对 $[yyyyMMddHHmmss] 任意分解组合如 $[yyyyMMdd], $[HHmmss], $[yyyy-MM-dd]。使用 add_months() 函数该函数用于加减月份 第一个入口参数为[yyyyMMdd]表示返回时间的格式 第二个入口参数为月份偏移量表示加减多少个月
参数说明$[add_months(yyyyMMdd,12*N)]后 N 年$[add_months(yyyyMMdd,-12*N)]前 N 年$[add_months(yyyyMMdd,N)]后 N 月$[add_months(yyyyMMdd,-N)]前 N 月$[yyyyMMdd7*N]后 N 周$[yyyyMMdd-7*N]前 N 周$[yyyyMMddN]后 N 天$[yyyyMMdd-N]前 N 天$[HHmmssN/24]后 N 小时$[HHmmss-N/24]前 N 小时$[HHmmssN/24/60]后 N 分钟$[HHmmss-N/24/60]前 N 分钟 相关说明
dt参数名ININ 表示局部参数仅能在当前节点使用OUT 表示局部参数可以向下游传递(目前支持这个特性的任务类型有Shell、SQL、Procedure同时若节点之间没有依赖关系则局部参数无法传递)DATE数据类型日期$[yyyy-MM-dd]自定义格式的衍生内置参数
全局参数在工作流定义本地参数在节点定义本地参数 全局参数 上游任务传递的参数
1.2 参数传递
本地参数 全局参数 上游任务传递的参数多个上游节点均传递同名参数时下游节点会优先使用值为非空的参数如果存在多个值为非空的参数则按照上游任务的完成时间排序选择完成时间最早的上游任务对应的参数。
2、引用依赖资源
有些任务需要引用一些额外的资源例如MR、Spark等任务须引用jar包Shell任务需要引用其他脚本等。DolphinScheduler提供了资源中心来对这些资源进行统一管理。
如果需要用到资源上传功能针对单机可以选择本地文件目录作为上传文件夹(此操作不需要部署 Hadoop)。当然也可以选择上传到 Hadoop or MinIO 集群上此时则需要有Hadoop (2.6) 或者 MinIO 等相关环境。本文在部署 DS 集群时指定了文件系统为 HDFS https://dolphinscheduler.apache.org/zh-cn/docs/2.0.3/guide/resource 3、数据源配置 https://dolphinscheduler.apache.org/zh-cn/docs/2.0.3/功能介绍_menu/数据源中心_menu 数据源中心支持MySQL、POSTGRESQL、HIVE/IMPALA、SPARK、CLICKHOUSE、ORACLE、SQLSERVER等数据源。此处仅对 HIVE 数据源进行介绍
数据源选择HIVE数据源名称输入数据源的名称描述输入数据源的描述可置空IP/主机名输入连接HIVE的IP端口输入连接HIVE的端口默认 10000用户名设置连接HIVE的用户名如果没有配置 HIVE 权限管理则用户名可以任意但 HIVE 表数据存储在 HDFS为了保证对所有表的数据均有操作权限此处选择 HDFS 超级用户 atguigu注HDFS 超级用户名与执行 HDFS 启动命令的 Linux 节点用户名相同密码设置连接HIVE的密码如果没有配置 HIVE 权限管理则密码置空即可数据库名输入连接HIVE的数据库名称Jdbc连接参数用于HIVE连接的参数设置以JSON形式填写没有参数可置空
然后在工作流中可以选择SQL 节点名称自定义节点名称环境名称HIVE 执行所需环境数据源类型选择 HIVE数据源选择上文配置的 HIVE 数据源SQL 类型根据SQL 语句选择此处选用默认的“查询”即可SQL 语句要执行的 SQL 语句末尾不能有分号否则报错语法错误
4、告警实例配置
4.1 邮箱告警实例配置 POP3IMAPSMTP描述 需要登陆管理员账户
告警实例名称在告警组配置时可以选择的告警插件实例名称用户自定义选择插件选择 Email 则为邮箱告警实例收件人接收方邮箱地址收件人不需要开启 SMTP 服务抄送人抄送是指用户给收件人发出邮件的同时把该邮件发送给另外的人收件人之外的收件方都是抄送人“收件人”可以获知该邮件的所有抄送人抄送人可以为空。mail.smtp.host邮箱的 SMTP 服务器域名对于 QQ 邮箱为 smtp.qq.com。各邮箱的 SMTP 服务器见此链接https://blog.csdn.net/wustzjf/article/details/52481309mail.smtp.port邮箱的 SMTP 服务端口号主流邮箱均为 25 端口使用默认值即可mail.sender发件方邮箱地址需要开启 SMTP 服务mail.user与 mail.sender 保持一致即可mail.password获取的邮箱授权码。未列出的选项保留默认值或默认选项即可 4.2 其他告警 其他告警可以参考https://dolphinscheduler.apache.org/zh-cn/docs/3.0.0 同时还可以电话告警这里有个运维平台是一站式集成的睿象云官网https://www.aiops.com/
5、其他注意事项
DolphinScheduler的环境变量是不和主机共享的默认需要进入/opt/module/dolphinscheduler/conf/env/dolphinscheduler_env.sh进行修改也可以直接在admin用户下在可视化界面进行创建创建节点的时候选择即可
三、Airflow
1、Airflow基本概念 官方网站https://airflow.apache.org 1.1 概述
Airflow是一个以编程方式编写安排和监视工作流的平台。使用Airflow将工作流编写任务的有向无环图DAG。Airflow计划程序在遵循指定的依赖项同时在一组工作线程上执行任务。丰富的命令实用程序使在DAG上执行复杂的调度变的轻而易举。丰富的用户界面使查看生产中正在运行的管道监视进度以及需要时对问题进行故障排除变的容易
1.2 名词解释
DynamicAirflow配置需要实用Python允许动态生产管道。这允许编写可动态。这允许编写可动态实例化管道的代码Extensible轻松定义自己的运算符执行程序并扩展库使其适合于您的环境ElegantAirlfow是精简的使用功能强大的Jinja模板引擎将脚本参数化内置于Airflow的核心中ScalableAirflow具有模板块架构并使用消息队列来安排任意数量的工作任务
2、Airflow安装
2.1 python环境安装
# Superset是由Python语言编写的Web应用要求Python3.8的环境
# 这里使用MiniConda作为包管理器
# 下载地址https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
# 加载环境变量配置文件使之生效
source ~/.bashrc
# Miniconda安装完成后每次打开终端都会激活其默认的base环境我们可通过以下命令禁止激活默认base环境
conda config --set auto_activate_base false# 配置conda国内镜像
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
conda config --set show_channel_urls yes
# 创建Python3.8环境
conda create --name airflow python3.8
# 创建环境conda create -n env_name
# 查看所有环境conda info --envs
# 删除一个环境conda remove -n env_name --all
# 激活airflow环境
conda activate airflow
# 执行python -V命令查看python版本
python -V
2.2 安装Airflow
conda activate airflow
pip install numpy -i https://pypi.tuna.tsinghua.edu.cn/simple
sudo mkdir ~/.pip
sudo vim ~/.pip/pip.conf
#添加以下内容
[global]
index-url https://pypi.tuna.tsinghua.edu.cn/simple
[install]
trusted-host https://pypi.tuna.tsinghua.edu.cn
# 安装airflow
pip install apache-airflow2.4.3
# 初始化airflow
airflow db init
# 查看版本
airflow version
# airflow安装好存放路径
pwd
# 启动airflow web服务,启动后浏览器访问http://hadoop102:8081
airflow webserver -p 8081 -D
# 启动airflow调度
airflow scheduler -D
# 创建账号
airflow users create \
--username admin \
--firstname atguigu \
--lastname atguigu \
--role Admin \
--email shawnatguigu.com# 启动停止脚本
vim af.sh
#!/bin/bashcase $1 in
start){echo --------启动 airflow-------ssh hadoop102 conda activate airflow;airflow webserver -p 8081 -D;airflow scheduler -D; conda deactivate
};;
stop){echo --------关闭 airflow-------ps -ef|egrep scheduler|airflow-webserver|grep -v grep|awk {print $2}|xargs kill -15
};;
esac# 添加权限即可使用
chmod x af.sh
3、修改数据库与调度器
3.1 修改数据库为mysql
# https://airflow.apache.org/docs/apache-airflow/2.4.3/howto/set-up-database.html#setting-up-a-mysql-database
# 在MySQL中建库
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
# 如果报错Linux error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol可以关闭MySQL的SSL证书
SHOW VARIABLES LIKE %ssl%;
# 修改配置文件my.cnf加入以下内容
# disable_ssl
skip_ssl# 添加python连接的依赖
pip install mysql-connector-python
# 修改airflow的配置文件
vim ~/airflow/airflow.cfg
[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
# More information here:
# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
#sql_alchemy_conn sqlite:home/atguigu/airflow/airflow.db
sql_alchemy_conn mysqlmysqlconnector://root:123456hadoop102:3306/airflow_db# 关闭airflow初始化后重启
af.sh stop
airflow db init
# 初始化报错1067 - Invalid default value for ‘update_at’
# 原因字段 update_at 为 timestamp类型取值范围是1970-01-01 00:00:00 到 2037-12-31 23:59:59UTC 8 北京时间从1970-01-01 08:00:00 开始而这里默认给了空值所以导致失败
set GLOBAL sql_modeSTRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION;
# 重启MySQL会造成参数失效推荐将参数写入到配置文件my.cnf中
sql_mode STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
# 重启
af.sh start# 重新创建账号登录
airflow users create \
--username admin \
--firstname atguigu \
--lastname atguigu \
--role Admin \
--email shawnatguigu.com
3.2 修改执行器
官网不推荐在开发中使用顺序执行器会造成任务调度阻塞
# 修改airflow的配置文件
[core]
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor,
# KubernetesExecutor, CeleryKubernetesExecutor or the
# full import path to the class when using a custom executor.
executor LocalExecutor# dags_folder是保存文件位置4、部署使用 文档https://airflow.apache.org/docs/apache-airflow/2.4.3/howto/index.html 4.1 环境部署启动
# 需要启动hadoop和spark的历史服务器
# 编写.py脚本创建work-py目录用于存放python调度脚本
mkdir ~/airflow/dags
cd dags/
vim test.py
编写脚本
#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedeltadefault_args {# 用户owner: test_owner,# 是否开启任务依赖depends_on_past: True, # 邮箱email: [403627000qq.com],# 启动时间start_date:datetime(2022,11,28),# 出错是否发邮件报警email_on_failure: False,# 重试是否发邮件报警email_on_retry: False,# 重试次数retries: 1,# 重试时间间隔retry_delay: timedelta(minutes5),
}
# 声明任务图
dag DAG(test, default_argsdefault_args, schedule_intervaltimedelta(days1))# 创建单个任务
t1 BashOperator(# 任务idtask_iddwd,# 任务命令bash_commandssh hadoop102 /opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 ,# 重试次数retries3,# 把任务添加进图中dagdag)t2 BashOperator(task_iddws,bash_commandssh hadoop102 /opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 ,retries3,dagdag)t3 BashOperator(task_idads,bash_commandssh hadoop102 /opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 ,retries3,dagdag)# 设置任务依赖
t2.set_upstream(t1)
t3.set_upstream(t2)注意一些注意事项 必须导包 from airflow import DAG from airflow.operators.bash_operator import BashOperator default_args 设置默认参数 depends_on_past 是否开启任务依赖 schedule_interval 调度频率 retries 重试次数 start_date 开始时间 BashOperator 具体执行任务如果为true前置任务必须成功完成才会走下一个依赖任务如果为false则忽略是否成功完成 task_id 任务唯一标识必填 bash_command 具体任务执行命令 set_upstream 设置依赖
4.2 Dag任务操作
# 过段时间会加载
airflow dags list
# 查看所有任务
airflow list_dags
# 查看单个任务
airflow tasks list test --tree
# 如果删除的话需要UI和底层都删除才行4.3 配置邮件服务器
修改airflow配置文件用stmps服务对应587端口
vim ~/airflow/airflow.cfg
smtp_host smtp.qq.com
smtp_starttls True
smtp_ssl False
smtp_user xxqq.com
# smtp_user
smtp_password qluxdbuhgrhgbigi
# smtp_password
smtp_port 587
smtp_mail_from xxqq.com# 然后重启
af.sh stop
af.sh star
# 编辑test.py脚本并且替换#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedeltadefault_args {# 用户owner: test_owner,# 是否开启任务依赖depends_on_past: True, # 邮箱email: [xxqq.com],# 启动时间start_date:datetime(2022,11,28),# 出错是否发邮件报警email_on_failure: False,# 重试是否发邮件报警email_on_retry: False,# 重试次数retries: 1,# 重试时间间隔retry_delay: timedelta(minutes5),
}
# 声明任务图
dag DAG(test, default_argsdefault_args, schedule_intervaltimedelta(days1))# 创建单个任务
t1 BashOperator(# 任务idtask_iddwd,# 任务命令bash_commandssh hadoop102 /opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 ,# 重试次数retries3,# 把任务添加进图中dagdag)t2 BashOperator(task_iddws,bash_commandssh hadoop102 /opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 ,retries3,dagdag)t3 BashOperator(task_idads,bash_commandssh hadoop102 /opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 ,retries3,dagdag)emailEmailOperator(task_idemail,toyaohm163163.com ,subjecttest-subject,html_contenth1test-content/h1,ccxxqq.com ,dagdag)t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)四、Azkaban azkaban官网https://azkaban.github.io/downloads.html 1、Azkaban入门
1.1 上传jar包和配置sql
首先获取azkaban的三个包可以自行编译github地址
# https://pan.baidu.com/s/10zD2Y_h0oB_rC-BAjLal1g%C2%A0 密码zsxa
# 将azkaban-db-3.84.4.tar.gzazkaban-exec-server-3.84.4.tar.gzazkaban-web-server-3.84.4.tar.gz上传到hadoop102的/opt/software路径
# 新建/opt/module/azkaban目录并将所有tar包解压到这个目录下
mkdir /opt/module/azkaban
# 解压
tar -zxvf azkaban-db-3.84.4.tar.gz -C /opt/module/azkaban/
tar -zxvf azkaban-exec-server-3.84.4.tar.gz -C /opt/module/azkaban/
tar -zxvf azkaban-web-server-3.84.4.tar.gz -C /opt/module/azkaban/
# 进入到/opt/module/azkaban目录依次修改名称
mv azkaban-exec-server-3.84.4/ azkaban-exec
mv azkaban-web-server-3.84.4/ azkaban-web# 然后配置mysql
mysql -uroot -p123456
# 登陆MySQL创建Azkaban数据库
create database azkaban;
# 创建azkaban用户并赋予权限
set global validate_password_length4;
set global validate_password_policy0;
CREATE USER azkaban% IDENTIFIED BY 000000;
# 赋予Azkaban用户增删改查权限
GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to azkaban% WITH GRANT OPTION;
# 创建Azkaban表完成后退出MySQL
use azkaban;
source /opt/module/azkaban/azkaban-db-3.84.4/create-all-sql-3.84.4.sql
quit;# 更改MySQL包大小防止Azkaban连接MySQL阻塞
sudo vim /etc/my.cnf
# 在[mysqld]下面加一行max_allowed_packet1024M
[mysqld]
max_allowed_packet1024M
# 重启MySQL
sudo systemctl restart mysqld
1.2 配置Executor Server
Azkaban Executor Server处理工作流和作业的实际执行
# 编辑azkaban.properties
vim /opt/module/azkaban/azkaban-exec/conf/azkaban.properties
# 修改如下属性
#...
default.timezone.idAsia/Shanghai
#...
azkaban.webserver.urlhttp://hadoop102:8081executor.port12321
#...
database.typemysql
mysql.port3306
mysql.hosthadoop102
mysql.databaseazkaban
mysql.userazkaban
mysql.password000000
mysql.numconnections100# 同步azkaban-exec到所有节点
xsync /opt/module/azkaban/azkaban-exec
# 必须进入到/opt/module/azkaban/azkaban-exec路径分别在三台机器上启动executor server
bin/start-exec.sh
bin/start-exec.sh
bin/start-exec.sh
# 注意如果在/opt/module/azkaban/azkaban-exec目录下出现executor.port文件说明启动成功
# 下面激活executor需要分别在三台机器依次执行
curl -G hadoop102:12321/executor?actionactivate echo
curl -G hadoop103:12321/executor?actionactivate echo
curl -G hadoop104:12321/executor?actionactivate echo
# 如果三台机器都出现如下提示则表示激活成功
{status:success}
1.3 配置Web Server
Azkaban Web Server处理项目管理身份验证计划和执行触发
# 编辑azkaban.properties
vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties
# 修改如下属性
...
default.timezone.idAsia/Shanghai
...
database.typemysql
mysql.port3306
mysql.hosthadoop102
mysql.databaseazkaban
mysql.userazkaban
mysql.password000000
mysql.numconnections100
...
azkaban.executorselector.filtersStaticRemainingFlowSize,CpuStatus# 说明
# StaticRemainingFlowSize正在排队的任务数
# CpuStatusCPU占用情况
# MinimumFreeMemory内存占用情况。测试环境必须将MinimumFreeMemory删除掉否则它会认为集群资源不够不执行。# 修改azkaban-users.xml文件添加atguigu用户
vim /opt/module/azkaban/azkaban-web/conf/azkaban-users.xml
azkaban-usersuser groupsazkaban passwordazkaban rolesadmin usernameazkaban/user passwordmetrics rolesmetrics usernamemetrics/user passwordatguigu rolesadmin usernameatguigu/role nameadmin permissionsADMIN/role namemetrics permissionsMETRICS/
/azkaban-users# 必须进入到hadoop102的/opt/module/azkaban/azkaban-web路径启动web server
bin/start-web.sh
# 访问http://hadoop102:8081,并用atguigu用户登陆
2、Work Flow案例实操
2.1 HelloWorld案例
# 在windows环境新建azkaban.project文件编辑内容如下
# 注意该文件作用是采用新的Flow-API方式解析flow文件
azkaban-flow-version: 2.0
# 新建basic.flow文件内容如下
nodes:- name: jobAtype: commandconfig:command: echo Hello World# Namejob名称
# Typejob类型。command表示你要执行作业的方式为命令
# Configjob配置# 将azkaban.project、basic.flow文件压缩到一个zip文件文件名称必须是英文
# 在WebServer新建项目http://hadoop102:8081/index
# 然后上传压缩文件执行查看日志
2.2 作业依赖案例
需求JobA和JobB执行完了才能执行JobC
# 修改basic.flow为如下内容
nodes:- name: jobCtype: command# jobC 依赖 JobA和JobBdependsOn:- jobA- jobBconfig:command: echo I’m JobC- name: jobAtype: commandconfig:command: echo I’m JobA- name: jobBtype: commandconfig:command: echo I’m JobB
2.3 自动失败重试案例
需求如果执行任务失败需要重试3次重试的时间间隔10000ms
nodes:- name: JobAtype: commandconfig:command: sh /not_exists.shretries: 3retry.backoff: 10000也可以在Flow全局配置中添加任务失败重试配置此时重试配置会应用到所有Job
config:retries: 3retry.backoff: 10000
nodes:- name: JobAtype: commandconfig:command: sh /not_exists.sh2.4 手动失败重试案例
需求JobA⇒JobB依赖于A⇒JobC⇒JobD⇒JobE⇒JobF。生产环境任何Job都有可能挂掉可以根据需求执行想要执行的Job。
nodes:- name: JobAtype: commandconfig:command: echo This is JobA.- name: JobBtype: commanddependsOn:- JobAconfig:command: echo This is JobB.- name: JobCtype: commanddependsOn:- JobBconfig:command: echo This is JobC.- name: JobDtype: commanddependsOn:- JobCconfig:command: echo This is JobD.- name: JobEtype: commanddependsOn:- JobDconfig:command: echo This is JobE.- name: JobFtype: commanddependsOn:- JobEconfig:command: echo This is JobF.在可视化界面Enable和Disable下面都分别有如下参数
Parents该作业的上一个任务Ancestors该作业前的所有任务Children该作业后的一个任务Descendents该作业后的所有任务Enable All所有的任务
3、JavaProcess作业类型案例
3.1 概述
JavaProcess类型可以运行一个自定义主类方法type类型为javaprocess可用的配置为
Xms最小堆Xmx最大堆classpath类路径java.class要运行的Java对象其中必须包含Main方法main.argsmain方法的参数
3.2 案例
新建一个azkaban的maven工程然后创建包名com.atguigu创建AzTest类
package com.atguigu;public class AzTest {public static void main(String[] args) {System.out.println(This is for testing!);}
}打包成jar包azkaban-1.0-SNAPSHOT.jar新建testJava.flow内容如下
nodes:- name: test_javatype: javaprocessconfig:Xms: 96MXmx: 200Mjava.class: com.atguigu.AzTest**将Jar包、flow文件和project文件打包成javatest.zip **然后上传执行
4、条件工作流案例
4.1 概述
条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job的父Job输出的运行时参数构成也可以使用预定义宏。在这些条件下用户可以在确定Job执行逻辑时获得更大的灵活性例如只要父Job之一成功就可以运行当前Job
4.2 运行时参数案例
基本原理父Job将参数写入JOB_OUTPUT_PROP_FILE环境变量所指向的文件子Job使用 ${jobName:param}来获取父Job输出的参数并定义执行条件
支持的条件运算符
1 等于
2! 不等于
3 大于
4 大于等于
5 小于
6 小于等于
7 与
8|| 或
9! 非
需求分析
# JobA执行一个shell脚本。
# JobB执行一个shell脚本但JobB不需要每天都执行而只需要每个周一执行# 新建JobA.sh
#!/bin/bash
echo do JobA
wkdate %w
echo {\wk\:$wk} $JOB_OUTPUT_PROP_FILE# 新建JobB.sh
#!/bin/bash
echo do JobB# 新建condition.flow
nodes:- name: JobAtype: commandconfig:command: sh JobA.sh- name: JobBtype: commanddependsOn:- JobAconfig:command: sh JobB.shcondition: ${JobA:wk} 1# 最后将JobA.sh、JobB.sh、condition.flow和azkaban.project打包成condition.zip4.3 预定义宏案例
Azkaban中预置了几个特殊的判断条件称为预定义宏。预定义宏会根据所有父Job的完成情况进行判断再决定是否执行。可用的预定义宏如下
1all_success: 表示父Job全部成功才执行(默认)
2all_done表示父Job全部完成才执行
3all_failed表示父Job全部失败才执行
4one_success表示父Job至少一个成功才执行
5one_failed表示父Job至少一个失败才执行
# 需求
# JobA执行一个shell脚本
# JobB执行一个shell脚本
# JobC执行一个shell脚本要求JobA、JobB中有一个成功即可执行# 新建JobA.sh
#!/bin/bash
echo do JobA# 新建JobC.sh
#!/bin/bash
echo do JobC# 新建macro.flow
nodes:- name: JobAtype: commandconfig:command: sh JobA.sh- name: JobBtype: commandconfig:command: sh JobB.sh- name: JobCtype: commanddependsOn:- JobA- JobBconfig:command: sh JobC.shcondition: one_success
5、邮箱告警
首先申请好邮箱然后配置
# 在azkaban-web节点hadoop102上编辑/opt/module/azkaban/azkaban-web/conf/azkaban.properties修改如下内容
vim /opt/module/azkaban/azkaban-web/conf/azkaban.properties
# 添加如下内容
#这里设置邮件发送服务器需要 申请邮箱切开通stmp服务以下只是例子
mail.senderatguigu126.com
mail.hostsmtp.126.com
mail.useratguigu126.com
mail.password用邮箱的授权码# 保存并重启web-server
bin/shutdown-web.sh
bin/start-web.sh# 编辑basic.flow
nodes:- name: jobAtype: commandconfig:command: echo This is an email test.# 将azkaban.project和basic.flow压缩成email.zip
# 然后上传在可视化页面里选择邮箱告警
# 针对电话告警可以使用睿象云https://www.aiops.com/6、Azkaban多Executor模式注意事项
Azkaban多Executor模式是指在集群中多个节点部署Executor。在这种模式下 Azkaban web Server会根据策略选取其中一个Executor去执行任务。为确保所选的Executor能够准确的执行任务我们须在以下两种方案任选其一推荐使用方案二。
方案一指定特定的Executorhadoop102去执行任务
# 在MySQL中azkaban数据库executors表中查询hadoop102上的Executor的idmysql use azkaban;
mysql select * from executors;# 在执行工作流程时选择Flow Parameters加入useExecutor属性方案二在Executor所在所有节点部署任务所需脚本和应用
官网文档https://azkaban.readthedocs.io/en/latest/configuration.html