佛山网站建设公司哪家性价比高,唐山网站建设七彩科技,金融网站建设多少钱,前端做网站直播我们在上篇博客高并发处理 --- 超卖问题一人一单解决方案讲述了两种锁解决业务的使用方法#xff0c;但是这样不能让锁跨JVM也就是跨进程去使用#xff0c;只能适用在单体项目中如下图#xff1a;
为了解决这种场景#xff0c;我们就需要用一个锁监视器对全部集群进行监视…我们在上篇博客高并发处理 --- 超卖问题一人一单解决方案讲述了两种锁解决业务的使用方法但是这样不能让锁跨JVM也就是跨进程去使用只能适用在单体项目中如下图
为了解决这种场景我们就需要用一个锁监视器对全部集群进行监视 这就引出了分布式锁的概念。 什么是分布式锁 分布式锁是一种在分布式系统中用于控制多个实例如多个微服务节点对共享资源的并发访问的机制。分布式锁的核心目标是避免在并发情况下出现数据不一致的问题比如多个线程同时对同一数据进行操作时可能会导致数据竞争、脏数据或者业务逻辑错误。 总而言之分布式锁就是满足分布式系统或者集群模式下多进程可见并且互斥的锁。 为什么要使用分布式锁 在单机环境中使用常规的同步机制如 Java 中的 synchronized可以避免并发问题但在分布式系统中多个服务或应用实例可能同时操作共享的资源。传统的同步方法无法跨机器或进程工作因此需要引入分布式锁来确保在多台机器中只有一个节点可以操作共享资源其他节点必须等待。 分布式锁的核心实现多进程之间互斥。
分布式锁的实现方式有很多种主要使用分布式系统中能共享的工具和技术。常见的实现方式包括基于 数据库、Redis、Zookeeper 等的分布式锁。 那么接下来我们就对基于Redis实现分布式锁进行讲解 Redis 是目前使用最广泛的分布式锁实现方式之一因为 Redis 提供了高性能的存储和锁机制适合高并发的场景。 我们在SimpleRedisLock类去继承ILock接口以此来实现获取释放锁的过程
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;public class SimpleRedisLock implements ILock{private static final String KEY_PREFIX lock:;private String keyName;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String keyName, StringRedisTemplate stringRedisTemplate) {this.keyName keyName;this.stringRedisTemplate stringRedisTemplate;}Overridepublic boolean tryLock(long timeoutSec) {// 获取当前线程标识long threadId Thread.currentThread().getId();// 获取锁Boolean success stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX keyName, threadId , timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}Overridepublic void unlock() {// 释放锁stringRedisTemplate.delete(KEY_PREFIX keyName);}
}随后我们通过自定义的锁工具类进行使用 首先new一个工具类对象通过对象调用获取锁方法随后判断是否获得锁之后使用动态代理获得代理对象后使用代理对象调用事务方法。 业务内容请点击下面地址高并发处理 --- 超卖问题一人一单解决方案 Service
public class VoucherOrderServiceImpl extends ServiceImplVoucherOrderMapper, VoucherOrder implements IVoucherOrderService {Autowiredprivate ISeckillVoucherService seckillVoucherService;Autowiredprivate RedisIdWorker redisIdWorker;Resourceprivate StringRedisTemplate stringRedisTemplate;public Result seckillVoucherBySelf(Long voucherId) {// 业务逻辑...Long userId UserHolder.getUser().getId();
// synchronized (userId.toString().intern()){
// IVoucherOrderService proxy (IVoucherOrderService)AopContext.currentProxy(); // 获取当前类的代理对象 (需要引入aspectjweaver依赖并且在实现类加入EnableAspectJAutoProxy(exposeProxy true)以此来暴露代理对象)
// return proxy.createVoucherOrder(voucherId);
// }SimpleRedisLock lock new SimpleRedisLock(order userId, stringRedisTemplate);boolean isLock lock.tryLock(1200);// 判断是否获取锁成功if (!isLock) {// 获取锁失败返回错误或重试return Result.fail(不允许重复下单);}try {IVoucherOrderService proxy (IVoucherOrderService)AopContext.currentProxy(); // 获取当前类的代理对象 (需要引入aspectjweaver依赖并且在实现类加入EnableAspectJAutoProxy(exposeProxy true)以此来暴露代理对象)return proxy.createVoucherOrder(voucherId);} finally {lock.unlock();}}Transactionalpublic Result createVoucherOrder(Long voucherId) {// 业务逻辑... }
}但是这样会出现一种情况虽然我们的线程1还没有结束但是由于锁设定的时间到期而被释放销毁此时线程2就能够开始获取锁过一段时间后线程1结束就会要释放锁这个时候释放的锁就是线程2刚加上去的锁所以导致线程安全问题。 解决方案在获取锁的过程存入线程标识可用UUID表示以便于在释放锁去判断这个锁是不是自己的如果是则释放如果不是则不释放。总而言之设置线程标识的目的是判断释放锁是不是同一个线程获取的以此来避免类似线程2创建的锁被线程1释放引发的线程安全问题 那么我们就需要再释放锁的方法中加入判断逻辑
public class SimpleRedisLock implements ILock{private static final String KEY_PREFIX lock:; // 锁的前缀private static final String ID_PREFIX UUID.randomUUID().toString() -; // 线程标识的前缀private String keyName;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String keyName, StringRedisTemplate stringRedisTemplate) {this.keyName keyName;this.stringRedisTemplate stringRedisTemplate;}Overridepublic boolean tryLock(long timeoutSec) {// ...}Overridepublic void unlock() {// 获取线程标识String threadId ID_PREFIX Thread.currentThread().getId();// 获取锁中标识String id stringRedisTemplate.opsForValue().get(KEY_PREFIX keyName);// 判断标识是否一致if(threadId.equals(id)){// 如果一致则释放锁stringRedisTemplate.delete(KEY_PREFIX keyName);}}
}但是还是有问题请看下图 如果出现一种情况在线程1事务结束后释放锁这时按照我们上述逻辑会判断锁key与线程标识是否一致随后假如非常凑巧这个时候发生了线程阻塞并且锁还正好到期而释放那么这个时候线程2就会获取锁随后线程1阻塞结束后就会执行释放锁的代码那么这个时候刚好就会将线程2创建的锁释放掉。 所以这个时候我们就必须保证判断锁标识与释放锁是原子性的。 这个时候我们就可以使用 lua 脚本编写多条Redis命令从而确保多条命令执行时的原子性。
可以参考下面网站学习Lua 教程 | 菜鸟教程 什么是Lua脚本 LUA 是一种轻量级的脚本语言具有简单、易于嵌入等特点。在 Redis 中LUA 脚本用于实现服务器端的逻辑操作它可以在 Redis 服务器上执行并且具有 原子性即 Redis 在执行 LUA 脚本时会确保脚本中的所有操作都要么全部执行成功要么全部不执行。 Redis 支持通过 EVAL 命令执行 LUA 脚本使用这种方式可以实现一些复杂的操作避免了在客户端执行多个 Redis 命令时可能发生的网络延迟和操作不一致问题。 LUA 脚本如何确保原子性 在 Redis 中LUA 脚本是原子性执行的即在脚本执行过程中其他 Redis 命令不会被插入。这是因为 Redis 会一次性地加载、解析并执行整个脚本直到脚本执行完毕Redis 才会处理其他命令。因此在一个脚本中所有的操作都在 Redis 服务器端执行不会中断也不会被其他命令打断。 这就意味着在执行 LUA 脚本时Redis 会确保以下几点 脚本中的所有命令要么都执行要么都不执行例如如果一个脚本修改了多个 Redis 键值并且中途遇到了错误那么整个脚本的操作将会失败Redis 会保证在执行过程中没有部分操作成功部分操作失败的情况。不需要担心并发问题多个客户端同时执行相同的 LUA 脚本时它们会按顺序依次执行不会出现竞争条件。性能提升将多个操作通过一个 LUA 脚本提交给 Redis 执行避免了多次网络请求提高了性能。 LUA 脚本在 Redis 中的使用方式 Redis 通过 EVAL 或 EVALSHA 命令执行 LUA 脚本 EVAL 命令直接执行脚本。EVALSHA 命令执行已加载的脚本通过 SCRIPT LOAD 命令加载脚本使用脚本的 SHA1 校验和来标识脚本。 一个 LUA 脚本的示例 local current redis.call(GET, KEYS[1])
if current thenredis.call(SET, KEYS[1], current 1)
elseredis.call(SET, KEYS[1], 1)
end
return redis.call(GET, KEYS[1]) 假设我们有一个 Lua 脚本要将一个键的值更新为某个参数 return redis.call(set, KEYS[1], ARGV[1])在 Redis Lua 脚本中KEYS[] 和 ARGV[] 是两个特殊的数组用来传递键和值。它们的内容由 execute 方法的 keys 和 args 参数提供。 KEYS[] 数组用于传递 Redis 中的键通常在 Lua 脚本中用于表示数据库中需要操作的 Redis 键。你可以在 Lua 脚本中通过 KEYS[1]、KEYS[2] 等方式来引用这些键。ARGV[] 数组用于传递非键值的参数。这些参数通常用于脚本中的计算、条件判断等而不是 Redis 键。例如ARGV 可以用来传递数字、字符串等作为参数传给 Redis 命令。 那么我们将java代码提取改造成Lua脚本
// 获取线程标识
String threadId ID_PREFIX Thread.currentThread().getId();
// 获取锁中标识
String id stringRedisTemplate.opsForValue().get(KEY_PREFIX keyName);
// 判断标识是否一致
if(threadId.equals(id)){// 如果一致则释放锁stringRedisTemplate.delete(KEY_PREFIX keyName);
}
那么改造后的Lua脚本
-- 锁的key
local key KEYS[1]
-- 当前线程标识
local threadId ARGV[1]-- 获取锁中的线程标识 get key
local id redis.call(get, key)
-- 比较线程标识与锁中的标识是否一致
if(id threadId) then-- 释放锁 del keyreturn redis.call(del, key)
end
return 0
我们下载EmmyLua插件在/resource/unlock.lua文件中将上面代码粘贴进入即可。
随后我们在unlock方法内进行修改按住CtrlH在右面即可查看类型的全部继承类。
为了调用Lua脚本文件我们可以使用 RedisTemplate 的 execute 方法这样可以直接执行 Lua 脚本并传递相应的参数。RedisTemplate 的 execute 方法允许你执行自定义的 Redis 命令包括 Lua 脚本。 在Java底层execute()方法的源码如下 public T T execute(RedisScriptT script, ListK keys, Object... args) {return this.scriptExecutor.execute(script, keys, args);
} 1.RedisScriptT script RedisScript 是一个用于表示 Redis 脚本通常是 Lua 脚本的对象。T 是脚本执行结果的返回类型。你可以将 Lua 脚本作为 RedisScript 的参数传入执行该脚本后它会返回一个类型为 T 的结果。 2.ListK keyskeys 参数是 Lua 脚本的 KEYS[] 数组对应传入 Redis 的键。这些键是你在 Lua 脚本中使用的 KEYS[] 部分。Redis 脚本允许你使用多个键最多可以传递 16 个键这些键是传递给 Lua 脚本的。 3.Object... argsargs 是可变参数表示 Lua 脚本中的 ARGV[] 数组。在 Lua 脚本中ARGV[] 用于传递参数这些参数通常是值字符串、数字等而不是 Redis 键。args 可以是任意类型的参数。 public class SimpleRedisLock implements ILock{private static final String KEY_PREFIX lock:; // 锁的前缀private static final String ID_PREFIX UUID.randomUUID().toString() -; // 线程标识的前缀private String keyName;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String keyName, StringRedisTemplate stringRedisTemplate) {this.keyName keyName;this.stringRedisTemplate stringRedisTemplate;}// Lua代码private static final DefaultRedisScriptLong UNLOCK_SCRIPT; // 泛型内填入返回值类型static { // 静态属性要使用静态代码块进行初始化UNLOCK_SCRIPT new DefaultRedisScript();UNLOCK_SCRIPT.setResultType(Long.class);UNLOCK_SCRIPT.setLocation(new ClassPathResource(unlock.lua));}// 调用Lua脚本Overridepublic void unlock() {// 调用Lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX keyName),ID_PREFIX Thread.currentThread().getId()); }
}
但是基于我们自己创建的锁有以下几个方面的问题
不可重入同一个线程无法多次获取同一把锁。不可重试获取锁只尝试一次就返回false没有重试机制。超时释放锁超时释放虽然可以避免死锁但是如果是业务执行耗时较长也会导致锁的释放存在安全隐患问题。主从一致性如果Redis提供了主从集群主从同步存在延迟当主宕机时如果从并同步主中的锁数据则会出现锁实现。
那么这个时候我们就需要使用Redission来实现分布式锁 下面是官方文档 Redission --- 快速入门 Redisson 是一个基于 Redis 的客户端它提供了分布式锁的功能适用于解决分布式系统中资源的竞争问题。我们可以通过 Redisson 来实现分布式锁确保同一时刻只有一个实例能处理某个资源或任务。 首先引入依赖
dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.32.0/version
/dependency
之后配置Redission的客户端
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RedissonConfig {Beanpublic RedissonClient redissonClient() {// 配置 Redisson 客户端Config config new Config();// 添加redis的地址也可使用config.useClusterServers()添加集群地址config.useSingleServer().setAddress(redis://localhost:6379).setPassword(password); // 如果有密码return Redisson.create(config);}
}之后是使用Redission实现的分布式锁的案例
Redisson 提供了 RLock 对象允许在业务逻辑中加锁和释放锁。
例如假设有一个需要保证只允许一个实例执行的业务操作可以这样做
Service
public class BusinessService {Autowiredprivate RedissonClient redissonClient;public void performBusinessLogic() {// 获取锁锁名为 myLock设置等待时间和锁的持有时间RLock lock redissonClient.getLock(myLock);try {// 尝试获取锁最多等待 10 秒锁的持有时间为 30 秒// tryLock(long waitTime, long leaseTime, TimeUnit unit) if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {try {// 执行需要加锁的业务逻辑System.out.println(Executing business logic...);// 例如处理任务更新数据库等} finally {// 释放锁lock.unlock();}} else {System.out.println(Could not acquire the lock, try again later.);}} catch (InterruptedException e) {e.printStackTrace();}}
}那么上面代码使用Redission实现的分布式锁为
Service
public class VoucherOrderServiceImpl extends ServiceImplVoucherOrderMapper, VoucherOrder implements IVoucherOrderService {Autowiredprivate ISeckillVoucherService seckillVoucherService;Autowiredprivate RedisIdWorker redisIdWorker;Resourceprivate StringRedisTemplate stringRedisTemplate;Resourceprivate RedissonClient redissonClient;public Result seckillVoucherBySelf(Long voucherId) {// ...Long userId UserHolder.getUser().getId();
// SimpleRedisLock lock new SimpleRedisLock(order userId, stringRedisTemplate);
// boolean isLock lock.tryLock(1200);RLock lock redissonClient.getLock(lock:order: userId);boolean isLock lock.tryLock();// 判断是否获取锁成功if (!isLock) {// 获取锁失败返回错误或重试return Result.fail(不允许重复下单);}try {IVoucherOrderService proxy (IVoucherOrderService)AopContext.currentProxy(); // 获取当前类的代理对象 (需要引入aspectjweaver依赖并且在实现类加入EnableAspectJAutoProxy(exposeProxy true)以此来暴露代理对象)return proxy.createVoucherOrder(voucherId);} finally {lock.unlock();}}
}
下图讲述了Redission实现分布式锁的原理 那么如何实现可重入锁呢 我们先了解什么是可重入锁
Redis 的 可重入锁 是指同一个线程可以多次获得锁而不会导致死锁。Redisson 提供了内置的可重入锁RLock功能能够自动支持锁的可重入性。它的原理是通过记录每个线程对锁的持有次数来实现的每当一个线程重新获取锁时就会增加持有次数释放锁时会检查持有次数直到持有次数为 0 才会真正释放锁。
我们可以按照下面逻辑进行分析 当然我们这里的读写锁的操作同样要保证原子性那么这个时候就需要写Lua脚本
1判断锁是否存在①不存在获取锁设置有效期 ②存在且是自己的重入次数1设置有效期
local key KEYS[1]; -- 锁的key
local threadId ARGV[1]; -- 线程唯一标识
local releaseTime ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call(exist,key) 0) then-- 不存在,获取锁redis.call(hset,key,threadId,1);-- 设置有效期redis.call(expire,key,releaseTime);
end;
-- 锁已经存在判断threadId是否是自己
if(redis.call(hexists,key,threadId) 1) then-- 不存在获取锁重入次数1redis.call(hincrby,key,threadId,1);-- 设置有效期redis.call(expire,key,releaseTime);return 1; -- 返回结果
end;
return 0; -- 代码走到这里则说明获取锁的不是自己获取锁失败 2判断锁是否是自己的 不是就直接返回 nil 是自己锁就 -1 并判断重入次数是否为 0① 0则重置有效期 ② 0则直接删除锁
local key KEYS[1]; -- 锁的key
local threadId ARGV[1]; -- 线程唯一标识
local releaseTime ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if(redis.call(hexists,key,threadId) 0) thenreturn nil; -- 如果已经不是自己则直接返回
end;
-- 是自己的锁则冲入次数-1
local count redis.call(hincrby,key,threadId,-1);
-- 判断重入次数是否已经0
if(count 0) then-- 不为0说明不能释放锁则重置有效期然后返回redis.call(expire,key,releaseTime);return nil;
else -- 0说明可以释放锁直接删除锁redis.call(del,key);return nil;
end; 在使用分布式锁时尤其是可重入锁务必注意以下几点 锁的自动释放设置 releaseTime 时间确保锁在指定时间内被释放防止线程因为异常或超时没有释放锁而导致死锁。及时释放锁在业务执行完后一定要确保调用 unlock() 方法释放锁。尤其在复杂业务场景下确保每个分支都有对应的释放逻辑避免丢锁。