专业做外贸英文公司网站,建设工程造价管理总站网站,wordpress教育插件,网站改版协议http://activemq.apache.org/async-sends.html producer发送消息有同步和异步两种模式#xff0c;可以通过代码配置#xff1a; ((ActiveMQConnection)connection).setUseAsyncSend(true); producer默认是异步发送消息。在没有开启事务的情况下#xff0c;producer发送持久化…http://activemq.apache.org/async-sends.html producer发送消息有同步和异步两种模式可以通过代码配置  ((ActiveMQConnection)connection).setUseAsyncSend(true);  producer默认是异步发送消息。在没有开启事务的情况下producer发送持久化消息是同步的调用send会阻塞直到broker把消息保存到磁盘并返回确认。 消息设置为持久  MessageProducer producer  session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);  消息设置为非持久  MessageProducer producer  session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  producer发送消息的调用栈如下   // ActiveMQSession
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {// 省略其他代码// 消息的持久类型和和连接模式是或的所以只要connection配置为异步就走异步发送if (onCompletenull  sendTimeout  0  !msg.isResponseRequired()  !connection.isAlwaysSyncSend()  (!msg.isPersistent() || connection.isUseAsyncSend() || txid ! null)) {this.connection.asyncSendPacket(msg);if (producerWindow ! null) {int size  msg.getSize();producerWindow.increaseUsage(size);}} else { // 同步发送if (sendTimeout  0  onCompletenull) {this.connection.syncSendPacket(msg,sendTimeout);}else {this.connection.syncSendPacket(msg, onComplete);}}
}    producer发送同步消息的调用栈   // org.apache.activemq.transport.ResponseCorrelator
public Object request(Object command) throws IOException {FutureResponse response  asyncRequest(command, null);return response.getResult();
}public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {Command command  (Command) o;command.setCommandId(sequenceGenerator.getNextSequenceId());// 需要回复command.setResponseRequired(true);FutureResponse future  new FutureResponse(responseCallback);IOException priorError  null;synchronized (requestMap) {priorError  this.error;if (priorError  null) {requestMap.put(new Integer(command.getCommandId()), future);}}if (priorError ! null) {future.set(new ExceptionResponse(priorError));throw priorError;}next.oneway(command);return future;
}    producer发送异步消息的调用栈   //org.apache.activemq.transport.ResponseCorrelator
public void oneway(Object o) throws IOException {Command command  (Command)o;command.setCommandId(sequenceGenerator.getNextSequenceId());// 不需要回复command.setResponseRequired(false);next.oneway(command);
}    在不考虑事务的情况下 producer发送持久化消息是同步发送发送是阻塞的直到收到确认。同步发送肯定是有流量控制的。 producer默认是异步发送异步发送不会等待broker的确认 所以就需要考虑流量控制了  ActiveMQConnectionFactory.setProducerWindowSize(int producerWindowSize)  ProducerWindowSize的含义producer每发送一个消息统计一下发送的字节数当字节数达到ProducerWindowSize值时需要等待broker的确认才能继续发送。  转载于:https://www.cnblogs.com/allenwas3/p/8600638.html