金融类网站源码,广西建筑模板,品牌如何推广,广州住建网站AQS
java.util.concurrent.locks.AbstractQueuedSynchronizer AQS #xff08;抽象队列同步器#xff09;#xff1a; AbstractQueuedSynchronizer 是什么
来自jdk1.5#xff0c;是用来实现锁或者其他同步器组件的公共基础部分的抽象实现#xff0c;是重量级基础框架以及…AQS
java.util.concurrent.locks.AbstractQueuedSynchronizer AQS 抽象队列同步器 AbstractQueuedSynchronizer 是什么
来自jdk1.5是用来实现锁或者其他同步器组件的公共基础部分的抽象实现是重量级基础框架以及JUC的基石主要用于解决锁分配给谁的问题整体是通过一个抽象的FIFO队列来完成资源获取线程的排队工作并通过一个int变量表示持有锁的状态锁是面向开发人员的而同步器是JDK统一规范并简化了锁的实现并抽象出来的公共基础部分屏蔽了同步状态、同步队列的管理和线程排队、通知、唤醒等机制
和AQS相关的类
ReentranLockCountDownLatchReentrantReadWriteLockSemaphore等 AQS 原理
整体是通过一个抽象的FIFO队列来完成资源获取线程的排队工作并通过一个int变量表示持有锁的状态如果共享资源被占用就需要阻塞唤醒机制来保证锁的分配这个机制主要是通过CLH队列的变体实现的将暂时获取锁失败的线程以及自身的等待状态封装成队列的节点对象node放入队列中通过CAS、自旋等维护共享资源的状态达到并发效果内部结构 队列的 头指针、尾指针int 类型的同步状态的标识 state 默认值0代表没有被占用大于等于1代表被占用内部类node将暂时获取锁失败的线程以及自身的等待状态封装成队列的节点对象node int类型变量 waitStatus 当前节点再队列中的等待状态默认为0 1表示线程被取消-1表示后继线程需要被唤醒-2表示等待conditon唤醒-3表示共享式锁分为共享和独占同步状态获取将无条件地传播下去 前一个节点的指针和后一个节点的指针请求线程
ReentrantLock
是lock的实现类构造器可以传入一个boolean值true创建的就是公平锁false为非公平锁默认为非公平锁内部有一个静态内部类sync继承了 AbstractQueuedSynchronizer 用于锁的各种操作
lock 当调用 lock 方法加锁时 非公平锁会先尝试通过cas 比较并交换的操作把 states 的状态值从 0更新为1如果更新成功就把持有锁的线程设置为自己更新失败就和公平锁一样执行 AQS 的 acquire方法除此之外公平和非公平锁的区别就是再获取同步状态时公平锁需要判断等待队列中再自己之前是否存在有效节点如果有公平锁就需要排队因为公平锁讲究先到先得线程再获取锁时如果这个锁的等待队列已经有线程再等待当前线程就会直接进入等待队列而非公平锁不管是否有队列如果可以获取锁就会立刻占有锁的对象所以第一个在队列里排队的线程苏醒后仍然需要去竞争锁且不一定能竞争到锁
acquire 方法源码 public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}acquire 调用lock方法加锁除非是非公平锁能直接拿到锁其他情况下都是在调用acquire 方法 acquire 方法分为三种情况 调用 tryAcquire 方法尝试加锁 加锁失败调用 addWaite方法进入等待队列 进入队列之后调用acquireQueued 方法线程进入阻塞状态等待唤醒后才能继续执行
非公平锁的 tryAcquire 方法源码 AQS类的tryAcquire方法只是做了规范方法内直接抛出异常所以这个方法需要由子类去实现 非公平锁的tryAcquire 方法会先判断锁的状态state是否为0为0说明没有被其他线程占用就立即使用cas操作变更state为1变更成功就把持有锁的线程设置为自己变更失败就表示加锁失败 如果锁的状态为1说明锁已经被占用在比较当前线程和持有锁的线程是否一致不一致就加锁失败 tryAcquire 方法公平和非公平锁的区别是 再获取同步状态时公平锁需要判断等待队列中再自己之前是否存在有效节点如果有公平锁就需要排队非公平锁不管是否有队列如果可以获取锁就会立刻占有锁的对象 如果 tryAcquire 方法抢锁失败就需要调用 addWaiter加入到等待队列 final boolean nonfairTryAcquire(int acquires) {final Thread current Thread.currentThread();int c getState();//先判断锁的状态state是否为0为0说明没有被其他线程占用if (c 0) {//为0说明没有被其他线程占用使用cas操作变更state为1if (compareAndSetState(0, acquires)) {//变更成功就把持有锁的线程设置为自己变更失败就表示加锁失败setExclusiveOwnerThread(current);return true;}}//如果锁的状态为1说明锁已经被占用在比较当前线程和持有锁的线程是否一致else if (current getExclusiveOwnerThread()) {int nextc c acquires;if (nextc 0) // overflowthrow new Error(Maximum lock count exceeded);setState(nextc);return true;}//加锁失败return false;}addWaiter(Node.EXCLUSIVE) 加入等待队列 源码
Node.EXCLUSIVE 代表的是独占的节点也就是排他锁acquire 方法的 addWaiter 方法创建的是独占的node节点节点中封装的是当前线程addWaiter方法首先要判断 链表的尾指针是否为空 如果为空就需要初始化链表首先new一个空的哨兵节点这个节点并不存储信息只是作为占位使用然后设置哨兵节点为头节点然后把头节点赋值给尾节点当链表初始化完成后或者链表中已经由其他节点时就用CAS操作把新节点加入到链表尾部如果节点加入链表失败就进行下一次循环直到把节点加入成功为止如果不为空直接用CAS操作把新节点加入到链表尾部同样如果节点加入链表失败就进行循环直到把节点加入成功为止 节点成功入队后需要调用acquireQueued 方法
private Node addWaiter(Node mode) {//node节点中封装的是当前线程Node node new Node(Thread.currentThread(), mode);//尾指针Node pred tail;//链表的尾指针是否为空if (pred ! null) {node.prev pred;//如果加入失败就会走下面的循环直到把节点加入链表为止if (compareAndSetTail(pred, node)) {pred.next node;return node;}}enq(node);return node;
}private Node enq(final Node node) {for (;;) {//尾节点Node t tail;//如果尾节点为nullif (t null) { // Must initialize//就需要new一个node节点并且设置为头节点然后把头节点赋值给尾节点if (compareAndSetHead(new Node()))tail head;} else {//当链表初始化完成后或者链表中已经由其他节点时//把要加入链表的新节点的前指针设置为尾节点node.prev t;//并且把新加入的节点设置为尾节点if (compareAndSetTail(t, node)) {//设置成功就之前尾节点的后指针指向新节点这样新节点就变成了新的尾节点如果设置失败就继续循环直到把新节点加入到链表尾部为止t.next node;return t;}}}}acquireQueued 源码
首先获取当前节点的前置节点如果前置节点是头节点就尝试去获取锁如果获取锁成功就把自己设为头节点就把锁的state改为1设置当前线程为持有锁的线程如果前置节点不是头节点或者获取锁失败 就需要判断前置节点的waitStatus状态值waitStatus值默认为0第一次进入循环会把前置节点的waitStatus的值改为-1后继续下一次循环后会调用 LockSupport.park 方法阻塞当前线程需要等待其他线程释放锁后再唤醒阻塞的线程当持有锁的线程释放锁且调用LockSupport.unpark 唤醒该线程后才能继续执行LockSupport.unpark 唤醒的是头节点的下一个节点线程被唤醒后,检查线程是否被中断如果线程没有被中断就继续进行循环继续尝试去加锁因为是非公平锁所以有可能会加锁失败 如果加锁成功就把锁的state改为1设置当前线程为持有锁的线程并且把当前线程的节点设置为链表的头节点原本的头节点会从链表中剔除因为每次唤醒的都是头节点的下一个节点所以成功抢到到锁后被唤醒的节点会成为新的头节点后续会唤醒链表的下一个节点 如果线程在等待过程中取消没有获取到锁就跳出了循环failed值为默认的true就会执行cancelAcquire方法取消正在排队的节点 首先设置当前节点的线程为null然后获取上一个没有取消的前置节点把当前节点的 waitStatus 设置为11就是要取消的节点如果当前节点是尾节点就把上一个有效的节点设置为尾节点如果不是尾节点并且满足出队条件就变更链表中相关节点的前置和后置引用剔除要取消的节点 //arg为1独占锁final boolean acquireQueued(final Node node, int arg) {boolean failed true;try {boolean interrupted false;for (;;) {//获得node节点的前置节点final Node p node.predecessor();//node节点的前置节点是否为头节点如果是就尝试去获取锁if (p head tryAcquire(arg)) {setHead(node);p.next null; // help GCfailed false;return interrupted;}//判断node节点的前置节点的waitStatus状态默认情况下都是0在第二次循环的时候就会改成-1然后执行parkAndCheckInterrupt方法//parkAndCheckInterrupt方法会阻塞当前线程//也就是后面的节点会把前面节点的 waitStatus 改为-1if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt())interrupted true;}} finally {if (failed)cancelAcquire(node);}}/*** waitStatus 当前节点再队列中的等待状态默认为01表示线程获取锁的请求被取消-1表示线程已经准备好了-2表示节点在等待队列中等待唤醒-3表示共享式锁分为共享和独占同步状态获取将无条件地传播下去*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//前置节点的状态int ws pred.waitStatus;// SIGNAL -1 当线程再次进行循环的时候前一个节点的waitStatus已经被设置为-1就返回trueif (ws Node.SIGNAL)return true;//线程被取消if (ws 0) {do {node.prev pred pred.prev;} while (pred.waitStatus 0);pred.next node;} else {//如果前置节点的 waitStatus不等于-1也不大于0就把waitStatus的值改为-1后返回falsecompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}//阻塞当前线程private final boolean parkAndCheckInterrupt() {//验证当前线程的通行证阻塞当前线程LockSupport.park(this);//被唤醒后,检查线程是否被中断如果线程没有被中断就返回 falsereturn Thread.interrupted();}
取消正在进行的获取尝试 // node 为需要取消的节点private void cancelAcquire(Node node) {// Ignore if node doesnt existif (node null)return;//设置当前节点的线程为nullnode.thread null;//获取上一个节点Node pred node.prev;//waitStatus 0 表示上一个节点也要取消while (pred.waitStatus 0)//那么就一直向上找直到找到没有取消的前置节点node.prev pred pred.prev;//获取不会取消的前置节点的下一个节点Node predNext pred.next;//把当前节点的 waitStatus 设置为11就是要取消的节点node.waitStatus Node.CANCELLED;//如果当前节点是尾节点就把上一个还有效的节点设置为尾节点if (node tail compareAndSetTail(node, pred)) {//设置成功就把上一个节点的后置节点设置为null,这样上一个还有效的节点就成为了尾节点compareAndSetNext(pred, predNext, null);} else {//否则int ws;//前置节点不能是头节点因为头节点只是占位节点并且满足出队条件if (pred ! head ((ws pred.waitStatus) Node.SIGNAL ||(ws 0 compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) pred.thread ! null) {//变更链表中相关节点的前置和后置引用剔除要取消的节点Node next node.next;if (next ! null next.waitStatus 0)compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);}node.next node; // help GC}}
unlock
unlock 源码其实是再调用release方法 public void unlock() {sync.release(1);}release方法会首先尝试释放锁
tryRelease 会把持有锁的线程为null并且把锁的state设置为0如果链表被初始化过有在等待的线程节点头节点就不为空且waitStatus值为-1接下来会把头节点的waitStatus的改为0如果头节点的下一个节点不为null就调用LockSupport.unpark 方法唤醒头节点的下一个节点 public final boolean release(int arg) {if (tryRelease(arg)) {Node h head;if (h ! null h.waitStatus ! 0)unparkSuccessor(h);return true;}return false;}
AQS的tryRelease方法同样没有做实现需要子类自己去实现下面是ReentrantLock的实现 protected final boolean tryRelease(int releases) {//传入的releases为1持有锁的线程State为1所以C为0int c getState() - releases;//如果当前线程不等于持有锁的线程会抛出异常这种情况一般不会出现if (Thread.currentThread() ! getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free false;//c等于0就设置持有锁的线程为null,并且把state设置为0返回trueif (c 0) {free true;setExclusiveOwnerThread(null);}setState(c);return free;}unparkSuccessor private void unparkSuccessor(Node node) {int ws node.waitStatus;if (ws 0)//重新把头节点的waitStatus值改为0compareAndSetWaitStatus(node, ws, 0);//头节点的下一个节点Node s node.next;//如果链表被初始化过有在等待的线程节点头节点的后置节点就不为null//如果链表后面还有其他节点那么头节点的后置节点waitStatus值就为-1if (s null || s.waitStatus 0) {s null;for (Node t tail; t ! null t ! node; t t.prev)if (t.waitStatus 0)s t;}//如果头节点的下一个节点不为null就直接调用 LockSupport.unpark 方法唤醒头节点的下一个节点if (s ! null)LockSupport.unpark(s.thread);}