当前位置: 首页 > news >正文

赣州酒店网站设计装修设计公司咨询

赣州酒店网站设计,装修设计公司咨询,网站维护与更新,加强部门网站建设工作概述 spark 版本为 3.2.4#xff0c;注意 RDD 转 DataFrame 的代码出现的问题及解决方案 本文目标如下#xff1a; RDD ,Datasets,DataFrames 之间的区别入门 SparkSession创建 DataFramesDataFrame 操作编程方式运行 sql 查询创建 DatasetsDataFrames 与 RDDs 互相转换 使用…概述 spark 版本为 3.2.4注意 RDD 转 DataFrame 的代码出现的问题及解决方案 本文目标如下 RDD ,Datasets,DataFrames 之间的区别入门 SparkSession创建 DataFramesDataFrame 操作编程方式运行 sql 查询创建 DatasetsDataFrames 与 RDDs 互相转换 使用反射推断模式编程指定 Schema 参考 Spark 官网 相关文章链接如下 文章链接spark standalone环境安装地址Spark的工作与架构原理地址使用spark开发第一个程序WordCount程序及多方式运行代码地址RDD编程指南地址RDD持久化地址 RDD ,Datasets,DataFrames 之间的区别 Datasets , DataFrames和 RDD Dataset 是一个分布式的数据集合Dataset 是 Spark 1.6 中添加的一个新接口它增益了 RDD (强类型可以使用 lambda 函数的能力) 和 Spark sql 优化执行引擎的优势。Dataset 可以由JVM对象构建然后使用函数转换map、flatMap、filter等进行操作。数据集API有Scala和Java版本。Python不支持数据集API。 DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表DataFrame API在Scala、Java、Python和R中可用。在Scala API中DataFrame只是Dataset[Row]的一个类型别名。而在Java API中用户需要使用DatasetRow来表示DataFrame。 DataFrameRDDSchemaRDD可以认为是表中的数据Schema是表结构信息。DataFrame可以通过很多来源进行构建包括结构化的数据文件Hive中的表外部的关系型数据库以及RDD 入门 Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同Spark SQL提供的接口为Spark提供了更多关于正在执行的数据结构信息。在内部Spark SQL使用这些额外的信息来执行额外的优化。有几种方法可以与SparkSQL进行交互包括SQL和 Dataset API。计算结果时使用相同的执行引擎与用于表示计算的API/语言无关。方便用户切换不同的方式进行操作 people.json people.json文件准备 SparkSession Spark sql 中所有功能入口点是 SparkSession类。创建一个基本的 SparkSession只需使用 SparkSession.builder() import org.apache.spark.sql.SparkSessionval spark SparkSession.builder().appName(Spark SQL basic example).config(spark.some.config.option, some-value).getOrCreate()创建 DataFrames 使用 SparkSession通过存在的RDDhive 表或其它的Spark data sources 程序创建 DataFrames val df spark.read.json(/tmp/people.json) df.show()执行如下图 DataFrame 操作 使用数据集进行结构化数据处理的基本示例如下 // 需要引入 spark.implicits._ 才可使用 $ // This import is needed to use the $-notation import spark.implicits._ // 打印schema 以树格式 // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable true) // |-- name: string (nullable true)// 仅显示 name 列 // Select only the name column df.select(name).show() // ------- // | name| // ------- // |Michael| // | Andy| // | Justin| // ------- // 显示所有age 加1 // Select everybody, but increment the age by 1 df.select($name, $age 1).show() // ---------------- // | name|(age 1)| // ---------------- // |Michael| null| // | Andy| 31| // | Justin| 20| // ----------------// 过滤 人的 age 大于 21 // Select people older than 21 df.filter($age 21).show() // ------- // |age|name| // ------- // | 30|Andy| // -------// 按 age 分组统计 // Count people by age df.groupBy(age).count().show() // --------- // | age|count| // --------- // | 19| 1| // |null| 1| // | 30| 1| // ---------spark-shell 执行如下图 编程方式运行 sql 查询 df.createOrReplaceTempView(people)val sqlDF spark.sql(SELECT * FROM people) sqlDF.show()执行如下 scala df.createOrReplaceTempView(people)scala val sqlDF spark.sql(SELECT * FROM people) sqlDF: org.apache.spark.sql.DataFrame [age: bigint, name: string]scala sqlDF.show() ----------- | age| name| ----------- |null|Michael| | 30| Andy| | 19| Justin| -----------创建 Datasets Datasets类似于RDD不是使用Java序列化或Kryo而是使用专门的编码器来序列化对象以便通过网络进行处理或传输。使用的格式允许Spark执行许多操作如过滤、排序和哈希而无需将字节反序列化为对象。 case class Person(name: String, age: Long)// 为 case classes 创建编码器 // Encoders are created for case classes val caseClassDS Seq(Person(Andy, 32)).toDS() caseClassDS.show()// 为能用类型创建编码器并提供 spark.implicits._ 引入 // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS Seq(1, 2, 3).toDS() primitiveDS.map(_ 1).collect() // Returns: Array(2, 3, 4)// 通过定义类将按照名称映射DataFrames 能被转成 Dataset // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path /tmp/people.json val peopleDS spark.read.json(path).as[Person] peopleDS.show()执行如下 scala case class Person(name: String, age: Long) defined class Personscala val caseClassDS Seq(Person(Andy, 32)).toDS() caseClassDS: org.apache.spark.sql.Dataset[Person] [name: string, age: bigint]scala caseClassDS.show() ------- |name|age| ------- |Andy| 32| -------scala val primitiveDS Seq(1, 2, 3).toDS() primitiveDS: org.apache.spark.sql.Dataset[Int] [value: int]scala primitiveDS.map(_ 1).collect() res1: Array[Int] Array(2, 3, 4)scala val path /tmp/people.json path: String /tmp/people.jsonscala val peopleDS spark.read.json(path).as[Person] peopleDS: org.apache.spark.sql.Dataset[Person] [age: bigint, name: string]scala peopleDS.show() ----------- | age| name| ----------- |null|Michael| | 30| Andy| | 19| Justin| -----------DataFrames 与 RDDs 互相转换 Spark SQL支持两种不同的方法将现有RDD转换为Datasets。 第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以生成更简洁的代码当知道 schema 结构的时间会有更好的效果。第二种方法是通过编程接口构造 schema然后将其应用于现有的RDD。虽然此方法更详细直至运行时才能知道他们的字段和类型用于构造 Datasets。 使用反射推断模式 代码如下 object RddToDataFrameByReflect {def main(args: Array[String]): Unit {val spark SparkSession.builder().appName(RddToDataFrameByReflect).master(local).getOrCreate()// 用于从RDD到DataFrames的隐式转换// For implicit conversions from RDDs to DataFramesimport spark.implicits._// Create an RDD of Person objects from a text file, convert it to a Dataframeval peopleDF spark.sparkContext.textFile(/Users/hyl/Desktop/fun/sts/spark-demo/people.txt).map(_.split(,)).map(attributes Person(attributes(0), attributes(1).trim.toInt)).toDF()// Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView(people)// SQL statements can be run by using the sql methods provided by Sparkval teenagersDF spark.sql(SELECT name, age FROM people WHERE age BETWEEN 13 AND 19)// The columns of a row in the result can be accessed by field indexteenagersDF.map(teenager Name: teenager(0)).show()// or by field nameteenagersDF.map(teenager Name: teenager.getAs[String](name)).show()}case class Person(name: String, age: Long) }执行如下图 编码问题 关于 Spark 官网 上复杂类型编码问题直接加下面一句代码 teenagersDF.map(teenager teenager.getValuesMap[Any](List(name, age))).collect().foreach(println(_))报以下图片错误 将原有代码改变如下 // 没有为 Dataset[Map[K,V]] 预先定义编码器需要自己定义// No pre-defined encoders for Dataset[Map[K,V]], define explicitlyimplicit val mapEncoder org.apache.spark.sql.Encoders.kryo[Map[String, Any]]// 也可以如下操作// Primitive types and case classes can be also defined as// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] ExpressionEncoder()// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]teenagersDF.map(teenager teenager.getValuesMap[Any](List(name, age))).collect().foreach(println(_))// Array(Map(name - Justin, age - 19))通过这一波操作就可以理解什么情况下需要编码器以及编码器的作用 编程指定 Schema 代码如下 object RddToDataFrameByProgram {def main(args: Array[String]): Unit {val spark SparkSession.builder().master(local).getOrCreate()import org.apache.spark.sql.Rowimport org.apache.spark.sql.types._// 加上此解决报错问题import spark.implicits._// Create an RDDval peopleRDD spark.sparkContext.textFile(/Users/hyl/Desktop/fun/sts/spark-demo/people.txt)// The schema is encoded in a stringval schemaString name age// Generate the schema based on the string of schemaval fields schemaString.split( ).map(fieldName StructField(fieldName, StringType, nullable true))val schema StructType(fields)// Convert records of the RDD (people) to Rowsval rowRDD peopleRDD.map(_.split(,)).map(attributes Row(attributes(0), attributes(1).trim))// Apply the schema to the RDDval peopleDF spark.createDataFrame(rowRDD, schema)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView(people)// SQL can be run over a temporary view created using DataFramesval results spark.sql(SELECT name FROM people)// The results of SQL queries are DataFrames and support all the normal RDD operations// The columns of a row in the result can be accessed by field index or by field nameresults.map(attributes Name: attributes(0)).show()} }执行如下图 官方文档的代码不全问题 Unable to find encoder for type String. An implicit Encoder[String] is needed to store String instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. results.map(attributes Name: attributes(0)).show() 加下以下代码 // 加上此解决报错问题 import spark.implicits._如下图解决 结束 spark sql 至此结束如有问题欢迎评论区留言。
http://www.pierceye.com/news/618072/

