如果查询网站内页的收录情况,福建优化seo,怎么做页面跳转,青岛网站建设公司排名目录 Part.01 关于HDP Part.02 核心组件原理 Part.03 资源规划 Part.04 基础环境配置 Part.05 Yum源配置 Part.06 安装OracleJDK Part.07 安装MySQL Part.08 部署Ambari集群 Part.09 安装OpenLDAP Part.10 创建集群 Part.11 安装Kerberos Part.12 安装HDFS Part.13 安装Ranger …目录 Part.01 关于HDP Part.02 核心组件原理 Part.03 资源规划 Part.04 基础环境配置 Part.05 Yum源配置 Part.06 安装OracleJDK Part.07 安装MySQL Part.08 部署Ambari集群 Part.09 安装OpenLDAP Part.10 创建集群 Part.11 安装Kerberos Part.12 安装HDFS Part.13 安装Ranger Part.14 安装YARNMR Part.15 安装HIVE Part.16 安装HBase Part.17 安装Spark2 Part.18 安装Flink Part.19 安装Kafka Part.20 安装Flume
二、核心组件原理 1.分布式协调ZooKeeper
(1)应用场景
使用分布式系统就无法避免对节点管理的问题需要实时感知节点的状态、对节点进行统一管理等而由于这些问题处理起来可能相对麻烦和提高了系统的复杂性ZooKeeper作为一个能够通用解决这些问题的中间件就应运而生了。 应用场景
统一配置管理比如现在有A.ymlB.ymlC.yml配置文件里面有一些公共的配置但是如果后期对这些公共的配置进行修改就需要修改每一个文件还要重启服务器。比较麻烦现在将这些公共配置信息放到ZK中修改ZK的信息会通知ABC配置文件。统一命名服务这个的理解其实跟域名一样在某一个节点下放一些ip地址我现在只需要访问ZK的一个Znode节点就可以获取这些ip地址。同一集群管理分布式集群中状态的监控和管理使用Zookeeper来存储。分布式协调比如把多个服务提供者的信息放在某个节点上服务的消费者就可以通过ZK调用。 服务节点动态上下线如何提供者宕机就会删除在ZK节点然后ZK通知消费者软负载均衡
(2)实现原理
zookeeper文件系统通知机制 ① 文件系统 ZooKeeper的数据结构跟Unix文件系统非常类似可以看做是一颗树每个节点叫做Znode。每一个Znode只能存1MB数据。数据只是配置信息。每一个节点可以通过路径来标识结构图如下 1每个子目录项如NameService都被称作为znode这个znode是被它所在的路径唯一标识如Server1这个znode的标识为/NameService/Server1。 2znode可以有子节点目录并且每个znode可以存储数据注意EPHEMERAL临时的类型的目录节点不能有子节点目录。 3znode是有版本的version每个znode中存储的数据可以有多个版本也就是一个访问路径中可以存储多份数据version号自动增加。 4znode的类型
Persistent节点一旦被创建便不会意外丢失即使服务器全部重启也依然存在。每个Persist节点即可包含数据也可包含子节点。Ephemeral节点在创建它的客户端与服务器间的Session结束时自动被删除。服务器重启会导致Session结束因此Ephemeral类型的znode此时也会自动删除。Non-sequence节点多个客户端同时创建同一Non-sequence节点时只有一个可创建成功其它匀失败。并且创建出的节点名称与创建时指定的节点名完全一样。Sequence节点创建出的节点名在指定的名称之后带有10位10进制数的序号。多个客户端创建同一名称的节点时都能创建成功只是序号不同。 5znode可以被监控包括这个目录节点中存储的数据的修改子节点目录的变化等一旦变化可以通知设置监控的客户端这个是Zookeeper的核心特性Zookeeper的很多功能都是基于这个特性实现的。 6ZXID每次对Zookeeper的状态的改变都会产生一个zxidZooKeeper Transaction Idzxid是全局有序的如果zxid1小于zxid2则zxid1在zxid2之前发生。 ② 通知机制监听机制 ZooKeeper可以提供分布式数据的发布/订阅功能依赖的就是Wather监听机制。 客户端可以向服务端注册Wather监听服务端的指定事件触发之后就会向客户端发送一个事件通知。 1客户端向服务端注册Wather监听 2保存Wather对象到客户端本地的WatherManager中 3服务端Wather事件触发后客户端收到服务端通知从WatherManager(watcher管理器)中取出对应Wather对象执行回调逻辑 主要监听内容 1监听Znode节点的数据变化就是哪个节点信息更新了 2监听子节点的增减变化就是增加了一个Znode或者删除了一个Znode 几个特性 1一次性一旦一个Wather触发之后Zookeeper就会将它从存储中移除 2客户端串行客户端的Wather回调处理是串行同步的过程不要因为一个Wather的逻辑阻塞整个客户端 3轻量Wather通知的单位是WathedEvent只包含通知状态、事件类型和节点路径不包含具体的事件内容具体的时间内容需要客户端主动去重新获取数据
(3)角色
角色描述领导者leader负责进行投票的发起和决议更新系统状态学习者learner跟随者follower接受客户端请求并想客户端返回结果在选主过程中参与投票观察者observer接受客户端连接将写请求转发给leader但observer不参加投票过程只同步leader的状态observer的目的是为了扩展系统提高读取速度客户端client请求发起方 
为了保证事务的顺序一致性zookeeper采用了递增的事务id号zxid来标识事务。所有的提议proposal都在被提出的时候加上了zxid。实现中zxid是一个64位的数字它高32位是epoch用来标识leader关系是否改变每次一个leader被选出来它都会有一个新的epoch标识当前属于那个leader的统治时期。低32位用于递增计数。
(4)集群
Leader选举算法采用了Paxos协议 Paxos核心思想当多数Server写成功则任务数据写成功如果有3个Server则两个写成功即可如果有4或5个Server则三个写成功即可。 Server数目一般为奇数3、5、7如果有3个Server则最多允许1个Server挂掉如果有4个Server则同样最多允许1个Server挂掉。由此我们看出3台服务器和4台服务器的的容灾能力是一样的所以为了节省服务器资源一般我们采用奇数个数作为服务器部署个数。
(5)Leader选举
假设现在ZooKeeper集群有五台服务器它们myid分别是服务器1、2、3、4、5 zookeeper集群初始化阶段服务器myid1-5依次启动开始zookeeper选举Leader 服务器1myid1启动当前只有一台服务器无法完成Leader选举 服务器2myid2启动此时两台服务器能够相互通讯开始进入Leader选举阶段
每个服务器发出一个投票 服务器1和服务器2都将自己作为Leader服务器进行投票投票的基本元素包括服务器的myid和ZXID我们以myidZXID形式表示。初始阶段服务器1和服务器2都会投给自己即服务器1的投票为1,0服务器2的投票为2,0然后各自将这个投票发给集群中的其他所有机器。接受来自各个服务器的投票 每个服务器都会接受来自其他服务器的投票。同时服务器会校验投票的有效性是否本轮投票、是否来自LOOKING状态的服务器。处理投票 收到其他服务器的投票会将被人的投票跟自己的投票PKPK规则如下 zookeeper集群中只有超过了半数以上的服务器启动此集群才能正常工作 在集群正常工作之前myid小的服务器会给myid大的服务器投票这种投票会一直持续到集群开始正常工作即选出了leader。 选出leader之后之前的服务器节点的状态要由looking转为following从节点以后的服务器不管是不是新加进来的都会变成follower从节点。 服务器1的投票是1,0它收到投票是2,0两者zxid都是0因为收到的myid2大于自己的myid1所以它更新自己的投票为2,0然后重新将投票发出去。对于服务器2呢即不再需要更新自己的投票把上一次的投票信息发出即可。统计投票 每次投票后服务器会统计所有投票判断是否有过半的机器接受到相同的投票信息。服务器2收到两票少于3n/21,n为总服务器所以继续保持LOOKING状态 服务器3myid3启动继续进入Leader选举阶段。跟前面流程一致服务器1和2先投自己一票因为服务器3的myid最大所以大家把票改投给它。此时服务器为3票大于等于n/21,所以服务器3当选为Leader。服务器12更改状态为FOLLOWING服务器3更改状态为LEADING 服务器4myid4启动发起一次选举。 此时服务器123已经不是LOOKING状态不会更改选票信息。选票信息结果服务器3为3票服务器4为1票。服务器4并更改状态为FOLLOWING 服务器5myid5启动发起一次选举。 同理服务器也是把票投给服务器3服务器5并更改状态为FOLLOWING 投票结束服务器3当选为Leader
2.分布式存储HDFS
(1)组件功能 ① NameNodeHDFS的管理节点负责客户端的请求响应存放元数据 ② SecondaryNameNode辅助节点当编辑日志和映像文件需要合并时在同一个NameNode上执行合并操作会耗费大量内存和计算能力因此合并操作一般会在另一台机器上执行即SecondaryNamenode ③ DataNodeHDFS的工作节点受客户端和NameNode的调度检索并存放数据块。没有NameNodeDataNode将无法使用
(2)关于nn、2nn和nn HA
① NameNode NameNode主要是用来保存HDFS的元数据信息比如命名空间信息块信息等。当它运行的时候这些信息是存在内存中的但是这些信息也可以持久化到磁盘上。 Fsimage是在NameNode启动时对整个文件系统的快照 edit logs是在NameNode启动后对文件系统的改动序列 只有在NameNode重启时edit logs才会合并到fsimage文件中从而得到一个文件系统的最新快照。 但是在生产集群中NameNode是很少重启的这也意味着当NameNode运行了很长时间后edit logs文件会变得很大。 ② 2nn/Secondary NameNode SecondaryNameNode就是来帮助解决上述问题的它的职责是合并NameNode的edit logs到fsimage文件中。 首先它定时到NameNode去获取edit logs并更新到fsimage上Secondary NameNode自己的fsimage。 一旦它有了新的fsimage文件它将其拷贝回NameNode中。NameNode在下次重启时会使用这个新的fsimage文件从而减少重启的时间。 所以2nn并不是nn的备份而是给nn提供了一个检查点。 ③ nn HA 在hadoop2.0之前namenode只有一个存在单点问题虽然hadoop1.0有secondarynamenodecheckpointnodebuckcupnode这些但是单点问题依然存在在hadoop2.0引入了HA机制。 hadoop2.0的HA机制有两个namenode一个是active namenode状态是active另外一个是standby namenode状态是standby。 两者的状态是可以切换的但不能同时两个都是active状态最多只有1个是active状态。只有active namenode提供对外的服务standby namenode是不对外服务的。 active namenode和standby namenode之间通过NFS或者JNjournalnodeQJM方式来同步数据。
(3)HDFS块大小
HDFS中的文件在物理上是分块存储Block块的大小可以通过配置参数 ( dfs.blocksize来规定默认大小在Hadoop2.x/3.x版本中是128M1.x版本中是64M。 如果使用的是机械硬盘,可以设置块大小为128M如果使用的是固态硬盘,则可以设置为256M 默认是这样的,数据的寻址时间要约等于传输时间的1%即最佳状态,如果硬盘的传输速度较快,在寻址时间变化不大的条件下,我们传输的数据块可以大一点,所以设置为256M 如果设置块太小,会增加我们的寻址时间程序会一直找数据块的位置 如果块设置的太大从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。导致程序在处理这块数据时会非常慢 总结HDFS块的大小设置主要取决于磁盘传输速率。
(4)HDFS写数据流程
①客户端通过Distributed FileSystem模块向NameNode请求上传文件NameNode检查目标文件是否已存在父目录是否存在。 ②NameNode检查是否已存在文件、检查权限。若通过检查直接先将操作写入EditLog并返回输出流对象。 注WALwrite ahead log先写Log再写内存因为EditLog记录的是最新的HDFS客户端执行所有的写操作。如果后续真实写操作失败了由于在真实写操作之前操作就被写入EditLog中了故EditLog中仍会有记录 ③客户端按128MB的块切分文件并请求第一个Block上传到哪几个DataNode服务器上。 ④NameNode返回3个DataNode节点分别为dn1、dn2、dn3。 ⑤客户端通过FSDataOutputStream模块请求dn1上传数据dn1收到请求会继续调用dn2然后dn2调用dn3将这个通信管道建立完成。 ⑥dn1、dn2、dn3逐级应答客户端。 ⑦客户端开始往dn1上传第一个Block先从磁盘读取数据放到一个本地内存缓存以Packet为单位dn1收到一个Packet就会传给dn2dn2传给dn3dn1每传一个packet会放入一个应答队列等待应答。 ⑧当一个Block传输完成之后客户端再次请求NameNode上传第二个Block的服务器。重复执行3-7步。
(5)HDFS读数据流程
①客户端通过DistributedFileSystem向NameNode请求下载文件NameNode通过查询元数据找到文件块所在的DataNode地址。 ②挑选一台DataNode就近原则然后随机服务器请求读取数据。 ③DataNode开始传输数据给客户端从磁盘里面读取数据输入流以Packet为单位来做校验。 ④客户端以Packet为单位接收先在本地缓存然后写入目标文件。
(6)关于Fsimage和Edits
NameNode被格式化之后将在…/hadoop/hdfs/namenode/current/目录中产生如下文件 ①Fsimage文件HDFS文件系统元数据的一个永久性的检查点其中包含HDFS文件系统的所有目录和文件inode的序列化信息 ②Edits文件存放HDFS文件系统的所有更新操作的路径文件系统客户端执行的所有写操作首先会被记录到Edits文件中 ③seen_txid文件保存的是一个数字就是最后一个edits_的数字 ④每次NameNode启动的时候都会将Fsimage文件读入内存加载Edits里面的更新操作保证内存中的元数据信息是最新的、同步的可以看成NameNode启动的时候就将Fsimage和Edits文件进行了合并
(7)关于JournalNode
从Hadoop2.x版本后HDFS采用了一种全新的元数据共享机制即通过Quorum Journal NodeJournalNode集群或者network File SystemNFS进行数据共享。NFS是操作系统层面的而JournalNode是Hadoop层面的成熟可靠、使用简单方便一般采用JournalNode集群进行元数据共享。 JournalNode集群以及与NameNode之间共享元数据如下图所示。 JournalNode集群可以几乎实时的去NameNode上拉取元数据然后保存元数据到JournalNode集群同时处于standby状态的NameNode也会实时的去JournalNode集群上同步JNS数据通过这种方式就实现了两个NameNode之间的数据同步。 两个NameNode为了数据同步会通过一组称作JournalNodes的独立进程进行相互通信。当Active状态的NameNode元数据有任何修改时会告知大部分的JournalNodes进程。同时Standby状态的NameNode也会读取JNs中的变更信息并且一直监控EditLog事务日志的变化并把变化应用于自己的命名空间。Standby可以确保在集群出错时元数据状态已经完全同步了。 JN1、JN2、JN3等是JournalNode集群的节点QJMQuorom Journal Manager的基本原理是用2N1台JournalNode存储EditLog每次写数据操作有N/21个节点返回成功那么本次写操作才算成功保证数据高可用。当然这个算法所能容忍的是最多有N台机器挂掉如果多于N台挂掉算法就会失效。 ZooKeeper也是作为分布式协调的组件namenode HA用了JournalNode而没有用ZooKeeper原因是Zookeeper不适合存储znode中可以存储的默认最大数据大小为1MB。
(8)关于zkfc
健康检测zkfc会周期性的向它监控的namenode只有namenode才有zkfc进程并且每个namenode各一个发生健康探测命令从而鉴定某个namenode是否处于正常工作状态如果机器宕机心跳失败那么zkfc就会标记它处于不健康的状态。 会话管理如果namenode是健康的zkfc机会保持在zookeeper中保持一个打开的会话如果namenode是active状态的那么zkfc还会在zookeeper中占有一个类型为短暂类型的znode当这个namenode挂掉时这个znode将会被删除然后备用的namenode得到这把锁升级为主的namenode同时标记状态为active,当宕机的namenode,重新启动他会再次注册zookeeper,发现已经有znode了就自动变为standby状态如此往复循环保证高可靠性但是目前仅支持最多配置两个namenode。 master选举如上所述通过在zookeeper中维持一个短暂类型的znode,来实现抢占式的锁机制从而判断哪个namenode为active状态。
(9)安全模式
在安全模式下集群在进行恢复元数据即在合并fsimage和edits log并且接受datanode的心跳信息恢复block的位置信息将集群恢复到上次关机前的状态。
3.资源调度器YARN
(1)RM/AM/NM/Container
在MRv1中JobTracker由资源管理由TaskScheduler模块实现和作业控制由JobTracker中多个模块共同实现两部分组成由于Hadoop对JobTracker赋予的功能过多而造成负载过重。从设计角度上看Hadoop未能够将资源管理相关的功能与应用程序相关的功能分开造成Hadoop难以支持多种计算框架。 YARN的基本设计思想是将JobTracker的两个主要功能即资源管理和作业控制包括作业监控、容错等分拆成两独立的进程资源管理进程与具体应用程序无关它负责整个集群的资源内存、CPU、磁盘等管理而作业控制进程则是直接与应用程序相关的模块且每个作业控制进程只负责管理一个作业。YARN的基本设计思想是将MRv1中的JobTracker拆分成了两个独立的服务一个全局的资源管理器ResourceManager和每个应用程序特有的ApplicationMaster。其中ResourceManager负责整个系统的资源管理和分配而ApplicationMaster负责单个应用程序的管理。这样通过将原有JobTracker中与应用程序相关和无关的模块分开不仅减轻JobTracker负载也使得Hadoop支持更多的计算框架。 ResourceManagerRM RM是一个全局的资源管理器负责整个系统的资源管理和分配。它主要由调度器Scheduler和应用程序管理器Applications ManagerASM。 调度器根据容量、队列等限制条件将系统中的资源分配给各个正在运行的应用程序。需要注意的是调度器是一个“纯调度器”它不再从事任何与具体应用程序相关的工作比如不负责监控或者跟踪应用的执行状态等也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务这些均交由应用程序相关的ApplicationMaster完成调度器仅根据各个应用程序的资源需求进行资源分配。该调度器是一个可插拔的组件用户可根据自己的需要设计新的调度器YARN提供了多种直接可用的调度器比如Fair Scheduler和Capacity Scheduler等。 应用程序管理器负责管理整个系统中所有应用程序包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。 ApplicationMasterAM 用户提交的每个应用程序均包含一个AM主要用来与RM调度器协商以获取资源得到的任务进一步分配给内部的任务与NM通信以启动/停止任务监控所有任务运行状态并在任务运行失败时重新为任务申请资源以重启任务。当前YARN自带了两个AM实现一个是用于演示AM编写方法的实例程序distributedshell它可以申请一定数目的Container以并行运行一个Shell命令或者Shell脚本另一个是运行MapReduce应用程序的MRAppMaster。此外一些其他的计算框架例如Spark也有对应的AM。 NodeManagerNM NM是每个节点上的资源和任务管理器一方面它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态另一方面它接收并处理来自AM的Container启动/停止等各种请求。 Container Container是YARN中的资源抽象它封装了某个节点上的多维度资源如内存、CPU、磁盘、网络等当AM向RM申请资源时RM为AM返回的资源便是用Container表示的。YARN会为每个任务分配一个Container且该任务只能使用该Container中描述的资源。需要注意的是Container不同于MRv1中的slot它是一个动态资源划分单位是根据应用程序的需求动态生成的。
(2)工作流程
当用户向YARN中提交一个应用程序后YARN将分两个阶段运行该应用程序第一个阶段是启动ApplicationMaster。第二个阶段是由ApplicationMaster创建应用程序为它申请资源并监控它的整个运行过程直到运行完成。 ①用户向YARN中提交应用程序其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等 ②ResourceManager为该应用程序分配第一个Container并与对应的NodeManager通信要求它在这个Container中启动应用程序的ApplicationMaster。 ③ApplicationMaster首先向ResourceManager注册这样用户可以直接通过ResourceManager查看应用程序的运行状态然后它将为各个任务申请资源并监控它的运行状态直到运行结束即重复4~7。 ④ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。 ⑤ApplicationMaster申请到资源后便与对应的NodeManager通信要求它启动任务。 ⑥NodeManager为任务设置好运行环境包括环境变量、JAR包、二进制程序等后将任务启动命令写到一个脚本中并通过运行该脚本启动任务。 ⑦各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度以让ApplicationMaster随时掌握各个任务的运行状态从而可以在任务失败时重新启动任务。在应用程序运行过程中用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。 ⑧应用程序运行完成后ApplicationMaster向ResourceManager注销并关闭自己。
(3)其他
①JobHistory YARN提供的一个查看已经完成的任务的历史日志记录的服务可查询每个job运行完以后的历史日志信息比如map个数、reduce个数等。 ②Timeline Serivce Hadoop 2.4.0以后出现的新特性主要是为了监控运行在YARN平台上的所有任务例如MR、Storm、Spark、HBase等JobHistoryServer只能查看mr任务的信息不能查看spark、flink的信息针对这个问题spark、flink都要提供自己的server为了统一yarn提供了timelineserver功能更强大但不是替代jobhistory两者是功能间的互补关系。 Collector将数据写入后端存储 Reader是与Collector分开的独立守护进程专用于通过REST API提供查询 ③Registry DNS 由Yarn服务注册表支持的Yarn DNS服务器可通过其标准DNS在Yarn上查找服务 通过DNS向外提供已有的service-discovery信息将YARN Service registry records转换为DNS记录从而使用户可以通过标准的DNS客户端机制例如DNS SRV记录描述host:port查询YARN Applciation信息
4.分布式计算引擎
(1)MapReduce
一个完整的 MapReduce 程序在分布式运行时有三类实例进程 ①MRAppMaster负责整个程序的过程调度及状态协调 ②MapTask负责Map阶段的整个数据处理流程 ③ReduceTask负责Reduce阶段的整个数据处理流程 MapReduce运算程序一般需要分成2个阶段Map阶段和Reduce阶段 ①Map阶段的并发MapTask完全并行运行互不相干 ②Reduce阶段的并发ReduceTask完全互不相干但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出 ③MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段如果用户的业务逻辑非常复杂那就只能多个MapReduce程序串行运行
(2)Spark
Spark计算框架在处理数据时所有的中间数据都保存在内存中从而减少磁盘读写操作提高框架计算效率。同时Spark还兼容HDFS、Hive可以很好地与Hadoop系统融合从而弥补MapReduce高延迟的性能缺点。
①核心模块
【Spark Core】 Spark由Scala语言开发的Spark Core中提供了Spark最基础与最核心的功能Spark其他的功能如Spark SQLSpark StreamingGraphXMLlib都是在Spark Core的基础上进行扩展的。 SparkCore是Spark的基础底层的最小数据单位是RDD。主要是处理一些离线可以通过结合Spark Streaming来处理实时的数据流、非格式化数据。它与Hadoop的MapReduce的区别就是spark core基于内存计算在速度方面有优势尤其是机器学习的迭代过程。 【Spark SQL】 Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL用户可以使用SQL或者Apache Hive版本的SQL方言HQL来查询数据。 Spark SQL底层的数据处理单位是DataSet。主要是通过执行标准SQL来处理一些离线可以通过结合Spark Streaming来处理实时的数据流、格式化数据。就是Spark生态系统中一个开源的数据仓库组件可以认为是Hive在Spark的实现用来存储历史数据做OLAP、日志分析、数据挖掘、机器学习等等。可以在Spark SQL中执行SQL语句数据既可以来自RDD也可以是HIVE、HDFS、Cassandra等外部数据源还可以是JSON格式的数据。 Hive是将SQL转为MapReduce SparkSQL可以理解成是将SQL解析成RDD优化再执行 【Spark Streaming】 Spark Streaming是Spark平台上针对实时数据进行流式计算的组件提供了丰富的处理数据流的API。 Spark Streaming底层的数据处理单位是DStream。主要是处理流式数据数据一直不停的在向Spark程序发送这里可以结合Spark Core和Spark SQL来处理数据如果来源数据是非结构化的数据那么我们这里就可以结合Spark Core来处理如果数据为结构化的数据那么可以结合Spark SQL来进行处理。 Spark SQL构建在Spark Core之上专门用来处理结构化数据(不仅仅是SQL)。即Spark SQL是Spark Core封装而来的Spark SQL在Spark Core的基础上针对结构化数据处理进行很多优化和改进。
②Spark架构
Driver Program 相当于AppMaster整个应用管理者负责应用中所有Job的调度执行; 运行JVM Process运行程序的MAIN函数必须创建SparkContext上下文对象 一个SparkApplication仅有一个 Executors 相当于一个线程池运行JVM Process其中有很多线程每个线程运行一个Task任务一个Task运行需要1 Core CPU所有可以认为Executor中线程数就等于CPU Core核数 一个Spark Application可以有多个可以设置个数和资源信息 Driver Program是用户编写的数据处理逻辑这个逻辑中包含用户创建的SparkContext。SparkContext是用户逻辑与Spark集群主要的交互接口它会和Cluster Manager交互包括向它申请计算资源等。**Cluster Manager负责集群的资源管理和调度现在支持Standalone、Apache Mesos和Hadoop的YARN。**Worker Node是集群中可以执行计算任务的节点。**Executor是在一个Worker Node上为某应用启动的一个进程该进程负责运行任务并且负责将数据存在内存或者磁盘上。**Task是被送到某个Executor上的计算单元每个应用都有各自独立的Executor计算最终在计算节点的Executor中执行。
③Spark on Yarn
Spark可以跑在很多集群上比如跑在local上跑在Standalone上跑在Apache Mesos上跑在Hadoop YARN上等等。不管Spark跑在什么上面它的代码都是一样的区别只是master的时候不一样。其中Spark on YARN是工作中或生产上用的非常多的一种运行模式。 Spark可以和Yarn整合将Application提交到Yarn上运行Yarn有两种提交任务的方式。 - yarn-client提交任务方式 1.客户端提交一个Application在客户端启动一个Driver进程 2.Driver进程会向RS(ResourceManager)发送请求启动AM(ApplicationMaster) 3.RS收到请求随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker节点 4.AM启动后会向RS请求一批container资源用于启动Executor 5.RS会找到一批NM返回给AM,用于启动Executor。AM会向NM发送命令启动Executor 6.Executor启动后会反向注册给DriverDriver发送task到Executor,执行情况和结果返回给Driver端 Yarn-client模式适用于测试因为Driver运行在本地Driver会与yarn集群中的Executor进行大量的通信会造成客户机网卡流量的大量增加. - yarn-cluster提交任务方式 1.客户机提交Application应用程序发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster) 2.RS收到请求后随机在一台NM(NodeManager)上启动AM相当于Driver端 3.AM启动AM发送请求到RS请求一批container用于启动Executor 3.RS返回一批NM节点给AM 4.AM连接到NM发送请求到NM启动Executor 5.Executor反向注册到AM所在的节点的DriverDriver发送task到Executor Yarn-Cluster主要用于生产环境中因为Driver运行在Yarn集群中某一台NodeManager中每次提交任务的Driver所在的机器都是随机的不会产生某一台机器网卡流量激增的现象缺点是任务提交后不能看到日志。只能通过yarn查看日志。
④SparkSQL数据抽象
DataFrame是一种以RDD为基础的带有Schema元信息的分布式数据集类似于传统数据库的二维表格。 DataSet是保存了更多的描述信息类型信息的分布式数据集。 与RDD相比保存了更多的描述信息概念上等同于关系型数据库中的二维表。 与DataFrame相比保存了类型信息是强类型的提供了编译时类型检查调用Dataset的方法先会生成逻辑计划然后被spark的优化器进行优化最终生成物理计划然后提交到集群中运行 DataSet包含了DataFrame的功能Spark2.0中两者统一DataFrame表示为DataSet[Row]即DataSet的子集。DataFrame其实就是Dateset[Row]。 RDD、DataFrame、DataSet的区别 RDD[Person]以Person为类型参数但不了解其内部结构 DataFrame提供了详细的结构信息schema列的名称和类型更像是一张表 DataSet[Person]不光有schema信息还有类型信息
(3)Flink
Apache Flink是一个框架和分布式处理引擎用于对无界和有界数据流进行有状态计算。Flink设计为在所有常见的集群环境中运行以内存速度和任何规模执行计算。
①运行模式
local 本地测试Standallone Cluster 独立集群做实时计算不需要hadoopFlink on YarnKubernetes
②有界流和无界流
任何类型的数据都是作为事件流产生的。信用卡交易传感器测量机器日志或网站或移动应用程序上的用户交互所有这些数据都作为流生成。 数据可以作为无界或有界流处理。 无界流有一个开始但没有定义的结束。它们不会在生成时终止并提供数据。必须持续处理无界流即必须在摄取事件后立即处理事件。无法等待所有输入数据到达因为输入是无界的并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序例如事件发生的顺序摄取事件以便能够推断结果完整性。 有界流具有定义的开始和结束。可以在执行任何计算之前通过摄取所有数据来处理有界流。处理有界流不需要有序摄取因为可以始终对有界数据集进行排序。有界流的处理也称为批处理。 Apache Flink擅长处理无界和有界数据集。精确控制时间和状态使Flink的运行时能够在无界流上运行任何类型的应用程序。有界流由算法和数据结构内部处理这些算法和数据结构专门针对固定大小的数据集而设计从而产生出色的性能。
③随处部署应用程序
Apache Flink是一个分布式系统需要计算资源才能执行应用程序。Flink与所有常见的集群资源管理器如Hadoop YARNApache Mesos和Kubernetes集成但也可以设置为作为独立集群运行。 Flink旨在很好地适用于之前列出的每个资源管理器。这是通过特定于资源管理器的部署模式实现的这些模式允许Flink以其惯用的方式与每个资源管理器进行交互。 部署Flink应用程序时Flink会根据应用程序配置的并行性自动识别所需资源并从资源管理器请求它们。如果发生故障Flink会通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信都通过REST调用进行。这简化了Flink在许多环境中的集成。
④以任何比例运行应用程序
Flink旨在以任何规模运行有状态流应用程序。应用程序可以并行化为数千个在集群中分布和同时执行的任务。因此应用程序可以利用几乎无限量的CPU主内存磁盘和网络IO。而且Flink可以轻松维护非常大的应用程序状态。其异步和增量检查点算法确保对处理延迟的影响最小同时保证一次性状态一致性。
⑤利用内存中的性能
有状态Flink应用程序针对本地状态访问进行了优化。任务状态始终保留在内存中或者如果状态大小超过可用内存则保存在访问高效的磁盘上数据结构中。因此任务通过访问本地通常是内存中状态来执行所有计算从而产生非常低的处理延迟。Flink通过定期和异步检查本地状态到持久存储来保证在出现故障时的一次状态一致性。
⑥架构
用户通过DataStream API、DataSet API、SQL和Table API编写Flink任务它会生成一个JobGraph。JobGraph是由source、map()、keyBy()/window()/apply()和Sink等算子组成的。当JobGraph提交给Flink集群后能够以Local、Standalone、Yarn和Kubernetes四种模式运行。 ClientFlink Client用于与JobManger建立连接进行Flink任务的提交。Client会将Flink任务组装为一个JobGraph并进行提交。一个JobGraph是一个flink dataflow其中包含了一个Flink程序的JobID、Job名称、配置信息、一组JobVertex等; JobMangerFlink系统协调者负责接收job任务并调度job的多个task执行。同时负责job信息的收集和管理TaskManger TaskManger负责执行计算的Worker同时进行所在节点的资源管理包括内存、cup、网络启动时向JobManger汇报资源信息。 JobManager的功能主要有
将JobGraph转换成Execution Graph最终将Execution Graph拿来运行Scheduler组件负责Task的调度Checkpoint Coordinator组件负责协调整个任务的Checkpoint包括Checkpoint的开始和完成通过Actor System与TaskManager进行通信其它的一些功能例如Recovery Metadata用于进行故障恢复时可以从Metadata里面读取数据。 TaskManager的功能主要有 负责具体任务的执行过程在JobManager申请到资源之后开始启动。TaskManager里面的主要组件有Memory I/O Manager即内存I/O的管理Network Manager用来对网络方面进行管理Actor system用来负责网络的通信 TaskManager被分成很多个TaskSlot每个任务都要运行在一个TaskSlot里面TaskSlot是调度资源里的最小单位。
⑦Flink on yarn
flink yarn client负责与yarn RM进行通信及资源申请 JobManger和TaskManger分别申请Container资源运行各自的进程 JobManger和YARN AM属于同一个Container中从而YARN AM可进行申请Container及调度TaskManger HDFS用于数据的存储如checkpoints、savepoints等数据。 Flink与Yarn的关系与MapReduce和Yarn的关系是一样的。Flink通过Yarn的接口实现了自己的App Master。当在Yarn中部署了FlinkYarn就会用自己的Container来启动Flink的JobManager也就是AppMaster和TaskManager。 启动新的Flink YARN会话时客户端首先检查所请求的资源容器和内存是否可用。之后它将包含Flink和配置的jar上传到HDFS步骤1。 客户端的下一步是请求步骤2YARN容器以启动ApplicationMaster步骤3。由于客户端将配置和jar文件注册为容器的资源因此在该特定机器上运行的YARN的NodeManager将负责准备容器例如下载文件。完成后将启动ApplicationMasterAM。 该JobManager和AM在同一容器中运行。一旦它们成功启动AM就知道JobManager它自己的主机的地址。它正在为TaskManagers生成一个新的Flink配置文件以便它们可以连接到JobManager。该文件也上传到HDFS。此外AM容器还提供Flink的Web界面。YARN代码分配的所有端口都是临时端口。这允许用户并行执行多个Flink YARN会话。 之后AM开始为Flink的TaskManagers分配容器这将从HDFS下载jar文件和修改后的配置。完成这些步骤后即可建立Flink并准备接受作业。
⑧任务执行流程
一个flink程序执行时都会映射为一个Streaming Dataflow 进行处理类似一个DAG图。从Source开始到Sink结束。 flink程序由一个或多个输入流StreamSource经过计算Transformation, 最终输出到一个或多个输出流Stream中Sink。 支持的Sourcekafka、hdfs、本地文件… 支持的Sinkkafka、mysql、hdfs、本地文件… parallel Dataflow并发原理 flink程序天生就支持并行及分部署处理 a、一个Stream支持分为多个Stream分区一个Operate支持分成多个Operate Subtask每个Subtask都执行在不同的线程中。 b、一个Operate并行度等于Operate Subtask个数Stream并行度总等于Operate并行度。 parallel Dataflow示例 Source并行度为2Sink并行度为1。 上图展示了Operate与Stream之间存在的两种模式 a、One-to-one模式Source[1]–Map[1]的数据流模式该模式保持了Source的分区特性及数据处理的有序性。 b、Redistribution模式map[1]–apply()[2]的数据流模式该模式会改变数据流的分区。其与选择的Operate操作有关。 Task Operator Chain flink在分布式环境中会将多个Operate Subtask串在一起作为一个Operate Chain的执行链。每个执行链在TaskManger上独立的线程中执行。多个Operate通过Stream进行连接。每个Operate对应一个task。 下图分别展示单个并发与多个并发的执行原理图。 时间窗口 flink支持基于时间和数据的时间窗口。Flink支持基于多种时间的窗口。 a、基于事件的创建时间 b、基于事件进入Dataflow的时间 c、基于某Operate对事件处理时的本地时间 各种时间所处的位置及含义
⑨checkpoint原理
flink是在Chandy–Lamport算法的基础上实现的一种分布式快照。通过不断的生成分布式Streaming数据流Snapshot实现利用snapshot进行数据流的恢复处理。 checkpoint主要步骤 a、Checkpoint Coordinator向所有source节点trigger Checkpoint b、source节点向下游广播barrier附带在数据流中随DAG流动 c、task收到barrier后异步的执行快照并进行持久化处理 d、sink完成快照后表示本次checkpoint完成。并将所有快照数据进行整合持久化处理。 Barrier a、Stream Barrier是Flink分布式Snapshotting中的核心元素它会对数据流进行记录并插入到数据流中对数据流进行分组并沿着数据流的方向向前推进。 b、每个Barrier会携带一个Snapshot ID属于该Snapshot的记录会被推向该Barrier的前方。Barrier非常轻量不会中断数据流处理。 带有Barrier的数据流图 Strean Aligning 当Operate具有多个数据输入流时需在Snapshot Barrier中进行数据对齐处理。 具体处理过程 a、Operator从一个incoming Stream接收到Snapshot Barrier n然后暂停处理直到其它的incoming Stream的Barrier n否则属于2个Snapshot的记录就混在一起了到达该Operator。 b、接收到Barrier n的Stream被临时搁置来自这些Stream的记录不会被处理而是被放在一个Buffer中 c、一旦最后一个Stream接收到Barrier nOperator会emit所有暂存在Buffer中的记录然后向Checkpoint Coordinator发送Snapshot n d、继续处理来自多个Stream的记录 基于Stream Aligning操作能够实现Exactly Once语义但是也会给流处理应用带来延迟因为为了排列对齐Barrier会暂时缓存一部分Stream的记录到Buffer中。通常以最迟对齐Barrier的一个Stream做为处理Buffer中缓存记录的时刻点。可通过开关选择是否使用Stream Aligning如果关掉则Exactly Once会变成At least once。 State Backend数据持久化方案 flink的State Backend是实现快照持久化的重要功能flink将State Backend抽象成一种插件支持三种State Backend。 a、MemoryStateBackend基于内存实现将数据存储在堆中。数据过大会导致OOM问题不建议生产环境使用默认存储的大小为4M。 b、FsStateBackend将数据持久化到文件系统包括本地hdfsAmazon阿里云通过地址进行指定。 c、RocksDBStateBackendRocksDB是一种嵌入式Key-Value数据库数据实际保存在本地磁盘上。比起FsStateBackend的本地状态存储在内存中RocksDB利用了磁盘空间所以可存储的本地状态更大。从RocksDB中读写数据都需要进行序列化和反序列化读写成本更高。允许增量快照每次快照时只对发生变化的数据增量写到分布式存储上而不是将所有的本地状态都拷贝过去。
⑩Flink的编程模型
Flink提供不同级别的抽象来开发流/批处理应用程序。
⑪Flink on YARN模式
flink on yarn主要有两种运行模式。一种是内存集中管理模式即Session-Cluster模式另一种是内存job管理模式即Per-Job-Cluster模式。 - Session-cluster模式 在Yarn中初始化一个Flink集群开辟指定的资源资源申请到之后资源永远保持不变之后我们提交的Flink Jon都在这个Flink yarn-session中也就是说不管提交多少个job这些job都会共用开始时在yarn中申请的资源。这个Flink集群会常驻在Yarn集群中除非手动停止。 如果资源满了下一个作业就无法提交阻塞只能等到yarn中的其中一个作业执行完成后释放了资源下个作业才会正常提交。 适用场景 适合规模小、执行时间短的作业。 比如适合小的有界流不适合无界流因为无界流的Job 7*24小时运行始终占用资源如果Job多了资源不够用导致Job阻塞 - Per-Job-Cluster模式 先提交job再启动flink集群。一个job对应一个flink集群每个flink集群单独向yarn申请资源因此每个job之间资源不共享、每个job之间互相独立互不影响方便管理一个作业的失败与否并不会影响下一个作业的正常提交和运行。 只有当整个yarn集群资源不足才会造成任务无法提交了。 job执行完成之后对应的集群也会消失。 由于每个job独享一个flink集群每个job独享Dispatcher和ResourceManager按需接受资源申请。 适用场景规模大时间长的任务无界流任务可用。
5.安全框架
(1)LDAP
轻量目录访问协议Lightweight Directory Access Protocol作为目录服务系统实现了Hadoop的集中账号管理 LDAP目录是一种树型结构数据库适用于一次写入多次查询的场景 域名dcDomain Component类似于关系型数据库的中的Database 组织单位ouOrganization Unit类似于Database中的table的集合 用户uidUser ID类似于table中的主键 对象名称cnCommon Name类似于table中的单位数据的名称 LDAP在制作层级的时候可以在people下创建user然后在group下创建公司不同的组然后把uid加到对应的组里面通过LDAP脚本实现 OpenLDAP高可用 当在主服务器上更新数据时该更新通过更新日志记录并将更新复制到从服务器上。当在从服务器上更新数据时该更新请求将重定向给主服务器然后主服务器将更新数据复制到从服务器。 HA共有5种模式
模式特点Syncrepl从(slave)服务器到主(master)服务器以拉的模式同步目录树。当主服务器对某个条目或更多条目修改条目属性时从服务器会把修改的整个条目进行同步而不是单独地同步修改的属性值。Delta-syncrepl当主服务器对目录树上的相关条目进行修改时会产生一条日志信息于是这时候从服务器会通过复制协议将主服务器记录的日志应用到从服务器本地完成数据同步的过程。但每个消费者获取和处理完全改变的对象都执行同步操作。Syncrepl Proxy代理同步它将主服务器隐藏起来而代理主机上边通过syncrepl从主服务器上以拉的方式同步目录树数据当代理主机数据发生改变时代理服务器又以推的方式将数据更新到下属的从LDAP服务器上且从LDAP服务器只有对代理LDAP服务器有读权限。N-Way Mulit-Master主要用于多台主服务器之间进行LDAP目录树信息的同步更好的提供了服务器的冗余性。MirrorMode镜像模式主服务器互相以推的方式实现目录树条目同步最多只允许且两台机器为主服务器。如果要添加更多的节点此时只能增加多台从服务器而不能将添加的节点配置为主服务器。当一台服务器出现故障时另一台服务器立即对外提供验证服务。当异常服务器恢复正常时会自动通过另一个节点所添加或修改的条目信息进行同步并应用在本地。
(2)Kerberos
Kerberos是一种身份认证协议被广泛运用在大数据生态中甚至可以说是大数据身份认证的事实标准。 KDC由AS和TGS组成AS进行身份认证发放TGT(Ticket Granting Tickets)TGT是用来避免多次请求而需要重复认证的凭证TGS发放STST用来访问某个service时的凭证ST相当于告诉service你的身份被KDC认证为合法的一个凭证。 Kerberos核心概念 Principal大致可以认为是Kerberos世界的用户名用于标识身份。principal主要由三部分构成primaryinstance(可选)和realm。
包含instance的principal一般会作为server端的principal如NameNodeHiverServer2Presto Coordinator等不含有instance的principal一般会作为客户端的principal用于身份认证。 Keytab“密码本”。包含了多个principal与密码的文件用户可以利用该文件进行身份认证。 Ticket Cache客户端与KDC交互完成后包含身份认证信息的文件短期有效需要不断renew。 RealmKerberos系统中的一个namespace。不同Kerberos环境可以通过realm进行区分。
(3)Ranger
Ranger Admin用于管理安全策略、用户/组的UI门户并提供Rest Server。 Ranger UserSync定期将Unix系统或LDAP或Active Directory中的用户/组同步到RangerAdmin中。也可用作RangerAdmin的身份验证服务器以使用linux用户/密码登陆到RangerAdmin。 Ranger TagSync将资源分类与访问授权分开只要资源附加相同的标签就可以有一个标签策略应用于多个组件。可以有助于减少Ranger所需的策略数量需要Apache Atlas管理元数据Hive DB/Tables、HDFS路径、Kafka主题和标签/分类等 Agent Plugin插件是嵌入到Hadoop各个组件的轻量级java程序。插件定期从AdminServer拉取策略存储在本地文件中。当用户访问Hadoop组件时插件会拦截请求根据策略进行安全评估并且定期发送数据到审计服务器做记录。
6.SQL查询引擎
(1)HIVE2
Hive是一个SQL解析引擎将SQL语句转译成MR Job然后再Hadoop平台上运行达到快速开发的目的。 Hive中的表是纯逻辑表就只是表的定义等即表的元数据。本质就是Hadoop的目录/文件达到了元数据与数据存储分离的目的 Hive本身不存储数据它完全依赖HDFS和MapReduce。 Hive的内容是读多写少不支持对数据的改写和删除。
① 特点
Hive采用类SQL开发简单容易上手避免了编写MapReduce的工作Hive执行延迟比较高无法胜任实时的工作OLTP大多用于数据分析工作OLAPHive擅长处理大规模的数据
② 系统架构
InterfaceHive提供三个主要的用户接口
CLI是Shell命令行接口提供交互式SQL查询JDBC/ODBC是Hive的Java数据接口实现使远程客户端可以通过Hiveserver2查询数据例如beeline方式WebUI用户可以通过浏览器访问Hive页面查看Hive使用的信息 MetaDataHive将元数据存储在RMDB中如mysql\Derby\Postgresql。元数据包括表结构、表名、列属性、分区信息、权限信息及Location等信息。 MetaStoreHive提供的元数据查询服务通过MetaStore管理、操作元数据。 Hiveserver2基于thrift的跨平台、跨编程语言的Hive查询服务。为Hive客户端提供远程访问、查询服务。 DriverHive的核心是驱动引擎驱动引擎由四部分组成解释器Parser解释器的作用是将HiveSQL语句转换为抽象语法树编译器Compiler编译器是将语法树编译为逻辑执行计划优化器Optimizer优化器是对逻辑执行计划进行优化执行器Execution执行器是调用底层的运行框架执行逻辑执行计划
③ HiveServer2
Hive外部连接提供的是一个shell客户端是直接启动了一个org.apache.hadoop.hive.cli.cliDriver的进程这个进程主要包含了两块内容一个是提供交互的cli另外一个就是Driver驱动引擎这种情况下如果有多个客户端的情况下就需要多个Driver但如果通过HiveServer2连接就可以共享Driver一方面可以简化客户端的设计降低资源损耗另外一方面还能降低对MetaStore的压力减少连接的数量。 在生产环境中使用Hive强烈建议使用HiveServer2来提供服务好处很多
在应用端不用部署Hadoop和Hive客户端相比hive-cli方式HiveServer2不用直接将HDFS和Metastore暴漏给用户有安全认证机制并且支持自定义权限校验有HA机制解决应用端的并发和负载均衡问题JDBC方式可以使用任何语言方便与应用进行数据交互从2.0开始HiveServer2提供了WEB UI。
④ 高可用
MetaStore HA Hive Metastore HA解决方案旨在处理Metastore服务失败。每当部署的Metastore服务关闭时Metastore服务在相当长的时间内都会保持不可用状态直到恢复服务为止。为避免此类停机在HA模式下部署Metastore服务。 Hive Metastore客户端始终使用第一个URI连接Metastore服务器。如果Metastore服务器变得无法访问则客户端从列表中随机选取一个URI并尝试与其连接。 HiveServer2 HA 如果只是使用一台服务来启动hiveserver2那么如果hiveserver2挂掉便不能提供jdbc的支持。hive 支持hiveserver2 HA,用于进行负载均衡和高可用 Hive从0.14开始使用Zookeeper实现了HiveServer2的HA功能Client端可以通过指定一个nameSpace来连接HiveServer2而不是指定某一个host和port。
⑤ 工作流程
a.用户提交查询任务给Driver b.Driver将查询任务给编译器并请求返回一个plan c.编译器向MetaStore获取必要的元数据 d.MetaStore返回编译器请求的元数据 e.编译器生成多个stage的DAG作为plan并将plan交给Driver。每个stage代表着一个map/reduce任务或一个元数据操作或一个HDFS操作。这相当于做了一套job的流水线。在Map stage中plan包含了Map Operator Tree在Reduce stage中plan包含了Reduce Operator Tree f.Driver上交plan给执行器去执行获取元数据信息提交给JobTracker或者ResourceManager去处理该任务任务会直接读取HDFS中文件进行相应的操作 g.获取执行的结果 h.取得并返回执行结果。
⑥ 原理
MapReduce实现基本SQL操作的原理
SQL转换为MapReduce实现Join 需求将两个表的数据使用uid连接起来。
SELECT name, orderid FROM User u JOIN Order o ON u.uido.uid;a.两种表进行了join命令而join on所用的uid正是两个map任务中的key。同时将表中的其他数据连同各个表的标识id放到value中 b.通过这种key的选择方式就可以用shuffle将相同key的不同表的数据聚合在一起 c.通过shuffle操作后相同key的数据被聚合到一起接下来使用reduce把不同表的数据做整合就得到了最后的查询结果。
SQL转换为MapReduce实现Group By 需求对一张表的数据进行分组将同一个rank和level的数据分为一组。
SELECT rank,level,count(*) as value FROM score GROUP BY rank,level;a.sql进行了group by对rank和level进行分组也就是说将这两列组合起来当作key得到像A, 1的格式 b.value记录该数据的出现次数 c.使用shuffle操作将相同key的数据分到一起实现了分组效果 d.使用reduce操作将数据进行聚合累加得到最后结果。 SQL转换为MR的具体流程 a.在编译器中用Antlr语言识别工具对HQL的输入进行词法和语法解析将HQL语句转换成抽象语法树AST Tree的形式 b.遍历抽象语法树转化成QueryBlock查询块。因为AST结构复杂不方便直接翻译成MR算法程序。其中QueryBlock是一条最基本的SQL语法组成单元包括输入源、计算过程、和输入三个部分 c.遍历QueryBlock生成OperatorTree操作树OperatorTree由很多逻辑操作符组成如TableScanOperator、SelectOperator、FilterOperator、JoinOperator、GroupByOperator和ReduceSinkOperator等。这些逻辑操作符可在Map、Reduce阶段完成某一特定操作 d.Hive驱动模块中的逻辑优化器对OperatorTree进行优化变换OperatorTree的形式合并多余的操作符减少MR任务数、以及Shuffle阶段的数据量 e.遍历优化后的OperatorTree根据OperatorTree中的逻辑操作符生成需要执行的MR任务 f.启动Hive驱动模块中的物理优化器对生成的MR任务进行优化生成最终的MR任务执行计划 g.最后有Hive驱动模块中的执行器对最终的MR任务执行输出。 Hive驱动模块中的执行器执行最终的MR任务时Hive本身不会生成MR算法程序。它通过一个表示“Job执行计划”的XML文件来驱动内置的、原生的Mapper和Reducer模块。Hive通过和JobTracker通信来初始化MR任务而不需直接部署在JobTracker所在管理节点上执行。通常在大型集群中会有专门的网关机来部署Hive工具这些网关机的作用主要是远程操作和管理节点上的JobTracker通信来执行任务。Hive要处理的数据文件常存储在HDFS上HDFS由名称节点NameNode来管理。
⑦ Tez计算引擎
Hive引擎包括默认MR、Tez、Spark不更换引擎hive默认的是MR。 MR性能差资源消耗大如Hive作业之间的数据不是直接流动的而是借助HDFS作为共享数据存储系统即一个作业将处理好的数据写入HDFS下一个作业再从HDFS重新读取数据进行处理。很明显更高效的方式是第一个作业直接将数据传递给下游作业。 用Hive直接编写MR程序假设有四个有依赖关系的MR作业上图中绿色是Reduce Task云状表示写屏蔽需要将中间结果持久化写到HDFS。 Tez可以将多个有依赖的作业转换为一个作业这样只需写一次HDFS且中间节点较少从而大大提升作业的计算性能。
7.数据采集
(1)Kafka
Kafka是最初由Linkedin公司开发是一个分布式、支持分区的partition、多副本的replica基于zookeeper协调的分布式消息系统它的最大的特性就是可以实时的处理大量数据以满足各种需求场景比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎web/nginx日志、访问日志消息服务等等用scala语言编写Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
① 特性
高吞吐量、低延迟kafka每秒可以处理几十万条消息它的延迟最低只有几毫秒 可扩展性kafka集群支持热扩展 持久性、可靠性消息被持久化到本地磁盘并且支持数据备份防止数据丢失 容错性允许集群中节点失败若副本数量为n,则允许n-1个节点失败 高并发支持数千个客户端同时读写
② 应用场景
日志收集一个公司可以用Kafka可以收集各种服务的log通过kafka以统一接口服务的方式开放给各种consumer例如hadoop、Hbase、Solr等 消息系统解耦和生产者和消费者、缓存消息等 用户活动跟踪Kafka经常被用来记录web用户或者app用户的各种活动如浏览网页、搜索、点击等活动这些活动信息被各个服务器发布到kafka的topic中然后订阅者通过订阅这些topic来做实时的监控分析或者装载到hadoop、数据仓库中做离线分析和挖掘 运营指标Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据生产各种操作的集中反馈比如报警和报告 流式处理比如spark streaming和storm 事件源
③ 消息队列通信的模式
点对点模式 点对点模式通常是基于拉取或者轮询的消息传送模型这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。生产者将消息放入消息队列后由消费者主动的去拉取消息进行消费。点对点模型的的优点是消费者拉取消息的频率可以由自己控制。但是消息队列是否有消息需要消费在消费者端无法感知所以在消费者端需要额外的线程去监控。 发布订阅模式 发布订阅模式是一个基于消息送的消息传送模型改模型可以有多种不同的订阅者。生产者将消息放入消息队列后队列会将消息推送给订阅过该类消息的消费者类似微信公众号。由于是消费者被动接收推送所以无需感知消息队列是否有待消费的消息但是consumer1、consumer2、consumer3由于机器性能不一样所以处理消息的能力也会不一样但消息队列却无法感知消费者消费的速度所以推送的速度成了发布订阅模模式的一个问题假设三个消费者处理速度分别是8M/s、5M/s、2M/s如果队列推送的速度为5M/s则consumer3无法承受如果队列推送的速度为2M/s则consumer1、consumer2会出现资源的极大浪费
④ 架构
ProducerProducer即生产者消息的产生者是消息的入口。 BrokerBroker是kafka实例每个服务器上有一个或多个kafka的实例我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号如图中的broker-0、broker-1等…… Topic消息的主题可以理解为消息的分类kafka的数据就保存在topic。在每个broker上都可以创建多个topic。 PartitionTopic的分区每个topic可以有多个分区分区的作用是做负载提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的partition的表现形式就是一个一个的文件夹 Replication:每一个分区都有多个副本副本的作用是做备胎。当主分区Leader故障的时候会选择一个备胎Follower上位成为Leader。在kafka中默认副本的最大数量是10个且副本的数量不能大于Broker的数量follower和leader绝对是在不同的机器同一机器对同一个分区也只可能存放一个副本包括自己。 Message每一条发送的消息主体。 Consumer消费者即消息的消费方是消息的出口。 Consumer Group我们可以将多个消费组组成一个消费者组在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据这也是为了提高kafka的吞吐量 Zookeeperkafka集群依赖zookeeper来保存集群的的元信息来保证系统的可用性。
⑤ 发送数据流程
producer就是生产者是数据的入口。注意看图中的红色箭头Producer在写入数据的时候永远的找leader不会直接将数据写入follower。 1、先从集群获取分区的leader 2、producer将消息发送给leader 3、leader将消息写入到本地文件 4、followers从leader pull消息 5、followers将消息写入本地后向leader发送ACK 6、leader收到所有副本的ACK后向producer发送ACK 消息写入leader后follower是主动的去leader进行同步的producer采用push模式将数据发布到broker每条消息追加到分区中顺序写入磁盘所以保证同一分区内的数据是有序的 分区的主要目的是 方便扩展因为一个topic可以有多个partition所以我们可以通过扩展机器去轻松的应对日益增长的数据量。 提高并发以partition为读写单位可以多个消费者同时消费数据提高了消息的处理效率。 在kafka中如果某个topic有多个partitionproducer又怎么知道该将数据发往哪个partition呢kafka中有几个原则 partition在写入的时候可以指定需要写入的partition如果有指定则写入对应的partition。 如果没有指定partition但是设置了数据的key则会根据key的值hash出一个partition。 如果既没指定partition又没有设置key则会轮询选出一个partition。 保证消息不丢失是一个消息队列中间件的基本保证那producer在向kafka写入消息的时候怎么保证消息不丢失呢其实上面的写入流程图中有描述出来那就是通过ACK应答机制在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据这个参数可设置的值为0、1、all。 0代表producer往集群发送数据不需要等到集群的返回不确保消息发送成功。安全性最低但是效率最高1代表producer往集群发送数据只要leader应答就可以发送下一条只确保leader发送成功all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条确保leader发送成功和所有的副本都完成备份。安全性最高但是效率最低。 最后要注意的是如果往不存在的topic写数据能不能写入成功呢kafka会自动创建topic分区和副本的数量根据默认配置都是1。
⑥ 保存数据
Producer将数据写入kafka后集群就需要对数据进行保存了。kafka将数据保存在磁盘可能在我们的一般的认知里写入磁盘是比较耗时的操作不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间顺序写入数据效率比随机写入高。 Partition结构 前面说过了每个topic都可以分为一个或多个partition如果你觉得topic比较抽象那partition就是比较具体的东西了。Partition在服务器上的表现形式就是一个一个的文件夹每个partition的文件夹下面会有多组segment文件每组segment文件又包含.index文件、.log文件、.timeindex文件早期版本中没有三个文件log文件就实际是存储message的地方而index和timeindex文件为索引文件用于检索消息。 如上图这个partition有三组segment文件每个log文件的大小是一样的但是存储的message数量是不一定相等的每条的message大小不一致。文件的命名是以该segment最小offset来命名的如000.index存储offset为0~368795的消息kafka就是利用分段索引的方式来解决查找效率的问题。 Message结构 上面说到log文件就实际是存储message的地方我们在producer往kafka写入的也是一条一条的message那存储在log中的message是什么样子的呢消息主要包含消息体、消息大小、offset、压缩类型……等等重点需要知道的是下面三个
offsetoffset是一个占8byte的有序id号它可以唯一确定每条消息在parition内的位置消息大小消息大小占用4byte用于描述消息的大小。消息体消息体存放的是实际的消息数据被压缩过占用的空间根据具体的消息而不一样。 存储策略 无论消息是否被消费kafka都会保存所有的消息。那对于旧数据有什么删除策略呢基于时间默认配置是168小时7天。基于大小默认配置是1073741824。 需要注意的是kafka读取特定消息的时间复杂度是O(1)所以这里删除过期的文件并不会提高kafka的性能
⑦ 消费数据流程
消息存储在log文件后消费者就可以进行消费了。Kafka采用的是发布订阅模式消费者主动的去kafka集群拉取消息与producer相同的是消费者在拉取消息的时候也是找leader去拉取。 多个消费者可以组成一个消费者组consumer group每个消费者组都有一个组id。同一个消费组者的消费者可以消费同一topic下不同分区的数据但是不会组内多个消费者消费同一分区的数据。 图示是消费者组内的消费者小于partition数量的情况所以会出现某个消费者消费多个partition数据的情况消费的速度也就不及只处理一个partition的消费者的处理速度如果是消费者组的消费者多于partition的数量多出来的消费者不消费任何partition的数据。所以在实际的应用中建议消费者组的consumer的数量与partition的数量一致。 partition划分为多组segment每个segment又包含.log、.index、.timeindex文件存放的每条message包含offset、消息大小、消息体……我们多次提到segment和offset查找消息的时候是怎么利用segmentoffset配合查找的呢假如现在需要查找一个offset为368801的message是什么样的过程呢我们先看看下面的图 先找到offset的368801message所在的segment文件利用二分法查找这里找到的就是在第二个segment文件。 打开找到的segment中的.index文件也就是368796.index文件该文件起始偏移量为3687961我们要查找的offset为368801的message在该index内的偏移量为3687965368801所以这里要查找的相对offset为5。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系所以直接找相对offset为5的索引找不到这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset所以找到的是相对offset为4的这个索引。 根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。 这套机制是建立在offset为有序的基础上利用segment有序offset稀疏索引二分查找顺序查找等多种手段来高效的查找数据至此消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢在早期的版本中消费者将消费到的offset维护zookeeper中consumer每间隔一段时间上报一次这里容易导致重复消费且性能不好在新的版本中消费者消费到的offset已经直接维护在kafk集群的__consumer_offsets这个topic中。
⑧ Controller作用
在Kafka集群中某个Broker将被选举出来担任一种特殊的角色其用于管理和协调Kafka集群即管理集群中的所有分区的状态并执行相应的管理操作。 每个Kafka集群任意时刻都只能有一个Controller。 当集群启动时所有Broker都参与Controller的竞选最终有一个胜出一旦Controller在某个时刻崩溃集群中的其他的Broker会收到通知然后开启新一轮的Controller选举新选举出来的Controller将承担起之前Controller的所有工作。 集群状态的维护是Controller保持运行状态一致性的基本要素如果要持续稳定的对外提供服务除了集群状态还有其他的工作需要Controller负责如下
更新集群元数据信息 一个client可以向集群中任意一台Broker发送METADATA请求来查询topic的分区信息例如topic有多少个分区、每个分区的leader在哪台Broker上以及分区的副本列表随着集群的运行元数据信息可能会发生变化因此Controller必须提供一种机制用于随时随地将变更后的分区信息广播出去同步给集群中所有的Broker 具体做法是当有分区信息发生变更时Controller将变更后的信息封装进UpdateMetaRequests请求中然后发送给集群中的每个Broker后续client在请求数据时就可以获取最新、最及时的分区信息。创建topic Controller启动时会创建一个Zookeeper的监听器该监听器的唯一任务就是监控Zookeeper节点/brokers/topics下节点的变更情况。 通过client或admin进行topic创建的方式主要有下面3种 通过kafka-topics脚本的–create创建 构造CreateTopicsRequest请求创建 配置broker端参数auto.create.topics.enable为true然后发送MetadataRequest请求 无论是上面哪种方式最终都是在Zookeeper的/brokers/topics下创建一个对应的znode然后将该topic分区以及对应的副本列表写入到这个znode中Controller监听器一旦监控到该目录下有新增znode立即触发topic的创建逻辑即为新建Topic的每个分区确定leader和ISR然后更新集群的元数据信息之后Controller将创建一个新的监听器用于监听Zookeeper的/brokers/topics/新增 topic节点内容的变更之后该topic分区发生变化时Controller可立即收到通知。删除topic 标准的Kafka删除Topic方式有两种 通过kafka-topics脚本的–delete来删除topic 构造DeleteTopicsRequest 这两种方式都是向Zookeeper的/admin/delete_topics下新建一个znodeController启动时会创建一个监听器专门监听该路径下的子节点变更情况 当发现有新增节点则Controller开启删除Topic的逻辑该逻辑主要分为两个阶段 停止所有副本运行 删除所有副本的日志数据 一旦完成这些操作Controller将移除/admin/delete_topics/待删除topic节点表示Topic删除操作正式完成。分区重分配 分区重分配操作一般由Kafka集群的管理员发起用于对Topic的所有分区重新分配副本所在Broker的位置达到更均匀的分配效果该操作中管理员需要手动制定分配方案并按照执行的格式写入Zookeeper的/admin/reassign_partitions节点下。 分区副本重分配的过程实际上是先扩展再收缩的过程Controller首先将分区副本集合进行扩展等待它们全部与leader保持同步之后从新分配方案中的副本选举leader最后执行收缩阶段将分区副本集合缩减成分配方案中的副本集合。preferred leader选举 为了避免分区副本分配不均匀Kafka引入了preferred副本的概念例如一个分区的副本列表是[1,2,3]那么broker1就被称为该分区的preferred leader因为它位于副本列表的第一位。 在集群运行过程中分区的leader会因为各种各样的原因发生变更从而使得leader不再是preferred leader此时用户可以自行通过命令将分区的leader重新调整为preferred leader方法有如下两种 设置broker端参数auto.leader.rebalance.enable为trueController将定时自动调整preferred leader 通过kafka-preferred-replica-election脚本手动触发 上面两种方式都向Zookeeper的/admin/preferred_replica_election节点写入数据同时Controller将注册该目录的监听器一旦触发Controller将对应分区的leader调整回副本列表中的第一个副本并将此变更广播出去。topic分区拓展 在Kafka集群运行过程中用户可能会发现某Topic的现有分区不足以支撑client的业务量因此需要新增分区目前增加分区主要使用kafka-topics脚本的–alter选项来完成和创建topic一样它会向Zookeeper的/brokers/topics/节点下写入新的分区目录因为topic在创建的时候Controller已经注册过一个监听器用于监听分区目录数据的变化因此一旦新增了topic分区该监听器就会被触发执行对应的分区创建任务例如选举leader和ISR之后更新集群的元数据。broker加入集群 每个Broker成功启动之后都会在Zookeeper的/broker/ids下创建一个znode并写入broker的信息为了动态维护Broker列表Kafka注册一个Zookeeper监听器用于时刻监控该目录下的数据变化每当有新Broker加入集群时该监听器会感知到变化执行对应的Broker启动任务之后更新集群元数据信息并广而告之。broker崩溃 Broker加入集群时注册的是临时节点znode因此一旦Broker崩溃与Zookeeper的会话失效临时节点就会被立即删除因此上面的监听器也可以监视因为崩溃而退出集群的Broker列表如果发现Broker子目录消失Controller可立即知道该Broker退出集群从而开启Broker退出逻辑最后更新集群元数据并同步到其他Broker上。受控关闭 受控关闭指通过kafka-server-stop脚本、kill -15的方式关闭BrokerBroker崩溃一般是机器突然断电或kill -9导致的受控关闭可以最大程度的降低Broker的不一致性由即将关闭的Broker向Controller发送 ControlledShutdownRequest发送完毕后待关闭Broker将处于阻塞状态直到收到 ControlledShutdownResponse 表示关闭成功或用完所有重试机会后强制退出Controller在完成必要的leader重选举和ISR收缩调整之后会向待关闭Broker发送ControlledShutdownResponse表示该Broker现在可以正常退出。 受控关闭是由待关闭Broker发起RPC请求给Controller来实现的并没有依赖Zookeeper之前的功能都依赖了Zookeeper来做。controller leader选举 如果Kafka当前集群中的Controller发生故障或显式关闭Kafka必须保证可以及时选出新的leader即故障转移(fail-over)一般会导致Controller leader选举的场景有如下几种 关闭Controller所在Broker 当前Controller所在的Broker宕机或崩溃 手动删除Zookeeper的/controller节点 手动向Zookeeper的/controller节点写入新的broker id 上面4种操作都是修改了/controller节点内容因此Controller只需要做一件事情创建一个监听该目录的监听器。 /controller本质上是一个临时节点节点保存了当前Controller所在的broker id集群首次启动时所有broker都会争抢该节点但Zookeeper保证最终只有一个Broker胜出并成为Controller一旦成为Controller它就会增加Controller的版本号即更新/controller/_epoch的节点值然后履行上面的所有职责对于没有成为Controller的broker而言它们将继续监听/controller节点的存活情况并随时准备竞选为新的controller。
(2)Flume
Flume是一个高可用的高可靠的分布式的海量日志采集、聚合和传输的系统。Flume最主要是用在分布式系统中例如读取服务器本地的磁盘数据并将数据写入到HDFS中。
① 基础架构
flume的核心是把数据从数据源(source)收集过来在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功在送到目的地(sink)之前会先缓存数据(channel),待数据真正到达目的地(sink)后flume在删除自己缓存的数据。 Agent flume中通过agent进行日志采集、聚合、传输。agent是一个JVM进程它以事件的形式将数据从源头送至目的。Agent主要有3个部分组成Source、Channel、Sink。 Source Source作为Agent的输入口负责接收各种类型、各种格式的日志数据包括avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。 Sink Sink作为Agent的输出口负责不断轮询Channel中的事件并批量移除事件根据配置文件的配置将事件写入到HDFS等存储系统或者发到另一个Agent中。Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。 Channel Channel是位于Source和Sink之间的缓冲区。因此Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的可以同时处理几个Source的写入操作和几个Sink的读取操作。 Flume自带两种ChannelMemory Channel 和File Channel。 Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失那么Memory Channel就不应该使用因为程序死亡、机器宕机或者重启都会导致数据丢失。File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。 Event event是flume的数据传输基本单元event由两部分组成Header和Body。
② 广义用法
flume可以支持多级flume的agent即flume可以前后相继例如sink可以将数据写到下一个agent的source中这样的话就可以连成串了可以整体处理了。flume还支持扇入(fan-in)、扇出(fan-out)。所谓扇入就是source可以接受多个输入所谓扇出就是sink可以将数据输出多个目的地destination中。
8.列式数据库HBase
HBase是一种面向列存储的非关系型数据库是在Hadoop之上的NoSQL的Key/value数据库不支持join的、也不支持ACID、对事务支持有限无schema创建表的时候无需去指定列、列类型、原生就支持分布式存储的所以可以用来存储海量数据同时也兼顾了快速查询、写入的功能。是大数据领域中Key-Value数据结构存储最常用的数据库方案。 HBase作为NoSQL数据库的代表属于三驾马车之一BigTable的对应实现HBase的出现很好地弥补了大数据快速查询能力的空缺。 Hbase利用Hadoop的HDFS作为其文件存储系统利用Hadoop的MapReduce来处理Hbase中的海量数据利用zookeeper作为其协调工具。 HBase依赖于ZooKeeper、HDFS在启动HBase之前必须要启动ZK、HDFS。
(1)架构
HBase的核心架构由五部分组成分别是HBase Client、HMaster、Region Server、ZooKeeper以及HDFS。 HBase Client 为用户提供了访问HBase的接口可以通过元数据表来定位到目标数据的RegionServer另外HBase Client还维护了对应的cache来加速Hbase的访问比如缓存元数据的信息。 HMaster HBase集群的主节点负责整个集群的管理工作主要工作职责如下
分配Region负责启动的时候分配Region到具体的RegionServer负载均衡一方面负责将用户的数据均衡地分布在各个Region Server上防止Region Server数据倾斜过载。另一方面负责将用户的请求均衡地分布在各个Region Server上防止Region Server请求过热维护数据发现失效的Region并将失效的Region分配到正常的RegionServer上并且在Region Sever失效的时候协调对应的HLog进行任务的拆分。 RegionServer 直接对接用户的读写请求是真正的干活的节点主要工作职责如下。管理HMaster为其分配的Region负责与底层的HDFS交互存储数据到HDFS负责Region变大以后的拆分以及StoreFile的合并工作。 与HMaster的协同当某个RegionServer宕机之后ZK会通知Master进行失效备援。下线的RegionServer所负责的Region暂时停止对外提供服务Master会将该RegionServer所负责的Region转移到其他RegionServer上并且会对所下线的RegionServer上存在MemStore中还未持久化到磁盘中的数据由WAL重播进行恢复。 Region Serve数据存储的基本结构 Region每一个Region都有起始RowKey和结束RowKey代表了存储的Row的范围保存着表中某段连续的数据。一开始每个表都只有一个Region随着数据量不断增加当Region大小达到一个阀值时Region就会被Regio Server水平切分成两个新的Region。当Region很多时HMaster会将Region保存到其他Region Server上。 Store一个Region由多个Store组成每个Store都对应一个Column Family,Store包含MemStore和StoreFile。 MemStore作为HBase的内存数据存储数据的写操作会先写到MemStore中当MemStore中的数据增长到一个阈值默认64M后Region Server会启动flasheatch进程将MemStore中的数据写人StoreFile持久化存储每次写入后都形成一个单独的StoreFile。当客户端检索数据时先在MemStore中查找如果MemStore中不存在则会在StoreFile中继续查找。 StoreFile:MemStore内存中的数据写到文件后就是StoreFileStoreFile底层是以HFile的格式保存。HBase以Store的大小来判断是否需要切分Region。 当一个Region中所有StoreFile的大小和数量都增长到超过一个阈值时HMaster会把当前Region分割为两个并分配到其他Region Server上实现负载均衡。 HFileHFile和StoreFile是同一个文件只不过站在HDFS的角度称这个文件为HFile站在HBase的角度就称这个文件为StoreFile。 HLog负责记录着数据的操作日志当HBase出现故障时可以进行日志重放、故障恢复。例如磁盘掉电导致MemStore中的数据没有持久化存储到StoreFile这时就可以通过HLog日志重放来恢复数据。 ZooKeeper HBase通过ZooKeeper来完成选举HMaster、监控Region Server、维护元数据集群配置等工作主要工作职责如下选举HMaster通ooKeeper来保证集中有1HMaster在运行如果HMaster异常则会通过选举机制产生新的HMaster来提供服务监控Region Server通过ZooKeeper来监控Region Server 的状态当Region Server有异常的时候通过回调的形式通知HMaster有关Region Server上下线的信息维护元数据和集群配置通过ooKeeper储B信息并对外提供访问接口。 HDFS HDFS为HBase提供底层数据存储服务同时为HBase提供高可用的支持HBase将HLog存储在HDFS上当服务器发生异常宕机时可以重放HLog来恢复数据。
(2)写流程
Region Server寻址 A、HBase Client访问ZooKeeper B、获取写入Region所在的位置即获取hbase:meta表位于哪个Region Server C、访问对应的Region Server D、获取hbase:meta表并查询出目标数据位于哪个Region Server中的哪个Region中。并将该table的Region信息以及meta表的位置信息缓存在客户端的meta cache方便下次访问 写Hlog E、HBase Client向Region Server发送写Hlog请求 F、Region Server会通过顺序写入磁盘的方式将Hlog存储在HDFS上 写MemStore并返回结果 G、HBase Client向Region Server发送写MemStore请求 只有当写Hlog和写MemStore的请求都成功完成之后并将反馈给HBase Client这时对于整个HBase Client写入流程已经完成。 MemStore刷盘 HBase会根据MemStore配置的刷盘策略定时将数据刷新到StoreFile 中完成数据持久化存储。 为什么要把WAL加载到MemStore中再刷写成HFile呢 WAL(Write-Ahead-Log)预写日志是HBase的RegionServer在处理数据插入和删除过程中用来记录操作内容的一种日志。每次Put、Delete等一条记录时首先将其数据写入到RegionServer对应的HLog文件中去。 而WAL是保存在HDFS上的持久化文件数据到达Region时先写入WAL然后被加载到MemStore中。这样就算Region宕机了操作没来得及执行持久化也可以再重启的时候从WAL加载操作并执行。 从写入流程中可以看出数据进入HFile之前就已经被持久化到WAL了而WAL就是在HDFS上的MemStore是在内存中的增加MemStore并不能提高写入性能为什么还要从WAL加载到MemStore中再刷写成HFile呢 数据需要顺序写入但HDFS是不支持对数据进行修改的 WAL的持久化为了保证数据的安全性是无序的 Memstore在内存中维持数据按照row key顺序排列从而顺序写入磁盘 所以MemStore的意义在于维持数据按照RowKey的字典序排列而不是做一个缓存提高写入效率。
(3)读流程
Region Server寻址 HBase Client请求ZooKeeper获取元数据表所在的Region Server的地址。 Region寻址 HBase Client请求RegionServer获取需要访问的元数据查询出目标数据位于哪个Region Server中的哪个Region中。并将该table的region信息以及meta表的位置信息缓存在客户端的meta cache方便下次访问。 数据读取 HBase Client请求数据所在的Region Server获取所需要的数据。Region首先在MemStore中查找若命中则返回如果在MemStore中找不到则通过BloomFilter判断数据是否存在如果存在则在StoreFile中扫描并将结果返回客户端。
(4)数据删除
HBase的数据删除操作并不会立即将数据从磁盘上删除因为HBase的数据通常被保存在HDFS中而HDFS只允许新增或者追加数据文件所以删除操作主要对要被删除的数据进行标记。 当执行删除操作时HBase新插入一条相同的Key-Value数据但是keyTypeDelete这便意味着数据被删除了直到发生Major_compaction操作数据才会真正地被从磁盘上删除。 HBase这种基于标记删除的方式是按顺序写磁盘的的因此很容易实现海量数据的快速删除有效避免了在海量数据中查找数据、执行删除及重建索引等复杂的流程。
(5)行式存储与列式存储
行式存储的原理与特点 对于OLAP场景大多都是对一整行记录进行增删改查操作的那么行式存储采用以行的行式在磁盘上存储数据就是一个不错的选择。 当查询基于需求字段查询和返回结果时由于这些字段都埋藏在各行数据中就必须读取每一条完整的行记录大量磁盘转动寻址的操作使得读取效率大大降低。 举个例子下图为员工信息emp表。 数据在磁盘上是以行的形式存储在磁盘上同一行的数据紧挨着存放在一起。 对于emp表要查询部门dept为A的所有员工的名字。 select name from emp where deptA 由于dept的值是离散地存储在磁盘中在查询过程中需要磁盘转动多次才能完成数据的定位和返回结果。 列式存储的原理与特点 对于OLAP场景一个典型的查询需要遍历整个表进行分组、排序、聚合等操作这样一来行式存储中把一整行记录存放在一起的优势就不复存在了。而且分析型SQL常常不会用到所有的列而仅仅对其中某些需要的的列做运算那一行中无关的列也不得不参与扫描。 然而在列式存储中由于同一列的数据被紧挨着存放在了一起。 那么基于需求字段查询和返回结果时就不许对每一行数据进行扫描按照列找到需要的数据磁盘的转动次数少性能也会提高。 还是上面例子中的查询由于在列式存储中dept的值是按照顺序存储在磁盘上的因此磁盘只需要顺序查询和返回结果即可。 列式存储不仅具有按需查询来提高效率的优势由于同一列的数据属于同一种类型如数值类型字符串类型等相似度很高还可以选择使用合适的编码压缩可减少数据的存储空间进而减少IO提高读取性能。 总的来说行式存储和列式存储没有说谁比谁更优越只能说谁更适合哪种应用场景。