In-depth analysis of the AQS source code: the condition queue

2020-11-09 19:27:00 Irving the procedural ape

This article analyzes the source code of JDK 8u261.


1 Introduction

For threads in the CLH queue, which thread acquires the lock, which threads are enqueued, and which thread releases the lock are all outside our control. Condition queues give us an active mechanism: a thread blocks, and is woken, only when a specified condition is met.

Some concepts first. A condition queue is a queue in AQS that is separate from the CLH queue. Each Condition that is created is in effect a new condition queue; each call to await enqueues the caller on that condition queue, and each call to signal dequeues a node from it. Nodes on the CLH queue can be in several states, but a node on a condition queue has only one: CONDITION. If a node on a condition queue is no longer in the CONDITION state, it is due to leave the queue. Note that condition queues can only be used in exclusive mode.

When a blocking queue is built on condition queues, two of them are usually created: notFull and notEmpty. When the blocking queue is full, put waits on notFull until the queue is no longer full; when the blocking queue is empty, take waits on notEmpty until the queue has data.

The notFull.signal and notEmpty.signal methods move a node from the condition queue to the CLH queue (only one node per call). In other words, a node can be transferred from a condition queue to the CLH queue. It also means that no lock contention happens on a condition queue: all competition for the lock happens on the CLH queue.
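The notFull/notEmpty pattern described above can be sketched with a small bounded buffer. This is a minimal illustration, not the ArrayBlockingQueue source: the class names BoundedBuffer and BoundedBufferDemo are hypothetical, and only ReentrantLock, Condition, await, and signal come from the JDK.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class; not part of the JDK.
class BoundedBuffer<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count;

    private final ReentrantLock lock = new ReentrantLock();
    // Each newCondition() call creates a separate condition queue on the same lock.
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();      // wait on notFull; the lock is released while waiting
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();        // wake one thread waiting in take, if any
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();     // wait on notEmpty; the lock is released while waiting
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();         // wake one thread waiting in put, if any
            return e;
        } finally {
            lock.unlock();
        }
    }
}

// Hypothetical driver: a producer pushes 1..5 through a one-slot buffer.
class BoundedBufferDemo {
    static int run() {
        BoundedBuffer<Integer> buf = new BoundedBuffer<>(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++)
                    buf.put(i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 5; i++)
                sum += buf.take();
            producer.join();
        } catch (InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
        return sum;   // 1 + 2 + 3 + 4 + 5
    }
}
```

With a capacity of one, the producer and consumer are forced to alternate, so both condition queues are exercised.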

Other differences between condition queues and the CLH queue:

  • A condition queue links its nodes with the nextWaiter pointer, forming a singly linked list, unlike the doubly linked list of the CLH queue;
  • A condition queue uses firstWaiter and lastWaiter to point to its head and tail, unlike the head and tail fields of the CLH queue;
  • The first node of a condition queue is not a special empty node, as it is in the CLH queue;
  • The CLH queue relies on many CAS operations to control concurrency, whereas a node can only join a condition queue after the exclusive lock has been acquired, so many places do not need to consider concurrency.

The source code is analyzed below, using the condition queues in ArrayBlockingQueue as the example:


2 Constructors

    /**
     * ArrayBlockingQueue:
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // The array that holds the actual data
        this.items = new Object[capacity];
        // The exclusive lock is a ReentrantLock (fair selects a fair or nonfair lock; the default is nonfair)
        lock = new ReentrantLock(fair);
        // The notEmpty condition queue
        notEmpty = lock.newCondition();
        // The notFull condition queue
        notFull = lock.newCondition();
    }
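A short usage sketch of the two constructors, as seen from calling code. The class name ConstructorDemo and its method names are made up for illustration; the ArrayBlockingQueue calls are the public JDK API.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical illustration class; not part of the JDK.
class ConstructorDemo {
    // A capacity of zero or less is rejected by the constructor.
    static boolean rejectsNonPositiveCapacity() {
        try {
            new ArrayBlockingQueue<String>(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    // The second argument asks for a fair ReentrantLock instead of the default nonfair one.
    static int remainingAfterOneOffer() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2, true);
        queue.offer("a");
        return queue.remainingCapacity();
    }
}
```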

3 put Method

    /**
     * ArrayBlockingQueue:
     */
    public void put(E e) throws InterruptedException {
        // Null check
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        /*
        Acquire the exclusive lock in interruptible mode. The implementation is in fact
        similar to the lock method and to the acquire method of Semaphore.
        Since this article is about the condition queue, the details of this method are not analyzed here
         */
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                // If the array is full, add a new node to the notFull queue and block the current thread
                notFull.await();
            // Add the element to the array and wake up notEmpty
            enqueue(e);
        } finally {
            // Release the lock
            lock.unlock();
        }
    }
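The blocking behaviour of put can be observed from outside with a one-slot queue. The sketch below is only an illustration (PutBlocksDemo is a hypothetical name). It assumes that a thread parked inside notFull.await() reports Thread.State.WAITING, which is what LockSupport.park produces.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical illustration class; not part of the JDK.
class PutBlocksDemo {
    static String run() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put("first");                   // fills the single slot
            Thread producer = new Thread(() -> {
                try {
                    queue.put("second");          // array is full: waits in notFull.await()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            // Poll until the producer is parked on the condition queue.
            while (producer.getState() != Thread.State.WAITING)
                Thread.sleep(1);
            boolean stillFullWhileWaiting = queue.size() == 1;
            String head = queue.take();           // frees the slot and signals notFull
            producer.join();                      // the producer can now finish its put
            return head + "," + queue.peek() + "," + stillFullWhileWaiting;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```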

4 await Method

If the array is full in put, or empty in take, the await method is called to put the current node on the condition queue:

    /**
     * AbstractQueuedSynchronizer:
     */
    public final void await() throws InterruptedException {
        // Throw an exception if the current thread has been interrupted
        if (Thread.interrupted())
            throw new InterruptedException();
        // Add the current node to the condition queue
        Node node = addConditionWaiter();
        // Release the lock acquired earlier. The thread is about to block, so if it kept the lock,
        // other threads would have to wait until this thread is woken
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // While the current node is not on the CLH queue, block and wait for unpark
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            /*
            The thread may have been woken here by a normal signal, or it may have been interrupted.
            In either case the current node ends up at the tail of the CLH queue and the loop exits.
            (Note that besides these two cases there is also the spurious wakeup at the operating
            system level, where the thread is woken for no reason, which is why a while loop is needed)
             */
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // Reaching here means the current node has been inserted into the CLH queue (signalled or interrupted).
        // Now acquire the lock from within the CLH queue
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            /*
            <<<For an explanation of THROW_IE and REINTERRUPT, see the transferAfterCancelledWait method>>>

            As analyzed earlier, acquireQueued returning true means the current thread was interrupted.
            Returning true here means the thread was interrupted again inside acquireQueued. (Note that
            there are two places where an interrupt can be detected: one is the park call in the loop
            above, the other is inside acquireQueued.) If the thread was not interrupted before,
            interruptMode is 0; if it is then interrupted inside acquireQueued, setting interruptMode
            to REINTERRUPT is enough.
            As for why it is not set to THROW_IE: in this case the park call above was woken normally
            by signal and the node had already been put on the CLH queue. The interrupt came after the
            signal, while the thread was competing for the lock in acquireQueued.
            At that point it no longer matters whether the thread was interrupted, so there is no need
            to throw InterruptedException
             */
            interruptMode = REINTERRUPT;
        /*
        Reaching here means the current node has acquired the lock (otherwise it would still be blocked
        inside acquireQueued).
        If interruptMode is REINTERRUPT, signal was called, so the node has already been removed from
        the condition queue and its nextWaiter pointer must be null; unlinkCancelledWaiters does not
        need to run in that case.
        If interruptMode is THROW_IE, signal was not called to remove the node from the condition queue,
        so unlinkCancelledWaiters must be called to remove it (transferAfterCancelledWait has already
        reset the state of the node to the initial value 0). Any other non-CONDITION nodes are removed
        along the way. Note: if the current node is the last node in the condition queue, it is not
        cleaned up here. That is fine: it will be cleaned up the next time a node is added or signal is called
         */
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // Handle the interrupt according to the mode (nothing to do in normal mode)
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
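The THROW_IE path can be observed from user code: a thread interrupted while parked in await, before any signal, gets an InterruptedException, and because acquireQueued runs before the exception is reported, the thread already holds the lock again when the exception reaches it. The following is a rough sketch under that reading; AwaitInterruptDemo is a hypothetical name, and the polling on Thread.State.WAITING is only a convenient way to detect that the waiter is parked.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class; not part of the JDK.
class AwaitInterruptDemo {
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        String[] result = new String[1];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                condition.await();               // parks; nobody ever signals
                result[0] = "signalled";
            } catch (InterruptedException e) {
                // The lock was re-acquired before the exception was thrown.
                result[0] = "interrupted,heldByCurrentThread=" + lock.isHeldByCurrentThread();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        try {
            // Poll until the waiter is parked inside await.
            while (waiter.getState() != Thread.State.WAITING)
                Thread.sleep(1);
            waiter.interrupt();                  // interrupt before any signal
            waiter.join();                       // join makes result[0] visible here
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result[0];
    }
}
```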

5 addConditionWaiter Method

The logic that adds a node to the condition queue:

    /**
     * AbstractQueuedSynchronizer:
     */
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        /*
        If the last node is not in the CONDITION state, remove every non-CONDITION node from the
        condition queue. Why checking only the last node is enough to know whether the whole queue
        contains a non-CONDITION node is explained later
         */
        if (t != null && t.waitStatus != Node.CONDITION) {
            // Remove all non-CONDITION nodes
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // Create a new node of type CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            // t is null, so the condition queue is empty: just point the head pointer at the new node
            firstWaiter = node;
        else
            // t is not null, so the condition queue has nodes: append the new node at the tail
            t.nextWaiter = node;
        // Point the tail pointer at the new node; the node has now been added
        lastWaiter = node;
        /*
        Note that, unlike the enq method of the CLH queue, there is no need to spin until the
        insertion succeeds, because the exclusive lock has not been released yet
         */
        return node;
    }

    /**
     * Called from addConditionWaiter above:
     * remove all non-CONDITION nodes from the condition queue
     */
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        /*
        In each iteration below, trail points to the last CONDITION node between the head and the
        node currently being visited. When a non-CONDITION node turns up in the middle of the queue,
        the pointer to the last CONDITION node before it is needed,
        so that trail.nextWaiter = next unlinks it directly
         */
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else
                trail = t;
            t = next;
        }
    }
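To see the trail pointer at work, the unlinking loop can be run on a toy singly linked queue. ToyConditionQueue and its Node are hypothetical stand-ins written for this illustration; only the loop body mirrors unlinkCancelledWaiters, and the constants use the same values as Node.CONDITION (-2) and Node.CANCELLED (1) in AQS.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; a simplified stand-in for the AQS condition queue.
class ToyConditionQueue {
    static final int CONDITION = -2;
    static final int CANCELLED = 1;

    static final class Node {
        final String name;
        int waitStatus;
        Node nextWaiter;

        Node(String name, int waitStatus) {
            this.name = name;
            this.waitStatus = waitStatus;
        }
    }

    Node firstWaiter, lastWaiter;

    // Append at the tail, as addConditionWaiter does (without the cleanup step).
    void add(String name, int waitStatus) {
        Node node = new Node(name, waitStatus);
        if (lastWaiter == null)
            firstWaiter = node;
        else
            lastWaiter.nextWaiter = node;
        lastWaiter = node;
    }

    // Same loop as unlinkCancelledWaiters: trail is the last CONDITION node seen so far.
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;          // removed node was the head
                else
                    trail.nextWaiter = next;     // bypass the removed node
                if (next == null)
                    lastWaiter = trail;          // removed node was the tail
            } else {
                trail = t;
            }
            t = next;
        }
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter)
            out.add(n.name);
        return out;
    }

    // Cancelled nodes at the head, in the middle, and at the tail.
    static List<String> demo() {
        ToyConditionQueue queue = new ToyConditionQueue();
        queue.add("a", CANCELLED);
        queue.add("b", CONDITION);
        queue.add("c", CANCELLED);
        queue.add("d", CONDITION);
        queue.add("e", CANCELLED);
        queue.unlinkCancelledWaiters();
        return queue.names();
    }
}
```

The demo covers the three branches of the loop: removing the head moves firstWaiter, removing a middle node rewires trail.nextWaiter, and removing the tail moves lastWaiter back to trail.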

6 fullyRelease Method

Release the lock, including every hold of a reentrant lock:

    /**
     * AbstractQueuedSynchronizer:
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            /*
            Release the lock. Note that this releases every hold: a reentrant lock that has been
            locked several times is released in one go. The previous line saved the full lock
            count in savedState, and this call releases all of it, which is what "fully" in the
            method name means
             */
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                /*
                Throw an exception if the release fails, that is, the lock was not fully released.
                The cause may be that state was changed concurrently, or it may be something else.
                Note that if an exception is thrown here, the finally block runs first
                 */
                throw new IllegalMonitorStateException();
            }
        } finally {
            /*
            If releasing the lock failed, set the node to the CANCELLED state. One subtle point:
            the check at the start of addConditionWaiter decides whether the condition queue
            contains non-CONDITION nodes just by testing whether the last node is in the CONDITION state.
            Normally the whole queue would have to be traversed to know. But every new node is
            appended at the tail of the condition queue, and if releasing the lock fails,
            this new node at the tail of the queue is set to the CANCELLED state. The earlier
            CONDITION nodes must be toward the head of the queue.
            Because if there are new nodes in the queue at this time, first in add.........
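The effect of fullyRelease is visible from user code: a thread that holds a ReentrantLock twice and then calls await leaves the lock completely free, and gets both holds back once it is signalled. The sketch below is an illustration only; FullyReleaseDemo is a hypothetical name, and polling on Thread.State.WAITING is just a simple way to detect that the waiter is parked.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class; not part of the JDK.
class FullyReleaseDemo {
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        int[] holdsAfterAwait = { -1 };

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();                         // reentrant: hold count is now 2
            try {
                condition.await();               // releases both holds in one step
                holdsAfterAwait[0] = lock.getHoldCount();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        boolean freeWhileWaiting = false;
        try {
            // Poll until the waiter is parked inside await.
            while (waiter.getState() != Thread.State.WAITING)
                Thread.sleep(1);
            freeWhileWaiting = lock.tryLock();   // succeeds only if the lock is entirely free
            if (freeWhileWaiting) {
                try {
                    condition.signal();          // move the waiter to the CLH queue
                } finally {
                    lock.unlock();
                }
            } else {
                waiter.interrupt();              // avoid hanging if the lock was not free
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "freeWhileWaiting=" + freeWhileWaiting
                + ",holdsAfterAwait=" + holdsAfterAwait[0];
    }
}
```

The hold count of 2 after await corresponds to acquireQueued(node, savedState) in await, which re-acquires the lock with the state saved by fullyRelease.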

Copyright notice
This article was written by [Irving the procedural ape]. Please include a link to the original when reposting. Thank you.