当前位置: 首页 > news >正文

网站开发关联下拉列表wordpress地址更换

网站开发关联下拉列表,wordpress地址更换,登录手机网站模板html,网站 语言切换怎么做Spark Core 本文来自 B站 黑马程序员 - Spark教程 #xff1a;原地址 第一章 RDD详解 1.1 为什么需要RDD 分布式计算需要 分区控制shuffle控制数据存储、序列化、发送数据计算API等一系列功能 这些功能#xff0c;不能简单的通过Python内置的本地集合对象#xff08;如…Spark Core 本文来自 B站 黑马程序员 - Spark教程 原地址 第一章 RDD详解 1.1 为什么需要RDD 分布式计算需要 分区控制shuffle控制数据存储、序列化、发送数据计算API等一系列功能 这些功能不能简单的通过Python内置的本地集合对象如List字典等去完成。 我们在分布式框架中需要有一个统一的数据抽象对象来实现上述分布式计算所需功能 这个抽象对象就是RDD 1.2 什么是RDD Spark起源 在spark开山之作**R**esilient **D**istributed **D**atasets:A Fault-Tolerant Abstraction for In-Memory Cluster Computing这篇paper中 以下简称 RDD PaperMatei等人提出了RDD这种数据结构文中开头对RDD的定义是 RDD定义 RDDResilient Distributed Dataset叫做弹性分布式数据集是Spark中最基本的数据抽象代表一个不可变可分区里面的元素可并行计算的集合。 Dataset一个数据集合用于存放数据的。 DistributedRDD中的数据是分布式存储的可用于分布式计算。 ResilientRDD中的数据可以存储在内存中或者磁盘中。 RDD定义 RDDResilient Distributed Dataset弹性分布式数据集是Spark中最基本的数据抽象代表一个不可变可分区里面的元素可并行计算的集合。所有的运算以及操作都建立在RDD数据结构的基础之上。可以认为RDD是分布式的列表list或者数组Array抽象的数据结构RDD是一个抽象类Abstract Class和泛型Generic Type 1.3 RDD的五大特性 RDD数据结构内部的五个特性 前三个特征每个RDD都具备的后两个特征可选的 特性1 RDD是有分区的 RDD的分区是RDD数据存储的最小单位 一份RDD的数据本质上是分隔成了多个分区 特性2 RDD的方法会作用在其所有的分区上 特性3 RDD之间是有依赖关系RDD有血缘关系 sc SparkContext(conf conf)rdd1 sc.textFile(t.txt) rdd2 rdd1.flatMap(lambda x:x.split( )) rdd3 rdd2.map(lambda x: (x ,1)) rdd4 rdd3.reduceByKey(lambda a, b: a b)print(rdd4.collect())RDD之间是有依赖的如rdd2会产生rdd3但是rdd2依赖rdd1同样rdd3会产生rdd4但rdd3依赖rdd2 … 会形成一个依赖链条。 这个链条称之为RDD的血缘关系。 特性4 Key-Value型的RDD可以有分区器 默认分区器Hash分区规则可以手动设置一个分区器rdd.partitionBy的方法来设置 这个特性是可能的因为不是所有RDD都是key-value型 key-value RDDRDD中存储的是二元元组这就是key-value型RDD 二元元组只有2个元素的元组比如“hadoop”,6 特性5 RDD的分区规划会尽量靠近数据所在的服务器 在初始RDD读取数据的时候规划的时候分区会尽量规划到存储数据所在的服务器上 因为这样可以走本地读取避免网络读取 本地读取Executor所在的服务器同样是一个DataNode同时这个DataNode上有它要读的数据所以可以直接读取机器硬盘即可 无需走网络传输 网络读取读取数据需要经过网络传输才能读取到 本地读取 性能网络读取的 总结spark会在确保并行计算能力的前提下尽量确保本地读取 这里是尽量确保而不是100%确保所以这个特性也是可能的 总结 如何正确理解RDD 弹性分布式数据集分布式计算的实现载体数据抽象 RDD五大特点 Internally, each RDD is characterized by five main properties A list of partitionsA function for computing each splitA list of dependencies on other RDDsOptionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)Optionally, a list of preferred locations to compute eash split on (e.g block locations for an HDFS file) 在内部每个RDD都有五个主要特性 分区列表用于计算每次拆分的函数与其他RDD的依赖关系列表可选地键值RDD的Partitioner例如说RDD是散列分区的可选地计算eash分割的首选位置列表例如HDFS文件的块位置 第二章 RDD编程入门 2.1 程序执行入口 SparkContext对象 Spark RDD 编程的程序入口对象是SparkContext对象不论何种编程语言 只有构建出SparkContextt,基于它才能执行后续的API调用和计算 本质上SparkContext对编程来说主要功能就是创建第一个RDD出来 from pyspark import SparkConf, SparkContextif __name__ __namin__:# 构建SparkConf对象conf SparkConf().setAppName(helloword).setMaster(local[*])# 构建SparkContext执行入口对象sc SparkContext(conf conf)2.2 RDD的创建 RDD的创建主要有2种方式 通过并行化集合创建本地对象 转 分布式RDD读取外部数据源读取文件 并行化创建 概念并行化创建是指将本地集合—转向分布式RDD 这一步就是分布式的开端本地转分布式 API rdd sparkcontext.parallelize(参数1参数2) # 参数1 集合对象即可比如list # 参数2 分区数获取RDD分区数 getNumPartitions API 获取RDD分区数量 返回值是Int数字 用法 rdd.getNumPartitions()读取文件创建 testFile 这个API可以读取 本地数据也可以读取hdfs数据 使用方法 sparkcontext.textFile(参数1参数2) # 参数1必填文件路径 支持本地文件 支持HDFS 也支持一些比如S3协议 # 参数2可填表示最小分区数量 # 注意参数2 话语权不足spark有自己的判断在它允许范围内参数2有效果超过spark允许的范围参数2失败wholeTextFile 读取文件的API有个适用场景适合读取一堆小文件 这个API是小文件读取专用 用法 sparkcontext.wholeTextFiles(参数1参数2) # 参数1必填文件路径支持本地文件支持HDFS 也支持一些比如S3协议 # 参数2可选表示最小分区数量 # 注意参数2 话语权不足这个api 分区数量最多也只能开到文件数量这个API偏向于少量分区读取数据 因为这个API表明了自己是小文件读取专用那么文件的数据很小 分区很多导致shuffle的几率更高所以尽量少分区读取数据 2.3 RDD算子 算子是什么 算子分布式集合对象上的API称之为算子 方法\函数本地对象的API叫做方法\函数 算子分布式对象的API叫做算子 算子分类 RDD的算子 分成2类 Transformation:转换算子 定义RDD的算子返回值仍旧是一个RDD的称之为转换算子 特性这类算子是lazy 懒加载的如果没有action算子Transformation算子是不工作的 Action:动作行动算子 ​ 定义返回值不是RDD的就是action算子 对于这两类算子来说Transformation 算子相当于在构建执行计划action是一个指令让这个执行计划开始工作 如果没有actionTransformation算子之间的迭代关系就是一个没有通电的流水线只有action到来这个数据处理的流水线才开始工作 2.4 常用的TransforMation算子 map算子 功能map算子是将RDD的数据一条条处理处理的逻辑 基于map算子中接收的处理函数返回新的RDD 语法 rdd.map(func) # func : f:(T) - U # f:表示这是一个函数方法 # T- U表示的是方法的定义 # 表示传入参数T表示传入1个参数表示没有传入参数 # T 是泛型的代称在这里表示 任意类型 # U 也是泛型代称在这里表示 任意类型# -U 表示返回值# T - U 总结起来的意思是这是一个方法这个方法接受一个参数传入传入参数类型不限返回一个返回值返回值类型不限。# A - A 总结起来的意思是这是一个方法这个方法接受一个参数传入传入参数类型不限返回一个返回值返回值和传入参数类型一致flatMap算子 功能对rdd执行map操作然后进行解除嵌套操作 解除嵌套 # 嵌套的list list [[123][456][789]]# 如果解除了嵌套 list [1, 2, 3, 4, 5, 6, 7, 8, 9]print(rdd.flatMap(lambda x: x.split( )).collect())reduceByKey算子 功能针对KV型RDD自动按照key分组然后根据你提供的聚合逻辑完成组内数据value的聚合操作 用法 rdd.reduceByKeyfunc# func: (V,V) - V # 接受2个传入参数类型一致返回一个返回值类型和传入要求一致比如有【12345】,然后聚合函数是lambda a, b: a b 注意reduceByKey中接收的函数只负责聚合不理会分组 分组是自动by key来分级的 groupBy算子 功能将rdd的数据进行分组 语法 rdd.groupBy(func)# func 函数 # func: (T) -K # 函数要求传入一个参数返回一个返回值类型无所谓 # 这个函数是 拿到你的返回值后将所有相同返回值的放入一个组中 # 分组完成后 每一个组是一个二元元组key就是返回值所有同组的数据放入一个迭代器对象中作为valueFilter算子 功能过滤想要的数据进行保留 语法 rdd.filter(func) # func:(T) -bool 传入1个参数进来随意类型返回值必须是True or False返回是True的数据被保留False的数据被丢弃 distinct算子 功能对RDD数据进行去重返回新RDD 语法 rdd.distinct(参数1) # 参数1去重分区数量一般不用传union算子 功能2个rdd合并成1个rdd返回 用法 rdd.union(other_rdd) 注意只合并不会去重 注意不同类型的rdd依旧可以混合 join算子 功能对两个RDD执行join操作可实现SQL的内\外连接 注意join算子只能用于二元元组 语法 rdd.json(other_rdd) 内连接 rdd.leftOuterJoin(other_rdd)左外 rdd.rightOuterJoin(other_rdd)右外intersection算子 功能求2个rdd的交集返回一个新rdd 用法 rdd.intersection(other_rdd) glom算子 功能将rdd的数据加上嵌套这具嵌套按照分区来进行 比如rdd数据【12345】有2个分区 那么被glom后数据变成【【123】【45】】 使用方法 rdd.glom() groupByKey算子 功能针对KV型RDD自动按照key分组 用法rdd.groupBYKey(自动按照key分组) sortBy算子 功能对rdd数据进行排序基于你指定的排序依据 语法 rdd.sortBy(func,ascendingFalse,numPartitions1)# func:(T) - U 告知按照rdd中的哪个数据进行排序比如 lambda x: x[1] 表示按照rdd中的第二列元素进行排序 # ascending True升序 False降序 # numPartitions: 用多少分区排序sortByKey算子 功能针对KV型RDD按照key进行排序 语法 sortByKey(ascendingTrue,numPartitionsNone,KeyFuncfunction RDD ) ascending:升序or降序True升序False降序默认是升序numPartitions按照几个分区进行排序如果全局有序设置1keyfunc在排序前对key进行处理语法是k - U一个参数传入返回一个值 2.5 常用Action算子 countByKey算子 功能统计key出现的次数一般适用kv型RDD collect算子 功能将RDD各个分区内的数据统一收集Driver中形成一个list对象 用法 rdd.collect() 这个算子是将rdd各个分区数据都拉取到Driver 注意的是rdd是分布式对象其数据量可以很大所以用这个算子之前 要心知肚明的了解结果数据集不会太大 不然会把Driver内存撑爆 reduce算子 功能对RDD数据集按照你传入的逻辑进行聚合 语法 rdd.reduce(func) # func:(T,T) - T # 2参数传入 1个返回值返回值和参数要求类型一致fold算子 功能和reduce一样接受传入逻辑进行聚合聚合是带有初始值的 这个初始值聚合会作用在 分区内聚合分区间聚合 比如【【123】【456】【789】】 数据分布在3个分区 分区1 123聚合的时候带上10作为初始值得到16 分区2 456聚合的时候带上10作为初始值得到25 分区3 789聚合的时候带上10作为初始值得到34 3个分区的结果做聚合也带上初始值10所以结果是1016253485 first算子 功能取出rdd的每一个元素 用法 sc.parallelize([3,2,1]).first() 3take算子 功能取RDD的前N个元素组合成list返回给你 用法 sc.parallelize([3,2,1,4,5,6]).take(5) [3,2,1,4,5]top算子 功能对RDD数据集进行降序排序取前N个 用法 sc.parallelize([3,2,1,4,5,6]).top(3) # top 3表示降序取前3个 【654】conunt算子 功能计算rdd有多少条数据返回值是一个数字 用法 sc.parallelize([3,2,1,4,5,6]).count() 6takeSample算子 功能随机抽样rdd的数据 用法 takeSample(参数1True or False参数2采样数参数3随机数种子) - 参数1 true表示运行同一个数据false表示不允许取同一个数据和数据无关是否重复表示的是同一个位置的数据 - 参数2 抽样要几个 - 参数3 随机数种子这个参数传入一个数字即可随意给随机数种子 数字可以随便传如果传同一个数字 那么取出的结果是一致的 一般参数3 我们不传Spark会自动给与随机的种子 takeOrdered算子 功能对RDD进行排序取前N个 用法 rdd.takeOrddered(参数1参数2) - 参数1 要几个数据 - 参数2 对排序的数据进行更改不会更改数据本身只是在排序的时候换个样子 这个方法使用安装元素自然顺序升序排序如果你想玩倒叙需要用参数2 来对排序的数据进行处理foreach算子 功能对rdd的每一个元素执行你提供的逻辑的操作和map一个意思但是这个方法没有返回值 用法 rdd.foreach(func) # func: (T) -NonesaveAsTextFile算子 功能将rdd的数据写入文本文件中 支持 本地写出hdfs等文件系统 注意保存文件API是分布式执行的 这个api的执行数据是 不经过 driver 的 如图写出的时候每个分区所在的Executor直接控制数据写出到目标文件系统中 所以才会一个分区产生1个结果文件 注意点 我们学习的action中 foreachsaveAsTextFile 这两个算子是分区Executor直接执行的 跳过Driver由分区所在的Executor直接执行 反之 其余的Action算子都会将结果发送至Driver 2.6 分区操作算子 mapPartitions算子 图解代码 如果mapPartition 一次被传递的是一整个分区的数据 作为一个迭代器一次性list对象传入过来 foreachPartition算子 功能和普通foreach一致一次处理的是一整个分区数据 foreachPartition就是一个没有返回值的mapPartitions PartitionBy算子 功能对RDD进行自定义分区操作 用法 rdd.partitionBy(参数1参数2) - 参数1 重新分区后有几个分区 - 参数2 自定义分区规则函数传入参数2k -int 一个传入参数进来类型无所谓但是返回值一定是int类型将key传给这个函数你自己写逻辑决定返回一个分区编号分区编号从0开始不要超过分区数-1分区号不要超标你设置3个分区分区号只能是 0 1 2 设置5个分区 分区号只能是 0 1 2 3 4 repartition算子 功能对RDD的分区执行重新分区仅数量 用法 rdd.repartition(N) 传入N 决定新的分区数 注意对分区的数量进行操作一定要慎重 一般情况下我们写spark代码 除了要求全局排序设置为1个分区外 多数时候所有API中关于分区相关的代码我们都不太理会 因为如果你改分区了 会影响并行计算内存迭代的并行管道数量分区如果增加极大可能会导致shuffle coalesce算子 功能对分区进行数量增减 用法 rdd.coalesce(参数1参数2) - 参数1分区数 - 参数2True or FlaseTrue 表示允许shuffle也就是可以加分区False表示不允许shuffle也就是不能加分区False是默认对比repartition一般使用coalesce较多因为加分区要写参数2 这样避免写repartition的时候手抖了加分区了 mapValues算子 功能针对二元元组RDD对其内部的二元元组的value执行map操作 语法 rdd.mapValues(func) # func V -U # 注意传入的参数是二元元组的value值 # 我们这个传入的方法只有对value进行处理join算子 功能对两个rdd执行join操作可实现sql的内\外连接 注意join算子只能用于二元元组 语法 rdd.join(other_rdd) # 内连接 rdd.leftOuterJoin(other_rdd) # 左外 rdd.rightOuterJoin(other_rdd) # 右外2.7 groupByKey和reduceByKey的区别 在功能上的区别 groupByKey仅仅有分组功能而已reduceByKey除了有ByKey的分组功能外还有reduce聚合功能所以是一分组聚合一体化的算子 如果对数据执行 分组聚合那么使用这2个算子的性能差别是很大的 reduceByKey的性能是远大于groupByKey 聚合逻辑的 因为 如图,这是groupByKey 聚合逻辑的执行流程 因为groupByKey只能分组所以执行上是先分组shuffle后聚合 如图reduceByKey由于自带聚合逻辑所以可以完成 先在分区内做预聚合然后再走分组流程shuffle分组后再做最终聚合 对于groupByKeyreduceByKey最大的提升在于分组前进行了预聚合那么在shuffle分组节点被shuffle的数据可以极大的减少 这就极大的提升了性能 分组聚合首选reduceByKey,数据越大对groupByKey的优势就越高 总结 RDD创建有哪几种方法 通过并行化集合的方式本地集合转分布式集合 或者读取数据的方式创建TextFile\WholdTextFile RDD分区数如何查看 通过getNumPartitions API查看返回值int Transformation 和 Action 的区别 转换算子的返回值100%是RDD而Action算子的返回值100%不是RDD 转换算子是懒加载的只有遇到Action才会执行Action就是转换算子处理链条的开关 哪两个Action算子不经过Driver直接输出 foreach 和 saveAsTextFile 直接由Executor执行后输出 不会将结果发送到Driver上去 reduceByKey和groupByKey的区别 reduceByKey自带聚合逻辑groupByKey不带 如果做数据聚合reduceByKey的效率更好因为可以先聚合后shuffle再最终聚合传输的IO小 mapPartitions 和 foreachPartition 的区别 mapPartitions带有返回值foreachPartition不带 对于分区操作有什么要注意的地方 尽量不要增加分区可能破坏内存迭代的计算管道 第三章 RDD持久化 3.1 RDD的数据是过程数据 RDD的数据是过程数据 RDD之间进行相互迭代计算Transformation的转换当执行开启后新RDD的生成代表老RDD的消失 RDD的数据是过程数据只在处理的过程中存在一旦处理完成就不见了 这个特性可以最大化的利用资源老旧的RDD没用了 就从内存中清理给后续的计算腾出内存空间 如上图rdd3被2次使用第一次使用之后其实rdd3就不存在了 第二次用的时候只能基于rdd的血缘关系从RDD1重新执行构建出来rdd3供rdd5使用 3.2 rdd的缓存 对于上述的场景肯定要执行优化优化就是 RDD3如果不消失那么RDD1-RDD2-RDD3这个链条就不会执行2次或者更多次 RDD的缓存技术Spark提供了缓存API可以让我们通过调用API将指定的RDD数据保留在内存或者硬盘上缓存的API # RDD3 被2次使用可以加入缓存进行优化 # 缓存到内存中 rdd3.cache() # 仅内存缓存 rdd3.persist(StorageLevel.MEMORY_ONLY) # 仅内存缓存2个副本 rdd3.persist(StorageLevel.MEMORY_ONLY_2) # 仅缓存硬盘上 rdd3.persist(StorageLevel.DISK_ONLY) # 仅缓存硬盘上2个副本 rdd3.persist(StorageLevel.DISK_ONLY_2) # 仅缓存硬盘上3个副本 rdd3.persist(StorageLevel.DISK_ONLY_3) # 先放内存不够放硬盘 rdd3.persist(StorageLevel.MEMORY_AND_DISK) # 先放内存不够放硬盘, 2个副本 rdd3.persist(StorageLevel.MEMORY_AND_DISK_2) # 堆外内存系统内存 rdd3.persist(StorageLever.OFF_HEAP)# 如上API自行选择使用即可 # 一般建议使用rdd3.persistStorageLevel.MEMORY_AND_DISK # 如果内存比较小的集群建议使用rdd3.persist(StorageLevel.DISK.ONLY)或者就别用缓存了 用CheckPoint# 主动清理缓存的API rdd.unpersist()缓存技术可以将过程RDD数据持久化保存到内存或者硬盘上但是这个保存在设定上是认为不安全的 缓存的数据在设计上是认为有丢失风险的 所以缓存有一个特点就是其保留在RDD之间的血缘依赖关系 一旦缓存丢失可以基于血缘关系的记录重新计算这个RDD的数据 缓存如何丢失 在内存中的缓存是不安全的比如断电\计算任务内存不足把缓存清理给计算让路硬盘中因为硬盘损坏也是可能丢失的 缓存到内存 ​ Executor:缓存到Executor的内存空间 缓存到硬盘 ​ Executor:缓存到Executor所在服务器的硬盘 3.3 RDD的CheckPoint CheckPoint技术也是将RDD的数据保存起来 但是它仅支持硬盘存储 并且 它被设计认为是安全的不保留血缘关系 CheckPoint是如何保存数据的 这个RDD的数据将被CheckPoint到HDFS中 CheckPoint存储RDD数据是集中收集各个分区数据进行存储而缓存是分散存储 缓存和CheckPoint的对比 CheckPoint 不管分区数量多少风险是一样的缓存分区越多风险越高CheckPoint 支持写入HDFS缓存不行HDFS是高可靠存储CheckPoint被认为是安全的CheckPoint 不支持内存缓存可以缓存如果写内存性能比CheckPoint要好一些CheckPoint 因为设计认为是安全的所以 不保留血缘关系而缓存因为设计上认为不安全所以保留 注意 CheckPoint是一种重量级的使用也就是RDD的重新计算成本很高的时候我们采用CheckPoint比较合适 或者数据量很大用CheckPoint比较合适 如果数据量小或者RDD重新计算是非常快的用CheckPoint没啥必要直接缓存即可 Cache和CheckPoint两个APi都不是Action类型 所以想要它俩工作必须在后面接上Action 接上Action的目的是让RDD有数据而不是为了让CheckPoint和Cache工作 总结 Cache和CheckPoint区别 cache是轻量化保存RDD数据可存储在内存和硬盘是分散存储设计上数据是不安全的保留RDD血缘关系 checkPoint是重量级保存rdd数据是集中存储只能存储在硬盘HDFS上设计上是安全的不保留rdd血缘关系 Cache和CheckPoint的性能对比 ​ cache性能更好因为是分散存储各个Executor并行执行效率高可以保存到内存中占内存更快 ​ checkpoint比较慢因为是集中存储涉及到网络IO但是存储到HDFS上更加安全多副本 第四章 RDD共享变量 4.1 广播变量 给每个 Executor 来一份数据而不是像原本那样每一个分区的处理线程都来一份节省内存 使用方法 # 1. 将本地list 标记成广播变量即可 broadcast sc.broadcast(stu_info_list)# 2. 使用广播变量从broadcast对象中取出本地list对象即可 value broadcast.value# 也就是 先放进去broadcast内部然后从broadcast内部在取出来用中间传输的是broadcast这个对象了 # 只要中间传输的是broadcast对象spark就会留意只会给每个Executor发一份了而不是傻傻的哪个分区都要给4.2 累加器 不使用累加器不管Executor中累加到多少都和Driver中无关计算结果统计不会累加 sc.accumulator(初始值)即可构建也就是使用累加器的时候要注意因为rdd是过程数据如果rdd被多次使用 可能会重新构建此rdd 如果累加器累加代码存在重新构建的步骤中 累加器累加的代码就可能被多次执行 如何解决加缓存或者CheckPoint即可 总结 广播变量解决了什么问题 分布式集合RDD和本地集合进行关联使用的时候降低内存占用以及减少网络IO传输提高性能 累加器解决了什么问题 分布式代码执行中进行全局累加 第五章 Spark内核调度 5.1 DAG Spark的核心是根据RDD来实现的Spark Scheduler则为Spark核心实现的重要一环其作用就是任务调度。Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据根据RDD的依赖关系构建DAG,基于DAG划分Stage将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理可以合理规划资源利用做到尽可能用最少的资源高效地完成任务计算。 以词频统计WordCount程序为例DAG图 DAG有向无环图 有向有方向 无环没有闭环 DAG有方向没有形成闭环的一个执行流程图 此图就是一个典型的DAG图 有方向RDD1-RDD2-…-collect结束 无闭环以action(collect)结束了没有形成闭环循环 作用标识代码的逻辑执行流程 Job和Action Action:返回值不是RDD的算子 它的作用是一个触发开关会将action算子之前的一串rdd依赖链条执行起来也就是一个Action会产生一个DAG图。 一个Action会产生一个DAG如果代码中有3个Action就产生3个DAG 一个Action产生的一个DAG会在程序运行中产生一个Job 所以: 1个Action 1个DAG 1个Job 如果一个代码中写了3个Action那么这个代码运行起来产生3个Job每个Job有自己的DAG 一个代码运行起来在Spark中称之为Application 层级关系 1个Application中可以有多个Job每一个Job内含一个DAG同时每一个Job都是由一个Action产生的。 DAG和分区 Dag是Spark代码的逻辑执行图这个Dag的最终作用是;为了构建物理上的Spark详细执行计划而生 所以由于Spark是分布式多分区的那么Dag和分区之间也是有关联的 5.2 DAG的宽窄依赖和阶段划分 在SparkRDD前后之间的关系分为 窄依赖宽依赖 窄依赖父RDD的一个分区全部将数据发给子RDD的一个分区 宽依赖父RDD的一个分区将数据发给子RDD的多个分区 宽依赖还有一个别名shuffle 在stage的内部一定都是窄依赖 5.3 内存迭代计算 如图基于带有分区的Dag以及阶段划分可以从图中得到逻辑上最优的task分配一个task是一个线程来具体执行 那么如上图task1 中 rdd1 rdd2 rdd3 的迭代计算都是由一个task线程完成这一阶段的这一条线是纯内存计算。 如上图task1task2task3就形成了三个并行的内存计算管道 Spark默认受到全局并行度的限制除了个别算子有特殊分区情况大部分的算子都会遵循全局并行度的要求来规划自己的分区数 如果全局并行度是3其实大部分算子分区都是3 注意spark我们一般推荐只设置全局并行度不要再算子上设置并行度 除了一些排序算子外计算算子就让他默认开分区就可以了 Spark是怎么做内存计算的DAG的作用stage阶段划分的作用 spark会产生DAG图DAG图会基于分区和宽窄依赖划分阶段一个阶段的内部都是窄依赖窄依赖内如果形成前后11的分区对应关系就可以产生许多内存迭代计算的管道这些内存迭代计算的管道就是一个个具体的执行task一个task是一个具体的线程任务跑在一个线程内就是走内存计算了 Spark为什么比MapReduce快 Spark的算子丰富MapReduce算子匮乏Map和ReduceMapReduce这个编程模型很难在一套Mr中处理复杂的任务。 很多的复杂任务是需要写多个MapReduce进行串联多个MR串联通过磁盘交互数据 Spark可以执行内存迭代算子之间形成DAG基于依赖划分阶段后在阶段内形成内存迭代管道但是MapReduce的Map和Reduce之间的交互依旧是通过硬盘来交互的 总结 编程模型上Spark占优算子够多算子交互上和计算上可以尽量多的内存计算而非磁盘迭代 5.4 Spark并行度 在同一时间内有多个task在同时运行 并行度并行能力的设置 比如设置并行度6其实就是要6个task并行在跑 在有了6个task并行的前提下rdd的分区就被规划成6个分区了 如何设置并行度 可以在代码中和配置文件中以及提交程序的客户端参数中设置 优先级从高到低 代码中客户端提交参数中配置文件中默认1但是不会全部以1来跑多数时候基于读取文件的分片数量来作为默认并行度 全局并行度配置的参数 spark.default.parallelism 全局并行度 - 推荐 配置文件中 conf/spark-defaults.conf中设置 spark.default.parallelism 100在客户端提交参数中 bin/spark-submit --conf spark.default.parallelism100在代码中设置 conf SparkConf() conf.set(spark.default.parallelism,100)全局并行度是推荐设置不要针对RDD改分区可能会影响内存迭代管道的构建或者会产生额外的shuffle 针对RDD的并行度设置 - 不推荐 只能在代码中写算子 repartition算子coalesce算子partitionBy算子 集群中如何规划并行度 结论设置cpu总核心的2~10倍 比如集群可用cpu核心是100个我们建议并行度是200~1000 确保cpu核心的整数倍即可最小是2倍最大一般10倍或者更高适量均可 为什么设置最少2倍? cpu的一个核心同一时间只能干一件事情 所以在100个核心的情况下设置100个并行就能让cpu 100%出力 这种设置下如果task的压力不均衡某个task先执行完了就导致某个cpu核心空闲 所以我们将task并行分配的数量变多比如800个并行同一时间只有100个在运行700个在等待 但是可以确保某个task运行完了后续有task补上不让cpu闲下来最大程度利用集群的资源 规划并行度只看集群总CPU核心 5.5 Spark的任务调度 Spark的任务由Driver进行调试这个工作包含 逻辑DAG产生分区DAG产生Task划分将Task分配给Executor并监控其工作 如图,Spark程序的调度流程如图 Driver被构建出来构建SparkContext执行环境入口对象基于DAG SchedulerDAG调度器构建逻辑Task分配基于TaskScheduler(Task调度器)将逻辑Task分配到各个Executor上干活并监视它们workerExecutor被TaskScheduler管理监控听从它们的指令干活并定期汇报进度 1234 都是Driver的工作 5 是worker的工作 Driver内的两个组件 Dag调度器 工作内容将逻辑的Dag图进行处理最终得到逻辑上的Task划分 Task调度器 工作内容基于Dag scheduler的产出来规划这些逻辑的task应该在哪些物理的Executor上运行以及监控管理它们的运行 5.6 拓展 - Spark运行中的概念名词大全 Term[术语]Meaning【含义】ApplicationUser program built on Spark. Consists of a driver program and executors on the cluster.建立在Spark上的用户程序。由集群上的驱动程序和执行程序组成。Application jarA jar containing the users Spark application. In some cases users will want to create an “uber jar” containing their application along with its dependencies. The users jar should never include Hadoop or Spark Liberaries, however,these will be added at runtime一个包含用户Spark应用程序的jar。在某些情况下用户会希望创建一个“uber jar”其中包含他们的应用程序及其依赖项。users jar永远不应该包含Hadoop或Spark Liberies但是它们将在运行时添加Driver programThe process running the main() function of the application and creating the SparkContext运行应用程序的main函数并创建SparkContext的过程Cluster manageerAn external service for acquiring resources on the cluster(e.g. standalone manage mesos,YARN)用于获取集群上资源的外部服务例如独立管理mesos、YARNDeploy modeDistinguishes where the diever process runs In “cluster” mode, the framework launches the driver inside of the cluster In “client” mode, the submitter launches the driver outside of the cluster区分diever进程在哪里运行在“集群”模式中框架在集群内部启动驱动程序在“客户端”模式中提交者在集群外部启动驱动程序Worker nodeAny node that can run application code in the cluster可以在群集中运行应用程序代码的任何节点ExecutorA process launched for an application on a worker node. that runs tasks and keeps data in memory or disk storage across them.Each Application has its own executors为工作节点上的应用程序启动的进程。它运行任务并将数据保存在内存或磁盘存储中。每个申请都有自己的执行人TaskA unit of work that will be sent to one executor由多个任务组成的并行计算这些任务是响应Spark操作而产生的例如保存、收集你会在derivers日志中看到这个用法JobA parallel computaion consisting of multiple tasks that gets spawned in response to a Spark action(e.g.save,collect);youll see this used in the derivers logs由多个任务组成的并行计算这些任务是响应Spark操作例如保存、收集而产生的你会在derivers日志中看到这个用法StageEach job gets divided into smaller sets of tasks called stages that depend on each other(similar to the map and reduce stages in MapReduce)youll see this term used in the drivers logs每个作业都被划分为相互依赖的称为阶段的较小任务集类似于MapReduce中的map和reduce阶段你会在驱动程序日志中看到这个术语 层级关系梳理 一个spark环境可以运行多个Application一个代码运行起来会成为一个ApplicationApplication内部可以有多个job每个job由一个Action产生并且每个job有自己的DAG执行图一个Job的DAG图 会基于宽窄依赖划分成不同的阶段不同阶段内基于分区数量形成多个并行的内存迭代管道每一个内存迭代管道形成一个TaskDAG调度器划分将job内划分出具体的task任务一个job被划分出来的task在逻辑上称之为这个job的taskset 5.7 Spark Shuffle 简介 Spark在DAG调度阶段会将一个Job划分为多个Stage上游Stage做map工作下游Stage做reduce工作其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁它将map的输出对应到reduce输入中涉及到序列化反序列化跨节点网络IO以及磁盘读写IO等 Spark的Shuffle分为Write和Read两个阶段分属于两个不同的stage前者是Parent Stage的最后一步后者是Child Stage的每一步。 执行Shuffle的主体是Stage中的并发任务这些任务分ShuffleMapTask和ResultTask两种ShuffleMap Task要进行Shuffle ResultTask负责返回计算结果一个Job中只有最后的Stage采用ResultTask其他的均为ShuffleMapTask。如果要按照map端和reduce端来分析的话shuffleMapTask可以即是map端任务又是reduce端任务因为Spark中的shuffle是可以串行的resultTask则只能充当reduce端任务的角色。 Spark在1.1以前的版本一直是采用Hash Shuffle的实现方式到1.1版本时参考Hadoop MapReduce的实现开始引入Sort Shuffle, 在1.5版本时开始Tungsten钨丝计划引入Unsafe Shuffle优化内存及Cpu的使用在1.6中将Tungsten统一到Sort Shuffle中实现自我感知选择最佳Shuffle方式到的2.0版本Hash Shuffle已被删除所有shuffle方式全部统一到Sort Shuffle一个实现中 在Spark的中负责shuffle过程的执行计算和处理的组件主要就是ShuffleManager也即shuffle管理器shuffleManager随着Spark的发展有两种实现的方式分别为HashShuffleManager和SortShuffleManager因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种 在Spark 1.2以前默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端就是会产生大量的中间磁盘文件进而由大量的磁盘IO操作影响了性能 因此在Spark 1.2以后的版本中默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说有了一定的改进。主要就在于每个Task在进行shuffle操作时虽然也会产生较多的临时磁盘文件但是最后会将所有的临时文件合并(merge)成一个磁盘文件因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时只要根据索引读取每个磁盘文件中的部分数据即可。 Hash Shuffle了解 Shuffle 阶段划分 shuffle write: mapper阶段上一个stage得到最后的结果写出 shuffle read: reduce阶段下一个stage拉取上一个stage进行合并 未经优化的hashShuffleManager HashShuffle是根据task是计算结果的key值的hashcode%ReduceTask来决定放入哪一个区分这样保证相同的数据一定放入一个分区Hash Shuffle过程如下 根据下游的task决定生成几个文件先生成缓冲区文件在写入磁盘文件再将block文件进行合并。 未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。提出如下解决方案 经过优化的hashShuffleManager 在shuffle write过程中task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念每个shuffleFileGroup会对应一批磁盘文件每一个group磁盘文件的数量与下游stage的task数量是相同的。 未经优化 上游的task数量m 下游的task数量n 上游executor数量k mk 总共的磁盘文件m*n 优化之后的 上游的task数量m 下游的task数量n 上游的executor数量k(mk) 总共的磁盘文件k*n Sort Shuffle Manager了解 SortShuffleManager的运行机制主要分成两种一种普通运行机制另一种bypass运行机制当shuffle write task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时默认为200就会启用bypass机制 该模式下数据会先写入一个内存数据结构中默认5M此时根据不同的shuffle算子可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子那么会选用Map数据结构一边通过Map进行聚合一边写入内存如果是join这种普通的shuffle算子那么会选用Array数据结构直接写入内存 接着每写一条数据进入内存数据结构之后就会判断一下是否达到了某个临界阈值。如果达到临界阈值的话那么就会尝试将内存数据结构中的数据溢写到磁盘然后清空内存数据结构 排序 在溢写到磁盘文件之前会先根据key对内存数据结构中已有的数据进行排序 溢写 排序过后会分批将数据写入磁盘文件。默认的batch数量是10000条也就是说排序好的数据会以每批1万条数据的形式分批写入磁盘文件。 merge 一个task将所有数据写入内存数据结构的过程中会发生多次磁盘溢写操作也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并成1个磁盘文件这就是merge过程 由于一个task就只对应一个磁盘文件也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中因此还会单独写一份索引文件其中标识了下游各个task的数据在文件中的start offset与end offset Sort Shuffle bypass机制 bypass运行机制的触发条件如下 shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold200参数的值不是map combine聚合的shuffle算子比如reduceByKey有map combie。 此时task会为每个reduce端的task都创建一个临时磁盘文件并将数据按key进行hash然后根据key的hash值将key写入对应的磁盘文件之中。当然写入磁盘文件时也是先写入内存缓冲缓冲写满之后再溢写到磁盘文件的。最后同样会将所有临时磁盘文件都合并成一个磁盘文件并创建一个单独的索引文件该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的因为都要创建数量惊人的磁盘文件只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件也让该机制相对未经优化的HashShuffleManager来说shuffle read的性能会更好。 面该机制与普通sortShuffleManager运行机制的不同在于 第一磁盘写机制不同 第二不会进行排序。也就是说启用该机制的最大好处在于shuffle write过程中不需要进行数据的排序操作也就节省掉了这部分的性能开销。 总结 SortShuffle也分为普通机制和bypass机制普通机制在内存数据结构默认为5M完成排序会产生2M个磁盘小文件而当shuffle map task 数量小于Spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子比如reduceByKey的时候会触发SortShuffle的bypass机制sortShuffle的bypass机制不会进行排序极大的提高了其性能。 Shuffle的配置选项 shuffle阶段划分 shuffle write: mapper阶段上一个stage得到最后的结果写出 shuffle read reduce阶段下一个stage拉取上一个stage进行合并 配置选项说明 spark的shuffle调优主要是调整缓冲的大小拉取次数重试次数与等待时间内存比例分配是否进行排序操作等等 spark.shuffle.file.buffer 参数说明 该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小默认是32k。将数据琯到磁盘文件之前会先写入buffer缓冲中待缓冲写满之后才会溢写到磁盘 调优建议如果作业可用的内存资源较为充足的话只可以适当增加这个参数的大小比如64k从而减少shuffle write过程中溢写磁盘文件的次数也就可以减少磁盘IO资次数进而提升性能。在实践中发现合理调节该参数性能会有1%~5%的提升 spark.reducer.maxSizeInFlight 参数说明该参数用于设置shuffle read task的buffer缓冲大小 而这个buffer缓冲决定了每次能够拉取多少数据默认48M 调优建议如果作业可用的内存资源较为充足的话可以适当增加这个参数的大小 比如96m从而减少拉取数据的次数也就可以减少网络传输的次数进而提升性能。在实践中发现 合理调节该参数性能会有1%~5%的提升 spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait spark.shuffle.io.maxRetries shuffle read task从shuffle write task所在节点拉取属于自己的数据时如果因为网络异常导致拉取失败是会自动进行重试的。该参数就代表了可以重试的最大次数默认是3次 spark.shuffle.io.retryWait:该参数代表了每次重试拉取数据的等待间隔默认5s 调优建议一般的调优都是将重试次数调高不调整时间间隔 spark.shuffle.menoryFraction 参数说明该参数代表Executor内存中分配给shuffle read task进行聚合操作内存比例 spark.shuffle.manager 参数说明该参数用于设置shufflemanager的类型默认sortspark1.5x以后有三个可选项 Hashspark1.x版本的默认值HashShuffleManager Sort: Spark2.x版本的默认值普通机制当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数自动开启bypass机制 spark.shuffle.sort.bypassMergeThreshold 参数说明当shuffleManager为sortShuffleManager时如果shuffle read task的数量小于这个阈值默认是200则shuffle write过程中不会进行排序操作 调优建议当你使用sortShuffleManager时如果的确不需要排序操作那么建议将这个参数调大一些 总结 DAG是什么有什么用 DAG有向无环图用以描述任务执行流程主要作用是协助DAG调度器构建Task分配用以做任务管理 内存迭代\阶段划分 基于DAG的宽窄依赖划分阶段阶段内部都是窄依赖可以构建内存迭代的管道 DAG调度器是 构建Task分配用以做任务管理 ​ ​ 到磁盘文件之前会先写入buffer缓冲中待缓冲写满之后才会溢写到磁盘 调优建议如果作业可用的内存资源较为充足的话只可以适当增加这个参数的大小比如64k从而减少shuffle write过程中溢写磁盘文件的次数也就可以减少磁盘IO资次数进而提升性能。在实践中发现合理调节该参数性能会有1%~5%的提升 spark.reducer.maxSizeInFlight 参数说明该参数用于设置shuffle read task的buffer缓冲大小 而这个buffer缓冲决定了每次能够拉取多少数据默认48M 调优建议如果作业可用的内存资源较为充足的话可以适当增加这个参数的大小 比如96m从而减少拉取数据的次数也就可以减少网络传输的次数进而提升性能。在实践中发现 合理调节该参数性能会有1%~5%的提升 spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait spark.shuffle.io.maxRetries shuffle read task从shuffle write task所在节点拉取属于自己的数据时如果因为网络异常导致拉取失败是会自动进行重试的。该参数就代表了可以重试的最大次数默认是3次 spark.shuffle.io.retryWait:该参数代表了每次重试拉取数据的等待间隔默认5s 调优建议一般的调优都是将重试次数调高不调整时间间隔 spark.shuffle.menoryFraction 参数说明该参数代表Executor内存中分配给shuffle read task进行聚合操作内存比例 spark.shuffle.manager 参数说明该参数用于设置shufflemanager的类型默认sortspark1.5x以后有三个可选项 Hashspark1.x版本的默认值HashShuffleManager Sort: Spark2.x版本的默认值普通机制当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数自动开启bypass机制 spark.shuffle.sort.bypassMergeThreshold 参数说明当shuffleManager为sortShuffleManager时如果shuffle read task的数量小于这个阈值默认是200则shuffle write过程中不会进行排序操作 调优建议当你使用sortShuffleManager时如果的确不需要排序操作那么建议将这个参数调大一些 总结 DAG是什么有什么用 DAG有向无环图用以描述任务执行流程主要作用是协助DAG调度器构建Task分配用以做任务管理 内存迭代\阶段划分 基于DAG的宽窄依赖划分阶段阶段内部都是窄依赖可以构建内存迭代的管道 DAG调度器是 构建Task分配用以做任务管理 ​ ​
http://www.pierceye.com/news/528500/

