手机网站底部固定菜单,wordpress微信插件,中国十大门窗品牌,浙江建设工程造价信息网站概述
共享变量 共享变量的工作原理Broadcast VariableAccumulator
共享变量
共享变量的工作原理
通常#xff0c;当给 Spark 操作的函数(如 mpa 或 reduce) 在 Spark 集群上执行时#xff0c;函数中的变量单独的拷贝到各个节点上#xff0c;函数执行时#xff0c;使用…概述
共享变量 共享变量的工作原理Broadcast VariableAccumulator
共享变量
共享变量的工作原理
通常当给 Spark 操作的函数(如 mpa 或 reduce) 在 Spark 集群上执行时函数中的变量单独的拷贝到各个节点上函数执行时使用的是自己节点执行上的变量节点上的变量更新不会更新至 driver 在任务之间支持通用的读写共享变量是低效的然而Spark 的提供了两种有限类型的共享变量broadcast variables 和 accumulators。
Broadcast Variable
Broadcast Variable会将使用到的变量仅仅为每个节点拷贝一份而不会为每个task都拷贝一份副本因此其最大的作用就是减少变量到各个节点的网络传输消耗以及在各个节点上的内存消耗 通过调用SparkContext的broadcast()方法针对某个变量创建广播变量 注意 广播变量是只读的在算子函数内使用到广播变量时每个节点只会拷贝一份副本。可以使用广播变量的value()方法获取值。
由下图深入理解 Broadcast Variable 由图可知普通变量 和 Broadcast Variable 区别就是网络传输可以大大的降低Broadcast Variable 是每个节点机器只有一份而 普通变量 是每个 task 都会有一份浪费内存存储。
可以想象一个极端情况如果map算子有10个task恰好这10个task还都在一个worker节点上那么这个时候map算子使用的外部变量就会在这个worker节点上保存10份这样就很占用内存了。
接下来通过具体的案例来使用一下这个广播案例代码如下图
object BroadcastOpScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(BroadcastOpScala).setMaster(local)val sc new SparkContext(conf)val dataRdd sc.parallelize(Array(1, 2, 3, 4, 5))val variable 2// 1.定义广播变量val variableBroadcast sc.broadcast(variable)// 2.使用广播变量调用其 value方法dataRdd.map(_ * variableBroadcast.value).foreach(println _)}
}Accumulator
Spark 提供的 Accumulator主要用于多个节点对一个变量进行共享性的操作。 正常情况下在 Spark的任务中由于一个算子可能会产生多个 task 并行执行所以在这个算子内部执行的聚合计算都是局部的想要实现多个 task 进行全局聚合计算此时就需要用到 Accumulator 这个共享的累加变量 。
注意 Accumulator只提供了累加的功能。在task只能对Accumulator进行累加操作不能读取它的值。只有在Driver进程中才可以读取Accumulator的值。
代码如下
object AccumulatorOpScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(AccumulatorOpScala).setMaster(local)val sc new SparkContext(conf)val dataRDD sc.parallelize(Array(1,2,3,4,5))// 1.定义累加变量val sumAccumulator sc.longAccumulator// 2.使用累加变量dataRDD.foreach(sumAccumulator.add(_))println(sumAccumulator.value)}
}结束
至此共享变量就结束了如有问题欢迎评论区提问。