当前位置: 首页 > news >正文

门头沟做网站公司网站页面设计说明书

门头沟做网站公司,网站页面设计说明书,wordpress 非法阻断,网站建设及维护业务服务合同Scala第二十章节 scala总目录 文档资料下载 章节目标 理解Akka并发编程框架简介掌握Akka入门案例掌握Akka定时任务代码实现掌握两个进程间通信的案例掌握简易版spark通信框架案例 1. Akka并发编程框架简介 1.1 Akka概述 Akka是一个用于构建高并发、分布式和可扩展的基于事…Scala第二十章节 scala总目录 文档资料下载 章节目标 理解Akka并发编程框架简介掌握Akka入门案例掌握Akka定时任务代码实现掌握两个进程间通信的案例掌握简易版spark通信框架案例 1. Akka并发编程框架简介 1.1 Akka概述 Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用工具包。Akka是使用scala开发的库同时可以使用scala和Java语言来开发基于Akka的应用程序。 1.2 Akka特性 提供基于异步非阻塞、高性能的事件驱动编程模型内置容错机制允许Actor在出错时进行恢复或者重置操作超级轻量级的事件处理每GB堆内存几百万Actor使用Akka可以在单机上构建高并发程序也可以在网络中构建分布式程序。 1.3 Akka通信过程 以下图片说明了Akka Actor的并发编程模型的基本流程 学生创建一个ActorSystem通过ActorSystem来创建一个ActorRef老师的引用并将消息发送给ActorRefActorRef将消息发送给Message Dispatcher消息分发器Message Dispatcher将消息按照顺序保存到目标Actor的MailBox中Message Dispatcher将MailBox放到一个线程中MailBox按照顺序取出消息最终将它递给TeacherActor接受的方法中 2. 创建Actor Akka中也是基于Actor来进行编程的。类似于之前学习过的Actor。但是Akka的Actor的编写、创建方法和之前有一些不一样。 2.1 API介绍 ActorSystem: 它负责创建和监督Actor 在Akka中ActorSystem是一个重量级的结构它需要分配多个线程.在实际应用中, ActorSystem通常是一个单例对象, 可以使用它创建很多Actor.直接使用context.system就可以获取到管理该Actor的ActorSystem的引用. 实现Actor类 定义类或者单例对象继承Actor注意要导入akka.actor包下的Actor实现receive方法receive方法中直接处理消息即可不需要添加loop和react方法调用. Akka会自动调用receive来接收消息.【可选】还可以实现preStart()方法, 该方法在Actor对象构建后执行在Actor生命周期中仅执行一次. 加载Actor 要创建Akka的Actor必须要先获取创建一个ActorSystem。需要给ActorSystem指定一个名称并可以去加载一些配置项后面会使用到调用ActorSystem.actorOf(Props(Actor对象), “Actor名字”)来加载Actor. 2.2 Actor Path 每一个Actor都有一个Path这个路径可以被外部引用。路径的格式如下 Actor类型路径示例本地Actorakka://actorSystem名称/user/Actor名称akka://SimpleAkkaDemo/user/senderActor远程Actorakka.tcp://my-sysip地址:port/user/Actor名称akka.tcp://192.168.10.17:5678/user/service-b 2.3 入门案例 2.3.1 需求 基于Akka创建两个ActorActor之间可以互相发送消息。 2.3.2 实现步骤 创建Maven模块创建并加载Actor发送/接收消息 2.3.3 创建Maven模块 使用Akka需要导入Akka库这里我们使用Maven来管理项目, 具体步骤如下: 创建Maven模块. 选中项目, 右键 - new - Module - Maven - Next - GroupId: com.itheimaArtifactId: akka-demo next - 设置module name值为akka-demo - finish打开pom.xml文件导入akka Maven依赖和插件. //1. 直接把资料的pom.xml文件中的内容贴过来就行了. //2. 源码目录在: src/main/scala下 //3. 测试代码目录在: src/test/scala下. //4. 上述的这两个文件夹默认是不存在的, 需要我们手动创建. //5. 创建出来后, 记得要修改两个文件夹的类型.选中文件夹, 右键 - Mark Directory as - Source Roots //存放源代码.Test Source Roots //存放测试代码. 2.3.4 创建并加载Actor 到这, 我们已经把Maven项目创建起来了, 后续我们都会采用Maven来管理我们的项目. 接下来, 我们来实现: 创建并加载Actor, 这里, 我们要创建两个Actor: SenderActor用来发送消息ReceiverActor用来接收回复消息 具体步骤 在src/main/scala文件夹下创建包: com.itheima.akka.demo 在该包下创建两个Actor(注意: 用object修饰的单例对象). SenderActor: 表示发送消息的Actor对象. ReceiverActor: 表示接收消息的Actor对象. 在该包下创建单例对象Entrance, 并封装main方法, 表示整个程序的入口. 把程序启动起来, 如果不报错, 说明代码是没有问题的. 参考代码 object SenderActor extends Actor {/*细节: 在Actor并发编程模型中, 需要实现act方法, 想要持续接收消息, 可通过loop react实现.在Akka编程模型中, 需要实现receive方法, 直接在receive方法中编写偏函数处理消息即可.*///重写receive()方法override def receive: Receive {case x println(x)} } object ReceiverActor extends Actor{//重写receive()方法override def receive: Receive {case x println(x)} }object Entrance { def main(args:Array[String]) {//1. 实现一个Actor Trait, 其实就是创建两个Actor对象(上述步骤已经实现).//2. 创建ActorSystem//两个参数的意思分别是:ActorSystem的名字, 加载配置文件(此处先不设置)val actorSystem ActorSystem(actorSystem,ConfigFactory.load())//3. 加载Actor//actorOf方法的两个参数意思是: 1. 具体的Actor对象. 2.该Actor对象的名字val senderActor actorSystem.actorOf(Props(SenderActor), senderActor)val receiverActor actorSystem.actorOf(Props(ReceiverActor), receiverActor)} }2.3.5 发送/接收消息 思路分析 使用样例类封装消息 SubmitTaskMessage——提交任务消息SuccessSubmitTaskMessage——任务提交成功消息 使用!发送异步无返回消息. 参考代码 MessagePackage.scala文件中的代码 /*** 记录发送消息的 样例类.* param msg 具体的要发送的信息.*/ case class SubmitTaskMessage(msg:String)/*** 记录 回执信息的 样例类.* param msg 具体的回执信息.*/ case class SuccessSubmitTaskMessage(msg:String)Entrance.scala文件中的代码 //程序主入口. object Entrance {def main(args: Array[String]): Unit {//1. 创建ActorSystem, 用来管理所有用户自定义的Actor.val actorSystem ActorSystem(actorSystem, ConfigFactory.load())//2. 通过ActorSystem, 来管理我们自定义的Actor(SenderActor, ReceiverActor)val senderActor actorSystem.actorOf(Props(SenderActor), senderActor)val receiverActor actorSystem.actorOf(Props(ReceiverActor), receiverActor) //3. 由ActorSystem给 SenderActor发送一句话start.senderActor ! start} }SenderActor.scala文件中的代码 object SenderActor extends Actor{override def receive: Receive {//1. 接收Entrance发送过来的: startcase start {//2. 打印接收到的数据.println(SenderActor接收到: Entrance发送过来的 start 信息.)//3. 获取ReceiverActor的具体路径.//参数: 要获取的Actor的具体路径.//格式: akka://actorSystem的名字/user/要获取的Actor的名字.val receiverActor context.actorSelection(akka://actorSystem/user/receiverActor)//4. 给ReceiverActor发送消息: 采用样例类SubmitTaskMessagereceiverActor ! SubmitTaskMessage(我是SenderActor, 我在给你发消息!...)}//5. 接收ReceiverActor发送过来的回执信息.case SuccessSubmitTaskMessage(msg) println(sSenderActor接收到回执信息: ${msg} )} }ReceiverActor.scala文件中的代码 object ReceiverActor extends Actor {override def receive: Receive {//1. 接收SenderActor发送过来的消息.case SubmitTaskMessage(msg) {//2. 打印接收到的消息.println(sReceiverActor接收到: ${msg})//3. 给出回执信息.sender ! SuccessSubmitTaskMessage(接收任务成功!. 我是ReceiverActor)}} }输出结果 SenderActor接收到: Entrance发送过来的 start 信息. ReceiverActor接收到: 我是SenderActor, 我在给你发消息!... SenderActor接收到回执信息: 接收任务成功!. 我是ReceiverActor3. Akka定时任务 需求: 如果我们想要使用Akka框架定时的执行一些任务该如何处理呢 答: 在Akka中提供了一个scheduler对象来实现定时调度功能。使用ActorSystem.scheduler.schedule()方法就可以启动一个定时任务。 3.1 schedule()方法的格式 方式一: 采用发送消息的形式实现. def schedule(initialDelay: FiniteDuration, // 延迟多久后启动定时任务interval: FiniteDuration, // 每隔多久执行一次receiver: ActorRef, // 给哪个Actor发送消息message: Any) // 要发送的消息 (implicit executor: ExecutionContext) // 隐式参数需要手动导入方式二: 采用自定义方式实现. def schedule(initialDelay: FiniteDuration, // 延迟多久后启动定时任务interval: FiniteDuration // 每隔多久执行一次 )(f: ⇒ Unit) // 定期要执行的函数可以将逻辑写在这里 (implicit executor: ExecutionContext) // 隐式参数需要手动导入注意: 不管使用上述的哪种方式实现定时器, 都需要导入隐式转换和隐式参数, 具体如下: //导入隐式转换, 用来支持 定时器. import actorSystem.dispatcher //导入隐式参数, 用来给定时器设置默认参数. import scala.concurrent.duration._3.2 案例 需求 定义一个ReceiverActor, 用来循环接收消息, 并打印接收到的内容.创建一个ActorSystem, 用来管理所有用户自定义的Actor.关联ActorSystem和ReceiverActor.导入隐式转换和隐式参数.通过定时器, 定时(间隔1秒)给ReceiverActor发送一句话. 方式一: 采用发送消息的形式实现.方式二: 采用自定义方式实现. 参考代码 //案例: 演示Akka中的定时器. object MainActor {//1. 定义一个Actor, 用来循环接收消息, 并打印.object ReceiverActor extends Actor {override def receive: Receive {case x println(x) //不管接收到的是什么, 都打印.}}def main(args: Array[String]): Unit {//2. 创建一个ActorSystem, 用来管理所有用户自定义的Actor.val actorSystem ActorSystem(actorSystem, ConfigFactory.load())//3. 关联ActorSystem和ReceiverActor.val receiverActor actorSystem.actorOf(Props(ReceiverActor), receiverActor)//4. 导入隐式转换和隐式参数.//导入隐式转换, 用来支持 定时器.import actorSystem.dispatcher//导入隐式参数, 用来给定时器设置默认参数.import scala.concurrent.duration._//5. 通过定时器, 定时(间隔1秒)给ReceiverActor发送一句话.//方式一: 通过定时器的第一种方式实现, 传入四个参数.//actorSystem.scheduler.schedule(3.seconds, 2.seconds, receiverActor, 你好, 我是种哥, 我有种子你买吗?...)//方式二: 通过定时器的第二种方式实现, 传入两个时间, 和一个函数.//actorSystem.scheduler.schedule(0 seconds, 2 seconds)(receiverActor ! 新上的种子哟, 你没见过! 嘿嘿嘿...)//实际开发写法actorSystem.scheduler.schedule(0 seconds, 2 seconds){receiverActor ! 新上的种子哟, 你没见过! 嘿嘿嘿...}} }4. 实现两个进程之间的通信 4.1 案例介绍 基于Akka实现在两个进程间发送、接收消息。 WorkerActor启动后去连接MasterActor并发送消息给MasterActor.MasterActor接收到消息后再回复消息给WorkerActor。 4.2 Worker实现 步骤 创建一个Maven模块导入依赖和配置文件. 创建Maven模块. GroupId: com.itheima ArtifactID: akka-worker 把资料下的pom.xml文件中的内容复制到Maven项目akka-worker的pom.xml文件中 把资料下的application.conf复制到 src/main/resources文件夹下. 打开 application.conf配置文件, 修改端口号为: 9999 创建启动WorkerActor. 在src/main/scala文件夹下创建包: com.itheima.akka在该包下创建 WorkerActor(单例对象的形式创建).在该包下创建Entrance单例对象, 里边定义main方法 发送setup消息给WorkerActorWorkerActor接收打印消息. 启动测试. 参考代码 WorkerActor.scala文件中的代码 //1. 创建WorkActor, 用来接收和发送消息. object WorkerActor extends Actor{override def receive: Receive {//2. 接收消息.case x println(x)} }Entrance.scala文件中的代码 //程序入口. //当前ActorSystem对象的路径 akka.tcp://actorSystem127.0.0.1:9999 object Entrance {def main(args: Array[String]): Unit {//1. 创建ActorSystem.val actorSystem ActorSystem(actorSystem, ConfigFactory.load())//2. 通过ActorSystem, 加载自定义的WorkActor.val workerActor actorSystem.actorOf(Props(WorkerActor), workerActor)//3. 给WorkActor发送一句话.workerActor ! setup} } //启动测试: 右键, 执行, 如果打印结果出现setup, 说明程序执行没有问题.4.3 Master实现 步骤 创建一个Maven模块导入依赖和配置文件. 创建Maven模块. GroupId: com.itheima ArtifactID: akka-master 把资料下的pom.xml文件中的内容复制到Maven项目akka-master的pom.xml文件中 把资料下的application.conf复制到 src/main/resources文件夹下. 打开 application.conf配置文件, 修改端口号为: 8888 创建启动MasterActor. 在src/main/scala文件夹下创建包: com.itheima.akka在该包下创建 MasterActor(单例对象的形式创建).在该包下创建Entrance单例对象, 里边定义main方法 WorkerActor发送connect消息给MasterActor MasterActor回复success消息给WorkerActor WorkerActor接收并打印接收到的消息 启动Master、Worker测试 参考代码 MasterActor.scala文件中的代码 //MasterActor: 用来接收WorkerActor发送的数据, 并给其返回 回执信息. //负责管理MasterActor的ActorSystem的地址: akka.tcp://actorSystem127.0.0.1:8888 object MasterActor extends Actor{override def receive: Receive {//1. 接收WorkerActor发送的数据case connect {println(MasterActor接收到: connect!...)//2. 给WorkerActor回执一句话.sender ! success}} }Entrance.scala文件中的代码 //Master模块的主入口 object Entrance {def main(args: Array[String]): Unit {//1. 创建ActorSystem, 用来管理用户所有的自定义Actor.val actorSystem ActorSystem(actorSystem, ConfigFactory.load())//2. 关联ActorSystem和MasterActor.val masterActor actorSystem.actorOf(Props(MasterActor), masterActor)//3. 给masterActor发送一句话: 测试数据, 用来测试.//masterActor ! 测试数据} }WorkerActor.scala文件中的代码(就修改了第3步) //WorkerActor: 用来接收ActorSystem发送的消息, 并发送消息给MasterActor, 然后接收MasterActor的回执信息. //负责管理WorkerActor的ActorSystem的地址: akka.tcp://actorSystem127.0.0.1:9999 object WorkerActor extends Actor{override def receive: Receive {//1. 接收Entrance发送过来的: setup.case setup {println(WorkerActor接收到: Entrance发送过来的指令 setup!.)//2. 获取MasterActor的引用.val masterActor context.system.actorSelection(akka.tcp://actorSystem127.0.0.1:8888/user/masterActor)//3. 给MasterActor发送一句话.masterActor ! connect}//4. 接收MasterActor的回执信息.case success println(WorkerActor接收到: success!)} }5. 案例: 简易版spark通信框架 5.1 案例介绍 模拟Spark的Master与Worker通信. 一个Master 管理多个Worker 若干个WorkerWorker可以按需添加 向Master发送注册信息向Master定时发送心跳信息 5.2 实现思路 构建Master、Worker阶段 构建Master ActorSystem、Actor构建Worker ActorSystem、Actor Worker注册阶段 Worker进程向Master注册将自己的ID、CPU核数、内存大小(M)发送给Master Worker定时发送心跳阶段 Worker定期向Master发送心跳消息 Master定时心跳检测阶段 Master定期检查Worker心跳将一些超时的Worker移除并对Worker按照内存进行倒序排序 多个Worker测试阶段 启动多个Worker查看是否能够注册成功并停止某个Worker查看是否能够正确移除 5.3 工程搭建 需求 本项目使用Maven搭建工程. 步骤 分别搭建以下几个项目, Group ID统一都为: com.itheima, 具体工程名如下: 工程名说明spark-demo-common存放公共的消息、实体类spark-demo-masterAkka Master节点spark-demo-workerAkka Worker节点 导入依赖(资料包中的pom.xml). 注意: master, worker要添加common依赖, 具体如下: !--导入spark-demo-common模块-- dependencygroupIdcom.itheima/groupIdartifactIdspark-demo-common/artifactIdversion1.0-SNAPSHOT/version /dependency分别在三个项目下的src/main, src/test下, 创建scala目录. 导入配置文件(资料包中的application.conf) 修改Master的端口为7000修改Worker的端口为8000 5.4 构建Master和Worker 需求 分别构建Master和Worker并启动测试 步骤 创建并加载Master Actor创建并加载Worker Actor测试是否能够启动成功 参考代码 完成master模块中的代码, 即: 在src/main/scala下创建包: com.itheima.spark.master, 包中代码如下: MasterActor.scala文件中的代码 //Master: 用来管理多个Worker的. //MasterActor的路径: akka.tcp://actorSystem127.0.0.1:7000 object MasterActor extends Actor{override def receive: Receive {case x println(x)} }Master.scala文件中的代码 //程序入口: 相当于我们以前写的MainActor object Master {def main(args: Array[String]): Unit {//1. 创建ActorSystem.val actorSystem ActorSystem(actorSystem, ConfigFactory.load())//2. 通过ActorSystem, 关联MasterActor.val masterActor actorSystem.actorOf(Props(MasterActor), masterActor)//3. 启动程序, 如果不报错, 说明代码没有问题.} }完成worker模块中的代码, 即: 在src/main/scala下创建包: com.itheima.spark.worker, 包中代码如下: WorkerActor.scala文件中的代码 //WorkerActor的地址: akka.tcp://actorSystem127.0.0.1:7100 object WorkerActor extends Actor{override def receive: Receive {case x println(x)} }Worker.scala文件中的代码 //程序入口 object Worker {def main(args: Array[String]): Unit {//1. 创建ActorSystem.val actorSystem ActorSystem(actorSystem, ConfigFactory.load())//2. 通过ActorSystem, 关联MasterActor.val workerActor actorSystem.actorOf(Props(WorkerActor), workerActor)//3. 启动程序, 如果不报错, 说明代码没有问题.workerActor ! hello} }5.5 Worker注册阶段实现 需求 在Worker启动时发送注册消息给Master. 思路分析 Worker向Master发送注册消息workerid、cpu核数、内存大小 随机生成CPU核1、2、3、4、6、8随机生成内存大小512、1024、2048、4096单位M Master保存Worker信息并给Worker回复注册成功消息启动测试 具体步骤 在spark-demo-common项目的src/main/scala文件夹下创建包: com.itheima.spark.commons 把资料下的MessagePackage.scala和Entities.scala这两个文件拷贝到commons包下. 在WorkerActor单例对象中定义一些成员变量, 分别表示: masterActorRef: 表示MasterActor的引用.workerid: 表示当前WorkerActor对象的id.cpu: 表示当前WorkerActor对象的CPU核数.mem: 表示当前WorkerActor对象的内存大小.cup_list: 表示当前WorkerActor对象的CPU核心数的取值范围.mem_list: 表示当前WorkerActor对象的内存大小的取值范围. 在WorkerActor的preStart()方法中, 封装注册信息, 并发送给MasterActor. 在MasterActor中接收WorkerActor提交的注册信息, 并保存到双列集合中… MasterActor给WorkerActor发送回执信息(注册成功信息.). 在WorkerActor中接收MasterActor回复的 注册成功信息. 参考代码 WorkerActor.scala文件中的代码 //WorkerActor的地址: akka.tcp://actorSystem127.0.0.1:7100 object WorkerActor extends Actor {//1 定义成员变量, 记录MasterActor的引用, 以及WorkerActor提交的注册参数信息.private var masterActorRef: ActorSelection _ //表示MasterActor的引用.private var workerid:String _ //表示WorkerActor的idprivate var cpu:Int _ //表示WorkerActor的CPU核数private var mem:Int _ //表示WorkerActor的内存大小.private val cpu_list List(1, 2, 3, 4, 6, 8) //CPU核心数的取值范围private val mem_list List(512, 1024, 2048, 4096) //内存大小取值范围//2. 重写preStart()方法, 里边的内容: 在Actor启动之前就会执行.override def preStart(): Unit {//3. 获取Master的引用.masterActorRef context.actorSelection(akka.tcp://actorSystem127.0.0.1:7000/usre/masterActor)//4. 构建注册消息.workerid UUID.randomUUID().toString //设置workerActor的idval r new Random()cpu cpu_list(r.nextInt(cpu_list.length))mem mem_list(r.nextInt(mem_list.length))//5. 将WorkerActor的提交信息封装成 WorkerRegisterMessage对象.var registerMessage WorkerRegisterMessage(workerid, cpu, mem)//6. 发送消息给MasterActor.masterActorRef ! registerMessage}override def receive: Receive {case x println(x)} }MasterActor.scala文件中的代码 //Master: 用来管理多个Worker的. //MasterActor的路径: akka.tcp://actorSystem127.0.0.1:7000 object MasterActor extends Actor{//1. 定义一个可变的Map集合, 用来保存注册成功好的Worker信息.private val regWorkerMap collection.mutable.Map[String, WorkerInfo]()override def receive: Receive {case WorkerRegisterMessage(workId, cpu, mem) {//2. 打印接收到的注册信息println(sMasterActor: 接收到worker注册信息, ${workId}, ${cpu}, ${mem})//3. 把注册成功后的保存信息保存到: workInfo中.regWorkerMap workId - WorkerInfo(workId, cpu, mem)//4. 回复一个注册成功的消息.sender ! RegisterSuccessMessage}} }修改WorkerActor.scala文件中receive()方法的代码 override def receive: Receive {case RegisterSuccessMessage println(WorkerActor: 注册成功!) }5.6 Worker定时发送心跳阶段 需求 Worker接收到Master返回的注册成功信息后定时给Master发送心跳消息。而Master收到Worker发送的心跳消息后需要更新对应Worker的最后心跳时间。 思路分析 编写工具类读取心跳发送时间间隔创建心跳消息Worker接收到注册成功后定时发送心跳消息Master收到心跳消息更新Worker最后心跳时间启动测试 具体步骤 在worker的src/main/resources文件夹下的 application.conf文件中添加一个配置. worker.heartbeat.interval 5 //配置worker发送心跳的周期(单位是 s) 在worker项目的com.itheima.spark.work包下创建一个新的单例对象: ConfigUtils, 用来读取配置文件信息. 在WorkerActor的receive()方法中, 定时给MasterActor发送心跳信息. Master接收到心跳消息, 更新Worker最后心跳时间. . 参考代码 worker项目的ConfigUtils.scala文件中的代码 object ConfigUtils {//1. 获取配置信息对象.private val config ConfigFactory.load()//2. 获取worker心跳的具体周期val worker.heartbeat.interval config.getInt(worker.heartbeat.interval) }修改WorkerActor.scala文件的receive()方法中的代码 override def receive: Receive {case RegisterSuccessMessage {//1. 打印接收到的 注册成功消息println(WorkerActor: 接收到注册成功消息!)//2. 导入时间单位隐式转换 和 隐式参数import scala.concurrent.duration._import context.dispatcher //3. 定时给Master发送心跳消息.context.system.scheduler.schedule(0 seconds, ConfigUtil.worker.heartbeat.interval seconds){//3.1 采用自定义的消息的形式发送 心跳信息.masterActorRef ! WorkerHeartBeatMessage(workerId, cpu, mem)}} }MasterActor.scala文件中的代码 object MasterActor extends Actor {//1. 定义一个可变的Map集合, 用来保存注册成功好的Worker信息.private val regWorkerMap collection.mutable.Map[String, WorkerInfo]()override def receive: Receive {//接收注册信息.case WorkerRegisterMessage(workId, cpu, mem) {//2. 打印接收到的注册信息println(sMasterActor: 接收到worker注册信息, ${workId}, ${cpu}, ${mem})//3. 把注册成功后的保存信息保存到: workInfo中.regWorkerMap workId - WorkerInfo(workId, cpu, mem, new Date().getTime)//4. 回复一个注册成功的消息.sender ! RegisterSuccessMessage}//接收心跳消息case WorkerHeartBeatMessage(workId, cpu, mem) {//1. 打印接收到的心跳消息.println(sMasterActor: 接收到${workId}的心跳信息)//2. 更新指定Worker的最后一次心跳时间.regWorkerMap workId - WorkerInfo(workId, cpu, mem, new Date().getTime)//3. 为了测试代码逻辑是否OK, 我们可以打印下 regWorkerMap的信息println(regWorkerMap)}} }5.7 Master定时心跳检测阶段 需求 如果某个worker超过一段时间没有发送心跳Master需要将该worker从当前的Worker集合中移除。可以通过Akka的定时任务来实现心跳超时检查。 思路分析 编写工具类读取检查心跳间隔时间间隔、超时时间定时检查心跳过滤出来大于超时时间的Worker移除超时的Worker对现有Worker按照内存进行降序排序打印可用Worker 具体步骤 修改Master的application.conf配置文件, 添加两个配置 #配置检查Worker心跳的时间周期(单位: 秒) master.check.heartbeat.interval 6 #配置worker心跳超时的时间(秒) master.check.heartbeat.timeout 15 在Master项目的com.itheima.spark.master包下创建: ConfigUtils工具类(单例对象), 用来读取配置文件信息. 在MasterActor中开始检查心跳(即: 修改MasterActor#preStart中的代码.). 开启Master, 然后开启Worker, 进行测试. 参考代码 Master项目的ConfigUtils.scala文件中的代码 //针对Master的工具类. object ConfigUtil {//1. 获取到配置文件对象.private val config: Config ConfigFactory.load()//2. 获取检查Worker心跳的时间周期(单位: 秒)val master.check.heartbeat.interval config.getInt(master.check.heartbeat.interval)//3. 获取worker心跳超时的时间(秒)val master.check.heartbeat.timeout config.getInt(master.check.heartbeat.timeout) }MasterActor.scala文件的preStart()方法中的代码 //5. 定时检查worker的心跳信息 override def preStart(): Unit {//5.1 导入时间转换隐式类型 和 定时任务隐式变量import scala.concurrent.duration._import context.dispatcher//5.2 启动定时任务.context.system.scheduler.schedule(0 seconds, ConfigUtil.master.check.heartbeat.interval seconds) {//5.3 过滤大于超时时间的Worker.val timeOutWorkerMap regWorkerMap.filter {keyval //5.3.1 获取最后一次心跳更新时间.val lastHeatBeatTime keyval._2.lastHeartBeatTime//5.3.2 超时公式: 当前系统时间 - 最后一次心跳时间 超时时间(配置文件信息 * 1000)if (new Date().getTime - lastHeatBeatTime ConfigUtil.master.check.heartbeat.timeout * 1000) true else false}//5.4 移除超时的Workerif(!timeOutWorkerMap.isEmpty) {//如果要被移除的Worker集合不为空, 则移除此 timeOutWorkerMap//注意: 双列集合是根据键移除元素的, 所以最后的 _._1是在获取键.regWorkerMap -- timeOutWorkerMap.map(_._1)}//5.5 对worker按照内存大小进行降序排序, 打印Worker//_._2 获取所有的WorkInfo对象.val workerList regWorkerMap.map(_._2).toList//5.6 按照内存进行降序排序.val sortedWorkerList workerList.sortBy(_.mem).reverse//5.7 打印结果println(按照内存的大小降序排列的Worker列表: )println(sortedWorkerList)} }5.8 多个Worker测试阶段 需求 修改配置文件启动多个worker进行测试。 大白话: 启动一个Worker, 就修改一次Worker项目下的application.conf文件中记录的端口号, 然后重新开启Worker即可. 步骤 测试启动新的Worker是否能够注册成功停止Worker测试是否能够从现有列表删除 5.8 多个Worker测试阶段 需求 修改配置文件启动多个worker进行测试。 大白话: 启动一个Worker, 就修改一次Worker项目下的application.conf文件中记录的端口号, 然后重新开启Worker即可. 步骤 测试启动新的Worker是否能够注册成功停止Worker测试是否能够从现有列表删除
http://www.pierceye.com/news/418236/

