鞍山网站建设联系方式,大学新校区建设网站,微信小程序有什么用处?,襄阳市建设工程质量监督站网站文章目录 day07_Spark SQL课程笔记一、今日课程内容二、Spark SQL函数定义#xff08;掌握#xff09;1、窗口函数2、自定义函数背景2.1 回顾函数分类标准:SQL最开始是_内置函数自定义函数_两种 2.2 自定义函数背景 3、Spark原生自定义UDF函数3.1 自定义函数流程#x… 文章目录 day07_Spark SQL课程笔记一、今日课程内容二、Spark SQL函数定义掌握1、窗口函数2、自定义函数背景2.1 回顾函数分类标准:SQL最开始是_内置函数自定义函数_两种 2.2 自定义函数背景 3、Spark原生自定义UDF函数3.1 自定义函数流程3.2 自定义演示一:3.3 自定义演示二:3.4 自定义演示三:  4、Pandas的自定义函数4.1 Apache Arrow框架4.2 基于Arrow完成Pandas和Spark的DataFrame互转4.3 基于Pandas自定义函数4.3.1 自定义函数流程4.3.2 自定义UDF函数4.3.3 自定义UDAF函数   三、Spark on Hive操作1、集成原理2、集成环境配置3、启动metastore服务4、SparkOnHive操作4.1 黑窗口测试spark-sql4.2 python代码测试spark-sql  四、SparkSQL的分布式执行引擎(了解)1、启动Thrift服务2、beeline连接Thrift服务3、开发工具连接Thrift服务4、控制台编写SQL代码 五、Spark SQL的运行机制掌握5.1 **Catalyst**内部具体的执行流程**为什么 SparkSQL 的执行流程就像是“从 SQL 语句到结果的流水线”****实际意义**5.2 SparkSQL的执行流程总结:  01_spark原生自定义UDF函数_返回字符串.py结果 02_spark原生自定义UDF函数_返回列表.py结果 03_spark原生自定义UDF函数_返回字典.py结果 04_sparkSQL和pandas中df对象互转操作.py05_spark基于pandas定义udf函数_s到s.py06_spark基于pandas定义udaf函数_s到标量.py07_spark_sql操作数据库.py day07_Spark SQL课程笔记 一、今日课程内容 
1- Spark SQL函数定义掌握2- Spark On Hive操作3- Spark SQL的分布式执行引擎了解4- Spark SQL的运行机制掌握 
今日目的掌握Spark SQL函数定义的两种方式理解-掌握Spark SQL的运行机制 
二、Spark SQL函数定义掌握 
1、窗口函数 
回顾之前学习过的窗口函数 
分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])分析函数可以大致分成如下3类
1- 第一类: 聚合函数 sum() count() avg() max() min()
2- 第二类: 排序函数 row_number() rank() dense_rank() 
3- 第三类: 其他函数 ntile()  first_value() last_value() lead() lag() 三个排序函数的区别?
row_number(): 巧记 1234  特点: 唯一且连续
rank(): 巧记 1224 特点: 并列不连续
dense_rank(): 巧记 1223  特点: 并列且连续在Spark SQL中使用窗口函数案例 
已知数据如下: cookie1,2018-04-10,1
cookie1,2018-04-11,5
cookie1,2018-04-12,7
cookie1,2018-04-13,3
cookie1,2018-04-14,2
cookie1,2018-04-15,4
cookie1,2018-04-16,4
cookie2,2018-04-10,2
cookie2,2018-04-11,3
cookie2,2018-04-12,5
cookie2,2018-04-13,6
cookie2,2018-04-14,3
cookie2,2018-04-15,9
cookie2,2018-04-16,7需求: 要求找出每个cookie中pv排在前3位的数据也就是分组取TOPN问题 
# 导包
import os
from pyspark.sql import SparkSession,functions as F,Window as W# 绑定指定的python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3# 创建main函数
if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# 2.数据输入df  spark.read.csv(pathfile:///export/data/spark_project/spark_sql/data/cookie.txt,sep,,schemacookie string,datestr string,pv int)# 3.数据处理(切分,转换,分组聚合)# 4.数据输出etldf  df.dropDuplicates().dropna()# SQL方式etldf.createTempView(cookie_logs)spark.sql(select cookie,datestr,pvfrom (select cookie,datestr,pv,dense_rank() over(partition by cookie order by pv desc) as rnfrom cookie_logs) temp where rn 3 ).show()# DSL方式etldf.select(cookie, datestr, pv,F.dense_rank().over( W.partitionBy(cookie).orderBy(F.desc(pv)) ).alias(rn)).where(rn 3).select(cookie, datestr, pv).show()# 5.关闭资源spark.stop() 
运行结果截图  
2、自定义函数背景 
2.1 回顾函数分类标准: 
SQL最开始是_内置函数自定义函数_两种 
SQL函数主要分为以下三大类 
UDF函数普通函数 特点一对一输入一个得到一个例如split() … UDAF函数聚合函数 特点多对一输入多个得到一个例如sum() avg() count() min() max() … UDTF函数表生成函数 特点一对多输入一个得到多个例如explode() …  
在SQL中提供的所有的内置函数都是属于以上三类中某一类函数 简单来说UDF、UDAF和UDTF是Spark SQL中用于扩展SQL功能的三种自定义函数分别像是“单兵作战”、“团队协作”和“多面手”满足不同的数据处理需求。  具体而言 UDF用户自定义函数 功能对单行数据进行操作输入一行输出一行。场景适合简单的数据转换比如将字符串转换为大写。示例spark.udf.register(to_upper, lambda x: x.upper()) UDAF用户自定义聚合函数 功能对多行数据进行聚合操作输入多行输出一行。场景适合复杂的聚合计算比如自定义加权平均。示例继承UserDefinedAggregateFunction类实现initialize、update、merge等方法。 UDTF用户自定义表生成函数 功能对单行数据进行操作输入一行输出多行。场景适合数据展开操作比如将JSON数组拆分为多行。示例继承GenericUDTF类实现initialize、process、close等方法。   实际生产场景 在数据清洗中使用UDF将日期格式统一为标准格式。在数据分析中使用UDAF计算复杂的业务指标如客户生命周期价值CLV。在数据展开中使用UDTF将嵌套的JSON数据拆分为多行便于后续分析。  总之UDF、UDAF和UDTF是Spark SQL中强大的扩展工具能够满足从简单转换到复杂聚合、数据展开的多种需求为数据处理提供了极大的灵活性。  2.2 自定义函数背景 
思考有这么多的内置函数为啥还需要自定义函数呢? 为了扩充函数功能。在实际使用中并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能其实并没有提供对应的函数提供的函数更多是以公共功能函数。此时需要进行自定义来扩充新的功能函数 在Spark SQL中针对Python语言对于自定义函数原生支持的并不是特别好。目前原生仅支持自定义UDF函数而无法自定义UDAF函数和UDTF函数。 
 在1.6版本后Java 和scala语言支持自定义UDAF函数但Python并不支持。 
