阿凡达网站建设网,网络营销包括什么内容,如何制作网络游戏,wordpress 管理员登陆文章目录 1 DataFrame的构建方式方式一#xff1a;JavaBean反射的方式1.1 创建Scala类1.2 创建Scala对象 方式二#xff1a;动态编码的方式 2 DataSet的构建方式3 RDD和DataFrame以及DataSet之间的相互转换3.1【RDD--DataFrame】和【RDD--DataSet】3.2【DataFrame--JavaBean反射的方式1.1 创建Scala类1.2 创建Scala对象 方式二动态编码的方式 2 DataSet的构建方式3 RDD和DataFrame以及DataSet之间的相互转换3.1【RDD--DataFrame】和【RDD--DataSet】3.2【DataFrame--RDD】和【DataFrame--DataSet】3.3【DataSet--RDD】和【DataSet--DataFrame】 1 DataFrame的构建方式
方式一JavaBean反射的方式
1.1 创建Scala类 package _02SparkSQL// 统一的样例类
case class _02student(id:Int,name:String,gender:String,age:Int)1.2 创建Scala对象 package _02SparkSQLimport org.apache.spark.sql.{DataFrame, SparkSession}object _02createDataFrame {//使用JavaBean方式反射def main(args: Array[String]): Unit {//创建SparkSession对象val spark SparkSession.builder().appName(CreateDataFrame).master(local[*]).getOrCreate()val list List(new _02student(id 1,name张三,gender 男,age18),new _02student(id 1,name李四,gender 女,age26),new _02student(id 1,name王五,gender 男,age34),new _02student(id 1,name赵六,gender 女,age45),)//需要提供隐式转换才可以进行操作需要使用SparkSession对象进行操作import spark.implicits._val frame: DataFrame list.toDF()frame.printSchema()/*root|-- id: integer (nullable false)|-- name: string (nullable true)|-- gender: string (nullable true)|-- age: integer (nullable false)*/frame.show()/*运行结果----------------| id|name|gender|age|----------------| 1|张三| 男| 18|| 1|李四| 女| 26|| 1|王五| 男| 34|| 1|赵六| 女| 45|----------------*/}
}
方式二动态编码的方式
说明这里学习三个新的类 【Row】代表的是二维表中的一行记录或者就是一个Java对象 【StructType】是该二维表的元数据信息是StructField的集合 【StructField】是该二维表中某一个字段/列的元数据信息主要包括列名类型是否可以为null 总结 这两种方式都是非常常用但是动态编程更加的灵活因为javabean的方式的话提前要确定好数据格式类型后期无法做改动。
package _02SparkSQLimport org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object _03createDataFrame {//动态编程方式def main(args: Array[String]): Unit {// 构建SparkSession对象val spark SparkSession.builder().appName(03createDataFrame).master(local[*]).getOrCreate()//需要构建RDD数据//因为SparkSession的底层是包含是SparkContext对象val row spark.sparkContext.makeRDD(List(// 需要使用Row来表示一行的内容Row(1,张三,男,18),Row(2,李四,女,23),Row(3,王五,男,35),Row(4,赵六,女,56)))//表对应的元数据信息【列列数据类型是否可以为空】val schema StructType(List(//需要根据Row中列的个数来决定提供StructField的个数StructField(id,DataTypes.IntegerType,false),StructField(name,DataTypes.StringType,false),StructField(gender,DataTypes.StringType,false),StructField(age,DataTypes.IntegerType,false),))//构建DataFrame对象val frame: DataFrame spark.createDataFrame(row, schema)frame.printSchema()/*运行结果root|-- id: integer (nullable false)|-- name: string (nullable false)|-- gender: string (nullable false)|-- age: integer (nullable false)*/frame.show()/*运行结果----------------| id|name|gender|age|----------------| 1|张三| 男| 18|| 2|李四| 女| 23|| 3|王五| 男| 35|| 4|赵六| 女| 56|----------------*/}
}2 DataSet的构建方式
DataSet是DataFrame的升级版创建方式和DataFrame类似但有不同 在创建Dataset的时候需要注意数据的格式必须使用 caseclass 或者基本数据类型同时需要通过import spark.implicts._来完成数据类型的编码从而抽取出对应的元数据信息否则编译无法通过
package _02SparkSQLimport org.apache.spark.sql.{Dataset, SparkSession}object _04createDataSet {def main(args: Array[String]): Unit {//创建SparkSession对象val session SparkSession.builder().appName(CreateDataSet).master(local[*]).getOrCreate()//提供List集合存储数据val list List(new _02student(id 1,name咪咪,gender 男,age6),new _02student(id 2,name凯凯,gender 男,age8),new _02student(id 3,name超超,gender 男,age7),new _02student(id 4,name大宝,gender 女,age9),)//通过List集合构建DataSet对象List集合中存储的是样例类对象import session.implicits._val ds: Dataset[_02student] list.toDS()ds.printSchema()/*运行结果root|-- id: integer (nullable false)|-- name: string (nullable true)|-- gender: string (nullable true)|-- age: integer (nullable false)*/ds.show()/*运行结果----------------| id|name|gender|age|----------------| 1|咪咪| 男| 6|| 2|凯凯| 男| 8|| 3|超超| 男| 7|| 4|大宝| 女| 9|----------------*///支持基本数据类型val list2 List(1,2,3,4,5,6,7,8)val ds2: Dataset[Int] list2.toDS()ds2.printSchema()/*运行结果root|-- value: integer (nullable false)*/ds2.show()/*运行结果-----|value|-----| 1|| 2|| 3|| 4|| 5|| 6|| 7|| 8|-----*/
}
}3 RDD和DataFrame以及DataSet之间的相互转换
3.1【RDD–DataFrame】和【RDD–DataSet】
package _02SparkSQLimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object _05RDDToDataFrame {def main(args: Array[String]): Unit {//创建SparkSession对象val spark SparkSession.builder().appName(RDDToDataFrame).master(local[*]).getOrCreate()val rdd spark.sparkContext.makeRDD(List(new _02student(id 1,name咪咪,gender 男,age6),new _02student(id 2,name凯凯,gender 男,age8),new _02student(id 3,name超超,gender 男,age7),new _02student(id 4,name大宝,gender 女,age9),))println(rdd.collect().toBuffer)/*运行结果ArrayBuffer(_02student(1,咪咪,男,6),_02student(2,凯凯,男,8),_02student(3,超超,男,7),_02student(4,大宝,女,9))*//*** RDD 转换为 DataFrame*///需要使用SparkSession所创建的对象进行隐式转换导入操作import spark.implicits._val frame: DataFrame rdd.toDF()frame.show()/*运行结果----------------| id|name|gender|age|----------------| 1|咪咪| 男| 6|| 2|凯凯| 男| 8|| 3|超超| 男| 7|| 4|大宝| 女| 9|----------------*//*** RDD 转换为 DataSet*/val dataset: Dataset[_02student] rdd.toDS()dataset.show()/*运行结果----------------| id|name|gender|age|----------------| 1|咪咪| 男| 6|| 2|凯凯| 男| 8|| 3|超超| 男| 7|| 4|大宝| 女| 9|----------------*/}
}3.2【DataFrame–RDD】和【DataFrame–DataSet】
package _02SparkSQLimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object _05RDDToDataFrame {def main(args: Array[String]): Unit {//创建SparkSession对象val spark SparkSession.builder().appName(RDDToDataFrame).master(local[*]).getOrCreate()val rdd spark.sparkContext.makeRDD(List(new _02student(id 1,name咪咪,gender 男,age6),new _02student(id 2,name凯凯,gender 男,age8),new _02student(id 3,name超超,gender 男,age7),new _02student(id 4,name大宝,gender 女,age9),))println(rdd.collect().toBuffer)/*运行结果ArrayBuffer(_02student(1,咪咪,男,6),_02student(2,凯凯,男,8),_02student(3,超超,男,7),_02student(4,大宝,女,9))*//*** RDD 转换为DataFrame*///需要使用SparkSession所创建的对象进行隐式转换导入操作import spark.implicits._val frame: DataFrame rdd.toDF()frame.show()/*运行结果----------------| id|name|gender|age|----------------| 1|咪咪| 男| 6|| 2|凯凯| 男| 8|| 3|超超| 男| 7|| 4|大宝| 女| 9|----------------*//*** DataFrame转换为RDD*///Row是DataFrame动态构建时提供的行对象val rdd1: RDD[Row] frame.rddrdd1.foreach(row{//按照列的序号获取即可序号与数组下标一样从0开始到长度-1println(row)/*运行结果[2,凯凯,男,8][4,大宝,女,9][1,咪咪,男,6][3,超超,男,7]*///取值的时候使用getxxx方法xxx就是列的数据类型val id row.getInt(0)val name row.getString(1)val gender row.getString(2)val age row.getAs[Int](age)println(id name gender age)/*运行结果3 超超 男 71 咪咪 男 62 凯凯 男 84 大宝 女 9*/})/*** DataFrame转换为DataSet*///DataFrame其实就是DataSet的特例val dataset2: Dataset[_02student] frame.as[_02student]dataset2.show()/*运行结果----------------| id|name|gender|age|----------------| 1|咪咪| 男| 6|| 2|凯凯| 男| 8|| 3|超超| 男| 7|| 4|大宝| 女| 9|----------------*/}
}3.3【DataSet–RDD】和【DataSet–DataFrame】
package _02SparkSQLimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object _05RDDToDataFrame {def main(args: Array[String]): Unit {//创建SparkSession对象val spark SparkSession.builder().appName(RDDToDataFrame).master(local[*]).getOrCreate()val rdd spark.sparkContext.makeRDD(List(new _02student(id 1,name咪咪,gender 男,age6),new _02student(id 2,name凯凯,gender 男,age8),new _02student(id 3,name超超,gender 男,age7),new _02student(id 4,name大宝,gender 女,age9),))println(rdd.collect().toBuffer)/*运行结果ArrayBuffer(_02student(1,咪咪,男,6), _02student(2,凯凯,男,8), _02student(3,超超,男,7), _02student(4,大宝,女,9))*///RDD 转换为DataSetval dataset: Dataset[_02student] rdd.toDS()dataset.show()/*运行结果----------------| id|name|gender|age|----------------| 1|咪咪| 男| 6|| 2|凯凯| 男| 8|| 3|超超| 男| 7|| 4|大宝| 女| 9|----------------*//*** DataSet转换为RDD*/val rdd2: RDD[_02student] dataset.rddprintln(rdd2.collect().toBuffer)/*运行结果ArrayBuffer(_02student(1,咪咪,男,6), _02student(2,凯凯,男,8), _02student(3,超超,男,7), _02student(4,大宝,女,9))*//*** DataSet转换为DataFrame*/val frame1: DataFrame dataset.toDF()frame1.show()/*运行结果----------------| id|name|gender|age|----------------| 1|咪咪| 男| 6|| 2|凯凯| 男| 8|| 3|超超| 男| 7|| 4|大宝| 女| 9|----------------*/}
}附上完整版代码
package _02SparkSQLimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object _05RDDToDataFrame {def main(args: Array[String]): Unit {//创建SparkSession对象val spark SparkSession.builder().appName(RDDToDataFrame).master(local[*]).getOrCreate()val rdd spark.sparkContext.makeRDD(List(new _02student(id 1,name咪咪,gender 男,age6),new _02student(id 2,name凯凯,gender 男,age8),new _02student(id 3,name超超,gender 男,age7),new _02student(id 4,name大宝,gender 女,age9),))println(rdd.collect().toBuffer)/*运行结果*///RDD 转换为DataFrame//需要使用SparkSession所创建的对象进行隐式转换导入操作import spark.implicits._val frame: DataFrame rdd.toDF()frame.show()/*运行结果*///RDD 转换为DataSetval dataset: Dataset[_02student] rdd.toDS()dataset.show()/*运行结果*///DataFrame转换为RDD//Row是DataFrame动态构建时提供的行对象val rdd1: RDD[Row] frame.rddrdd1.foreach(row{//按照列的序号获取即可序号与数组下标一样从0开始到长度-1println(row)/*运行结果[2,凯凯,男,8][4,大宝,女,9][1,咪咪,男,6][3,超超,男,7]*///取值的时候使用getxxx方法xxx就是列的数据类型val id row.getInt(0)val name row.getString(1)val gender row.getString(2)val age row.getAs[Int](age)println(id name gender age)/*运行结果3 超超 男 71 咪咪 男 62 凯凯 男 84 大宝 女 9*/})//DataFrame转换为DataSet//DataFrame其实就是DataSet的特例val dataset2: Dataset[_02student] frame.as[_02student]dataset2.show()/*运行结果----------------| id|name|gender|age|----------------| 1|咪咪| 男| 6|| 2|凯凯| 男| 8|| 3|超超| 男| 7|| 4|大宝| 女| 9|----------------*///DataSet转换为RDDval rdd2: RDD[_02student] dataset.rddprintln(rdd2.collect().toBuffer)/*运行结果ArrayBuffer(_02student(1,咪咪,男,6),_02student(2,凯凯,男,8),_02student(3,超超,男,7),_02student(4,大宝,女,9))*///DataSet转换为DataFrameval frame1: DataFrame dataset.toDF()frame1.show()/*运行结果----------------| id|name|gender|age|----------------| 1|咪咪| 男| 6|| 2|凯凯| 男| 8|| 3|超超| 男| 7|| 4|大宝| 女| 9|----------------*/}
}