网站出现用户名密码提示,凡科建站官网页更换视频,制作企业网站的问题,做网站建设的企业综合案例
以下案例结合了spark sql、dataframe、udf、读写文件等操作
# encoding:utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F#需求1#xff1a;各省销售额的统计
#需求2#xff1a;T0P3销售省份中#xff0c;有多少店铺达到过日…综合案例
以下案例结合了spark sql、dataframe、udf、读写文件等操作
# encoding:utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F#需求1各省销售额的统计
#需求2T0P3销售省份中有多少店铺达到过日销售额1000
#需求3T0P3省份中各省的平均单单价
#需求4T0P3省份中各个省份的支付类型比例
#receivable:订单金额
#storeProvince:店铺省份
#dateTs:订单的销售日期
#payType:支付类型
#storeID:店铺ID
if __name__ __main__:spark SparkSession.builder.\appName(SparkSQL ExampLe).\master(local[*]).\config(spark.sql.shuffle.partitions,2).\ #local模式下调整为2config(spark.sql.warehouse.dir,hdfs://node1:8020/user/hive/warehouse).\ #hdfs配置config(hive.metastore.uris,thrift://node3:9083).\ # metastore配置配置spark on hiveenableHiveSupport().\getorCreate()#1.读取数据#省份信息缺失值过滤同时省份信息中会有”nu1”字符串#订单的金额数据集中有的订单的金额是单笔超过10800的这些是测试数#列值剪(SparkSQL会自动做这个优化)df spark.read.format(json).load(../../data/input/mini.json).\dropna(thresh1, subset[storeProvince]).\filter(storeProvince ! null).\filter(receivable 10000).\select(storeProvince,storeID,receivable,dateTs,payType) # 筛选必须数据#T0D0需求1各省销售额统计province_sale_df df.groupBy(storeProvince).sum(receivable).\withColumnRenamed(sum(receivable), money).\ # sum求和后新生成的列名默认为聚合函数名和操作的列名此处重命名withColumn(money, F.round(money,2)).\ # round四舍五入orderBy (money,ascendingFalse)province_sale_df.show(truncateFalse)#写出MySQLprovince_sale_df.write.mode(overwrite).\format(jdbc).\option(url,jdbc:mysql://node1:3306/bigdata?useSSLfalseuseUnicodetruecharacterEncodingutf8).\option(dbtable,province_sale).\option(user,root).\option(password,2212072ok1).\option(encoding,utf-8).\save()#写出Hive表saveAsTable可以写出表要求已经配置好Spark On Hive,配置好后#会将表写入到HiVe的数据仓库中province_sale_df.write.mode (overwrite).saveAsTable (default.province_sale,parquet)#T000需求2T0P3销售省份中有多少店铺达到过日销售额1000#2.1先找到T0P3的销售省份top3_province_df province_sale_df.limit(3).select(storeProvince).withColumnRenamed(storeProvince,top3_storeProvince) #这里需要对top3的stroprovince列重命名否则下面groupby会有问题#2.2和原始的DF进行内关联数据关联后就是全部都是T0P3省份的销售数据了top3_province_df_joined df.join(top3_province_df, on df[storeProvince] top3_province_df[top3_province])top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)#广东省1 2021-01-03 1005#广东省2#广东省3·#湖南省1.#湖南省2。#广东省33#湖南省123#from_unixtime的精度是秒级数据的精度是毫秒级要对数据进行精度的裁剪province_hot_store_count_df top3_province_df_joined.groupBy(storeProvince,storeIDF.from_unixtime(df[dateTs]sbstr(0,10),yyyy-MM-dd).alias(day)).\ sum(receivable).withColumnRenamed(sum(receivable),money).\ #这里withColumnRenamed和上面alias都是重命名只不过alias返回column对象withColumnRenamed返回dffilter(money 1000).\dropDuplicates(subset[storeID]).\groupBy(storeProvince).count()province_hot_store_count_df.show()#写出MySQLprovince_hot_store_count_df.write.mode(overwrite).\format(jdbc).\option(url,jdbc:mysql://node1:3306/bigdata?useSSLfalseuseUnicodetruecharacterEncodingutf8).\option(dbtable,province_hot_store_count).\option(user,root).\option(password,22120720k1).\option(encoding,utf-8).\save()#写出Hiveprovince_hot_store_count_df.write.mode(overwrite).saveAsTable(default.province_hot_store_count,parquet)#T0D0需求3T0P3省份中各个省份的平均订单价格单单价top3_province_order_avg_df top3_province_df_joined.groupBy(storeProvince).\avg(receivable).\withColumnRenamed(avg(receivable),money).\withColumn(money,F.round(money,2)).\orderBy(money,ascendingFalse)top3_province_order_avg_df.show(truncateFalse)#T0D0需求4T0P3省份中各个省份的支付比例#湖南省支付宝33%#湖南省现金36%#广东省微信33%top3_province_df_joined.createTempView(province_pay)# 由于spark中没有将数字转换为含百分号的字符串函数定义udf实现def udf_func(percent):return str(round(percent 100,2))%#注册UDFmy_udf F.udf(udf_func,StringType())# 下面group by的total和storeProvince作用相同只是为了语法正确加上在 SELECT 列表中除了聚合函数外所有列都必须在 GROUP BY 子句中明确指定。否则大多数数据库系统会抛出错误pay_type_df spark.sql(SELECT storeProvince,payType, (COUNT(payType) / total) AS percent FROM(SELECT storeProvince,payType,count(1)OVER(PARTITION BY storeProvince)AS total FROM province_pay)AS subGROUP BY storeProvince,payType,total).withColumn(percent,my_udf(percent))top3_province_df_joined.unpersist()
上面案例中为什么 GROUP BY 需要包含 total 列
语义一致性在 SELECT 子句中你使用了 COUNT(payType) / total 来计算百分比。由于 total 是通过窗口函数计算得出的并且不是直接通过 GROUP BY 子句中的列聚合得到的为了让查询的语义更加明确和一致最好将 total 列也包含在 GROUP BY 子句中。这样数据库知道如何根据 storeProvince、payType 和 total 的每个唯一组合来分组结果并计算相应的 COUNT(payType)。避免错误如果不包含 total 在 GROUP BY 子句中某些数据库系统可能会抛出错误因为它们无法确定如何在没有显式分组的情况下处理这个非聚合列。即使某些数据库系统允许这样做例如通过假设 total 对于每个 storeProvince 和 payType 组合都是相同的这也是一个不安全的做法因为它可能导致逻辑错误或不可预测的结果。逻辑准确性在计算百分比时确保每个 storeProvince 和 payType 组合都与其对应的 total 值匹配是非常重要的。通过将 total 列包含在 GROUP BY 子句中你确保了这个匹配是精确的。