网站开发语言怎么选,wordpress可视化插件,网站开发讲座心得体会,wordpress 经验插件大数据#xff1a;任务调度https://blog.csdn.net/qq_43713049/article/details/116985497 文章目录 任务调度一、任务流调度的需求二、任务流调度的工具三、Oozie的简介四、Oozie的2种使用方式五、WorkFlow 与 Fork 和 Join六、SubFlow#xff1a;子工作流七、定时调度的实现…大数据任务调度https://blog.csdn.net/qq_43713049/article/details/116985497 文章目录 任务调度一、任务流调度的需求二、任务流调度的工具三、Oozie的简介四、Oozie的2种使用方式五、WorkFlow 与 Fork 和 Join六、SubFlow子工作流七、定时调度的实现八、自动化调度的实现1.自动化调度需求2.自动化调度脚本3.自动化调度实现 任务调度 一、任务流调度的需求 整体需求 相同的业务线有不同的需求会有多个程序来实现这多个程序共同完成的需求组合在一起就是工作流或者叫做任务流基于工作流来实现任务流的自动化运行 基于时间的任务运行 Job1和Job2是在每天固定的时间去采集昨天的数据每天00:00 基于运行依赖关系的任务运行 Job3必须等待Job1运行成功才能运行Job5必须等待Job3和Job4都运行成功才能运行 调度类型 定时调度基于某种时间的规律进行调度运行依赖调度基于某种依赖关系进行调度运行 二、任务流调度的工具 Linux Crontab Linux中自带的一个工具 优点 简单不用做额外的部署能实现大多数的定时需求crontab -e 缺点 只能做定时任务的执行 语法 * * * * * command
分钟 小时 日 月 周几
12 Oozie Cloudera公司研发的Hadoop生态圈的调度工具 官网oozie.apache.org优点 功能很强大能满足几乎所有常规的任务流调度的需求支持DAG流程调度 缺点 本身不是分布式的工具依赖于MapReduce来实现分布式原生的交互开发接口不友好整体的监控不完善学习成本比较高 Zeus 阿里巴巴最早基于Hadoop1研发的一个调度系统目前市场上的Zeus一般都是携程版本的Zeus 优点 交互非常友好使用非常简单分布式的功能相对也比较全面 缺点 Bug非常多阿里没有继续研发Zeus不支持Hadoop2 Azkaban LinkedIn公司研发的分布式调度工具 优点 重点着重于自身的调度功能的研发其他的辅助性功能都通过插件来完成自身也是分布式调度系统界面交互性比较友好开发交互性properties或者JSON 缺点 3.x版本开始才支持完全分布式 三、Oozie的简介 功能 Oozie是一个专门为管理Hadoop生态的程序调度而设计的工作流调度系统基于DAG实现依赖调度WorkFlow基于定时器实现定时调度Coordinator 特点 优点功能全面缺点部署相对复杂、原生开发方式过于复杂 应用 基于Hadoop平台的分布式离线任务流调度 原理 底层依赖于MapReduce将工作流变成MapReduce程序提交个YARN由YARN来将不同的工作流分配到不同的机器上运行用于构建分布式调度系统 四、Oozie的2种使用方式 原生方式 这种方式是通过自己写代码的方式来实现工作流的开发效率低容易出问题不用 实现一个效果4个程序 第一个程序shell脚本定时运行的第二个程序Spark程序必须等第一个程序运行完才能运行第三个程序MapReduce程序必须等第二个程序运行才能运行’第四个程序Hive,必须等第三个程序运行完才能运行 先要开发一个XML文件 控制节点start、end、kill 控制程序运行的流程 start开始节点 end终止节点kill强制退出节点fork分支节点join合并节点 程序节点action start tofirst
action namefirstshellpathxxx.sh/pathargs/args/shellok tosecond /okerror tokill/error
/action
action name secondsparkjar/jarclass/class……/sparkok tothird /ok
error tokill/error
actionaction name forthhivescprit/scpritpath/path……/hiveok toend /okerror tokill/error
action
kill namekill kill
/killend nameend end
/end
123456789101112131415161718192021222324252627282930313233 集成Hue 由于Oozie原生的方式交互性非常差导致用户上手非常困难 Cloudera基于可视化需求在Hue中集成Oozie开发和监控 五、WorkFlow 与 Fork 和 Join 创建测试脚本 启动Oozie在第一台机器 启动start-oozie.sh 关闭stop-oozie.sh 测试 创建四个脚本 mkdir /export/data/flow
1 /export/data/flow/test01.sh #!/bin/bash
echo this is test01
12 /export/data/flow/test02.sh #!/bin/bash
echo this is test02
12 /export/data/flow/test03.sh #!/bin/bash
echo this is test03
12 /export/data/flow/test04.sh #!/bin/bash
echo this is test04
12 上传到HDFS hdfs dfs -put /export/data/flow /user/oozie/
1 单job工作流 需求1构建一个工作流执行test01 多job工作流 需求2构建一个工作流先执行test01再执行test02最后执行test03 分支工作流 需求3 test01先执行 test01执行完成test02和test03并行执行 test02和test03都执行完成执行test04 六、SubFlow子工作流 需求在调度运行一个工作流的实现需要嵌套调用另外一个工作流 七、定时调度的实现 八、自动化调度的实现 1.自动化调度需求 目标自动化实现增量任务流调度实施 第一个job增量采集第二个job统计昨天的订单总个数第三个job统计昨天的订单总金额第四个job合并二和三的结果得到每天的订单总个数和总金额导出到MySQL 2.自动化调度脚本 目标实现自动化脚本调度的开发 路径 step1增量采集脚本job1step2增量统计个数脚本job2step3增量统计金额脚本job3step4增量合并导出脚本job4 实施 增量采集脚本job1 创建脚本 vim /export/data/shell/01.collect.sh
1 开发Shell脚本 #!/bin/bash#step1先获取要采集的数据时间规则如果没有给参数就默认处理昨天的日期如果给了参数就参数对应的日期 if [ $# -ne 0 ] then #参数个数不为0 if [ $# -ne 1 ] then echo “参数至多只能有一个为处理的日期请重新运行” exit 100 else #参数个数只有1个就用第一个参数作为处理的日期 yesterdayKaTeX parse error: Expected EOF, got # at position 13: 1 fi else #̲参数个数为0默认处理昨天的日…{yesterday}
echo “step2开始运行采集的程序” #step2运行增量采集 SQOOP_HOME/export/server/sqoop-1.4.6-cdh5.14.0 SQOOPHOME/bin/sqoopimport−−connectjdbc:mysql://node3:3306/dborder−−usernameroot−−password−filehdfs://node1:8020/user/oozie/shell/sqoop.passwd−−queryselect∗fromtborderwheresubstring(createtime,1,10)′SQOOP_HOME/bin/sqoop import \ --connect jdbc:mysql://node3:3306/db_order \ --username root \ --password-file hdfs://node1:8020/user/oozie/shell/sqoop.passwd \ --query select * from tb_order where substring(create_time,1,10) SQOOPHOME/bin/sqoopimport −−connectjdbc:mysql://node3:3306/dborder −−usernameroot −−password−filehdfs://node1:8020/user/oozie/shell/sqoop.passwd −−queryselect∗fromtborderwheresubstring(createtime,1,10)′{yesterday}’ and $CONDITIONS –delete-target-dir –target-dir /nginx/logs/tb_order/daystr${yesterday} –fields-terminated-by ‘\t’ -m 1
echo “step2采集的程序运行结束”
echo “step3开始运行ETL” #模拟ETL的过程将采集的新增的数据移动到表的目录下 HADOOP_HOME/export/server/hadoop-2.6.0-cdh5.14.0 #先判断结果是否存在如果已经存在先删除再移动 HADOOPHOME/bin/hdfsdfs−test−e/user/hive/warehouse/tborder/daystrHADOOP_HOME/bin/hdfs dfs -test -e /user/hive/warehouse/tb_order/daystrHADOOPHOME/bin/hdfsdfs−test−e/user/hive/warehouse/tborder/daystr{yesterday} if [ $? -eq 0 ] then #存在 HADOOPHOME/bin/hdfsdfs−rm−r/user/hive/warehouse/tborder/daystrHADOOP_HOME/bin/hdfs dfs -rm -r /user/hive/warehouse/tb_order/daystrHADOOPHOME/bin/hdfsdfs−rm−r/user/hive/warehouse/tborder/daystr{yesterday} HADOOPHOME/bin/hdfsdfs−cp/nginx/logs/tborder/daystrHADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystrHADOOPHOME/bin/hdfsdfs−cp/nginx/logs/tborder/daystr{yesterday} /user/hive/warehouse/tb_order/ else #不存在 HADOOPHOME/bin/hdfsdfs−cp/nginx/logs/tborder/daystrHADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystrHADOOPHOME/bin/hdfsdfs−cp/nginx/logs/tborder/daystr{yesterday} /user/hive/warehouse/tb_order/ fi echo “step3ETL结束” 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
增量统计个数脚本job2
创建脚本
vim /export/data/shell/02.analysis.sh
vim /export/data/shell/02.analysis.sql
12
开发Shell脚本 #!/bin/bash
#step1先获取要采集的数据时间规则如果没有给参数就默认处理昨天的日期如果给了参数就参数对应的日期
if [ $# -ne 0 ]
then#参数个数不为0if [ $# -ne 1 ]thenecho 参数至多只能有一个为处理的日期请重新运行exit 100else#参数个数只有1个就用第一个参数作为处理的日期yesterday$1fi
else#参数个数为0默认处理昨天的日期yesterdaydate -d -1 day %Y-%m-%d
fi
echo step1要处理的日期是${yesterday}echo step2开始运行分析
#step2运行分析程序
HIVE_HOME/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest${yesterday} -f hdfs://node1:8020/user/oozie/shell/02.analysis.sql
echo step2分析的程序运行结束开发SQL文件 create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned by (daystr string)
row format delimited fields terminated by \t
location /user/hive/warehouse/tb_order;
alter table default.tb_order add if not exists partition (daystr${hiveconf:yest});create table if not exists default.tb_order_num_rs(
daystr string,
order_number int
)
row format delimited fields terminated by \t;insert into table default.tb_order_num_rs
select
daystr,
count(id) as order_number
from default.tb_order
where daystr${hiveconf:yest}
group by daystr123456789101112131415161718192021222324252627
增量统计金额脚本job3
创建脚本
vim /export/data/shell/03.analysis.sh
vim /export/data/shell/03.analysis.sql
12
开发Shell脚本 #!/bin/bash#step1先获取要采集的数据时间规则如果没有给参数就默认处理昨天的日期如果给了参数就参数对应的日期
if [ $# -ne 0 ]
then#参数个数不为0if [ $# -ne 1 ]thenecho 参数至多只能有一个为处理的日期请重新运行exit 100else#参数个数只有1个就用第一个参数作为处理的日期yesterday$1fi
else#参数个数为0默认处理昨天的日期yesterdaydate -d -1 day %Y-%m-%d
fi
echo step1要处理的日期是${yesterday}echo step2开始运行分析
#step2运行分析程序
HIVE_HOME/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest${yesterday} -f hdfs://node1:8020/user/oozie/shell/03.analysis.sqlecho step2分析的程序运行结束1234567891011121314151617181920212223242526
开发SQL文件 create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned by (daystr string)
row format delimited fields terminated by \t
location /user/hive/warehouse/tb_order;alter table default.tb_order add if not exists partition (daystr${hiveconf:yest});create table if not exists default.tb_order_price_rs(
daystr string,
order_price double
)
row format delimited fields terminated by \t;insert into table default.tb_order_price_rs
select
daystr,
sum(price) as order_price
from default.tb_order
where daystr${hiveconf:yest}
group by daystr;123456789101112131415161718192021222324252627
增量合并导出脚本job4
创建脚本
vim /export/data/shell/04.export.sh
vim /export/data/shell/04.export.sql
12
开发Shell脚本 #!/bin/bash#step1先获取要采集的数据时间规则如果没有给参数就默认处理昨天的日期如果给了参数就参数对应的日期
if [ $# -ne 0 ]
then#参数个数不为0if [ $# -ne 1 ]thenecho 参数至多只能有一个为处理的日期请重新运行exit 100else#参数个数只有1个就用第一个参数作为处理的日期yesterday$1fi
else#参数个数为0默认处理昨天的日期yesterdaydate -d -1 day %Y-%m-%d
fi
echo step1要处理的日期是${yesterday}echo step2开始运行分析
#step2运行分析程序
HIVE_HOME/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest${yesterday} -f hdfs://node1:8020/user/oozie/shell/04.export.sqlecho step2分析的程序运行结束echo step3开始运行导出的程序
#step2运行增量采集
SQOOP_HOME/export/server/sqoop-1.4.6-cdh5.14.0
$SQOOP_HOME/bin/sqoop export \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file hdfs://node1:8020/user/oozie/shell/sqoop.passwd \
--table tb_order_rs \
--hcatalog-database default \
--hcatalog-table tb_order_rs \
--input-fields-terminated-by \t \
--update-key daystr \
--update-mode allowinsert \
-m 1echo step3导出的程序运行结束 12345678910111213141516171819202122232425262728293031323334353637383940414243
开发SQL文件 create table if not exists default.tb_order_rs(
daystr string,
order_number int,
order_price double)
row format delimited fields terminated by \t;insert into table default.tb_order_rs
select
a.daystr,
a.order_number,
b.order_price
from default.tb_order_num_rs a join default.tb_order_price_rs b on a.daystr b.daystr
where a.daystr${hiveconf:yest};1234567891011121314
3.自动化调度实现
上传
cp /export/data/sqoop.passwd /export/data/shell/
hdfs dfs -put /export/data/shell /user/oozie/
12
在MySQL中导入最新数据
use db_order;
insert into tb_order values(o00013,p00009,u00001,121,2021-05-17 00:01:01);
insert into tb_order values(o00014,p00010,u00002,122,2021-05-17 10:01:02);
insert into tb_order values(o00015,p00011,u00003,123,2021-05-17 11:01:03);
insert into tb_order values(o00016,p00012,u00004,124,2021-05-17 23:01:04);
12345