1- SparkSQL原生的时候Python只能开发UDF函数
2- SparkSQL借助其他第三方组件(Arrow,pandas...)Python可以开发UDF、UDAF函数,同时也提升效率 Spark SQL原生UDF函数存在的问题大量的序列化和反序列 虽然Python支持自定义UDF函数但是其效率并不是特别的高效。因为在使用的时候传递一行处理一行返回一行的方式。这样会带来非常大的序列化的开销的问题导致原生UDF函数效率不好早期解决方案: 基于Java/Scala来编写自定义UDF函数然后基于python调用即可目前主要的解决方案: 引入Arrow框架可以基于内存来完成数据传输工作可以大大的降低了序列化的开销提供传输的效率解决原生的问题。同时还可以基于pandas的自定义函数利用pandas的函数优势完成各种处理操作3、Spark原生自定义UDF函数 
3.1 自定义函数流程 
第一步: 在PySpark中创建一个Python的函数在这个函数中书写自定义的功能逻辑代码即可第二步: 将Python函数注册到Spark SQL中注册方式一: udf对象  sparkSession.udf.register(参数1,参数2,参数3)参数1: 【UDF函数名称】此名称用于后续在SQL中使用可以任意取值但是要符合名称的规范参数2: 【自定义的Python函数】表示将哪个Python的函数注册为Spark SQL的函数参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象是一个UDF对象可以在DSL中使用说明: 如果通过方式一来注册函数, 【可以用在SQL和DSL】注册方式二:  udf对象  F.udf(参数1,参数2)参数1: Python函数的名称表示将那个Python的函数注册为Spark SQL的函数参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象是一个UDF对象可以在DSL中使用说明: 如果通过方式二来注册函数【仅能用在DSL中】注册方式三:  语法糖写法  F.udf(returnType返回值类型)  放置到对应Python的函数上面说明: 实际是方式二的扩展。如果通过方式三来注册函数【仅能用在DSL中】第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可 
3.2 自定义演示一: 
需求1: 请自定义一个函数完成对 数据 统一添加一个后缀名的操作 , 例如后缀名 ‘_itheima’ 
效果如下: # 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType# 绑定指定的python解释器os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3# 创建main函数
if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# 2.数据输入df  spark.createDataFrame(data[(1,张三,广州),(2,李四,深圳)],schemaid int,name string,address string)df.show()# 3.SparkSQL自定义udf函数# 第一步.自定义python函数def add_suffix(data):return data_itheima# 第二步.把python函数注册到SparkSQL# ① spark.udf.register注册dsl1_add_suffix  spark.udf.register(sql_add_suffix,add_suffix,StringType())# ②F.udf注册dsl2_add_suffix  F.udf(add_suffix, StringType())# ③F.udf注册F.udf( StringType())def candy_add_suffix(data):return data_itheima# 第三步.在SparkSQL中调用自定义函数# SQL方式df.createTempView(temp)spark.sql(select id,name,sql_add_suffix(address) as new_address from temp).show()# DSL方式# 调用dsl1_add_suffixdf.select(id, name, dsl1_add_suffix(address).alias(new_address)).show()# 调用dsl2_add_suffixdf.select(id, name, dsl2_add_suffix(address).alias(new_address)).show()# 调用candy_add_suffixdf.select(id, name, candy_add_suffix(address).alias(new_address)).show()# 4.关闭资源spark.stop()斌哥友情提醒: 可能遇到的问题如下 原因: 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用不能在DSL中用。3.3 自定义演示二: 
需求2: 请自定义一个函数返回值类型为复杂类型: 列表 
效果如下: 参考代码: 
# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType, ArrayType# 绑定指定的python解释器os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3# 创建main函数
if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# 2.数据输入df  spark.createDataFrame(data[(1,张三_广州),(2,李四_深圳)],schemaid int,name_address string)df.show()# 3.SparkSQL自定义udf函数# 第一步.自定义python函数def my_split(data:str):list1  data.split(_)return list1# 第二步.把python函数注册到SparkSQL# ① spark.udf.register注册dsl1_add_suffix  spark.udf.register(sql_add_suffix,my_split,ArrayType(StringType()))# ②F.udf注册dsl2_add_suffix  F.udf(my_split, ArrayType(StringType()))# ③F.udf注册F.udf(ArrayType(StringType()))def candy_add_suffix(data):list1  data.split(_)return list1# 第三步.在SparkSQL中调用自定义函数# SQL方式df.createTempView(temp)spark.sql(select id,sql_add_suffix(name_address) as new_address from temp).show()# DSL方式# 调用dsl1_add_suffixdf.select(id,  dsl1_add_suffix(name_address).alias(new_name_address)).show()# 调用dsl2_add_suffixdf.select(id,dsl2_add_suffix(name_address).alias(new_name_address)).show()# 调用candy_add_suffixdf.select(id,candy_add_suffix(name_address).alias(new_name_address)).show()# 4.关闭资源spark.stop()3.4 自定义演示三: 
需求3: 请自定义一个函数返回值类型为复杂类型: 字典 
效果如下: 注意: 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是null补充# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType, ArrayType, StructType# 绑定指定的python解释器os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3# 创建main函数
if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# 2.数据输入df  spark.createDataFrame(data[(1,张三_广州),(2,李四_深圳)],schemaid int,name_address string)df.show()# 3.SparkSQL自定义udf函数# 第一步.自定义python函数def my_split(data:str):list1  data.split(_)return {name:list1[0],address:list1[1]}# 第二步.把python函数注册到SparkSQL# 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是nullt  StructType().add(name,StringType()).add(address,StringType())# ① spark.udf.register注册dsl1_add_suffix  spark.udf.register(sql_add_suffix,my_split,t)# ②F.udf注册dsl2_add_suffix  F.udf(my_split, t)# ③F.udf注册F.udf(t)def candy_add_suffix(data):list1  data.split(_)return {name:list1[0],address:list1[1]}# 第三步.在SparkSQL中调用自定义函数# SQL方式df.createTempView(temp)spark.sql(select id,sql_add_suffix(name_address) as new_name_address from temp).show()# DSL方式# 调用dsl1_add_suffixdf.select(id, dsl1_add_suffix(name_address).alias(new_name_address)).show()# 调用dsl2_add_suffixdf.select(id,dsl2_add_suffix(name_address).alias(new_name_address)).show()# 调用candy_add_suffixdf.select(id,candy_add_suffix(name_address).alias(new_name_address)).show()# 4.关闭资源spark.stop()4、Pandas的自定义函数 2-如果不是3.1.2版本那么先卸载pyspark 命令: pip uninstall pyspark 3- 再按照【Spark课程阶段_部署文档.doc】中重新安装3.1.2版本pyspark 命令: pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark3.1.2 4.1 Apache Arrow框架 
 Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层它的设计目标就是作为一个跨平台的数据层来加快大数据分析项目的运行效率 
 Pandas 与 Spark SQL 进行交互的时候建立在Apache Arrow上带来低开销 高性能的UDF函数 
 
