商城网站建设专业公司,云梦主城区核酸检测,广州网页设计美工培训,网站接口需求本博文的主要内容是#xff1a; 1、rdd基本操作实战 2、transformation和action流程图 3、典型的transformation和action RDD有3种操作#xff1a; 1、 Trandformation 对数据状态的转换#xff0c;即所谓算子的转换 2、 Action 触发作业#xff0c;即所谓得结果…本博文的主要内容是 1、rdd基本操作实战 2、transformation和action流程图 3、典型的transformation和action RDD有3种操作 1、 Trandformation 对数据状态的转换即所谓算子的转换 2、 Action 触发作业即所谓得结果的 3、 Contoller 对性能、效率和容错方面的支持如cache、persist、checkpoint Contoller包括cache、persist、checkpoint。 /** * Return a new RDD by applying a function to all elements of this RDD. */def map[U: ClassTag](f: T U): RDD[U] withScope { val cleanF sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) iter.map(cleanF))} 传入类型是T返回类型是U。 元素之间为什么reduce操作要符合结合律和交换律答因为交换律不知哪个数据先过来。所以必须符合交换律。 在交换律基础上想要reduce操作必须要符合结合律。/** * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */def reduce(f: (T, T) T): T withScope { val cleanF sc.clean(f) val reducePartition: Iterator[T] Option[T] iter { if (iter.hasNext) { Some(iter.reduceLeft(cleanF)) } else { None } } var jobResult: Option[T] None val mergeResult (index: Int, taskResult: Option[T]) { if (taskResult.isDefined) { jobResult jobResult match { case Some(value) Some(f(value, taskResult.get)) case None taskResult } } } sc.runJob(this, reducePartition, mergeResult) // Get the final result out of our Option, or throw an exception if the RDD was empty jobResult.getOrElse(throw new UnsupportedOperationException(empty collection))} RDD.scala源码 这里新建包com.zhouls.spark.cores package com.zhouls.spark.cores/** * Created by Administrator on 2016/9/27. */object TextLines {}下面开始编代码本地模式 自动 会写好 源码来看 所以 val lines sc.textFile(C:\\Users\\Administrator\\Desktop\\textlines.txt) //通过HadoopRDD以及MapPartitionsRDD获取文件中每一行的内容本身 val lineCount lines.map(line (line,1)) //每一行变成行的内容与1构成的Tuple val textLines lineCount.reduceByKey(__) textLines.collect.foreach(pair println(pair._1 : pair._2)) 成功 现在将此行代码 textLines.collect.foreach(pair println(pair._1 : pair._2))改一改 textLines.foreach(pair println(pair._1 : pair._2)) 总结 本地模式里 textLines.collect.foreach(pair println(pair._1 : pair._2))改一改 textLines.foreach(pair println(pair._1 : pair._2)) 运行正常因为在本地模式下是jvm但这样书写是不正规的。 集群模式里 textLines.collect.foreach(pair println(pair._1 : pair._2))改一改 textLines.foreach(pair println(pair._1 : pair._2)) 运行无法通过因为结果是分布在各个节点上。 collect源码 /** * Return an array that contains all of the elements in this RDD. */def collect(): Array[T] withScope { val results sc.runJob(this, (iter: Iterator[T]) iter.toArray) Array.concat(results: _*)}得出collect后array中就是一个元素只不过这个元素是一个Tuple。 Tuple是元组。通过concat合并 foreach源码 /** * Applies a function f to all elements of this RDD. */def foreach(f: T Unit): Unit withScope { val cleanF sc.clean(f) sc.runJob(this, (iter: Iterator[T]) iter.foreach(cleanF))} rdd实战rdd基本操作实战至此 rdd实战transformation流程图 拿wordcount为例 启动hdfs集群 sparkSparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh 启动spark集群 sparkSparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh 启动spark-shell sparkSparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g scala val partitionsReadmeRdd sc.textFile(hdfs://SparkSingleNode:9000/README.md).flatMap(_.split( )).map(word (word,1)).reduceByKey(__,1).saveAsTextFile(~/partition1README.txt) 或者 scala val readmeRdd sc.textFile(hdfs://SparkSingleNode:9000/README.md) scala val partitionsReadmeRdd readmeRdd.flatMap(_.split( )).map(word (word,1)).reduceByKey(__,1) .saveAsTextFile(~/partition1README.txt) 注意~目录不是这里。 为什么我的不是这样的显示呢 RDD的transformation和action执行的流程图 典型的transformation和action 转载于:https://www.cnblogs.com/zlslch/p/5913334.html