当前位置: 首页 > news >正文

网页网站设计公司有哪些金山区网站制作

网页网站设计公司有哪些,金山区网站制作,企业网络规划设计,长沙有名的公司一、何为SparkRPC RPC全称为远程过程调用#xff08;Remote Procedure Call#xff09;#xff0c;它是一种计算机通信协议#xff0c;允许一个计算机程序调用另一个计算机上的子程序#xff0c;而无需了解底层网络细节。通过RPC#xff0c;一个计算机程序可以像调用本地…一、何为SparkRPC RPC全称为远程过程调用Remote Procedure Call它是一种计算机通信协议允许一个计算机程序调用另一个计算机上的子程序而无需了解底层网络细节。通过RPC一个计算机程序可以像调用本地程序一样调用远程程序使得分布式应用程序的开发更加简单和高效。 二、SparkRPC示意图 三、SparkRPC代码示例 1、基本流程 ①启动Master和Worker ②Worker向Master发送注册信息(封装成一个类:RegisterWorker) case class RegisterWorker(rpcEndpointRef:RpcEndpointRef,workerId:String,workerMemory:Int,workerCores:Int) ③Master收到Worker的注册信息,并将其存放到一个HashMap,其中Key为WorkerId,Value为WorkerInfo其结构如下 class WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long _ } 其中lastHearBeatTime是该Worker最后一次心跳时间。 ④Master中启动一个定时任务(设定为每15s执行一次)定时从HashMap中获取各个Worker信息并将其中的lastHearBeatTime与当前时间进行比较如果大于10s,就认为该Worker已经与Master失联将其从HashMap中剔除 ⑤)(与④其实同步进行) Worker同样开启了一个定时任务(设定为每10s执行一次), 定时给Master发送心跳HeartBeat; Master收到该心跳后,根据WorkerId从HashMap中取出对应的Worker信息并将其lastHearBeatTime修改为当前时间从而不断更新与Master保持通信的Worker的最后心跳时间。 case class HeartBeat(WorkerId:String) 2、完整代码  1Master package org.apache.spark.wakedataimport org.apache.spark.SparkConf import org.apache.spark.SecurityManager import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}import java.util.concurrent.{Executors, TimeUnit} import scala.collection.mutableclass Master(val rpcEnv:RpcEnv) extends ThreadSafeRpcEndpoint{val idToWorker new mutable.HashMap[String, WorkerInfo]()override def onStart(): Unit {//启动一个定时器val service Executors.newScheduledThreadPool(1)service.scheduleAtFixedRate(new Runnable {override def run(): Unit {// 如果Worker最后一次心跳时间距离当前时间 大于10s,就需要移除该Workerval deadWorkers idToWorker.values.filter(w System.currentTimeMillis() - w.lastHearBeatTime 10000)deadWorkers.foreach(w {idToWorker - w.workerId})println(s当前活跃的Worker数量:${idToWorker.size})}},0,15,TimeUnit.SECONDS)}override def receive: PartialFunction[Any, Unit] { // case test println(接收到了测试消息);// println(sMaster收到了来自Worker的信息workerId:$workerId,workerMemory:$workerMemory,workerCores:$workerCores)//给Worker发送异步消息// rpcEndpointRef.send(response)// 接收到Worker发送过来的注册消息case RegisterWorker(rpcEndpointRef,workerId,workerMemory,workerCores) {//封装Worker传递过来的信息val workerInfo new WorkerInfo(workerId, workerMemory, workerCores)idToWorker(workerId) workerInfo//向Worker返回一个注册成功的消息rpcEndpointRef.send(RegisteredWorker)}//接收到Worker发送过来的心跳信息case HeartBeat(workerId) {val workerInfo idToWorker(workerId)//更新最后一次访问时间workerInfo.lastHearBeatTime System.currentTimeMillis()} }//接收同步消息override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] {case ask-msg {println(接收到来自Worker的同步消息)//Master响应Worker的请求给Worker返回消息context.reply(reply-msg)}} }object Master {def main(args: Array[String]): Unit {val conf new SparkConf()//创建 SecurityManager(安全管理器,对系统资源的访问进行检查和限制)val securityMgr new SecurityManager(conf)//创建rpcEnv,并指定名称、IP地址和端口等val rpcEnv RpcEnv.create(SparkMaster, localhost, 8888, conf, securityMgr)//创建Master RpcEndpointval master new Master(rpcEnv)//将Master的RpcEndpoint传入到setupEndpoint,并指定名称,返回一个RpcEndpoint的引用,val masterEndpoint rpcEnv.setupEndpoint(master, master)//通过RpcEndpoint的引用发送消息 // masterEndpoint.send(test)//将程序挂起,等待退出rpcEnv.awaitTermination()}}2Worker package org.apache.spark.wakedataimport org.apache.spark.SparkConf import org.apache.spark.SecurityManager import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}import java.util.concurrent.{Executors, TimeUnit} import scala.concurrent.ExecutionContext.Implicits.globalclass Worker(val rpcEnv:RpcEnv) extends ThreadSafeRpcEndpoint{var masterEndpointRef:RpcEndpointRef _val WORKER_ID worker02override def onStart(): Unit {//向Master发送请求连接(本质上是连接master的endPoint)masterEndpointRef rpcEnv.setupEndpointRef(RpcAddress(localhost, 8888), master)//向Master发送注册Worker的请求(注意这里发送的是同步消息,底层使用的是ask方法.为何是同步发送? 因为必须首先建立好连接,然后才能发送消息)//其中self是worker RpcEndpoint的引用masterEndpointRef.send(RegisterWorker(self,WORKER_ID,1000,8))}//接收异步消息override def receive: PartialFunction[Any, Unit] { // case response { // println(Worker接收到Master返回的消息) // //向Master发送同步消息 // val future masterEndpointRef.ask[String](ask-msg) // //接收Master返回的消息 // future.map(res println(sWorker接收到Master返回的响应请求消息:$res)) // }//Worker接收到Master发送过来的异步消息case RegisteredWorker {//启动一个定时器val service Executors.newScheduledThreadPool(1)service.scheduleAtFixedRate(new Runnable {override def run(): Unit {masterEndpointRef.send(HeartBeat(WORKER_ID))}},0,10,TimeUnit.SECONDS)}} }object Worker {def main(args: Array[String]): Unit {val conf new SparkConf()val SecurityMgr new SecurityManager(conf)//创建rpcEnvval rpcEnv RpcEnv.create(SparkWorker, localhost, 9998, conf,SecurityMgr)//创建rpcendpointval worker new Worker(rpcEnv)//返回一个RpcEndpoint的引用val workerEndpoint rpcEnv.setupEndpoint(Worker, worker)rpcEnv.awaitTermination()}}3RegisterWorker-样例类和伴生对象 此类封装Worker注册信息 package org.apache.spark.wakedataimport org.apache.spark.rpc.RpcEndpointRefcase class RegisterWorker(rpcEndpointRef:RpcEndpointRef,workerId:String,workerMemory:Int,workerCores:Int)package org.apache.spark.wakedatacase object RegisteredWorker4WorkerInfo package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long _ } package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long _ } package org.apache.spark.wakedataclass WorkerInfo(val workerId:String,var workerMemory:Int,var workerCores:Int){var lastHearBeatTime: Long _ }3、关于SparkRPC的一些细节  1、接收异步消息是在receive方法 2、接收同步消息是在receiveAndReply //接收同步消息override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] {case ask-msg {println(接收到来自Worker的同步消息)//Master响应Worker的请求给Worker返回消息context.reply(reply-msg)}} } 3、发送同步消息使用ask // //向Master发送同步消息 // val future masterEndpointRef.ask[String](ask-msg)
http://www.pierceye.com/news/771559/

