flash素材网站有哪些,定制家居软件app哪个好,百度竞价官网,济南物流公司网站建设Flume 功能 Flume主要作用#xff0c;就是实时读取服务器本地磁盘数据#xff0c;将数据写入到 HDFS。
Flume是 Cloudera提供的高可用#xff0c;高可靠性#xff0c;分布式的海量日志采集、聚合和传输的系统工具。
Flume 架构
Flume组成架构如下图所示#xff1a;
A…Flume 功能 Flume主要作用就是实时读取服务器本地磁盘数据将数据写入到 HDFS。
Flume是 Cloudera提供的高可用高可靠性分布式的海量日志采集、聚合和传输的系统工具。
Flume 架构
Flume组成架构如下图所示
Agent
每个 Agent 代表着一个 JVM 进程它以事件的方式将数据从源头送至目的地。 Agent 由 3 个部分组成Source、Channel、Sink。
SourceSource是负责接收数据到Flume Agent 的组件Source 组件可以处理各种类型、各种格式的日志数据。SinkSink不断轮询Channel中的事件并且批量移除他们并将这些数据批量写入到存储或者索引系统或被发送到另一个Flume Agent。ChannelChannel是位于Source 和Sink 之间的缓冲区。因此Channel允许 Source和 Sink 有不同的运行速率。Channel 是线程安全的 可以同时处理几个 Source 的写操作和多个 Sink 的读操作。 Flume自带两种 ChannelMemory Channel 和File Channel。 Memory Channel 是一个内存队列Source将事件写入其尾部Sink从其头部读取事件。 Memory Channel 将源写入的事件存储在堆上。由于它将所有数据存储在内存中因此提供了高吞吐量。 它最适合那些不担心数据丢失的流。 它不适合涉及数据丢失的数据流。 File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。EventFlume 传输数据的基本单元数据以 Event的形式将数据从源头送至目的地。Event由Header 和 Body 组成Header用来存放event 的属性为 K-V结构Body 用于存放数据为字节数组结构。
安装 flume
话不多说直接开始安装 flume。
前往 http://archive.apache.org/dist/flume/ 选择1.9.0将apache-flume-1.9.0-bin.tar.gz使用 sftp上传到linux的/opt/software目录下 解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
[loganhadoop101 hadoop]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/创建软链接
cd /opt/module
ln -snf flume-1.9.0/ flume删除jar 包否则会报错
cd /opt/module/flume
rm lib/guava-11.0.2.jar注意删除guava一定要配置 Hadoop 环境变量否则会报错
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Listsat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)... 1 more修改log4j.properties配置文件
[loganhadoop101 flume]$ vim conf/log4j.properties
# 注意是修改内容
flume.log.dir/opt/module/flume/logs验证 flume
[loganhadoop101 flume]$ /opt/module/flume/bin/flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9Flume日志采集
配置解析
需要采集的日志文件分布在hadoop101hadoop102两台日志服务器故需要在hadoop101hadoop102上配置日志采集 Flume。Flume需要采集日志文件内容并对日志格式JSON进行校验然后将校验通过的日志发送到 Kafka。 此处选择TailDirSource和 KafkaChannel。
TailDirSource优势支持断点续传、多目录。KafkaChannel优势省去了 Sink提高了效率。日志采集 Flume关键配置
Flume日志采集配置
创建Flume 配置文件
[loganhadoop101 flume]$ pwd
/opt/module/flume
[loganhadoop101 flume]$ mkdir job
[loganhadoop101 flume]$ vim job/file_to_kafka.conf
#定义组件
a1.sources r1
a1.channels c1#配置source
a1.sources.r1.type TAILDIR
a1.sources.r1.filegroups f1
a1.sources.r1.filegroups.f1 /opt/module/applog/log/app.*
a1.sources.r1.positionFile /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors i1
a1.sources.r1.interceptors.i1.type com.logan.gmall.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers hadoop101:9092
a1.channels.c1.kafka.topic topic_log
a1.channels.c1.parseAsFlumeEvent false#组装
a1.sources.r1.channels c1编写拦截器 创建Maven工程flume-interceptor创建包com.logan.gmall.flume.interceptor在pom.xml文件中添加如下配置 dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/versionscopeprovided/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.62/version/dependency
/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion2.4/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins
/build在com.logan.gmall.flume.utils包下创建JSONUtil类 package com.logan.gmall.flume.utils;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;public class JSONUtil {/*** 通过异常判定是否是JSON字符串*/public static boolean isJSONValidate(String input){try {JSON.parseObject(input);return true;} catch (JSONException e) {return false;}}
}在com.logan.gmall.flume.interceptor包下创建ETLInterceptor类 package com.logan.gmall.flume.interceptor;import com.logan.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;public class ETLInterceptor implements Interceptor {Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body event.getBody();String log new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json是返回当前event不是返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}Overridepublic ListEvent intercept(ListEvent list) {IteratorEvent iterator list.iterator();while (iterator.hasNext()){Event next iterator.next();if(intercept(next)null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{Overridepublic Interceptor build() {return new ETLInterceptor();}Overridepublic void configure(Context context) {}}Overridepublic void close() {}
}打包 将打包文件放到hadoop101 的/opt/moduie/flume/lib文件夹下
采集测试
启动Zookeeper、Kafka集群启动 hadoop101上的日志采集Flume
[loganhadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.loggerinfo,console启动一个Kafka的Console-Consumer
[loganhadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topic_log生成模拟数据
[loganhadoop101 module]$ mkdir -p /opt/module/applog/log/
[loganhadoop101 module]$ vim /opt/module/applog/log/app.2023-12-02.log
{common:{ar:500000,ba:iPhone,ch:Appstore,is_new:1,md:iPhone Xs,mid:mid_552171,os:iOS 13.3.1,uid:919,vc:v2.1.134},displays:[{display_type:activity,item:1,item_type:activity_id,order:1,pos_id:5},{display_type:activity,item:2,item_type:activity_id,order:2,pos_id:5},{display_type:query,item:19,item_type:sku_id,order:3,pos_id:4},{display_type:query,item:3,item_type:sku_id,order:4,pos_id:2},{display_type:query,item:5,item_type:sku_id,order:5,pos_id:2},{display_type:promotion,item:19,item_type:sku_id,order:6,pos_id:4},{display_type:query,item:14,item_type:sku_id,order:7,pos_id:2},{display_type:query,item:9,item_type:sku_id,order:8,pos_id:2},{display_type:promotion,item:35,item_type:sku_id,order:9,pos_id:1}],page:{during_time:9853,page_id:home},ts:1672512476000}
{actions:[{action_id:favor_add,item:9,item_type:sku_id,ts:1672512480386},{action_id:get_coupon,item:2,item_type:coupon_id,ts:1672512483772}],common:{ar:500000,ba:iPhone,ch:Appstore,is_new:1,md:iPhone Xs,mid:mid_552171,os:iOS 13.3.1,uid:919,vc:v2.1.134},displays:[{display_type:promotion,item:19,item_type:sku_id,order:1,pos_id:4},{display_type:promotion,item:14,item_type:sku_id,order:2,pos_id:5},{display_type:query,item:21,item_type:sku_id,order:3,pos_id:1},{display_type:query,item:11,item_type:sku_id,order:4,pos_id:2},{display_type:promotion,item:28,item_type:sku_id,order:5,pos_id:1}],page:{during_time:10158,item:9,item_type:sku_id,last_page_id:home,page_id:good_detail,source_type:promotion},ts:1672512477000}观察kafka是否有消费到数据