相关文章:

  • 一站式进货平台网站建设为什么做网站编辑
  • 免费建站哪家好网站商城建设合同免费下载
  • 网站开发filter北京互联网
  • 德州市市政工程建设总公司网站设计公司的运营模式
  • 网站源码怎么弄境外注册网站
  • 肥城网站建设视频解析接口网站怎么做
  • 深圳做互联网教网站公司五百亿网站建设
  • 如何建自己网站周口网站建设费用
  • 延安网站建设哪家专业网站建设的大功效
  • 做网站交互demo工具服务器中安装wordpress
  • 2017年网站建设市场分析2345浏览器官网网址
  • 超大型网站建设怎么打广告吸引客户
  • 阳泉 网站建设合作国际网站设计
  • 东莞网站优化快速排名wordpress自适应设置宽度
  • wordpress的站点地址怎么设置青岛seo网站建设
  • wordpress 获取文章标签泰安企业网站seo
  • 网站可分为哪两种类型jsp网站建设项目实战 pdf
  • 科技类网站简介怎么做网站建设公司的工资
  • 东莞网站推广怎么做网站备案和备案的区别
  • 免费的舆情网站app开放平台设计方案
  • 昆明驿站网站建设程序做网站好还是app好
  • 提供网站推广公司电话室内设计好不好学
  • 小型网站网站建设需要做网页的素材网站
  • 圣诞节网站模板大兴企业官网网站建设
  • 法国化妆品进口报关做网站贵州两学一做网站
  • 青海环保网站建设公司wordpress版权怎
  • 砀山网站建设大凤号 网站建设
  • 汕头站什么是网站优化主要包括那几个
  • 买了一个域名如何做网站做歌手的网站
  • 制作精美网站建设服务周到外贸没有公司 如何做企业网站