当前位置: 首页 > news >正文

营销网站定位湖北专业网站建设市面价

营销网站定位,湖北专业网站建设市面价,网站运营怎么自学,温州网站设计服务商目录 1. 代码功能概述 2. 代码逐段解析 主程序逻辑 自定义累加器 MyAccumulator 3. Spark累加器原理 累加器的作用 AccumulatorV2 vs AccumulatorV1 累加器执行流程 4. 代码扩展与优化建议 支持多词统计 线程安全优化 使用内置累加器 5. Spark累加器的适用场景 6…目录 1. 代码功能概述 2. 代码逐段解析 主程序逻辑 自定义累加器 MyAccumulator 3. Spark累加器原理 累加器的作用 AccumulatorV2 vs AccumulatorV1 累加器执行流程 4. 代码扩展与优化建议 支持多词统计 线程安全优化 使用内置累加器 5. Spark累加器的适用场景 6. 总结 package core.bcimport org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutableobject AccWordCount {def main(args: Array[String]): Unit {val sparkConfnew SparkConf().setMaster(local).setAppName(AccWordCount)val sc new SparkContext(sparkConf)val value sc.makeRDD(List(hello,spark,hello))//累加器WordCount//创建累加器对象val wcAccnew MyAccumulator()//向Spark进行注册sc.register(wcAcc, wordCountAcc)value.foreach(word{//数据的累加使用累加器wcAcc.add(word)})//获取累加器结果println(wcAcc.value)sc.stop()}/*** 自定义数据累加器* 1、继承AccumulatorV2。定义泛型* IN累加器输入的数据类型* OUT返回的数据类型* 2、重写方法*/class MyAccumulator extends AccumulatorV2[String,mutable.Map[String,Long]]{val wcMap mutable.Map[String, Long]()override def isZero: Boolean wcMap.isEmpty//判断知否为初始状态override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] new MyAccumulator()//复制一个新的累加器override def reset(): Unit wcMap.clear()//重置累加器override def add(word: String): Unit { //获取累加器需要计算的值val newcountwcMap.getOrElse(word,0L)1LwcMap.update(word,newcount)}override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit {//Driver合并多个累加器val map1this.wcMapval map2other.valuemap2.foreach {case (word, count) {val newCount map1.getOrElse(word, 0L) countwcMap.update(word, newCount)}}}override def value: mutable.Map[String, Long] wcMap //获取累加器结果} }1. 代码功能概述 该代码使用Apache Spark实现了一个基于自定义累加器的单词计数WordCount程序。通过自定义MyAccumulator类继承AccumulatorV2统计RDD中每个单词的出现次数并利用累加器的分布式聚合特性将结果汇总到驱动程序。 2. 代码逐段解析 主程序逻辑 object AccWordCount {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local).setAppName(AccWordCount)val sc new SparkContext(sparkConf)val value sc.makeRDD(List(hello, spark, hello))// 创建并注册累加器val wcAcc new MyAccumulator()sc.register(wcAcc, wordCountAcc)// 遍历RDD累加单词value.foreach(word wcAcc.add(word))// 输出结果println(wcAcc.value) // 预期输出Map(hello - 2, spark - 1)sc.stop()} }RDD创建sc.makeRDD生成包含3个单词的RDD。累加器注册MyAccumulator实例通过sc.register注册到SparkContext名称为wordCountAcc。累加操作foreach遍历RDD中的每个单词调用wcAcc.add(word)累加计数。结果获取wcAcc.value返回最终的单词计数Map。 自定义累加器 MyAccumulator class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {val wcMap mutable.Map[String, Long]()override def isZero: Boolean wcMap.isEmptyoverride def copy(): AccumulatorV2[String, mutable.Map[String, Long]] new MyAccumulator()override def reset(): Unit wcMap.clear()override def add(word: String): Unit {val newCount wcMap.getOrElse(word, 0L) 1LwcMap.update(word, newCount)}override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit {val map1 this.wcMapval map2 other.valuemap2.foreach { case (word, count) val newCount map1.getOrElse(word, 0L) countwcMap.update(word, newCount)}}override def value: mutable.Map[String, Long] wcMap }核心字段wcMap用于存储单词及其计数。关键方法 isZero判断累加器是否为空初始状态。copy创建累加器的副本用于任务节点本地计算。reset清空累加器状态。add累加单个单词的计数。merge合并其他累加器的统计结果分布式汇总。value返回最终结果。 3. Spark累加器原理 累加器的作用 分布式聚合在多个任务节点上独立计算局部结果最后汇总到驱动程序。高效通信避免频繁的Shuffle操作减少网络开销。线程安全Spark保证每个任务节点内的累加器操作是串行的。 AccumulatorV2 vs AccumulatorV1 AccumulatorV1仅支持简单数据类型如Long、Double适用于计数、求和等场景。AccumulatorV2支持复杂数据类型如Map、List需自定义add和merge方法适用于更灵活的聚合需求如WordCount。 累加器执行流程 任务节点本地计算每个任务节点维护累加器的本地副本通过add方法累加数据。结果汇总任务完成后Spark将各节点的累加器副本发送到驱动程序调用merge方法合并结果。驱动程序获取结果通过value方法获取全局聚合结果。 4. 代码扩展与优化建议 支持多词统计 当前代码统计单次出现的单词若需统计多个单词如键值对可修改add方法 override def add(input: String): Unit {val words input.split(\\s) // 按空格分割多词words.foreach(word {val newCount wcMap.getOrElse(word, 0L) 1LwcMap.update(word, newCount)}) }线程安全优化 若add方法可能被多线程并发调用如在复杂算子中需添加同步锁 override def add(word: String): Unit this.synchronized {val newCount wcMap.getOrElse(word, 0L) 1LwcMap.update(word, newCount) }使用内置累加器 对于简单场景如全局计数可直接使用Spark内置的LongAccumulator val countAcc sc.longAccumulator(countAcc) value.foreach(_ countAcc.add(1)) println(countAcc.value) // 输出总记录数5. Spark累加器的适用场景 全局计数统计任务处理的总记录数、错误数等。分组统计如WordCount、用户行为分类统计。指标监控实时计算平均值、最大值等需结合自定义逻辑。调试与日志在不中断作业的情况下收集分布式运行状态。 6. 总结 该代码通过自定义AccumulatorV2实现了分布式单词计数展示了累加器的核心原理任务节点本地计算 驱动程序全局汇总。通过合理设计add和merge方法累加器可支持复杂聚合逻辑是Spark中高效的分布式统计工具。
http://www.pierceye.com/news/852454/

相关文章:

  • 国外的购物网站有哪些安徽省住房和城乡建设厅官方网站
  • 网站策划需要什么能力网页游戏平台软件
  • phpmysql网站开发网络结构
  • 微官网和移动网站区别论坛网站建设多少钱
  • 怎么做公司网站优化凡科h5登录入口
  • 做电影网站如何推广方案房产网络平台
  • 站长工具 seo查询python爬数据做网站
  • 网站 底部医院网站建设的要求
  • asp网站静态化seo关键词排名优化软件怎么选
  • wordpress apache版本北京seo招聘
  • 南京玄武网站建设信息服务公司的经营范围有哪些
  • 旅游网站建设与翻译wordpress 显示作者
  • 网站建设与维护报告总结国家外汇管理局网站怎么做报告
  • 南沙区网站建设网站开发人员薪酬
  • 设计外贸英文网站简述网站开发的流程
  • 电商网站设计是干什么的如何建设cpa影视网站
  • wordpress设置阅读全文什么是seo搜索引擎优化
  • 网站名重复网站建设的经验之谈
  • 网站优化软件排名器有含义的公司名
  • 像wordpress一样的网站吗老徐蜂了网站策划书
  • ps做网站首页效果特效wordpress无法修改密码
  • 蚌埠网站设计一句话宣传自己的产品
  • 织梦开发供需网站杭州互联网企业排名
  • 网站结构分析关键词林俊杰的寓意
  • 网站备案 超链接青岛胶南做网站的
  • 国内ui做的好的网站网站底部 图标
  • 网站开发维护人员天津微外卖网站建设
  • 保定网站建设推广公司怎么样雄安优秀网站建设
  • 上海集团网站建设做网站用asp好吗
  • h5网站建设价格wp-wordpress