深圳网站维护seo,google网站,设计签名免费艺术签名,浙江建设继续教育网站文章目录 1.UDF2.UDAF2.1 UDF函数实现原理2.2需求:计算用户平均年龄2.2.1 使用RDD实现2.2.2 使用UDAF弱类型实现2.2.3 使用UDAF强类型实现 1.UDF
用户可以通过 spark.udf 功能添加自定义函数#xff0c;实现自定义功能。
如#xff1a;实现需求在用户name前加上Name:… 文章目录 1.UDF2.UDAF2.1 UDF函数实现原理2.2需求:计算用户平均年龄2.2.1 使用RDD实现2.2.2 使用UDAF弱类型实现2.2.3 使用UDAF强类型实现 1.UDF
用户可以通过 spark.udf 功能添加自定义函数实现自定义功能。
如实现需求在用户name前加上Name:字符串并打印在控制台 def main(args: Array[String]): Unit {//创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLDemo03)//创建 SparkSession 对象val sc: SparkSession SparkSession.builder().config(conf).getOrCreate()import sc.implicits._//创建DataFrameval dataRDD: RDD[(String,Int)] sc.sparkContext.makeRDD(List((zhangsan,21),(lisi,24)))val dataframe dataRDD.toDF(name,age)//注册udf函数sc.udf.register(addName,(x:String)Name:x)//创建临时视图dataframe.createOrReplaceTempView(people)//对临时视图使用udf函数sc.sql(select addName(name) from people).show()sc.stop()}2.UDAF
强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数 如 count()countDistinct()avg()max()min()。除此之外用户可以设定自己的自定义聚合函数。**通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。**从 Spark3.0 版本后UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator。
2.1 UDF函数实现原理 在Spark中UDF用户自定义函数在对表中的数据进行处理时通常会将数据放入缓冲区中以便进行计算。这种缓冲策略可以提高数据处理的效率特别是对于大数据集。
2.2需求:计算用户平均年龄
2.2.1 使用RDD实现 val dataRDD: RDD[(String,Int)] sc.sparkContext.makeRDD(List((zhangsan,21),(lisi,24),(wangwu,26)))val reduceResult: (Int, Int) dataRDD.map({case (name, age) {(age, 1)}}).reduce((t1, t2) {(t1._1 t2._1, t1._2 t2._2)})println(reduceResult._1/reduceResult._2)2.2.2 使用UDAF弱类型实现
需要用户自定义类实现UserDefinedAggregateFunction并重写其中的方法当前已不推荐使用。
package bigdata.wordcount.udfimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/
object UDF_Demo02 {def main(args: Array[String]): Unit {//创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLDemo03)//创建 SparkSession 对象val sc: SparkSession SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] sc.sparkContext.makeRDD(List((zhangsan, 19), (lisi, 21), (wangwu, 22)))val dataFrame: DataFrame dataRDD.toDF(name,age)dataFrame.createOrReplaceTempView(user)//创建聚合函数var myAvgnew MyAverageUDAF()//在Spark中注册自定义的聚合函数sc.udf.register(avgMy,myAvg)sc.sql(select avgMy(age) from user).show()sc.stop()}case class User(var name:String,var age:Int)}class MyAverageUDAF extends UserDefinedAggregateFunction{//输入的要进行聚合的参数的类型override def inputSchema: StructType StructType(Array(StructField(age,IntegerType)))//聚合函数缓冲区中的值的数据类型override def bufferSchema: StructType StructType(Array(StructField(sum,LongType),StructField(count,LongType)))//函数返回的值的数据类型override def dataType: DataType DoubleType//判断函数的稳定性//对于相同类型的输入是否有相同类型的输出override def deterministic: Boolean true//聚合函数缓冲区中值的初始化//因为数据是弱类型的函数缓冲区中是根据索引来找到对应的变量override def initialize(buffer: MutableAggregationBuffer): Unit {//年龄的总和buffer(0)0L//年龄的个数buffer(1)0L}//更新缓冲区中的数据(执行操作步骤)override def update(buffer: MutableAggregationBuffer, input: Row): Unit {//第0个索引值是否为空if(!input.isNullAt(0)) {//更新年龄sum的值buffer(0)buffer.getLong(0)input.getInt(0)//更新年龄个数buffer(1)buffer.getLong(1)1;}}//合并缓冲区override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit {buffer1(0)buffer1.getLong(0)buffer2.getLong(0)buffer1(1)buffer1.getLong(1)buffer2.getLong(1)}//计算最终结果override def evaluate(buffer: Row): Double {buffer.getLong(0).toDouble / buffer.getLong(1)}
}2.2.3 使用UDAF强类型实现
Spark3.0 版本可以采用强类型的 Aggregator 方式代替 UserDefinedAggregateFunction
package bigdata.wordcount.udfimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/
object UDF_Demo03 {def main(args: Array[String]): Unit {//创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLDemo03)//创建 SparkSession 对象val sc: SparkSession SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] sc.sparkContext.makeRDD(List((zhangsan, 19), (lisi, 21), (wangwu, 22)))val dataFrame: DataFrame dataRDD.toDF(name,age)val dataset: Dataset[User01] dataFrame.as[User01]//创建聚合函数var myAvgnew MyAverageUDAF01()//将聚合函数转换为查询的列val col: TypedColumn[User01, Double] myAvg.toColumn//执行查询操作dataset.select(col).show()sc.stop()}case class User(var name:String,var age:Int)}//输入数据类型
case class User01(var name:String,var age:Int)
//缓存中的数据类型
case class AgeBuffer(var sum:Long,var count:Long)class MyAverageUDAF01 extends Aggregator[User01,AgeBuffer,Double]{//设置初始值override def zero: AgeBuffer {AgeBuffer(0L,0L)}//缓冲区实现聚合override def reduce(b: AgeBuffer, a: User01): AgeBuffer {b.sum b.sum a.ageb.count b.count 1b}//合并缓冲区override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer {b1.sumb2.sumb1.countb2.countb1}//计算最终结果override def finish(buff: AgeBuffer): Double {buff.sum.toDouble/buff.count}//设置编码器和解码器//自定义类型就是 product 自带类型根据类型选择override def bufferEncoder: Encoder[AgeBuffer] {Encoders.product}override def outputEncoder: Encoder[Double] {Encoders.scalaDouble}
}