网络运维和网站开发,白云做网站要多少钱,可信网站认证收费,加强网站内容建设一、延迟队列延迟队列#xff0c;也就是一定时间之后将消息体放入队列#xff0c;然后消费者才能正常消费。比如1分钟之后发送短信#xff0c;发送邮件#xff0c;检测数据状态等。二、Redisson Delayed Queue如果你项目中使用了redisson#xff0c;那么恭喜你#xff0c…一、延迟队列延迟队列也就是一定时间之后将消息体放入队列然后消费者才能正常消费。比如1分钟之后发送短信发送邮件检测数据状态等。二、Redisson Delayed Queue如果你项目中使用了redisson那么恭喜你使用延迟队列将非常的简单。基于Redis的Redisson分布式延迟队列Delayed Queue结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。RQueueString distinationQueue ...
RDelayedQueueString delayedQueue getDelayedQueue(distinationQueue);
// 10秒钟以后将消息发送到指定队列
delayedQueue.offer(msg1, 10, TimeUnit.SECONDS);
// 一分钟以后将消息发送到指定队列
delayedQueue.offer(msg2, 1, TimeUnit.MINUTES);在该对象不再需要的情况下应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。三、Java DelayQueueDelayQueue它本质上是一个队列而这个队列里也只有存放Delayed的子类才有意义。延迟队列demopublic class DelayTask implements Delayed {private long startDate;public DelayTask(Long delayMillions) {this.startDate System.currentTimeMillis() delayMillions;}Overridepublic int compareTo(Delayed o) {Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));}Overridepublic long getDelay(TimeUnit unit) {return this.startDate - System.currentTimeMillis();}public static void main(String[] args) throws Exception {BlockingQueueDelayTask queue new DelayQueue();DelayTask delayTask new DelayTask(1000 * 5L);queue.put(delayTask);while (queue.size()0){queue.take();}}
}延迟队列消费原理源码中出现了三次await字眼第一次是当队列为空时等待第二次等待是因为发现有任务没有到执行时间并且有准备执行的线程(leader)那不好意思还得接续等待直到下一个可执行的任务。第三次是真正延时的地方了available.awaitNanos(delay),此时也没有别的线程要执行也就是我将要执行等待剩下的延迟时间即可。延迟队列生产原理为保证消费者正常消费如果优先队列头元素和当前放入元素相等则说明当前元素消费的优先级高重置准备消费的线程(leader)为null唤醒消费者线程重新执行take方法逻辑。四、手写一个Redis延迟队列Redis延迟队列设计延迟消息体设计延迟消息体Message实现了Delayed接口这样Java DelayQueue就知道什么时候取出消息体。Redis延迟队列实现RedisDelayQueue构造函数依赖redis操作缓存服务对象和目标队列名称redis key。offer方法传入member具体消息delay延迟时间timeUnit时间单位然后封装成延迟消息体Message对象放入Java DelayQueue中。run方法是一个循环体不断的从Java DelayQueue对象中获取消息体然后放入redis对应的目标队列里。延迟队列测试demo控制台打印效果五、思考这种方案实现比较简单使用的时候一定要谨慎应用于延迟小消息量不大的场景是没问题的毕竟Java DelayQueue是占用内存的。另外也可以考虑利用Redis的sorted set 结构实现延迟队列使用TimeStamp作为score比如你的任务是要延迟5分钟那么就在当前时间上加5分钟作为 score 轮询任务每秒只轮询 score 小于等于 当前时间的 key即可如果任务支持有误差那么当没有扫描到有效数据的时候可以休眠对应时间再继续轮询。