当前位置: 首页 > news >正文

网站建设有限公司六安百度推广公司

网站建设有限公司,六安百度推广公司,网站备案期,做网站烧钱吗SparkSql 结构化数据与非结构化数据 结构化数据就类似于excel表中的数据#xff08;统计的都是结构化的数据#xff09;一般都使用sparkSql处理结构化的数据 结构化的文件#xff1a;JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc 结构化的表#xff1a;…SparkSql 结构化数据与非结构化数据 结构化数据就类似于excel表中的数据统计的都是结构化的数据一般都使用sparkSql处理结构化的数据 结构化的文件JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc 结构化的表数据库中表的数据MySQL、Oracle、Hive 我们在sparkcore中导入数据使用的是textFile而在sparksql中怎么导入数据呢 使用的是DataFrame进行数据的导入 将一些结构化的数据进行sql查询需要将数据变为表是表就必须有表结构表结构就是Schema。 一个经典的wordcount案例 代码如下里面有sql和dsl两种写法 import osfrom pyspark import SparkConf, SparkContext from pyspark.sql import SparkSession import pyspark.sql.functions as Fif __name__ __main__:os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_131# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] rD:\BigDate\05-Hadoop\software\hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 创建spark对象spark SparkSession.builder.master(local[2]).appName(SparkSQL-wordcount案例).config(spark.sql.shuffle.partitions, 2).getOrCreate()print(spark)# 创建一个DataFrame对象,读取数据df spark.read.text(../../datas/wordcount/data.txt)# 创建一个临时表表名为 wordcountdf.createOrReplaceTempView(wordcount)# 第一种写法使用sparksqlspark.sql(with t as ( select word from wordcount lateral view explode(split(value, )) wordtemp as word),t2 as (select trim(word) word from t where trim(word) ! )select word,count(1) countNum from t2 group by word order by countNum desc).show()# 第二种写法使用 dsldf.select(F.explode(F.split(value, )).alias(word)) \.where( trim(word) ! ).groupby(word).count().orderBy(count,ascendingFalse).show()#这里的where(F.trim(word) ! ) 还可以写成 where( trim(word) ! )# 还可以这样写df.select(F.explode(F.split(value, )).alias(word)) \.where(F.trim(word) ! ).groupby(F.col(word)).agg(F.count(F.col(word)).alias(cou)).orderBy(F.col(cou),ascendingFalse).show()spark.stop() 以上的代码还可以使用with进行优化 补充 with的作用: 我们在创建对象的时候经常需要关闭close、stop 如果忘记关闭太多对象的话就会影响性能使用with自动帮我们关闭 什么时候可以使用with呢 源码中有 __enter__ 和 __exit__ 的时候就可以使用with进行优化 优化过后的代码 此时就不需要在手动stop关闭了 import osfrom pyspark import SparkConf, SparkContext from pyspark.sql import SparkSession import pyspark.sql.functions as Fif __name__ __main__:os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_131# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] rD:\BigDate\05-Hadoop\software\hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 创建spark对象with SparkSession.builder.master(local[2]).appName(SparkSQL-wordcount案例).config(spark.sql.shuffle.partitions, 2).getOrCreate() as spark:# 创建一个DataFrame对象,读取数据df spark.read.text(../../datas/wordcount/data.txt)# 创建一个临时表表名为 wordcountdf.createOrReplaceTempView(wordcount)# 第一种写法使用sparksqlspark.sql(with t as ( select word from wordcount lateral view explode(split(value, )) wordtemp as word),t2 as (select trim(word) word from t where trim(word) ! )select word,count(1) countNum from t2 group by word order by countNum desc).show()# 第二种写法使用 dsldf.select(F.explode(F.split(value, )).alias(word)) \.where( trim(word) ! ).groupby(word).count().orderBy(count,ascendingFalse).show()#这里的where(F.trim(word) ! ) 还可以写成 where( trim(word) ! )# 还可以这样写df.select(F.explode(F.split(value, )).alias(word)) \.where(F.trim(word) ! ).groupby(F.col(word)).agg(F.count(F.col(word)).alias(cou)).orderBy(F.col(cou),ascendingFalse).show() 一个案例 需求统计评分次数大于2000次的所有电影中平均评分最高的Top10结果显示电影名称、电影平均评分、电影评分次数。 电影评分数据datas/movie/ratings.dat【用户id、电影id、评分、评分时间】 数据如下 1::1193::5::978300760 1::661::3::978302109 1::914::3::978301968 1::3408::4::978300275 1::2355::5::978824291 1::1197::3::978302268 1::1287::5::978302039 1::2804::5::978300719 1::594::4::978302268 1::919::4::978301368 1::595::5::978824268 电影信息数据datas/movie/movies.dat【电影id、电影名称、分类】 1::Toy Story (1995)::Animation|Childrens|Comedy 2::Jumanji (1995)::Adventure|Childrens|Fantasy 3::Grumpier Old Men (1995)::Comedy|Romance 4::Waiting to Exhale (1995)::Comedy|Drama 5::Father of the Bride Part II (1995)::Comedy 6::Heat (1995)::Action|Crime|Thriller 7::Sabrina (1995)::Comedy|Romance 8::Tom and Huck (1995)::Adventure|Childrens 9::Sudden Death (1995)::Action 首先给定的数据不是我们所经常使用的格式化数据所以需要先将数据进行格式化 可以使用RDD的算子将数据改为我们想要的格式化数据 也可以直接利用sql将非格式化的数据修改为我们需要的格式的数据 写这个案例我们可以利用前面所学的 RDD 和 sparkSQL一起完成这个案例 使用RDDSparkSQL 代码如下 import os import refrom pyspark import SparkConf, SparkContext from pyspark.sql import SparkSessionif __name__ __main__:os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_131# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] rD:\BigDate\05-Hadoop\software\hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 创建spark对象with SparkSession.builder.master(local[2]).appName(MovieTop10).config(spark.sql.shuffle.partitions, 2).getOrCreate() as spark:print(spark)rating_df spark.sparkContext.textFile(../../datas/movie/ratings.dat).map(lambda line:re.split(::,line)) \.filter(lambda item:len(item) 4).map(lambda item:(item[0],item[1],item[2],item[3])) \.toDF([user_id,movie_id,score,score_time]).createOrReplaceTempView(rating)# spark.sql(# select * from rating# ).show()movie_df spark.sparkContext.textFile(../../datas/movie/movies.dat) \.map(lambda line:(line.split(::)[0],line.split(::)[1],line.split(::)[2])) \.toDF([movie_id, movie_name, movie_categry]).createOrReplaceTempView(movie)# spark.sql(# select * from movie# ).show(truncateFalse)#统计评分次数大于2000次的所有电影中平均评分最高的Top10结果显示电影名称、电影平均评分、电影评分次数spark.sql(select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id r.movie_idgroup by m.movie_name having countNum 2000 order by avgRate desc limit 10).show(truncateFalse)# 保留两位小数后结果可能有重复的想要获取重复排名也只算一位的可以使用排名函数dense_rank()spark.sql(with t as (select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id r.movie_idgroup by m.movie_name having countNum 2000),t2 as (select *,dense_rank() over(order by avgRate desc) paiming from t) select * from t2 where paiming 10).show() 复习 排名函数 1、row_number() row_number从1开始按照顺序生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列 效果如下 98 1 97 2 97 3 96 4 95 5 95 6没有并列名次情况顺序递增 2、rank() 生成数据项在分组中的排名排名相等会在名次中留下空位 效果如下 98 1 97 2 97 2 96 4 95 5 95 5 94 7 有并列名次情况顺序跳跃递增 3、dense_rank() 生成数据项在分组中的排名排名相等会在名次中不会留下空位 效果如下 98 1 97 2 97 2 96 3 95 4 95 4 94 5 有并列名次情况顺序递增 只使用 SparkSQL 以上是RDD sparkSQL的写法 还可以通过 sparkSQL的写法硬写出来 通过split()方法根据非格式化数据的分隔符将数据切成我们需要的DataFrame类型的数据 df1 spark.read.text(../../datas/movie/movies.dat).createOrReplaceTempView(movie1) df2 spark.read.text(../../datas/movie/ratings.dat).createOrReplaceTempView(rating1)#统计评分次数大于2000次的所有电影中平均评分最高的Top10结果显示电影名称、电影平均评分、电影评分次数 spark.sql(with m1 as (select split(value,::)[0] movie_id,split(value,::)[1] movie_name,split(value,::)[2] movie_categary from movie1),r1 as ( select split(value,::)[0] user_id,split(value,::)[1] movie_id,split(value,::)[2] score,split(value,::)[3] score_time from rating1)select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id r1.movie_idgroup by m1.movie_name having countNum 2000 order by avgRote desc limit 10 ).show(truncateFalse)# 同样也可以写成排名函数 spark.sql(with m1 as (select split(value,::)[0] movie_id,split(value,::)[1] movie_name,split(value,::)[2] movie_categary from movie1),r1 as ( select split(value,::)[0] user_id,split(value,::)[1] movie_id,split(value,::)[2] score,split(value,::)[3] score_time from rating1),t as ( select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id r1.movie_idgroup by m1.movie_name having countNum 2000),t2 as ( select *,dense_rank() over(order by avgRote desc) paiming from t)select * from t2 where paiming 10 ).show(truncateFalse)
http://www.pierceye.com/news/19368/

