青岛网站建设找润商,怎么搭建小程序平台,国外wordpress电影模板,wap上网队列的三个需求
消息队列在存取消息时#xff0c;必须要满足三个需求#xff0c;分别是消息保序、处理重复的消息和保证消息可靠性
需求一#xff1a;消息保序。消费者仍然需要按照生产者发送消息的顺序来处理消息#xff0c;避免后发送的消息被先处理了
需求二#xf…队列的三个需求
消息队列在存取消息时必须要满足三个需求分别是消息保序、处理重复的消息和保证消息可靠性
需求一消息保序。消费者仍然需要按照生产者发送消息的顺序来处理消息避免后发送的消息被先处理了
需求二重复消息处理(幂等性)。消费者从消息队列读取消息时有时会因为网络堵塞而出现消息重传的情况。此时消费者可能会收到多条重复的消息
需求三消息可靠性保证。当消费者重启后可以重新读取消息再次进行处理否则就会出现消息漏处理的问题了
Redis如何实现队列
基于 List 的消息队列解决方案
List 本身就是按先进先出的顺序对数据进行存取的。通过LPUSH/RPOP,进行生产消费。同时为了保证CPU不会一直消耗在RPOP上可以改用BRPOP
BRPOP 命令也称为阻塞式读取客户端在没有读到队列数据时自动阻塞直到有新的数据写入队列再开始读取新数据
以上解决了消息保序的问题
为了解决消费者程序本身能对重复消息进行判断可以给每个消息队列的每个消息提供全局唯一ID。消费者呈现要把处理过的ID记录下来同时当有消息收到时需要对这个消息的ID进行检查。这样就可以保证幂等性
最后就是消息的可靠性。通过BRPOPLPUSH 命令把进行消息消费时会启动消息备份这样就算消息者程序消息但没能正常处理等它重启后就可以从备份 List 中重新读取消息并进行处理了
基于 Streams 的消息队列解决方案
Streams 是 Redis 专门为消息队列设计的数据类型它提供了丰富的消息队列操作命令
XADD插入消息保证有序可以自动生成全局唯一 IDXREAD用于读取消息可以按 ID 读取数据XREADGROUP按消费组形式读取消息XPENDING 和 XACKXPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息而 XACK 命令用于向消息队列确认消息处理已完成XADD 命令可以往消息队列中插入新消息消息的格式是键 - 值对形式。对于插入的每一条消息Streams 可以自动为其生成一个全局唯一的 ID
XREAD 在读取消息时可以指定一个消息 ID并从这个消息 ID 的下一条消息开始进行读取
消费者也可以在调用 XRAED 时设定 block 配置项实现类似于 BRPOP 的阻塞读取操作当消息队列中没有消息时一旦设置了 block 配置项XREAD 就会阻塞阻塞的时长可以在 block 配置项进行设置 为了保证消费者在发生故障或宕机再次重启后仍然可以读取未处理完的消息Streams 会自动使用内部队列也称为 PENDING List留存消费组里每个消费者读取的消息直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”
总结
ListStreams消息保序LPUSH/RPOPXADD/XREAD阻塞读取BRPOPXREAD block重复消息读取生产者自行实现唯一IDStreams自动生成消息可靠性BRPOPLPUSHPENDDING List 自动留存消息XACK确认消息适用场景数据量小数据量大(小于qps 10w/s)