甘肃省城乡城乡建设厅网站首页,以net结尾的网站,石河子做网站,东莞网站推广的公司RDD
简介 在Spark中#xff0c;RDD是弹性分布式数据集#xff08;Resilient Distributed Dataset#xff09;的缩写。通俗来讲#xff0c;RDD是一种抽象的数据结构#xff0c;用于表示分布式计算中的数据集合。它是Spark中最基本的数据模型#xff0c;可以看作是一个不可…RDD
简介 在Spark中RDD是弹性分布式数据集Resilient Distributed Dataset的缩写。通俗来讲RDD是一种抽象的数据结构用于表示分布式计算中的数据集合。它是Spark中最基本的数据模型可以看作是一个不可变的、可分区、可并行处理的数据集合。这个数据集的全部或部分可以缓存在内存中可在多次计算中重用。 RDD是由一系列的记录或元素组成的这些记录可以分散存储在集群的多个节点上每个节点上的数据可以被并行处理。RDD提供了一系列的操作函数例如map、reduce、filter等可以对数据进行转换和计算。RDD的特点是具有容错性和弹性即使在节点故障的情况下也能自动恢复数据和计算过程。
RDD编程基础
1、RDD 创建
Spark 通过 textFile() 从文件系统本地系统、HDFS、集合中加载数据来创建RDD。
1.1、从文件系统中加载数据创建 RDD
import org.apache.spark.{SparkConf, SparkContext}object CreateRddByFileScala {def main(args: Array[String]): Unit {//创建SparkContext对象val conf new SparkConf()conf.setAppName(CreateRddByFileScala).setMaster(local)val sc new SparkContext(conf)//windowsval path D:\\test\\data//linux
// val path file:///usr/local/test/data///读取文件数据可以在textFile中生成的RDD分区数量val rdd sc.textFile(path,2)//获取每一行数据的长度计算文件内数据的总长度val length rdd.map(_.length).reduce(__)println(length)//关闭SparkContextsc.stop()}}1.2、从HDFS中加载数据
只需要修改路径如下 val path hadoop101:9000/test///读取文件数据可以在textFile中生成的RDD分区数量val rdd sc.textFile(path,2)
1.3、通过并行集合数组创建RDD
调用 SparkContext 的 parallelize() 方法通过一个已经存在的集合数组来创建RDD。
//创建SparkContextval conf new SparkConf()conf.setAppName(CreateRddByArrayScala).setMaster(local) //local表示在本地执行val sc new SparkContext(conf)//创建集合val arr Array(1,2,3,4,5)//基于集合创建RDDval rdd sc.parallelize(arr)
2、RDD 操作 RDD 的操作包括两种类型转换操作和行动操作。其中转换操作主要有map()、filter()、groupBy()、join()等对RDD而言每次转换都会产生一个新的RDD供下一次操作使用。而行动操作如count()、collect()等返回的一般都是一个值。
2.1、转换操作 RDD 的真个转换过程是采用惰性机制的也就是说整个转换过程只记录了转换的轨迹并不会真正的运算只有遇到行动操作才会触发从头到尾的真正计算。
1、filter(f: String Boolean)
用法和Scala中的filter一致。
输入文档
Hadoop is good
Spark is better
Spark is fast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RDDAction {def main(args: Array[String]): Unit {// 创建 SparkContext 对象val conf new SparkConf()conf.setAppName(filter-test).setMaster(local)val sc new SparkContext(conf)// 通过加载数据创建RDD对象val rdd: RDD[String] sc.textFile(data/word.txt)//filter 的参数是一个匿名函数 要求返回一个Boolean 类型的值 true-留下 false-过滤val lineWithSpark: RDD[String] rdd.filter(line {line.contains(Spark)})lineWithSpark.foreach(println)// 关闭sc对象sc.stop()}
}运行结果
Spark is better
Spark is fast
2、map()
同样和Scala中的map()用法一致。
//省略创建AparkContext对象的代码...// 使用并行集合创建 RDDval arr Array(1,2,3,4,5)val rdd1: RDD[Int] sc.parallelize(arr)//转换操作val rdd2 rdd1.map(num num*2)rdd2.foreach(println)
运行结果
2
4
6
8
10
//使用本地文件作为数据加载创建RDD 对象val rdd1: RDD[String] sc.textFile(data/word.txt)val rdd2: RDD[Array[String]] rdd1.map(line {line.split( )})
解析
输入
Hadoop is good
Spark is better
Spark is fast
Spark 读取进来后就变成了 RDD(Hadoop is good,Spark is better,Spark is fast)我们知道Scala中要进行扁平化操作的话对象必须是一个多维数组所以我们要通过 map() 对读取进来的格式进行处理处理后的格式RDD(Array(Hadoop is good),Array(Spark is better),Array(Spark is fast))
RDD(Hadoop is good,Spark is better,Spark is fast) RDD(Array(Hadoop is good),Array(Spark is better),Array(Spark is fast))
3、flatMap()
和Scala中用法基本一样。
//使用本地文件作为数据加载创建RDD 对象val rdd1: RDD[String] sc.textFile(data/word.txt)val rdd2: RDD[String] rdd1.flatMap(line line.split( ))
flatMap 的过程
RDD(Hadoop is good,Spark is better,Spark is fast)
先进行 map()
RDD(Array(Hadoop is good),Array(Spark is better),Array(Spark is fast))
在进行 flatten
RDD(Hadoop,is,good,Spark,is,better,Spark,is,fast))
扁平化后我们的数据又变为了一维集合的数据结构RDD了。
4、groupByKey() 这个函数十分重要上面我们得到了关于每次单词的一个RDD集合现在我们要进行wordcount 的话肯定还需要对相同的键进行一个分类这样会生成一个RDD集合(key:String,valut_listInterable[Int])。
我们同样基于上面的结果进行操作
val rdd3: RDD[(String, Int)] rdd2.map(word {(word, 1)})//RDD((Hadoop,1),(is,1),(good,1),(Spark,1),(is,1),(better,1),(Spark,1),(is,1),(fast,1)))val rdd4: RDD[(String, Iterable[Int])] rdd3.groupByKey()//RDD((Hadoop,1),(is,1,1,1),(good,1),(Spark,1,1),(better,1),(fast,1)))
5、reduceByKey()
需要注意的是reduceByKey是对(key:String,value:Int)这种相同键值对元素的合并而不是对上面groupByKey()的结果(key:String,value_list:Interable[Int])进行操作这个粗心让我找了半天。
//rdd5和6效果都一样val rdd5: RDD[(String,Int)] rdd4.map(t {(t._1, t._2.size)})//RDD((Hadoop,1),(is,3),(good,1),(Spark,2),(better,1),(fast,1)))// rdd3.reduceByKey((v1,v2)v1v2) //v1 v2代表发现key相同的键值对的值 参数按照顺序在函数体中只出现了一次 那么可以用下划线代替val rdd6: RDD[(String, Int)] rdd3.reduceByKey(_ _)//RDD((Hadoop,1),(is,3),(good,1),(Spark,2),(better,1),(fast,1)))//打印结果rdd6.foreach(println)运行结果
(Spark,2)
(is,3)
(fast,1)
(good,1)
(better,1)
(Hadoop,1)Process finished with exit code 0总结
剩下的RDD转换操作下午再新开一篇以及RDD的行动操作篇、持久化、分区和综合实例后续更新。