天晴创艺网站建设百度小程序,手机高端网站开发,做网站mfdos,wordpress还原旧版本先基于单机模式#xff0c;基于Jedis手工造轮子实现自己的分布式锁。
首先看两个命令#xff1a; Redis 分布式锁机制#xff0c;主要借助 setnx 和 expire 两个命令完成。 setnx命令: setnx 是 set if not exists 的简写。将 key 的值设为 value #xff0c;当且仅当…先基于单机模式基于Jedis手工造轮子实现自己的分布式锁。
首先看两个命令 Redis 分布式锁机制主要借助 setnx 和 expire 两个命令完成。 setnx命令: setnx 是 set if not exists 的简写。将 key 的值设为 value 当且仅当 key 不存在 ; 若给定的 key 已经存在则 setnx 不做任何动作。 下面为客户端使用示例 127.0.0.1:6379 set lock unlock
OK
127.0.0.1:6379 setnx lock unlock
(integer) 0
127.0.0.1:6379 setnx lock lock
(integer) 0
127.0.0.1:6379 expire命令: expire 命令为 key 设置生存时间当 key 过期时 ( 生存时间为 0 ) 它会被自动删除。 其格式为 expire key seconds 下面为客户端使用示例 127.0.0.1:6379 expire lock 10
(integer) 1
127.0.0.1:6379 ttl lock
8 基于Jedis API的分布式锁的总体流程 通过 Redis 的 setnx 、 expire 命令可以实现简单的锁机制 key不存在时创建并设置value和过期时间返回值为1成功获取到锁 如key存在时直接返回0抢锁失败 持有锁的线程释放锁时手动删除key 或者过期时间到key自动删除锁释放。 线程调用setnx方法成功返回1认为加锁成功其他线程要等到当前线程业务操作完成释放锁后才能再次调用setnx加锁成功。 以上简单redis分布式锁的问题 如果出现了这么一个问题如果 setnx 是成功的但是 expire 设置失败一旦出现了释放锁失败或者没有手工释放那么这个锁永远被占用其他线程永远也抢不到锁。 所以 , 需要保障 setnx 和 expire 两个操作的原子性要么全部执行要么全部不执行二者不能分开。 解决的办法有两种
使用set的命令时同时设置过期时间不再单独使用 expire 命令使用lua脚本将加锁的命令放在lua脚本中原子性的执行。
简单加锁使用set的命令时同时设置过期时间
使用set的命令时同时设置过期时间的示例如下
127.0.0.1:6379 set unlock 234 EX 100 NX
(nil)
127.0.0.1:6379
127.0.0.1:6379 set test 111 EX 100 NX
OK 这样就完美的解决了分布式锁的原子性 set 命令的完整格式 set key value [EX seconds] [PX milliseconds] [NX|XX] EX seconds设置失效时长单位秒
PX milliseconds设置失效时长单位毫秒
NXkey不存在时设置value成功返回OK失败返回(nil)
XXkey存在时设置value成功返回OK失败返回(nil)
加锁的简单代码实现
Slf4j
Data
AllArgsConstructor
public class JedisCommandLock {private RedisTemplate redisTemplate;private static final String LOCK_SUCCESS OK;private static final String SET_IF_NOT_EXIST NX;private static final String SET_WITH_EXPIRE_TIME PX;/*** 尝试获取分布式锁* param jedis Redis客户端* param lockKey 锁* param requestId 请求标识* param expireTime 超期时间* return 是否获取成功*/public static boolean tryGetDistributedLock(Jedis jedis, String lockKey,String requestId, int expireTime) {String result jedis.set(lockKey, requestId, SET_IF_NOT_EXIST,SET_WITH_EXPIRE_TIME, expireTime);if (LOCK_SUCCESS.equals(result)) {return true;}return false;}
} 可以看到我们加锁用到了 Jedis 的 set Api jedis.set(String key, String value, String nxxx, String expx, int time) 这个 set() 方法一共有五个形参 第一个为key我们使用key来当锁因为key是唯一的。 第二个为value我们传的是requestId很多童鞋可能不明白有key作为锁不就够了吗为什么还要用到value原因就是我们在上面讲到可靠性时分布式锁要满足第四个条件解铃还须系铃人通过给value赋值为requestId我们就知道这把锁是哪个请求加的了在解锁的时候就可以有依据。 requestId可以使用 UUID.randomUUID().toString() 方法生成。 第三个为nxxx这个参数我们填的是NX意思是SET IF NOT EXIST即当key不存在时我们进行set操作若key已经存在则不做任何操作 第四个为expx这个参数我们传的是PX意思是我们要给这个key加一个过期的设置具体时间由第五个参数决定。 第五个为time与第四个参数相呼应代表key的过期时间。 总的来说执行上面的set()方法就只会导致两种结果 1. 当前没有锁 key 不存在那么就进行加锁操作并对锁设置个有效期同时 value 表示加锁的客户端。 2. 已有锁存在不做任何操作。 心细的童鞋就会发现了我们的加锁代码满足前面描述的四个条件中的三个。 首先set()加入了NX参数可以保证如果已有key存在则函数不会调用成功也就是只有一个客户端能持有锁满足互斥性。 其次由于我们对锁设置了过期时间即使锁的持有者后续发生崩溃而没有解锁锁也会因为到了过期时间而自动解锁即key被删除不会被永远占用而发生死锁。 最后因为我们将value赋值为requestId代表加锁的客户端请求标识那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。 由于我们只考虑Redis单机部署的场景所以容错性我们暂不考虑。 基于Jedis 的API实现简单解锁代码
Slf4j
Data
AllArgsConstructor
public class JedisCommandLock {private static final Long RELEASE_SUCCESS 1L;/*** 释放分布式锁* param jedis Redis客户端* param lockKey 锁* param requestId 请求标识* return 是否释放成功*/public static boolean releaseDistributedLock(Jedis jedis, String lockKey,String requestId) {String script if redis.call(get, KEYS[1]) ARGV[1] then returnredis.call(del, KEYS[1]) else return 0 end;Object result jedis.eval(script, Collections.singletonList(lockKey),Collections.singletonList(requestId));if (RELEASE_SUCCESS.equals(result)) {return true;}return false;}
} 那么这段 Lua 代码的功能是什么呢 其实很简单首先获取锁对应的 value 值检查是否与 requestId 相等如果相等则删除锁解锁。 第一行代码我们写了一个简单的 Lua 脚本代码。 第二行代码我们将 Lua 代码传到 jedis.eval() 方法里并使参数 KEYS[1] 赋值为 lockKey ARGV[1] 赋值为requestId。 eval() 方法是将 Lua 代码交给 Redis 服务端执行。 那么为什么要使用 Lua 语言来实现呢 因为要确保上述操作是原子性的。那么为什么执行 eval() 方法可以确保原子性源于 Redis 的特性 . 简单来说就是在 eval 命令执行 Lua 代码的时候 Lua 代码将被当成一个命令去执行并且直到 eval 命令执行完成 Redis 才会执行其他命 错误示例1 最常见的解锁代码就是直接使用 jedis.del() 方法删除锁这种不先判断锁的拥有者而直接解锁的方式会导致任何客户端都可以随时进行解锁即使这把锁不是它的。 public static void wrongReleaseLock1(Jedis jedis, String lockKey) {jedis.del(lockKey);
} 错误示例2 这种解锁代码乍一看也是没问题甚至我之前也差点这样实现与正确姿势差不多唯一区别的是分成两条命令去执行代码如下 public static void wrongReleaseLock2(Jedis jedis, String lockKey, StringrequestId) {// 判断加锁与解锁是不是同一个客户端if (requestId.equals(jedis.get(lockKey))) {// 若在此时这把锁突然不是这个客户端的则会误解锁jedis.del(lockKey);}
} 基于Lua脚本实现分布式锁 lua脚本的好处 为什么要使用Lua语言来实现呢 因为要确保上述操作是原子性的。那么为什么执行 eval()方法可以确保原子性源于Redis的特性简单来说就是在 eval 命令执行 Lua 代码的时候Lua代码将被当成一个命令去执行并且直到 eval 命令执行完成Redis才会执行其他命令。大部分的开源框架如 redission中的分布式锁组件都是用纯lua脚本实现的。Lua 脚本是高并发、高性能的必备脚本语言。 基于纯Lua脚本的分布式锁的执行流程 加锁和删除锁的操作使用纯 lua 进行封装保障其执行时候的原子性。 基于纯Lua脚本实现分布式锁的执行流程大致如下 加锁的Lua脚本 lock.lua
--- -1 failed
--- 1 success
---
local key KEYS[1]
local requestId KEYS[2]
local ttl tonumber(KEYS[3])
local result redis.call(setnx, key, requestId)
if result 1 then--PEXPIRE:以毫秒的形式指定过期时间redis.call(pexpire, key, ttl)
elseresult -1;-- 如果value相同则认为是同一个线程的请求则认为重入锁local value redis.call(get, key)if (value requestId) thenresult 1;redis.call(pexpire, key, ttl)end
end
-- 如果获取锁成功则返回 1
return result
解锁的Lua脚本 unlock.lua
--- -1 failed
--- 1 success
-- unlock key
local key KEYS[1]
local requestId KEYS[2]
local value redis.call(get, key)
if value requestId thenredis.call(del, key);return 1;
end
return -1
在Java中调用lua脚本完成加锁操作
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;Slf4j
Data
AllArgsConstructor
public class JedisLock implements Lock {private RedisTemplate redisTemplate;RedisScriptLong lockScript null;RedisScriptLong unLockScript null;public static final int DEFAULT_TIMEOUT 2000;public static final Long LOCKED Long.valueOf(1);public static final Long UNLOCKED Long.valueOf(1);public static final Long WAIT_GAT Long.valueOf(200);public static final int EXPIRE 2000;String key;String lockValue; // lockValue 锁的value ,代表线程的uuid/*** 默认为2000ms*/long expire 2000L;public JedisLock(String lockKey, String lockValue) {this.key lockKey;this.lockValue lockValue;}private volatile boolean isLocked false;private Thread thread;/*** 获取一个分布式锁 , 超时则返回失败** return 获锁成功 - true | 获锁失败 - false*/Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {//本地可重入if (isLocked thread Thread.currentThread()) {return true;}expire unit ! null ? unit.toMillis(time) : DEFAULT_TIMEOUT;long startMillis System.currentTimeMillis();Long millisToWait expire;boolean localLocked false;int turn 1;while (!localLocked) {localLocked this.lockInner(expire);if (!localLocked) {millisToWait millisToWait - (System.currentTimeMillis() -startMillis);startMillis System.currentTimeMillis();if (millisToWait 0L) {/*** 还没有超时*/ThreadUtil.sleepMilliSeconds(WAIT_GAT);log.info(睡眠一下重新开始turn:{},剩余时间{}, turn,millisToWait);} else {log.info(抢锁超时);return false;}} else {isLocked true;localLocked true;}}return isLocked;}/*** 有返回值的抢夺锁** param millisToWait*/public boolean lockInner(Long millisToWait) {if (null key) {return false;}try {ListString redisKeys new ArrayList();redisKeys.add(key);redisKeys.add(lockValue);redisKeys.add(String.valueOf(millisToWait));Long res (Long) redisTemplate.execute(lockScript, redisKeys);return res ! null res.equals(LOCKED);} catch (Exception e) {e.printStackTrace();throw BusinessException.builder().errMsg(抢锁失败).build();}}
}在Java中调用lua脚本完成解锁操作
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.List;Slf4j
Data
AllArgsConstructor
public class JedisLock implements Lock {private RedisTemplate redisTemplate;RedisScriptLong lockScript null;RedisScriptLong unLockScript null;//释放锁Overridepublic void unlock() {if (key null || requestId null) {return;}try {ListString redisKeys new ArrayList();redisKeys.add(key);redisKeys.add(requestId);Long res (Long) redisTemplate.execute(unLockScript, redisKeys);} catch (Exception e) {e.printStackTrace();throw BusinessException.builder().errMsg(释放锁失败).build();}}
} 编写RedisLockService用于管理JedisLock 编写个分布式锁服务用于加载 lua 脚本创建分布式锁代码如下 import com.crazymaker.springcloud.common.util.IOUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;Slf4j
Data
public class RedisLockService {private RedisTemplate redisTemplate;static String lockLua script/lock.lua;static String unLockLua script/unlock.lua;static RedisScriptLong lockScript null;static RedisScriptLong unLockScript null;{String script IOUtil.loadJarFile(RedisLockService.class.getClassLoader(), lockLua);// String script FileUtil.readString(lockLua, Charset.forName(UTF-8));if (StringUtils.isEmpty(script)) {log.error(lua load failed: lockLua);}lockScript new DefaultRedisScript(script, Long.class);// script FileUtil.readString(unLockLua, Charset.forName(UTF-8));script IOUtil.loadJarFile(RedisLockService.class.getClassLoader(), unLockLua);if (StringUtils.isEmpty(script)) {log.error(lua load failed: unLockLua);}unLockScript new DefaultRedisScript(script, Long.class);}public RedisLockService(RedisTemplate redisTemplate) {this.redisTemplate redisTemplate;}public Lock getLock(String lockKey, String lockValue) {JedisLock lock new JedisLock(lockKey, lockValue);lock.setRedisTemplate(redisTemplate);lock.setLockScript(lockScript);lock.setUnLockScript(unLockScript);return lock;}
} 测试用例 import lombok.extern.slf4j.Slf4j;import javax.annotation.Resource;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;Slf4j
RunWith(SpringRunner.class)
SpringBootTest(classes {DemoCloudApplication.class})
// 指定启动类
public class RedisLockTest {ResourceRedisLockService redisLockService;private ExecutorService pool Executors.newFixedThreadPool(10);Testpublic void testLock() {int threads 10;final int[] count {0};CountDownLatch countDownLatch new CountDownLatch(threads);long start System.currentTimeMillis();for (int i 0; i threads; i) {pool.submit(() -{String lockValue UUID.randomUUID().toString();try {Lock lock redisLockService.getLock(test:lock:1,lockValue);boolean locked lock.tryLock(10, TimeUnit.SECONDS);if (locked) {for (int j 0; j 1000; j) {count[0];}log.info(count count[0]);lock.unlock();} else {System.out.println(抢锁失败);}} catch (Exception e) {e.printStackTrace();}countDownLatch.countDown();});}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(10个线程每个累加1000为 count[0]);//输出统计结果float time System.currentTimeMillis() - start;System.out.println(运行的时长为(ms) time);System.out.println(每一次执行的时长为(ms) time / count[0]);}
} 执行结果 2021-05-04 23:02:11.900 INFO 22120 --- [pool-1-thread-7]
c.c.springcloud.lock.RedisLockTest LN:50 count 6000
2021-05-04 23:02:11.901 INFO 22120 --- [pool-1-thread-1]
c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下重新开始turn:3,剩余时间
9585
2021-05-04 23:02:11.902 INFO 22120 --- [pool-1-thread-1]
c.c.springcloud.lock.RedisLockTest LN:50 count 7000
2021-05-04 23:02:12.100 INFO 22120 --- [pool-1-thread-4]
c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下重新开始turn:3,剩余时间
9586
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-5]
c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下重新开始turn:3,剩余时间
9585
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-8]
c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下重新开始turn:3,剩余时间
9585
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-4]
c.c.springcloud.lock.RedisLockTest LN:50 count 8000
2021-05-04 23:02:12.102 INFO 22120 --- [pool-1-thread-8]
c.c.springcloud.lock.RedisLockTest LN:50 count 9000
2021-05-04 23:02:12.304 INFO 22120 --- [pool-1-thread-5]
c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下重新开始turn:4,剩余时间
9383
2021-05-04 23:02:12.307 INFO 22120 --- [pool-1-thread-5]
c.c.springcloud.lock.RedisLockTest LN:50 count 10000
10个线程每个累加1000为 10000
运行的时长为(ms)827.0
每一次执行的时长为(ms)0.0827 STW导致的锁过期问题 下面有一个简单的使用锁的例子在 10 秒内占着锁 //写数据到文件
public void writeData(filename,data){boolean lockedlock.tryLock(10,TimeUnit.SECONDS);if(!locked){throwFailed to acquire lock;}try{//将数据写到文件var filestorage.readFile(filename);var updatedupdateContents(file,data);storage.writeFile(filename,updated);}finally{lock.unlock();}
} 问题是如果在写文件过程中发生了 fullGC 并且其时间跨度较长 超过了 10 秒 那么分布式锁就自动释放了。 在此过程中 client2 抢到锁写了文件。 client1 的 fullGC 完成后也继续写文件 注意此时 client1 的并没有占用锁此时写入会导致文件数 据错乱发生线程安全问题这就是STW导致的锁过期问题。 STW导致的锁过期问题,大概的解决方案 1 模拟CAS乐观锁的方式增加版本号如下图中的token
2watch dog自动延期机制 客户端 1 加锁的锁 key 默认生存时间才 30 秒如果超过了 30 秒客户端 1 还想一直持有这把锁怎么办 简单只要客户端 1 一旦加锁成功就会启动一个 watch dog 看门狗 他是一个后台线程会每隔 10 秒检查一下如果客户端 1 还持有锁 key 那么就会不断的延长锁 key 的生存时间。 redission采用的就是这种方案 此方案不会入侵业务代码 注意 单机版的 watch dog 并不能解决 STW 的过期问题 需要分布式版本的 watch dog 独立的看门狗服务。 锁删除之后 取消看门狗服务的对应的 key 记录 当然这就使得系统变得复杂 还要保证看门狗服务的高并发、高可用、数据一致性的问题。