站内推广策略,wordpress好还是自己写好,深圳网站 建设信科网络,做料理网站关键词怎么设置参考文档及示例代码均基于pyspark3.1.2 1.什么是RDD#xff1f;2.job、stage、task如何划分#xff1f;3.什么是宽窄依赖#xff1f;4.spark有哪几种部署模式#xff1f;5.spark中的算子分为哪些类型#xff0c;举例说明。6.cache、persist、checkpoint的区别#xff0c;… 参考文档及示例代码均基于pyspark3.1.2 1.什么是RDD2.job、stage、task如何划分3.什么是宽窄依赖4.spark有哪几种部署模式5.spark中的算子分为哪些类型举例说明。6.cache、persist、checkpoint的区别及各自的使用场景7.广播变量与累加器。8.reduceByKey与groupByKey的区别?9.spark数据倾斜及通用调优。10.map与flatMap区别11.spark中的shuffle有哪几种方式 1.什么是RDD RDD弹性分布式数据集Resilient Distributed Datasets即一个分布于多个节点机器上的数据集合。为开发人员提供编程抽象具有只读的特点。这里只读的意思是当对RDD中的数据修改时并不修改原RDD而是返回一个新的RDD。注意RDD本身并不保存数据只是定义了一组计算规则。 RDD中的弹性体现在 1容错性包括基于血缘关系的容错和自动失败重试的容错。 血缘关系的容错RDD中一个分区的数据丢失可以通过RDD间的血缘关系重新计算得到该分区的数据。单个节点的故障不影响其他节点的任务处理。自动失败重试的容错包括task失败重试和stage失败重试由spark自动支持。且stage失败重试时只重试任务失败的分区而不是全部计算。 2计算存储方面内存和磁盘空间的自动切换和管理。包括计算过程中RDD的存储及持久化时持久化级别的动态管理。 计算过程中RDD的存储当内存使用完毕时自动溢写磁盘使得内存较小时也可以处理大数据量。持久化方面开发者可以自定义选择持久化级别包括持久化内存持久化磁盘持久化内存磁盘相结合的方式。 3计算过程中可动态调整分区repartition、coalesce。 2.job、stage、task如何划分 job应用程序中每遇到一个action算子就会划分为一个job。 stage一个job任务中从后往前划分分区间每产生了shuffle也就是宽依赖则划分为一个stagestage的划分体现了spark的pipeline思想即数据在内存中尽可能的往后多计算减少磁盘或者网络IO。 taskRDD中一个分区对应一个task。 3.什么是宽窄依赖 根据分区之间是否产生shuffle来确定。 宽依赖上游一个分区的数据被打散到下游的多个分区1:N 窄依赖上游一个分区的数据全部进入到下游的一个分区可以是1:1也可以是N:1 4.spark有哪几种部署模式 1.Local本地模式运行在单个机器一般用作测试环境。 2.Standalone一个基于MasterSlaves的资源调度集群。spark任务提交给Master调度管理是spark自带的一个调度系统。 3.Yarnspark客户端直接连接yarn不需要额外构建spark集群。有yarn-client和yarn-cluster两种模式主要区别在于driver程序的运行节点。yarn-client时driver运行在本地提交任务的客户端yarn-cluster是driver运行在集群中随机的任一节点。 4.Mesos比较少用不了解。 5.K8sspark后续高版本新增支持。 5.spark中的算子分为哪些类型举例说明。 spark中算子类型分为两类 1转换算子(Transformation)惰性求值需要action算子进行触发才会执行。返回一个新的RDD。不负责数据存储只是定义了一个计算规则。 map对RDD中的每个元素应用规则。 filter对RDD中的每个元素按规则过滤。 groupByKey将相同key的数据合并。 glom将RDD中的每个分区合并为一个列表。 union合并两个RDD。 simple抽样。 注关于持久化类算子也有人叫控制算子(cache、persist、checkpoint)严格意义上也属于转换算子需要动作算子才能触发。 2动作算子(Action)触发spark任务执行立即构建DAG有向无环图不返回RDD返回RDD的结果或者没有返回值。 collect以数组形式获取RDD中所有元素。 count获取RDD中元素个数。 first获取RDD中的第一个元素等价于take(1)。 take通过指定参数n获取RDD中前n个元素。 top通过指定参数n获取RDD中排序后的前n个元素。 更多RDD相关API参考官方文档https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.html#rdd-apis 6.cache、persist、checkpoint的区别及各自的使用场景 共同点1都用来做持久化避免多个action算子对同一个RDD的重复计算。2都遵循spark的惰性执行策略需要通过action算子触发执行。 区别 cache仅持久化到内存MEMORY_ONLY级别。等价于persist的默认持久化级别。persist默认持久化到内存(MEMORY_ONLY)但同时支持开发者自定义存储级别例如仅磁盘(DISK_ONLY)磁盘内存结合(MEMORY_AND_DISK)。 更多的存储级别设置及使用场景参考https://spark.apache.org/docs/3.1.2/rdd-programming-guide.html#rdd-persistencecheckpoint将数据持久化到节点指定路径中(sc.setCheckpointDir方法设置)如果执行模式是cluster则检查点路径必须为HDFS路径。该方法与上述两种方法最大的不同点在于会截断RDD的血缘关系而上述两种方法不会截断血缘关系只是起到了缓存数据避免重复计算的作用。checkpoint实际使用中有两点需要注意1checkpoint之前不要触发RDD的动作算子否则会截断血缘关系导致checkpoint重新计算时找不到血缘链条从而保存不到数据。2checkpoint前最好将需要保存的RDD通过cache或者persist缓存一下避免RDD的重复计算。 7.广播变量与累加器。
广播变量和累加器是spark中提供的两种共享变量分别用来解决广播通信和任务结果汇总的两种业务场景问题。详细参考官方文档https://spark.apache.org/docs/3.1.2/rdd-programming-guide.html#shared-variables
1广播变量 简而言之就是在每个集群节点中缓存一份driver端定义的公共变量且该被广播的变量在executor中只读。 当不使用广播变量的时候spark任务中需要用到的公共变量会copy到每个task中这种方式弊端一是重复存储占用内存资源二是增加了IO操作。而使用广播变量driver端定义的公共变量只会往每个集群中的worker节点中copy一份由executor中的所有task共享。且该方法的底层实现涉及到了序列化与反序列化以及高效的广播算法所以效率比较高。 demo
from pyspark.sql import SparkSession
需求从rdd中过滤掉singer中歌手的歌曲spark SparkSession.builder \.master(local[*]) \.appName(broadcast_demo) \.config(spark.executor.instances, 4) \.config(spark.executor.cores, 2) \.config(spark.executor.memory, 1g) \.getOrCreate()
sc spark.sparkContextrdd sc.parallelize([(梁静茹, 向左转向右转), (梁静茹, 亲亲), (王诗安, Home), (李宗盛, 山丘), (邵夷贝, 未来俱乐部)], 2)
print(f过滤前{rdd.collect()})singer [梁静茹, 王诗安]
# 设置广播变量并将singer广播到executor
bc sc.broadcast(singer)# 根据广播变量过滤并输出过滤结果
rdd_filter rdd.filter(lambda x: x[0] not in bc.value)
print(f过滤后{rdd_filter.collect()})sc.stop()
spark.stop()2累加器 累加器简要的概括是一种分布式共享只写变量。在driver端定义并被序列化到每个executor中在使用时被反序列化。所有executor中的task持有一个累加器的副本进行累加操作。并将结果回传给driver进行汇总。spark原生支持数值型累加器也支持开发人员自定义累计器类型。 demo
from pyspark.sql import SparkSession
需求统计rdd中属于singer中歌手的歌曲数量spark SparkSession.builder \.master(local[*]) \.appName(accumulator_demo) \.config(spark.executor.instances, 4) \.config(spark.executor.cores, 2) \.config(spark.executor.memory, 1g) \.getOrCreate()
sc spark.sparkContextrdd sc.parallelize([(梁静茹, 向左转向右转), (梁静茹, 亲亲), (王诗安, Home), (李宗盛, 山丘), (邵夷贝, 未来俱乐部)], 2)
singer [梁静茹, 王诗安]# 初始化一个初值为0的累加器
acc sc.accumulator(0)# 定义map函数统计属于singer的歌曲数量
def map_fun(x, s):if x[0] in s:acc.add(1)# 使用collect算子触发执行map函数并输出结果
rdd.map(lambda x: map_fun(x, singer)).collect()
print(f属于singer的歌曲数量{acc.value})sc.stop()
spark.stop()8.reduceByKey与groupByKey的区别? https://blog.csdn.net/atwdy/article/details/133155108 9.spark数据倾斜及通用调优。
10.map与flatMap区别 map对RDD中的每个元素应用规则并返回一个新的元素。也就是结果RDD的元素数量与原始RDD元素数量相等。 flatMap对RDD中每个元素应用规则并返回一个集合集合中的元素可以为0个或多个。在此基础之上再对所有的集合进行flat平铺操作可以理解为将各个集合元素合并到一起。 demo
from pyspark.sql import SparkSessionspark SparkSession.builder \.master(local[*]) \.appName(demo) \.config(spark.executor.instances, 4) \.config(spark.executor.cores, 2) \.config(spark.executor.memory, 1g) \.getOrCreate()
sc spark.sparkContextrdd sc.parallelize([2, 3, 4], 2)
rdd1 rdd.map(lambda x: range(1, x))
rdd2 rdd.flatMap(lambda x: range(1, x))print(fmap: {rdd1.collect()})
print(fflatMap: {rdd2.collect()})sc.stop()
spark.stop()11.spark中的shuffle有哪几种方式 两种。早期的HashShuffle和后期的SortShuffle。 HashShuffle后续高版本已被SortShuffle取代 未优化基于对下游分区个数hash取模实现下游有多少个分区上游每个task都会产生多少个小文件带来的问题是小文件过多增大磁盘和网络IO拖慢执行效率。同时上游每个task维护了多个小文件缓冲区增加内存压力。理论上的小文件个数 map task数量 x 下游分区数量。优化后HashShuffle的优化其实就是针对上游task产生的小文件的合并优化。未优化前每个task维护各自的缓冲区并生成和下游分区数量相等的小文件优化后每个executor中属于同一个的core的task会产生和下游分区数量相等的小文件并复用同一组小文件。所以理论上的小文件个数 上游core个数 x 下游分区数量。 SortShuffle 普通SortShuffle上游的每个map task会不断地往磁盘溢写小文件溢写前会进行排序每次溢写产生一个小文件最终将所有属于同一个task溢写的小文件merge为一个大文件并且产生一个索引文件下游的reduce task根据索引文件去读取属于自己分区的数据。即产生的小文件个数 map task数量 x 2。bypass机制这种机制可以理解为在未优化的HashShuffle机制基础上对同一个task产生的小文件进行了一个合并的功能产生一个大文件同时生成一个索引文件。这种机制相比普通SortShuffle省略了排序的过程。产生的文件个数 map task数量 x 2。触发该机制的两个阈值条件1reduce task数量 spark.shuffle.sort.bypassMergeThreshold参数的值默认为200。2不是聚合类的shuffle算子。准确来说不是map端预聚合的算子egreduceByKey因为为了聚合的高效通常要求数据有序而bypass机制并不对数据排序。 12.spark为什么比MR快