余姚什么网站做装修比较好,邢台企业做网站哪儿好,昆明城乡和住房建设局网站,上海品牌网站开发我的同事正在开发一种交易系统#xff0c;该系统可以处理大量的传入交易。 每笔交易都涵盖一种Instrument #xff08;例如债券或股票#xff09;#xff0c;并且具有某些#xff08;现在#xff09;不重要的属性。 他们坚持使用Java#xff08;8#xff09;#… 我的同事正在开发一种交易系统该系统可以处理大量的传入交易。 每笔交易都涵盖一种Instrument 例如债券或股票并且具有某些现在不重要的属性。 他们坚持使用Java8所以我们坚持下去 class Instrument implements Serializable, ComparableInstrument {private final String name;public Instrument(String name) {this.name name;}//...Java boilerplate}public class Transaction {private final Instrument instrument;public Transaction(Instrument instrument) {this.instrument instrument;}//...Java boilerplate} Instrument稍后将用作HashMap的键因此将来我们会主动实现ComparableInstrument 。 这是我们的领域现在的要求是 交易进入系统需要尽快处理无论如何 我们可以按任何顺序自由处理它们 …但是同一种工具的交易需要按照进来时的顺序完全相同地顺序进行。 最初的实现很简单–将所有传入的事务放入一个使用方的队列例如ArrayBlockingQueue 中。 这满足了最后的要求因为队列在所有事务中都保留了严格的FIFO顺序。 但是这种架构阻止了针对不同工具的不相关交易的并发处理从而浪费了令人信服的吞吐量提高。 毫无疑问这种实现尽管很简单却成为了瓶颈。 第一个想法是以某种方式分别按工具和流程工具拆分传入的交易。 我们提出了以下数据结构 priavate final ConcurrentMapInstrument, QueueTransaction queues new ConcurrentHashMapInstrument, QueueTransaction();public void accept(Transaction tx) {final Instrument instrument tx.getInstrument();if (queues.get(instrument) null) {queues.putIfAbsent(instrument, new LinkedBlockingQueueTransaction());}final QueueTransaction queue queues.get(instrument);queue.add(tx);
} 但是最坏的时刻还没有到来。 您如何确保最多一个线程一次处理每个队列 毕竟否则两个线程可以从一个队列一种仪器中提取项目并以相反的顺序处理它们这是不允许的。 最简单的情况是每个队列都有一个Thread -这无法扩展因为我们期望成千上万种不同的工具。 因此我们可以说N线程让每个线程处理队列的一个子集例如instrument.hashCode() % N告诉我们哪个线程负责处理给定的队列。 但是由于以下三个原因它仍然不够完美 一个线程必须“观察”许多队列很可能是忙等待并始终对其进行遍历。 或者队列可能以某种方式唤醒其父线程 在最坏的情况下所有工具都将具有冲突的哈希码仅针对一个线程-这实际上与我们最初的解决方案相同 这只是该死的复杂 漂亮的代码并不复杂 实现这种怪异是可能的但是困难且容易出错。 此外还有另一个非功能性的要求仪器来来往往随着时间的流逝成千上万的仪器。 一段时间后我们应删除代表最近未见过的仪器的地图条目。 否则我们会发生内存泄漏。 如果您能提出一些更简单的解决方案请告诉我。 同时让我告诉你我对同事的建议。 如您所料它是Akka –结果非常简单。 我们需要两种角色 Dispatcher和Processor 。 Dispatcher有一个实例并接收所有传入的事务。 它的责任是为每个Instrument找到或生成工作Processor角色并向其推送事务 public class Dispatcher extends UntypedActor {private final MapInstrument, ActorRef instrumentProcessors new HashMapInstrument, ActorRef();Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {dispatch(((Transaction) message));} else {unhandled(message);}}private void dispatch(Transaction tx) {final ActorRef processor findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}private ActorRef findOrCreateProcessorFor(Instrument instrument) {final ActorRef maybeActor instrumentProcessors.get(instrument);if (maybeActor ! null) {return maybeActor;} else {final ActorRef actorRef context().actorOf(Props.create(Processor.class), instrument.getName());instrumentProcessors.put(instrument, actorRef);return actorRef;}}
} 这很简单。 由于我们的Dispatcher actor实际上是单线程的因此不需要同步。 我们几乎没有收到Transaction 查找或创建Processor并进一步传递Transaction 。 这是Processor实现的样子 public class Processor extends UntypedActor {private final LoggingAdapter log Logging.getLogger(getContext().system(), this);Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {process(((Transaction) message));} else {unhandled(message);}}private void process(Transaction tx) {log.info(Processing {}, tx);}
} 而已 有趣的是我们的Akka实现几乎与我们第一个使用队列映射的想法相同。 毕竟参与者只是一个队列还有一个逻辑线程在该队列中处理项目。 区别在于Akka管理有限的线程池并可能在成千上万的参与者之间共享它。 而且由于每个工具都有其专用和“单线程”执行器因此可以保证每个工具的事务顺序处理。 还有一件事。 如前所述有大量的乐器我们不想让演员出现一段时间了。 假设如果Processor在一个小时内未收到任何交易则应停止并收集垃圾。 如果以后我们收到此类工具的新交易则可以随时重新创建它。 这是一个非常棘手的问题–我们必须确保如果处理器决定删除自身时如果事务到达我们将无法松开该事务。 Processor没有停止自身而是向其父Processor发出空闲时间过长的信号。 然后 Dispatcher将发送PoisonPill到它。 因为ProcessorIdle和Transaction消息都是顺序处理的所以没有交易发送到不再存在的参与者的风险。 每个setReceiveTimeout通过使用setReceiveTimeout安排超时来独立地管理其生命周期 public class Processor extends UntypedActor {Overridepublic void preStart() throws Exception {context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));}Overridepublic void onReceive(Object message) throws Exception {//...if (message instanceof ReceiveTimeout) {log.debug(Idle for two long, shutting down);context().parent().tell(ProcessorIdle.INSTANCE, self());} else {unhandled(message);}}}enum ProcessorIdle {INSTANCE
} 显然当Processor在一个小时内未收到任何消息时它会向其父级 Dispatcher 轻轻发出信号。 但是演员仍然活着并且如果交易恰好在一小时后发生便可以处理。 Dispatcher作用是杀死给定的Processor并将其从地图中删除 public class Dispatcher extends UntypedActor {private final BiMapInstrument, ActorRef instrumentProcessors HashBiMap.create();public void onReceive(Object message) throws Exception {//...if (message ProcessorIdle.INSTANCE) {removeIdleProcessor(sender());sender().tell(PoisonPill.getInstance(), self());} else {unhandled(message);}}private void removeIdleProcessor(ActorRef idleProcessor) {instrumentProcessors.inverse().remove(idleProcessor);}private void dispatch(Transaction tx) {final ActorRef processor findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}//...} 不便之处。 instrumentProcessors过去是MapInstrument, ActorRef 。 事实证明这是不够的因为我们突然不得不按值删除此映射中的条目。 换句话说我们需要找到一个映射到给定ActorRef Processor 的键 Instrument 。 有多种处理方法例如空闲的Processor可以发送它处理的Instrumnt 但是我改用了BiMapK, V 。 之所以起作用是因为指定的Instrument和ActorRef都是唯一的每个乐器的actor。 使用BiMap我可以简单地对地图进行inverse() 从BiMapInstrument, ActorRef到BiMapActorRef, Instrument并将ActorRef视为键。 这个Akka例子只不过是“ helloworld ”。 但是与卷积解决方案相比我们必须使用并发队列锁和线程池进行编写这是完美的。 我的队友非常兴奋以至于最终他们决定将整个应用程序重写为Akka。 翻译自: https://www.javacodegeeks.com/2014/06/simplifying-trading-system-with-akka.html