当前位置: 首页 > news >正文

国外特效网站wordpress大发

国外特效网站,wordpress大发,旅游网站平台建设的方案,wordpress会员等级查看文章Redisson 源码解析 —— 分布式锁实现过程 在分布式系统中#xff0c;分布式锁 是非常常见的需求#xff0c;用来保证多个节点之间的互斥操作。Redisson 是 Redis 的一个 Java 客户端#xff0c;它提供了对分布式锁的良好封装。本文将从源码角度剖析 Redisson 的分布式锁实现…Redisson 源码解析 —— 分布式锁实现过程 在分布式系统中分布式锁 是非常常见的需求用来保证多个节点之间的互斥操作。Redisson 是 Redis 的一个 Java 客户端它提供了对分布式锁的良好封装。本文将从源码角度剖析 Redisson 的分布式锁实现过程。一、分布式锁的基本需求 一个健壮的分布式锁需要满足以下条件 互斥性同一时间只能有一个客户端持有锁。死锁避免客户端宕机后锁不会永久被占用。可重入性同一线程可多次获取同一把锁。高可用性在 Redis 集群模式下仍能正常工作。超时释放设置持有锁时间时间超过锁释放避免死锁。锁时间续约看门狗机制避免业务未执行完毕锁释放导致并发问题。二、Redisson 分布式锁的核心实现类以及加锁方法 在源码中Redisson 提供了多种锁的实现最核心的是 RedissonLock —— 基于 Redis 的可重入锁实现RedissonReadWriteLock —— 读写锁RedissonFairLock —— 公平锁 我们主要关注 RedissonLock 的实现。RLock lock redissonClient.getLock(32r);lock.方法名()常用加锁方法lock()获取锁获取不到会一致阻塞直到获取。通过看门狗机制续期默认持有锁是30s每隔10s续期一次。lock(long l, TimeUnit timeUnit)获取锁获取不到会一致阻塞直到获取。持有锁时间是手动入参的timeUnit到期释放锁。tryLock(long waite, long l1, TimeUnit timeUnit) 获取锁失败后自旋等待 waite 秒获取不到返回false获取到持有锁时间是 l1单位 timeUnit。tryLock()尝试获取一次锁如果获取不到立即返回 false获取锁成功触发 看门狗续期机制和 lock() 一样。tryLock(long waitTime, TimeUnit unit)在 waitTime 时间窗口内不断尝试执行范围内获取锁失败返回false。获取成功启动看门狗机制。RLock lock redissonClient.getLock(32r);我们可以看到 redissonClient 调用这个方法时候客户端返回的是RedissonLock这个类所以对应的我们主要关注 RedissonLock 子类和父类RedissonBaseLock这里我主要分析 lock() 方法的调用其他锁的逻辑都是参考这个去完善的。 三、加锁流程解析 1. 调用入口 当我们执行 RLock lock redisson.getLock(myLock); lock.lock();进入RedissonLock#lock方法可以看到调用lock方法其实都是调用的另外一个lock(long leaseTime, TimeUnit unit, boolean interruptibly) 方法。 对应真正调用的lock()方法 /*** 获取分布式锁的核心方法* param leaseTime 锁的租约时间* param unit 时间单位* param interruptibly 是否允许中断* throws InterruptedException 当线程被中断时抛出*/ private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {// 获取当前线程ID用于标识锁的持有者long threadId Thread.currentThread().getId();// 尝试获取锁返回剩余的TTL生存时间// 如果返回null表示获取锁成功否则返回锁的剩余过期时间Long ttl this.tryAcquire(-1L, leaseTime, unit, threadId);// 如果ttl不为null说明锁获取失败需要等待if (ttl ! null) {// 订阅锁释放的通知返回一个Future对象CompletableFutureRedissonLockEntry future this.subscribe(threadId);// 设置订阅操作的超时时间this.pubSub.timeout(future);// 根据是否允许中断来获取订阅结果RedissonLockEntry entry;if (interruptibly) {// 允许中断的方式获取结果entry (RedissonLockEntry)this.commandExecutor.getInterrupted(future);} else {// 不允许中断的方式获取结果entry (RedissonLockEntry)this.commandExecutor.get(future);}try {// 自旋等待锁释放while(true) {// 再次尝试获取锁ttl this.tryAcquire(-1L, leaseTime, unit, threadId);// 如果获取锁成功ttl为null则退出循环if (ttl null) {return;}// 如果ttl大于等于0说明锁还存在需要等待指定的时间if (ttl 0L) {try {// 使用信号量等待指定的ttl时间entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {// 如果允许中断直接抛出异常if (interruptibly) {throw e;}// 如果不允许中断继续等待entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// 如果ttl小于0表示需要无限等待if (interruptibly) {// 允许中断的无限等待entry.getLatch().acquire();} else {// 不允许中断的无限等待entry.getLatch().acquireUninterruptibly();}}}} finally {// 无论成功与否都要取消订阅释放资源this.unsubscribe(entry, threadId);}} }这时候我们只需要重点关注对应的this.tryAcquire(-1L, leaseTime, unit, threadId);这个方法。 源码图如下对应的Java代码解释 /*** 异步尝试获取锁* param waitTime 等待时间* param leaseTime 锁的租约时间* param unit 时间单位* param threadId 线程ID* return 返回锁的剩余TTL时间null表示获取锁成功*/ private T RFutureLong tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 声明TTL剩余时间的Future对象RFutureLong ttlRemainingFuture;// 判断是否指定了租约时间if (leaseTime 0L) {// 使用指定的租约时间尝试获取锁ttlRemainingFuture this.LongtryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 使用默认的内部锁租约时间尝试获取锁ttlRemainingFuture this.LongtryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 对获取锁的结果进行后续处理CompletionStageLong f ttlRemainingFuture.thenApply((ttlRemaining) - {// 如果ttlRemaining为null说明成功获取到锁if (ttlRemaining null) {// 判断是否指定了租约时间if (leaseTime 0L) {// 将指定的租约时间转换为毫秒并存储到内部锁租约时间this.internalLockLeaseTime unit.toMillis(leaseTime);} else {// 如果没有指定租约时间启动锁的自动续期机制// 防止锁因过期而被误释放this.scheduleExpirationRenewal(threadId);}}// 返回TTL剩余时间null表示获取锁成功非null表示需要等待的时间return ttlRemaining;});// 将CompletionStage包装成RFuture并返回return new CompletableFutureWrapper(f); }这里最重要的是调用对应的tryAcquire里面的tryLockInnerAsync方法方法详解如下T RFutureT tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) {return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1);redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) thenredis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);, Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});}这个tryLockInnerAsync方法主要是执行对应的脚本然后返回剩余的时间如果获取锁成功返回 nil 获取锁失败会返回 持有锁的锁过期时间。 核心 Lua 脚本详解如下 Redisson 并不是简单地 SETNX而是使用 Lua 脚本 来保证操作的原子性。 加锁脚本大致逻辑如下 if (redis.call(exists, KEYS[1]) 0) then-- 锁不存在设置锁并绑定到线程redis.call(hset, KEYS[1], ARGV[2], 1);redis.call(pexpire, KEYS[1], ARGV[1]);return nil; end;-- 锁已存在判断是否是当前线程重入 if (redis.call(hexists, KEYS[1], ARGV[2]) 1) thenredis.call(hincrby, KEYS[1], ARGV[2], 1);redis.call(pexpire, KEYS[1], ARGV[1]);return nil; end;return redis.call(pttl, KEYS[1]);解释 KEYS[1]: 锁的 key (如 myLock)ARGV[1]: 锁的过期时间默认 30sARGV[2]: 当前线程标识由 UUID 线程 ID 组成 执行流程 如果锁不存在设置 hashkey 线程标识value 1。如果锁存在且是自己线程则递增重入次数。否则返回锁的剩余过期时间。 问题延伸 Redis不是单线程吗高并发线程下不是线程安全吗为什么还需要使用Lua脚本保证原子性 想想为什么使用lua脚本你可以想象一下高并发场景下Redis执行命令是单线程的Redis只能保证对应的单条命令是原子性的不能保证多条命令的原子性假设线程A执行redis.call(exists, KEYS[1]) 0结束后线程B抢到执行权然后线程B也执行redis.call(exists, KEYS[1]) 0然后后续大家都会进行对应的锁设置导致线程A上锁可能会被覆盖不过可以用hsetnx解决但是后续可能判断还是会有并发问题。使用 lua 脚本可以将多条命令整合成类似一条命令redis执行从而保证原子性 WatchDog 自动续期机制 Redisson 的一大亮点是 锁续期机制 当线程获取锁后会启动一个 看门狗定时任务默认每隔 lockWatchdogTimeout / 3 秒续期一次默认 30s → 10s。如果业务逻辑执行很久不用担心锁被提前释放。如果线程宕机定时任务不再执行锁会在超时后自动释放。 判断对应的leasetime有没有指定然后执行对应的续期或不续期的方法 源码关键点在scheduleExpirationRenewal() 方法。 关键代码CompletionStageLong f ttlRemainingFuture.thenApply((ttlRemaining) - {if (ttlRemaining null) {if (leaseTime 0L) {this.internalLockLeaseTime unit.toMillis(leaseTime);} else {this.scheduleExpirationRenewal(threadId);}}return ttlRemaining;});根据对应的没指定leaseTime 然后执行对应的RedissonBaseLock#scheduleExpirationRenewal对应的方法逻辑如下/*** 调度锁的过期时间续期任务* 为指定线程启动自动续期机制防止锁因过期而被误释放* param threadId 需要续期的线程ID*/ protected void scheduleExpirationRenewal(long threadId) {// 创建新的过期时间管理条目ExpirationEntry entry new ExpirationEntry();// 尝试将新条目放入续期映射表中如果已存在则返回旧条目// 使用putIfAbsent确保原子性操作避免并发问题ExpirationEntry oldEntry (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);// 判断是否已经存在续期任务if (oldEntry ! null) {// 如果已存在续期任务只需将当前线程ID添加到现有条目中// 这种情况发生在同一个锁被多个线程可重入锁或同一线程多次获取时oldEntry.addThreadId(threadId);} else {// 如果是首次为这个锁创建续期任务// 将当前线程ID添加到新创建的条目中entry.addThreadId(threadId);try {// 启动实际的续期任务// 这会创建定时任务定期延长锁的过期时间this.renewExpiration();} finally {// 检查当前线程是否被中断if (Thread.currentThread().isInterrupted()) {// 如果线程被中断取消刚刚启动的续期任务// 防止资源泄漏和无效的续期操作this.cancelExpirationRenewal(threadId);}}} }这个通过一个创建一个ExpirationEntry 然后通过EXPIRATION_RENEWAL_MAP判断是否存在如果条目不存在就启动对应的自动续期机制任务 renewExpiration() RedissonBaseLock#renewExpiration()方法如下 /*** 启动锁的自动续期机制* 创建定时任务定期延长锁的过期时间防止锁因超时而被释放*/ private void renewExpiration() {// 从续期映射表中获取当前锁的过期时间管理条目ExpirationEntry ee (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());// 如果条目存在说明需要为这个锁设置续期任务if (ee ! null) {// 创建定时任务在锁租约时间的1/3处执行续期操作// 选择1/3时间点是为了在锁过期前有足够的时间进行续期Timeout task this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {// 定时任务执行时重新获取续期条目防止在延迟期间被移除ExpirationEntry ent (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());// 双重检查确保续期条目仍然存在if (ent ! null) {// 获取需要续期的第一个线程ID// 对于可重入锁可能有多个线程ID取第一个进行续期Long threadId ent.getFirstThreadId();// 如果线程ID有效执行续期操作if (threadId ! null) {// 异步执行锁的续期操作CompletionStageBoolean future RedissonBaseLock.this.renewExpirationAsync(threadId);// 处理续期结果future.whenComplete((res, e) - {// 如果续期过程中发生异常if (e ! null) {// 记录错误日志RedissonBaseLock.log.error(Cant update lock RedissonBaseLock.this.getRawName() expiration, e);// 从续期映射表中移除条目停止续期RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());} else {// 续期操作成功完成if (res) {// 如果续期成功返回true递归调用继续下一轮续期// 这样就形成了持续的自动续期循环RedissonBaseLock.this.renewExpiration();} else {// 如果续期失败返回false说明锁已经不存在或不属于当前线程// 取消续期任务清理资源RedissonBaseLock.this.cancelExpirationRenewal((Long)null);}}});}}}}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); // 在租约时间的1/3处执行续期// 将定时任务保存到条目中用于后续的取消操作ee.setTimeout(task);} }最后完美结束对应的获取锁的过程返回一个对应的时间值 ttl如果返回的是null代表加锁成功否则是加锁失败此时会进行订阅持有锁者this.subscribe(threadId)如果释放锁会通知这个获取锁失败的线程会将这个线程唤醒。 四、解锁流程解析 解锁的流程 解锁时同样使用 Lua 脚本保证原子性 if (redis.call(hexists, KEYS[1], ARGV[2]) 0) thenreturn nil; end;local counter redis.call(hincrby, KEYS[1], ARGV[2], -1);if (counter 0) thenreturn 0; elseredis.call(del, KEYS[1]);return 1; end;解释 检查当前线程是否持有锁。如果是可重入锁计数 -1。如果计数为 0则删除锁。六、源码设计亮点 Lua 脚本保证原子性避免分布式并发问题。可重入性设计使用 hash 结构存储线程标识和重入次数。锁超时释放设计避免死锁问题。看门狗机制保证长时间任务也能安全持有锁。异步化设计Redisson 提供 lockAsync() 等方法方便高并发场景。七、总结 Redisson 的分布式锁实现基于 Redis Lua 脚本解决了互斥、可重入和死锁问题。看门狗续期机制 是 Redisson 的亮点保证了业务执行时间不可预测的情况下的安全性。在生产环境中Redisson 的分布式锁相较于 SETNX EXPIRE 的手写版本更加健壮和可靠。
http://www.pierceye.com/news/86795/

