网站建设有限公司,六安百度推广公司,网站备案期,做网站烧钱吗SparkSql
结构化数据与非结构化数据 结构化数据就类似于excel表中的数据#xff08;统计的都是结构化的数据#xff09;一般都使用sparkSql处理结构化的数据
结构化的文件#xff1a;JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc
结构化的表#xff1a;…SparkSql
结构化数据与非结构化数据 结构化数据就类似于excel表中的数据统计的都是结构化的数据一般都使用sparkSql处理结构化的数据
结构化的文件JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc
结构化的表数据库中表的数据MySQL、Oracle、Hive 我们在sparkcore中导入数据使用的是textFile而在sparksql中怎么导入数据呢
使用的是DataFrame进行数据的导入 将一些结构化的数据进行sql查询需要将数据变为表是表就必须有表结构表结构就是Schema。 一个经典的wordcount案例
代码如下里面有sql和dsl两种写法
import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ __main__:os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_131# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] rD:\BigDate\05-Hadoop\software\hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 创建spark对象spark SparkSession.builder.master(local[2]).appName(SparkSQL-wordcount案例).config(spark.sql.shuffle.partitions, 2).getOrCreate()print(spark)# 创建一个DataFrame对象,读取数据df spark.read.text(../../datas/wordcount/data.txt)# 创建一个临时表表名为 wordcountdf.createOrReplaceTempView(wordcount)# 第一种写法使用sparksqlspark.sql(with t as ( select word from wordcount lateral view explode(split(value, )) wordtemp as word),t2 as (select trim(word) word from t where trim(word) ! )select word,count(1) countNum from t2 group by word order by countNum desc).show()# 第二种写法使用 dsldf.select(F.explode(F.split(value, )).alias(word)) \.where( trim(word) ! ).groupby(word).count().orderBy(count,ascendingFalse).show()#这里的where(F.trim(word) ! ) 还可以写成 where( trim(word) ! )# 还可以这样写df.select(F.explode(F.split(value, )).alias(word)) \.where(F.trim(word) ! ).groupby(F.col(word)).agg(F.count(F.col(word)).alias(cou)).orderBy(F.col(cou),ascendingFalse).show()spark.stop()
以上的代码还可以使用with进行优化
补充
with的作用: 我们在创建对象的时候经常需要关闭close、stop 如果忘记关闭太多对象的话就会影响性能使用with自动帮我们关闭
什么时候可以使用with呢
源码中有 __enter__ 和 __exit__ 的时候就可以使用with进行优化
优化过后的代码 此时就不需要在手动stop关闭了
import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ __main__:os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_131# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] rD:\BigDate\05-Hadoop\software\hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 创建spark对象with SparkSession.builder.master(local[2]).appName(SparkSQL-wordcount案例).config(spark.sql.shuffle.partitions, 2).getOrCreate() as spark:# 创建一个DataFrame对象,读取数据df spark.read.text(../../datas/wordcount/data.txt)# 创建一个临时表表名为 wordcountdf.createOrReplaceTempView(wordcount)# 第一种写法使用sparksqlspark.sql(with t as ( select word from wordcount lateral view explode(split(value, )) wordtemp as word),t2 as (select trim(word) word from t where trim(word) ! )select word,count(1) countNum from t2 group by word order by countNum desc).show()# 第二种写法使用 dsldf.select(F.explode(F.split(value, )).alias(word)) \.where( trim(word) ! ).groupby(word).count().orderBy(count,ascendingFalse).show()#这里的where(F.trim(word) ! ) 还可以写成 where( trim(word) ! )# 还可以这样写df.select(F.explode(F.split(value, )).alias(word)) \.where(F.trim(word) ! ).groupby(F.col(word)).agg(F.count(F.col(word)).alias(cou)).orderBy(F.col(cou),ascendingFalse).show()
一个案例
需求统计评分次数大于2000次的所有电影中平均评分最高的Top10结果显示电影名称、电影平均评分、电影评分次数。 电影评分数据datas/movie/ratings.dat【用户id、电影id、评分、评分时间】
数据如下
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268 电影信息数据datas/movie/movies.dat【电影id、电影名称、分类】
1::Toy Story (1995)::Animation|Childrens|Comedy
2::Jumanji (1995)::Adventure|Childrens|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Childrens
9::Sudden Death (1995)::Action
首先给定的数据不是我们所经常使用的格式化数据所以需要先将数据进行格式化
可以使用RDD的算子将数据改为我们想要的格式化数据
也可以直接利用sql将非格式化的数据修改为我们需要的格式的数据 写这个案例我们可以利用前面所学的 RDD 和 sparkSQL一起完成这个案例 使用RDDSparkSQL
代码如下
import os
import refrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ __main__:os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_131# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] rD:\BigDate\05-Hadoop\software\hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 创建spark对象with SparkSession.builder.master(local[2]).appName(MovieTop10).config(spark.sql.shuffle.partitions, 2).getOrCreate() as spark:print(spark)rating_df spark.sparkContext.textFile(../../datas/movie/ratings.dat).map(lambda line:re.split(::,line)) \.filter(lambda item:len(item) 4).map(lambda item:(item[0],item[1],item[2],item[3])) \.toDF([user_id,movie_id,score,score_time]).createOrReplaceTempView(rating)# spark.sql(# select * from rating# ).show()movie_df spark.sparkContext.textFile(../../datas/movie/movies.dat) \.map(lambda line:(line.split(::)[0],line.split(::)[1],line.split(::)[2])) \.toDF([movie_id, movie_name, movie_categry]).createOrReplaceTempView(movie)# spark.sql(# select * from movie# ).show(truncateFalse)#统计评分次数大于2000次的所有电影中平均评分最高的Top10结果显示电影名称、电影平均评分、电影评分次数spark.sql(select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id r.movie_idgroup by m.movie_name having countNum 2000 order by avgRate desc limit 10).show(truncateFalse)# 保留两位小数后结果可能有重复的想要获取重复排名也只算一位的可以使用排名函数dense_rank()spark.sql(with t as (select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id r.movie_idgroup by m.movie_name having countNum 2000),t2 as (select *,dense_rank() over(order by avgRate desc) paiming from t) select * from t2 where paiming 10).show()
复习 排名函数
1、row_number()
row_number从1开始按照顺序生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列
效果如下
98 1
97 2
97 3
96 4
95 5
95 6没有并列名次情况顺序递增
2、rank()
生成数据项在分组中的排名排名相等会在名次中留下空位
效果如下
98 1
97 2
97 2
96 4
95 5
95 5
94 7
有并列名次情况顺序跳跃递增
3、dense_rank()
生成数据项在分组中的排名排名相等会在名次中不会留下空位
效果如下
98 1
97 2
97 2
96 3
95 4
95 4
94 5
有并列名次情况顺序递增
只使用 SparkSQL
以上是RDD sparkSQL的写法 还可以通过 sparkSQL的写法硬写出来
通过split()方法根据非格式化数据的分隔符将数据切成我们需要的DataFrame类型的数据
df1 spark.read.text(../../datas/movie/movies.dat).createOrReplaceTempView(movie1)
df2 spark.read.text(../../datas/movie/ratings.dat).createOrReplaceTempView(rating1)#统计评分次数大于2000次的所有电影中平均评分最高的Top10结果显示电影名称、电影平均评分、电影评分次数
spark.sql(with m1 as (select split(value,::)[0] movie_id,split(value,::)[1] movie_name,split(value,::)[2] movie_categary from movie1),r1 as ( select split(value,::)[0] user_id,split(value,::)[1] movie_id,split(value,::)[2] score,split(value,::)[3] score_time from rating1)select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id r1.movie_idgroup by m1.movie_name having countNum 2000 order by avgRote desc limit 10
).show(truncateFalse)# 同样也可以写成排名函数
spark.sql(with m1 as (select split(value,::)[0] movie_id,split(value,::)[1] movie_name,split(value,::)[2] movie_categary from movie1),r1 as ( select split(value,::)[0] user_id,split(value,::)[1] movie_id,split(value,::)[2] score,split(value,::)[3] score_time from rating1),t as ( select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id r1.movie_idgroup by m1.movie_name having countNum 2000),t2 as ( select *,dense_rank() over(order by avgRote desc) paiming from t)select * from t2 where paiming 10
).show(truncateFalse)