深圳手机网站建设多少钱,南京百度,网络营销推广方式案例,网站建设下坡路文章目录 ZooKeeper 实战(五) Curator实现分布式锁1.简介1.1.分布式锁概念1.2.Curator 分布式锁的实现方式1.3.分布式锁接口 2.准备工作3.分布式可重入锁3.1.锁对象3.2.非重入式抢占锁测试代码输出日志 3.3.重入式抢占锁测试代码输出日志 4.分布式非可重入锁4.1.锁对象4.2.重入… 文章目录 ZooKeeper 实战(五) Curator实现分布式锁1.简介1.1.分布式锁概念1.2.Curator 分布式锁的实现方式1.3.分布式锁接口 2.准备工作3.分布式可重入锁3.1.锁对象3.2.非重入式抢占锁测试代码输出日志 3.3.重入式抢占锁测试代码输出日志 4.分布式非可重入锁4.1.锁对象4.2.重入式抢占锁测试代码输出日志 5.分布式可重入读写锁5.1.锁对象5.2.读锁和写锁的竞争测试代码输出日志 6.共享信号量6.1.锁对象6.2.信号量抢占测试代码输出日志 7.多共享锁7.1.锁对象7.2.获取共享锁测试代码输出日志 ZooKeeper 实战(五) Curator实现分布式锁
1.简介
1.1.分布式锁概念
分布式锁是一种用于实现分布式系统中的同步机制的技术。它允许在多个进程或线程之间实现互斥访问共享资源以避免并发访问时的数据不一致问题。分布式锁的主要目的是在分布式系统中提供类似于全局锁的效果以确保在任何时刻只有一个进程或线程可以访问特定的资源。
zookeeper基于临时有序节点实现分布式锁。每个客户端对某个临界资源加锁时在zookeeper上的与该临界资源对应的指定节点的目录下生成一个唯一的临时有序节点。 判断是否获取锁的方式很简单只需要判断临时有序节点中序号最小的那个是否由自身创建。 当释放锁的时候只需将这个临时有序节点删除即可。同时其可以避免服务宕机导致的锁无法释放而产生的死锁问题。
1.2.Curator 分布式锁的实现方式
curator-recipes中实现的锁有五种
Shared Reentrant Lock 分布式可重入锁Shared Lock 分布式非可重入锁Shared Reentrant Read Write Lock 可重入读写锁Shared Semaphore 共享信号量Multi Shared Lock 多共享锁
1.3.分布式锁接口
org.apache.curator.framework.recipes.locks.InterProcessLock 该接口为分布式锁的行为规范定义了获取锁和释放锁方法。
public interface InterProcessLock
{/*** 阻塞获取锁如果未获取到锁则一直阻塞*/public void acquire() throws Exception;/*** 非阻塞获取锁** param time 超时时间* param unit 时间单位* return 如果在超时时间内获取到锁则返回true否则返回false* throws Exception ZK errors, connection interruptions*/public boolean acquire(long time, TimeUnit unit) throws Exception;/*** 释放锁。每次获取锁之后必须调用该方法释放锁acquire和release成对出现*/public void release() throws Exception;/*** 判断该线程是否获取到锁* return true/false*/boolean isAcquiredInThisProcess();
}2.准备工作
首先需要读者掌握 Ideal 同一项目启动多个Service的能力详细教程可参考博主另一篇博客博主创建了两个启动实例一个端口号为8888另一个9981此处随意只要不与其他服务的端口号冲突即可。 另外在启动项目之前请根据先前所写的教程启动zookeeper的单机服务器 参考ZooKeeper 实战(一) 超详细的单机与集群部署教程并创建一个存储数据节点路径/ahao/data数据内容40。可参照如下操作由于博主已经创建过所以重新设置了一遍 3.分布式可重入锁
可重入锁是一种自我保护的锁允许同一进程或线程多次获得相同的锁而不会造成死锁。
3.1.锁对象
类路径org.apache.curator.framework.recipes.locks.InterProcessMutex
公开构造方法如下 /*** param client 当前客户端实例* param path 锁节点路径*/public InterProcessMutex(CuratorFramework client, String path)/*** param client 当前客户端实例* param path 锁节点路径* param driver 锁驱动实例工具类只要提供两个方法#createsTheLock 创建锁节点#getsTheLock 获取当前锁*/public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)3.2.非重入式抢占锁
由于非重入式抢占锁的场景对于5种分布式锁实现方式均适用并且测试场景均一样所以本节介绍完非重入式抢占锁的测试场景之后对应的另4种实现方式的该测试场景将不在赘述。
测试场景有两台服务实例 C1C2。C1和C2个有两个线程线程池调度并发执行对同一共享资源/ahao/data中的数据进行减1 操作。
测试代码
/*** Name: CuratorDemoApplication* Description:* Author: ahao* Date: 2024/1/10 3:29 PM*/
Slf4j
SpringBootApplication
public class CuratorDemoApplication implements ApplicationRunner{Autowiredprivate CuratorFramework client;public static void main(String[] args) {SpringApplication.run(CuratorDemoApplication.class,args);}// 存储数据节点的路径final String dataPath /ahao/data;Overridepublic void run(ApplicationArguments args) throws Exception {log.info(。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。);// 锁节点路径String lockPath /ahao/lock;TimeUnit.SECONDS.sleep(3);// 创建可重入锁InterProcessMutex mutex new InterProcessMutex(client,lockPath);// 创建一个线程池ExecutorService executorService Executors.newFixedThreadPool(2);// 保证并发执行当前时间的秒针部分为0则结束循环int seconds LocalDateTime.now().getSecond();while (seconds ! 0){seconds LocalDateTime.now().getSecond();}// 提交两个任务for (int i 0; i 2; i) {executorService.submit(() - {while (share(mutex)) {try {// 睡眠0.5秒TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});}}/*** 用来模拟临界资源的方法*/public boolean share(final InterProcessMutex mutex){boolean b true;try {// 获取锁if (mutex.acquire(3, TimeUnit.SECONDS)) {// 获取数据节点中的值byte[] bytes client.getData().forPath(dataPath);String s new String(bytes);Integer integer Integer.valueOf(s);if(integer 0){// 设置新值client.setData().forPath(dataPath,String.valueOf(integer-1).getBytes(StandardCharsets.UTF_8));log.info(当前值{},integer);b true;}else {log.info(任务已完成。。。。);b false;}}} catch (Exception e) {throw new RuntimeException(e);} finally {try {// 释放锁mutex.release();return b;} catch (Exception e) {log.error(释放锁失败);throw new RuntimeException(e);}}}
}输出日志
从下方日志可见没有出现重复的数值保证了分布式系统中实现互斥访问共享资源避免并发访问时的数据不一致问题。
实例c1
2024-01-15 INFO 65348 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值40
2024-01-15 INFO 65348 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值39
2024-01-15 INFO 65348 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值36
2024-01-15 INFO 65348 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值35
2024-01-15 INFO 65348 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值32
2024-01-15 INFO 65348 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值31
2024-01-15 INFO 65348 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值28
2024-01-15 INFO 65348 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值27
2024-01-15 INFO 65348 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值24
2024-01-15 INFO 65348 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值23
2024-01-15 INFO 65348 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值20
2024-01-15 INFO 65348 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值19
2024-01-15 INFO 65348 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值16
2024-01-15 INFO 65348 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值15
2024-01-15 INFO 65348 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值12
2024-01-15 INFO 65348 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值11
2024-01-15 INFO 65348 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值8
2024-01-15 INFO 65348 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值7
2024-01-15 INFO 65348 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值4
2024-01-15 INFO 65348 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值3
2024-01-15 INFO 65348 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 任务已完成。。。。
2024-01-15 INFO 65348 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 任务已完成。。。。
实例c2
2024-01-15 INFO 65387 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值38
2024-01-15 INFO 65387 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值37
2024-01-15 INFO 65387 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值34
2024-01-15 INFO 65387 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值33
2024-01-15 INFO 65387 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值30
2024-01-15 INFO 65387 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值29
2024-01-15 INFO 65387 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值26
2024-01-15 INFO 65387 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值25
2024-01-15 INFO 65387 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值22
2024-01-15 INFO 65387 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值21
2024-01-15 INFO 65387 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值18
2024-01-15 INFO 65387 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值17
2024-01-15 INFO 65387 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值14
2024-01-15 INFO 65387 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值13
2024-01-15 INFO 65387 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值10
2024-01-15 INFO 65387 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值9
2024-01-15 INFO 65387 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值6
2024-01-15 INFO 65387 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值5
2024-01-15 INFO 65387 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 当前值2
2024-01-15 INFO 65387 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 当前值1
2024-01-15 INFO 65387 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 任务已完成。。。。
2024-01-15 INFO 65387 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 任务已完成。。。。
3.3.重入式抢占锁
测试场景有两台服务实例 C1C2。C1和C2个有两个线程线程池调度并发执行对同一共享资源/ahao/data中的数据进行连续两次的减1 操作并且第一次操作成功后才能进行第二次操作。
测试代码
具体的减1操作抽离成方法以便接下来的测试。 Overridepublic void run(ApplicationArguments args) throws Exception {log.info(。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。);String lockPath /ahao/lock;TimeUnit.SECONDS.sleep(3);// 创建可重入锁InterProcessMutex mutex new InterProcessMutex(client,lockPath);// 创建一个线程池ExecutorService executorService Executors.newFixedThreadPool(2);// 保证并发执行当前时间的秒针部分为0则结束循环int seconds LocalDateTime.now().getSecond();while (seconds ! 0){seconds LocalDateTime.now().getSecond();}// 提交两个任务for (int i 0; i 2; i) {executorService.submit(() - {// 循环执行while (share(mutex,1)) {try {// 睡眠0.5秒TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});}}/*** 用来模拟临界资源的方法*/public boolean share(final InterProcessLock mutex, int n){boolean b true;try {// 获取锁if (mutex.acquire(3, TimeUnit.SECONDS)) {// 减1操作b doLock(n);// 最多执行三次if (b n 3){b share(mutex,n1);}}} catch (Exception e) {throw new RuntimeException(e);} finally {try {mutex.release();return b;} catch (Exception e) {log.error(释放锁失败);throw new RuntimeException(e);}}}// 减1操作private boolean doLock(int n) throws Exception {// 获取数据节点中的值byte[] bytes client.getData().forPath(dataPath);String s new String(bytes);Integer integer Integer.valueOf(s);// 判断是否为0if(integer 0){// 设置新值client.setData().forPath(dataPath,String.valueOf(integer-1).getBytes(StandardCharsets.UTF_8));log.info(第{}次加锁当前值{},n,integer);return true;}else {log.info(任务已完成。。。。);return false;}}输出日志
由此可见通过日志可以发现每次获取到锁的线程都是连续执行三次并且重复获取锁并释放。
实例c1
2024-01-16 INFO 91120 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值34
2024-01-16 INFO 91120 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值33
2024-01-16 INFO 91120 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值32
2024-01-16 INFO 91120 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值31
2024-01-16 INFO 91120 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值30
2024-01-16 INFO 91120 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值29
2024-01-16 INFO 91120 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值22
2024-01-16 INFO 91120 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值21
2024-01-16 INFO 91120 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值20
2024-01-16 INFO 91120 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值19
2024-01-16 INFO 91120 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值18
2024-01-16 INFO 91120 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值17
2024-01-16 INFO 91120 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值10
2024-01-16 INFO 91120 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值9
2024-01-16 INFO 91120 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值8
2024-01-16 INFO 91120 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值7
2024-01-16 INFO 91120 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值6
2024-01-16 INFO 91120 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值5
2024-01-16 INFO 91120 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 任务已完成。。。。
2024-01-16 INFO 91120 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 任务已完成。。。。实例c2
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值40
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值39
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值38
2024-01-16 INFO 91115 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值37
2024-01-16 INFO 91115 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值36
2024-01-16 INFO 91115 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值35
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值28
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值27
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值26
2024-01-16 INFO 91115 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值25
2024-01-16 INFO 91115 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值24
2024-01-16 INFO 91115 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值23
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值16
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值15
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值14
2024-01-16 INFO 91115 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值13
2024-01-16 INFO 91115 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值12
2024-01-16 INFO 91115 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值11
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值4
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第2次加锁当前值3
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 第3次加锁当前值2
2024-01-16 INFO 91115 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 第1次加锁当前值1
2024-01-16 INFO 91115 --- [pool-3-thread-2] com.ahao.demo.CuratorDemoApplication : 任务已完成。。。。
2024-01-16 INFO 91115 --- [pool-3-thread-1] com.ahao.demo.CuratorDemoApplication : 任务已完成。。。。4.分布式非可重入锁
与org.apache.curator.framework.recipes.locks.InterProcessMutex 类似只是不可重入。
4.1.锁对象
类路径InterProcessSemaphoreMutex
公开构造方法如下 /*** param client 当前客户端实例* param path 锁节点路径*/ public InterProcessSemaphoreMutex(CuratorFramework client, String path);4.2.重入式抢占锁
测试场景有一台服务实例 C1。C1中的main线程进行连续两次的加锁操作。
测试代码
/*** Name: CuratorDemoApplication* Description:* Author: ahao* Date: 2024/1/10 3:29 PM*/
Slf4j
SpringBootApplication
public class CuratorDemoApplication implements ApplicationRunner{Autowiredprivate CuratorFramework client;public static void main(String[] args) {SpringApplication.run(CuratorDemoApplication.class,args);}String dataPath /ahao/data;Overridepublic void run(ApplicationArguments args) throws Exception {log.info(。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。);String lockPath /ahao/lock;TimeUnit.SECONDS.sleep(3);// 创建非可重入锁InterProcessSemaphoreMutex mutex new InterProcessSemaphoreMutex(client, lockPath);// 调用测试方法share(mutex);}/*** 用来模拟临界资源的方法*/public void share(final InterProcessLock mutex){try {// 获取锁if (mutex.acquire(5, TimeUnit.SECONDS)) {log.info(第一次加锁成功);}// 再次获取锁if (!mutex.acquire(5, TimeUnit.SECONDS)) {log.info(第二次加锁失败了);}} catch (Exception e) {throw new RuntimeException(e);} finally {try {// 获取多少次锁就要释放多少次mutex.release();mutex.release();} catch (Exception e) {log.error(释放锁失败{},e.getStackTrace());throw new RuntimeException(e);}}}}输出日志
本次测试仅需要启动一个实例c1即可用于测试分布式非可重入锁多次加锁的场景。根据以下输出日志可知第一次加锁成功在第二次加锁时超时失败了导致之后在第二次释放锁时出现异常。
2024-01-16 INFO 45025 --- [ main] com.ahao.demo.CuratorDemoApplication : 第一次加锁成功
2024-01-16 INFO 45025 --- [ main] com.ahao.demo.CuratorDemoApplication : 第二次加锁失败了
2024-01-16 ERROR 45025 --- [ main] com.ahao.demo.CuratorDemoApplication : 释放锁失败org.apache.curator.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
2024-01-16 INFO 45025 --- [ main] ConditionEvaluationReportLoggingListener : Error starting ApplicationContext. To display the conditions report re-run your application with debug enabled.
2024-01-16 ERROR 45025 --- [ main] o.s.boot.SpringApplication : Application run failedjava.lang.IllegalStateException: Failed to execute ApplicationRunnerat org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:762)at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:749)at org.springframework.boot.SpringApplication.run(SpringApplication.java:314)at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303)at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292)at com.ahao.demo.CuratorDemoApplication.main(CuratorDemoApplication.java:41)
Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: Not acquiredat com.ahao.demo.CuratorDemoApplication.share(CuratorDemoApplication.java:83)at com.ahao.demo.CuratorDemoApplication.run(CuratorDemoApplication.java:56)at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:759)... 5 common frames omitted
Caused by: java.lang.IllegalStateException: Not acquiredat org.apache.curator.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:444)at org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex.release(InterProcessSemaphoreMutex.java:68)at com.ahao.demo.CuratorDemoApplication.share(CuratorDemoApplication.java:80)... 7 common frames omitted
5.分布式可重入读写锁
读写锁顾名思义包含两把锁读锁和写锁。当写锁未生效未被获取时读锁能够被多个线程获取使用。但是写锁只能被一个线程获取持有。 只有当写锁释放时读锁才能被持有。可重入表示一个拥有写锁的线程可重入读锁但是读锁却不能进入写锁读锁可以重入读锁。 这也意味着写锁可以降级成读锁 比如请求写锁 —读锁 —-释放写锁。 从读锁升级成写锁是不行的。可重入读写锁是“公平的”每个实例将按请求的顺序获取锁。
5.1.锁对象
类路径org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock
公开构造方法如下 /*** param client 当前客户端实例* param path 锁节点路径*/ public InterProcessReadWriteLock(CuratorFramework client, String basePath)/*** param client 当前客户端实例* param path 锁节点路径* param lockData 存储在锁节点的数据内容*/public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)5.2.读锁和写锁的竞争
测试场景有两台服务实例 C1C2。C1和C2个有3个线程2个读线程和1个写线程并发执行其中写线程进行加1操作并重复执行5次每次都加写锁而读线程进行查询操作并重复执行10次每次都加读锁。
测试代码
Slf4j
SpringBootApplication
public class CuratorDemoApplication implements ApplicationRunner {Autowiredprivate CuratorFramework client;public static void main(String[] args) {SpringApplication.run(CuratorDemoApplication.class, args);}String dataPath /ahao/data;Overridepublic void run(ApplicationArguments args) throws Exception {log.info(。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。);String lockPath /ahao/lock;TimeUnit.SECONDS.sleep(3);InterProcessReadWriteLock readWriteLock new InterProcessReadWriteLock(client, lockPath);// 获取读锁InterProcessMutex readLock readWriteLock.readLock();// 获取写锁InterProcessMutex writeLock readWriteLock.writeLock();// 保证并发执行当前时间的秒针部分为30的整数倍则结束循环int seconds LocalDateTime.now().getSecond();while (seconds/30 ! 0){seconds LocalDateTime.now().getSecond();}for (int j 0; j 2; j) {// 读线程new Thread(() - {for (int i 0; i 10; i) {try {// 加锁if (readLock.acquire(3, TimeUnit.SECONDS)) {doLock(false);}} catch (Exception e) {throw new RuntimeException(e);} finally {// 释放锁try {readLock.release();} catch (Exception e) {throw new RuntimeException(e);}}}}, 读线程j).start();}// 写线程new Thread(() - {for (int i 0; i 5; i) {try {// 加锁if (writeLock.acquire(3, TimeUnit.SECONDS)) {doLock(true);}} catch (Exception e) {throw new RuntimeException(e);} finally {// 释放锁try {writeLock.release();} catch (Exception e) {throw new RuntimeException(e);}}}}, 写线程).start();}/*** 加操作* param isAdd true 表示加1false 表示不加查询数据* return* throws Exception*/public void doLock(boolean isAdd) throws Exception {// 获取数据节点中的值byte[] bytes client.getData().forPath(dataPath);Integer integer Integer.valueOf(new String(bytes));if (isAdd) {// 设置新值client.setData().forPath(dataPath, String.valueOf(integer 1).getBytes(StandardCharsets.UTF_8));log.info(加1操作后{}, integer);} else {log.info(查询数据{}, integer);}}
}输出日志
可以观察到读线程所查询的数据存在重复数据说明了在同一时刻可以加多个读锁而写线程不会出现重复数据只能有一个线程可以获取到写锁。
实例c1
2024-01-16 INFO 64462 --- [ 写线程] com.ahao.demo.CuratorDemoApplication : 加1操作后40
2024-01-16 INFO 64462 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据41
2024-01-16 INFO 64462 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据42
2024-01-16 INFO 64462 --- [ 写线程] com.ahao.demo.CuratorDemoApplication : 加1操作后42
2024-01-16 INFO 64462 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据43
2024-01-16 INFO 64462 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据44
2024-01-16 INFO 64462 --- [ 写线程] com.ahao.demo.CuratorDemoApplication : 加1操作后44
2024-01-16 INFO 64462 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据45
2024-01-16 INFO 64462 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据46
2024-01-16 INFO 64462 --- [ 写线程] com.ahao.demo.CuratorDemoApplication : 加1操作后46
2024-01-16 INFO 64462 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据47
2024-01-16 INFO 64462 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据48
2024-01-16 INFO 64462 --- [ 写线程] com.ahao.demo.CuratorDemoApplication : 加1操作后48
2024-01-16 INFO 64462 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据49
2024-01-16 INFO 64462 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64462 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64462 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64462 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64462 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64462 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64462 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64462 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64462 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64462 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64462 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据50
实例c2
2024-01-16 INFO 64453 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据40
2024-01-16 INFO 64453 --- [ 写线程] com.ahao.demo.CuratorDemoApplication : 加1操作后41
2024-01-16 INFO 64453 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据42
2024-01-16 INFO 64453 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据42
2024-01-16 INFO 64453 --- [ 写线程] com.ahao.demo.CuratorDemoApplication : 加1操作后43
2024-01-16 INFO 64453 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据44
2024-01-16 INFO 64453 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据44
2024-01-16 INFO 64453 --- [ 写线程] com.ahao.demo.CuratorDemoApplication : 加1操作后45
2024-01-16 INFO 64453 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据46
2024-01-16 INFO 64453 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据46
2024-01-16 INFO 64453 --- [ 写线程] com.ahao.demo.CuratorDemoApplication : 加1操作后47
2024-01-16 INFO 64453 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据48
2024-01-16 INFO 64453 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据48
2024-01-16 INFO 64453 --- [ 写线程] com.ahao.demo.CuratorDemoApplication : 加1操作后49
2024-01-16 INFO 64453 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64453 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64453 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64453 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64453 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64453 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64453 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64453 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64453 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64453 --- [ 读线程1] com.ahao.demo.CuratorDemoApplication : 查询数据50
2024-01-16 INFO 64453 --- [ 读线程0] com.ahao.demo.CuratorDemoApplication : 查询数据506.共享信号量
对 JUC 熟悉的读者应该了解Semaphore信号量。Semaphore是一种用于实现同步的对象通常用于限制对共享资源的并发访问。Semaphore可以用来控制进入临界区的线程数量通过使用Semaphore可以防止过多线程同时访问共享资源从而避免出现资源竞争和死锁等问题。
而Curator中的Semaphore和JUC中的Semaphore如出一辙只是一个应用于分布式场景一个应用于进程服务器内部。
6.1.锁对象
类路径org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2
公开构造方法如下 /*** param client 当前客户端实例* param path 锁节点路径* param maxLeases 允许线程进入临界区的最大数量许可证数量*/public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases);/*** param client 当前客户端实例* param path 锁节点路径* param count 用于监听许可证数量*/public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader count);6.2.信号量抢占
测试场景有两台服务实例 C1C2。C1和C2个有3个线程并发执行。但是有2个许可证每个线程一次只能获取一个许可证获取成功则执行4s耗时操作睡眠4s然后释放锁。
测试代码 Overridepublic void run(ApplicationArguments args) throws Exception {log.info(。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。);String lockPath /ahao/lock;TimeUnit.SECONDS.sleep(3);// 创建共享信号量InterProcessSemaphoreV2 semaphoreV2 new InterProcessSemaphoreV2(client,lockPath,2);// 保证并发执行当前时间的秒针部分为30的整数倍则结束循环int seconds LocalDateTime.now().getSecond();while (seconds/30 ! 0){seconds LocalDateTime.now().getSecond();}// 创建线程争夺许可证for (int i 0; i 3; i) {new Thread(() - {Lease acquire null;try {acquire semaphoreV2.acquire(5, TimeUnit.SECONDS);if (acquire ! null){log.info(抢到许可证参与竞争的节点{},semaphoreV2.getParticipantNodes());// 睡眠4秒TimeUnit.SECONDS.sleep(4);}else {log.info(抢到许可证失败);}} catch (Exception e) {throw new RuntimeException(e);}finally {if (acquire ! null){semaphoreV2.returnLease(acquire);}}}, 线程i).start();}}输出日志
实例c1
2024-01-16 INFO 4319 --- [ 线程2] com.ahao.demo.CuratorDemoApplication : 抢到许可证
2024-01-16 INFO 4319 --- [ 线程1] com.ahao.demo.CuratorDemoApplication : 抢到许可证
2024-01-16 INFO 4319 --- [ 线程0] com.ahao.demo.CuratorDemoApplication : 抢到许可证失败实例c2
2024-01-16 INFO 4307 --- [ 线程0] com.ahao.demo.CuratorDemoApplication : 抢到许可证
2024-01-16 INFO 4307 --- [ 线程2] com.ahao.demo.CuratorDemoApplication : 抢到许可证
2024-01-16 INFO 4307 --- [ 线程1] com.ahao.demo.CuratorDemoApplication : 抢到许可证失败7.多共享锁
表示将多个锁合并为一个锁。在获取多共享锁时必须获取其内部所有的锁才算获取成功否则释放所有已获取的锁。同样调用释放锁方法时会释放所有的锁。
7.1.锁对象
类路径org.apache.curator.framework.recipes.locks.InterProcessMultiLock
公开构造方法如下 /*** param client 当前客户端实例* param path 多个锁节点路径*/public InterProcessMultiLock(CuratorFramework client, ListString paths);/*** param locks 多个锁对象*/public InterProcessMultiLock(ListInterProcessLock locks);7.2.获取共享锁
测试场景有一台服务实例 C1启动3个线程并发执行抢占同一个共享锁。
测试代码
Overridepublic void run(ApplicationArguments args) throws Exception {log.info(。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。);String lockPath /ahao/lock;String lockPath2 /ahao/lock2;TimeUnit.SECONDS.sleep(3);// 创建锁1InterProcessMutex mutex new InterProcessMutex(client,lockPath);// 创建锁2InterProcessMutex mutex2 new InterProcessMutex(client,lockPath2);// 创建共享锁InterProcessMultiLock multiLock new InterProcessMultiLock(List.of(mutex,mutex2));for (int i 0; i 3; i) {new Thread(()-{try {if (multiLock.acquire(5, TimeUnit.SECONDS)) {log.info(获取到锁);TimeUnit.SECONDS.sleep(3);}else {log.info(获取失败);}} catch (Exception e) {throw new RuntimeException(e);} finally {try {multiLock.release();} catch (Exception e) {throw new RuntimeException(e);}}},线程i).start();}}输出日志
可见在第3个线程获取锁时由于没有获取到/ahao/lock2对应的锁对象导致的超时。
2024-01-16 INFO 14352 --- [ 线程1] com.ahao.demo.CuratorDemoApplication : 获取到锁
2024-01-16 INFO 14352 --- [ 线程2] com.ahao.demo.CuratorDemoApplication : 获取到锁
2024-01-16 INFO 14352 --- [ 线程0] com.ahao.demo.CuratorDemoApplication : 获取失败
Exception in thread 线程0 java.lang.RuntimeException: java.lang.Exception: java.lang.IllegalMonitorStateException: You do not own the lock: /ahao/lock2at com.ahao.demo.CuratorDemoApplication.lambda$run$0(CuratorDemoApplication.java:70)at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.Exception: java.lang.IllegalMonitorStateException: You do not own the lock: /ahao/lock2at org.apache.curator.framework.recipes.locks.InterProcessMultiLock.release(InterProcessMultiLock.java:169)at com.ahao.demo.CuratorDemoApplication.lambda$run$0(CuratorDemoApplication.java:68)... 1 more
Caused by: java.lang.IllegalMonitorStateException: You do not own the lock: /ahao/lock2at org.apache.curator.framework.recipes.locks.InterProcessMutex.release(InterProcessMutex.java:140)at org.apache.curator.framework.recipes.locks.InterProcessMultiLock.release(InterProcessMultiLock.java:158)... 2 more