相关文章:

  • 建设厅报名网站做PHP网站前端网站进不去
  • 网站开发后台 amp建网页还是网站
  • 云南个旧建设局网站宁波关键词优化品牌
  • 网站建设方案应急处置wordpress我爱水煮鱼
  • 网页设计模板html代码班级主题广东seo推广
  • 西安 网站托管西安网站关键词排名
  • 做网站用discuz还是wp汉狮做网站公司郑州
  • 网站运营服务中心建设方案网页免费制作网站
  • 做网站销售有前景怎么注销网站
  • 福州建设网站设计电子商务网站平台有哪些
  • 扁平化色块风格的网站企业网站建设客户需求调查问卷
  • 网站建设产品服务痘痘如何去除效果好
  • 展会电子商务网站如何建设单页网站如何做
  • 济南软件外包邢台seo服务公司
  • 网站建设2017主流代码语言垂直型电商网站如何做
  • 重庆 网站定制推广产品怎么发朋友圈
  • 网站建设公司初心经典企业网站欣赏
  • 本地网站开发公司网站建设 产品拍照
  • 军队营房基础建设网站wordpress 标签云集
  • 苏州建设建设信息网站如何给自己的店做小程序
  • 沈阳微营销网站制作厨师培训机构 厨师短期培训班
  • 个人备案用作资讯网站网站开发yuanmus
  • 大连网站建设 选领超科技网站建设实录音乐
  • 上海网站建设流wordpress关闭会员
  • 网站运营的目的及意义pc网站怎么适配移动端
  • 网站深圳优化建设10月上海娱乐场所又要关门了
  • 怎么做网页文件打开别的网站河南省城乡和住房建设厅
  • 泰州公司做网站成都网页设计培训中心
  • 网站业务需求文档网站正在建设中 动态
  • 一级a做爰电影片免费网站姑苏区住房建设局网站