当前位置: 首页 > news >正文

行业垂直网站开发漳州最便宜的网站建设价格

行业垂直网站开发,漳州最便宜的网站建设价格,做网站是干嘛,湖南星大建设集团有限公司网站背景 最近在做Spark 3.1 升级 Spark 3.5的过程中#xff0c;遇到了一批SQL在运行的过程中 Driver OOM的情况#xff0c;排查到是AQE开启导致的问题#xff0c;再次分析记录一下#xff0c;顺便了解一下Spark中指标的事件处理情况 结论 SQLAppStatusListener 类在内存中存…背景 最近在做Spark 3.1 升级 Spark 3.5的过程中遇到了一批SQL在运行的过程中 Driver OOM的情况排查到是AQE开启导致的问题再次分析记录一下顺便了解一下Spark中指标的事件处理情况 结论 SQLAppStatusListener 类在内存中存放着 一个整个SQL查询链的所有stage以及stage的指标信息在AQE中 一个job会被拆分成很多job甚至几百上千的job这个时候 stageMetrics的数据就会成百上倍的被存储在内存中从而导致Driver OOM。 解决方法 关闭AQE spark.sql.adaptive.enabled false合并对应的PR-SPARK-45439 分析 背景知识对于一个完整链接的sql语句来说(比如说从 读取数据源到 数据处理操作再到插入hive表),这可以称其为一个最小的SQL执行单元这最小的数据执行单元在Spark内部是可以跟踪的,也就是用executionId来进行跟踪的。 对于一个sql举例来说 : insert into TableA select * from TableB;在生成 物理计划的过程中会调用 QueryExecution.assertOptimized 方法该方法会触发eagerlyExecuteCommands调用最终会到SQLExecution.withNewExecutionId方法 def assertOptimized(): Unit optimizedPlan...lazy val commandExecuted: LogicalPlan mode match {case CommandExecutionMode.NON_ROOT analyzed.mapChildren(eagerlyExecuteCommands)case CommandExecutionMode.ALL eagerlyExecuteCommands(analyzed)case CommandExecutionMode.SKIP analyzed}...lazy val optimizedPlan: LogicalPlan {// We need to materialize the commandExecuted here because optimizedPlan is also tracked under// the optimizing phaseassertCommandExecuted()executePhase(QueryPlanningTracker.OPTIMIZATION) {// clone the plan to avoid sharing the plan instance between different stages like analyzing,// optimizing and planning.val plan sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)// We do not want optimized plans to be re-analyzed as literals that have been constant// folded and such can cause issues during analysis. While clone should maintain the// analyzed state of the LogicalPlan, we set the plan as analyzed here as well out of// paranoia.plan.setAnalyzed()plan}def assertCommandExecuted(): Unit commandExecuted...private def eagerlyExecuteCommands(p: LogicalPlan) p transformDown {case c: Command // Since Command execution will eagerly take place here,// and in most cases be the bulk of time and effort,// with the rest of processing of the root plan being just outputting command results,// for eagerly executed commands we mark this place as beginning of execution.tracker.setReadyForExecution()val qe sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)val name commandExecutionName(c)val result QueryExecution.withInternalError(sEagerly executed $name failed.) {SQLExecution.withNewExecutionId(qe, Some(name)) {qe.executedPlan.executeCollect()}} 而SQLExecution.withNewExecutionId主要的作用是设置当前计划的所属的executionId: val executionId SQLExecution.nextExecutionIdsc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)该EXECUTION_ID_KEY的值会在JobStart的时候传递给Event以便记录跟踪整个执行过程中的指标信息。 同时我们在方法中eagerlyExecuteCommands看到qe.executedPlan.executeCollect()这是具体的执行方法针对于insert into 操作来说物理计划就是 InsertIntoHadoopFsRelationCommand这里的run方法最终会流转到DAGScheduler.submitJob方法 eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,JobArtifactSet.getActiveOrDefault(sc),Utils.cloneProperties(properties)))最终会被DAGScheduler.handleJobSubmitted处理其中会发送SparkListenerJobStart事件 listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,Utils.cloneProperties(properties)))该事件会被SQLAppStatusListener捕获从而转到onJobStart处理这里有会涉及到指标信息的存储这里我们截图出dump的内存占用情况 可以看到 SQLAppStatusListener 的 LiveStageMetrics 占用很大也就是 accumIdsToMetricType占用很大 那在AQE中是怎么回事呢 我们知道再AQE中任务会从source节点按照shuffle进行分割从而形成单独的job从而生成对应的shuffle指标具体的分割以及执行代码在AdaptiveSparkPlanExec.getFinalPhysicalPlan中如下: var result createQueryStages(currentPhysicalPlan)val events new LinkedBlockingQueue[StageMaterializationEvent]()val errors new mutable.ArrayBuffer[Throwable]()var stagesToReplace Seq.empty[QueryStageExec]while (!result.allChildStagesMaterialized) {currentPhysicalPlan result.newPlanif (result.newStages.nonEmpty) {stagesToReplace result.newStages stagesToReplaceexecutionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))// SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting// for tasks to be scheduled and leading to broadcast timeout.// This partial fix only guarantees the start of materialization for BroadcastQueryStage// is prior to others, but because the submission of collect job for broadcasting is// running in another thread, the issue is not completely resolved.val reorderedNewStages result.newStages.sortWith {case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) falsecase (_: BroadcastQueryStageExec, _) truecase _ false}// Start materialization of all new stages and fail fast if any stages failed eagerlyreorderedNewStages.foreach { stage try {stage.materialize().onComplete { res if (res.isSuccess) {events.offer(StageSuccess(stage, res.get))} else {events.offer(StageFailure(stage, res.failed.get))}// explicitly clean up the resources in this stagestage.cleanupResources()}(AdaptiveSparkPlanExec.executionContext)这里就是得看stage.materialize()这个方法这两个stage只有两类BroadcastQueryStageExec 和 ShuffleQueryStageExec 这两个物理计划稍微分析一下如下: BroadcastQueryStageExec 数据流如下broadcast.submitBroadcastJob||\/ promise.future||\/ relationFuture||\/ child.executeCollectIterator() 其中 promise的设置在relationFuture方法中而relationFuture 会被doPrepare调用,而submitBroadcastJob会调用executeQuery,从而调用doPrepare,executeCollectIterator()最终也会发送JobSubmitted事件分析和上面的一样ShuffleQueryStageExec shuffle.submitShuffleJob||\/sparkContext.submitMapStage(shuffleDependency)||\/dagScheduler.submitMapStage 该submitMapStage会发送MapStageSubmitted事件: eventProcessLoop.post(MapStageSubmitted(jobId, dependency, callSite, waiter, JobArtifactSet.getActiveOrDefault(sc),Utils.cloneProperties(properties)))最终会被DAGScheduler.handleMapStageSubmitted处理其中会发送SparkListenerJobStart事件 listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,Utils.cloneProperties(properties)))该事件会被SQLAppStatusListener捕获从而转到onJobStart处理: private val liveExecutions new ConcurrentHashMap[Long, LiveExecutionData]()private val stageMetrics new ConcurrentHashMap[Int, LiveStageMetrics]()...override def onJobStart(event: SparkListenerJobStart): Unit {val executionIdString event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)if (executionIdString null) {// This is not a job created by SQLreturn}val executionId executionIdString.toLongval jobId event.jobIdval exec Option(liveExecutions.get(executionId))该方法会获取事件中的executionId,在AQE中同一个执行单元的executionId是一样的所以stageMetrics内存占用会越来越大。 而这里指标的更新是在AdaptiveSparkPlanExec.onUpdatePlan等方法中。 这样整个事件的数据流以及问题的产生原因就应该很清楚了。 其他 为啥AQE以后多个Job还是共享一个executionId呢因为原则上来说如果没有开启AQE之前一个SQL执行单元的是属于同一个Job的开启了AQE之后因为AQE的原因一个Job被拆成了了多个Job但是从逻辑上来说还是属于同一个SQL处理单元的所以还是得归属到一次执行中。
http://www.pierceye.com/news/755681/