如何安装? 三个节点建议都安装 
检查服务器上是否有安装pyspark
pip list | grep pyspark  或者 conda list | grep pysparkpip list | grep pyarrow  
如果服务器已经安装了pyspark的库那么仅需要执行以下内容即可安装。例如在 node1安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]如果服务器中python环境中没有安装pyspark建议执行以下操作即可安装。例如在 node2 和 node3安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyarrow10.0.0Arrow并不会自动使用在某些情况下需要配置 以及在代码中需要进行小的更改才可以使用 
如何使用呢? 默认不会自动启动的, 一般建议手动配置 
spark.conf.set(spark.sql.execution.arrow.pyspark.enabled,True)4.2 基于Arrow完成Pandas和Spark的DataFrame互转 
Pandas中DataFrame 
DataFrame表示一个二维表对象就是表示整个表 
字段、列、索引Series表示一列  
Spark SQL中DataFrame  
使用场景 
1- Spark的DataFrame - Pandas的DataFrame当大数据处理到后期的时候可能数据量会越来越少这样可以考虑使用单机版的Pandas来做后续数据的分析 
2- Pandas的DataFrame - Spark的DataFrame当数据量达到单机无法高效处理的时候或者需要和其他大数据框架集成的时候可以转成Spark中的DataFrame 
Pandas的DataFrame - Spark的DataFrame: spark.createDataFrame(datapandas_df)
Spark的DataFrame - Pandas的DataFrame: init_df.toPandas()示例: 
# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3# 创建main函数
if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# TODO: 手动开启arrow框架spark.conf.set(spark.sql.execution.arrow.pyspark.enabled, True)# 2.数据输入df  spark.createDataFrame(data[(1,张三_广州),(2,李四_深圳)],schemaid int ,name_address string)df.show()print(type(df))print(------------------------)# 3.数据处理(切分,转换,分组聚合)# 4.数据输出# spark-pandaspd_df  df.toPandas()print(pd_df)print(type(pd_df))print(------------------------)# pandas-sparkdf2  spark.createDataFrame(pd_df)df2.show()print(type(df2))# 5.关闭资源spark.stop() 
4.3 基于Pandas自定义函数 
 基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输允许向量化可以充分利用计算机CPU性能操作。 
 Pandas的UDF函数其实本质上就是Python的函数只不过函数的传入数据类型为Pandas的类型 
 基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数 
