古色古香网站模板,响应式布局网站,pc网站模板,阿里巴巴官网首页官网目录 PySpark SQL
基础
SparkSession对象
DataFrame入门 DataFrame构建
DataFrame代码风格 DSL
SQL
SparkSQL Shuffle 分区数目 DataFrame数据写出
Spark UDF
Catalyst优化器
Spark SQL的执行流程 PySpark SQL
基础
PySpark SQL与Hive的异同 Hive和Spark 均是:“分…目录 PySpark SQL
基础
SparkSession对象
DataFrame入门 DataFrame构建
DataFrame代码风格 DSL
SQL
SparkSQL Shuffle 分区数目 DataFrame数据写出
Spark UDF
Catalyst优化器
Spark SQL的执行流程 PySpark SQL
基础
PySpark SQL与Hive的异同 Hive和Spark 均是:“分布式SQL计算引擎” 均是构建大规模结构化数据计算的绝佳利器同时SparkSQL拥有更好的性能。 目前企业中使用Hive仍旧居多但SparkSQL将会在很近的未来替代Hive成为分布式SQL计算市场的顶级 这里的重点是Spark SQL能支持SQL和其他代码混合执行自由度更高且其是内存计算更快。但是其没有元数据管理然而它最终还是会作用到Hive层面可以调用Hive的Metasotre
SparkSQL的基本对象是DataFrame其特点及与其他对象的区别为 SparkSQL 其实有3类数据抽象对象 SchemaRDD对象 (已废弃)DataSet对象: 可用于Java、Scala语言DataFrame对象:可用于Java、Scala、Python、R SparkSession对象 在RDD阶段程序的执行入口对象是: SparkContext 在Spark 2.0后推出了SparkSession对象作为Spark编码的统一入口对象 SparkSession对象可以:-用于SparkSQL编程作为入口对象 - 用于SparkCore编程可以通过SparkSession对象中获取到SparkContext
from pyspark.sql import SparkSession
if __name__ __main__:spark SparkSession.builder.appName(lmx).master(local[*]).getOrCreate()sc spark.sparkContext
DataFrame入门 DataFrame的组成如下在结构层面StructType对象描述整个DataFrame的表结构 StructField对象描述一个列的信息在数据层面Row对象记录一行数据Column对象记录一列数据并包含列的信息 DataFrame构建
1、用RDD进行构建
rdd的结构要求为[[xx,xx],[xx,xx]]
spark.createDataFrame(rdd,schema[]) spark SparkSession.builder.appName(lmx).master(local[*]).getOrCreate()sc spark.sparkContextrdd sc.textFile(data/input/sql/people.txt).map(lambda x:x.split(,)).map(lambda x:[x[0],int(x[1])])print(rdd.collect())# [[Michael, 29], [Andy, 30], [Justin, 19]]df spark.createDataFrame(rdd,schema[name,age])df.printSchema()#打印表结构df.show()#打印表
# root
# | -- name: string(nullabletrue)
# | -- age: long(nullabletrue)
#
# ----------
# | name | age |
# ----------
# | Michael | 29 |
# | Andy | 30 |
# | Justin | 19 |
# ----------
2、利用StructType进行创建
需要先引入StructType,StringType,IntegerType等构建schema
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType
if __name__ __main__:spark SparkSession.builder.appName(lmx).master(local[*]).getOrCreate()sc spark.sparkContextrdd sc.textFile(data/input/sql/people.txt).map(lambda x:x.split(,)).map(lambda x:[x[0],int(x[1])])
#构建schema
schema StructType().add(name,StringType(),nullableFalse).\add(age,IntegerType(),nullableTrue)df spark.createDataFrame(rdd,schemaschema)df.printSchema()df.show()
3、toDF将rdd转换为df
下面展示了两种方式 # 只设定列名列的数据结构则是内部自己判断df rdd.toDF([name,age])df.printSchema()# root# | -- name: string(nullabletrue)# | -- age: long(nullabletrue)# 设定列名和数据类型schema StructType().add(name,StringType(),nullableFalse).\add(age,IntegerType(),nullableTrue)df rdd.toDF(schemaschema)df.printSchema()# root# | -- name: string(nullablefalse)# | -- age: integer(nullabletrue)
4、基于pandas构建 dfp pd.DataFrame({id:[1,2,3],score:[99,98,100]})df spark.createDataFrame(dfp)df.printSchema()df.show()# root# | -- id: long(nullabletrue)# | -- score: long(nullabletrue)# # --------# | id | score |# --------# | 1 | 99 |# | 2 | 98 |# | 3 | 100 |# --------
5、通过文件读取创造 在读取json和parquet文件时不需要设定schema因为文件已经自带
而读取csv时还需要使用.option设定 header等参数
这里说一下parquet文件 parquet:是Spark中常用的一种列式存储文件格式 和Hive中的ORC差不多他俩都是列存储格式 parquet对比普通的文本文件的区别 parquet 内置schema(列名列类型 是否为空)存储是以列作为存储格式存储是序列化存储在文件中的(有压缩属性体积小) DataFrame代码风格 DataFrame支持两种风格进行编程分别是DSL风格和SQL风格DSL语法风格 DSL称之为:领域特定语言 其实就是指DataFrame的特有API DSL风格意思就是以调用API的方式来处理Data比如: df.where0.limit0SQL语法风格 SQL风格就是使用SQL语句处理DataFrame的数据比如: spark.sql(“SELECT*FROM xxx) DSL
其实就是用其内置的API处理数据举例 df.select(id,subject).show()df.where(subject语文).show()df.select(id,subject).where(subject语文).show()df.groupBy(subject).count().show()
API其实跟SQL类似这里不详细说明了个人感觉不如直接写SQL语句
SQL
DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表然后可以通过在程序中使用spark.sgl0来执行SQL语句查询结果返回一个DataFrame。如果想使用SQL风格的语法需要将DataFrame注册成表采用如下的方式: df.createTempView(tmp) #创建临时视图df.createGlobalTempView(global_tmp)#创建全局试图# 全局表: 跨SparkSession对象使用在一个程序内的多个SparkSession中均可调用查询前带上前缀global_tmpdf.createOrReplaceTempView(repalce_tmp)#创建临时表如果存在则替换
然后使用spark.sql的形式书写sql代码 spark.sql(select * from tmp where subject 语文).show()spark.sql(select id,score from repalce_tmp where score90).show()spark.sql(select subject,max(score) from global_temp.global_tmp group by subject).show()
SparkSQL Shuffle 分区数目 原因: 在SparkSQL中当Job中产生Shufle时默认的分区数 spark.sql.shufle,partitions 为200在实际项目中要合理的设置。 在代码中可以设置
spark SparkSession.builder.appName(lmx).\
master(local[*]).config(spark.sql.shufle,partitions,2).\
getOrCreate() spark.sqL.shuffle.partitions 参数指的是在sql计算中shuffle算子阶段默认的分区数是200 对于集群模式来说200个默认也算比较合适 如在Local下运行200个很多在调度上会带宋限外的损耗所以在Local下建议修改比较低 比如2\4\10均可这个参数和Spark RDD中设置并行度的参数是相互独立的 DataFrame数据写出
统一API 下面提供两种方法分别写出为json和csv spark.sql(select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc).write.mode(overwrite).format(json).save(data/output/1t)spark.sql(select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc).write.mode(overwrite).format(csv)\.option(header,True)\.option(sep,;)\.save(data/output/csv)
其他的一些方法
SparkSQL中读取数据和写出数据 - 知乎
不过这里似乎不能自己命名导出的数据文件
Spark UDF 无论Hive还是SparKSQL分析处理数据时往往需要使用函数SparkSQL模块本身自带很多实现公共功能的函数在pyspark.sql.functions中SparkSQL与Hive一样支持定义函数:UDF和UDAF尤其是UDF函数在实际项目中使用最为广泛。回顾Hive中自定义函数有三种类型:第一种:UDF(User-Defined-Function)函数. 一对一的关系输入一个值经过函数以后输出一个值; 在Hive中继承UDF类方法名称为evaluate返回值不能为void其实就是实现一个方法; 第二种:UDAF(User-Defined Aggregation Function)聚合函数 多对一的关系输入多个值输出一个值通常与groupBy联合使用; 第三种:UDTF(User-DefinedTable-Generating Functions)函数 一对多的关系输入一个值输出多个值(一行变为多行)用户自定义生成函数有点像flatMap; 在SparkSQL中目前仅仅支持UDF函数和UDAF函数目前Python仅支持UDF
UDF有两种定义方式 方式1语法udf对象sparksession.udfregister(参数1参数2参数3) 参数1:UDF名称可用于SQL风格 参数2:被注册成UDF的方法名 参数3:声明UDF的返回值类型 udf对象:返回值对象是一个UDF对象可用于DSL风格 方式2语法 from pyspark.sql import functions as F udf对象 F.udf(参数1参数2) 参数1:被注册成UDF的方法名 参数2:声明UDF的返回值类型 udf对象:返回值对象是一个UDF对象可用于DSL风格 举例 def double_score(num):return 2*numudf1 spark.udf.register(udf_1,double_score,IntegerType())# dsl风格df.select(udf1(df[score])).show()# sql风格df.selectExpr(udf_1(score)).show()# sql风格2df.createTempView(tmp)spark.sql(select udf_1(score) from tmp).show()udf2 F.udf(double_score,IntegerType())df.select(udf2(df[score])).show()
当返回值是数组时需要定义数组内部数据的数据类型ArrayType(StringType()) spark SparkSession.builder.appName(lmx).master(local[*]).config(spark.sql.shufle,partitions,2).getOrCreate()sc spark.sparkContextrddsc.parallelize([[i love you],[i like you]])df rdd.toDF([ifo])def func(num):return num.split( )udf spark.udf.register(udf_sql,func,ArrayType(StringType()))# dsl风格df.select(udf(df[ifo])).show()
当返回值是字典时需要使用StructType()且定义每个列的名字需要跟函数返回值的列名一样和数据类型 rddsc.parallelize([[1],[2],[3],[4],[5]])df rdd.toDF([ifo])df.show()def func(num):return {num:num,num1:num10}udf spark.udf.register(udf_sql,func,StructType().\add(num,IntegerType(),nullableFalse).\add(num1,IntegerType(),nullableFalse))df.select(udf(df[ifo])).show()
Catalyst优化器 RDD的执行流程为 代码 -DAG调度器逻辑任务 -Task调度器任务分配和管理监控 -Worker干活 SparkSQL会对写完的代码执行“自动优化”既Catalyst优化器以提升代码运行效率避免开发者水平影响到代码执行效率。 RDD代码不会是因为RDD的数据对象太过复杂无法被针对性的优化
加入优化的SparkSQL大致架构为 1.API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句 2.收到 SQL 语句以后,将其交给 Catalyst,Catalyst 负责解析 SQL,生成执行计划等 3.Catalyst 的输出应该是 RDD 的执行计划 4.最终交由集群运行 Catalyst优化器主要分为四个步骤
1、解析sql生成AST(抽象语法树) 2、在 AST 中加入元数据信息,做这一步主要是为了一些优化,例如 colcol 这样的条件
以上面的图为例
score.id → id#1#L 为 score.id 生成 id 为1,类型是 Longscore.math_score→math_score#2#L为 score.math_score 生成 id 为 2,类型为 Longpeople.id→id#3#L为 people.id 生成 id 为3,类型为 Longpeople.age→age#4#L为 people.age 生成 id 为 4,类型为 Long
3、对已经加入元数据的 AST,输入优化器,进行优化,主要包含两种常见的优化 谓词下推(Predicate Pushdown)\ 断言下推:将逻辑判断 提前到前面,以减少shuffle阶段的数据量。 以上面的demo举例可以先进行people.age10的判断再进行Join等操作。 列值裁剪(Column Pruning):将加载的列进行裁剪,尽量减少被处理数据的宽度 以上面的demo举例由于只select了score和id所以开始的时候可以只保留这两个列由于parquet是按列存储的所以很适合这个操作 4、上面的过程生成的 AST 其实最终还没办法直接运行,这个 AST 叫做 逻辑计划,结束后,需要生成 物理计划,从而生成 RDD 来运行
Spark SQL的执行流程
如此Spark SQL的执行流程为 1.提交SparkSQL代码 2.catalyst优化 a.生成原始AST语法数 b.标记AST元数据 c.进行断言下推和列值裁剪 以及其它方面的优化作用在AST上 d.将最终AST得到,生成执行计划 e.将执行计划翻译为RDD代码 3.Driver执行环境入口构建(SparkSession) 4.DAG 调度器规划逻辑任务 5.TASK 调度区分配逻辑任务到具体Executor上工作并监控管理任务 6.Worker干活