相关文章:

  • wordpress 显示 列表seo网络推广培训班
  • 做网站和网站维护需要多少钱上海中学门户网站登陆
  • 做黄金的人喜欢逛那些网站广告投放系统源码
  • 青岛鲁icp 网站制作 牛商网鲁谷网站建设
  • 网站旁边的小图标怎么做的河南郑州网站建设
  • t和p在一起怎么做网站银川软件开发公司
  • 网站开发用的软件网站建设兼职挣多少钱
  • 建广告网站需要多少钱快速刷排名seo软件
  • 直播教育网站建设普宁市做网站
  • 南阳高质量建设大市网站学习网站建设
  • 源码论坛网站需要多大的空间网络广告是什么意思
  • 自己做网站制作流程拓客软件哪个好用
  • 电子商务网站平台建设策划购物商城模板
  • 八宝山做网站公司公司起名字大全免费评分
  • 九口袋网站建设网站主题咋做
  • 深圳网页服务开发与网站建设城乡住建局官网
  • 集团网站建设调研报告深圳市龙华区观澜街道
  • 网站域名到期会怎么样免费的seo网站
  • 旅游景区网站开发的政策可行性html编辑器安卓版手机版软件
  • 只买域名不建网站wordpress主题 xueui
  • 网站免费建设推荐三亚网络网站建设
  • 网站建设上海Wordpress删除主题的
  • 珠海网站建设网络推广帝国cms仿站工具
  • 网站怎么添加音乐wordpress更改目录插件
  • 企业网站制作公司推荐杭州装修公司哪家好
  • 建设网站的计划书开发区招聘信息最新招聘
  • 天津市做网站的公司定制网站开发哪个好
  • 做网站最专业的公司有哪些中国建设银行对公网站
  • 实时开奖走势网站建设免费建电子商务网站
  • 北京金方网站设计通常做网站的需求