当前位置:网站首页>AQS源码深入分析之条件队列
AQS源码深入分析之条件队列
2020-11-09 19:27:00 【程序猿欧文】
本文基于JDK-8u261源码分析
1 简介
因为CLH队列中的线程,什么线程获取到锁,什么线程进入队列排队,什么线程释放锁,这些都是不受我们控制的。所以条件队列的出现为我们提供了主动式地、只有满足指定的条件后才能线程阻塞和唤醒的方式。对于条件队列首先需要说明一些概念:条件队列是AQS中除了CLH队列之外的另一种队列,每创建一个Condition实际上就是创建了一个条件队列,而每调用一次await方法实际上就是往条件队列中入队,每调用一次signal方法实际上就是往条件队列中出队。不像CLH队列上节点的状态有多个,条件队列上节点的状态只有一个:CONDITION。所以如果条件队列上一个节点不再是CONDITION状态时,就意味着这个节点该出队了。需要注意的是,条件队列只能运行在独占模式下。
一般在使用条件队列作为阻塞队列来使用时都会创建两个条件队列:notFull和notEmpty。notFull表示当条件队列已满的时候,put方法会处于等待状态,直到队列没满;notEmpty表示当条件队列为空的时候,take方法会处于等待状态,直到队列有数据了。
而notFull.signal方法和notEmpty.signal方法会将条件队列上的节点移到CLH队列中(每次只转移一个)。也就是说,存在一个节点从条件队列被转移到CLH队列的情况发生。同时也意味着,条件队列上不会发生锁资源竞争,所有的锁竞争都是发生在CLH队列上的。
其他一些条件队列和CLH队列之间的差异如下:
- 条件队列使用nextWaiter指针来指向下一个节点,是一个单向链表结构,不同于CLH队列的双向链表结构;
- 条件队列使用firstWaiter和lastWaiter来指向头尾指针,不同于CLH队列的head和tail;
- 条件队列中的第一个节点也不会像CLH队列一样,是一个特殊的空节点;
- 不同于CLH队列中会用很多的CAS操作来控制并发,条件队列进队列的前提是已经获取到了独占锁资源,所以很多地方不需要考虑并发。
下面就是具体的源码分析了。条件队列以ArrayBlockingQueue来举例:
2 构造器
1 /** 2 * ArrayBlockingQueue: 3 */ 4 public ArrayBlockingQueue(int capacity) { 5 this(capacity, false); 6} 7 8 public ArrayBlockingQueue(int capacity, boolean fair) { 9 if (capacity <= 0)10 throw new IllegalArgumentException();11 //存放实际数据的数组12 this.items = new Object[capacity];13 //独占锁使用ReentrantLock来实现(fair表示的就是公平锁还是非公平锁,默认为非公平锁)14 lock = new ReentrantLock(fair);15 //notEmpty条件队列16 notEmpty = lock.newCondition();17 //notFull条件队列18 notFull = lock.newCondition();19 }
3 put方法
1 /** 2 * ArrayBlockingQueue: 3 */ 4 public void put(E e) throws InterruptedException { 5 //非空校验 6 checkNotNull(e); 7 final ReentrantLock lock = this.lock; 8 /* 9 获取独占锁资源,响应中断模式。其实现代码和lock方法还有Semaphore的acquire方法是类似的 10 因为这里分析的是条件队列,于是就不再分析该方法的细节了 11 */ 12 lock.lockInterruptibly(); 13 try { 14 while (count == items.length) 15 //如果数组中数据已经满了的话,就在notFull中入队一个新节点,并阻塞当前线程 16 notFull.await(); 17 //添加数组元素并唤醒notEmpty 18 enqueue(e); 19 } finally { 20 //释放锁资源 21 lock.unlock(); 22 } 23 }
4 await方法
如果在put的时候发现数组已满,或者在take的时候发现数组是空的,就会调用await方法来将当前节点放入条件队列中:
1 /** 2 * AbstractQueuedSynchronizer: 3 */ 4 public final void await() throws InterruptedException { 5 //如果当前线程被中断就抛出异常 6 if (Thread.interrupted()) 7 throw new InterruptedException(); 8 //把当前节点加入到条件队列中 9 Node node = addConditionWaiter();10 //释放之前获取到的锁资源,因为后续会阻塞该线程,所以如果不释放的话,其他线程将会等待该线程被唤醒11 int savedState = fullyRelease(node);12 int interruptMode = 0;13 //如果当前节点不在CLH队列中则阻塞住,等待unpark唤醒14 while (!isOnSyncQueue(node)) {15 LockSupport.park(this);16 /*17 这里被唤醒可能是正常的signal操作也可能是被中断了。但无论是哪种情况,都会将当前节点插入到CLH队列尾,18 并退出循环(注意,这里被唤醒除了上面两种情况之外,还有一种情况是操作系统级别的虚假唤醒(spurious wakeup),19 也就是当前线程毫无理由就会被唤醒了,所以上面需要使用while来规避掉这种情况)20 */21 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)22 break;23 }24 //走到这里说明当前节点已经插入到了CLH队列中(被signal所唤醒或者被中断)。然后在CLH队列中进行获取锁资源的操作25 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)26 /*27 <<<THROW_IE和REINTERRUPT的解释详见transferAfterCancelledWait方法>>>2829 之前分析过的如果acquireQueued方法返回true,说明当前线程被中断了30 返回true意味着在acquireQueued方法中此时会再一次被中断(注意,这意味着有两个代码点判断线程是否被中断:31 一个是在第15行代码处,另一个是在acquireQueued方法里面),如果之前没有被中断,则interruptMode=0,32 而在acquireQueued方法里面线程被中断返回了,这个时候将interruptMode重新修正为REINTERRUPT即可33 至于为什么不修正为THROW_IE是因为在这种情况下,第15行代码处已经通过调用signal方法正常唤醒了,34 节点已经放进了CLH队列中。而此时的中断是在signal操作之后,在第25行代码处去抢锁资源的时候发生的35 这个时候中断不中断已经无所谓了,所以就不需要抛出InterruptedException36 */37 interruptMode = REINTERRUPT;38 /*39 走到这里说明当前节点已经获取到了锁资源(获取不到的话就会被再次阻塞在acquireQueued方法里)40 如果interruptMode=REINTERRUPT的话,说明之前已经调用过signal方法了,也就是说该节点已经从条件队列中剔除掉了,41 nextWaiter指针肯定为空,所以在这种情况下是不需要执行unlinkCancelledWaiters方法的42 而如果interruptMode=THROW_IE的话,说明之前还没有调用过signal方法来从条件队列中剔除该节点。这个时候就需要调用43 unlinkCancelledWaiters方法来剔除这个节点了(在之前的transferAfterCancelledWait方法中44 已经把该节点的状态改为了初始状态0),顺便把所有其他不是CONDITION状态的节点也一并剔除掉。注意:如果当前节点是条件队列中的45 最后一个节点的话,并不会被清理。无妨,等到下次添加节点或调用signal方法的时候就会被清理了46 */47 if (node.nextWaiter != null)48 unlinkCancelledWaiters();49 //根据不同模式处理中断(正常模式不需要处理)50 if (interruptMode != 0)51 reportInterruptAfterWait(interruptMode);52 }
5 addConditionWaiter方法
在条件队列中添加一个节点的逻辑:
1 /** 2 * AbstractQueuedSynchronizer: 3 */ 4 private Node addConditionWaiter() { 5 Node t = lastWaiter; 6 /* 7 如果最后一个节点不是CONDITION状态,就删除条件队列中所有不是CONDITION状态的节点 8 至于为什么只需要判断最后一个节点的状态就能知道整个队列中是否有不是CONDITION的节点,后面会说明 9 */ 10 if (t != null && t.waitStatus != Node.CONDITION) {11 //删除所有不是CONDITION状态的节点12 unlinkCancelledWaiters();13 t = lastWaiter;14 }15 //创建一个类型为CONDITION的新节点16 Node node = new Node(Thread.currentThread(), Node.CONDITION);17 if (t == null)18 //t为null意味着此时条件队列中为空,直接将头指针指向这个新节点即可19 firstWaiter = node;20 else21 //t不为null的话就说明此时条件队列中有节点,直接在尾处加入这个新节点22 t.nextWaiter = node;23 //尾指针指向这个新节点,添加节点完毕24 lastWaiter = node;25 /*26 注意,这里不用像CLH队列中的enq方法一样,如果插入失败就会自旋直到插入成功为止27 因为此时还没有释放独占锁28 */29 return node;30 }3132 /**33 * 第12行代码处:34 * 删除条件队列当中所有不是CONDITION状态的节点35 */36 private void unlinkCancelledWaiters() {37 Node t = firstWaiter;38 /*39 在下面的每次循环中,trail指向的是从头到循环的节点为止,最后一个是CONDITION状态的节点40 这样做是因为要剔除队列中间不是CONDITION的节点,就需要保留上一个是CONDITION节点的指针,41 然后直接trail.nextWaiter = next就可以断开了42 */43 Node trail = null;44 while (t != null) {45 Node next = t.nextWaiter;46 if (t.waitStatus != Node.CONDITION) {47 t.nextWaiter = null;48 if (trail == null)49 firstWaiter = next;50 else51 trail.nextWaiter = next;52 if (next == null)53 lastWaiter = trail;54 } else55 trail = t;56 t = next;57 }58 }
6 fullyRelease方法
释放锁资源,包括可重入锁的所有锁资源:
1 /** 2 * AbstractQueuedSynchronizer: 3 */ 4 final int fullyRelease(Node node) { 5 boolean failed = true; 6 try { 7 int savedState = getState(); 8 /* 9 释放锁资源。注意这里是释放所有的锁,包括可重入锁有多次加锁的话,会一次性全部释放。因为在上一行10 代码savedState存的是所有的锁资源,而这里就是释放这些所有的资源,这也就是方法名中“fully”的含义11 */12 if (release(savedState)) {13 failed = false;14 return savedState;15 } else {16 /*17 释放失败就抛异常,也就是说没有释放干净,可能是在并发的情景下state被修改了的原因,18 也可能是其他原因。注意如果在这里抛出异常了那么会走第166行代码19 */20 throw new IllegalMonitorStateException();21 }22 } finally {23 /*24 如果释放锁失败,就把节点置为CANCELLED状态。比较精妙的一点是,在之前addConditionWaiter方法中的第10行代码处,25 判断条件队列中是否有不是CONDITION的节点时,只需要判断最后一个节点的状态是否是CONDITION就行了26 按常理来说,是需要遍历整个队列才能知道的。但是条件队列每次添加新节点都是插在尾处,而如果释放锁失败,27 会将这个新添加的、在队列尾巴的新节点置为CANCELLED状态。而之前的CONDITION节点必然都是在队头28 因为如果此时再有新的节点入队的话,会首先在add.........
版权声明
本文为[程序猿欧文]所创,转载请带上原文链接,感谢
https://my.oschina.net/mikeowen/blog/4710263
边栏推荐
- 零基础小白python入门——深入Python中的文件操作
- 浅谈API网关(API Gateway)如何承载API经济生态链
- 磁阻式随机存储器MRAM基本原理
- Super low price cloud server is coming
- RabbitMQ安装
- Introduction to zero base little white Python
- 磁阻式随机存储器MRAM基本原理
- Gesture switch background, let live with goods more immersive
- 菜鸟福音,28本书籍循序渐进让你成为大牛!(附学习大纲一份)
- 数据库执行truncate table CM_CHECK_ITEM_HIS怎么恢复
猜你喜欢
[interview experience] bat programmers interviewed 200 people and analyzed the most frequently asked interview questions
Gesture switch background, let live with goods more immersive
Installation and deployment of Flink
【科创人】Rancher江鹏:从清华工程物理学硕士到云计算开源创业者
揭秘在召唤师峡谷中移动路径选择逻辑?
超简单集成华为系统完整性检测,搞定设备安全防护
C console calls ffmpeg to push MP4 video file to stream media open source service platform easydarwin process
Abbyy finereader 15 added edit table cell function
关于生活,可能有用的40条建议
Configure static IP address in ubuntu18.04 NAT mode -2020.11.09
随机推荐
Ubuntu18.04 NAT模式下配置静态IP地址 -2020.11.09
RBAC of kubernetes authority management (1)
Rookie gospel, 28 books step by step to make you a big bull! (a copy of learning syllabus attached)
【STM32H7】第6章 ThreadX GUIX上手之STM32H7 DMA2D加速
配置ng-zerro的nz-date-picker时间选择组件
Share tips on editing letters and mathematical formulas with MathType
配置ng
Abbyy finereader 15 adds editing page layout function
数据库执行truncate table CM_CHECK_ITEM_HIS怎么恢复
揭秘在召唤师峡谷中移动路径选择逻辑?
老旧系统重构技巧,轻松搞定遗留代码
Numeric keyboard with decimal point in IOS
[interview experience] bat programmers interviewed 200 people and analyzed the most frequently asked interview questions
CentOS view the number of CPU cores and cpuinfo analysis
[stm32f429] Chapter 6: stm32f429 dma2d acceleration of ThreadX guix
容器技术(三)镜像小结【16】
Git老鸟查询手册
js对象数组去重
[graffiti Internet of things footprint] graffiti cloud platform interface description
Git + -- Code hosting in the history of version management