怎么开通网站,wordpress小工具音乐美化,百度搜索风云榜下载,新型营销方式1. 什么是RDD#xff1f;
RDD#xff08;Resilient Distributed Dataset#xff09;叫做弹性分布式数据集#xff0c;是Spark中最基本的数据处理模型。在代码中#xff0c;RDD是一个抽象类#xff0c;他代表着一个弹性的、不可变的、可分区的、里面的元素可并行计算的集…1. 什么是RDD
RDDResilient Distributed Dataset叫做弹性分布式数据集是Spark中最基本的数据处理模型。在代码中RDD是一个抽象类他代表着一个弹性的、不可变的、可分区的、里面的元素可并行计算的集合。注意RDD只是封装了计算逻辑并不保存数据。RDD是一个抽象类需要子类去实现。不可变指的是计算逻辑不可变如果想要改变则要产生新的RDD。
2. 五大核心属性
源码中五大属性介绍如下 1分区列表
分区的主要目的是实现并行计算/分布式计算
2分区计算函数
以分区为单位进行计算每个分区的计算函数都是一样的
3RDD之间的依赖关系
一个RDD能够转换成另一个RDD形成一种包装的依赖关系
4分区器
负责如何划分分区分区器是Option属性可能有可能没有
5计算每个分区的首选位置
数据存储的节点和数据计算节点可能不一样判断计算发给哪个节点更好移动数据不如移动计算
3. 执行原理
Spark框架在执行计算时先申请资源然后将数据处理逻辑分解成一个个计算任务然后将计算任务发送到已经分配资源的计算节点上按照指定的计算模型进行计算。以Yarn集群环境为例 其中Yarn只是负责资源调度的而NodeManager中的Driver才是负责任务调度的而NodeManager中的Executor是负责任务执行的。 4. 从集合中创建RDD
通过parallelize和makeRDD方法
val sparkConf new SparkConf.setMaster(local[*]).setAppName(RDD)val sc new SparkContext(sparkConf)val seq Seq[Int](1, 2, 3, 4)
// val rdd : RDD[Int] sc.parallelize(seq)
val rdd : RDD[Int] sc.makeRDD(seq)rdd.collect().foreach(println)sc.stop()
其中local[*]表示使用当前本机的核数如果不写[*]就用单核。parallelize和makeRDD方法本质是一样的makeRDD方法内部调用了parallelize方法。
makeRDD可以加上第二个参数表示分区数量如果不传会使用默认值scheduler.conf.getInt(spark.default.parallelism, totalCores)即会从sparkConf中获取配置参数如果没配置则使用totalCores即当前环境最大核数。当然这是针对本地模式的源码分析。
另外使用saveAsTextFile保存每个分区的文件。
val sparkConf new SparkConf.setMaster(local[*]).setAppName(RDD)val sc new SparkContext(sparkConf)val seq Seq[Int](1, 2, 3, 4)
// val rdd : RDD[Int] sc.parallelize(seq)
val rdd : RDD[Int] sc.makeRDD(seq, 2)rdd.saveAsTextFile(output)rdd.collect().foreach(println)sc.stop()
结果如下2个分区 可以设置sparkConf中的分区数量配置参数为5
val sparkConf new SparkConf.setMaster(local[*]).setAppName(RDD)
sparkConf.set(spark.default.parallelism, 5)val sc new SparkContext(sparkConf)val seq Seq[Int](1, 2, 3, 4)
// val rdd : RDD[Int] sc.parallelize(seq)
val rdd : RDD[Int] sc.makeRDD(seq)rdd.saveAsTextFile(output)rdd.collect().foreach(println)sc.stop()
结果如下 分区数据的划分可以参考 036 RDD-集合数据源-分区数据的分配
5. 从文件中创建RDD
val sparkConf new SparkConf.setMaster(local[*]).setAppName(RDD)val sc new SparkContext(sparkConf)val seq Seq[Int](1, 2, 3, 4)
// val rdd : RDD[Int] sc.parallelize(seq)
val rdd : RDD[String] sc.textFile(path)rdd.collect().foreach(println)sc.stop()
path可以是文件夹也可以是文件 还可以加上通配符*。另外path可以是分布式文件系统的路径。这里的textFile是以行为单位进行读取数据不考虑数据来自于哪个文件。如果需要考虑数据来源于哪个文件则需要用到wholeTextFiles方法。
val sparkConf new SparkConf.setMaster(local[*]).setAppName(RDD)val sc new SparkContext(sparkConf)val seq Seq[Int](1, 2, 3, 4)
// val rdd : RDD[Int] sc.parallelize(seq)
val rdd : RDD[String] sc.wholeTextFiles(path)rdd.collect().foreach(println)sc.stop()
读取结果形式类似如下 可以看出是以文件为单位进行读取文件全路径名称和文件内容以逗号隔开。
textFile也可以通过第二个参数指定分区数量如果不传默认为min(scheduler.conf.getInt(spark.default.parallelism, totalCores), 2)但是第二个参数并不完全是最终分区的数量这里只是表示最小分区数实际分区数量可能比这个值要大。实际分区数量怎么计算可以考037 RDD-文件数据源-分区的设定。分区数据的划分可参考038 RDD-文件数据源-分区数据的分配和039 RDD-文件数据源-分区数据的分配-案例分析