怎么看一个网站用什么平台做的,什么是网络设计编辑,宁波seo在线优化,西安网站制作资源一、Task的执行流程
1.1、Task执行流程
DAGScheduler将Stage生成TaskSet之后#xff0c;会将Task交给TaskScheduler进行处理#xff0c;TaskScheduler负责将Task提交到集群中运行#xff0c;并负责失败重试#xff0c;为DAGScheduler返回事件信息等#xff0c;整体如流程…一、Task的执行流程
1.1、Task执行流程
DAGScheduler将Stage生成TaskSet之后会将Task交给TaskScheduler进行处理TaskScheduler负责将Task提交到集群中运行并负责失败重试为DAGScheduler返回事件信息等整体如流程如下
当任务提交到TaskScheduler时TaskScheduler会通知SchedulerBackend分配计算资源SchedulerBackend将所有可用的Executor的资源信息转换成WorkerOffer交给TaskSchedulerWorkerOffer中包含executorId、Executor的hostname、Executor的可用CPU等。TaskScheduler负责根据WorkerOffer在相应的Executor分配TaskSet中的Task并将Task转换为TaskDescription交给SchedulerBackend。最终有空闲的的CPU的Executor会被分配到一个或者多个TaskDescriptionSchedulerBackend将这些TaskDescription提交到对应的Executor中执行。
1.2、集群资源管理
Task运行离不开集群中的计算资源即在SparkContext初始化过程中创建的Executor资源。在Executor创建完毕后回向SchedulerBackend中注册。Executor在注册时发送的信息包含的内容有executorIdExecutor-Ref引用、Executor的hostname、可用的CPU核数。
SchedulerBackend收到后会将Executor的注册信息转换为ExecutorData进行保存并且在SchedulerBackend中使用Map结构保存每个executorId和ExecutorData的关系ExecutorData中还记录了剩余的可用的CPU核数
在为计算任务分配资源时只需遍历所有的ExecutorData分配可用的资源即可。由ExecutorData分配的可用资源使用WorkerOffer表示WorkerOffer中包含executorId、Executor的hostname、Executor的可用CPU等。
1.3、任务的分配
TaskScheduler在接受到DAGScheduler提交的TaskSet以后会为每个TaskSet创建一个TaskSetManager用于管理TaskSet中所有任务的运行。TaskSetManager会根据Task中的最佳运行位置计算TaskSet的所有本地运行级别本地运行级别决定Task最终在哪个Executor上运行。Spark中本地运行级别从小到达可分为
进程本地化节点本地化无优先位置机架本地化任意节点 在TaskSetManager初始化时根据着5个本地运行级别分别创建5个Map分别记录其下可以运行的所有Task。这些映射关系的建立时根据生成Task时Task运行的最佳位置确定的。。在这5种映射关系中某个Task可能会重复存在于几个本地化级别中。
当有新的TaskSet加入、由Task执行完成、由新的Executor加入时都会触发SchedulerBackend重新计算可用资源。TaskScheduler根据调度的顺序依次调度TaskSetManager中的TaskSet对于每个TaskSet遍历所有本地化级别从小到大尝试在Executor分配Task根据每个WorkerOffer的executorId和hostname使用TaskSetManager判断在当前本地化级别中是否可以在该Executor或Host上分配任务直到该本地化级别无法分配Task再将本地化级别提高一级再次尝分配Task。经过对本地化级别的便利即可实现WorkerOffer分配任务或将所有待执行的任务分配完成。TaskSet中部分任务分配完成以后会生成一组TaskDescription每个TaskDescription中包含executorId和Task的其他运行信息。SchedulerBackend根据TaskDescription的executorId将每个任务封装成LaunchTask消息提交到不同的Executor中
二、Task的执行
Executor收到SchedulerBackend提交的LaunchTask消息后即可运行该消息中包含的Task。Executor将接收到的Task封装到TaskRunner中TaskRunner是一个Runnable接口从而可以将该任务提交到线程池中运行。
2.1、Executor可以并行运行Task的数量
在创建Executor时每个Executor可能会分配多个CPU核数而Executor运行的所有任务都是在线程池中运行。Executor运行的时候其本身没有记录CPU使用的情况对于Executor能够同时运行多少个任务是由SchedulerBackend控制的SchedulerBackend每在一个Executor中提交一个任务时便在ExecutorData中减少该Executor可用的CPU核数直到该Executor生成的WorkerOffer可用的CPU核数为0便不再为Executor分配任务了。默认每个Task使用一个CPU核心运行该变量可以通过Spark的配置spark.task.CPUs修改
2.2、Executor中资源共享
当在一个Executor上运行多个Task时多个Task共享Executor中的SparkEnv的所有组件共用Executor中分配的内存。如使用Spark广播变量时每个Executor中会存在一份Executor所有任务共享这一份变量。当Executor中的BlockManager缓存了某个rdd某分区的数据时在该Executor上调度使用这个RDD的这个分区的数据的Task执行可以有效的减少网络加载数据的过程减少网络传输
2.3、ResultTask运行
在执行ResultTask时首先会反序列化出该Task执行计算的RDD和对该RDD执行的操作。根据是否涉及Shuffle操作分为两种
用户编写的RDDtransformation中不涉及Shuffle操作一个Job就只涉及一个ResultStagerdd1直接从数据源中加载 2. 过程中涉及Shuffle操作划分为两个Stagerdd1位Shuffle的Reduce阶段。由于DAGScheduler在划分Stage必先会先计算父Stage所以执行到ResultStage时其父Stage的Map阶段已经完成并且计算结果已经保存到了BlockManager中ResultStage中的rdd1之需要根据MapOutputTrackerMaster的计算结果位置信息加载该分区的数据即可 2.4、ShuffleMapTask运行
在计算ShuffleMapTask时首先会反序列化出Task包含的计算的RDD和划分此Stage的ShulffleDependency。ShulffleDependency包含RDD需要执行分组操作的分区器partitioner,并且通过ShulffleDependency可以获取ShulffleManager的写入器将本分区的分组计算结果通过写入器写入文件中进行保存。在这个过程中一个分区的数据生成的多个分组的数据分别属于下游Reduce阶段的不同的分区的数据
ShuffleMapTask中计算的RDD同样为这个Stage中最后的一个RDD。 下图是多个ShuffleMapStage的RDD转换过程 三、Task结果处理
当Executor中Task运行完成时需要将Task运行结果返回Driver程序Driver程序根据结果判断该Stage是否计算完成
3.1、ResultTask结果
ResultTask完成后会将其结果返回直Driver端。根据运行结果的大小返回的结果 被分为直接运行结果和非直接间接运行结果。 当运行结果大于Spark配置的最大直接结果大小的参数时 会将运行结果保存至当前Executor的BlockManager中并将保存的地址序列化后返回否则直接将运行结果序列化后返回
3.2、ShuffleMapTask结果
ShuffleMapTask运行完成后会将运行结果直接保存至当前Executor的BlockManager中并将保存结果的位置封装到MapStatus中最终ShuffleMapTask运行完成结果都为MapStatus类型
3.3、返回至Driver端
Executor将Task的运行结果序列化后通过Driver的Endpoint-Ref发送至Driver端Driver的Endpoint收到运行结果后通知TaskScheduler Task运行完成