网站开发的基本知识,门户网站产品设计方案,自己电脑做网站服务器违法吗,网站怎样做才能有点击率第 1 章#xff1a;数据仓库概念
数据仓库#xff0c;是为企业指定决策#xff0c;提供数据支持的#xff0c;可以帮助企业#xff0c;改进业务流程、提高产品质量等。 数据仓库的输入数据通常包括#xff1a;业务数据、用户行为数据和爬虫数据等。 业务数据#xff1a…第 1 章数据仓库概念
数据仓库是为企业指定决策提供数据支持的可以帮助企业改进业务流程、提高产品质量等。 数据仓库的输入数据通常包括业务数据、用户行为数据和爬虫数据等。 业务数据就是各行业在处理事务过程中产生的数据。比如用户在电商网站中登录、下单、支付等过程中需要和网站后台数据库进行增删改查交互产生的数据就是业务数据。业务数据通常存储在mysql、oracle等数据库中。
用户行为数据用户在使用产品过程中通过埋点收集与客户端产品交互过程中产生的数据并发往日志服务器进行保存。比如页面浏览、点击、停留、评论、点赞、收藏等。用户行为数据通常存储在日志文件中。
爬虫数据通常是通过技术手段获取其它公司网站的数据 数据采集在最左边有两个框表示数据采集阶段。上面的框标有“日常增量数据”表示每日增加的数据使用的工具是Flume。Flume是一个分布式、可靠且可用的服务用于有效地收集、聚合和移动大量日志数据。下面的框标有“全量/历史数据”表示需要处理的全量数据或历史数据这里使用的工具是DataX。DataX是一个在异构数据源之间进行高效数据传输的工具。 DolphinScheduler 任务调度在顶部中间的虚线框代表DolphinScheduler这是一个分布式、去中心化、易扩展的可视化工作流任务调度系统。它被用来调度和协调上述数据采集工具和数据仓库的不同阶段。 数据仓库中间的大框表示数据仓库的四个主要阶段 ODSOperational Data Store操作数据存储这是数据进入数据仓库的第一站通常包含近乎实时的原始数据。 DWDData Warehouse Detail这一层通常用于存储更加详细的事务数据经过一定程度的处理和整合。 DWSData Warehouse Summary数据仓库汇总这里的数据经过进一步的聚合和汇总用于分析和报告。 ADSApplication Data Store应用数据存储经过加工处理的数据以便直接被业务应用系统使用。 数据输出在右侧有三个框分别标识了数据输出可能的三个方向 应用层面数据被直接应用于业务层面比如数据驱动的决策支持系统。 服务层面数据可以被封装成服务供其他系统调用。 报告层面数据被用于生成报告可能是定期的业务报告或者是数据分析报告。
第 2 章项目需求及框架设计
2.1 项目需求分析
一、项目需求 1、用户行为数据采集平台大家 2、业务数据采集平台搭建 3、数据仓库维度建模 4、分析、设备、会员、商品、地区、活动等电商核心主题统计的报表指标近100个 5、采用即席查询工具随时进行指标分析 6、对集群性能进行监控发生异常需要报警 7、元数据管理 8、质量监控 9、权限管理
2.2 项目框架
2.2.2 系统数据流程设计 这张图是一个数据处理和分析平台的系统架构图。从这张图中我们可以看到以下几个主要部分 Web 应用层这一层有两个主要组成部分一个是 Nginx 作为反向代理服务器另一个是 SpringBoot这表明这是基于 Java 构建的 Web 应用程序。 数据采集层这一层包括日志文件、Flume 和 Kafka。这些组件通常用于数据的收集、聚合和移动。 数据处理和存储层包含 Hadoop 和 MySQL这表明系统既有大数据处理能力也有传统的关系型数据库管理。 监控报警使用 Zabbix 和 Grafana 进行系统监控和数据可视化。 数据分析和BI工具包括 Superset 和 DatAX这些工具可以用于数据分析和业务智能报告。 权限管理和任务调度有 Apache Ranger 和 DolphinScheduler用于数据安全和工作流管理。 其他工具和组件例如 ZooKeeper用于分布式系统的协调、Atlas用于数据治理以及一些脚本工具如 Python Shell。 图形表示底部的图形代表了数据可视化的示例包括地图和柱状图。
第 3 章用户行为日志
3.1 目标数据
要收集和分析的数据主要包括页面数据、事件数据、曝光数据、启动数据和错误数据。
3.1.1 页面
页面数据主要记录一个页面的用户访问情况包括访问事件、停留事件、页面路径等信息。
3.1.2 事件
事件数据主要记录应用内一个具体操作行为包括操作类型、操作对象、操作对象描述等信息。
3.1.3 曝光
曝光数据主要记录页面所曝光的内容包括曝光对象曝光类型等信息。
3.1.4 启动
启动数据记录应用的启动信息。
3.1.5 错误
错误数据记录应用使用。
3.2 数据埋点
3.2.1 主流埋点方式
目前主流的埋点方式有代码埋点前后端、可视化埋点、全埋点三种。 代码埋点是通过调用埋点sdk函数在需要埋点的业务逻辑功能位置调用接口上报埋点数据。例如我们对页面中的某个按钮埋点后当这个按钮被点击时可以在这个按钮对用的onclick函数里面调用sdk提供的数据发送接口来发送数据。 可视化埋点只需要研发人员集成采集sdk不需要写埋点代码业务人员就可以通过访问分析平台的“圈选”功能来“圈”出需要对用户行为进行捕捉的控件并对该事件进行命名。圈选完毕后这些配置会同步到各个用户的终端上由采集sdk按照圈选的配置自动进行用户行为数据的采集和发送。 全埋点是通过在产品中嵌入sdk前端自动采集页面上的全部用户行为事件上报埋点数据相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要再系统里面进行分析。
3.2.2 埋点数据上报时机
埋点数据上报时包括两种方式。 方式一在离开该页面时上传在这个页面产生的所有数据页面、事件、曝光、错误等。优点批处理减少了服务器接收数据压力。缺点不是特别及时。 方式二每个事件、动作、错误等产生后立即发送。优点响应及时。缺点对服务器接收数据压力比较大。 本次项目采用方式一埋点。
3.2.3 埋点数据日志结构
我们的日志结构大概可分为两类一是普通页面埋点日志二是启动日志。 普通页面日志结构如下每条日志包含了当前页面的页面信息所有事件动作、所有曝光信息以及错误信息。除此之外还包含了一系列公共信息包括设备信息地理位置应用信息等即下边的common字段。 1、普通页面埋点日志格式
{common: { -- 公共信息ar: 230000, -- 地区编码ba: iPhone, -- 手机品牌ch: Appstore, -- 渠道is_new: 1,--是否首日使用首次使用的当日该字段值为1过了24:00该字段置为0。md: iPhone 8, -- 手机型号mid: YXfhjAYH6As2z9Iq, -- 设备idos: iOS 13.2.9, -- 操作系统uid: 485, -- 会员idvc: v2.1.134 -- app版本号},
actions: [ --动作(事件) {action_id: favor_add, --动作iditem: 3, --目标iditem_type: sku_id, --目标类型ts: 1585744376605 --动作时间戳}],displays: [{displayType: query, -- 曝光类型item: 3, -- 曝光对象iditem_type: sku_id, -- 曝光对象类型order: 1, --出现顺序pos_id: 2 --曝光位置},{displayType: promotion,item: 6,item_type: sku_id,order: 2, pos_id: 1},{displayType: promotion,item: 9,item_type: sku_id,order: 3, pos_id: 3},{displayType: recommend,item: 6,item_type: sku_id,order: 4, pos_id: 2},{displayType: query ,item: 6,item_type: sku_id,order: 5, pos_id: 1}],page: { --页面信息during_time: 7648, -- 持续时间毫秒item: 3, -- 目标iditem_type: sku_id, -- 目标类型last_page_id: login, -- 上页类型page_id: good_detail, -- 页面IDsourceType: promotion -- 来源类型},
err:{ --错误
error_code: 1234, --错误码msg: *********** --错误信息
},ts: 1585744374423 --跳入时间戳
}
2、启动日志格式 启动日志结构相对简单主要包含公共信息启动信息和错误信息。
{common: {ar: 370000,ba: Honor,ch: wandoujia,is_new: 1,md: Honor 20s,mid: eQF5boERMJFOujcp,os: Android 11.0,uid: 76,vc: v2.1.134},start: { entry: icon, --icon手机图标 notice 通知 install 安装后启动loading_time: 18803, --启动加载时间open_ad_id: 7, --广告页IDopen_ad_ms: 3449, -- 广告总共播放时间open_ad_skip_ms: 1989 -- 用户跳过广告时点},
err:{ --错误
error_code: 1234, --错误码msg: *********** --错误信息
},ts: 1585744304000
}
3.3 服务器和jdk准备
3.3.1 服务器准备
按照之前分配按照hadoop102、103、104三台主机。
3.3.2 编写集群分发脚本xsync
1、xsync集群分发脚本 1需求循环复制文件到所有节点的相同目录下 2需求分析 1rsync命令原始拷贝
rsync -av /opt/module roothadoop103:/opt/2期望脚本 xsync要同步的文件名称 3说明 在/home/atguigu/bin这个目录下存放的脚本atguigu用户可以在系统任何地方直接执行。 3脚本实现 1在用的家目录/home/atguigu下创建bin文件夹
[atguiguhadoop102 ~]$ mkdir bin2在/home/atguigu/bin目录下创建xsync文件以便全局调用
[atguiguhadoop102 ~]$ cd /home/atguigu/bin
[atguiguhadoop102 ~]$ vim xsync
在该文件中编写如下代码
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
thenecho Not Enough Arguement!exit;
fi
#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
doecho $host #3. 遍历所有目录挨个发送for file in $do#4 判断文件是否存在if [ -e $file ]then#5. 获取父目录pdir$(cd -P $(dirname $file); pwd)#6. 获取当前文件的名称fname$(basename $file)ssh $host mkdir -p $pdirrsync -av $pdir/$fname $host:$pdirelseecho $file does not exists!fidone
done
3修改脚本xsync具有执行权限
[atguiguhadoop102 bin]$ chmod x xsync4测试脚本
atguiguhadoop102 bin]$ xsync xsync3.3.4 ssh无密登录配置
说明这里面只配置hadoop102、103到其它主机的无密登录因为102配置的是namenode103配置的是resourcemanager都要求对其它节点无密访问。 1、hadoop102上生成公钥和私钥
[atguiguhadoop102 .ssh]$ ssh-keygen -t rsa然后敲三个回车就会生成两个文件id_rsa、id_rsa.pub。 2、将102公钥拷贝到免密登录的目标机器上
[atguiguhadoop102 .ssh]$ ssh-copy-id hadoop102
[atguiguhadoop102 .ssh]$ ssh-copy-id hadoop103
[atguiguhadoop102 .ssh]$ ssh-copy-id hadoop104
3、103上生成公钥和私钥
[atguiguhadoop103 .ssh]$ ssh-keygen -t rsa然后敲三个回车就会生成两个文件id_rsa、id_rsa.pub。 4、将103公钥拷贝到要免密登录的目标机器上
[atguiguhadoop103 .ssh]$ ssh-copy-id hadoop102
[atguiguhadoop103 .ssh]$ ssh-copy-id hadoop103
[atguiguhadoop103 .ssh]$ ssh-copy-id hadoop104
3.3.4 jdk准备
1、卸载现有jdk3台节点
[atguiguhadoop102 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps[atguiguhadoop103 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps[atguiguhadoop104 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
1rpm -qa查询所有已经安装的软件包 2grep -i过滤时不区分大小写 3xargs -nl表示一次获取上次执行结果的一个值 4rpm -e --nodeps卸载软件 2、用xshell工具将jdk导入到102的/opt/software文件夹下面
3、在linux系统下的opt目录中查看软件包是否导入成功
[atguiguhadoop102 software]# ls /opt/software/看到如下结果
jdk-8u212-linux-x64.tar.gz4、解压jdk到/opt/module目录下
[atguiguhadoop102 software]# tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/5、配置jdk环境遍历 1新建/etc/profile.d/my_env.sh文件
[atguiguhadoop102 module]# sudo vim /etc/profile.d/my_env.sh添加如下内容然后保存:wq退出
#JAVA_HOME
export JAVA_HOME/opt/module/jdk1.8.0_212
export PATH$PATH:$JAVA_HOME/bin
2让环境变量生效
[atguiguhadoop102 software]$ source /etc/profile.d/my_env.sh6、测试jdk是否安装成功
[atguiguhadoop102 module]# java -version如果能看到一下结果、则java正常安装
java version 1.8.0_2127、分发jdk
[atguiguhadoop102 module]$ xsync /opt/module/jdk1.8.0_212/8、分发环境变量配置文件
[atguiguhadoop102 module]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh9、分别在103、104上执行source
[atguiguhadoop103 module]$ source /etc/profile.d/my_env.sh
[atguiguhadoop104 module]$ source /etc/profile.d/my_env.sh
3.3.6 环境变量配置说明
linux的环境变量可在多个文件中配置如/etc/profile/etc/profile.d/*.sh/.bashrc/.bash_profile等下面说明上述几个文件之间的关系和区别。 bash的运行模式可分为login shell和non-login shell。 例如我们通过终端输入用户名、密码登录系统之后得到就是一个login shell。而当我们执行以下命令ssh hadoop103 command在hadoop103执行command的就是一个non-login shell。 登录shell和非登录 shell区别
这两种shell的主要区别在于它们启动时会加载不同的配置文件login shell启动时会加载/etc/profile/.bash_profile/.bashrc。non-login shell启动时会加载~/.bashrc。 而在加载~/.bashrc或/etc/profile时都会执行如下代码片段。
因此无论是login shell还是non-login shell启动时都会加载/etc/profile.d/*.sh中的环境变量。
3.4 模拟数据
3.4.1 使用说明
1、将application.yml、gmall2020-mock-log-2021-10-10.jar、path.json、logback.xml上传到hadoop102的/opt/module/applog目录下 1创建applog路径
[atguiguhadoop102 module]$ mkdir /opt/module/applog2上传文件到/opt/module/applog目录 2、配置文件 1application.yml文件 可以根据需求生成对应日期的用户行为日志。
[atguiguhadoop102 applog]$ vim application.yml修改如下内容。
# 外部配置打开
# 外部配置打开
logging.config: ./logback.xml
#业务日期 注意并不是Linux系统生成日志的日期而是生成数据中的时间
mock.date: 2020-06-14#模拟数据发送模式
#mock.type: http
#mock.type: kafka
mock.type: log#http模式下发送的地址
mock.url: http://hdp1/applog#kafka模式下发送的地址
mock:kafka-server: hdp1:9092,hdp2:9092,hdp3:9092kafka-topic: ODS_BASE_LOG#启动次数
mock.startup.count: 200
#设备最大值
mock.max.mid: 500000
#会员最大值
mock.max.uid: 100
#商品最大值
mock.max.sku-id: 35
#页面平均访问时间
mock.page.during-time-ms: 20000
#错误概率 百分比
mock.error.rate: 3
#每条日志发送延迟 ms
mock.log.sleep: 10
#商品详情来源 用户查询商品推广智能推荐, 促销活动
mock.detail.source-type-rate: 40:25:15:20
#领取购物券概率
mock.if_get_coupon_rate: 75
#购物券最大id
mock.max.coupon-id: 3
#搜索关键词
mock.search.keyword: 图书,小米,iphone11,电视,口红,ps5,苹果手机,小米盒子logging.config: ./logback.xml指定日志配置文件的路径为./logback.xml用于配置应用程序的日志输出方式和级别。mock.date: 2020-06-14指定模拟数据生成时使用的业务日期这个日期将用于生成数据中的时间信息。mock.type: log指定模拟数据发送模式可以是http、kafka或log。在这里模拟数据将以日志方式输出。mock.url: http://hdp1/applog在HTTP模式下指定数据发送的目标地址为http://hdp1/applog。mock.kafka-server: hdp1:9092,hdp2:9092,hdp3:9092和mock.kafka-topic: ODS_BASE_LOG在Kafka模式下指定Kafka服务器地址和Kafka主题。mock.startup.count: 200指定启动次数可能用于控制生成的模拟数据的数量。mock.max.mid: 500000、mock.max.uid: 100、mock.max.sku-id: 35分别指定设备、会员和商品的最大值可能用于生成随机数据。mock.page.during-time-ms: 20000指定页面平均访问时间可能用于模拟用户在页面上停留的时间。mock.error.rate: 3指定错误概率百分比可能用于模拟错误事件的发生。mock.log.sleep: 10指定每条日志发送的延迟时间以毫秒为单位。mock.detail.source-type-rate: 40:25:15:20指定商品详情来源的比例包括用户查询、商品推广、智能推荐和促销活动。mock.if_get_coupon_rate: 75指定领取购物券的概率百分比。mock.max.coupon-id: 3指定购物券的最大ID。mock.search.keyword: 图书,小米,iphone11,电视,口红,ps5,苹果手机,小米盒子指定搜索关键词可能用于模拟用户的搜索行为。2path.json该文件用来配置访问路径 根据需求可以灵活配置用户点击路径。
[{path:[home,good_list,good_detail,cart,trade,payment],rate:20 },{path:[home,search,good_list,good_detail,login,good_detail,cart,trade,payment],rate:40 },{path:[home,mine,orders_unpaid,trade,payment],rate:10 },{path:[home,mine,orders_unpaid,good_detail,good_spec,comment,trade,payment],rate:5 },{path:[home,mine,orders_unpaid,good_detail,good_spec,comment,home],rate:5 },{path:[home,good_detail],rate:10 },{path:[home ],rate:10 }
]
描述了用户在一个应用或网站中的不同路径和用户对这些路径的访问频率。每个路径都是一个由字符串组成的数组表示用户在应用中的一系列操作步骤而与每个路径相关的rate值表示了用户对该路径的访问频率或重要性。3logback配置文件 可配置日志生成路径修改内容如下。
?xml version1.0 encodingUTF-8?
configurationproperty nameLOG_HOME value/opt/module/applog/log /appender nameconsole classch.qos.logback.core.ConsoleAppenderencoderpattern%msg%n/pattern/encoder/appenderappender namerollingFile classch.qos.logback.core.rolling.RollingFileAppenderrollingPolicy classch.qos.logback.core.rolling.TimeBasedRollingPolicyfileNamePattern${LOG_HOME}/app.%d{yyyy-MM-dd}.log/fileNamePattern/rollingPolicyencoderpattern%msg%n/pattern/encoder/appender!-- 将某一个包下日志单独打印日志 --logger namecom.atgugu.gmall2020.mock.log.util.LogUtillevelINFO additivityfalseappender-ref refrollingFile /appender-ref refconsole //loggerroot levelerror appender-ref refconsole //root
/configuration这段代码是一个XML格式的Logback配置文件用于配置日志记录行为和输出目标。Logback是Java的一个流行的日志框架它允许你定义日志记录规则和目标。以下是这段代码的主要作用定义日志文件的存储路径通过property元素定义了一个名为LOG_HOME的属性用于存储日志文件的路径路径为/opt/module/applog/log。配置两个日志输出目标console定义了一个输出到控制台的日志目标。
rollingFile定义了一个按时间滚动的文件日志目标日志文件名会根据时间戳进行命名存储在${LOG_HOME}目录下文件名格式为app.{日期}.log。
针对特定的包设置日志级别通过logger元素将包com.atgugu.gmall2020.mock.log.util.LogUtil的日志级别设置为INFO并指定只向rollingFile和console这两个目标输出日志而不向根级别的日志输出。配置根日志级别通过root元素将根级别的日志级别设置为ERROR并指定只向console目标输出日志。这意味着除了特定包下的日志会输出INFO级别的信息到文件和控制台之外其他日志消息将只在发生错误时输出到控制台。3、生成日志 1进入/opt/module/applog路径执行以下命令
[atguiguhadoop102 applog]$ java -jar gmall2020-mock-log-2021-10-10.jar2在/opt/module/applog/log目录下查看生成日志
[atguiguhadoop102 log]$ ll3.4.2 集群日志生成脚本
在hadoop102的/home/atguigu目录下创建bin目录这样脚本可以在服务器的任何目录执行
[atguiguhadoop102 ~]$ echo $PATH
/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/home/atguigu/.local/bin:/home/atguigu/bin
1、在/home/atguigu/bin目录下创建脚本lg.sh
[atguiguhadoop102 bin]$ vim lg.sh2、在脚本中编写如下内容
#!/bin/bash
for i in hadoop102 hadoop103; doecho $i ssh $i cd /opt/module/applog/; java -jar gmall2020-mock-log-2021-10-10.jar /dev/null 21
done 在远程服务器上启动一个 Java 程序使其在后台运行同时将其输出丢弃以便在不影响脚本执行的情况下生成模拟的日志数据。3、修改脚本执行权限
[atguiguhadoop102 bin]$ chmod ux lg.sh4、将jar包及配置文件上传到hadoop103的/opt/module/applog/路径 5、启动脚本
[atguiguhadoop102 module]$ lg.sh 6、分别在hadoop102、103的/opt/module/applog/log目录上查看生成的数据
[atguiguhadoop102 logs]$ ls
app.2020-06-14.log
[atguiguhadoop103 logs]$ ls
app.2020-06-14.log
第 4 章数据采集模块
4.1 数据通道
用户行为日志数据通道 Flume的角色
这个架构中Flume被部署在不同的服务器上hadoop102、hadoop103、hadoop104它的作用是收集服务器上的日志文件。 在hadoop102和hadoop103服务器上Flume配置被设定为收集名称格式为app-yyyy-MM-dd.log的文件。这里的yyyy-MM-dd很可能是日志文件中的日期格式。 Kafka的集成
Flume将这些日志文件收集后被配置为将数据推送到Kafka中。Kafka是一个分布式流处理平台通常用于处理大量的数据流。 Kafka中有一个名为topic_log的主题topicFlume将日志数据发布到这个主题。 数据流向HDFS
另一个Flume实例在hadoop104上被用来从Kafka的topic_log主题中拉取数据。 这些数据随后被写入到HDFS中的一个特定路径/origin_data/gmall/log/topic_log/并且似乎是按照日期进行分区的例如路径中包含2020-06-14这样的日期格式。 备注
图底部的备注说明了一个关键的操作细节日志数据在每个一天结束时即凌晨不会立即进行收集。
4.2 环境准备
4.2.1 集群所有进程查看脚本
1、在/home/atguigu/bin目录下创建脚本xcall.sh
[atguiguhadoop102 bin]$ vim xcall.sh2、在脚本中编写如下内容
#! /bin/bashfor i in hadoop102 hadoop103 hadoop104
doecho --------- $i ----------ssh $i $*
done
3、修改脚本执行权限
[atguiguhadoop102 bin]$ chmod 777 xcall.sh4、启动脚本
[atguiguhadoop102 bin]$ xcall.sh jps4.2.2 hadoop安装
1、安装步骤 参考之前hadoop文章 2、项目经验 1项目经验之hdfs存储多目录 1生成环境服务器磁盘情况
2在hdfs-site.xml文件中配置多目录 hdfs的datanode节点保存数据的路径由dfs.datanode.data.dir参数决定其默认值为file://${hadoop.tmp.dir}/dfs/data若服务器有多个磁盘必须对该参数进行修改。如服务器磁盘如上图所示则该参数应修改为如下的值。
propertynamedfs.datanode.data.dir/name
valuefile:///dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4/value
/property
注意每台服务器挂载的磁盘不一样所以每个节点的多目录配置可以不一致。单独配置即可。 2项目经验之集群数据均衡 1节点间数据均衡 开启数据均衡命令。
start-balancer.sh -threshold 10对于参数10代表的是集群中各个节点的磁盘空间利用率相差不超过10%可根据实际情况进行调整。 停止数据均衡命令。
stop-balancer.sh2磁盘间数据均衡 生成均衡计划
hdfs diskbalancer -plan hadoop103执行均衡计划
hdfs diskbalancer -execute hadoop103.plan.json查看当前均衡任务的执行情况
hdfs diskbalancer -query hadoop103取消均衡任务
hdfs diskbalancer -cancel hadoop103.plan.json3项目经验之hadoop参数调优 1hdfs参数调优hdfs-site.xml
The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes.
NameNode有一个工作线程池用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。
对于大集群或者有大量客户端的集群来说通常需要增大参数dfs.namenode.handler.count的默认值10。
propertynamedfs.namenode.handler.count/namevalue10/value
/property [atguiguhadoop102 ~]$ python
Python 2.7.5 (default, Apr 11 2018, 07:36:10)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type help, copyright, credits or license for more information.import mathprint int(20*math.log(8))
41quit()
2yarn参数调优yarn-site.xml 情景描述总共7台机器每天几亿条数据数据源-flume-kafka-hdfs-hive 面临问题数据统计主要用hivesql没有数据倾斜小文件已经做了合并处理开启的jvm重用而且io没有阻塞内存用了不到50%。但是还是跑的非常慢而且数据量洪峰过来时整个集群都会宕机。 解决方法 内存利用率不够。这个一般是yarn的2个配置造成的单个任务可以申请的最大内存大小和hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。 a、yarn.nodemanager.resource.memory-mb 表示该节点上yarn可使用的物理内存总量默认是8192MB,注意如果你的节点内存资源不够8GB则需要调减小这个值而yarn不会智能的探测节点的物理内存总量。 b、yarn.scheduler.maximum-allocation-mb 单个任务可申请的最多物理内存量默认是8192MB
4.2.3 zookeeper安装
1、安装步骤 参考之前zookeeper文章 2、zk集群启动停止脚本 1在hadoop102的/home/atguigu/bin目录下创建脚本
[atguiguhadoop102 bin]$ vim zk.sh在脚本中编写如下
#!/bin/bashcase $1 in
start){for i in hadoop102 hadoop103 hadoop104doecho ---------- zookeeper $i 启动 ------------ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh startdone
};;
stop){for i in hadoop102 hadoop103 hadoop104doecho ---------- zookeeper $i 停止 ------------ ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh stopdone
};;
status){for i in hadoop102 hadoop103 hadoop104doecho ---------- zookeeper $i 状态 ------------ ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh statusdone
};;
esac脚本的工作方式基于传入的参数$1它有三个主要的操作模式start, stop, 和 status。启动模式 (start):当脚本使用start参数运行时它会在三台服务器hadoop102, hadoop103, hadoop104上启动Zookeeper服务。
对于每台服务器它使用ssh远程连接到服务器并执行zkServer.sh start命令来启动Zookeeper服务。
在启动每个服务器的Zookeeper服务之前会打印一条消息表明正在启动哪台服务器上的Zookeeper。
停止模式 (stop):使用stop参数时脚本会在上述相同的三台服务器上停止Zookeeper服务。
它通过ssh远程连接到每台服务器并执行zkServer.sh stop命令来停止服务。
停止每个服务器的Zookeeper服务之前同样会打印一条消息来表明正在停止哪台服务器上的服务。
状态检查模式 (status):当使用status参数时脚本会检查并显示每台服务器上Zookeeper服务的状态。
它通过ssh连接到每台服务器并执行zkServer.sh status命令。
在检查每个服务器的状态之前会打印一条消息来指示正在检查哪台服务器上的Zookeeper服务状态。2增加脚本执行权限
[atguiguhadoop102 bin]$ chmod 777 zk.sh3zookeeper集群启动脚本
[atguiguhadoop102 module]$ zk.sh start4zookeeper集群停止脚本
[atguiguhadoop102 module]$ zk.sh stop4.2.4 kafka安装
1、安装步骤 参考之前文章 2、kafka集群启动停止脚本 1在/home/atguigu/bin目录下创建脚本kf.sh
[atguiguhadoop102 bin]$ vim kf.sh在脚本中填写如下内容。
#! /bin/bashcase $1 in
start){for i in hadoop102 hadoop103 hadoop104doecho --------启动 $i Kafka-------ssh $i /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.propertiesdone
};;
stop){for i in hadoop102 hadoop103 hadoop104doecho --------停止 $i Kafka-------ssh $i /opt/module/kafka/bin/kafka-server-stop.sh stopdone
};;
esac这段代码是一个Bash脚本用于在多个服务器上启动或停止Kafka服务。2增加脚本执行权限
[atguiguhadoop102 bin]$ chmod 777 kf.sh3kf集群启动脚本
[atguiguhadoop102 module]$ kf.sh start4kf集群停止脚本
[atguiguhadoop102 module]$ kf.sh stop3、kafka常用命令 1查看kafka topic列表
[atguiguhadoop102 kafka]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list2创建kafka topic 进入到/opt/module/kafka/目录下创建日志主题
[atguiguhadoop102 kafka]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 1 --partitions 1 --topic topic_log3删除kafka topic
[atguiguhadoop102 kafka]$ kafka-topics.sh --delete --bootstrap-server hadoop102:9092 --topic topic_log4kafka生产消息
[atguiguhadoop102 kafka]$ kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
hello world
atguigu atguigu
5kafka消费消息
[atguiguhadoop102 kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first–from-beginning会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。 6查看kafka topic详情
[atguiguhadoop102 kafka]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first4.2.5 flume安装
按照采集通道规划需在hadoop102、103、104三台节点分别部署一个flume。 1、按照步骤 参考之前原文 2、分发flume
[atguiguhadoop102 ~]$ xsync /opt/module/flume/3、项目经验 堆内存调整 flume堆内存通常设置为4G或更高配置方式如下 修改/opt/module/flume/conf/flume-env.sh文件配置如下参数
export JAVA_OPTS-Xms4096m -Xmx4096m -Dcom.sun.management.jmxremote-Xms表示堆内存最小尺寸初始分配-Xmx表示堆内存最大允许的尺寸按需分配。
4.3 日志采集flume
4.3.1 日志采集flume配置概述
按照规划需要采集的用户行为日志文件分布在102103两台日志服务器故需要在102103两台节点配置日志采集flume。日志采集flume需要采集日志文件内容并对日志格式JSON进行校验然后将校验通过的日志发送到kafka。 此处可选择taildirsource和kafkachannel并配置日志校验拦截器。 选择taildirsource和kafkachannel的原因如下 1、taildirsource taildirsource相比execsource、spoolingdirectorysource的优势 taildirsource断点续传、多目录。flume以前需要自己自定义source记录每次读取文件位置实现断电续传。 execsource可用实时搜集数据但是在flume不运行或者shell命令出错的情况下数据将会丢失。 spoolingdirectorysource监控目录支持断电续传。 2、kafka channel 采用kafka channel省去了sink提高了效率。 日志采集flume关键配置如下
4.3.2 日志采集flume配置实操
1、创建flume配置文件 在hadoop102节点的flume的job目录下创建file_to_kafka.conf
[atguiguhadoop104 flume]$ mkdir job
[atguiguhadoop104 flume]$ vim job/file_to_kafka.conf
2、配置文件内容如下
#为各组件命名
a1.sources r1
a1.channels c1#描述source
a1.sources.r1.type TAILDIR
a1.sources.r1.filegroups f1
a1.sources.r1.filegroups.f1 /opt/module/applog/log/app.*
a1.sources.r1.positionFile /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors i1
a1.sources.r1.interceptors.i1.type com.atguigu.flume.interceptor.ETLInterceptor$Builder#描述channel
a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic topic_log
a1.channels.c1.parseAsFlumeEvent false#绑定source和channel以及sink和channel的关系
a1.sources.r1.channels c1配置了 Flume 以从指定文件路径读取日志数据使用 TAILDIR source通过一个拦截器处理然后将数据传输到 Kafka使用 Kafka channel3、编写flume拦截器 1创建maven工程flume-interceptor 2创建包com.atguigu.flume.interceptor 3在pom.xml文件中添加如下配置
dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/versionscopeprovided/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.62/version/dependency
/dependenciesbuildpluginspluginartifactIdmaven-compiler-plugin/artifactIdversion2.3.2/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins
/buildorg.apache.flume:flume-ng-core:1.9.0: 这是 Apache Flume 的核心库用于数据采集、聚合和移动。版本是 1.9.0。scopeprovided/scope 表示这个依赖在运行时会被提供通常是由运行环境如一个应用服务器提供。
com.alibaba:fastjson:1.2.62: 这是一个由阿里巴巴提供的 JSON 处理库用于解析和生成 JSON 数据。版本是 1.2.62。
maven-compiler-plugin: 这个插件用于编译 Java 代码。它被配置为使用 Java 1.8 版本进行编译即源代码和目标字节码都遵循 Java 1.8 的标准。
maven-assembly-plugin: 这个插件用于创建一个包含所有依赖的单一可执行 JAR 文件通常称为 fat jar 或 uber jar。descriptorRefjar-with-dependencies/descriptorRef 指定了一个预定义的描述符告诉插件将项目的所有依赖项一起打包进 JAR 文件中。这个插件在 package 阶段执行这意味着当运行 mvn package 命令时它会被触发。
这段代码是一个 Maven 配置用于定义项目的依赖、编译标准以及打包方式。这使得项目能够被正确地编译成 Java 1.8 兼容的代码并且生成一个包含所有必需依赖的单一 JAR 文件方便部署和运行。4在com.atguigu.flume.interceptor包下创建jsonutils类
package com.atguigu.flume.interceptor;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;public class JSONUtils {public static boolean isJSONValidate(String log){try {JSON.parse(log);return true;}catch (JSONException e){return false;}}
}提供一种简单的方式来检查一个字符串是否符合JSON格式的标准。5在com.atguigu.flume.interceptor包下创建etlinterceptor类
package com.atguigu.flume.interceptor;import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;public class ETLInterceptor implements Interceptor {Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {byte[] body event.getBody();String log new String(body, StandardCharsets.UTF_8);if (JSONUtils.isJSONValidate(log)) {return event;} else {return null;}}Overridepublic ListEvent intercept(ListEvent list) {IteratorEvent iterator list.iterator();while (iterator.hasNext()){Event next iterator.next();if(intercept(next)null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{Overridepublic Interceptor build() {return new ETLInterceptor();}Overridepublic void configure(Context context) {}}Overridepublic void close() {}
}确保通过Flume传输的所有事件都是有效的JSON格式。这对于后续的数据处理非常重要特别是在将数据发送到需要JSON格式输入的系统如大数据处理平台时。如果事件不是有效的JSON格式该拦截器会将其从传输流中移除从而保证数据的质量和一致性。6打包
7需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面
4.3.3 日志采集flume测试
1、启动zookeeper、kafka集群 2、启动hadoop102的日志采集flume
[atguiguhadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.loggerinfo,console3、启动一个kafka的console-consumer
[atguiguhadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log4、生成模拟数据
[atguiguhadoop102 ~]$ lg.sh 5、观察kafka消费组是否能消费到数据
4.3.4 日志采集flume启停脚本
1、分发日志采集flume配置文件和拦截器 若上述测试通过需将hadoop102节点的flume的配置文件和拦截器jar包向另一台日志服务器发送一份。
[atguiguhadoop102 flume]$ scp -r job hadoop103:/opt/module/flume/
[atguiguhadoop102 flume]$ scp lib/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar hadoop103:/opt/module/flume/lib/
2、方便起见此处编写一个日志采集flume进程的启停脚本 1在hadoop102节点的/home/atguigu/bin目录下创建脚本f1.sh
[atguiguhadoop102 bin]$ vim f1.sh在脚本中填写如下内容。
#!/bin/bashcase $1 in
start){for i in hadoop102 hadoop103doecho --------启动 $i 采集flume-------ssh $i nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf /dev/null 21 done
};;
stop){for i in hadoop102 hadoop103doecho --------停止 $i 采集flume-------ssh $i ps -ef | grep file_to_kafka | grep -v grep |awk {print \$2} | xargs -n1 kill -9 done};;
esac
2增加脚本执行权限
[atguiguhadoop102 bin]$ chmod 777 f1.sh3f1启动
[atguiguhadoop102 module]$ f1.sh start4f2停止
[atguiguhadoop102 module]$ f1.sh stop4.4 日志消费flume
4.4.1 日志消费flume配置概述
按照规划该flume需将kafka中topic_log的数据发往hdfs。并且对每天产生的用户行为日志进行区分将不同天的数据发往hdfs不同天的路径。 此处选择kafkasource、filechannel、hdfssink。 关键配置如下
4.4.2 日志消费flume配置实操
1、创建flume配置文件 在104节点的flume的job目录下创建kafka_to_hdfs_log.conf
[atguiguhadoop104 flume]$ vim job/kafka_to_hdfs_log.conf 2、配置文件内容如下
## 组件
a1.sourcesr1
a1.channelsc1
a1.sinksk1## source1
a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize 5000
a1.sources.r1.batchDurationMillis 2000
a1.sources.r1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topicstopic_log
a1.sources.r1.interceptors i1
a1.sources.r1.interceptors.i1.type com.atguigu.flume.interceptor.TimeStampInterceptor$Builder## channel1
a1.channels.c1.type file
a1.channels.c1.checkpointDir /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize 2146435071
a1.channels.c1.capacity 1000000
a1.channels.c1.keep-alive 6## sink1
a1.sinks.k1.type hdfs
a1.sinks.k1.hdfs.path /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix log-
a1.sinks.k1.hdfs.round falsea1.sinks.k1.hdfs.rollInterval 10
a1.sinks.k1.hdfs.rollSize 134217728
a1.sinks.k1.hdfs.rollCount 0## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType CompressedStream
a1.sinks.k1.hdfs.codeC gzip## 拼装
a1.sources.r1.channels c1
a1.sinks.k1.channel c1定义了一个从Kafka源接收数据经过File通道最后输出到HDFS的Flume代理配置。
组件定义
a1.sourcesr1定义了一个名为 r1 的源source。
a1.channelsc1定义了一个名为 c1 的通道channel。
a1.sinksk1定义了一个名为 k1 的汇sink。
Source1源配置
a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource指定源类型为Kafka。
a1.sources.r1.batchSize 5000设置批量大小为5000。
a1.sources.r1.batchDurationMillis 2000设置批处理持续时间为2000毫秒。
a1.sources.r1.kafka.bootstrap.servers设置Kafka的服务器地址。
a1.sources.r1.kafka.topicstopic_log设置Kafka的主题为 topic_log。
a1.sources.r1.interceptors i1定义一个拦截器 i1。
a1.sources.r1.interceptors.i1.type com.atguigu.flume.interceptor.TimeStampInterceptor$Builder设置拦截器类型为时间戳拦截器。
Channel1通道配置
a1.channels.c1.type file指定通道类型为文件。
a1.channels.c1.checkpointDir /opt/module/flume/checkpoint/behavior1设置检查点目录。
a1.channels.c1.dataDirs /opt/module/flume/data/behavior1/设置数据目录。
a1.channels.c1.maxFileSize 2146435071设置最大文件大小。
a1.channels.c1.capacity 1000000设置通道容量。
a1.channels.c1.keep-alive 6设置保持活动状态的时间。
Sink1汇配置
a1.sinks.k1.type hdfs指定汇类型为HDFS。
a1.sinks.k1.hdfs.path /origin_data/gmall/log/topic_log/%Y-%m-%d设置HDFS的路径模板。
a1.sinks.k1.hdfs.filePrefix log-设置文件前缀。
a1.sinks.k1.hdfs.round false禁用轮询。
a1.sinks.k1.hdfs.rollInterval 10设置滚动间隔。
a1.sinks.k1.hdfs.rollSize 134217728设置滚动大小。
a1.sinks.k1.hdfs.rollCount 0设置滚动计数。
a1.sinks.k1.hdfs.fileType CompressedStream设置文件类型为压缩流。
a1.sinks.k1.hdfs.codeC gzip设置压缩格式为gzip。
拼装
a1.sources.r1.channels c1将源 r1 连接到通道 c1。
a1.sinks.k1.channel c1将汇 k1 连接到通道 c1。注配置优化 1filechannel优化 通过配置datadirs指向多个路径每个路径对应不同的硬盘增大flume吞吐量。 checkpointdir和backupcheckpointdir也尽量配置在不同硬盘对应的目录中保证checkpoint坏掉后可用快速使用backupcheckpointdir恢复数据。 2hdfs sink优化 1hdfs存入大量小文件有什么影响 元数据层面每个小文件都有一份元数据其中包含文件路径文件名所有者所属组权限创建事件等这些信息都保存在namenode内存中。所以小文件过多会占用namenode服务器大量内存影响namenode性能和使用寿命。 计算层面默认情况下mr会对每个小文件启用一个map任务计算非常影响计算性能。同时也影响磁盘寻址事件。 2hdfs小文件处理 官方默认的这三个参数配置写入hdfs后会产生小文件hdfs.rollinterval、hdfs.rollsize、hdfs.rollcount。 基于以上hdfs.rollinterval3600hdfs.rollsize134217728hdfs.rollcount0几个参数综合作用效果如下 a、文件在达到128M时会滚动生成新文件 b、文件创建超3600秒时会滚动生成新文件 3编写flume拦截器 a、在com.atguigu.flume.interceptor包下创建timestampinterceptor类
package com.atguigu.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TimeStampInterceptor implements Interceptor {private ArrayListEvent events new ArrayList();Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {MapString, String headers event.getHeaders();String log new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject JSONObject.parseObject(log);String ts jsonObject.getString(ts);headers.put(timestamp, ts);return event;}Overridepublic ListEvent intercept(ListEvent list) {events.clear();for (Event event : list) {events.add(intercept(event));}return events;}Overridepublic void close() {}public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new TimeStampInterceptor();}Overridepublic void configure(Context context) {}}
}
Apache Flume 是一个分布式、可靠且可用的系统用于有效地收集、聚合和移动大量日志数据。拦截器Interceptor在 Flume 中用于在事件流经过时对这些事件进行处理或修改。具体到这段代码的功能我会逐步解释类定义 (TimeStampInterceptor): 这是一个公共类它实现了 Flume 的 Interceptor 接口使其能夠作为一个拦截器使用。成员变量 (events): 定义了一个 ArrayListEvent 类型的成员变量 events用于存储事件。initialize 方法: 这是 Interceptor 接口的一部分用于初始化拦截器。在这个实现中该方法是空的表示拦截器初始化时不执行任何操作。intercept(Event event) 方法: 这个方法接收一个 Event 对象并对其进行处理。它首先从事件中获取头部信息然后将事件体假设为 JSON 格式的日志转换为字符串。接着它解析这个 JSON 字符串提取名为 ts 的字段并将其值添加到事件的头部信息中键为 timestamp。此方法的目的是从每个日志事件中提取时间戳并将其作为头部信息加入到事件中。intercept(ListEvent list) 方法: 这个方法处理一个事件列表。它首先清空 events 成员变量然后遍历列表中的每个事件使用 intercept(Event event) 方法处理它们并将处理后的事件添加到 events 列表中。最后返回这个处理后的事件列表。close 方法: 这也是 Interceptor 接口的一部分用于执行拦截器关闭前的清理工作。这个方法在这个类中是空的表示没有特殊的清理操作需要执行。内部静态类 (Builder): 这是一个实现了 Interceptor.Builder 接口的内部静态类。build 方法返回一个新的 TimeStampInterceptor 实例。configure 方法用于配置拦截器但在这里它是空的意味着没有特定的配置需要设置。总的来说这个 TimeStampInterceptor 类的目的是在 Flume 事件流中为每个事件添加一个基于其内容的时间戳头部信息。这在处理日志数据时很有用特别是当需要根据时间戳对事件进行排序或筛选时。b、重新打包
c、需要先把打包好的放到hadoop104的/opt/module/flume/lib文件夹下面
4.4.3 日志消费flume测试
1、启动zookeeper、kafka集群 2、启动日志采集flume
[atguiguhadoop102 ~]$ f1.sh start3、启动104的日志消费flume
[atguiguhadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.loggerinfo,console4、生成模拟数据
[atguiguhadoop102 ~]$ lg.sh 5、观察hdfs是否出现数据
4.4.4 日志消费flume启停脚本
若上述测试通过为方便此处创建一个flume的启停脚本 1、在hadoop102节点的/home/atguigu/bin目录下创建脚本f2.sh
[atguiguhadoop102 bin]$ vim f2.sh在脚本中填写如下内容。
#!/bin/bashcase $1 in
start)echo --------启动 hadoop104 日志数据flume-------ssh hadoop104 nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf /dev/null 21
;;
stop)echo --------停止 hadoop104 日志数据flume-------ssh hadoop104 ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk {print \$2} | xargs -n1 kill
;;
esac
2、增加脚本执行权限
[atguiguhadoop102 bin]$ chmod 777 f2.sh3、f2启动
[atguiguhadoop102 module]$ f2.sh start4、f2停止
[atguiguhadoop102 module]$ f2.sh stop4.5 采集通道启动/停止脚本
1、在/home/atguigu/bin目录下创建脚本cluster.sh
[atguiguhadoop102 bin]$ vim cluster.sh在脚本中填写如下内容
#!/bin/bashcase $1 in
start){echo 启动 集群 #启动 Zookeeper集群zk.sh start#启动 Hadoop集群hdp.sh start#启动 Kafka采集集群kf.sh start#启动 Flume采集集群f1.sh start#启动 Flume消费集群f2.sh start};;
stop){echo 停止 集群 #停止 Flume消费集群f2.sh stop#停止 Flume采集集群f1.sh stop#停止 Kafka采集集群kf.sh stop#停止 Hadoop集群hdp.sh stop#循环直至 Kafka 集群进程全部停止kafka_count$(jpsall | grep Kafka | wc -l)while [ $kafka_count -gt 0 ]dosleep 1kafka_count$(jpsall | grep Kafka | wc -l)echo 当前未停止的 Kafka 进程数为 $kafka_countdone#停止 Zookeeper集群zk.sh stop};;
esac
2、增加脚本执行权限
[atguiguhadoop102 bin]$ chmod ux cluster.sh3、cluster集群启动脚本
[atguiguhadoop102 module]$ cluster.sh start4、cluster集群停止脚本
[atguiguhadoop102 module]$ cluster.sh stop