网站开发常用的语言和工具,政协网站法治建设版块,网站建设招标采购需求,英语网站开发的背景目录 一 . SparkSQL简介
二 . Spark SQL与HIVE的异同 三 . DataFrame
1. 创建 DataFrame
2. RDD转换DataFrame
四 . 操作DataFrame SQL方式:
DSL方式: 一 . SparkSQL简介
Spark SQL只能处理结构化数据 ,属于Spark框架一个部分 Schema:元数据信息
特点: 融合性 ,统一数…目录 一 . SparkSQL简介
二 . Spark SQL与HIVE的异同 三 . DataFrame
1. 创建 DataFrame
2. RDD转换DataFrame
四 . 操作DataFrame SQL方式:
DSL方式: 一 . SparkSQL简介
Spark SQL只能处理结构化数据 ,属于Spark框架一个部分 Schema:元数据信息
特点: 融合性 ,统一数据访问,hive兼容 , 标准化连接
将hive sql翻译成Spark上对应的RDD操作 ,底层运行SparkRDD
DataFrames是在RDD上面增加与省略了一些东西
DataFrame RDD -泛型 Schema 方便到的SQL操作 优化 ,是个特殊的RDD
RDD存储任意结构数据 ; DataFrame存储二维表结构数据
二 . Spark SQL与HIVE的异同
1- Spark SQL是基于内存计算, 而HIVE SQL是基于磁盘进行计算的 2- Spark SQL没有元数据管理服务(自己维护), 而HIVE SQL是有metastore的元数据管理服务的 3- Spark SQL底层执行Spark RDD程序, 而HIVE SQL底层执行是MapReduce 4- Spark SQL可以编写SQL也可以编写代码但是HIVE SQL仅能编写SQL语句 三 . DataFrame
DataFrame表示的是一个二维的表。二维表必然存在行、列等表结构描述信息
表结构描述信息(元数据Schema): StructType对象 字段: StructField对象可以描述字段名称、字段数据类型、是否可以为空 行: Row对象 列: Column对象包含字段名称和字段值
在一个StructType对象下由多个StructField组成构建成一个完整的元数据信息
1. 创建 DataFrame
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:spark SparkSession.builder.appName(创建DataFrame)\.master(local[*]).getOrCreate()init_df spark.createDataFrame(data[(1,张三,18),(2,李四,40),(3,王五,60)],schemaid:int,name:string,age:int)init_df2 spark.createDataFrame(data[(1, 张三, 18), (2, 李四, 30),(3,王五,60)],schema[id,name,age])init_df.show()----------| id|name|age|----------| 1|张三| 18|| 2|李四| 30|----------init_df2.show()init_df.printSchema()root|-- id: integer (nullable true)|-- name: string (nullable true)|-- age: integer (nullable true)init_df2.printSchema()root|-- id: long (nullable true)|-- name: string (nullable true)|-- age: long (nullable true)spark.stop()
2. RDD转换DataFrame
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructFieldos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.appName(rdd_2_dataframe)\.master(local[*])\.getOrCreate()# 通过SparkSession得到SparkContextsc spark.sparkContext# 2- 数据输入# 2.1- 创建一个RDDinit_rdd sc.parallelize([1,李白,20,2,安其拉,18])# 2.2- 将RDD的数据结构转换成二维结构new_rdd init_rdd.map(lambda line: (int(line.split(,)[0]),line.split(,)[1],int(line.split(,)[2])))# 将RDD转成DataFrame方式一# schema方式一schema StructType()\.add(id,IntegerType(),False)\.add(name,StringType(),False)\.add(age,IntegerType(),False)# schema方式二schema StructType([StructField(id,IntegerType(),False),StructField(name,StringType(),False),StructField(age,IntegerType(),False)])# schema方式三schema id:int,name:string,age:int# schema方式四schema [id,name,age]init_df spark.createDataFrame(datanew_rdd,schemaschema)# 将RDD转成DataFrame方式二toDF中的schema既可以传List也可以传字符串形式的schema信息# init_df new_rdd.toDF(schema[id,name,age])init_df new_rdd.toDF(schemaid:int,name:string,age:int)# 3- 数据处理# 4- 数据输出init_df.show()init_df.printSchema()# 5- 释放资源sc.stop()spark.stop()四 . 操作DataFrame SQL方式:
df.createTempView(视图名称): 创建一个临时的视图(表名) df.createOrReplaceTempView(视图名称): 创建一个临时的视图(表名)如果视图存在直接替换 临时视图仅能在当前这个Spark Session的会话中使用 df.createGlobalTempView(视图名称): 创建一个全局视图运行在一个Spark应用中多个spark会话中都可以使用。在使用的时候必须通过 global_temp.视图名称 方式才可以加载到。较少使用
DSL方式: show()用于展示DF中数据, 默认仅展示前20行 参数1设置默认展示多少行 默认为20 参数2是否为阶段列, 默认仅展示前20个字符数据, 如果过长, 不展示(一般不设置) printSchema()用于打印当前这个DF的表结构信息 select()类似于SQL中select, SQL中select后面可以写什么, 这样同样也一样 filter()和 where()用于对数据进行过滤操作, 一般在spark SQL中主要使用where groupBy()用于执行分组操作 orderBy()用于执行排序操作