呼市城乡建设厅网站,网站开发从哪里学起,诸城做网站的,网络推广建设期的网站1、任务调度与资源调度
任务调度#xff1a;是指通过DAGScheduler#xff0c;TaskScheduler#xff0c;SchedulerBackend等进行的作业调度。
资源调度#xff1a;是指应用程序获取资源。
任务调度是在资源调度的基础上#xff0c;没有资源调度#xff0c;那么任务调度…1、任务调度与资源调度
任务调度是指通过DAGSchedulerTaskSchedulerSchedulerBackend等进行的作业调度。
资源调度是指应用程序获取资源。
任务调度是在资源调度的基础上没有资源调度那么任务调度就没有任何意义了。
2、分配Driver只对cluster模式有效
Spark的Driver的运行有2种模式一种是Client模式Driver程序运行在客户端适用于交互、调试希望立即看到app的输出一种是cluster模式真正的Driver就会在worker中的一台机器上在哪台有Master决定。
schedule的调用时机每次有新的应用程序提交或者集群资源发生改变的时候包括Executor的增加或减少Worker的增加或减少。
资源调度肯定是在master里面因为master负责资源调度每次资源变动或者注册程序或新任务提交等一大堆的东西都会产生schedule()的调用如RegisterApplication里面最后一行就是schedule() 进入schedule方法里面为我们当前等待的应用程序调度当前可用的资源每当一个新的应用程度进来的时候这个方法都会被调用。或者说资源的可用状况改变例如说那个executor挂掉了或者worker挂掉了或者新增加了机器 1首先是判断master的状态如果不是ALIVE的状态这个资源调度就无从谈起直接返回也就是说Standby 的master不会进行application资源的调用。所以master必须是ALIVE的方式采用进行资源的调度。
2Random.shuffle()是将worker打乱打乱有利于负载均衡workers是HashSet的数据结构里面是WorkInfoWorkInfo里面的内容是注册的时候把这些信息注册进来的。shuffle方法中打乱之前要判断一下要工作就必须要让worker的state是ALIVE的所以就判断所有Worker中哪些是ALIVE级别的ALIVE才能参与资源的分配工作 3在shuffle方法中他的随机打乱是首先构建一个ArrayBuffer把所有的worker都放进去这个函数内部有定义了swap函数这个函数是将两个索引上的位置进行交换。然后从ArrayBuffer中最后一个元素开始一直到2就是第3个索引每次都减1.nextInt(n)是取出0到n-1里面的一个整数。然后将取出的索引与n-1索引位置的替换这样的话顺序就特别的乱。
4然后将这个shuffledAlivedWorkers的长度赋值到numWorkerAlive并定义变量curPos 5遍历waitingDrivers队列也就是等待被调度的Driver队列使其的launched值为false定义numWorkerVisited的值为0。waitingDrivers就是一个ArrayBuffer里面存放DriverInfoDriverInfo里面的DriverDescription里面有supervise是因为如果是cluster模式submit的时候有指定supervise在driver挂掉的时候会自动重启。
注意循环遍历等待启动的drivercluster模式才有driver等待启动如果是client模式是不需要等待启动driver的因为你提交driver就启动了。 6进行循环判断条件为状态为Alive的worker数量大于0并且是没有被调度的launchedfasle那么将这些worker进行shuffle被调用的worker的数量numWorkerVisited的值加1进一步判断如果worker的剩余内存大于等于driver的进行调度所需的内存driver.desc.mem并且worker的剩余CPU数量大于等于driver调度所需的CPU数量driver.desc.coresDriverInfo里面的DriverDescription中有当前Driver启动是需要的内存和cpu等要求的内容。调用launchDriver方法将这个Driver从driver的等待队列移除并设置launched的值为true。curPos (curPos 1) % numWorkersAlive是将指针指向下一个worker。 7符合要求之后launchDriverlaunch到循环时候的一个worker中去了而这个worker是Shuffle之后的for循环随机产生的一个worker因为每次都调用Shuffle所以顺序不一样所以我们的driver放到这个worker上这就保证负载均衡 launchDriver中首先先打印一个log然后就是worker.addDrievr(driver)这个worker是当前master中对这个集群元数据的一个描述需要保存元数据现在还是在master上。然后driver.worker Some(worker)driver说明自己在哪个worker上相互引用。然后就是关键点了worker.endpoint.send(LaunchDriver(driver.id, driver.desc))这个就是发远程消息给worker消息通信远程rpc。所以master发指令给worker让远程的worker具体启动executor。启动之后driver的state就变成running了。
注意所有schedule的时候首先就是进行所有driver级别的launch这说明一件事情要现有driver才有其他的。
3、为Application分配资源
1在schedule中继续最后一行是startExecutorsOnWorkers为Application分配资源并启动Executors。 调度和启动Executor在Worker上为我们具体的当前程序。默认使用FIFO的方式就是先满足第一个应用程序再满足第二个应用程序...。Spark默认为应用程序启动Executor的方式是FIFO的方式也就是所有提交的应用程序都是放在调度的等待队列中的先进先出只有满足了前面应用程序的资源分配的基础上才能满足下一个应用程序资源的分配。
我们现在是Master类中Master直接调这个方法但是具体在哪些Executor上启动executor还不知道。for循环waitingApps要求app.coresLeft0 equestedCores总共需要的核数(--total-executor-cores指定默认Int的最大值)
coresGranted已经分配的值
假设整个程序要求1000个core但是现在可用的只有100个不能立即满足所以可能在等待队列中我们要看一下coresLeft还需要的cores如果0就就还需要调度了如果不大于0也就是不需要core就不会为应用程序分配executor
进入到for循环中这个是应用程序在提交的时候会配置很多参数这里说每个executor需要的core有多少。过滤出worker是ALIVE的状态才能分配executor然后worker的内存要大于配置内存然后进行排序谁的core多就排在前面。排序后就产生可用的useableWorkers。
现在不应该考虑数据本地性因为不是资源分配的内容这个不是job调度还没到计算不考虑数据本地性。
2进入到scheduleExecutorsOnWorkers方法中 这个注释的意思具体调度在这个worker上启动executor
返回的是一个数组包含到每个worker上的赋值的具体的cores
有2种启动worker的方式第一个方式是spread out尝试把一个应用程序运行在尽可能多的worker上我们把executor运行在尽可能多的node上是更符合数据本地性的表现做基础建设的时候是这么考虑的因为数据有可能在所有的worker上这其实也有风险也可能有500台机器数据存在200台机器上但是这个是默认的情况默认层面上从资源调度的层面上考虑最大化的数据本地性调度层面上必须决定这个代码具体运行在哪几台机器上 。这个数据本地性只是顺便带来的因为这样更好的响应了并发处理能力不是考虑数据的本地性所以是潜在的数据本地性
第二种方式是把我们当前应用程序运行在尽可能少的worker上。在为每一个executor分配多少个core是可配置的可以在submit的时候设置。在一个worker下面可能会有多个executor前提是worker上有足够的cores和memory的时候。否则的话默认情况下分配一个executor把当前worker上所有的cores都拿走了。
一次在我们的executor上分配多个core是非常重要的我们的集群现在4个worker每个worker有16个cores。我们的用户要求3个executor配置的时候最大可以48个每个executor16个。
大致意思是说有两种分配模型第一种是将executors分配到尽可能多的workers上第二种与第一种相反。默认使用的是第一种模型这种模型更加符合数据的本地性原则为每个Executor分配的cores的个数是可以进行配置的spark-submit 或者 spark-env.sh如果设置了多个executors可能会被分配在一个worker上前提是该worker拥有足够的cores和memory否则每个executor会充分利用worker上的cores这种情况下一个executor会被分配在一个worker上。具体在集群上分配cores的时候会尽可能的满足我们的要求如果需要的cores的个数大于workers中空闲的cores的个数那么就先分配空闲的cores尽可能的去满足要求。
spreadOutApps是让我们的应用程序尽可能多的运行在所有的node上
下面具体进入到scheduleExecutorsOnWorkers代码中 3回到startExecutorsOnWorkers方法中现在还没有真正的发生调度在这里获得元数据信息之后前面决定了在哪些机器上分配多少个Executor每个Executor上分配多少个cores下面就开始循环找出可用的workers 这里就具体直接分配了就是先决定后分配 分配肯定是远程通信循环遍历要分配个数的executor把要分配的executor元数据信息交给addExecutor没后发送给worker
4、总结
1Master在接收搭配RegisterApplication之后创建一个applicationInfo对象 然后registerApplication方法中将application加入到队列持久化application 发送RegisteredApplication消息到ClientEndPoint,ClientPoint接收到消息后设置registeredtrue 最后schedule()进行资源分配
2在schedule()中先打乱可以使用的workers主要看在这个worker上内存和cpu的cores满不满足启动一个driver满足的话则在这个worker上启动一个driver直到找到一个合适的worker然后launchDriver启动Driver然后调用startExecutorsOnWorkers
3在startExecutorsOnWorkers 中首先筛选出可以使用的workers主要看内存和cpu 然后调用shceduleExecutorsOnWorkers得到要分配的信息 shceduleExecutorsOnWorkers方法中主要是①获取当前的app还有多少cpuCore没有被分配 ②筛选出可以用来启动executor的workers saber在筛选出来的worker上面进行executor的分配的信息的记载在assignedCores中 并返回
4根据上面得到的要分配的信息调用allocateWorkerResourceToExecutors()在每一个worker上面分配资源启动executor 。在allocateWorkerResourceToExecutors方法中主要是①计算在当前的worker上启动几个executor ②计算在当前的worker上一个executor会分配几个cpuCores ③调用launchExecutor进行启动executor