相关文章:

  • 宁波模板建站哪家服务专业wordpress 神箭手
  • 一张图片网站代码视频生成链接在线工具
  • 网站品牌推广浙江手机版建站系统开发
  • 网站后台密码在哪个文件建站报价表
  • 昌乐营销型网站建设个人管理系统
  • 手机网站开发位置定位天津和平做网站公司
  • 搜搜提交网站入口国外wordpress空间
  • python 做网站 数据库做企业官网还有必要吗
  • 数据录入网站开发安阳县实验中学
  • 网站 风格镜子厂家东莞网站建设
  • 做网站策划需要用什么软件网站建设 好发信息网
  • wordpress网站优化pc建站 手机网站
  • 教研网站建设方案如何网上接单做设计
  • 魏县网站建设推广怎样做seo搜索引擎优化
  • 网站优化外链怎么做东莞公司注册流程及需要的材料
  • 做交通锁具网站拍摄广告片制作公司
  • 学院网站建设项目范围变更申请表建设工程公司名称大全
  • 南京学校网站建设策划做的好的电商网站项目
  • apache 配置php网站石家庄做公司网站
  • 新动力网站建设wordpress顶部图片大小
  • 网站开发 手机 电脑手机网站建设文章
  • 网站维护的过程及方法济南街道办网站建设
  • 服务佳的小企业网站建设智慧团建pc端入口
  • 兰州北山生态建设局网站今天重大新闻2021
  • 民权网站建设用别人服务器做网站
  • 周口网站建设 网站制作 网络推广wordpress4.0安装教程
  • 长治市建设局网站自己做网站的优势
  • 网站管理与维护的优势php 做视频网站
  • 建设部网站业绩补录商河 网站建设
  • 网站页面设计考虑要素建站知识