网站规划教学设计,手机app开发教程视频,广州做企业网站,企业展厅设计效果图Spark RDD优化
一、分区优化二、持久化优化三、依赖优化四、共享变量优化五、提交模式与运行模式优化六、其他优化
一、分区优化 分区数调整#xff1a;RDD的分区数可以通过repartition和coalesce方法进行调整。合理的分区数可以提高并行度#xff0c;但过多的分区会增加管…Spark RDD优化
一、分区优化二、持久化优化三、依赖优化四、共享变量优化五、提交模式与运行模式优化六、其他优化
一、分区优化 分区数调整RDD的分区数可以通过repartition和coalesce方法进行调整。合理的分区数可以提高并行度但过多的分区会增加管理开销。通常分区数应根据数据规模和集群资源进行调整。 val rdd: RDD[String] rdd.coalesce(numPartitions:Int, shuffle:Boolean)
val rdd: RDD[String] rdd.repartition(numPartitions:Int)
// repartition(numPartitions: Int) 等价于 coalesce(numPartitions, true) 缩小分区 存在过多的小任务的时候收缩合并分区减少分区的个数减少任务调度成本 默认情况下不会对数据重组比如3个合成2个采用 {12}{3}容易导致数据倾斜 若需数据均衡则将 shuffle 参数设置为 true 即可 扩大分区 若需要扩大分区shuffle 参数必须设置为 true 若将2个分区拆分成3个必须打乱重新分区否则数据还是在两个分区(有一个分区为空){1}{2}{空} 数据本地性Spark会尽量将数据分配给与数据源相同的计算节点上以减少数据移动的开销。在创建RDD时可以通过设置分区偏好如preferredLocations或自定义分区来优化数据本地性以最小化网络传输并最大化计算效率。 自定义分区 // 自定义分区器
class MyPartitioner(numPartitions: Int) extends Partitioner {override def numPartitions: Int numPartitions // 返回分区器的分区数量override def getPartition(key: Any): Int {// 这里需要实现分区逻辑// 返回值是一个整数表示该键应该被分配到哪个分区}
}// 使用自定义分区器重新分区
val partitionedRDD rdd.partitionBy(new MyPartitioner(2)) // 传入分区个数处理数据倾斜数据倾斜是指某些分区包含的数据远远多于其他分区导致计算资源分配不均。可以使用repartition或coalesce方法重新分区RDD或使用reduceByKey、groupByKey的变体等特定操作来减轻数据倾斜的影响。
二、持久化优化 持久化策略对于需要多次使用的RDD应该进行持久化操作以避免重复计算。持久化策略包括内存持久化如MEMORY_ONLY、磁盘持久化如DISK_ONLY以及内存和磁盘混合持久化如MEMORY_AND_DISK等。 序列化使用序列化可以进一步减少内存消耗并提高持久化效率。Spark支持多种序列化框架如Java序列化、Kryo序列化等。Kryo序列化通常比Java序列化更快且占用空间更小。 // 临时存储于【xx】重用job结束后自动删除
val rddCache: RDD[T] rdd.cache() // 到内存上
val rdd: RDD[T] rdd.persist(level:StorageLevel)
// cache() 等价于persist(StorageLevel.MEMORY_ONLY)
// persisit() 参数如下StorageLevel.MEMORY_ONLY 只写到内存上
StorageLevel.DISK_ONLY 只写到磁盘上
StorageLevel.OFF_HEAP 使用堆外内存
StorageLevel.MEMORY_AND_DISK 先内存后磁盘
StorageLevel.MEMORY_AND_DISK_SER 先内存后磁盘采取序列化方式
StorageLevel.MEMORY_AND_DISK_SER_2 先内存后磁盘采取二代序列化方式检查点对于需要长时间运行或可能遭受故障的应用设置检查点Checkpoint可以将RDD的状态保存到稳定存储中以便在故障后恢复。检查点会切断RDD的血统关系从而避免重新计算整个血统链。 // checkpoint 长久存储于【磁盘】重用job结束后不会删除涉及IO性能较差安全且一般和cache组合使用
val conf new SparkConf().setAppName(spark_rdd).setMaster(local[4])
val sc SparkContext.getOrCreate(conf)
// 设置检查点路径
sc.setCheckpointDir(hdfs://ip:9000/spark/checkpoint)
// ...
rdd.checkpoint() // 将该 RDD 的内容写入到设置的路径并在该 RDD 的计算图中插入一个检查点Checkpoint节点三、依赖优化 宽依赖与窄依赖RDD之间的依赖关系分为宽依赖和窄依赖。窄依赖有助于实现数据本地性而宽依赖则可能导致数据移动和网络开销。在设计RDD转换操作时应尽量避免不必要的宽依赖。 1、Driver程序提交后 1、Spark调度器将所有的RDD看成是一个Stage 2、然后对此Stage进行逆向回溯遇到Shuffle就断开形成一个新的Stage 3、遇到窄依赖则归并到同一个Stage 4、等到所有的步骤回溯完成便生成一个DAG图 2、为什么要划分阶段 1、基于数据的分区本着传递计算的性能远高于传递数据所以数据本地化是提升性能的重要途径之一 2、一组串行的算子无需 Shuffle基于数据本地化可以作为一个独立的阶段连续并行执行 3、经过一组串行算子计算遇到 Shuffle 操作默认情况下 Shuffle 不会改变分区数量但会因为 numPartitions:Int, partitioner:Partitioner 等参数重新分配过程数据会【写盘供子RDD拉取(类MapReduce)】 3、RDD依赖关系 Lineage血统、遗传 RDD最重要的特性之一保存了RDD的依赖关系 RDD实现了基于Lineage的容错机制 依赖关系 org.apache.spark.Dependency 窄依赖 NarrowDependency1V1 OneToOneDependency1VN RangeDependency 宽依赖 ShuffleDependency 当RDD分区丢失时 对于窄依赖Spark只需要重新计算丢失分区的父RDD分区即可。 对于宽依赖Spark需要重新执行整个shuffle过程以重新生成丢失的数据。 若配合持久化更佳cache, persist, checkpoint 类型窄依赖mapflatMapmapPartitionsmapPartitionsWithIndexglomfilterdistinctintersectionsampleunionsubtractzip…cogroup宽依赖sortBysortByKeygroupByKeyreduceByKeycogroupjoinpartitionByrepartition不一定的情况在Spark中并非所有操作都可以明确地归类为宽依赖或窄依赖。有些操作可能根据具体的实现或上下文而有所不同。然而在大多数情况下上述提到的算子可以清晰地划分为宽依赖或窄依赖。如reduceByKey(【partitioner: Partitioner】, func: (V, V) V)
若使用的是带 partitioner 的重载且 Partitioner 和父RDD的 Partitioner一致
则为窄依赖RDD否则为宽依赖ShuffledRDD优化转换操作在可能的情况下使用能够减少shuffle操作的转换函数如mapPartitions代替mapreduceByKey代替groupByKey等。这些操作可以减少数据在网络中的传输量从而提高性能。 shuffle性能较差因为shuffle必须落盘内存中等数据会OOM
groupByKey只分组(存在Shuffle) reduce只聚合结果同性能不同
reduceByKey先分组、预聚合、再聚合(存在Shuffle) 四、共享变量优化 广播大变量当Spark作业中需要使用到较大的外部变量时可以将这些变量广播到每个节点的Executor上而不是每个Task都复制一份。这样可以减少网络传输开销和内存消耗。 val bc:Broadcast[T] sc.broadcast(value:T) // 创建广播变量
rdd.mapPartitions(itPar{val v:T bc.value // 在每个分区内部通过bc.value获取广播变量的值 ... // 使用v进行计算...
})累加器Accumulators累加器提供了一种有效的手段来进行分布式计算中的统计和计数操作减少通信开销并简化聚合操作。 累加器accumulate只能 add 操作常用于计数 1、定义在Driver端的一个变量Cluster中每一个Task都会有一份Copy 2、所有的Task都计算完成后将所有Task中的Copy合并到驱动程序中的变量中 非累加器在所有Task中的都会是独立Copy不会有合并 val accLong: LongAccumulator sc.longAccumulator(longAcc) // 定义累加器
val accDouble: DoubleAccumulator sc.doubleAccumulator(doubleAcc)
rdd.mapPartitions(itPar{...accLong.add(v:Long) // 将值添加到累加器中accDouble.add(v:Double)...
})
accXxx.reset() // 重置累加器
val isZero:Boolean accXxx.isZero // 检查累加器是否为零值
val num:Long|Double accXxx.value|sum|count|avg // 获取累加器的值、总和、计数或平均值// 定义一个累加器用于统计 bad 记录的数量
val errorCount sc.longAccumulator(Error Count)
val data sc.parallelize(Array(good, bad, good, bad, good))
data.foreach(record if (record bad) errorCount.add(1))
// 打印累加器的值即 bad 记录的总数println(sTotal errors: ${errorCount.value})自定义累加器 写一个类继承 import org.apache.spark.util.AccumulatorV2[IN, OUT] abstract class AccumulatorV2[IN, OUT] extends Serializable {// 返回是否为零值累加器def isZero: Boolean// 创建此累加器的新副本其为零值def copyAndReset(): AccumulatorV2[IN, OUT] {...}// 创建此累加器的新副本def copy(): AccumulatorV2[IN, OUT]// 重置此累加器为零值def reset(): Unit// 添加接收输入并累加def add(v: IN): Unit// 合并合并另一个相同类型的累加器并更新其状态def merge(other: AccumulatorV2[IN, OUT]): Unit// 当前累加器的值def value: OUT
}自定义计量器优化Custom Metrics自定义计量器允许用户定义和收集特定的性能指标提供更细粒度的作业监控和调优能力。通过 SparkListener 接口可以实现自定义的监听器来监控和记录所需的指标。
五、提交模式与运行模式优化 提交模式Spark支持Client模式和Cluster模式两种提交方式。Client模式便于查看日志和结果但可能消耗较多资源Cluster模式则更适合大规模作业但查看日志和结果可能不太方便。应根据实际情况选择合适的提交模式。 spark-submit --class MainClass --master MasterURL --deploy-mode DeployMode PathToJarMainClass包含 main 方法的主类的名称。 MasterURL指定集群的 Master URL。 DeployMode指定提交模式可以是 client 或 cluster。 PathToJar包含 Spark 应用程序的 JAR 文件的路径。 spark-submit --class SparkClientModeApp --master yarn --deploy-mode client /path/to/your/jarfile.jar
spark-submit --class SparkClientModeApp --master yarn --deploy-mode cluster /path/to/your/jarfile.jar运行模式Spark支持多种运行模式如Local模式、Standalone模式、YARN模式等。不同的运行模式适用于不同的场景和需求。例如Local模式适用于本地开发和测试Standalone模式适用于构建独立的Spark集群YARN模式则适用于与Hadoop生态系统集成。 local: 在单核上运行 local[N]: 在指定数量的 N 个核上运行如 “local[4]” local[*]: 使用所有可用的核 spark://HOST:PORT: 连接到指定的 Spark standalone cluster yarn: 连接到 YARN 集群 mesos://HOST:PORT: 连接到 Mesos 集群
六、其他优化
序列化框架选择除了Kryo序列化外还可以考虑使用其他高效的序列化框架来优化Spark作业的性能。监控与调优使用Spark提供的监控工具和API如Spark UI、getStorageLevel方法等来监控作业的运行状态和性能瓶颈并根据监控结果进行调优。