网站建设基本标准,企业官网建设,网站后台管理系统如何安装,珍岛做网站怎么样目录1.前言2.单JVM锁3.分布式锁4.总结正文分割线1.前言锁就像一把钥匙#xff0c;需要加锁的代码就像一个房间。出现互斥操作的典型场景#xff1a;多人同时想进同一个房间争抢这个房间的钥匙(只有一把)#xff0c;一人抢到钥匙#xff0c;其他人都等待这个人出来归还钥匙正文分割线1.前言锁就像一把钥匙需要加锁的代码就像一个房间。出现互斥操作的典型场景多人同时想进同一个房间争抢这个房间的钥匙(只有一把)一人抢到钥匙其他人都等待这个人出来归还钥匙此时大家再次争抢钥匙循环下去。作为终极实战系列本篇用java语言分析锁的原理(源码剖析)和应用(详细代码)根据锁的作用范围分为JVM锁和分布式锁。如理解有误之处还请指出。2.单JVM锁(进程级别)程序部署在一台服务器上当容器启动时(例如tomcat)一台JVM就运行起来了。本节分析的锁均只能在单JVM下生效。因为最终锁定的是某个对象这个对象生存在JVM中自然锁只能锁单JVM。这一点很重要。如果你的服务只部署一个实例那么恭喜你用以下几种锁就可以了。1.synchronized同步锁2.ReentrantLock重入锁3.ReadWriteLock读写锁4.StampedLock戳锁3.分布式锁(多服务节点多进程)3.1基于数据库锁实现场景举例卖商品先查询库存0,更新库存-1。1.悲观锁select for update(一致性锁定读)查询官方文档如上图事务内起作用的行锁。能够保证当前session事务所锁定的行不会被其他session所修改(这里的修改指更新或者删除)。对读取的记录加X锁即排它锁其他事不能对上锁的行加任何锁。BEGIN;(确保以下2步骤在一个事务中)SELECT * FROM tb_product_stock WHERE product_id1 FOR UPDATE---product_id有索引锁行.加锁(注:条件字段必须有索引才能锁行否则锁表且最好用explain查看一下是否使用了索引因为有一些会被优化掉最终没有使用索引)UPDATE tb_product_stock SET numbernumber-1 WHERE product_id1---更新库存-1.解锁COMMIT;2.乐观锁版本控制选一个字段作为版本控制字段更新前查询一次更新时该字段作为更新条件。不同业务场景版本控制字段可以0 1控制也可以1控制也可以-1控制这个随意。BEGIN;(确保以下2步骤在一个事务中)SELECT number FROM tb_product_stock WHERE product_id1--》查询库存总数不加锁UPDATE tb_product_stock SET numbernumber-1 WHERE product_id1 AND number第一步查询到的库存数--》number字段作为版本控制字段COMMIT;3.2基于缓存实现(redismemcached)原理redisson开源jar包,提供了很多功能其中就包含分布式锁。是Redis官方推荐的顶级项目官网飞机票核心org.redisson.api.RLock接口封装了分布式锁的获取和释放。源码如下1 Override2 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {3 long time unit.toMillis(waitTime);4 long current System.currentTimeMillis();5 final long threadId Thread.currentThread().getId();6 Long ttl tryAcquire(leaseTime, unit, threadId);//申请锁返回还剩余的锁过期时间7 //lock acquired8 if (ttl null) {9 return true;10 }1112 time - (System.currentTimeMillis() - current);13 if (time 0) {14 acquireFailed(threadId);15 return false;16 }1718 current System.currentTimeMillis();19 final RFuture subscribeFuture subscribe(threadId);20 if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {21 if (!subscribeFuture.cancel(false)) {22 subscribeFuture.addListener(new FutureListener() {23 Override24 public void operationComplete(Future future) throws Exception {25 if (subscribeFuture.isSuccess()) {26 unsubscribe(subscribeFuture, threadId);27 }28 }29 });30 }31 acquireFailed(threadId);32 return false;33 }3435 try {36 time - (System.currentTimeMillis() - current);37 if (time 0) {38 acquireFailed(threadId);39 return false;40 }4142 while (true) {43 long currentTime System.currentTimeMillis();44 ttl tryAcquire(leaseTime, unit, threadId);45 //lock acquired46 if (ttl null) {47 return true;48 }4950 time - (System.currentTimeMillis() - currentTime);51 if (time 0) {52 acquireFailed(threadId);53 return false;54 }5556 //waiting for message57 currentTime System.currentTimeMillis();58 if (ttl 0 ttl time) {59 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);60 } else {61 getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);62 }6364 time - (System.currentTimeMillis() - currentTime);65 if (time 0) {66 acquireFailed(threadId);67 return false;68 }69 }70 } finally {71 unsubscribe(subscribeFuture, threadId);72 }73 //return get(tryLockAsync(waitTime, leaseTime, unit));74 }上述方法调用加锁的逻辑就是在tryAcquire(leaseTime, unit, threadId)中如下图1 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {2 return get(tryAcquireAsync(leaseTime, unit, threadId));//tryAcquireAsync返回RFutrue3 }tryAcquireAsync中commandExecutor.evalWriteAsync就是咱们加锁核心方法了1 RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {2 internalLockLeaseTime unit.toMillis(leaseTime);34 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,5 if (redis.call(exists, KEYS[1]) 0) then 6 redis.call(hset, KEYS[1], ARGV[2], 1); 7 redis.call(pexpire, KEYS[1], ARGV[1]); 8 return nil; 9 end; 10 if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then 11 redis.call(hincrby, KEYS[1], ARGV[2], 1); 12 redis.call(pexpire, KEYS[1], ARGV[1]); 13 return nil; 14 end; 15 return redis.call(pttl, KEYS[1]);,16 Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));17 }如上图已经到了redis命令了加锁KEYS[1] 需要加锁的key这里需要是字符串类型。ARGV[1] 锁的超时时间防止死锁ARGV[2] 锁的唯一标识(UUID.randomUUID()) “:” threadId1 //检查是否key已经被占用如果没有则设置超时时间和唯一标识初始化value12 if (redis.call(exists, KEYS[1]) 0)3 then4 redis.call(hset, KEYS[1], ARGV[2], 1); //hset key field value 哈希数据结构5 redis.call(pexpire, KEYS[1], ARGV[1]); //pexpire key expireTime 设置有效时间6 return nil;7 end;8 //如果锁重入,需要判断锁的key field 都一直情况下 value 加一9 if (redis.call(hexists, KEYS[1], ARGV[2]) 1)10 then11 redis.call(hincrby, KEYS[1], ARGV[2], 1);//hincrby key filed addValue 加112 redis.call(pexpire, KEYS[1], ARGV[1]);//pexpire key expireTime重新设置超时时间13 return nil;14 end;15 //返回剩余的过期时间16 return redis.call(pttl, KEYS[1]);以上的方法当返回空是说明获取到锁如果返回一个long数值(pttl 命令的返回值)说明锁已被占用通过返回剩余时间外部可以做一些等待时间的判断和调整。不再分析解锁步骤直接贴上解锁的redis 命令解锁– KEYS[1] 需要加锁的key这里需要是字符串类型。– KEYS[2] redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” getName() “}”– ARGV[1] reids消息体这里只需要一个字节的标记就可以主要标记redis的key已经解锁再结合redis的Subscribe能唤醒其他订阅解锁消息的客户端线程申请锁。– ARGV[2] 锁的超时时间防止死锁– ARGV[3] 锁的唯一标识(UUID.randomUUID()) “:” threadId1 //如果key已经不存在说明已经被解锁直接发布(publihs)redis消息2 if (redis.call(exists, KEYS[1]) 0)3 then4 redis.call(publish, KEYS[2], ARGV[1]);//publish ChannelName message向信道发送解锁消息5 return 1;6 end;7 //key和field不匹配说明当前客户端线程没有持有锁不能主动解锁。8 if (redis.call(hexists, KEYS[1], ARGV[3]) 0)9 then10 return nil;11 end;12 //将value减113 local counter redis.call(hincrby, KEYS[1], ARGV[3], -1); //hincrby key filed addValue 减114 //如果counter0说明锁在重入不能删除key15 if (counter 0)16 then17 redis.call(pexpire, KEYS[1], ARGV[2]);18 return 0;19 else20 //删除key并且publish 解锁消息21 redis.call(del, KEYS[1]);22 redis.call(publish, KEYS[2], ARGV[1]);23 return 1;24 end;25 return nil;特点逻辑并不复杂, 实现了可重入功能, 通过pub/sub功能来减少空转性能极高。实现了Lock的大部分功能,支持强制解锁。实战1.创建客户端配置类这里我们最终只用了一种来测试就是initSingleServerConfig单例模式。1 package distributed.lock.redis;23 import org.redisson.config.Config;45 /**6 *7 * ClassName:RedissionConfig8 * Description:自定义RedissionConfig初始化方法9 * 支持自定义构造单例模式集群模式主从模式哨兵模式。10 * 注此处使用spring bean 配置文件保证bean单例见applicationContext-redis.xml11 * 大家也可以用工厂模式自己维护单例本类生成RedissionConfig再RedissonClient redisson Redisson.create(config);这样就可以创建RedissonClient12 *authordiandian.zhang13 * date 2017年7月20日下午12:55:5014 */15 public class RedissionConfig {16 private RedissionConfig() {17 }1819 public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword) {20 return initSingleServerConfig(redisHost, redisPort, redisPassword, 0);21 }2223 /**24 *25 * Description 使用单例模式初始化构造Config26 *paramredisHost27 *paramredisPort28 *paramredisPassword29 *paramredisDatabase redis db 默认0 (0~15)有redis.conf配置文件中参数来控制数据库总数database 16.30 *return31 *authordiandian.zhang32 * date 2017年7月20日下午12:56:2133 *sinceJDK1.834 */35 public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword,Integer redisDatabase) {36 Config config new Config();37 config.useSingleServer().setAddress(redisHost : redisPort)38 .setPassword(redisPassword)39 .setDatabase(redisDatabase);//可以不设置看业务是否需要隔离40 //RedissonClient redisson Redisson.create(config);41 return config;42 }4344 /**45 *46 * Description 集群模式47 *parammasterAddress48 *paramnodeAddressArray49 *return50 *authordiandian.zhang51 * date 2017年7月20日下午3:29:3252 *sinceJDK1.853 */54 public static Config initClusterServerConfig(String masterAddress, String[] nodeAddressArray) {55 String nodeStr ;56 for(String slave:nodeAddressArray){57 nodeStr ,slave;58 }59 Config config new Config();60 config.useClusterServers()61 .setScanInterval(2000) //cluster state scan interval in milliseconds62 .addNodeAddress(nodeStr);63 return config;64 }6566 /**67 *68 * Description 主从模式69 *parammasterAddress 一主70 *paramslaveAddressArray 多从71 *return72 *authordiandian.zhang73 * date 2017年7月20日下午2:29:3874 *sinceJDK1.875 */76 public static Config initMasterSlaveServerConfig(String masterAddress, String[] slaveAddressArray) {77 String slaveStr ;78 for(String slave:slaveAddressArray){79 slaveStr ,slave;80 }81 Config config new Config();82 config.useMasterSlaveServers()83 .setMasterAddress(masterAddress)//一主84 .addSlaveAddress(slaveStr);//多从127.0.0.1:26389, 127.0.0.1:2637985 return config;86 }8788 /**89 *90 * Description 哨兵模式91 *parammasterAddress92 *paramslaveAddressArray93 *return94 *authordiandian.zhang95 * date 2017年7月20日下午3:01:3596 *sinceJDK1.897 */98 public static Config initSentinelServerConfig(String masterAddress, String[] sentinelAddressArray) {99 String sentinelStr ;100 for(String sentinel:sentinelAddressArray){101 sentinelStr ,sentinel;102 }103 Config config new Config();104 config.useSentinelServers()105 .setMasterName(mymaster)106 .addSentinelAddress(sentinelStr);107 return config;108 }109110111 }2.分布式锁实现类1 package distributed.lock.redis;2345 import java.text.SimpleDateFormat;6 import java.util.Date;7 import java.util.concurrent.CountDownLatch;8 import java.util.concurrent.TimeUnit;910 import org.redisson.Redisson;11 import org.redisson.api.RLock;12 import org.redisson.api.RedissonClient;13 import org.slf4j.Logger;14 import org.slf4j.LoggerFactory;151617 public class RedissonTest {18 private static final Logger logger LoggerFactory.getLogger(RedissonTest.class);19 static SimpleDateFormat time new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);20 //这里可自定义多种模式单例集群主从哨兵模式。为了简单这里使用单例模式21 private static RedissonClient redissonClient Redisson.create(RedissionConfig.initSingleServerConfig(192.168.50.107, 6379, password));2223 public static void main(String[] args) {24 CountDownLatch latch new CountDownLatch(3);25 //key26 String lockKey testkey20170802;27 try {28 Thread t1 new Thread(() - {29 doWithLock(lockKey,latch);//函数式编程30 }, t1);31 Thread t2 new Thread(() - {32 doWithLock(lockKey,latch);33 }, t2);34 Thread t3 new Thread(() - {35 doWithLock(lockKey,latch);36 }, t3);37 //启动线程38 t1.start();39 t2.start();40 t3.start();41 //等待全部完成42 latch.await();43 System.out.println(3个线程都解锁完毕关闭客户端);44 redissonClient.shutdown();45 } catch (Exception e) {46 e.printStackTrace();47 }48 }4950 /**51 *52 * Description 线程执行函数体53 *paramlockKey54 *authordiandian.zhang55 * date 2017年8月2日下午3:37:3256 *sinceJDK1.857 */58 private static void doWithLock(String lockKey,CountDownLatch latch) {59 try {60 System.out.println(进入线程Thread.currentThread().getName():time.format(new Date()));61 //获取锁30秒内获取到返回true,未获取到返回false,60秒过后自动unLock62 if (tryLock(lockKey, 30, 60, TimeUnit.SECONDS)) {63 System.out.println(Thread.currentThread().getName() 获取锁成功执行需要加锁的任务time.format(new Date()));64 Thread.sleep(2000L);//休息2秒模拟执行需要加锁的任务65 //获取锁超时66 }else{67 System.out.println(Thread.currentThread().getName() 获取锁超时time.format(new Date()));68 }69 } catch (Exception e) {70 e.printStackTrace();71 } finally {72 try {73 //释放锁74 unLock(lockKey);75 latch.countDown();//完成计数器减一76 } catch (Exception e) {77 e.printStackTrace();78 }79 }80 }8182 /**83 *84 * Description 获取锁锁waitTime时间内获取到返回true,未获取到返回false,租赁期leaseTime过后unLock(除非手动释放锁)85 *paramkey86 *paramwaitTime87 *paramleaseTime88 *paramtimeUnit89 *return90 *authordiandian.zhang91 * date 2017年8月2日下午3:24:0992 *sinceJDK1.893 */94 public static boolean tryLock(String key, long waitTime, long leaseTime, TimeUnit timeUnit) {95 try {96 //根据key获取锁实例非公平锁97 RLock lock redissonClient.getLock(key);98 //在leaseTime时间内阻塞获取锁获取锁后持有锁直到leaseTime租期结束(除非手动unLock释放锁)。99 return lock.tryLock(waitTime, leaseTime, timeUnit);100 } catch (Exception e) {101 logger.error(redis获取分布式锁异常;key key ,waitTime waitTime ,leaseTime leaseTime 102 ,timeUnit timeUnit, e);103 return false;104 }105 }106107 /**108 *109 * Description 释放锁110 *paramkey111 *authordiandian.zhang112 * date 2017年8月2日下午3:25:34113 *sinceJDK1.8114 */115 public static void unLock(String key) {116 RLock lock redissonClient.getLock(key);117 lock.unlock();118 System.out.println(Thread.currentThread().getName() 释放锁time.format(new Date()));119 }120 }执行结果如下1 进入线程t3:2017-08-02 16:33:192 进入线程t1:2017-08-02 16:33:193 进入线程t2:2017-08-02 16:33:194 t2 获取锁成功执行需要加锁的任务2017-08-02 16:33:19---T2 19秒时获取到锁5 t2 释放锁2017-08-02 16:33:21---T2任务完成21秒时释放锁6 t1 获取锁成功执行需要加锁的任务2017-08-02 16:33:21---T1 21秒时获取到锁7 t1 释放锁2017-08-02 16:33:23---T2任务完成23秒时释放锁8 t3 获取锁成功执行需要加锁的任务2017-08-02 16:33:23---T3 23秒时获取到锁9 t3 释放锁2017-08-02 16:33:25---T2任务完成25秒时释放锁10 3个线程都解锁完毕关闭客户端如上图3个线程共消耗25-196秒验证通过确实互斥锁住了。我们用Redis Desktop Manger来看一下redis中数据1 192.168.50.107:0hgetall testkey20170802---用key查询hash所有的值2 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:30---T2获取到锁fielduuid:线程号3 2) 1 ---value1代表重入次数为14 192.168.50.107:0hgetall testkey20170802---T2释放锁T1获取到锁5 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:296 2) 17 192.168.50.107:0hgetall testkey20170802---T1释放锁T3获取到锁8 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:319 2) 110 192.168.50.107:0hgetall testkey20170802---最后一次查询时T3释放锁已无数据2)基于zookeeper实现原理每个客户端(每个JVM内部共用一个客户端实例)对某个方法加锁时在zookeeper上指定节点的目录下生成一个唯一的瞬时有序节点。判断是否获取锁的方式很简单只需要判断有序节点中序号最小的一个。当释放锁的时候只需将这个瞬时节点删除即可。我们使用apache的Curator组件来实现一般使用Client、Framework、Recipes三个组件。curator下,InterProcessMutex可重入互斥公平锁源码(curator-recipes-2.4.1.jar)注释如下A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is fair - each user will get the mutex in the order requested (from ZKs point of view)即一个在JVM上工作的可重入互斥锁。使用ZK去持有这把锁。在所有JVM中的进程组只要使用相同的锁路径将会获得进程间的临界资源。进一步说这个互斥锁是公平的-因为每个线程将会根据请求顺序获得这个互斥量(对于ZK来说)主要方法如下1 //构造方法2 public InterProcessMutex(CuratorFramework client, String path)3 public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)4 //通过acquire获得锁,并提供超时机制5 public void acquire() throws Exception6 public boolean acquire(long time, TimeUnit unit) throws Exception7 //撤销锁8 public void makeRevocable(RevocationListener listener)9 public void makeRevocable(final RevocationListener listener, Executor executor)我们主要分析核心获取锁acquire方法如下1 Override2 public boolean acquire(long time, TimeUnit unit) throws Exception3 {4 return internalLock(time, unit);5 }67 private boolean internalLock(long time, TimeUnit unit) throws Exception8 {9 /*10 Note on concurrency: a given lockData instance11 can be only acted on by a single thread so locking isnt necessary12 */1314 Thread currentThread Thread.currentThread();15 //线程安全map:private final ConcurrentMap threadData Maps.newConcurrentMap();16 LockData lockData threadData.get(currentThread);17 if ( lockData ! null )18 {19 //这里实现了可重入,如果当前线程已经获取锁计数1直接返回true20 lockData.lockCount.incrementAndGet();21 return true;22 }23 //获取锁核心方法24 String lockPath internals.attemptLock(time, unit, getLockNodeBytes());25 if ( lockPath ! null )26 { //得到锁塞进线程安全map27 LockData newLockData new LockData(currentThread, lockPath);28 threadData.put(currentThread, newLockData);29 return true;30 }3132 return false;33 }核心获取锁的方法attemptLock源码如下1 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception2 {3 final long startMillis System.currentTimeMillis();4 final Long millisToWait (unit ! null) ? unit.toMillis(time) : null;5 final byte[] localLockNodeBytes (revocable.get() ! null) ? new byte[0] : lockNodeBytes;6 int retryCount 0;78 String ourPath null;9 boolean hasTheLock false;10 boolean isDone false;11 while ( !isDone )12 {13 isDone true;1415 try16 {17 if ( localLockNodeBytes ! null )18 {19 ourPath client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);20 }21 else22 { //创建瞬时节点(客户端断开连接时删除)节点名追加自增数字23 ourPath client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);24 }//自循环等待时间并判断是否获取到锁25 hasTheLock internalLockLoop(startMillis, millisToWait, ourPath);26 }27 catch ( KeeperException.NoNodeException e )28 {29 //gets thrown by StandardLockInternalsDriver when it cant find the lock node30 //this can happen when the session expires, etc. So, if the retry allows, just try it all again31 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )32 {33 isDone false;34 }35 else36 {37 throw e;38 }39 }40 }41 //获取到锁返回节点path42 if ( hasTheLock )43 {44 return ourPath;45 }4647 return null;48 }自循环等待时间1 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception2 {3 boolean haveTheLock false;4 boolean doDelete false;5 try6 {7 if ( revocable.get() ! null )8 {9 client.getData().usingWatcher(revocableWatcher).forPath(ourPath);10 }1112 while ( (client.getState() CuratorFrameworkState.STARTED) !haveTheLock )//如果状态是开始且未获取到锁13 {14 List children getSortedChildren();//获取父节点下所有线程的子节点15 String sequenceNodeName ourPath.substring(basePath.length() 1); //获取当前节点名称16//核心方法判断是否获取到锁17 PredicateResults predicateResults driver.getsTheLock(client, children, sequenceNodeName, maxLeases);18 if ( predicateResults.getsTheLock() )//获取到锁置true下一次循环退出19 {20 haveTheLock true;21 }22 else//没有索取到锁23 {24 String previousSequencePath basePath / predicateResults.getPathToWatch();//这里路径是上一次获取到锁的持有锁路径2526 synchronized(this)//强制加锁27 {//让线程等待并且watcher当前节点当节点有变化的之后则notifyAll当前等待的线程让它再次进入来争抢锁28 Stat stat client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);29 if ( stat ! null )30 {31 if ( millisToWait ! null )32 {33 millisToWait - (System.currentTimeMillis() - startMillis);34 startMillis System.currentTimeMillis();35 if ( millisToWait 0 )36 {37 doDelete true; //等待超时置状态为true,后面会删除节点38 break;39 }40 //等待指定时间41 wait(millisToWait);42 }43 else44 { //一直等待45 wait();46 }47 }48 }49 //else it may have been deleted (i.e. lock released). Try to acquire again50 }51 }52 }53 catch ( Exception e )54 {55 doDelete true;56 throw e;57 }58 finally59 {60 if ( doDelete )//删除path61 {62 deleteOurPath(ourPath);63 }64 }65 return haveTheLock;66 }1 Override2 public PredicateResults getsTheLock(CuratorFramework client, List children, String sequenceNodeName, int maxLeases) throws Exception3 {4 int ourIndex children.indexOf(sequenceNodeName);//先根据子节点名获取children(所有子节点升序集合)中的索引5 validateOurIndex(sequenceNodeName, ourIndex);//校验如果索引为负值即不存在该子节点6 //maxLeases允许同时租赁的数量这里源代码写死了1,但这种设计符合将来拓展修改maxLeases即可满足多租赁7 boolean getsTheLock ourIndex 8 String pathToWatch getsTheLock ? null : children.get(ourIndex - maxLeases);//获取到锁返回null,否则未获取到锁获取上一次的获取到锁的路径。后面会监视这个路径用以唤醒请求线程910 return new PredicateResults(pathToWatch, getsTheLock);11 }特点1.可避免死锁zk瞬时节点(Ephemeral Nodes)生命周期和session一致session结束节点自动删除。2.依赖zk创建节点涉及文件操作开销较大。实战1.创建客户端client2.生成互斥锁InterProcessMutex3.开启3个线程去获取锁1 package distributed.lock.zk;23 import java.text.SimpleDateFormat;4 import java.util.Date;5 import java.util.concurrent.TimeUnit;67 import org.apache.curator.framework.CuratorFramework;8 import org.apache.curator.framework.CuratorFrameworkFactory;9 import org.apache.curator.framework.recipes.locks.InterProcessMutex;10 import org.apache.curator.retry.ExponentialBackoffRetry;11 import org.apache.curator.retry.RetryNTimes;12 import org.jboss.netty.channel.StaticChannelPipeline;13 import org.omg.CORBA.PRIVATE_MEMBER;1415 /**16 *17 * ClassName:CuratorDistrLockTest18 * Description:Curator包实现zk分布式锁:利用了zookeeper的临时顺序节点特性一旦客户端失去连接后则就会自动清除该节点。19 *authordiandian.zhang20 * date 2017年7月11日下午12:43:4421 */2223 public class CuratorDistrLock {24 private static final String ZK_ADDRESS 192.168.50.253:2181;//zk25 private static final String ZK_LOCK_PATH /zktest;//path26 static SimpleDateFormat time new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);2728 public static void main(String[] args) {29 try {30 //创建zk客户端31 //CuratorFramework client CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(3, 1000));32 CuratorFramework client CuratorFrameworkFactory.builder()33 .connectString(ZK_ADDRESS)34 .sessionTimeoutMs(5000)35 .retryPolicy(new ExponentialBackoffRetry(1000, 10))36 .build();37 //开启38 client.start();39 System.out.println(zk client start successfully!time.format(new Date()));4041 Thread t1 new Thread(() - {42 doWithLock(client);//函数式编程43 }, t1);44 Thread t2 new Thread(() - {45 doWithLock(client);46 }, t2);47 Thread t3 new Thread(() - {48 doWithLock(client);49 }, t3);50 //启动线程51 t1.start();52 t2.start();53 t3.start();54 } catch (Exception e) {55 e.printStackTrace();56 }57 }5859 /**60 *61 * Description 线程执行函数体62 *paramclient63 *paramlock64 *authordiandian.zhang65 * date 2017年7月12日下午6:00:5366 *sinceJDK1.867 */68 private static void doWithLock(CuratorFramework client) {69 //依赖ZK生成的可重入互斥公平锁(按照请求顺序)70 InterProcessMutex lock new InterProcessMutex(client, ZK_LOCK_PATH);71 try {72 System.out.println(进入线程Thread.currentThread().getName():time.format(new Date()));7374 //花20秒时间尝试获取锁75 if (lock.acquire(20, TimeUnit.SECONDS)) {76 System.out.println(Thread.currentThread().getName() 获取锁成功执行需要加锁的任务time.format(new Date()));77 Thread.sleep(2000L);//休息2秒模拟执行需要加锁的任务78 //获取锁超时79 }else{80 System.out.println(Thread.currentThread().getName() 获取锁超时time.format(new Date()));81 }82 } catch (Exception e) {83 e.printStackTrace();84 } finally {85 try {86 //当前线程获取到锁那么最后需要释放锁(实际上是删除节点)87 if (lock.isAcquiredInThisProcess()) {88 lock.release();89 System.out.println(Thread.currentThread().getName() 释放锁time.format(new Date()));90 }91 } catch (Exception e) {92 e.printStackTrace();93 }94 }95 }9697 }执行结果zk client start successfully!进入线程t2:2017-07-13 11:13:23进入线程t1:2017-07-13 11:13:23进入线程t3:2017-07-13 11:13:23t2 获取锁成功执行需要加锁的任务2017-07-13 11:13:23----》起始时间23秒t2 释放锁2017-07-13 11:13:25t3 获取锁成功执行需要加锁的任务2017-07-13 11:13:25----》验证耗时2秒T2执行完T3执行t3 释放锁2017-07-13 11:13:27t1 获取锁成功执行需要加锁的任务2017-07-13 11:13:27----》验证耗时2秒T3执行完T1执行t1 释放锁2017-07-13 11:13:29----》验证耗时2秒T1执行完3个任务共耗时29-236秒验证互斥锁达到目标。查看zookeeper节点1.客户端连接zkCli.sh -server 192.168.50.253:21812.查看节点[zk: 192.168.50.253:2181(CONNECTED) 80] ls /-----》查看根目录[dubbo, zktest, zookeeper, test][zk: 192.168.50.253:2181(CONNECTED) 81] ls /zktest -----》查看我们创建的子节点[_c_034e5f23-abaf-4d4a-856f-c27956db574e-lock-0000000007, _c_63c708f1-2c3c-4e59-9d5b-f0c70c149758-lock-0000000006, _c_1f688cb7-c38c-4ebb-8909-0ba421e484a4-lock-0000000008][zk: 192.168.50.253:2181(CONNECTED) 82] ls /zktest-----》任务执行完毕最终释放了子节点[]4.总结比较一级锁分类二级锁分类锁名称特性是否推荐单JVM锁基于JVM源生synchronized关键字实现synchronized同步锁适用于低并发的情况性能稳定。新手推荐基于JDK实现需显示获取锁释放锁ReentrantLock可重入锁适用于低、高并发的情况性能较高需要指定公平、非公平或condition时使用。ReentrantReadWriteLock可重入读写锁适用于读多写少的情况。性能高。老司机推荐StampedLock戳锁JDK8才有适用于高并发且读远大于写时支持乐观读票据校验失败后可升级悲观读锁性能极高老司机推荐分布式锁基于数据库锁实现悲观锁select for updatesql直接使用但水很深。涉及数据库ACID原理隔离级别不同数据库规范不推荐乐观锁版本控制自己实现字段版本控制新手推荐基于缓存实现org.redisson性能极高支持除了分布式锁外还实现了分布式对象、分布式集合等极端强大的功能老司机推荐基于zookeeper实现org.apache.curator zookeeper性能较高除支持分布式锁外还实现了master选举、节点监听()、分布式队列、Barrier、AtomicLong等计数器老司机推荐附Redis命令SETNX key value (SET if Not eXists):当且仅当 key 不存在将 key 的值设为 value 并返回1若给定的 key 已经存在则 SETNX 不做任何动作并返回0。详见SETNX commondGETSET key value:将给定 key 的值设为 value 并返回 key 的旧值 (old value)当 key 存在但不是字符串类型时返回一个错误当key不存在时返回nil。详见GETSET commondGET key:返回 key 所关联的字符串值如果 key 不存在那么返回 nil 。详见GET CommondDEL key [KEY …]:删除给定的一个或多个 key ,不存在的 key 会被忽略,返回实际删除的key的个数(integer)。详见DEL CommondHSET key field value给一个key 设置一个{fieldvalue}的组合值如果key没有就直接赋值并返回1如果field已有那么就更新value的值并返回0.详见HSET CommondHEXISTS key field:当key 中存储着field的时候返回1如果key或者field至少有一个不存在返回0。详见HEXISTS CommondHINCRBY key field increment:将存储在 key 中的哈希(Hash)对象中的指定字段 field 的值加上增量 increment。如果键 key 不存在一个保存了哈希对象的新建将被创建。如果字段 field 不存在在进行当前操作前其将被创建且对应的值被置为 0。返回值是增量之后的值。详见HINCRBY CommondPEXPIRE key milliseconds设置存活时间单位是毫秒。expire操作单位是秒。详见:PEXPIRE CommondPUBLISH channel message:向channel post一个message内容的消息返回接收消息的客户端数。详见PUBLISH Commond参考