
AbstractQueuedSynchronizer (AQS) source code in detail: CountDownLatch source code analysis

2022-06-21 08:33:00 *Wucongcong*


1. A brief introduction to CountDownLatch

CountDownLatch is a simple synchronizer. It allows one or more threads to wait until operations in other threads have completed before they continue.

Its typical usage is somewhat similar to Thread.join(): wait for other threads to finish, then run the main task.
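
To make the comparison with Thread.join() concrete, here is a minimal sketch. The class JoinVersusLatch and its two helper methods are hypothetical names invented for this illustration. Both variants wait for the same workers, but the join version has to keep a reference to every thread, while the latch version only shares the latch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
final class JoinVersusLatch {

  // Waits with Thread.join(): the caller must hold every Thread reference.
  static int waitWithJoin(int workers) {
    AtomicInteger done = new AtomicInteger();
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < workers; i++) {
      Thread t = new Thread(done::incrementAndGet);
      threads.add(t);
      t.start();
    }
    try {
      for (Thread t : threads) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return done.get();
  }

  // Waits with a CountDownLatch: workers signal completion through the latch.
  static int waitWithLatch(int workers) {
    AtomicInteger done = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(workers);
    for (int i = 0; i < workers; i++) {
      new Thread(() -> {
        done.incrementAndGet();
        latch.countDown();
      }).start();
    }
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return done.get();
  }

  public static void main(String[] args) {
    System.out.println("join:  " + waitWithJoin(3));
    System.out.println("latch: " + waitWithLatch(3));
  }
}
```

A practical difference is that a worker can call countDown() in the middle of its work, whereas join() only returns once the thread has terminated.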

2. Introductory case studies

Case study 1

  • Many developers, myself included, rarely use CountDownLatch in day-to-day development, and some have never touched it. In concurrent code, however, this class is very common, so we start with a few cases to show what it is for:
  • Use CountDownLatch to make the main thread wait for the child threads to finish before it continues
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author wcc
 * @date 2022/2/15 19:09
 */
public class CountDownLatchTest01 {

  private static final int TASK_COUNT = 8;
  private static final int THREAD_CORE_SIZE = 10;

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(TASK_COUNT);

    ExecutorService executor = Executors.newFixedThreadPool(THREAD_CORE_SIZE);

    for (int i = 0; i < TASK_COUNT; i++) {
      executor.execute(new WorkerRunnable(i, countDownLatch));
    }

    System.out.println("The main thread is waiting for all subtasks to complete ...");
    long mainWaitStartTimeMillis = System.currentTimeMillis();
    // Blocks until every worker has called countDown()
    countDownLatch.await();

    long mainWaitEndTimeMillis = System.currentTimeMillis();
    System.out.println("Waiting time of main thread: " + (mainWaitEndTimeMillis - mainWaitStartTimeMillis));

    // Shut the pool down so that its non-daemon threads do not keep the JVM alive
    executor.shutdown();
  }

  static class WorkerRunnable implements Runnable {

    private final int taskId;
    private final CountDownLatch latch;

    @Override
    public void run() {
      doWorker();
    }

    public void doWorker() {
      System.out.println("Task ID: " + taskId + ", the task is in progress ...");
      try {
        // Simulate 500 ms of work
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
      } finally {
        // Always count down, even if the task failed, so the main thread is not blocked forever
        latch.countDown();
      }
    }

    public WorkerRunnable(int taskId, CountDownLatch latch) {
      this.taskId = taskId;
      this.latch = latch;
    }
  }
}

The output is as follows

H4fJ2T.png

Case study 2

  • The relationship between waiting threads and worker threads can also be many-to-many. In this case, the main thread uses CountDownLatch to let all child threads start at the same time, and then blocks until the child threads have finished.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author wcc
 * @date 2022/2/15 19:09
 */
public class CountDownLatchTest02 {

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(10);

    for (int i = 0; i < 10; i++) {
      new Thread(new Worker(startSignal, doneSignal, i)).start();
    }

    // Let the main thread sleep for 500 ms so that all child threads have started
    // and are blocked at the startSignal gate
    TimeUnit.MILLISECONDS.sleep(500);

    // The count of startSignal is 1, so the main thread only needs to call countDown() once;
    // every child thread blocked in startSignal.await() then passes the gate at the same time
    System.out.println("The gate for the subtasks is now open");
    startSignal.countDown();

    System.out.println("Waiting for the subtasks to end ...");
    long startTime = System.currentTimeMillis();
    // Wait for all subtasks to end
    doneSignal.await();

    long endTime = System.currentTimeMillis();
    System.out.println("All subtasks have ended, time taken: " + (endTime - startTime));
  }

  static class Worker implements Runnable {

    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    private final int id;

    @Override
    public void run() {
      try {
        // To make all threads start their task at the same time, block every thread here
        // and open the gate once everyone is ready
        startSignal.await();
        System.out.println("Subtask-" + id + ", start time: " + System.currentTimeMillis());
        doWorker();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
      } finally {
        doneSignal.countDown();
      }
    }

    public void doWorker() throws InterruptedException {
      // Simulate 5 seconds of work
      TimeUnit.SECONDS.sleep(5);
    }

    public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, int id) {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
      this.id = id;
    }
  }
}

Execution results

H4f2se.png

In the code above, startSignal.await() acts like a gate: it holds every child thread inside its run() method until the main thread calls startSignal.countDown(), which opens the gate. After that, all child threads continue executing their own run() methods, as shown in the picture:

H4fbQS.png

Case study 3

import java.util.concurrent.CountDownLatch;

/**
 * @author wcc
 * @date 2022/2/16 14:14
 */
public class CountDownLatchTest03 {

  public static void main(String[] args) {
    CountDownLatch latch = new CountDownLatch(2);

    Thread t1 = new Thread(() -> {
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      // After sleeping for 5 seconds (simulating 5 seconds of work), call countDown()
      latch.countDown();
    }, "t1");

    Thread t2 = new Thread(() -> {
      try {
        Thread.sleep(10000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      // After sleeping for 10 seconds (simulating 10 seconds of work), call countDown()
      latch.countDown();
    }, "t2");

    t1.start();
    t2.start();

    Thread t3 = new Thread(() -> {
      try {
        // Block until state has been reduced to 0
        latch.await();
        System.out.println("Thread t3 returned from await");
      } catch (InterruptedException e) {
        System.out.println("Thread t3 was interrupted in await");
        Thread.currentThread().interrupt();
      }
    }, "t3");

    Thread t4 = new Thread(() -> {
      try {
        // Block until state has been reduced to 0
        latch.await();
        System.out.println("Thread t4 returned from await");
      } catch (InterruptedException e) {
        System.out.println("Thread t4 was interrupted in await");
        Thread.currentThread().interrupt();
      }
    }, "t4");

    t3.start();
    t4.start();
  }
}

The results are as follows

H4fXZj.png

3. Source code analysis

3.1 The Sync inner class

private static final class Sync extends AbstractQueuedSynchronizer {
  private static final long serialVersionUID = 4982264981922014374L;

  // Takes the initial count
  Sync(int count) {
    // Call setState() to store the count in the AQS state field
    setState(count);
  }

  // Returns the remaining count
  int getCount() {
    return getState();
  }

  // Tries to acquire the shared lock
  protected int tryAcquireShared(int acquires) {
    // Note: returns 1 when state equals 0 and -1 when state is not 0,
    // so as long as state is not 0 the caller always has to queue
    return (getState() == 0) ? 1 : -1;
  }

  /**
   * Tries to release the shared lock.
   * Updates AQS.state: each call reduces state by 1, and the call that brings state to exactly 0 returns true.
   */
  protected boolean tryReleaseShared(int releases) {
    // Spin until AQS.state has been updated successfully
    for (;;) {
      // Read the current state
      int c = getState();
      // state is already 0: another thread has already triggered the wake-up
      // (the shared lock is fully released and cannot be released again), so return false
      if (c == 0)
        return false;
      // Reaching here means state > 0, so compute c - 1
      int nextc = c-1;
      // Update state atomically. A successful CAS means no other thread modified state
      // between the read above and this write
      if (compareAndSetState(c, nextc))
        // nextc == 0: the current countDown() caller is the thread that must trigger the wake-up;
        // it returns true so that the wake-up is performed
        return nextc == 0;
    }
  }
}

The Sync inner class overrides tryReleaseShared(int releases) and tryAcquireShared(int acquires), and stores count in the AQS state variable. Note that neither of these two methods uses its parameter.
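
The same pattern can be reproduced outside the JDK. The following is a minimal sketch, assuming a hypothetical class named SimpleLatch; it mirrors the structure of Sync described above but is not the JDK implementation, and it leaves out the timed await and serialization details.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical latch that follows the same AQS pattern as CountDownLatch.Sync.
final class SimpleLatch {

  private static final class Sync extends AbstractQueuedSynchronizer {
    Sync(int count) {
      setState(count);
    }

    int count() {
      return getState();
    }

    @Override
    protected int tryAcquireShared(int ignored) {
      // Succeeds only once the count has reached zero
      return getState() == 0 ? 1 : -1;
    }

    @Override
    protected boolean tryReleaseShared(int ignored) {
      for (;;) {
        int c = getState();
        if (c == 0) {
          return false;          // already open, nothing to release
        }
        if (compareAndSetState(c, c - 1)) {
          return c == 1;         // true only for the transition to zero
        }
      }
    }
  }

  private final Sync sync;

  SimpleLatch(int count) {
    if (count < 0) {
      throw new IllegalArgumentException("count < 0");
    }
    this.sync = new Sync(count);
  }

  void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }

  void countDown() {
    sync.releaseShared(1);
  }

  int getCount() {
    return sync.count();
  }

  public static void main(String[] args) throws InterruptedException {
    SimpleLatch latch = new SimpleLatch(2);
    new Thread(latch::countDown).start();
    new Thread(latch::countDown).start();
    latch.await();
    System.out.println("count after await: " + latch.getCount());
  }
}
```

All queueing, parking, and waking is inherited from AQS; the subclass only decides when an acquire succeeds and when a release should wake waiters.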

3.2 Constructor

// The constructor takes a count, which is the initial number of countdowns
public CountDownLatch(int count) {
  if (count < 0) throw new IllegalArgumentException("count < 0");

  this.sync = new Sync(count);
}
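
A small sketch of what the constructor check means in practice. LatchConstructorDemo and its helpers are hypothetical names used only here. A negative count is rejected, while a count of 0 is accepted and yields a latch that is already open.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the original article.
final class LatchConstructorDemo {

  // Returns true if the CountDownLatch constructor rejects the given count.
  static boolean rejects(int count) {
    try {
      new CountDownLatch(count);
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  // Returns the count reported by a freshly constructed latch.
  static long initialCount(int count) {
    return new CountDownLatch(count).getCount();
  }

  public static void main(String[] args) {
    System.out.println("rejects(-1) = " + rejects(-1));
    System.out.println("rejects(0)  = " + rejects(0));
    System.out.println("initialCount(3) = " + initialCount(3));
  }
}
```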

3.3 The await() method

await() is the method that waits for other threads to finish. It first tries to acquire the shared lock; if that fails, the thread enters the AQS blocking queue and waits to be woken.

From the Sync source above, we know that tryAcquireShared() returns -1 whenever state is not 0. In other words, until count has been reduced to 0, every thread that calls await() is queued.

public void await() throws InterruptedException {
  // Delegates to acquireSharedInterruptibly() in AQS
  sync.acquireSharedInterruptibly(1);
}
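
The blocking rule can be observed from the outside without hanging a thread forever. The sketch below uses the timed variant await(long, TimeUnit), which is a different entry point from the untimed await() analyzed in this article but relies on the same tryAcquireShared() check. AwaitDemo and opensAfter are hypothetical names, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
final class AwaitDemo {

  // Creates a latch with the given count, calls countDown() the given number of times,
  // and reports whether a short timed await gets through.
  static boolean opensAfter(int count, int countDowns) {
    CountDownLatch latch = new CountDownLatch(count);
    for (int i = 0; i < countDowns; i++) {
      latch.countDown();
    }
    try {
      // Returns true if the count reached zero, false if the timeout elapsed first
      return latch.await(50, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("count 2, one countDown:  " + opensAfter(2, 1));
    System.out.println("count 2, two countDowns: " + opensAfter(2, 2));
  }
}
```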

The acquireSharedInterruptibly method in AQS:

// In AQS: acquires the shared lock and responds to interruption
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  // The thread calling await() is already interrupted: throw the exception immediately
  if (Thread.interrupted())
    throw new InterruptedException();
  // tryAcquireShared(arg) < 0: AQS.state is greater than 0, so the thread must enqueue and wait to be woken
  // Otherwise AQS.state == 0: the worker threads have already counted the latch down to zero,
  // so threads calling latch.await() are not blocked here
  if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
  // Wrap the thread that called latch.await() in a node and add it to the AQS blocking queue
  final Node node = addWaiter(Node.SHARED);
  // failed == true: the acquisition has not succeeded. If the method exits in this state
  //   (for example because InterruptedException is thrown), the node must be cancelled
  // failed == false: the shared lock was acquired, no cancellation is needed
  boolean failed = true;
  try {
    // Spin
    for (;;) {
      // Get the predecessor of the current node
      final Node p = node.predecessor();
      // True when the node of the current thread is head.next
      if (p == head) {
        // The head.next node has the right to try to acquire the shared lock
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          failed = false;
          return;
        }
      }
      // shouldParkAfterFailedAcquire(): finds a valid (non-cancelled) predecessor for the current node
      // and sets that predecessor's status to -1 (SIGNAL); it returns true once the predecessor is SIGNAL
      if (shouldParkAfterFailedAcquire(p, node) &&
          // parkAndCheckInterrupt(): parks the current thread and, on wake-up, returns its interrupt flag
          parkAndCheckInterrupt())
        throw new InterruptedException();
    }
  } finally {
    // The current thread was interrupted before acquiring: remove its node from the competition
    if (failed)
      cancelAcquire(node);
  }
}

/**
 * setHeadAndPropagate in AQS: sets the current node as head and propagates backwards (wakes successors in turn).
 * @param node the node of the current thread
 * @param propagate 1: the shared lock state == 0; -1: the shared lock state != 0
 */
private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head; // Record old head for check below
  // Make the current node the new head, and set its thread and prev to null
  setHead(node);

  // For CountDownLatch, propagate == 1 always holds when setHeadAndPropagate is called
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
    // Get the successor of the current node
    Node s = node.next;
    // Condition 1: s == null holds when the current node is already the tail;
    //   doReleaseShared() handles this situation
    // Condition 2 (given s != null): the successor must be in shared mode (SHARED)
    if (s == null || s.isShared())
      // In practice, almost every case ends up calling doReleaseShared()
      doReleaseShared();
  }
}

Graphic analysis

H45EP1.png
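
The interrupt handling traced above can be checked with a small experiment. This is a sketch with hypothetical names (AwaitInterruptDemo, interruptedWhileWaiting). Whether the interrupt lands before await() is entered (the Thread.interrupted() check) or while the thread is parked (the parkAndCheckInterrupt() path), the waiter should leave await() with an InterruptedException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original article.
final class AwaitInterruptDemo {

  // Blocks a thread in await() on a latch that is never counted down, interrupts it,
  // and reports whether await() ended with InterruptedException.
  static boolean interruptedWhileWaiting() {
    CountDownLatch neverOpened = new CountDownLatch(1);
    AtomicBoolean sawInterrupt = new AtomicBoolean(false);

    Thread waiter = new Thread(() -> {
      try {
        neverOpened.await();
      } catch (InterruptedException e) {
        sawInterrupt.set(true);
      }
    }, "waiter");

    waiter.start();
    waiter.interrupt();
    try {
      waiter.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return sawInterrupt.get();
  }

  public static void main(String[] args) {
    System.out.println("await() interrupted: " + interruptedWhileWaiting());
  }
}
```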

3.4 The countDown() method

countDown() releases the shared lock, that is, it decrements count by 1.

From the Sync source above, we know that tryReleaseShared() decrements count by 1 on each call and returns true only when count reaches 0; only then are the waiting threads woken.

Note that doReleaseShared() is the method that wakes the waiting threads; we analyzed it in a previous chapter.

public void countDown() {
  sync.releaseShared(1);
}

// Releases the shared lock (in AQS)
public final boolean releaseShared(int arg) {
  // True only for the thread whose latch.countDown() call moved state from 1 to 0;
  // that thread must wake the threads blocked in await()
  if (tryReleaseShared(arg)) {
    // Only one countDown() caller ever enters this if block and runs the wake-up logic in doReleaseShared()
    doReleaseShared();
    return true;
  }
  return false;
}

/**
 * Which paths lead to doReleaseShared()?
 * 1. latch.countDown() -> AQS.state == 0 -> doReleaseShared(), which wakes the thread of head.next in the blocking queue
 * 2. The awakened thread -> doAcquireSharedInterruptibly() -> setHeadAndPropagate() -> doReleaseShared()
 */
// doReleaseShared() in AQS
private void doReleaseShared() {
  for (;;) {
    // Read the current head node of the AQS queue
    Node h = head;
    // Condition 1: h != null means the blocking queue has been initialized.
    //   h == null happens when, after the latch is created and before any thread has called await(),
    //   a thread calls latch.countDown() and triggers the wake-up logic.
    // Condition 2: h != tail means the queue contains nodes other than head.
    //   h == tail (head and tail point to the same node) happens in two cases:
    //   1. In a normal wake-up chain the threads acquire the shared lock in turn,
    //      and the thread that gets here is the tail node.
    //   2. The first thread to call await() races with the countDown() call that triggers the wake-up.
    //      The queue is empty when that first thread arrives, so it has to create an empty head node
    //      and then spin again to enqueue itself. Before it finishes enqueuing, the queue holds only
    //      that newly created empty head. If another thread calls countDown() at that moment and
    //      changes state from 1 to 0, it runs the wake-up logic and finds head == tail.
    //      Note: once the await() thread has finished enqueuing, it returns to
    //      doAcquireSharedInterruptibly() and spins there. It reads its predecessor, sees that it is
    //      head.next, acquires successfully, and sets itself as head, so it is never parked.
    if (h != null && h != tail) {
      // Inside this block, head is guaranteed to have a successor

      // Read the wait status of head
      int ws = h.waitStatus;
      // head is SIGNAL: its successor has not been woken yet
      if (ws == Node.SIGNAL) {
        // Reset head's status to 0 before waking the successor.
        // Why use CAS here? After this thread wakes the successor, the successor may make itself
        // the new head while this thread has not yet left the loop, so both threads can end up
        // running the wake-up logic concurrently. CAS makes sure only one of them wins.
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;            // loop to recheck cases
        // Wake the successor
        unparkSuccessor(h);
      }
      // Reaching here means the wait status of head is not SIGNAL
      else if (ws == 0 &&
          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;                // loop on failed CAS
    }
    // h == head holds in three cases:
    // 1. The successor that was just woken has not yet run setHeadAndPropagate() to make itself head.
    //    The current thread then simply breaks out of the loop. This does not break the wake-up chain,
    //    because the awakened thread will sooner or later run doReleaseShared() itself.
    // 2. head == null: after the latch is created and before any thread has called await(),
    //    a thread calls latch.countDown() and triggers the wake-up logic.
    // 3. head == tail: the first await() caller races with the countDown() call that triggers the wake-up,
    //    so head and tail point to the same object.
    //
    // h != head: the awakened node was quick. It resumed in doAcquireSharedInterruptibly() and has
    //    already set itself as the new head. The thread that woke it (its predecessor) sees h != head,
    //    does not leave doReleaseShared(), and goes on to help wake the successor of the new head.
    if (h == head)                   // loop if head changed
      break;
  }
}


CountDownLatch.countDown() execution flow diagram:

H45oJ1.png
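
The counting behaviour of tryReleaseShared() is visible through the public API. A minimal sketch, with CountDownDemo and remainingAfter as hypothetical names: each countDown() lowers the count by 1, and calls made after the count has reached 0 change nothing.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the original article.
final class CountDownDemo {

  // Calls countDown() `calls` times on a latch created with `count`
  // and returns the count that remains.
  static long remainingAfter(int count, int calls) {
    CountDownLatch latch = new CountDownLatch(count);
    for (int i = 0; i < calls; i++) {
      latch.countDown();
    }
    return latch.getCount();
  }

  public static void main(String[] args) {
    System.out.println(remainingAfter(3, 1)); // 2
    System.out.println(remainingAfter(3, 3)); // 0
    System.out.println(remainingAfter(3, 5)); // 0, the extra calls are no-ops
  }
}
```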

Summary

  • CountDownLatch allows one or more threads to wait until operations in other threads complete before they continue
  • CountDownLatch is implemented with the shared-lock mechanism of AQS
  • CountDownLatch requires an initial count at construction (the number of times the shared lock must be released)
  • Each call to countDown() decrements count by 1, until it reaches 0
  • Each call to await() tries to acquire the lock; acquiring the lock here is really just a check of whether the AQS state value is 0
  • When count (that is, state) is reduced to 0, the threads blocked in the AQS queue are woken; these are the threads that enqueued themselves by calling await()
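
The last point, that a single transition to 0 releases every queued waiter, can be sketched as follows. PropagationDemo and releasedWaiters are hypothetical names. Some waiters may reach await() only after the count is already 0 and therefore never queue, so the sketch checks the outcome (all waiters return) rather than the internal queueing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
final class PropagationDemo {

  // Starts `waiters` threads that all call await() on one latch, counts the latch
  // down once, and returns how many threads came back from await().
  static int releasedWaiters(int waiters) {
    CountDownLatch latch = new CountDownLatch(1);
    AtomicInteger released = new AtomicInteger();
    List<Thread> threads = new ArrayList<>();

    for (int i = 0; i < waiters; i++) {
      Thread t = new Thread(() -> {
        try {
          latch.await();
          released.incrementAndGet();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      threads.add(t);
      t.start();
    }

    // One countDown() takes the count from 1 to 0 and opens the latch for everyone
    latch.countDown();

    try {
      for (Thread t : threads) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return released.get();
  }

  public static void main(String[] args) {
    System.out.println("released waiters: " + releasedWaiters(4));
  }
}
```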