相关文章:

  • 网站的首页面设计swf格式网站链接怎样做
  • 国外做的不错的网站网站正在建设代码
  • 巩义网站建设费用wordpress 插件管理
  • 网站目录链接怎么做西安 美院 网站建设
  • 网站建设做的人多吗wordpress 免费 最好
  • ASP个人网站的建设自己可做以做网站吗
  • 网站频道规划菏泽住房与城乡建设官网
  • 网站seo视频国字型网页布局
  • 海外服务器租用的价格网络优化软件
  • 网站卖东西怎么做的网站自己服务器
  • 做运营那些无版权图片网站保定网站免费制作
  • 重庆移动网站建设网站loading动画
  • 做影视网站难吗百度h5在线制作免费
  • 山西建站管理系统开发厦门网站建设 首选猴子网络
  • 自己制作的网站智能行业网站模板
  • 网站做用户登录宣传册设计与制作素材
  • 红桥天津网站建设互联网培训班
  • 做招聘网站怎么赚钱营销软件哪个好
  • 台州网站建设咨询百度第三季度财报2022
  • 南京科技网站设计有特点网站开发品牌有哪些
  • 南宁网站设计可以找我成都手机网站开发
  • 怎么建设展示网站 需要维护费吗人力资源公司代缴社保合法吗
  • 如何做企业网站界面自己做网站怎么弄
  • 你那个没封的网站怎么做啊最近一周的新闻大事10条
  • 做推广的网站那个好最有效的15个营销方法
  • 成都微信网站建设报价单网站运营与管理试卷
  • 房产网站系统源码南宁百度推广排名优化
  • 重庆展厅设计制作济南网站优化推广方案
  • 河南省通信管理局网站备案电话青岛免费网站建站模板
  • 预约网站怎么做深圳做网站乐云seo费用优惠