当前位置: 首页 > news >正文

深圳网站维护seogoogle网站

深圳网站维护seo,google网站,设计签名免费艺术签名,浙江建设继续教育网站文章目录 1.UDF2.UDAF2.1 UDF函数实现原理2.2需求:计算用户平均年龄2.2.1 使用RDD实现2.2.2 使用UDAF弱类型实现2.2.3 使用UDAF强类型实现 1.UDF 用户可以通过 spark.udf 功能添加自定义函数#xff0c;实现自定义功能。 如#xff1a;实现需求在用户name前加上Name:… 文章目录 1.UDF2.UDAF2.1 UDF函数实现原理2.2需求:计算用户平均年龄2.2.1 使用RDD实现2.2.2 使用UDAF弱类型实现2.2.3 使用UDAF强类型实现 1.UDF 用户可以通过 spark.udf 功能添加自定义函数实现自定义功能。 如实现需求在用户name前加上Name:字符串并打印在控制台 def main(args: Array[String]): Unit {//创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLDemo03)//创建 SparkSession 对象val sc: SparkSession SparkSession.builder().config(conf).getOrCreate()import sc.implicits._//创建DataFrameval dataRDD: RDD[(String,Int)] sc.sparkContext.makeRDD(List((zhangsan,21),(lisi,24)))val dataframe dataRDD.toDF(name,age)//注册udf函数sc.udf.register(addName,(x:String)Name:x)//创建临时视图dataframe.createOrReplaceTempView(people)//对临时视图使用udf函数sc.sql(select addName(name) from people).show()sc.stop()}2.UDAF 强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数 如 count()countDistinct()avg()max()min()。除此之外用户可以设定自己的自定义聚合函数。**通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。**从 Spark3.0 版本后UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator。 2.1 UDF函数实现原理 在Spark中UDF用户自定义函数在对表中的数据进行处理时通常会将数据放入缓冲区中以便进行计算。这种缓冲策略可以提高数据处理的效率特别是对于大数据集。 2.2需求:计算用户平均年龄 2.2.1 使用RDD实现 val dataRDD: RDD[(String,Int)] sc.sparkContext.makeRDD(List((zhangsan,21),(lisi,24),(wangwu,26)))val reduceResult: (Int, Int) dataRDD.map({case (name, age) {(age, 1)}}).reduce((t1, t2) {(t1._1 t2._1, t1._2 t2._2)})println(reduceResult._1/reduceResult._2)2.2.2 使用UDAF弱类型实现 需要用户自定义类实现UserDefinedAggregateFunction并重写其中的方法当前已不推荐使用。 package bigdata.wordcount.udfimport org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType} import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/ object UDF_Demo02 {def main(args: Array[String]): Unit {//创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLDemo03)//创建 SparkSession 对象val sc: SparkSession SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] sc.sparkContext.makeRDD(List((zhangsan, 19), (lisi, 21), (wangwu, 22)))val dataFrame: DataFrame dataRDD.toDF(name,age)dataFrame.createOrReplaceTempView(user)//创建聚合函数var myAvgnew MyAverageUDAF()//在Spark中注册自定义的聚合函数sc.udf.register(avgMy,myAvg)sc.sql(select avgMy(age) from user).show()sc.stop()}case class User(var name:String,var age:Int)}class MyAverageUDAF extends UserDefinedAggregateFunction{//输入的要进行聚合的参数的类型override def inputSchema: StructType StructType(Array(StructField(age,IntegerType)))//聚合函数缓冲区中的值的数据类型override def bufferSchema: StructType StructType(Array(StructField(sum,LongType),StructField(count,LongType)))//函数返回的值的数据类型override def dataType: DataType DoubleType//判断函数的稳定性//对于相同类型的输入是否有相同类型的输出override def deterministic: Boolean true//聚合函数缓冲区中值的初始化//因为数据是弱类型的函数缓冲区中是根据索引来找到对应的变量override def initialize(buffer: MutableAggregationBuffer): Unit {//年龄的总和buffer(0)0L//年龄的个数buffer(1)0L}//更新缓冲区中的数据(执行操作步骤)override def update(buffer: MutableAggregationBuffer, input: Row): Unit {//第0个索引值是否为空if(!input.isNullAt(0)) {//更新年龄sum的值buffer(0)buffer.getLong(0)input.getInt(0)//更新年龄个数buffer(1)buffer.getLong(1)1;}}//合并缓冲区override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit {buffer1(0)buffer1.getLong(0)buffer2.getLong(0)buffer1(1)buffer1.getLong(1)buffer2.getLong(1)}//计算最终结果override def evaluate(buffer: Row): Double {buffer.getLong(0).toDouble / buffer.getLong(1)} }2.2.3 使用UDAF强类型实现 Spark3.0 版本可以采用强类型的 Aggregator 方式代替 UserDefinedAggregateFunction package bigdata.wordcount.udfimport org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn} import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType} import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/ object UDF_Demo03 {def main(args: Array[String]): Unit {//创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLDemo03)//创建 SparkSession 对象val sc: SparkSession SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] sc.sparkContext.makeRDD(List((zhangsan, 19), (lisi, 21), (wangwu, 22)))val dataFrame: DataFrame dataRDD.toDF(name,age)val dataset: Dataset[User01] dataFrame.as[User01]//创建聚合函数var myAvgnew MyAverageUDAF01()//将聚合函数转换为查询的列val col: TypedColumn[User01, Double] myAvg.toColumn//执行查询操作dataset.select(col).show()sc.stop()}case class User(var name:String,var age:Int)}//输入数据类型 case class User01(var name:String,var age:Int) //缓存中的数据类型 case class AgeBuffer(var sum:Long,var count:Long)class MyAverageUDAF01 extends Aggregator[User01,AgeBuffer,Double]{//设置初始值override def zero: AgeBuffer {AgeBuffer(0L,0L)}//缓冲区实现聚合override def reduce(b: AgeBuffer, a: User01): AgeBuffer {b.sum b.sum a.ageb.count b.count 1b}//合并缓冲区override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer {b1.sumb2.sumb1.countb2.countb1}//计算最终结果override def finish(buff: AgeBuffer): Double {buff.sum.toDouble/buff.count}//设置编码器和解码器//自定义类型就是 product 自带类型根据类型选择override def bufferEncoder: Encoder[AgeBuffer] {Encoders.product}override def outputEncoder: Encoder[Double] {Encoders.scalaDouble} }
http://www.pierceye.com/news/971201/