相关文章:

  • 综合性门户网站有哪些高端网站建设 房产
  • 百度做的网站能优化吗如何在jsp上做网站页面代码
  • 广州市品牌网站建设公司营销型网站开发推广
  • 甜品网站首页设计用php做的网站模版
  • 怎样做企业的网站百度下载安装免费版
  • 常州市网站优化汕头网站建设和运营
  • wordpress 同分类评论调用seo排名是什么
  • 网站建设推广怎么玩软件开发模型是什么
  • 网站开发报价表格海口注册公司代理公司地址电话
  • 西宁好的网站建设视频网站文案
  • 郑州网站优化网络建设有限公司网站建设 交单流程
  • 网站搬家内页打不开重庆市建设工程信息网怎么进不去
  • 深圳 做公司网站网站用什么建设
  • 网站更换空间对优化的影响营销号视频生成器手机版
  • 南宁大型网站推广公司昆山网站制作哪家好
  • 格尔木哪里有做网站的wordpress编辑器排版
  • 怎样开电商襄阳抖音seo找哪家
  • 个人网站 域名舞阳专业做网站
  • 做国外购物网站凤山网站seo
  • 苏州制作网站的有几家WordPress文章编辑链接
  • 免费看电视剧的网站2021网站建设坂田
  • 网站建设中 目录怎么做更好wordpress最好用的虚拟主机
  • 网站百度网盘南京市建设局网站
  • 让别人做网站多久开始注册域名公司注册地址提供
  • 手机网站 设计趋势建设银行暑期招聘网站
  • 兰山做网站专业深圳网站定制开发
  • 做与食品安全有关的网站徐州企业网站设计
  • 番禺网站建设策划江阴市建设局官网站
  • 建设网站模块需要哪些内容石家庄城乡建设厅网站
  • 公司网站后台管理网络公司名字大全三字