4.3.1 自定义函数流程 
第一步: 在PySpark中创建一个Python的函数在这个函数中书写自定义的功能逻辑代码即可第二步: 将Python函数包装成Spark SQL的函数注册方式一: udf对象  spark.udf.register(参数1, 参数2)参数1: UDF函数名称。此名称用于后续在SQL中使用可以任意取值但是要符合名称的规范参数2: Python函数的名称。表示将哪个Python的函数注册为Spark SQL的函数使用: udf对象只能在DSL中使用。参数1指定的名称只能在SQL中使用注册方式二: udf对象  F.pandas_udf(参数1, 参数2)参数1: 自定义的Python函数。表示将哪个Python的函数注册为Spark SQL的函数参数2: UDF函数的返回值类型。用于表示当前这个Python的函数返回的类型对应到Spark SQL的数据类型udf对象: 返回值对象是一个UDF对象。仅能用在DSL中使用注册方式三: 语法糖写法  F.pandas_udf(returnType)  放置到对应Python的函数上面说明: 实际是方式二的扩展。仅能用在DSL中使用第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可基于pandas方式还支持自定义UDAF函数
注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!4.3.2 自定义UDF函数 自定义Python函数的要求SeriesToSeries   
# 导包
import os
from pyspark.sql import SparkSession,functions as F
import pandas as pd# 绑定指定的python解释器
from pyspark.sql.types import LongType, IntegerTypeos.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3# 创建main函数
if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# TODO: 开启Arrow的使用spark.conf.set(spark.sql.execution.arrow.pyspark.enabled, True)# 2.数据输入df  spark.createDataFrame(data  [(1,1),(2,2),(3,3)],schema num1 int,num2 int)df.show()# 3.基于pandas自定义函数 :SeriesTOSeries# 第一步: 自定义python函数def multiply(num1:pd.Series,num2:pd.Series)-pd.Series:return num1*num2# 第二步: 把python注册为SparkSQL函数# ①spark.udf.register注册dsl1_multiply  spark.udf.register(sql_multiply,multiply)# ②F.pandas_udf注册dsl2_multiply  F.pandas_udf(multiply,IntegerType())# ③F.pandas_udf注册F.pandas_udf(IntegerType())def candy_multiply(num1: pd.Series, num2: pd.Series) - pd.Series:return num1 * num2# 第三步: 在SparkSQL中调用注册后函数# SQL方式df.createTempView(temp)spark.sql(select num1,num2,sql_multiply(num1,num2) as result from temp).show()# DSL方式#调用dsl1_multiplydf.select(num1,num2,dsl1_multiply(num1,num2).alias(result)).show()# 调用dsl2_multiplydf.select(num1, num2, dsl2_multiply(num1, num2).alias(result)).show()# 调用candy_multiplydf.select(num1, num2, candy_multiply(num1, num2).alias(result)).show()# 4.关闭资源spark.stop() 
4.3.3 自定义UDAF函数 
自定义Python函数的要求Series To 标量 简单来说Series To 标量是指将一个Pandas Series一维数组转换为一个标量值单个值就像是“把一串数据浓缩成一个结果”。  具体而言 SeriesPandas中的一维数据结构类似于带标签的数组可以存储任意类型的数据。标量单个值比如整数、浮点数、字符串等。转换场景 聚合操作将Series中的所有值通过某种计算如求和、平均值转换为一个标量值。提取操作从Series中提取某个特定位置的值作为标量。 示例import pandas as pd# 创建一个Series
s  pd.Series([1, 2, 3, 4, 5])# 聚合操作求和
sum_result  s.sum()  # 输出15# 提取操作获取第一个值
first_value  s[0]    # 输出1实际生产场景 在数据分析中使用聚合操作将一列数据如销售额转换为总销售额或平均销售额。在数据处理中从时间序列数据中提取某个时间点的值作为标量。  总之Series To 标量是Pandas中常见的操作通过聚合或提取将一维数据转换为单个值为数据分析和处理提供了便利。  表示自定义函数的输入数据类型是Pandas中的Series对象返回值数据类型是标量数据类型。也就是Python中的数据类型例如int、float、bool、list… 基于pandas方式还支持自定义UDAF函数
注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!# 导包
import os
from pyspark.sql import SparkSession, functions as F
import pandas as pd# 绑定指定的python解释器
from pyspark.sql.types import LongType, IntegerType, FloatTypeos.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3# 创建main函数
if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# TODO: 开启Arrow的使用spark.conf.set(spark.sql.execution.arrow.pyspark.enabled, True)# 2.数据输入df  spark.createDataFrame(data[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],schemaid int,value float)df.show()# 3.基于pandas自定义函数 :SeriesTOSeries# 第一步: 自定义python函数# ③F.pandas_udf注册  注意: 理论上UDAF只能用注册方式三语法糖方式,也就意味着只能DSL使用F.pandas_udf(FloatType())def candy_mean_v(value: pd.Series) - float:return value.mean()# 第二步: 注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式一register注册# ①spark.udf.register注册dsl1_mean_v  spark.udf.register(sql_mean_v, candy_mean_v)# 第三步: 在SparkSQL中调用注册后函数# DSL方式# 调用candy_mean_vdf.groupBy(id).agg(candy_mean_v(value).alias(result)).show()# 调用dsl1_mean_vdf.groupBy(id).agg(dsl1_mean_v(value).alias(result)).show()# SQL方式df.createTempView(temp)spark.sql(select id,sql_mean_v(value) as result from temp group by id).show()# 4.关闭资源spark.stop()三、Spark on Hive操作 
1、集成原理 HiveServer2的主要作用: 接收SQL语句进行语法检查解析SQL语句优化将SQL转变成MapReduce程序提交到Yarn集群上运行SparkSQL与Hive集成实际上是替换掉HiveServer2。是SparkSQL中的HiveServer2替换掉了Hive中的HiveServer2。集成以后优点如下
1- 对于SparkSQL来说可以避免在代码中编写schema信息。直接向MetaStore请求元数据信息
2- 对于SparkSQL来说多个人可以共用同一套元数据信息避免每个人对数据理解不同造成代码功能兼容性问题
3- 对于Hive来说底层执行引擎由之前的MapReduce变成了Spark Core能够提升运行效率
4- 对于使用者/程序员来说SparkSQL与Hive集成对于上层使用者来说是完全透明的。2、集成环境配置 
环境搭建参考【Spark课程阶段_部署文档.doc】的7章节内容。 
1-node1上将hive-site.xml拷贝到spark安装路径conf目录 
cd /export/server/hive/confcp hive-site.xml /export/server/spark/conf/2-node1上执行以下命令将mysql的连接驱动包拷贝到spark的jars目录下 
注意: 之前拷贝过的可以忽略此操作 
cd /export/server/hive/libcp mysql-connector-java-5.1.32.jar  /export/server/spark/jars/3、启动metastore服务 
# 注意: 
# 启动 hadoop集群
start-all.sh# 启动hive的metastore
nohup /export/server/hive/bin/hive --service metastore # 测试spark-sql
/export/server/spark/bin/spark-sql4、SparkOnHive操作 
4.1 黑窗口测试spark-sql 
[rootnode1 bin]# /export/server/spark/bin/spark-sql
...
spark-sqlshow databases;
...
spark-sqlcreate database if not exists spark_demo;
...
spark-sqlcreate table if not exists spark_demo.stu(id int,name string);
...
spark-sqlinsert into  spark_demo.stu values(1,张三),(2,李四);
...4.2 python代码测试spark-sql 
SparkOnHive配置: 
spark.sql.warehouse.dir: 告知Spark数据表存放的地方。推荐使用HDFS。如果不配置默认使用本地磁盘存储。
hive.metastore.uris: 告知SparkMetaStore元数据管理服务的连接信息
enableHiveSupport() : 开启Spark和Hive的集成使用格式如下:spark  SparkSession.builder\.config(spark.sql.warehouse.dir,hdfs://node1:8020/user/hive/warehouse)\.config(hive.metastore.uris,thrift://node1.itcast.cn:9083)\.appName(pyspark_demo)\.master(local[1])\.enableHiveSupport()\.getOrCreate()示例: 
# 导包
import os
import timefrom pyspark.sql import SparkSession# 绑定指定的python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3# 创建main函数
if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder\.config(spark.sql.warehouse.dir,hdfs://node1:8020/user/hive/warehouse)\.config(hive.metastore.uris,thrift://node1.itcast.cn:9083)\.appName(pyspark_demo)\.master(local[1])\.enableHiveSupport()\.getOrCreate()# 2.执行sql# 查看所有库spark.sql( show databases).show()# 查看demo1的student表内容spark.sql(select * from demo1.student).show()# 测试是否能建库: 可以spark.sql( create database if not exists spark_demo )# 测试是否能在spark_demo建表: 可以spark.sql(create table if not exists spark_demo.stu(id int,name string))# 测试是否可以往spark_demo.stu表插入数据: 可以spark.sql(insert into  spark_demo.stu values(1,张三),(2,李四))# 为了方便查看web页面time.sleep(500)# 3.关闭资源spark.stop()四、SparkSQL的分布式执行引擎(了解) 
分布式执行引擎  Thrift服务  ThriftServer  SparkSQL中的Hiveserver2 
1、启动Thrift服务 
 目前我们已经完成Spark集成Hive的配置。但是目前集成后如果需要连接Hive此时需要启动一个Spark的客户端spark-sql、代码才可以。这个客户端底层相当于启动服务项用于连接Hive的metastore的服务进行处理操作。一旦退出客户端相当于这个服务也就没有了无法再使用 
 目前的情况非常类似于在Hive部署的时候有一个本地模式部署在启动Hive客户端的时候内部自动启动一个Hive的hiveserver2服务项 
大白话: 目前在Spark后台并没有一个长期挂载的Spark的服务(Spark HiveServer2服务)。导致每次启动Spark客户端都需要在内部启动一个服务项。这种方式不适合测试使用不合适后续的快速开发 如何启动Spark 提供的分布式的执行引擎呢? 这个引擎大家完全可以将其理解为Spark的HiveServer2服务实际上就是Spark的Thrift服务项 
# 注意: 要启动sparkThriftServer2服务必须要保证先启动好Hadoop以及Hive的metastore不能启动Hive的hiveserver2服务!
# 启动 hadoop集群
start-all.sh# 启动hive的metastore
nohup /export/server/hive/bin/hive --service metastore # 最后执行以下命令启动sparkThriftServer2:
/export/server/spark/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port10000 \
--hiveconf hive.server2.thrift.bind.hostnode1 \
--hiveconf spark.sql.warehouse.dirhdfs://node1:8020/user/hive/warehouse \
--master local[2]校验是否成功  
访问界面默认4040 2、beeline连接Thrift服务 
启动后可以通过spark提供beeline的方式连接这个服务。连接后直接编写SQL即可 
相当于模拟了一个Hive的客户端但是底层执行的是Spark SQL最终将其转换为Spark RDD的程序 
启动命令:/export/server/spark/bin/beeline然后输入:!connect jdbc:hive2://node1:10000继续输入用户名: root
注意密码: 不需要写直接回车3、开发工具连接Thrift服务 
如何通过DataGrip或者PyCharm连接Spark进行操作         
4、控制台编写SQL代码 进入以下页面就可以愉快的编写sql了,再也不用担心在spark.sql()中编写没有提示了:)  五、Spark SQL的运行机制掌握 
 Spark SQL底层依然运行的是Spark RDD的程序所以说Spark RDD程序的运行的流程在Spark SQL中依然是存在的只不过在这个流程的基础上增加了从SQL翻译为RDD的过程 
 Spark SQL的运行机制其实就是在描述如何将Spark SQL翻译为RDD程序:  
 整个Spark SQL 转换为RDD 是基于Catalyst 优化器实施基于这个优化器即可完成整个转换操作 
5.1 Catalyst内部具体的执行流程 大白话 
SQL执行顺序: from-join on-where-groupby-聚合操作-having-select [distinct] -order by -limit 
1- 接收客户端提交过来的SQL/DSL代码首先会校验SQL/DSL的语法是否正常。如果通过校验根据SQL/DSL的执行顺序生成未解析的逻辑计划也叫做AST抽象语法树2- 对于AST抽象语法树加入元数据信息确定一共涉及到哪些字段、字段的数据类型是什么以及涉及到的表的其他相关元数据信息。加入元数据信息以后就得到了已经解析但是未优化的逻辑计划3- 对未优化的逻辑计划执行优化操作整个优化通过优化器来执行。在优化器匹配相对应的优化规则实时具体的优化。SparkSQL底层提供了一两百中优化规则得到优化的逻辑计划。例如: 谓词下推断言下推、列值裁剪3.1- 谓词下推: 也叫做断言下推。将数据过滤操作提前到数据扫描的时候执行减少后续处理数据量提升效率。3.2- 列值裁剪: 在表中只加载数据分析用到的字段,不相关的字段不加载进来。减少后续处理数据量提升效率。4- 由于优化规则很多导致会得到多个优化的逻辑计划。在转换成物理执行计划的过程中会根据 成本模型对比每个计划运行的耗时、资源消耗等得到最优的一个物理执行计划5- 将物理执行计划通过code generation代码生成器转变成Spark RDD的代码6- 最后就是将Spark RDD代码部署到集群上运行。后续过程与Spark内核调度中Job的调度流程完全一致。专业的术语 
1- Spark SQL底层解析是由RBO基于规则的优化器和CBO基于代价的优化器优化完成的2- RBO是基于规则优化对于SQL或DSL的语句通过执行引擎得到未执行逻辑计划在根据元数据得到逻辑计划之后加入列值裁剪或谓词下推等优化手段形成优化的逻辑计划3- CBO是基于优化的逻辑计划得到多个物理执行计划根据 代价函数(成本模型) 选择出最优的物理执行计划4- 通过code genaration代码生成器完成RDD的代码构建5- 底层依赖于DAGScheduler和TaskScheduler完成任务计算执行后续过程与Spark内核调度中Job的调度流程完全一致。简单来说SparkSQL的执行流程就像是“从SQL语句到结果的流水线”通过解析、优化和执行将SQL查询转化为分布式计算任务最终返回结果。  具体而言 SQL解析 将SQL语句解析为抽象语法树AST。使用ANTLR工具将AST转换为逻辑计划Logical Plan。 逻辑优化 对逻辑计划进行优化如谓词下推、列剪裁等。生成优化后的逻辑计划。 物理计划生成 将逻辑计划转换为物理计划Physical Plan选择最优的执行策略。物理计划包括RDD转换、数据源读取等具体操作。 任务调度与执行 将物理计划分解为多个Stage和Task。通过DAGScheduler和TaskScheduler将Task分配到集群节点上执行。 结果返回 将计算结果返回给客户端如DataFrame或直接输出。   实际生产场景 在数据仓库中使用SparkSQL查询海量数据生成报表和洞察。在实时分析中结合Structured Streaming使用SparkSQL处理实时数据流。  总之SparkSQL的执行流程通过解析、优化和执行将SQL查询高效地转化为分布式计算任务为大规模数据处理提供了强大的支持。  为什么 SparkSQL 的执行流程就像是“从 SQL 语句到结果的流水线” 流水线分阶段处理 流水线将复杂任务分解为多个阶段每个阶段专注于特定任务。SparkSQL将SQL查询分解为SQL解析、逻辑优化、物理计划生成、任务调度与执行和结果返回等多个阶段每个阶段完成特定任务。  高效流转逐步优化和执行 流水线每个阶段完成后数据会流转到下一个阶段逐步完成最终目标。SparkSQLSQL语句经过解析、优化、物理计划生成等步骤逐步转化为分布式计算任务最终高效执行并返回结果。  自动化无需手动干预 流水线自动化完成每个阶段的任务无需人工干预。SparkSQL通过Catalyst优化器和Tungsten引擎自动优化查询计划并执行开发者只需关注SQL语句和结果。  结果导向最终输出 流水线最终输出成品。SparkSQL最终输出查询结果如DataFrame或报表为业务决策提供支持。  实际意义 
SparkSQL的执行流程就像“从SQL语句到结果的流水线”通过分阶段、高效流转和自动化的方式将SQL查询转化为分布式计算任务最终返回结果为大规模数据处理提供了强大的支持。 
5.2 SparkSQL的执行流程总结: 01_spark原生自定义UDF函数_返回字符串.py 
# 导包
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 先创建spark session对象spark  SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 创建DF对象df  spark.createDataFrame([(1, 张三, 广州),(2, 李四, 深圳),(3, 王五, 上海)], schema[id, name, address])# 测试是否有数据df.show()# 需求: 自定义函数,功能是给df的所有地址都添加一个后缀_itheima# 一.自定义能添加后缀_itheima功能的python函数def add_suffix(address):return address  _itheima# 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)# 注册方式1: 适用于sql和dsl风格dsl_add_suffix  spark.udf.register(sql_add_suffix, add_suffix, StringType())# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView(stu_tb)spark.sql(select *,sql_add_suffix(address) as address_newfrom stu_tb).show()# 方式2: DSL风格# df.select(#     *,#     dsl_add_suffix(address).alias(address_new)# ).show()# 注意: 最后一定释放资源spark.stop() 结果 02_spark原生自定义UDF函数_返回列表.py 
# 导包
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, ArrayType# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 先创建spark session对象spark  SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 创建DF对象df  spark.createDataFrame([(1, 张三_广州),(2, 李四_深圳),(3, 王五_上海)], schema[id, name_address])# 测试是否有数据df.show()# 需求: 自定义函数# 一.自定义能返回列表的功能的python函数def my_split(name_address):return name_address.split(_)# 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)# 注册方式1: 适用于sql和dsl风格dsl_my_split  spark.udf.register(sql_my_split, my_split, ArrayType(StringType()))# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView(stu_tb)spark.sql(select *,sql_my_split(name_address)[0] as name,sql_my_split(name_address)[1] as addressfrom stu_tb).show()# 方式2: DSL风格df.select(*,dsl_my_split(name_address)[0].alias(name),dsl_my_split(name_address)[1].alias(address)).show()# 注意: 最后一定释放资源spark.stop() 
结果 03_spark原生自定义UDF函数_返回字典.py 
# 导包
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, ArrayType, StructType# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 先创建spark session对象spark  SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 创建DF对象df  spark.createDataFrame([(1, 张三_广州),(2, 李四_深圳),(3, 王五_上海)], schema[id, name_address])# 测试是否有数据df.show()# 需求: 自定义函数# 一.自定义能返回字典的功能的python函数def my_split(name_address):list1  name_address.split(_)dict1  {name: list1[0], address: list1[1]}return dict1# 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)# 注册方式1: 适用于sql和dsl风格# 注意: 如果原始函数返回的是字典,就必须用StructType()且字段名必须和原生字典的key值一样,否则null补充t  StructType().add(name, StringType()).add(address, StringType())dsl_my_split  spark.udf.register(sql_my_split, my_split, t)# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView(stu_tb)spark.sql(select *,sql_my_split(name_address)[name] as name,sql_my_split(name_address)[address] as addressfrom stu_tb).show()# 方式2: DSL风格df.select(*,dsl_my_split(name_address)[name].alias(name),dsl_my_split(name_address)[address].alias(address)).show()# 注意: 最后一定释放资源spark.stop() 
结果 04_sparkSQL和pandas中df对象互转操作.py 
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 先创建spark session对象spark  SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 注意: 如果想优化createDataFrame()效率可以手动开启arrow设置# TODO: 手动开启arrow设置spark.conf.set(spark.sql.execution.arrow.pyspark.enabled, True)# 1.先创建sparkSQL的df对象spark_df  spark.createDataFrame([(1, 张三),(2, 李四),(3, 王五)], schema[id, name])# 查看数据类型print(type(spark_df))  # class pyspark.sql.dataframe.DataFramespark_df.show()# 2.把saprk_df转换为pandas的df对象pd_df  spark_df.toPandas()# 查看数据类型print(type(pd_df))  # class pandas.core.frame.DataFrameprint(pd_df)# 3.把pandas的df对象转换为sparkSQL的df对象spark_df2  spark.createDataFrame(pd_df)# 查看数据类型print(type(spark_df2))  # class pyspark.sql.dataframe.DataFramespark_df2.show()# 注意: 最后一定释放资源spark.stop() 
05_spark基于pandas定义udf函数_s到s.py 
# 导包
import os
from pyspark.sql import SparkSession, functions as F
import pandas as pd
from pyspark.sql.types import DoubleType, IntegerType# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 先创建spark session对象spark  SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# TODO: 手动开启arrow设置spark.conf.set(spark.sql.execution.arrow.pyspark.enabled, True)# 创建DF对象df  spark.createDataFrame([(1, 1), (2, 2), (3, 3)], schema[n1, n2])df.show()# 一.自定义python函数# 功能:输入两列,输出对应乘积1列def mul(n1: pd.Series, n2: pd.Series) - pd.Series:return n1 * n2# 二.把python函数包装成spark的UDF函数(sql和dsl风格)# 注册方式1: 适用于sql和dsl风格dsl_mul  spark.udf.register(sql_mul, mul)# 注册方式2: 仅适用于dsl风格dsl2_mul  F.pandas_udf(mul, IntegerType())# 注册方式3: 仅适用于dsl风格F.pandas_udf(IntegerType())def candy_mul(n1: pd.Series, n2: pd.Series) - pd.Series:return n1 * n2# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView(nums_tb)spark.sql(select n1,n2,sql_mul(n1, n2) as n3 from nums_tb).show()# 方式2: DSL风格df.select(n1, n2,dsl_mul(n1, n2).alias(n3),dsl2_mul(n1, n2).alias(n4),candy_mul(n1, n2).alias(n5)).show()# 注意: 最后一定释放资源spark.stop() 
06_spark基于pandas定义udaf函数_s到标量.py 
# 导包
import osimport pandas as pd
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 先创建spark session对象spark  SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 创建DF对象df  spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],schemaid int, value float)df.show()# 二.使用语法糖方式注册原始函数为udaf函数F.pandas_udf(float)# 一.定义原始python函数def candy_my_avg(values: pd.Series) - float:return values.mean()# 三.使用自定义的udaf函数# dsl方式df.groupby(id).agg(candy_my_avg(value).alias(avg_value)).show()# 如果想用sql方式怎么办?把添加了语法糖的函数,再注册为udaf函数dsl_my_avg  spark.udf.register(sql_my_avg, candy_my_avg)df.createTempView(nums_tb)spark.sql(select id,sql_my_avg(value) as avg_valuefrom nums_tbgroup by id).show()# 注意: 最后一定释放资源spark.stop() 
07_spark_sql操作数据库.py 
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 先创建spark session对象spark  (SparkSession.builder.config(spark.sql.warehouse.dir, hdfs://node1:8020/user/hive/warehouse).config(hive.metastore.uris, thrift://node1.itcast.cn:9083).appName(spark_demo).master(local[1]).enableHiveSupport().getOrCreate())spark.sql(create database if not exists spark_demo2)spark.sql(create table if not exists spark_demo2.stu(id int,name string,age int);)spark.sql(insert into spark_demo2.stu values(1,张三,18),(2,李四,28))spark.sql(select * from  spark_demo2.stu).show()# 注意: 最后一定释放资源spark.stop()