当前位置:网站首页>AQS source code in-depth analysis of conditional queue
AQS source code in-depth analysis of conditional queue
2020-11-09 19:27:00 【Irving the procedural ape】
This article is based on JDK-8u261 Source code analysis
1 brief introduction
because CLH Threads in the queue , What thread gets the lock , What threads are queued , What thread releases the lock , These are beyond our control . So the emergence of conditional queues provides us with the initiative to 、 Only when the specified conditions are met can the thread block and wake up . For conditional queues, we first need to explain some concepts : The condition queue is AQS In addition to CLH A queue other than a queue , For every one created Condition It's actually creating a conditional queue , And every time it's called await The method is actually to join the conditional queue , Every call signal The method is actually to queue up in the conditional queue . Unlike CLH There are multiple states of nodes on the queue , There is only one state of the node on the conditional queue :CONDITION. So if the node on the conditional queue is no longer CONDITION In the state of , It means that this node should be out of the team . It should be noted that , Conditional queues can only run in exclusive mode .
In general, when using conditional queue as blocking queue, two conditional queues will be created :notFull and notEmpty.notFull When the condition queue is full ,put The method will be in a wait state , Until the queue is not full ;notEmpty When the condition queue is empty ,take The method will be in a wait state , Until the queue has data .
and notFull.signal Methods and notEmpty.signal Method will move the node on the conditional queue to CLH In line ( Transfer only one at a time ). in other words , There is a node that is transferred from the conditional queue to CLH The situation in the queue . It also means , Lock resource contention will not occur on conditional queues , All of the lock competition happens in CLH On the queue .
Some other conditional queues and CLH The differences between queues are as follows :
- Conditional queues use nextWaiter Pointer to the next node , Is a one-way linked list structure , differ CLH Double linked list structure of queue ;
- Conditional queues use firstWaiter and lastWaiter To point to the head and tail , differ CLH Queued head and tail;
- The first node in the conditional queue will not look like CLH The lines are the same , It's a special empty node ;
- differ CLH There will be a lot of CAS Operation to control concurrency , The premise of conditional queue entering the queue is that the exclusive lock resource has been obtained , So many places don't need to consider concurrency .
The following is the specific source analysis . Conditional queue with ArrayBlockingQueue For example :
2 Constructors
1 /** 2 * ArrayBlockingQueue: 3 */ 4 public ArrayBlockingQueue(int capacity) { 5 this(capacity, false); 6} 7 8 public ArrayBlockingQueue(int capacity, boolean fair) { 9 if (capacity <= 0)10 throw new IllegalArgumentException();11 // An array of actual data 12 this.items = new Object[capacity];13 // Exclusive lock use ReentrantLock To achieve (fair Is it fair lock or unfair lock , Default to unfair lock )14 lock = new ReentrantLock(fair);15 //notEmpty Condition queue 16 notEmpty = lock.newCondition();17 //notFull Condition queue 18 notFull = lock.newCondition();19 }
3 put Method
1 /** 2 * ArrayBlockingQueue: 3 */ 4 public void put(E e) throws InterruptedException { 5 // Non empty verification 6 checkNotNull(e); 7 final ReentrantLock lock = this.lock; 8 /* 9 Get exclusive lock resource , Response interrupt mode . In fact, modern code and lock There are ways Semaphore Of acquire The method is similar 10 Because this is the conditional queue , So the details of the method are no longer analyzed 11 */ 12 lock.lockInterruptibly(); 13 try { 14 while (count == items.length) 15 // If the array is full , It's just notFull A new node in the Chinese team , And block the current thread 16 notFull.await(); 17 // Add array elements and wake up notEmpty 18 enqueue(e); 19 } finally { 20 // Release lock resource 21 lock.unlock(); 22 } 23 }
4 await Method
If in put The array is full , Or in take The array is empty , Will call await Method to put the current node in the conditional queue :
1 /** 2 * AbstractQueuedSynchronizer: 3 */ 4 public final void await() throws InterruptedException { 5 // If the current thread is interrupted, an exception is thrown 6 if (Thread.interrupted()) 7 throw new InterruptedException(); 8 // Add the current node to the conditional queue 9 Node node = addConditionWaiter();10 // Release the lock resource obtained before , Because the thread will be blocked later , So if you don't release it , Other threads will wait for the thread to wake up 11 int savedState = fullyRelease(node);12 int interruptMode = 0;13 // If the current node is not in CLH The queue is blocked , wait for unpark Wake up the 14 while (!isOnSyncQueue(node)) {15 LockSupport.park(this);16 /*17 It may be normal to be awakened here signal The operation may have been interrupted . But in either case , Will insert the current node into CLH Queue tail ,18 And exit the loop ( Be careful , It's awakened here except in the two cases above , There is also a false wake-up at the operating system level (spurious wakeup),19 That is, the current thread will be awakened for no reason , So you need to use while To avoid this situation )20 */21 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)22 break;23 }24 // Go here to show that the current node has been inserted into CLH In line ( By signal Awakened or interrupted ). And then in CLH The operation of getting lock resource in the queue 25 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)26 /*27 <<<THROW_IE and REINTERRUPT For an explanation, see transferAfterCancelledWait Method >>>2829 The previous analysis if acquireQueued Method returns true, Indicates that the current thread has been interrupted 30 return true It means in acquireQueued Method will be interrupted again ( Be careful , This means that there are two code points that determine whether the thread is interrupted :31 One is at the 15 Line code , The other is in acquireQueued Method inside ), If it hasn't been interrupted before , be interruptMode=0,32 And in the acquireQueued Method in which the thread is interrupted and returned , This time will be interruptMode It is revised as REINTERRUPT that will do 33 As for why not amend it to THROW_IE Because in this case , The first 15 Line code has been called by signal The method wakes up normally ,34 The node has been put in CLH In line . And the interruption at this time is signal After the operation , In the 25 Line code to grab lock resources occurred when 35 At this time, it doesn't matter whether you interrupt or not , So there's no need to throw InterruptedException36 */37 interruptMode = REINTERRUPT;38 /*39 This indicates that the current node has obtained the lock resource ( If you can't get it, you'll be blocked again acquireQueued In the method )40 If interruptMode=REINTERRUPT Words , Indicates that you have called signal The method , In other words, the node has been removed from the conditional queue ,41 nextWaiter The pointer must be empty , So in this case, there is no need to execute unlinkCancelledWaiters Methodical 42 And if the interruptMode=THROW_IE Words , Description has not been called before signal Method to remove the node from the conditional queue . At this point, you need to call 43 unlinkCancelledWaiters Method to remove this node ( Prior to transferAfterCancelledWait In the method 44 The state of the node has been changed to the initial state 0), By the way, all the others are not CONDITION State nodes are also removed . Be careful : If the current node is in the conditional queue 45 The last node , It's not going to be cleaned up . It's ok , Wait until the next time you add a node or call signal Methods will be cleaned up 46 */47 if (node.nextWaiter != null)48 unlinkCancelledWaiters();49 // Processing interrupts according to different modes ( Normal mode doesn't need to deal with )50 if (interruptMode != 0)51 reportInterruptAfterWait(interruptMode);52 }
5 addConditionWaiter Method
Logic to add a node to the conditional queue :
1 /** 2 * AbstractQueuedSynchronizer: 3 */ 4 private Node addConditionWaiter() { 5 Node t = lastWaiter; 6 /* 7 If the last node is not CONDITION state , Delete all not in the conditional queue CONDITION Nodes of state 8 As for why we only need to judge the state of the last node to know whether there is no in the whole queue CONDITION The node of , It will be explained later 9 */ 10 if (t != null && t.waitStatus != Node.CONDITION) {11 // Delete all not CONDITION Nodes of state 12 unlinkCancelledWaiters();13 t = lastWaiter;14 }15 // Create a type of CONDITION The new node 16 Node node = new Node(Thread.currentThread(), Node.CONDITION);17 if (t == null)18 //t by null It means that the condition queue is empty at this time , Just point the head pointer to the new node 19 firstWaiter = node;20 else21 //t Not for null This means that there are nodes in the conditional queue , Add this new node directly to the end 22 t.nextWaiter = node;23 // The tail pointer points to the new node , Finished adding nodes 24 lastWaiter = node;25 /*26 Be careful , It doesn't have to look like CLH Queue enq The method is the same , If the insertion fails, it spins until the insertion is successful 27 Because the exclusive lock has not been released yet 28 */29 return node;30 }3132 /**33 * The first 12 Line code :34 * Delete all not in the conditional queue CONDITION Nodes of state 35 */36 private void unlinkCancelledWaiters() {37 Node t = firstWaiter;38 /*39 In each of the following cycles ,trail It points from the beginning to the node of the loop , The last one is CONDITION Nodes of state 40 This is done because the middle of the queue is not CONDITION The node of , You need to keep the last one CONDITION Pointer to node ,41 And then directly trail.nextWaiter = next You can disconnect 42 */43 Node trail = null;44 while (t != null) {45 Node next = t.nextWaiter;46 if (t.waitStatus != Node.CONDITION) {47 t.nextWaiter = null;48 if (trail == null)49 firstWaiter = next;50 else51 trail.nextWaiter = next;52 if (next == null)53 lastWaiter = trail;54 } else55 trail = t;56 t = next;57 }58 }
6 fullyRelease Method
Release lock resource , All lock resources including reentrant locks :
1 /** 2 * AbstractQueuedSynchronizer: 3 */ 4 final int fullyRelease(Node node) { 5 boolean failed = true; 6 try { 7 int savedState = getState(); 8 /* 9 Release lock resource . Notice that this is to release all locks , Including reentrant locks with multiple locks , It will be released all at once . Because on the previous line 10 Code savedState All lock resources are stored , And here's how to release all these resources , This is the method name “fully” The meaning of 11 */12 if (release(savedState)) {13 failed = false;14 return savedState;15 } else {16 /*17 Throw exception if release fails , That is to say, it is not released clean , Maybe in the context of concurrency state The reason for the change ,18 It could be something else . Note that if an exception is thrown here, it will go first 166 Line code 19 */20 throw new IllegalMonitorStateException();21 }22 } finally {23 /*24 If the release of the lock fails , Set the node to CANCELLED state . One of the more subtle things is , Before addConditionWaiter Regulation in method 10 Line code ,25 Determine whether there is no in the conditional queue CONDITION The nodes of , Just determine whether the state of the last node is CONDITION That's it 26 As a general rule , You need to traverse the entire queue to know . But every time a new node is added to the conditional queue, it is inserted at the end , And if the release of the lock fails ,27 Will add this new 、 Set the new node at the end of the queue to CANCELLED state . And before CONDITION The nodes must be at the head of the team 28 Because if there are new nodes in the queue at this time , First in add.........
版权声明
本文为[Irving the procedural ape]所创,转载请带上原文链接,感谢
边栏推荐
猜你喜欢
Abbyy finereader 15 added edit table cell function
百亿级数据分表后怎么分页查询?
如何运用二分查找算法
Hand in hand to teach you to use container service tke cluster audit troubleshooting
Ultra simple integration of Huawei system integrity testing, complete equipment security protection
[graffiti Internet of things footprint] graffiti cloud platform interface description
C console calls ffmpeg to push MP4 video file to stream media open source service platform easydarwin process
[stm32h7] Chapter 6: stm32h7 dma2d acceleration of ThreadX guix
Git老鸟查询手册
Rookie gospel, 28 books step by step to make you a big bull! (a copy of learning syllabus attached)
随机推荐
Markdown plug-in of vscode
手把手教你使用容器服务 TKE 集群审计排查问题
Another comparison operator related interview question let me understand that the foundation is very important
LeetCode 48 旋转图像
EMQ X 在中国建设银行物联网平台中的应用
How to use binary search algorithm
DCL单例模式中的缺陷及单例模式的其他实现
上云嘉年华,超低价云服务器来袭
In the third stage, day19 users echo packaged cookie products and remotely call Shopping Cart module crud operation
【云小课】版本管理发展史之Git+——代码托管
Container technology (3) building local registry [15]
双十一大秒杀,云服务器低至 0.7 折
【科创人】Rancher江鹏:从清华工程物理学硕士到云计算开源创业者
手势切换背景,让直播带货更加身临其境
40 tips for life that may be useful
运用强大的 PowerBI 桑基图表示复杂运营业务流
JS deep copy
expect ':' at 0, actual = (JSON转化异常解决)
Flink的安装部署
低功耗蓝牙单芯片为物联网助力