做一个网站平台的流程是什么,西安到北京飞机几个小时,基层建设论文收录在哪个网站,之力akka 消息发送接收在上一篇文章中#xff0c;我们研究了Akka Typed提供的一些基本功能。 在本文和下一篇文章中#xff0c;我们将更进一步地了解一些其他功能#xff0c;并通过查看Akka Typed提供的两种不同模式来做到这一点#xff1a;Receiver和Receptionist模式。 如果您… akka 消息发送接收 在上一篇文章中我们研究了Akka Typed提供的一些基本功能。 在本文和下一篇文章中我们将更进一步地了解一些其他功能并通过查看Akka Typed提供的两种不同模式来做到这一点Receiver和Receptionist模式。 如果您是Akka Typed的新手那么最好先阅读上一篇文章因为这将使您对Akka Typed有所了解。 因此对于本系列中的Akka型文章我们将研究Receiver模式。 与往常一样您可以在Github Gist中找到此示例的代码 https : //gist.github.com/josdirksen/77e59d236c637d46ab32 接收方模式 在Akka Typed发行版中有一个名为akka.typed.patterns的包。 在此程序包中有两种不同的模式即接收方模式和接收方模式。 坦白说为什么这两种模式足够重要以增加发行版但我确实不知道但是它们确实为在Akka Typed之后引入更多概念和想法提供了一个很好的方法。 因此让我们看一下Receiver模式在下一篇文章中我们将做Receptionist模式。 要了解Receiver模式的功能只需看一下我们可以发送给它的消息 /*** Retrieve one message from the Receiver, waiting at most for the given duration.*/final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T]/*** Retrieve all messages from the Receiver that it has queued after the given* duration has elapsed.*/final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T]/*** Retrieve the external address of this Receiver (i.e. the side at which it* takes in the messages of type T.*/final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T] 从这些消息中可以看到Receiver的工作是将T类型的消息排队并提供其他命令以在等待特定时间的同时获取这些消息中的一个或多个。 要使用接收器我们需要获取ExternalAddress以便我们可以向其发送类型为T的消息。 并且可以从其他参与者发送get GetOne和GetAll消息以查看接收器中是否有任何消息在等待。 对于我们的示例我们将创建以下参与者 生产者它向接收者发送类型为T的消息。 可以从此接收器检索类型T消息的使用者。 根角色运行此方案。 我们将从生产者开始如下所示 /*** Producer object containing the protocol and the behavior. This is a very simple* actor that produces messages using a schedule. To start producing messages* we need to send an initial message*/object Producer {// a simple protocol defining the messages that can be sentsealed trait ProducerMsgfinal case class registerReceiverMsgIn(msgIn: ActorRef[HelloMsg]) extends ProducerMsgfinal case class addHelloWorldMsg(msg: HelloMsg) extends ProducerMsg// the producer, which first waits for a registerReceiver message, after which// it changes behavior, to send messages.val producer Full[ProducerMsg] {// if we receive a register message, we know where to send messages tocase Msg(ctx, registerReceiverMsgIn(msgConsumer)) println(Producer: Switching behavior)// simple helper function which sends a message to self.def scheduleMessage() ctx.schedule(500 millisecond, ctx.self, addHelloWorldMsg(Hello(shello ${System.currentTimeMillis()})))// schedule the first one, the rest will be triggered through the behavior.scheduleMessage()Static {// add a message to the receiver and schedule a new onecase addHelloWorldMsg(msg) {println(sProducer: Adding new $msg to receiver: $msgConsumer) ;msgConsumer ! msg; scheduleMessage()}}// dont switch behavior on any of the other messagescase _ Same}} 在此对象中我们定义了可以发送给角色的消息以及行为。 registerReceiverMsgIn消息为actor提供了应该向其发送消息的目的地稍后会对此进行详细介绍并且addHelloWorldMsg告诉行为将什么消息发送到registerReceiverMsgIn消息提供的地址。 如果您查看此行为则可以看到我们使用Full [T]行为。 对于这种行为我们必须为所有消息和信号提供匹配器此外我们还可以访问actor ctx。 在其初始状态下此行为仅响应registerReceiverMsgIn消息。 当它收到这样的消息时它会做两件事 它定义了一个函数我们可以用来调度消息我们也可以直接调用它以调度消息在半秒钟内发送。 它定义了我们的新行为。 此新行为可以处理scheduleMessage函数发送的消息。 收到该消息后它将内容发送到提供的messageConsumer接收方然后再次调用计划消息。 保持每500毫秒发送一次消息。 因此当我们发送初始的registerReceiverMessage时它将导致actor每500 ms向接收者发送一条新消息。 现在让我们看看另一面消费者。 对于消费者我们还将所有内容包装在一个对象中如下所示 object Consumer {val consumer Total[HelloMsg] {// in the case of a registerReceiver message, we change the implementation// since were ready to receive other message.case registerReceiverCmdIn(commandAddress) {println(Consumer: Switching behavior)// return a static implementation which closes over actorRefs// all messages we receive we pass to the receiver, which will queue// them. We have a specific message that prints out the received messagesContextAware { ctx Static[HelloMsg] {// printmessages just prints out the list of messages weve receivedcase PrintMessages(msgs) println(sConsumer: Printing messages: $msgs) ;msgs.foreach { hw println(s $hw)}// if we get the getAllMessages request, we get all the messages from// the receiver.case GetAllMessages() {println(Consumer: requesting all messages)val wrap ctx.spawnAdapter[GetAllResult[HelloMsg]] {case msgs:GetAllResult[HelloMsg] println(sConsumer: Received ${msgs.msgs.length} messages); PrintMessages(msgs.msgs)}commandAddress ! GetAll(2 seconds)(wrap)}}}}// for all the other cases return the existing implementation, in essence// were just ignoring other messages till we change statecase _ Same} } 在此对象中我们定义了一个行为该行为在接收到第一条消息后也会切换其实现。 在这种情况下第一个消息称为registerReceiverCmdIn。 通过此消息我们可以访问接收方的actorRef将GetAll和getOne消息发送至该消息。 切换行为后我们将处理自己的自定义GetAllMessages消息这将触发将GetAll消息发送到接收器。 由于未针对从Receiver收到的响应类型键入我们自己的行为因此我们使用适配器ctx.spawnAdapter。 该适配器将接收来自接收器的响应并打印出消息。 最后一个消息部分是一个启动此行为的参与者 // Simple root actor, which well use to start the other actorsval scenario1 {Full[Unit] {case Sig(ctx, PreStart) {import Producer._import Consumer._println(Scenario1: Started, now lets start up a number of child actors to do our stuff)// first start the two actors, one implements the receiver pattern, and// the other is the one we control directly.val receiverActor ctx.spawn(Props(Receiver.behavior[HelloMsg]), receiver)val consumerActor ctx.spawn(Props(consumer), adder)val producerActor ctx.spawn(Props(producer), producer)// our producerActor first needs the actorRef it can use to add messages to the receiver// for this we use a wrapper, this wrapper creates a child, which we use to get the// address, to which we can send messages.val wrapper ctx.spawnAdapter[ActorRef[HelloMsg]] {case p: ActorRef[HelloMsg] producerActor ! registerReceiverMsgIn(p)}// now send the message to get the external address, the response will be sent// to our own actor as a registerReceiver message, through the adapterreceiverActor ! ExternalAddress(wrapper)// our printing actor needs to now the address of the receiver so send it to himconsumerActor ! registerReceiverCmdIn(receiverActor)// by calling getAllMessages we get the messages within a time period.println(Scenario1: Get all the messages)consumerActor ! GetAllMessages()Thread.sleep(3000)consumerActor ! GetAllMessages()Thread.sleep(5000)consumerActor ! GetAllMessages()Same}}} 这里没什么特别的。 在这种情况下我们将创建各种角色并使用ctx.spawnAdapter来获取接收者的外部地址并将其传递给producerActor。 接下来我们将接收者参与者的地址传递给消费者。 现在我们在使用者地址上调用GetAllMessages该地址将从接收方获取消息并打印出来。 因此总结一下将在此示例中执行的步骤 我们创建一个将运行此方案的根角色。 从这个根基参与者我们创建了三个参与者接收者消费者和生产者。 接下来我们从接收方获取externalAddress我们将类型为T的消息发送到的地址并使用适配器将其传递给生产方。 生产者在接收到此消息后将切换行为并开始将消息发送到传递的地址。 同时根actor将接收方的地址传递给使用者。 使用者在收到此消息时将更改行为并现在等待GetAllMessages类型的消息。 现在根actor将发送GetAllMessages到使用者。 当使用者接收到此消息时它将使用适配器将GetAll消息发送给接收者。 当适配器接收到响应时它会打印出接收到的消息数量并通过为接收者从接收到的每条消息发送一个PrintMessage来对使用者进行进一步处理。 这种情况的结果如下所示 Scenario1: Started, now lets start up a number of child actors to do our stuff
Scenario1: Get all the messages
Consumer: Switching behavior
Consumer: requesting all messages
Producer: Switching behavior
Producer: Adding new Hello(hello 1446277162929) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277163454) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277163969) to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 3 messages
Consumer: Printing messages: Vector(Hello(hello 1446277162929), Hello(hello 1446277163454), Hello(hello 1446277163969))Hello(hello 1446277162929)Hello(hello 1446277163454)Hello(hello 1446277163969)
Producer: Adding new Hello(hello 1446277164488) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277165008) to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new Hello(hello 1446277165529) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277166049) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277166569) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277167089) to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 6 messages
Consumer: Printing messages: Vector(Hello(hello 1446277164488), Hello(hello 1446277165008), Hello(hello 1446277165529), Hello(hello 1446277166049), Hello(hello 1446277166569), Hello(hello 1446277167089))Hello(hello 1446277164488)Hello(hello 1446277165008)Hello(hello 1446277165529)Hello(hello 1446277166049)Hello(hello 1446277166569)Hello(hello 1446277167089)
Producer: Adding new Hello(hello 1446277167607) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277168129) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277168650) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277169169) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277169690) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277170210) to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new Hello(hello 1446277170729) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277171249) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277171769) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277172289) to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 10 messages
Consumer: Printing messages: Vector(Hello(hello 1446277167607), Hello(hello 1446277168129), Hello(hello 1446277168650), Hello(hello 1446277169169), Hello(hello 1446277169690), Hello(hello 1446277170210), Hello(hello 1446277170729), Hello(hello 1446277171249), Hello(hello 1446277171769), Hello(hello 1446277172289))Hello(hello 1446277167607)Hello(hello 1446277168129)Hello(hello 1446277168650)Hello(hello 1446277169169)Hello(hello 1446277169690)Hello(hello 1446277170210)Hello(hello 1446277170729)Hello(hello 1446277171249)Hello(hello 1446277171769)Hello(hello 1446277172289)
Producer: Adding new Hello(hello 1446277172808) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277173328) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277173849) to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new Hello(hello 1446277174369) to receiver: Actor[akka://Root/user/receiver#1097367365] 酷吧 从消息序列中可以看到我们的生产者将消息发送到接收者接收者将它们排队。 接下来我们有一个使用者它请求到目前为止已收到的所有消息并打印出来。 这是关于Akka-Typed的文章的内容在下一篇文章中我们将介绍同样在Akka-Typed中呈现的接待员模式。 翻译自: https://www.javacodegeeks.com/2015/11/akka-typed-actors-exploring-the-receiver-pattern.htmlakka 消息发送接收