地方门户网站的特点,4d网站广告图用什么做的,在线制作图片背景,网站页面划分在Spark的学习当中#xff0c;RDD、DataFrame、DataSet可以说都是需要着重理解的专业名词概念。尤其是在涉及到数据结构的部分#xff0c;理解清楚这三者的共性与区别#xff0c;非常有必要。 RDD#xff0c;作为Spark的核心数据抽象#xff0c;是Spark当中不可或缺的存在…在Spark的学习当中RDD、DataFrame、DataSet可以说都是需要着重理解的专业名词概念。尤其是在涉及到数据结构的部分理解清楚这三者的共性与区别非常有必要。 RDD作为Spark的核心数据抽象是Spark当中不可或缺的存在而在SparkSQL中Spark为我们提供了两个新的抽象分别是DataFrame和DataSet。
RDD、DataFrame、DataSet三者的共性 RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集为处理超大型数据提供便利。 三者都有惰性机制在进行创建、转换如map方法时不会立即执行只有在遇到Action如foreach时三者才会开始遍历运算。 三者都会根据spark的内存情况自动缓存运算这样即使数据量很大也不用担心会内存溢出。 三者都有partition的概念。三者有许多共同的函数如filter排序等。
DataFrame、DataSet和RDD有什么区别
首先从版本的产生上来看RDD(Spark1.0)—Dataframe(Spark1.3)—Dataset(Spark1.6)
RDD RDD一般和spark mlib同时使用。 RDD不支持sparksql操作。DataFrame ①与RDD和Dataset不同DataFrame每一行的类型固定为Row只有通过解析才能获取各个字段的值。 ②DataFrame引入了schema和off-heap schemaRDD每一行的数据结构都是一样的。这个结构就存储在schema中。Spark通过schame就能够读懂数据因此在通信和IO时就只需要序列化和反序列化数据而结构的部分就可以省略了。 off-heap意味着JVM堆以外的内存这些内存直接受操作系统管理而不是JVM。Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中当要操作数据时就直接操作off-heap内存。由于Spark理解schema所以知道该如何操作。 off-heap就像地盘schema就像地图Spark有地图又有自己地盘了就可以自己说了算了不再受JVM的限制也就不再收GC的困扰了。 ③结构化数据处理非常方便支持AvroCSVElasticsearch数据等也支持HiveMySQL等传统数据表。 ④兼容Hive支持Hql、UDF 有schema和off-heap概念DataFrame解决了RDD的缺点但是却丢了RDD的优点。DataFrame不是类型安全的只有编译后才能知道类型错误API也不是面向对象风格的。Dataset ①DataSet集中了RDD的优点强类型和可以用强大lambda函数以及Spark SQL优化的执行引擎。DataSet可以通过JVM的对象进行构建可以用函数式的转换map/flatmap/filter进行多种操作。 ②DataSet结合了RDD和DataFrame的优点并带来的一个新的概念Encoder。DataSet通过Encoder实现了自定义的序列化格式使得某些操作可以在无需序列化情况下进行。另外Dataset还进行了包括Tungsten优化在内的很多性能方面的优化。 ③DatasetRow等同于DataFrameSpark 2.X
RDD、DataFrame、DataSet的创建
创建RDD 在Spark中创建RDD的方式主要分为2种 1.读取内存数据创建RDD 2.读取文件创建RDD 3.通过其他RDD创建RDD
1、读取内存数据创建RDD 读取内存数据创建RDDSpark主要提供了两个方法parallelize和makeRDD。 使用makeRDD创建RDD的时候还可以指定分区数量。 val sc new SparkContext(new SparkConf().setMaster(local[*]).setAppName(CreateRDD)) // 从内存中创建RDD将内存中集合的数据作为处理的数据源 val seq Seq[Int](elems 1,2,3,4) // parallelize方法创建RDD // val rdd sc.parallelize(seq) // makeRDD方法创建RDD // val rdd sc.makeRDD(seq) // 指定分区数量创建RDD val rdd sc.makeRDD(seq,3) rdd.collect().foreach(println) sc.stop() 2、读取文件创建RDD 读取文件创建RDDSpark提供了textFile和wholeTextFiles方法 textFile以行为单位进行读取数据 wholeTextFiles以文件为单位读取数据读取的结果为元组形式第一个值为文件路径第二个值为文件内容。 val sc new SparkContext(new SparkConf().setMaster(local[*]).setAppName(Rdd_File)) // textFile方法读取文件创建RDD // val rdd sc.textFile(path test.txt) // textFile方法也是可以指定分区数量的 // val rdd sc.textFile(path test.txt, 3) // wholeTextFiles方法读取多个文件创建RDD val rdd sc.wholeTextFiles(path test*.txt) rdd.collect().foreach(println) sc.stop() 3、通过其他RDD创建RDD val conf: SparkConf new SparkConf().setAppName(this.getClass.getName).setMaster(local[*]) val sc new SparkContext(conf) val rdd: RDD[String] sc.textFile(D:\\develop\\workspace\\bigdata2021\\spark2021\\input) val flatRDD: RDD[String] rdd.flatMap(_.split( )) sc.stop() 创建DataFrame
1、通过Seq生成 val spark SparkSession .builder() .appName(this.getClass.getSimpleName).master(local) .getOrCreate() val df spark.createDataFrame(Seq( (ming, 20, 15552211521L), (hong, 19, 13287994007L), (zhi, 21, 15552211523L) )) toDF(name, age, phone) df.show() 2、读取Json文件生成 json文件内容 {name:ming,age:20,phone:15552211521} {name:hong, age:19,phone:13287994007} {name:zhi, age:21,phone:15552211523} val dfJson spark.read.format(json).load(/Users/shirukai/Desktop/HollySys/Repository/sparkLearn/data/student.json) dfJson.show() 3、读取csv文件生成 csv文件 name,age,phone ming,20,15552211521 hong,19,13287994007 zhi,21,15552211523 val dfCsv spark.read.format(csv).option(header, true).load(/Users/shirukai/Desktop/HollySys/Repository/sparkLearn/data/students.csv) dfCsv.show() 4、通过Json格式的RDD生成弃用 val sc spark.sparkContext import spark.implicits._ val jsonRDD sc.makeRDD(Array( {\name\:\ming\,\age\:20,\phone\:15552211521}, {\name\:\hong\, \age\:19,\phone\:13287994007}, {\name\:\zhi\, \age\:21,\phone\:15552211523} )) val jsonRddDf spark.read.json(jsonRDD) jsonRddDf.show() 5、通过Json格式的DataSet生成 val jsonDataSet spark.createDataset(Array( {\name\:\ming\,\age\:20,\phone\:15552211521}, {\name\:\hong\, \age\:19,\phone\:13287994007}, {\name\:\zhi\, \age\:21,\phone\:15552211523} )) val jsonDataSetDf spark.read.json(jsonDataSet) jsonDataSetDf.show() 6、通过csv格式的DataSet生成 val scvDataSet spark.createDataset(Array( ming,20,15552211521, hong,19,13287994007, zhi,21,15552211523 )) spark.read.csv(scvDataSet).toDF(name,age,phone).show() 7、动态创建schema val schema StructType(List( StructField(name, StringType, true), StructField(age, IntegerType, true), StructField(phone, LongType, true) )) val dataList new util.ArrayList[Row]() dataList.add(Row(ming,20,15552211521L)) dataList.add(Row(hong,19,13287994007L)) dataList.add(Row(zhi,21,15552211523L)) spark.createDataFrame(dataList,schema).show() 8、通过jdbc创建 //第八种读取数据库mysql val options new util.HashMap[String,String]() options.put(url, jdbc:mysql://localhost:3306/spark) options.put(driver,com.mysql.jdbc.Driver) options.put(user,root) options.put(password,hollysys) options.put(dbtable,user) spark.read.format(jdbc).options(options).load().show() 创建Dateset
1、通过createDatasetseq,list,rdd import org.apache.spark.SparkContext import org.apache.spark.sql.{Dataset, SparkSession} object CreateDataset { def main(args: Array[String]): Unit { val spark: SparkSession SparkSession.builder().master(local[4]).appName(this.getClass.getName).getOrCreate() // 需要导入隐式转换 import spark.implicits._ val sc: SparkContext spark.sparkContext //通过seq创建Dataset val seqDs: Dataset[Int] spark.createDataset(1 to 10) //通过list创建Dataset val listDs: Dataset[(String, Int)] spark.createDataset(List((a,1),(b,2),(c,3))) //通过rdd创建Dataset val rddDs: Dataset[(String, Int, Int)] spark.createDataset(sc.parallelize(List((a,1,2),(b,2,3),(c,3,4)))) seqDs.show() listDs.show() rddDs.show() } } 2、通过case class import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, SparkSession} import scala.collection.mutable object CreateDataSetByCaseClass { case class Point(label:String,x:Double,y:Double) case class Category(id:Long,name:String) def main(args: Array[String]): Unit { val spark: SparkSession SparkSession.builder().master(local[4]).appName(this.getClass.getName).getOrCreate() // 需要导入隐式转换 import spark.implicits._ val sc: SparkContext spark.sparkContext //通过Point的样例类创建一个seq并将它转化为Dataset val points: Dataset[Point] Seq(Point(bar,2.6,3.5),Point(foo,4.0,3.7)).toDS() //通过Category的样例类创建一个seq并将它转化为Dataset val categories: Dataset[Category] Seq(Category(1,bar),Category(2,foo)).toDS() //进行join连接,注意这里需要传入三个”“这时一个方法 points.join(categories,points(label)categories(name)).show() //通过Point的样例类创建一个List并将它转化为Dataset val points2: Dataset[Point] List(Point(bar,2.6,3.5),Point(foo,4.0,3.7)).toDS() //通过Category的样例类创建一个List并将它转化为Dataset val categories2: Dataset[Category] List(Category(1,bar),Category(2,foo)).toDS() //进行join连接,注意这里需要传入三个”“这时一个方法 points2.join(categories2,points2(label)categories2(name)).show() //通过Point的样例类创建一个RDD并将它转化为Dataset val points3: Dataset[Point] sc.parallelize(List(Point(bar,2.6,3.5),Point(foo,4.0,3.7))).toDS() //通过Category的样例类创建一个RDD并将它转化为Dataset val categories3: Dataset[Category] sc.parallelize(List(Category(1,bar),Category(2,foo))).toDS() points3.join(categories3,points3(label)categories3(name)).show() } } RDD、DataFrame、DataSet三者之间的转换
1.RDD与DataFrame转换
1toDF方法将RDD转换为DataFrame ## 创建RDD val rdd: RDD[(Int, String, Int)] spark.sparkContext.makeRDD(List((1, ww, 20), (2, ss, 30), (3, xx, 40))) ## 指定列名 val df: DataFrame rdd.toDF(id, name, age) ## 不指定列名 val df1: DataFrame rdd.toDF() ## 展示 df.show() df1.show() 2rdd方法将DataFrame转换为RDD。 val rowRDD: RDD[Row] df.rdd ## 输出 rowRDD.collect().foreach(println) 2.DataFrame与DataSet转换
1as方法将DataFrame转换为DataSet使用 as[] 方法时需要指明数据类型或者采用样例类的方式 ## 引入隐式转换 import spark.implicits._ ## 创建样例类(不能创建于main方法中) case class User(id:Int,name:String,age:Int) ## 指定数据类型 val ds: Dataset[(Int,String,Int)] df.as[(Int,String,Int)] ## 采用样例类 val ds1: Dataset[User] df.as[User] ## 展示 ds.show() ds1.show() 2toDF方法将DataSet转换为DataFrame。 ## 转换 val df2: DataFrame ds.toDF() ## 展示 df2.show() 3.RDD与DataSet转换
1toDS方法将RDD转换为DataSet使用 toDS() 方法时可以先将数据包装为样例类的形式也可以直接以数据类型输出 ## 通过case将样例类User与数据进行匹配 val ds2: Dataset[User] rdd.map { case (id, name, age) { User(id, name, age) } }.toDS() ## 直接转换 val ds3: Dataset[(Int, String, Int)]rdd.toDS() ## 展示 ds2.show() ds3.show() 2rdd方法将DataSet转换为RDD ## 转换 val userRDD: RDD[User] ds1.rdd ## 输出 userRDD.collect().foreach(println) 编程要求 DD 转换成 DataFrame、Dataset 1、读取list数据创建 RDD 2、将 RDD转换为 DataFrame,并指定列名为(id,name,sex,age) 3、将 RDD转换为 DataSet,并以样例类的方式转换。 DataFrame 转换成 RDD、DataSet 1、读取staff.josn文件创建 DataFrame 2、将 DataFrame转换为 RDD 3、将 DataFrame转换为 DataSet。 DataSet 转换成 RDD、DataFrame 1、读取staff2.json文件创建 DataSet并以Staff样例类的方法创建 2、将 DataSet转换为 DataFrame 3、将 DataSet转换为 RDD。 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, sql} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object sparkSql_transform { case class Message() def main(args: Array[String]): Unit { val sparkConf new SparkConf().setMaster(local[*]).setAppName(sparkSQL) val spark SparkSession.builder().config(sparkConf).getOrCreate() import spark.implicits._ val list List((202201, Mark, female, 23), (202202, Peter, female, 24), (202203, Anna, male, 21)) val path1 /data/workspace/myshixun/step1/data/staff.json val path2 /data/workspace/myshixun/step1/data/staff2.json /********* Begin *********/ /********* RDD 转换成 DataFrame、DataSet *********/ // 读取list数据创建RDD val rdd:RDD[(Int,String,String,Int)]spark.sparkContext.makeRDD(list) // 将RDD转换为DataFrame,指定列名为id,name,sex,age,并打印输出 val df:DataFramerdd.toDF(id,name,sex,age) df.show() // 将RDD转换为DataSet,以样例类的方式转换,并打印输出 val dsrdd.map{lineStaff(line._1,line._2,line._3,line._4)}.toDS() ds.show() /********* DataFrame 转换成 RDD、DataSet *********/ // 读取staff.josn文件创建DataFrame val df1: DataFrame spark.read.json(path1) // 将DataFrame转换为RDD,并打印输出 val rdd1df1.rdd rdd1.collect().foreach(println) // 将DataFrame转换为DataSet,并打印输出 val ds1df1.as[Staff] ds1.show() /********* DataSet 转换成 RDD、DataFrame *********/ // 读取staff2.json文件创建DataSet并以Staff样例类的方法创建 val ds2: Dataset[Staff] spark.read.json(path2).as[Staff] // 将DataSet转换为DataFrame并打印输出 val df2ds2.toDF df2.show() // 将DataSet转换为RDD并打印输出 val rdd2ds2.rdd rdd2.collect().foreach(println) /********* End *********/ // TODO 关闭环境 spark.close() } // Staff样例类 case class Staff(id: BigInt,name: String,sex: String,age: BigInt) }