广东阳春市建设局网站,wordpress+手册主题,wordpress注册邮箱设置,湖南邵阳建设局网站1. AQS简介 在上一篇文章中我们对lock和AbstractQueuedSynchronizer(AQS)有了初步的认识。在同步组件的实现中#xff0c;AQS是核心部分#xff0c;同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义#xff0c;AQS则实现了对同步状态的管理#xff0c;以及对阻塞… 1. AQS简介 在上一篇文章中我们对lock和AbstractQueuedSynchronizer(AQS)有了初步的认识。在同步组件的实现中AQS是核心部分同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义AQS则实现了对同步状态的管理以及对阻塞线程进行排队等待通知等等一些底层的实现处理。AQS的核心也包括了这些方面:同步队列独占式锁的获取和释放共享锁的获取和释放以及可中断锁超时等待锁获取这些特性的实现而这些实际上则是AQS提供出来的模板方法归纳整理如下 独占式锁 void acquire(int arg)独占式获取同步状态如果获取失败则插入同步队列进行等待 void acquireInterruptibly(int arg)与acquire方法相同但在同步队列中进行等待的时候可以检测中断 boolean tryAcquireNanos(int arg, long nanosTimeout)在acquireInterruptibly基础上增加了超时等待功能在超时时间内没有获得同步状态返回false; boolean release(int arg)释放同步状态该方法会唤醒在同步队列中的下一个节点 共享式锁 void acquireShared(int arg)共享式获取同步状态与独占式的区别在于同一时刻有多个线程获取同步状态 void acquireSharedInterruptibly(int arg)在acquireShared方法基础上增加了能响应中断的功能 boolean tryAcquireSharedNanos(int arg, long nanosTimeout)在acquireSharedInterruptibly基础上增加了超时等待的功能 boolean releaseShared(int arg)共享式释放同步状态 要想掌握AQS的底层实现其实也就是对这些模板方法的逻辑进行学习。在学习这些模板方法之前我们得首先了解下AQS中的同步队列是一种什么样的数据结构因为同步队列是AQS对同步状态的管理的基石。 2. 同步队列 当共享资源被某个线程占有其他请求该资源的线程将会阻塞从而进入同步队列。就数据结构而言队列的实现方式无外乎两者一是通过数组的形式另外一种则是链表的形式。AQS中的同步队列则是通过链式方式进行实现。接下来很显然我们至少会抱有这样的疑问**1. 节点的数据结构是什么样的2. 是单向还是双向3. 是带头结点的还是不带头节点的**我们依旧先是通过看源码的方式。 在AQS有一个静态内部类Node其中有这样一些属性 volatile int waitStatus //节点状态 volatile Node prev //当前节点/线程的前驱节点 volatile Node next; //当前节点/线程的后继节点 volatile Thread thread;//加入同步队列的线程引用 Node nextWaiter;//等待队列中的下一个节点 节点的状态有以下这些 int CANCELLED 1//节点从同步队列中取消 int SIGNAL -1//后继节点的线程处于等待状态如果当前节点释放同步状态会通知后继节点使得后继节点的线程能够运行 int CONDITION -2//当前节点进入等待队列中 int PROPAGATE -3//表示下一次共享式同步状态获取将会无条件传播下去 int INITIAL 0;//初始状态 现在我们知道了节点的数据结构类型并且每个节点拥有其前驱和后继节点很显然这是一个双向队列。同样的我们可以用一段demo看一下。 public class LockDemo {private static ReentrantLock lock new ReentrantLock();public static void main(String[] args) {for (int i 0; i 5; i) {Thread thread new Thread(() - {lock.lock();try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}});thread.start();}}
}
复制代码 实例代码中开启了5个线程先获取锁之后再睡眠10S中实际上这里让线程睡眠是想模拟出当线程无法获取锁时进入同步队列的情况。通过debug当Thread-4在本例中最后一个线程获取锁失败后进入同步时AQS时现在的同步队列如图所示 Thread-0先获得锁后进行睡眠其他线程Thread-1,Thread-2,Thread-3,Thread-4获取锁失败进入同步队列同时也可以很清楚的看出来每个节点有两个域prev(前驱)和next(后继)并且每个节点用来保存获取同步状态失败的线程引用以及等待状态等信息。另外AQS中有两个重要的成员变量 private transient volatile Node head;
private transient volatile Node tail;
复制代码 也就是说AQS实际上通过头尾指针来管理同步队列同时实现包括获取锁失败的线程进行入队释放锁时对同步队列中的线程进行通知等核心方法。其示意图如下 通过对源码的理解以及做实验的方式现在我们可以清楚的知道这样几点 节点的数据结构即AQS的静态内部类Node,节点的等待状态等信息同步队列是一个双向队列AQS通过持有头尾指针管理同步队列那么节点如何进行入队和出队是怎样做的了实际上这对应着锁的获取和释放两个操作获取锁失败进行入队操作获取锁成功进行出队操作。 3. 独占锁 3.1 独占锁的获取acquire方法 我们继续通过看源码和debug的方式来看还是以上面的demo为例调用lock()方法是获取独占式锁获取失败就将当前线程加入同步队列成功则线程执行。而lock()方法实际上会调用AQS的**acquire()**方法源码如下 public final void acquire(int arg) {//先看同步状态是否获取成功如果成功则方法结束返回//若失败则先调用addWaiter()方法再调用acquireQueued()方法if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
复制代码 关键信息请看注释acquire根据当前获得同步状态成功与否做了两件事情1. 成功则方法结束返回2. 失败则先调用addWaiter()然后在调用acquireQueued()方法。 获取同步状态失败入队操作 当线程获取独占式锁失败后就会将当前线程加入同步队列那么加入队列的方式是怎样的了我们接下来就应该去研究一下addWaiter()和acquireQueued()。addWaiter()源码如下 private Node addWaiter(Node mode) {// 1. 将当前线程构建成Node类型Node node new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure// 2. 当前尾节点是否为nullNode pred tail;if (pred ! null) {// 2.2 将当前节点尾插入的方式插入同步队列中node.prev pred;if (compareAndSetTail(pred, node)) {pred.next node;return node;}}// 2.1. 当前同步队列尾节点为null说明当前线程是第一个加入同步队列进行等待的线程enq(node);return node;
}
复制代码 分析可以看上面的注释。程序的逻辑主要分为两个部分**1. 当前同步队列的尾节点为null调用方法enq()插入;2. 当前队列的尾节点不为null则采用尾插入compareAndSetTail方法的方式入队。**另外还会有另外一个问题如果 if (compareAndSetTail(pred, node))为false怎么办会继续执行到enq()方法同时很明显compareAndSetTail是一个CAS操作通常来说如果CAS操作失败会继续自旋死循环进行重试。因此经过我们这样的分析enq()方法可能承担两个任务**1. 处理当前同步队列尾节点为null时进行入队操作2. 如果CAS尾插入节点失败后负责自旋进行尝试。**那么是不是真的就像我们分析的一样了只有源码会告诉我们答案:),enq()源码如下 private Node enq(final Node node) {for (;;) {Node t tail;if (t null) { // Must initialize//1. 构造头结点if (compareAndSetHead(new Node()))tail head;} else {// 2. 尾插入CAS操作失败自旋尝试node.prev t;if (compareAndSetTail(t, node)) {t.next node;return t;}}}
}
复制代码 在上面的分析中我们可以看出在第1步中会先创建头结点说明同步队列是带头结点的链式存储结构。带头结点与不带头结点相比会在入队和出队的操作中获得更大的便捷性因此同步队列选择了带头结点的链式存储结构。那么带头节点的队列初始化时机是什么自然而然是在tail为null时即当前线程是第一次插入同步队列。compareAndSetTail(t, node)方法会利用CAS操作设置尾节点如果CAS操作失败会在for (;;)for死循环中不断尝试直至成功return返回为止。因此对enq()方法可以做这样的总结 在当前线程是第一个加入同步队列时调用compareAndSetHead(new Node())方法完成链式队列的头结点的初始化自旋不断尝试CAS尾插入节点直至成功为止。现在我们已经很清楚获取独占式锁失败的线程包装成Node然后插入同步队列的过程了那么紧接着会有下一个问题在同步队列中的节点线程会做什么事情了来保证自己能够有机会获得独占式锁了带着这样的问题我们就来看看acquireQueued()方法从方法名就可以很清楚这个方法的作用就是排队获取锁的过程源码如下 final boolean acquireQueued(final Node node, int arg) {boolean failed true;try {boolean interrupted false;for (;;) {// 1. 获得当前节点的先驱节点final Node p node.predecessor();// 2. 当前节点能否获取独占式锁 // 2.1 如果当前节点的先驱节点是头结点并且成功获取同步状态即可以获得独占式锁if (p head tryAcquire(arg)) {//队列头指针用指向当前节点setHead(node);//释放前驱节点p.next null; // help GCfailed false;return interrupted;}// 2.2 获取锁失败线程进入等待状态等待获取独占式锁if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt())interrupted true;}} finally {if (failed)cancelAcquire(node);}
}
复制代码 程序逻辑通过注释已经标出整体来看这是一个这又是一个自旋的过程for (;;)代码首先获取当前节点的先驱节点如果先驱节点是头结点的并且成功获得同步状态的时候if (p head tryAcquire(arg))当前节点所指向的线程能够获取锁。反之获取锁失败进入等待状态。整体示意图为下图 获取锁成功出队操作 获取锁的节点出队的逻辑是 //队列头结点引用指向当前节点
setHead(node);
//释放前驱节点
p.next null; // help GC
failed false;
return interrupted;
复制代码 setHead()方法为 private void setHead(Node node) {head node;node.thread null;node.prev null;
}
复制代码 将当前节点通过setHead()方法设置为队列的头结点然后将之前的头结点的next域设置为null并且pre域也为null即与队列断开无任何引用方便GC时能够将内存进行回收。示意图如下 那么当获取锁失败的时候会调用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法看看他们做了什么事情。shouldParkAfterFailedAcquire()方法源码为 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws pred.waitStatus;if (ws Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev pred pred.prev;} while (pred.waitStatus 0);pred.next node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but dont park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}
复制代码 shouldParkAfterFailedAcquire()方法主要逻辑是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)使用CAS将节点状态由INITIAL设置成SIGNAL表示当前线程阻塞。当compareAndSetWaitStatus设置失败则说明shouldParkAfterFailedAcquire方法返回false然后会在acquireQueued()方法中for (;;)死循环中会继续重试直至compareAndSetWaitStatus设置节点状态位为SIGNAL时shouldParkAfterFailedAcquire返回true时才会执行方法parkAndCheckInterrupt()方法该方法的源码为 private final boolean parkAndCheckInterrupt() {//使得该线程阻塞LockSupport.park(this);return Thread.interrupted();
}
复制代码 该方法的关键是会调用LookSupport.park()方法关于LookSupport会在以后的文章进行讨论该方法是用来阻塞当前线程的。因此到这里就应该清楚了acquireQueued()在自旋过程中主要完成了两件事情 如果当前节点的前驱节点是头节点并且能够获得同步状态的话当前线程能够获得锁该方法执行结束退出获取锁失败的话先将节点状态设置成SIGNAL然后调用LookSupport.park方法使得当前线程阻塞。经过上面的分析独占式锁的获取过程也就是acquire()方法的执行流程如下图所示 3.2 独占锁的释放release()方法 独占锁的释放就相对来说比较容易理解了废话不多说先来看下源码 public final boolean release(int arg) {if (tryRelease(arg)) {Node h head;if (h ! null h.waitStatus ! 0)unparkSuccessor(h);return true;}return false;
}
复制代码 这段代码逻辑就比较容易理解了如果同步状态释放成功tryRelease返回true则会执行if块中的代码当head指向的头结点不为null并且该节点的状态值不为0的话才会执行unparkSuccessor()方法。unparkSuccessor方法源码 private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws node.waitStatus;if (ws 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*///头节点的后继节点Node s node.next;if (s null || s.waitStatus 0) {s null;for (Node t tail; t ! null t ! node; t t.prev)if (t.waitStatus 0)s t;}if (s ! null)//后继节点不为null时唤醒该线程LockSupport.unpark(s.thread);
}
复制代码 源码的关键信息请看注释首先获取头节点的后继节点当后继节点的时候会调用LookSupport.unpark()方法该方法会唤醒该节点的后继节点所包装的线程。因此每一次锁释放后就会唤醒队列中该节点的后继节点所引用的线程从而进一步可以佐证获得锁的过程是一个FIFO先进先出的过程。 到现在我们终于啃下了一块硬骨头了通过学习源码的方式非常深刻的学习到了独占式锁的获取和释放的过程以及同步队列。可以做一下总结 线程获取锁失败线程被封装成Node进行入队操作核心方法在于addWaiter()和enq()同时enq()完成对同步队列的头结点初始化工作以及CAS操作失败的重试;线程获取锁是一个自旋的过程当且仅当 当前节点的前驱节点是头结点并且成功获得同步状态时节点出队即该节点引用的线程获得锁否则当不满足条件时就会调用LookSupport.park()方法使得线程阻塞释放锁的时候会唤醒后继节点总体来说在获取同步状态时AQS维护一个同步队列获取同步状态失败的线程会加入到队列中进行自旋移除队列或停止自旋的条件是前驱节点是头结点并且成功获得了同步状态。在释放同步状态时同步器会调用unparkSuccessor()方法唤醒后继节点。 独占锁特性学习 3.3 可中断式获取锁acquireInterruptibly方法 我们知道lock相较于synchronized有一些更方便的特性比如能响应中断以及超时等待等特性现在我们依旧采用通过学习源码的方式来看看能够响应中断是怎么实现的。可响应中断式锁可调用方法lock.lockInterruptibly();而该方法其底层会调用AQS的acquireInterruptibly方法源码为 public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))//线程获取锁失败doAcquireInterruptibly(arg);
}
复制代码 在获取同步状态失败后就会调用doAcquireInterruptibly方法 private void doAcquireInterruptibly(int arg)throws InterruptedException {//将节点插入到同步队列中final Node node addWaiter(Node.EXCLUSIVE);boolean failed true;try {for (;;) {final Node p node.predecessor();//获取锁出队if (p head tryAcquire(arg)) {setHead(node);p.next null; // help GCfailed false;return;}if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt())//线程中断抛异常throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
复制代码 关键信息请看注释现在看这段代码就很轻松了吧:),与acquire方法逻辑几乎一致唯一的区别是当parkAndCheckInterrupt返回true时即线程阻塞时该线程被中断代码抛出被中断异常。 3.4 超时等待式获取锁tryAcquireNanos()方法 通过调用lock.tryLock(timeout,TimeUnit)方式达到超时等待获取锁的效果该方法会在三种情况下才会返回 在超时时间内当前线程成功获取了锁当前线程在超时时间内被中断超时时间结束仍未获得锁返回false。我们仍然通过采取阅读源码的方式来学习底层具体是怎么实现的该方法会调用AQS的方法tryAcquireNanos(),源码为 public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) ||//实现超时等待的效果doAcquireNanos(arg, nanosTimeout);
}
复制代码 很显然这段源码最终是靠doAcquireNanos方法实现超时等待的效果该方法源码如下 private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout 0L)return false;//1. 根据超时时间和当前时间计算出截止时间final long deadline System.nanoTime() nanosTimeout;final Node node addWaiter(Node.EXCLUSIVE);boolean failed true;try {for (;;) {final Node p node.predecessor();//2. 当前线程获得锁出队列if (p head tryAcquire(arg)) {setHead(node);p.next null; // help GCfailed false;return true;}// 3.1 重新计算超时时间nanosTimeout deadline - System.nanoTime();// 3.2 已经超时返回falseif (nanosTimeout 0L)return false;// 3.3 线程阻塞等待 if (shouldParkAfterFailedAcquire(p, node) nanosTimeout spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);// 3.4 线程被中断抛出被中断异常if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
复制代码 程序逻辑如图所示 程序逻辑同独占锁可响应中断式获取基本一致唯一的不同在于获取锁失败后对超时时间的处理上在第1步会先计算出按照现在时间和超时时间计算出理论上的截止时间比如当前时间是8h10min,超时时间是10min那么根据deadline System.nanoTime() nanosTimeout计算出刚好达到超时时间时的系统时间就是8h 10min10min 8h 20min。然后根据deadline - System.nanoTime()就可以判断是否已经超时了比如当前系统时间是8h 30min很明显已经超过了理论上的系统时间8h 20mindeadline - System.nanoTime()计算出来就是一个负数自然而然会在3.2步中的If判断之间返回false。如果还没有超时即3.2步中的if判断为true时就会继续执行3.3步通过LockSupport.parkNanos使得当前线程阻塞同时在3.4步增加了对中断的检测若检测出被中断直接抛出被中断异常。 4. 共享锁 4.1 共享锁的获取acquireShared()方法 在聊完AQS对独占锁的实现后我们继续一鼓作气的来看看共享锁是怎样实现的共享锁的获取方法为acquireShared源码为 public final void acquireShared(int arg) {if (tryAcquireShared(arg) 0)doAcquireShared(arg);
}
复制代码 这段源码的逻辑很容易理解在该方法中会首先调用tryAcquireShared方法tryAcquireShared返回值是一个int类型当返回值为大于等于0的时候方法结束说明获得成功获取锁否则表明获取同步状态失败即所引用的线程获取锁失败会执行doAcquireShared方法该方法的源码为 private void doAcquireShared(int arg) {final Node node addWaiter(Node.SHARED);boolean failed true;try {boolean interrupted false;for (;;) {final Node p node.predecessor();if (p head) {int r tryAcquireShared(arg);if (r 0) {// 当该节点的前驱节点是头结点且成功获取同步状态setHeadAndPropagate(node, r);p.next null; // help GCif (interrupted)selfInterrupt();failed false;return;}}if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt())interrupted true;}} finally {if (failed)cancelAcquire(node);}
}
复制代码 现在来看这段代码会不会很容易了逻辑几乎和独占式锁的获取一模一样这里的自旋过程中能够退出的条件是当前节点的前驱节点是头结点并且tryAcquireShared(arg)返回值大于等于0即能成功获得同步状态。 4.2 共享锁的释放releaseShared()方法 共享锁的释放在AQS中会调用方法releaseShared public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}
复制代码 当成功释放同步状态之后即tryReleaseShared会继续执行doReleaseShared方法 private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h head;if (h ! null h ! tail) {int ws h.waitStatus;if (ws Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h head) // loop if head changedbreak;}
}
复制代码 这段方法跟独占式锁释放过程有点点不同在共享式锁的释放过程中对于能够支持多个线程同时访问的并发组件必须保证多个线程能够安全的释放同步状态这里采用的CAS保证当CAS操作失败continue在下一次循环中进行重试。 4.3 可中断acquireSharedInterruptibly()方法超时等待tryAcquireSharedNanos()方法 关于可中断锁以及超时等待的特性其实现和独占式锁可中断获取锁以及超时等待的实现几乎一致具体的就不再说了如果理解了上面的内容对这部分的理解也是水到渠成的。 通过这篇加深了对AQS的底层实现更加清楚了也对了解并发组件的实现原理打下了基础学无止境继续加油:);如果觉得不错请给赞嘿嘿。 参考文献 《java并发编程的艺术》