婚纱摄影行业网站,营销网站建设报价,深圳网站设计山东济南兴田德润电话,网站导航设计虽然activemqjencks的jms轻量级解决方案已经很好地在psa中work了#xff0c;尤其spring的JmsTemplate使得代码更简单#xff0c;但是还是存在问题。问题来自暑期做psa的时候#xff0c;linke突然提出要求#xff0c;需要MDP返回些处理信息#xff0c;比如处理结果、异常jencks的jms轻量级解决方案已经很好地在psa中work了尤其spring的JmsTemplate使得代码更简单但是还是存在问题。问题来自暑期做psa的时候linke突然提出要求需要MDP返回些处理信息比如处理结果、异常以便后台监控和前台显示但是MDP没有返回值也没法返回异常当时我只能无奈。Lingo解决了这个问题它扩展了JMS允许异步的函数调用。在下载了lingo-1.0-M1后(虽然1.2.1发布了但是还没有文档支持所以暂且用1.0)参考其自带的example了解了它异步函数调用的代码思路。客户端拥有服务端的方法接口客户端将callback和相关参数代入接口进行异步调用而服务端的接口实现中利用callback来返回必要的信息。callback实现了EventListener提供了返回值和异常的接口另外涉及到两个方面首先callback本身需要轮询其次callback可以由实例池管理。第一个方面主要参考了lingo的example使用semaphore来进行轮询。第二个方面并没有利用实例池而是利用ThreadPoolExecutor来newFixedThreadPool管理不同的异步调用线程来完成对callback的调度。配置部分encodingUTF-8?/pBEAN//EN http://www.springframework.org/dtd/spring-beans.dtdclassorg.activemq.spring.BrokerFactoryBeanvalueclasspath:activemq.xml/classorg.activemq.ActiveMQConnectionFactorynamebrokerURLtcp://localhost:61616classorg.activemq.message.ActiveMQQueueindex0lingo.demoidinvocationFactoryclassorg.logicblaze.lingo.LingoRemoteInvocationFactoryclassorg.logicblaze.lingo.SimpleMetadataStrategyvaluetrue /classorg.logicblaze.lingo.jms.JmsProxyFactoryBeanvalueorg.openepo.jms.lingo.MailService/refjmsFactory /refdestination /nameremoteInvocationFactoryrefinvocationFactory /classorg.logicblaze.lingo.jms.JmsServiceExporterrefserverImpl /valueorg.openepo.jms.lingo.MailService/refjmsFactory /refdestination /refinvocationFactory /classorg.openepo.jms.lingo.MailServiceImpl/classorg.openepo.jms.lingo.AsyncManagersingletonfalse//ResultListener和ResultListenerImplcallback接口及实现。ResultListener.javapackage org.openepo.jms.lingo;import java.util.EventListener;public interface ResultListener extends EventListener {public void onResult(Object result);// lifecycle end methodspublic void stop();public void onException(Exception e);}ResultListenerImpl.javapackage org.openepo.jms.lingo;import java.util.ArrayList;import java.util.List;public class ResultListenerImpl implements ResultListener{private List results new ArrayList();private Object semaphore new Object();private boolean stopped;private Exception onException;private long waitTime 1000;public synchronized void onResult(Object result){results.add(result);synchronized (semaphore) {semaphore.notifyAll();}}// lifecycle end methodspublic void stop() {stopped true;}public void onException(Exception e){onException e;}public Exception getOnException(){returnonException;}public List getResults(){return results;}public boolean isStopped(){return stopped;}public void waitForAsyncResponses(intmessageCount) {System.out.println(Waiting for: messageCount responses to arrive);long start System.currentTimeMillis();for (int i 0; i 10; i) {try {if (hasReceivedResponses(messageCount)){break;}synchronized (semaphore) {semaphore.wait(waitTime);}}catch (InterruptedException e) {System.out.println(Caught: e);}}long end System.currentTimeMillis() - start;System.out.println(End of wait for end millis);}protected boolean hasReceivedResponse(){returnresults.isEmpty();}protected synchronized booleanhasReceivedResponses(int messageCount) {return results.size() messageCount;}public long getWaitTime(){return waitTime;}public void setWaitTime(long waitTime){this.waitTime waitTime;}}MailService和MailServiceImpl服务代码。MailService.javapackage org.openepo.jms.lingo;import java.util.List;public interface MailService {public voidasyncSendMail(List mails,ResultListener listener);}MailServiceImpl.javapackage org.openepo.jms.lingo;import java.util.List;public class MailServiceImpl implements MailService{public voidasyncSendMail(List mails,ResultListener listener) {try {for (Mail mail : mails) {sendMail(mail);Thread.sleep(2000);// 服务端时耗listener.onResult(mail.getContent() Sended Successfully.);}listener.stop();} catch(Exception e) {listener.onException(e);}}public void sendMail(Mail mail) throwsException {// 可以取消下面的注释来查看服务端将异常传给客户端//throw new Exception(Error occurson server side.);}}在服务端方法中,可以利用callback将处理结果,是否结束和异常信息返回客户端.Mail.javapackage org.openepo.jms.lingo;import java.io.Serializable;public class Mail implements Serializable{private static final long serialVersionUID 1L;private String content;public String getContent() {return content;}public void setContent(String content){this.content content;}public Mail(String content) {this.content content;}}AsyncManager各类异步调用的方法可以集中在这个类中利用线程池来统一控制callback实例。AsyncManager.javapackage org.openepo.jms.lingo;import java.util.List;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;public class AsyncManager {static private int threadSize 10; //callback池大小static private ThreadPoolExecutor executor (ThreadPoolExecutor) Executors.newFixedThreadPool(threadSize); //callback池public void setThreadSize(int threadSize){AsyncManager.threadSize threadSize;}private MailService mailClient;public void setMailClient(MailService mailClient){this.mailClient mailClient;}public AsyncManager() {}public void sendMails(finalList mails){// callback对象final ResultListenerImpl callBack newResultListenerImpl();callBack.setWaitTime(2000);// 异步调用mailClient.asyncSendMail(mails, callBack);// 调用线程池中的callbackexecutor.execute(new Runnable(){public void run() {// callBack 阻塞等待n个消息callBack.waitForAsyncResponses(mails.size());if (callBack.getOnException() ! null) {// 服务端异常System.out.println(ServerException: callBack.getOnException().getMessage());} else{// 得到服务端处理结果,打印结果for (Object result :callBack.getResults()){System.out.println(Result: result);}}}});}}上面匿名类的run方法中在callback的waitForAsyncResponses方法结束后可以检查callback中的信息进行异常处理等。下面是测试用例Testpublic void test() {List mails newArrayList();mails.add(new Mail(mail1));mails.add(new Mail(mail2));// 计算时间long startTime System.currentTimeMillis();try {// 异步调用asyncManager.sendMails(mails);// 没有阻塞System.out.println(Cost time (System.currentTimeMillis() -startTime) ms);//为查看结果,sleep主线程Thread.sleep(10000);} catch (InterruptedException e){e.printStackTrace();}}使用Lingo对JMS增强后通过callback使得异步调用的确比较OO了但是更重要的是服务端的信息可以通过callback返回给客户端客户端可以相应地进行处理。多出了许多代码自然复杂度有所增加但是lingo-1.2.1后增加了annotation减少了callback的代码量。