临沂seo整站优化厂家,长春自助建站模板,邢台网站建设哪家好,世界杯直播 现场免费直播由于AKka的核心是Actor#xff0c;而Actor是按照Actor模型进行实现的#xff0c;所以在使用Akka之前#xff0c;有必要弄清楚什么是Actor模型。
Actor模型最早是1973年Carl Hewitt、Peter Bishop和Richard Seiger的论文中出现的#xff0c;受物理学中的广义相对论(general…由于AKka的核心是Actor而Actor是按照Actor模型进行实现的所以在使用Akka之前有必要弄清楚什么是Actor模型。
Actor模型最早是1973年Carl Hewitt、Peter Bishop和Richard Seiger的论文中出现的受物理学中的广义相对论(general relativity)和量子力学(quantum mechanics)所启发为解决并发计算的一个数学模型。
Actor模型所推崇的哲学是”一切皆是Actor“这与面向对象编程的”一切皆是对象“类似。但不同的是在模型中Actor是一个运算实体它遵循以下规则 接受外部消息不占用调用方消息发送者的CPU时间片 通过消息改变自身的状态 创建有限数量的新Actor 发送有限数量的消息给其他Actor 很多语言都实现了Actor模型而其中最出名的实现要属Erlang的。Akka的实现借鉴了不少Erlang的经验。
dependencygroupIdcom.typesafe.akka/groupIdartifactIdakka-actor_2.11/artifactIdversion2.4.7/version
/dependencytell 发送一个消息到目标Actor后立刻返回
public class C extends AbstractActor {Overridepublic Receive createReceive() {return receiveBuilder().match(Object.class,obj-{if(obj instanceof String){System.out.println(C: D你回复给我的消息我收到了);return;}SomeOne someOne (SomeOne) obj;System.out.println(C: C接收到消息someOne.toString());// 创建D路由ActorRef actorRef this.getContext().actorOf(Props.create(D.class, D::new));// 传递给DactorRef.tell(someOne,self());// 路由给D(和tell 实现的功能一样)//actorRef.forward(someOne,getContext());}).build();}public static void main(String[] args) {ActorSystem ok ActorSystem.create(ok);ActorRef actorRef ok.actorOf(Props.create(C.class, C::new));Scanner sc new Scanner(System.in);System.out.print(请输入:);String s sc.nextLine();actorRef.tell(new SomeOne(1,s,0),ActorRef.noSender());}
}public class D extends AbstractActor {Overridepublic Receive createReceive() {return receiveBuilder().match(Object.class,obj-{SomeOne someOne (SomeOne) obj;System.out.println(D: D接收到C 传过来的消息someOne.toString());Thread.sleep(2000);sender().tell(D: 我再把消息发给你C,self());}).build();}
}注意
ActorSystem是一个较重的存在一般一个应用里只需要一个ActorSystem。
在同一个ActorySystem中Actor不能重名。ask 发送一个消息到目标Actor并返回一个Future对象可以通过该对象获取结果。但前提是目标Actor会有Reply才行如果没有Reply则抛出超时异常
public class A extends AbstractActor {// 接收到对象SomeOneOverridepublic Receive createReceive() {return receiveBuilder().match(Object.class,obj -{if(obj instanceof SomeOne){SomeOne someOne (SomeOne) obj;System.out.println( A 收到 SomeOne 对象:someOne.toString());someOne.setAge(someOne.getAge()1);// 业务。。。Thread.sleep(1000);// 返回结果this.getSender().tell(xxx,getSelf());}}).build();}## Await 同步阻塞等待结果public static void main(String[] args) {//ActorSystem test ActorSystem.create(test);ActorRef actorRefA test.actorOf(Props.create(A.class, A::new));SomeOne someOne new SomeOne(1,哈哈哈ok,10);// 2 分钟超时Timeout timeout new Timeout(Duration.create(2, TimeUnit.SECONDS));FutureObject future Patterns.ask(actorRefA, someOne, timeout); //ref,消息体,超时时间try {// Await 同步阻塞等待方式String reply (String) Await.result(future, timeout.duration());System.out.println(回复的消息: reply);} catch (Exception e) {e.printStackTrace();}}public class A extends AbstractActor {// 接收到对象SomeOneOverridepublic Receive createReceive() {return receiveBuilder().match(Object.class,obj -{if(obj instanceof SomeOne){SomeOne someOne (SomeOne) obj;System.out.println( A 收到 SomeOne 对象:someOne.toString());someOne.setAge(someOne.getAge()1);// 业务。。。Thread.sleep(1000);// 返回结果this.getSender().tell(xxx,getSelf());}}).build();}## future 异步等待结果。
public static void main(String[] args) {//ActorSystem test ActorSystem.create(test);ActorRef actorRefA test.actorOf(Props.create(A.class, A::new));SomeOne someOne new SomeOne(1,哈哈哈ok,10);// 2 分钟超时Timeout timeout new Timeout(Duration.create(2, TimeUnit.SECONDS));//ref,消息体,超时时间FutureObject future Patterns.ask(actorRefA, someOne, timeout);// 异步方式future.onComplete(new OnCompleteObject() {Overridepublic void onComplete(Throwable throwable, Object o) throws Throwable {if (throwable ! null) {System.out.println(返回结果异常 throwable.getMessage());} else {System.out.println(返回消息 o);}}}, test.dispatcher());// 成功执行过程future.onSuccess(new OnSuccessObject() {Overridepublic void onSuccess(Object msg) throws Throwable {System.out.println(回复的消息 msg);}}, test.dispatcher());//失败执行过程future.onFailure(new OnFailure() {Overridepublic void onFailure(Throwable throwable) throws Throwable {if (throwable instanceof TimeoutException) {System.out.println(服务超时);} else {System.out.println(未知错误);}}}, test.dispatcher());}tell 前置后置处理销毁线程 的例子
public class MessageSendAndAccept extends AbstractActor {//接收消息前置处理Overridepublic void preStart() {System.out.println(--------- 接收到消息 start);}//接收消息后置处理Overridepublic void postStop(){System.out.println(--------- 消息处理完毕 end);}// A接收消息Overridepublic Receive createReceive() {return receiveBuilder().match(String.class,result -{consoleLog(result);}).build();}//打印public void consoleLog(String log){System.out.println(接收到内容log);//销毁线程getContext().stop(self());}public static void main(String[] args) {// 创建ActorSystem仓库ActorSystem actorSystem ActorSystem.create(demo);// 创建路由路由到AActorRef my_actor actorSystem.actorOf(Props.create(MessageSendAndAccept.class), my_actor);// 给 A 发消息my_actor.tell(哈哈哈a,ActorRef.noSender());}
}并发 执行方法 例子
创建多个actor 同时执行就好了
public class G extends AbstractActor {Overridepublic Receive createReceive() {return receiveBuilder().match(Object.class,obj-{if(obj instanceof String){System.out.println(obj ,timenew SimpleDateFormat(yyyy-MM-dd HH:mm:ss).format(new Date())--- Thread ---Thread.currentThread().getName());//休眠 3sThread.sleep(3000L);System.out.println(Thread.currentThread().getName()---END);return;}}).build();}public static void main(String[] args) {ActorSystem ok ActorSystem.create(ok);ActorRef actorRef_0 ok.actorOf(Props.create(G.class, G::new));actorRef_0.tell(a,ActorRef.noSender());ActorRef actorRef_1 ok.actorOf(Props.create(G.class, G::new));actorRef_1.tell(b,ActorRef.noSender());ActorRef actorRef_2 ok.actorOf(Props.create(G.class, G::new));actorRef_2.tell(c,ActorRef.noSender());}
}