山东省住房建设厅网站考试项目,wordpress首页调用文章图片不显示,网站公司网站搭建,开发公司采暖费补偿办法应用场景 目前常见的应用软件都有消息的延迟推送的影子#xff0c;应用也极为广泛#xff0c;例如#xff1a; 淘宝七天自动确认收货。在我们签收商品后#xff0c;物流系统会在七天后延时发送一个消息给支付系统#xff0c;通知支付系统将款打给商家#xff0c;这个过程…应用场景 目前常见的应用软件都有消息的延迟推送的影子应用也极为广泛例如 淘宝七天自动确认收货。在我们签收商品后物流系统会在七天后延时发送一个消息给支付系统通知支付系统将款打给商家这个过程持续七天就是使用了消息中间件的延迟推送功能。12306 购票支付确认页面。我们在选好票点击确定跳转的页面中往往都会有倒计时代表着 30 分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统延时30分钟告诉订单系统订单未完成如果我们在30分钟内完成了订单则可以通过逻辑代码判断来忽略掉收到的消息。在上面两种场景中如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量 使用 redis 给订单设置过期时间最后通过判断 redis 中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低因为我们知道 redis 都是存储于内存中我们遇到恶意下单或者刷单的将会给内存带来巨大压力。使用传统的数据库轮询来判断数据库表中订单的状态这无疑增加了IO次数性能极低。使用 jvm 原生的 DelayQueue 也是大量占用内存而且没有持久化策略系统宕机或者重启都会丢失订单信息。消息延迟推送的实现 首先我们创建交换机和消息队列 import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class MQConfig {public static final String LAZY_EXCHANGE Ex.LazyExchange;public static final String LAZY_QUEUE MQ.LazyQueue;public static final String LAZY_KEY lazy.#;Beanpublic TopicExchange lazyExchange(){//MapString, Object pros new HashMap();//设置交换机支持延迟消息推送//pros.put(x-delayed-message, topic);TopicExchange exchange new TopicExchange(LAZY_EXCHANGE, true, false, pros);exchange.setDelayed(true);return exchange;}Beanpublic Queue lazyQueue(){return new Queue(LAZY_QUEUE, true);}Beanpublic Binding lazyBinding(){return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);}
}复制代码我们在 Exchange 的声明中可以设置exchange.setDelayed(true)来开启延迟队列也可以设置为以下内容传入交换机声明的方法中因为第一种方式的底层就是通过这种方式来实现的。 //MapString, Object pros new HashMap();//设置交换机支持延迟消息推送//pros.put(x-delayed-message, topic);TopicExchange exchange new TopicExchange(LAZY_EXCHANGE, true, false, pros);复制代码发送消息时我们需要指定延迟推送的时间我们这里在发送消息的方法中传入参数 new MessagePostProcessor() 是为了获得 Message对象因为需要借助 Message对象的api 来设置延迟时间。 import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;Component
public class MQSender {Autowiredprivate RabbitTemplate rabbitTemplate;//confirmCallback returnCallback 代码省略请参照上一篇public void sendLazy(Object message){rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);//id 时间戳 全局唯一CorrelationData correlationData new CorrelationData(12345678909new Date());//发送消息时指定 header 延迟时间rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, lazy.boot, message,new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//message.getMessageProperties().setHeader(x-delay, 6000);message.getMessageProperties().setDelay(6000);return message;}}, correlationData);}
}复制代码我们可以观察 setDelay(Integer i)底层代码也是在 header 中设置 x-delay。等同于我们手动设置 header message.getMessageProperties().setHeader(x-delay, 6000); /*** Set the x-delay header.* param delay the delay.* since 1.6*/
public void setDelay(Integer delay) {if (delay null || delay 0) {this.headers.remove(X_DELAY);}else {this.headers.put(X_DELAY, delay);}
}复制代码消费端进行消费 import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;Component
public class MQReceiver {RabbitListener(queues MQ.LazyQueue)RabbitHandlerpublic void onLazyMessage(Message msg, Channel channel) throws IOException{long deliveryTag msg.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag, true);System.out.println(lazy receive new String(msg.getBody()));}复制代码测试结果 import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;SpringBootTest
RunWith(SpringRunner.class)
public class MQSenderTest {Autowiredprivate MQSender mqSender;Testpublic void sendLazy() throws Exception {String msg hello spring boot;mqSender.sendLazy(msg :);}
}复制代码果然在 6 秒后收到了消息 lazy receive hello spring boot: Java学习、面试文档、视频资源免费获取 转载于:https://juejin.im/post/5cf7c2cd51882537465f29bf