江苏省住房和城乡建设网站,开软件外包公司赚钱吗,北京网站排名优化软件,网站排名效果好在java.util.concurrent包中#xff0c;有两个很特殊的工具类#xff0c;Condition和ReentrantLock,使用过的人都知道#xff0c;ReentrantLock#xff08;重入锁#xff09;是jdk的concurrent包提供的一种独占锁的实现。它继承自Dong Lea的 AbstractQueuedSynchronizer有两个很特殊的工具类Condition和ReentrantLock,使用过的人都知道ReentrantLock重入锁是jdk的concurrent包提供的一种独占锁的实现。它继承自Dong Lea的 AbstractQueuedSynchronizer同步器确切的说是ReentrantLock的一个内部类继承了AbstractQueuedSynchronizerReentrantLock只不过是代理了该类的一些方法可能有人会问为什么要使用内部类在包装一层 我想是安全的关系因为AbstractQueuedSynchronizer中有很多方法还实现了共享锁Condition(稍候再细说)等功能如果直接使ReentrantLock继承它则很容易出现AbstractQueuedSynchronizer中的API被无用的情况。 言归正传今天我们讨论下Condition工具类的实现。 ReentrantLock和Condition的使用方式通常是这样的 运行后结果如下 可以看到 Condition的执行方式是当在线程1中调用await方法后线程1将释放锁并且将自己沉睡等待唤醒 线程2获取到锁后开始做事完毕后调用Condition的signal方法唤醒线程1线程1恢复执行。 以上说明Condition是一个多线程间协调通信的工具类使得某个或者某些线程一起等待某个条件Condition,只有当该条件具备( signal 或者 signalAll方法被带调用)时 这些等待线程才会被唤醒从而重新争夺锁。 那它是怎么实现的呢 首先还是要明白reentrantLock.newCondition() 返回的是Condition的一个实现该类在AbstractQueuedSynchronizer中被实现叫做newCondition 它可以访问AbstractQueuedSynchronizer中的方法和其余内部类 AbstractQueuedSynchronizer是个抽象类至于他怎么能访问这里有个很奇妙的点后面我专门用demo说明 现在我们一起来看下Condition类的实现还是从上面的demo入手 为了方便书写我将AbstractQueuedSynchronizer缩写为AQS 当await被调用时代码如下 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node addConditionWaiter(); //将当前线程包装下后 //添加到Condition自己维护的一个链表中。 int savedState fullyRelease(node);//释放当前线程占有的锁从demo中看到 //调用await前当前线程是占有锁的 int interruptMode 0; while (!isOnSyncQueue(node)) {//释放完毕后遍历AQS的队列看当前节点是否在队列中 //不在 说明它还没有竞争锁的资格所以继续将自己沉睡。 //直到它被加入到队列中聪明的你可能猜到了 //没有错在singal的时候加入不就可以了 LockSupport.park(this); if ((interruptMode checkInterruptWhileWaiting(node)) ! 0) break; } //被唤醒后重新开始正式竞争锁同样如果竞争不到还是会将自己沉睡等待唤醒重新开始竞争。 if (acquireQueued(node, savedState) interruptMode ! THROW_IE) interruptMode REINTERRUPT; if (node.nextWaiter ! null) unlinkCancelledWaiters(); if (interruptMode ! 0) reportInterruptAfterWait(interruptMode); } 回到上面的demo锁被释放后线程1开始沉睡这个时候线程因为线程1沉睡时会唤醒AQS队列中的头结点所所以线程2会开始竞争锁并获取到等待3秒后线程2会调用signal方法“发出”signal信号signal方法如下 public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first firstWaiter; //firstWaiter为condition自己维护的一个链表的头结点 //取出第一个节点后开始唤醒操作 if (first ! null) doSignal(first); } 说明下其实Condition内部维护了等待队列的头结点和尾节点该队列的作用是存放等待signal信号的线程该线程被封装为Node节点后存放于此。 关键的就在于此我们知道AQS自己维护的队列是当前等待资源的队列AQS会在资源被释放后依次唤醒队列中从前到后的所有节点使他们对应的线程恢复执行。直到队列为空。 而Condition自己也维护了一个队列该队列的作用是维护一个等待signal信号的队列两个队列的作用是不同事实上每个线程也仅仅会同时存在以上两个队列中的一个流程是这样的 1. 线程1调用reentrantLock.lock时线程被加入到AQS的等待队列中。 2. 线程1调用await方法被调用时该线程从AQS中移除对应操作是锁的释放。 3. 接着马上被加入到Condition的等待队列中以为着该线程需要signal信号。 4. 线程2因为线程1释放锁的关系被唤醒并判断可以获取锁于是线程2获取锁并被加入到AQS的等待队列中。 5. 线程2调用signal方法这个时候Condition的等待队列中只有线程1一个节点于是它被取出来并被加入到AQS的等待队列中。 注意这个时候线程1 并没有被唤醒。 6. signal方法执行完毕线程2调用reentrantLock.unLock()方法释放锁。这个时候因为AQS中只有线程1于是AQS释放锁后按从头到尾的顺序唤醒线程时线程1被唤醒于是线程1回复执行。 7. 直到释放所整个过程执行完毕。 可以看到整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的Condition作为一个条件类很好的自己维护了一个等待信号的队列并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。 看到这里signal方法的代码应该不难理解了。 取出头结点然后doSignal private void doSignal(Node first) { do { if ( (firstWaiter first.nextWaiter) null) //修改头结点完成旧头结点的移出工作 lastWaiter null; first.nextWaiter null; } while (!transferForSignal(first) //将老的头结点加入到AQS的等待队列中 (first firstWaiter) ! null); } final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p enq(node); int ws p.waitStatus; //如果该结点的状态为cancel 或者修改waitStatus失败则直接唤醒。 if (ws 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } 可以看到正常情况 ws 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的所以不会在这个时候唤醒该线程。 只有到发送signal信号的线程调用reentrantLock.unlock()后因为它已经被加到AQS的等待队列中所以才会被唤醒。 总结 本文从代码的角度说明了Condition的实现方式其中涉及到了AQS的很多操作比如AQS的等待队列实现独占锁功能不过这不是本文讨论的重点等有机会再将AQS的实现单独分享出来。 2 更多 关于Condition中访问AQS方法的问题深度解析Java8 – AbstractQueuedSynchronizer的实现分析上从LongAdder 看更高效的无锁实现FutureTask 源码解析一种超时控制的方式ThreadLocal内存泄露分析Zemanta 0.00 avg. rating (0% score) - 0 votes program language AbstractQueuedSynchronizer, Condition, java, 多线程permalink Post navigation 使用WORDPRESS搭建自己的博客 关于CONDITION中访问AQS方法的问题 7 thoughts on “怎么理解Condition” 信言说道 2014年2月15日 下午11:00 向楼主请教一个问题如最后的流程所述“1. 线程1调用reentrantLock.lock时线程被加入到AQS的等待队列中。”此时无线程争用锁线程1会先tryAcquire一次成功则无需入队。因此我认为此处线程1并未加入到AQS的等待队列中。 “2. 线程1调用await方法被调用时该线程从AQS中移除对应操作是锁的释放。”我理解lock之后unlock之前都是在临界区内此时调await直接释放锁离开临界区OK但无需从AQS移除因为移除是即将进入临界区那一刻的事情。 “4. 线程2因为线程1释放锁的关系被唤醒并判断可以获取锁于是线程2获取锁并被加入到AQS的等待队列中。”同理我认为最后一句加入到AQS队列有误。 另外楼主的代码中变量名最好改成thread1和thread2方便对号入座谢谢 回复 liuinsect说道 2014年2月17日 下午3:12 你好根据你的描述依次回复下你的问题1. AQS中维护者唯一的一个队列该队列支持两种模式独占模式和共享模式本文中提到的reentrantlock使用的是其独占模式该队列描述了多线程环境下对锁资源的占用情况其中头结点即是表明占有该资源的线程。所以如果线程1成功获取锁则线程1会被包装成一个NodeAQS中的内部数据结构加入到AQS的队列中你所说的并未加入是不准确的。2.调用await方法后是会从AQS的该队列中移除该Node的从我本文贴出的源码中可以看到在await方法中有fullyRelease操作这个操作会引起结点的移除。 最后再说明下AQS只是维护了一个在多线程环境下对某个资源的占用情况对外可以理解成“临界区” 但在AQS内部来说不过是检查在当前条件下是否可以获取资源这种操作的一种封装。所以AQS的队列上挂了所有对该资源请求的线程而AQS定义了头结点是表示占有该资源的线程独占模式。在共享模式下则队列上的一系列结点都可以同时占有资源对应于唤醒的时候这一些列线程都会被唤醒。 回复 信言说道 2014年2月20日 下午4:34 感谢楼主的回复。我查了源码ReentrantLock.lock()调了内部类Sync的抽象方法lock后者有一个公平和另一个不公平的实现。以不公平的实现NonfairSync默认为例lock方法源码为 final void lock() {if (compareAndSetState(0, 1))//Try immediate bargesetExclusiveOwnerThread(Thread.currentThread());else//backing up to normal acquire on failure.acquire(1);}如果cas操作成功直接进入临界区执行lock后续的代码否则走常规流程调acquire()acquire源码为 public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();} tryAcquire如果成功返回trueif表达式短路就直接结束了。似乎没有源码能对应到包装成一个Node加入AQS队列 回复 liuinsect说道 2014年2月22日 上午10:48 如果tryAcquire 成功了没有必要增加到AQS的等待队列中了 反之如果增加不成功进入到acquireQueued方法中去则会将当先现线程增加到AQS的等待队列中去的。 回复 信言说道 2014年2月20日 下午5:03 再看fullyRelease的源码似乎没有出现结点从队列移除的代码 final long fullyRelease(Node node) {boolean failed true;try {long savedState getState();if (release(savedState)) {//调用releasefailed false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus Node.CANCELLED;}} 它调用了releasepublic final boolean release(long arg) {if (tryRelease(arg)) {Node h head;if (h ! null h.waitStatus ! 0)unparkSuccessor(h);return true;}return false;} 再调用了tryRelease如果成功唤醒AQS队列的头结点让它尝试进入临界区因此我理解的AQS队列上的每个结点都代表了一个正等待进入临界区而被block的线程 而tryRelease纯粹是状态值的操作也不涉及出队列protected final boolean tryRelease(int releases) {int c getState() – releases;if (Thread.currentThread() ! getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free false;if (c 0) {free true;setExclusiveOwnerThread(null);}setState(c);return free;} 回复 liuinsect说道 2014年2月20日 下午8:26 unparkSuccessor 方法中有移除节点的方法:private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws node.waitStatus;if (ws 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s node.next; if (s null || s.waitStatus 0) {s null;for (Node t tail; t ! null t ! node; t t.prev)if (t.waitStatus 0) s t; } if (s ! null) LockSupport.unpark(s.thread); } 回复 Pingback 怎么理解Condition | 并发编程网 - ifeve.com