相关文章:

  • 电商网站建设概念wordpress主题个人博客
  • 福州网站建设多少钱全网商城系统
  • 自己如何做团购网站在设计赚钱的网站
  • 支持wap网站的系统百度seo关键词排名查询
  • 做最好最全的命理网站郴州网络营销
  • wap网站技术怎么查询网站的建站时间
  • 深圳网站开发专业团队2o18江苏建设网站施工员模试卷
  • 网站购物建设实训心得体会中山皇冠建设开发有限公司网站
  • 做360pc网站排名首页学校网站建设工作计划
  • 网站设计与制作的基本步骤移动互联网论文
  • 建设部网站2015年第158号中国建筑材料网官网
  • 大理网站建设沛宣郑州模板建站代理
  • 新浪博客怎么上传wordpress佛山推广seo排名
  • 北京建设网站有哪些公司网络营销方法有哪几种
  • 在中国备案的网站服务器利用小说网站做本站优化
  • 网站风格的表现形式重庆观音桥房价
  • 哪些公司的网站做的很好手机网页素材
  • 天津地铁建设网站百度广告太多
  • 保定php网站制作wordpress的seo收件箱
  • 网站建设公司-跨界鱼科技优外国网站设计风格
  • 网站营销平台注册微信公众号流程
  • 西安专业网站建设服务公司商标查询网入口
  • 营销型网站设计房地产wordpress多媒体路径
  • 门户网站建设解决方案wordpress图片广告
  • 哈尔滨h5模板建站设计一个软件需要多少钱
  • 青岛网站建设方案服务惠民卡看电影怎么用
  • 兰州新站点seo加盟网站建设工作有底薪吗
  • 哈尔滨建设网站官网清远头条新闻
  • 泉州网站设计平台wordpress cenos
  • 网站内容批量替换站长之家网站素材