繁体中文网站 怎么做,大连网站建设网站,禅城网站建设哪家好,网页设计怎么做版式目录 秒杀优化一#xff1a;异步秒杀1#xff1a;思路2#xff1a;实现 二#xff1a;redis实现消息队列1#xff1a;什么是消息队列2#xff1a;基于list结构实现消息队列3#xff1a;基于pubsub实现消息队列4#xff1a;基于stream实现消息队列5#xff1a;stream的… 目录 秒杀优化一异步秒杀1思路2实现 二redis实现消息队列1什么是消息队列2基于list结构实现消息队列3基于pubsub实现消息队列4基于stream实现消息队列5stream的消费者组模式 三基于redis的stream结构实现消息队列 秒杀优化
一异步秒杀
1思路 原本我们每一个请求都是串行执行从头到尾执行完了才算一个请求处理成功这样过于耗时我们看到执行的操作中查询优惠券查询订单减库存创建订单都是数据库操作而数据库的性能又不是很好我们可以将服务拆分成两部分将判断优惠券信息和校验一人一单的操作提取出来先执行判断优惠券和校验操作然后直接返回订单id我们在陆续操作数据库减库存和创建订单这样前端响应的会非常快并且我们可以将优惠券和一人一单的操作放在redis中去执行这样又能提高性能然后我们将优惠券信息用户信息订单信息先保存在队列里先返回给前端数据在慢慢的根据队列的信息去存入数据 我们之前说将查询和校验功能放在redis中实现那么用什么结构呢查询订单很简单只要查询相应的优惠券的库存是否大于0就行我们就可以是否字符串结构key存优惠券信息value存库存那么校验呢因为是一人一单所以我们可以使用set这样就能保证用户的唯一性 我们执行的具体步骤是先判断库存是否充足不充足直接返回充足判断是否有资格购买没有返回有就可以减库存然后将用户加入集合中在返回因为我们执行这些操作时要保证命令的原子性所以这些操作我们都使用lua脚本来编写 具体的执行流程就是先执行lua脚本如果结果不是0那么直接返回如果不是0那么就将信息存入阻塞队列然后返回订单id 2实现 1新增时添加到redis
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEYvoucher.getId(),voucher.getStock().toString());2lua脚本编写:
local stock tonumber(redis.call(get, seckill:stock: .. ARGV[1]))
if (stock0) thenreturn 1
end
local userIdARGV[2]
local isoktonumber(redis.call(sadd,seckill:order:..ARGV[1],userId))
if isok0 thenreturn 2
end
redis.call(incrby,seckill:stock:..ARGV[1],-1)
return 0然后就能改变之前的代码在redis中实现异步下单
Override
public Result seckilOrder(Long voucherId) throws InterruptedException {Long id UserHolder.getUser().getId();Long res (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA, Collections.emptyList(), voucherId.toString(), id.toString());if (res!0){return Result.fail(res1?库存不足:一人只能购买一单);}long orderID redisIDWork.nextId(order);return Result.ok(orderID);}初始化lua脚本文件
Resource
private RedissonClient redissonClient2;
public static final DefaultRedisScript SECKIL_ORDER_LUA;
static {//初始化SECKIL_ORDER_LUAnew DefaultRedisScript();//定位到lua脚本的位置SECKIL_ORDER_LUA.setLocation(new ClassPathResource(seckill.lua));//设置lua脚本的返回值SECKIL_ORDER_LUA.setResultType(Long.class);
}还剩一个阻塞队列没有实现
阻塞队列的功能就是异步的将订单信息存入数据库
阻塞队列可以使用blockdeque
BlockingQueueVoucherOrder blockingQueue new ArrayBlockingQueueVoucherOrder(1024*1024);在类上直接初始化
然后使用的时候就是将订单添加到阻塞队列让另一个线程去执行往数据库中添加阻塞队列中的订单信息
blockingQueue.add(voucherOrder);然后就要开出一个线程然后执行往数据库添加元素的任务了 //创建一个线程private ExecutorService SECKILL_ORDER_EXECUTORExecutors.newSingleThreadExecutor();//注解PostConstruct添加这个注解的方法就是在类初始化完成之后就会执行PostConstructprivate void init(){//提交任务SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandle());}//定义一个任务内部类实现Runnable然后需要实现run方法run方法中就是我们的任务private class VoucherOrderHandle implements Runnable {Overridepublic void run() {try {//从阻塞队列中取出订单VoucherOrder voucherOrder blockingQueue.take();//执行方法handleVoucherOrder(voucherOrder);} catch (InterruptedException e) {log.info(下单业务异常,e);}}}当类加载是就会一直提交任务只要阻塞队列里有订单就会将订单取出然后调用方法将订单存入数据库 调用的方法是尝试获取锁的方法而获取锁其实并不需要因为我们自己开出来的线程只有一个是单线程而且在lua脚本中已经对一人一单还有超卖问题进行处理这里只是为了更加保险 Transactionalpublic void handleVoucherOrder(VoucherOrder voucherOrder) throws InterruptedException {
// SimpleRedisLock simpleRedisLock new SimpleRedisLock(order id, stringRedisTemplate);Long userId voucherOrder.getUserId();RLock simpleRedisLock redissonClient2.getLock(lock:order userId);boolean trylock simpleRedisLock.tryLock(1L, TimeUnit.SECONDS);if (!trylock){log.info(获取锁失败);}try {orderService.createVoucherOrder(voucherOrder);} catch (IllegalStateException e) {throw new RuntimeException(e);}finally {simpleRedisLock.unlock();}}然后获取锁成功后就会调用方法执行数据库操作但是这个方法是带有事务的我们单独开出来的子线程无法使事务生效只能在方法的外部声明一个代理对象然后通过代理对象去调用方法使事务生效 Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {Integer count query().eq(user_id, voucherOrder.getUserId()).eq(voucher_id, voucherOrder.getVoucherId()).count();if (count 0) {log.info(一个用户只能下一单);}//进行更新库存减一boolean success seckillVoucherService.update().setSql(stock stock - 1) // set stock stock - 1.eq(voucher_id, voucherOrder.getVoucherId()).gt(stock, 0).update();// where id ? and stock 0//扣减失败返回错误信息if (!success) {log.info(扣减失败);}save(voucherOrder);}因为我们是开出来的子线程调用的方法所以不能从线程中获取值只能从我们传入的订单对象获取然后就是减库存和存入订单的操作了 总结
我们使用异步操作将下单和存入订单分开来执行大大提高了执行的销量在redis中完成超卖和一人一单的问题
然后使用阻塞队列开出一个子线程异步存入数据库下单
问题
我们的阻塞队列是在jvm中的jvm中内存是有上线的超过上限就会有异常还有就是我们的数据都是存放在内存中要是出现了一些事故会导致数据丢失
二redis实现消息队列
1什么是消息队列 消息队列由三个角色构成 1生产者发送消息到消息队列 2消息队列存储和管理消息队列也被称为消息代理 3消费者从消息队列中获取消息并处理 好的消息队列有这几个特点 1有独立的服务独立的内存 2可以做到数据的持久化 3能够发送消息给消费者并且确保消息处理完成 2基于list结构实现消息队列 使用brpop可以实现阻塞获取 3基于pubsub实现消息队列 4基于stream实现消息队列 stream发送消息的方式xadd key * msg key是指消息队列的名称* 是发送消息的名称由redis来生成后面的msg就是键值对我们要发宋的消息 xread是读取消息的命令count指定读取消息的数量block指定阻塞时间不指定就是不阻塞指定0就是无限等待sreams 是消息队列的名称可以是多个id是消息的id0是从0开始读$是从最新的开始读 但是有个问题就是指定$是获取最新的消息但是只是获取使用这个命令之后最新的消息而如果一次性发多条只会获取最后一个就会出现漏消息 5stream的消费者组模式 消费者组就是将消费者划分到一个组中监听一个消息队列 有这些好处 1消息分流消息发送到消费者组中消费者会处于竞争关系会争夺消息来处理这个发送多个消息就会实现分流就会由不同的消费者来处理加快了处理速度 2消息标识在读取消息后会记录最后一个被处理的消息这样就不会出现消息漏读的情况 3消息确认消息发出去会消息会处于pending状态会等待消息处理完毕这个时候会将消息存入pendinglist中当处理完后才会从pending中移除确保了消息的安全性保证消息不会丢失就算再消息发出去后服务宕机了也能知道该消息没有被处理这个功能的作用就是确保消息至少被消费一次 三基于redis的stream结构实现消息队列 首先再redis客户端中输入命令创建一个队列和接受这个队列消息的组 然后修改秒杀下单的lua脚本直接在redis中通过消息队列将消息发送给消费者 local orderIdARGV[3]
local stock tonumber(redis.call(get, seckill:stock: .. ARGV[1]))
if (stock0) thenreturn 1
end
local userIdARGV[2]
local isoktonumber(redis.call(sadd,seckill:order:..ARGV[1],userId))
if isok0 thenreturn 2
end
redis.call(incrby,seckill:stock:..ARGV[1],-1)
--将消息发送给stream.orders队列
redis.call(xadd,stream.orders,*,userId,userId,id,orderId,voucherId,ARGV[1])
return 0这里发送的是优惠券id用户id还有订单id正是我们存入数据库中所需要的参数
然后就可以去修改前面秒杀下单的逻辑不用去将消息放到阻塞队列我们直接从redis的队列中取出就行
Override
public Result seckilOrder(Long voucherId) throws InterruptedException {long orderId redisIDWork.nextId(order);Long userId UserHolder.getUser().getId();Long res (Long) stringRedisTemplate.execute(SECKIL_ORDER_LUA,Collections.emptyList(), voucherId.toString(),userId.toString(),String.valueOf(orderId));if (res ! 0) {return Result.fail(res 1 ? 库存不足 : 一人只能购买一单);}orderService (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);
}这里我们需要将订单id作为lua脚本的参数传入进去然后将订单信息存入阻塞队列的操作可以省略因为我们已经将订单信息存入了redis中的消息队列 然后这里我们需要单独开出一个线程去将队列中的消息存入数据库
private class VoucherOrderHandle implements Runnable {String ququeNamestream.orders;Overridepublic void run() {try {//从消息队列中取出订单while (true){//xreadgroup GROUP group consumer count(1) block(2000) streams key ListMapRecordString, Object, Object msg stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(ququeName, ReadOffset.lastConsumed()));//如果消息为空就继续等待接收if (msgnull||msg.isEmpty()){continue;}//因为每次读取一个消息所以我们获取第一个消息MapRecordString, Object, Object entries msg.get(0);//获取消息的值是一些我们传入的键值对MapObject, Object value entries.getValue();//将map转成voucherorder对象VoucherOrder voucherOrder BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//执行方法handleVoucherOrder(voucherOrder);//确认消息已经处理stringRedisTemplate.opsForStream().acknowledge(ququeName,g1,entries.getId());}} catch (InterruptedException e) {log.info(下单业务异常,e);handleVoucherOrderError();}}我们要做的就是接受消息然后再将消息存入数据库 我们调用stream的方法作为消费者从队列中读取消息阻塞时间是2秒每次读取一个消息从下一个未消费的消息读取如果读取的消息为空那么就继续循环读取消息如果有消息就将消息取出然后将其转成对象map再将其转成对象然后再去做确认消息的处理如果不确认消息消息就会存在待处理的队列中如果出现的异常那么我们取出的消息可能没有进行确认没有确认的会存入待处理队列我们就要从队列里取出然后进行处理 出错只会执行的方法 private void handleVoucherOrderError() {try {//从消息队列中取出订单while (true){//xreadgroup GROUP group consumer count(1) streams key 0,表示从第一个未处理的消息开始读取ListMapRecordString, Object, Object msg stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1),StreamReadOptions.empty().count(1), StreamOffset.create(ququeName, ReadOffset.from(0)));//如果为空就说明没有待处理的消息结束就行if (msgnull||msg.isEmpty()){break;}//因为每次读取一个消息所以我们获取第一个消息MapRecordString, Object, Object entries msg.get(0);//获取消息的值是一些我们传入的键值对MapObject, Object value entries.getValue();//将map转成voucherorder对象VoucherOrder voucherOrder BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false);//执行方法handleVoucherOrder(voucherOrder);}} catch (InterruptedException e) {log.info(下单业务异常,e);}}
}这里因为是再待处理中直接取出所以不用阻塞处理然后从待消费队列中第一个消息开始读如果为空那么就说明没有待处理的消息我们直接返回就行如果不为空我们再处理 这样使用redis中的消息队列就实现了1独立的服务足够的内存2有确认机制避免消息漏读3消息持久化
BeanUtil.fillBeanWithMap(value,new VoucherOrder(),false); //执行方法 handleVoucherOrder(voucherOrder); } } catch (InterruptedException e) { log.info(“下单业务异常”,e); } } } 这里因为是再待处理中直接取出所以不用阻塞处理然后从待消费队列中第一个消息开始读如果为空那么就说明没有待处理的消息我们直接返回就行如果不为空我们再处理这样使用redis中的消息队列就实现了1独立的服务足够的内存2有确认机制避免消息漏读3消息持久化