相关文章:

  • 网站后台凡科建设有做网站维护的
  • 搭建网站需要什么软件上海在线
  • led灯外贸网站建设网站代码怎么优化
  • 网站建设维护什么意思江苏网络推广专员
  • 潍坊网站开发asp培训珠海市网站建设公司
  • 用什么做响应式网站建行个人余额查询网站
  • 做网站网站代理怎么找客源企业团建公司
  • 电子商务网站开发实战济南兼职做网站
  • 怎样创建网站视频学历提升的重要性
  • 百度搜索引擎录入网站1_ 掌握网站开发的基本流程 要求:熟悉网站开发与设计的基本流程.
  • 广州做网站建设如何在别人网站挂黑链
  • 宁德北京网站建设任丘建设银行网站
  • 积极加强网站建设连锁会员管理系统
  • 河南做外贸网站的公司简介wordpress做教育网站
  • 兴城做网站推广的企业网站后台管理软件
  • 自定义优定软件网站建设申请永久网站空间
  • 免费发布信息的网站平台怎么做网站统计
  • 制作网站的过程是对信息的龙海市住房和城乡建设局网站
  • 鱼台县建设局网站免费ppt模板制作软件
  • 质量好网站建设多少钱黄冈网站建设哪家快些
  • 使用阿里云部署wordpressseo搜索排名影响因素主要有
  • 大连制作网站建站教程图解
  • 百度的合作网站有哪些网站建设费用写创意
  • 建设个人网站ip护肤品网页设计图片
  • 德州网站建设优化金阳龙泉苑网站建设
  • 建站公司最新价格网站素材网
  • 高品质的网站开发公优酷网站谁做的
  • 广西兴业县建设局网站湖北天健建设集团有限公司网站
  • 学多久可以做网站 知乎中国100强企业名单公布
  • 江阴网站优化公司开源的 二次网站开发