用自己网站做淘宝客,餐饮公司介绍模板,产品展示网站方案,南阳美容网站建设目录
#x1f436;3.2.1 分区过程
#x1f436;3.2.2 SplitSize计算和分区个数计算
#x1f436;3.2.3 Partition的数目设置
1. #x1f959;对于数据读入阶段#xff0c;输入文件被划分为多少个InputSplit就会需要多少初始task.
2. #x1f959;对于转换算子产生的…目录
3.2.1 分区过程
3.2.2 SplitSize计算和分区个数计算
3.2.3 Partition的数目设置
1. 对于数据读入阶段输入文件被划分为多少个InputSplit就会需要多少初始task.
2. 对于转换算子产生的RDD的分区数 3. 如果指定了spark.default.parallelism在进行shuffle之后的新的rdd会和spark.default.parallelism设置的一致
编辑
4. repartition和coalesce操作会聚合成指定分区数。
3.2.4 groupBy不一定会Shuffle 3.2.1 分区过程 每一个过程的任务数对应一个InputSplit,Paritition 输入可能以多个文件的形式存储在HDFS上面每个File都包含了很多块128切分称为block。
当Spark读取这些文件作为输入时会根据具体数据格式对应的InputFormat进行解析按照SplitSize切成一个个输入分片。随后将为这些输入分片生成具体的task. InputSplit与Task是一一对应的关系。 注意:InputSplit不能跨越文件。 随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。 每个节点可以起一个或多个Executor. 每个Executor由若干core组成每个Executor的每个core一次只能执行一个task. 每个task执行的结果就就是生成了目标rdd的一个partition. 注意这里的core是虚拟的core而不是机器的物理CPU核可以理解为Executor的一个工作线程。Task被执行的并发度Executor数目*每个Executor核数core总个数 3.2.2 SplitSize计算和分区个数计算 3.2.3 Partition的数目设置
1. 对于数据读入阶段输入文件被划分为多少个InputSplit就会需要多少初始task. 集合 优先等级1指定分区数 优先等级2使用 set(spark.default.parallelism,8) 优先等级3所有的可用核数 文件 根据计算来的任务切片大小和输入路径下的文件大小 ,至少2并行度 数据库 指定的
2. 对于转换算子产生的RDD的分区数 默认和父RDD的分区数一致 有些算子可以调用的时候指定分区个数 distinct groupBy groupByKey 特殊的算子 有特殊规定 union(和) join
val rdd3 rdd1.intersection(rdd2) // 取大的
val rdd4 rdd1.subtract(rdd2) // 前面的RDD分区数
println(rdd1.cartesian(rdd2).getNumPartitions) // 两个分区个数乘积 注意: 可能产生Shuffle的算子可以指定分区个数的 //可能产生shuffle的操作
distinct(p) 减少
groupBy(_._1 , p) Shuffle
groupByKey( p) Shuffle
groupByKey(__, p) Shuffle
join( , p) 3. 如果指定了spark.default.parallelism在进行shuffle之后的新的rdd会和spark.default.parallelism设置的一致
package com.doit.com.doit.day0128import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}/*** 日期: 2024/1/30* Author: Wang NaPao* Blog: https://blog.csdn.net/weixin_40968325?spm1018.2226.3001.5343* Tips: 我是技术大牛* Description:*//** data/orders.txt
oid01,100,bj
oid02,100,bj
oid03,100,bj
oid04,100,nj
oid05,100,nj
*/object Test06 {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(Starting...).setMaster(local[*]).set(spark.default.parallelism, 8)val sc new SparkContext(conf)//设置spark-submit提交程序时不在控制台打印日志信息Logger.getLogger(org.apache.spark).setLevel(Level.WARN)val rdd1 sc.textFile(data/orders.txt)//将rdd1的分区设置为2rdd1.repartition(2)println(rdd1 partition为rdd1.getNumPartitions)//将rdd1按照城市分组val rdd2 rdd1.groupBy(tp{val arr tp.split(,)arr(2)})println(rdd2 partition为rdd2.getNumPartitions)sc.stop()}
} 4. repartition和coalesce操作会聚合成指定分区数。
println(rdd1.repartition(3).getNumPartitions) // 增加
println(rdd1.repartition(1).getNumPartitions) //减少
println(rdd1.coalesce(1, true).getNumPartitions) //减少
println(rdd1.coalesce(3, true).getNumPartitions) //增加
// 不允许Shuffle就不能增加分区
println(rdd1.coalesce(3, false).getNumPartitions) //增加失败
println(rdd1.coalesce(1, false).getNumPartitions) //减少 不会Shuffle
3.2.4 groupBy不一定会Shuffle
Shuffle上游一个分区的数据可能被下游所有分区引用 package com.doit.com.doit.day0128import org.apache.spark.SparkContext.jarOfObject
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}/*** 日期: 2024/1/29* Author: Wang NaPao* Blog: https://blog.csdn.net/weixin_40968325?spm1018.2226.3001.5343* Tips: 我是技术大牛* Description:*/object Test03 {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(doe).setMaster(local[*])val sc new SparkContext(conf)val rdd1 sc.makeRDD(List(a b c d e f g), 2)val rdd2: RDD[String] rdd1.flatMap(_.split(\\s))val wordOne rdd2.map(line{println(aaaaaa)(line,1)}) //2//对数据使用HashPartitioner在分区 2val rdd3 wordOne.partitionBy(new HashPartitioner(3))rdd3.mapPartitionsWithIndex((p,iter){iter.map(e(p,e))}).foreach(println)//底层默认是HashPartition分区 2val rdd4: RDD[(String, Iterable[(String, Int)])] rdd3.groupBy(_._1, 3)val rdd5: RDD[(Int, (String, Iterable[(String, Int)]))] rdd4.mapPartitionsWithIndex((p, iter) {iter.map(e (p, e))})rdd5.foreach(println)sc.stop()}
} 结果