外国做动漫图片的网站叫什么名字,网站建设家乡网页设计模板,公司网站建设周期及费用,网站导航栏固定1、读取json格式的文件创建DataFrame
注意#xff1a;
1、可以两种方式读取json格式的文件。
2、df.show()默认显示前20行数据。
3、DataFrame原生API可以操作DataFrame。
4、注册成临时表时#xff0c;表中的列默认按ascii顺序显示列。
df.createTempView(mytab…1、读取json格式的文件创建DataFrame
注意
1、可以两种方式读取json格式的文件。
2、df.show()默认显示前20行数据。
3、DataFrame原生API可以操作DataFrame。
4、注册成临时表时表中的列默认按ascii顺序显示列。
df.createTempView(mytable)
df.createOrReplaceTempView(mytable)
df.createGlobalTempView(mytable)
df.createOrReplaceGlobalTempView(mytable)
Session.sql(select * from global_temp.mytable).show()
5、DataFrame是一个Row类型的RDDdf.rdd()/df.javaRdd()。
java
SparkConf conf new SparkConf();
conf.setMaster(local).setAppName(jsonfile);
SparkContext sc new SparkContext(conf);//创建sqlContext
SQLContext sqlContext new SQLContext(sc);/*** DataFrame的底层是一个一个的RDD RDD的泛型是Row类型。* 以下两种方式都可以读取json格式的文件*/
DataFrame df sqlContext.read().format(json).load(sparksql/json);
// DataFrame df2 sqlContext.read().json(sparksql/json.txt);
// df2.show();/*** DataFrame转换成RDD*/
RDDRow rdd df.rdd();
/*** 显示 DataFrame中的内容默认显示前20行。如果现实多行要指定多少行show(行数)* 注意当有多个列时显示的列先后顺序是按列的ascii码先后显示。*/
// df.show();
/*** 树形的形式显示schema信息*/
df.printSchema();
/*** dataFram自带的API 操作DataFrame*///select name from table// df.select(name).show();//select name age10 as addage from tabledf.select(df.col(name),df.col(age).plus(10).alias(addage)).show();//select name ,age from table where age19df.select(df.col(name),df.col(age)).where(df.col(age).gt(19)).show();//select count(*) from table group by agedf.groupBy(df.col(age)).count().show();/*** 将DataFrame注册成临时的一张表这张表临时注册到内存中是逻辑上的表不会雾化到磁盘*/df.registerTempTable(jtable);DataFrame sql sqlContext.sql(select age,count(1) from jtable group by age);DataFrame sql2 sqlContext.sql(select * from jtable);sc.stop();
scala:
1.val session SparkSession.builder().appName(jsonData).master(local).getOrCreate()
2.// val frame: DataFrame session.read.json(./data/json)
3.val frame session.read.format(json).load(./data/json)
4.frame.show(100)
5.frame.printSchema()
6.
7./**
8.* DataFrame API 操作
9.*/
10.//select name ,age from table
11.frame.select(name,age).show(100)
12.
13.//select name,age 10 as addage from table
14.frame.select(frame.col(name),frame.col(age).plus(10).as(addage)).show(100)
15.
16.//select name,age from table where age 19
17.frame.select(name,age).where(frame.col(age).(19)).show(100)
18.frame.filter(age19).show(100)
19.
20.//select name ,age from table order by name asc ,age desc
21.import session.implicits._
22.frame.sort($name.asc,frame.col(age).desc).show(100)
23.
24.//select name ,age from table where age is not null
25.frame.filter(age is not null).show()
26.
27./**
28.* 创建临时表
29.*/
30.frame.createTempView(mytable)
31.session.sql(select name ,age from mytable where age 19).show()
32.frame.createOrReplaceTempView(mytable)
33.frame.createGlobalTempView(mytable)
34.frame.createOrReplaceGlobalTempView(mytable)
35.
36./**
37.* dataFrame 转换成RDD
38.*/
39.val rdd: RDD[Row] frame.rdd
40.rdd.foreach(row{
41. val name row.getAs[String](name)
42. val age row.getAs[Long](age)
43. println(sname is $name ,age is $age)
44.})
2、通过json格式的RDD创建DataFrame
java:
SparkConf conf new SparkConf();
conf.setMaster(local).setAppName(jsonRDD);
JavaSparkContext sc new JavaSparkContext(conf);
SQLContext sqlContext new SQLContext(sc);
JavaRDDString nameRDD sc.parallelize(Arrays.asList({\name\:\zhangsan\,\age\:\18\},{\name\:\lisi\,\age\:\19\},{\name\:\wangwu\,\age\:\20\}
));
JavaRDDString scoreRDD sc.parallelize(Arrays.asList(
{\name\:\zhangsan\,\score\:\100\},
{\name\:\lisi\,\score\:\200\},
{\name\:\wangwu\,\score\:\300\}
));DataFrame namedf sqlContext.read().json(nameRDD);
DataFrame scoredf sqlContext.read().json(scoreRDD);
namedf.registerTempTable(name);
scoredf.registerTempTable(score);DataFrame result sqlContext.sql(select name.name,name.age,score.score from name,score where name.name score.name);
result.show();sc.stop();
scala:
1.val session SparkSession.builder().appName(jsonData).master(local).getOrCreate()
2.val jsonList List[String](
3. {name:zhangsan,age:18},
4. {name:lisi,age:19},
5. {name:wangwu,age:20},
6. {name:maliu,age:21},
7. {name:tainqi,age:22}
8.)
9.
10.import session.implicits._
11.val jsds: Dataset[String] jsonList.toDS()
12.val df session.read.json(jsds)
13.df.show()
14.
15./**
16.* Spark 1.6
17.*/
18.val jsRDD: RDD[String] session.sparkContext.parallelize(jsonList)
19.val frame: DataFrame session.read.json(jsRDD)
20.frame.show()3、非json格式的RDD创建DataFrame
1、通过反射的方式将非json格式的RDD转换成DataFrame不建议使用
自定义类要可序列化自定义类的访问级别是PublicRDD转成DataFrame后会根据映射将字段按Assci码排序将DataFrame转换成RDD时获取字段两种方式,一种是df.getInt(0)下标获取不推荐使用另一种是df.getAs(“列名”)获取推荐使用
/**
* 注意
* 1.自定义类必须是可序列化的
* 2.自定义类访问级别必须是Public
* 3.RDD转成DataFrame会把自定义类中字段的名称按assci码排序
*/
SparkConf conf new SparkConf();
conf.setMaster(local).setAppName(RDD);
JavaSparkContext sc new JavaSparkContext(conf);
SQLContext sqlContext new SQLContext(sc);
JavaRDDString lineRDD sc.textFile(sparksql/person.txt);
JavaRDDPerson personRDD lineRDD.map(new FunctionString, Person() {/*** */private static final long serialVersionUID 1L;Overridepublic Person call(String s) throws Exception {Person p new Person();p.setId(s.split(,)[0]);p.setName(s.split(,)[1]);p.setAge(Integer.valueOf(s.split(,)[2]));return p;}
});
/**
* 传入进去Person.class的时候sqlContext是通过反射的方式创建DataFrame
* 在底层通过反射的方式获得Person的所有field结合RDD本身就生成了DataFrame
*/
DataFrame df sqlContext.createDataFrame(personRDD, Person.class);
df.show();
df.registerTempTable(person);
sqlContext.sql(select name from person where id 2).show();/**
* 将DataFrame转成JavaRDD
* 注意
* 1.可以使用row.getInt(0),row.getString(1)...通过下标获取返回Row类型的数据但是要注意列顺序问题---不常用
* 2.可以使用row.getAs(列名)来获取对应的列值。
*
*/
JavaRDDRow javaRDD df.javaRDD();
JavaRDDPerson map javaRDD.map(new FunctionRow, Person() {/*** */private static final long serialVersionUID 1L;Overridepublic Person call(Row row) throws Exception {Person p new Person();//p.setId(row.getString(1));//p.setName(row.getString(2));//p.setAge(row.getInt(0));p.setId((String)row.getAs(id));p.setName((String)row.getAs(name));p.setAge((Integer)row.getAs(age));return p;}
});
map.foreach(new VoidFunctionPerson() {/*** */private static final long serialVersionUID 1L;Overridepublic void call(Person t) throws Exception {System.out.println(t);}
});sc.stop();
scala
1.case class MyPerson(id:Int,name:String,age:Int,score:Double)
2.
3.object Test {
4. def main(args: Array[String]): Unit {
5. val session SparkSession.builder().appName(jsonData).master(local).getOrCreate()
6. val peopleInfo: RDD[String] session.sparkContext.textFile(./data/people.txt)
7. val personRDD : RDD[MyPerson] peopleInfo.map(info {
8.MyPerson(info.split(,)(0).toInt,info.split(,)(1),info.split(,)(2).toInt,info.split(,)(3).toDouble)
9. })
10. import session.implicits._
11. val ds personRDD.toDS()
12. ds.createTempView(mytable)
13. session.sql(select * from mytable ).show()
14. }
15.}
2、动态创建Schema将非json格式的RDD转换成DataFrame
java:
SparkConf conf new SparkConf();
conf.setMaster(local).setAppName(rddStruct);
JavaSparkContext sc new JavaSparkContext(conf);
SQLContext sqlContext new SQLContext(sc);
JavaRDDString lineRDD sc.textFile(./sparksql/person.txt);
/*** 转换成Row类型的RDD*/
JavaRDDRow rowRDD lineRDD.map(new FunctionString, Row() {/*** */private static final long serialVersionUID 1L;Overridepublic Row call(String s) throws Exception {return RowFactory.create(String.valueOf(s.split(,)[0]),String.valueOf(s.split(,)[1]),Integer.valueOf(s.split(,)[2]));}
});
/*** 动态构建DataFrame中的元数据一般来说这里的字段可以来源自字符串也可以来源于外部数据库*/
ListStructField asList Arrays.asList(DataTypes.createStructField(id, DataTypes.StringType, true),DataTypes.createStructField(name, DataTypes.StringType, true),DataTypes.createStructField(age, DataTypes.IntegerType, true)
);StructType schema DataTypes.createStructType(asList);
DataFrame df sqlContext.createDataFrame(rowRDD, schema);df.show();
sc.stop();
scala:
1.val session SparkSession.builder().appName(jsonData).master(local).getOrCreate()
2.val peopleInfo: RDD[String] session.sparkContext.textFile(./data/people.txt)
3.
4.val rowRDD: RDD[Row] peopleInfo.map(info {
5. val id info.split(,)(0).toInt
6. val name info.split(,)(1)
7. val age info.split(,)(2).toInt
8. val score info.split(,)(3).toDouble
9. Row(id, name, age, score)
10.})
11.val structType: StructType StructType(Array[StructField](
12. StructField(id, IntegerType),
13. StructField(name, StringType),
14. StructField(age, IntegerType),
15. StructField(score, DoubleType)
16.))
17.val frame: DataFrame session.createDataFrame(rowRDD,structType)
18.frame.createTempView(mytable)
19.session.sql(select * from mytable ).show()
4、读取parquet文件创建DataFrame
注意
可以将DataFrame存储成parquet文件。保存成parquet文件的方式有两种
df.write().mode(SaveMode.Overwrite)format(parquet).save(./sparksql/parquet);
df.write().mode(SaveMode.Overwrite).parquet(./sparksql/parquet);
SaveMode指定文件保存时的模式。
Overwrite覆盖
Append追加
ErrorIfExists如果存在就报错
Ignore如果存在就忽略
java:
SparkConf conf new SparkConf();
conf.setMaster(local).setAppName(parquet);
JavaSparkContext sc new JavaSparkContext(conf);
SQLContext sqlContext new SQLContext(sc);
JavaRDDString jsonRDD sc.textFile(sparksql/json);
DataFrame df sqlContext.read().json(jsonRDD);
/*** 将DataFrame保存成parquet文件SaveMode指定存储文件时的保存模式* 保存成parquet文件有以下两种方式*/
df.write().mode(SaveMode.Overwrite).format(parquet).save(./sparksql/parquet);
df.write().mode(SaveMode.Overwrite).parquet(./sparksql/parquet);
df.show();
/*** 加载parquet文件成DataFrame * 加载parquet文件有以下两种方式 */DataFrame load sqlContext.read().format(parquet).load(./sparksql/parquet);
load sqlContext.read().parquet(./sparksql/parquet);
load.show();sc.stop();
scala:
1.val session SparkSession.builder().appName(jsonData).master(local).getOrCreate()
2.val frame: DataFrame session.read.json(./data/json)
3.frame.show()
4.frame.write.mode(SaveMode.Overwrite).parquet(./data/parquet)
5.
6.val df: DataFrame session.read.format(parquet).load(./data/parquet)
7.df.createTempView(mytable)
8.session.sql(select count(*) from mytable ).show()
5、读取JDBC中的数据创建DataFrame(MySql为例)
两种方式创建DataFrame
java:
SparkConf conf new SparkConf();
conf.setMaster(local).setAppName(mysql);
JavaSparkContext sc new JavaSparkContext(conf);
SQLContext sqlContext new SQLContext(sc);
/*** 第一种方式读取MySql数据库表加载为DataFrame*/
MapString, String options new HashMapString,String();
options.put(url, jdbc:mysql://192.168.179.4:3306/spark);
options.put(driver, com.mysql.jdbc.Driver);
options.put(user, root);
options.put(password, 123456);
options.put(dbtable, person);
DataFrame person sqlContext.read().format(jdbc).options(options).load();
person.show();
person.registerTempTable(person);
/*** 第二种方式读取MySql数据表加载为DataFrame*/
DataFrameReader reader sqlContext.read().format(jdbc);
reader.option(url, jdbc:mysql://192.168.179.4:3306/spark);
reader.option(driver, com.mysql.jdbc.Driver);
reader.option(user, root);
reader.option(password, 123456);
reader.option(dbtable, score);
DataFrame score reader.load();
score.show();
score.registerTempTable(score);DataFrame result
sqlContext.sql(select person.id,person.name,score.score from person,score where person.name score.name);
result.show();
/*** 将DataFrame结果保存到Mysql中*/
Properties properties new Properties();
properties.setProperty(user, root);
properties.setProperty(password, 123456);
result.write().mode(SaveMode.Overwrite).jdbc(jdbc:mysql://192.168.179.4:3306/spark, result, properties);sc.stop();
scala:
1.val session SparkSession.builder().appName(jsonData).master(local).getOrCreate()
2.
3.val prop new Properties()
4.prop.setProperty(user,root)
5.prop.setProperty(password,123456)
6./**
7.* 第一种方式
8.*/
9.val df1 session.read.jdbc(jdbc:mysql://192.168.179.14:3306/spark,person,prop)
10.df1.show()
11.df1.createTempView(person)
12.
13./**
14.* 第二种方式
15.*/
16.val map Map[String,String](
17. url - jdbc:mysql://192.168.179.14:3306/spark,
18. driver - com.mysql.jdbc.Driver,
19. user - root,
20. password - 123456,
21. dbtable - score
22.)
23.val df2 session.read.format(jdbc).options(map).load()
24.df2.show()
25.
26./**
27.* 第三种方式
28.*/
29.val df3 session.read.format(jdbc)
30. .option(url, jdbc:mysql://192.168.179.14:3306/spark)
31. .option(driver, com.mysql.jdbc.Driver)
32. .option(user, root)
33. .option(password, 123456)
34. .option(dbtable, score)
35. .load()
36.df3.show()
37.df3.createTempView(score)
38.
39.val result session.sql(select person.id,person.name,person.age,score.score from person ,score where person.id score.id)
40.
41.result.show()
42.//将结果保存到mysql中
43.result.write.mode(SaveMode.Overwrite).jdbc(jdbc:mysql://192.168.179.14:3306/spark,result,prop)
44.