当前位置:网站首页>Deeply understand the underlying implementation principle of countdownlatch in concurrent programming

Deeply understand the underlying implementation principle of countdownlatch in concurrent programming

2022-07-01 05:25:00 Jackli1123 (cross correlation second return)

Deep understanding of concurrent programming CountDownLatch Underlying implementation principle


One 、 What is? CountDownLatch

CountDownLatch It's a kind of java.util.concurrent Package the next synchronization tool class , It's like a counter , It allows one or more threads to wait until the execution of a set of operations in other threads is complete .
CountDownLatch The bottom layer is based on AQS Realized , Provides a pass int Construction method of type new CountDownLatch(int count), Indicates that the initial value of the counter is count, There are also two core approaches :countDown() Method indicates that the number of counters is subtracted 1 operation ; await() Method means to put the current thread into a waiting state until count The value of is 0 In order to proceed ; And an inheritance from AQS The inner class of Sync To do specific operations .

Two 、CountDownLatch Source code analysis

It is suggested that you should take a look at the source code here lock Lock source code , Because the two source code pairs AQS The logic used is almost identical , The same source code also Semaphore Semaphore .
Deep understanding of concurrent programming AQS Source code interpretation

1. Construction method

CountDownLatch The construction method of is a need to pass int A parametric construction of a type , The parameters to be passed must ≥0, because CountDownLatch It's a minus counter

   public CountDownLatch(int count) {
    
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
	// Set the initial state Is the initial value of the counter , This is using AQS Of state,Lock This variable is used in the lock to indicate the number of reentries 
    Sync(int count) {
    
        setState(count);
    }

2.await() Execute the wait method

perform await() Method will enter the wait state , This is nothing to see , If you are interested, you can take a look at the above Lock Lock source code analysis , The code is almost the same

public void await() throws InterruptedException {
    
       sync.acquireSharedInterruptibly(1);
}
   
public final void acquireSharedInterruptibly(int arg)
           throws InterruptedException {
    
       if (Thread.interrupted())
           throw new InterruptedException();
       if (tryAcquireShared(arg) < 0)
           doAcquireSharedInterruptibly(arg);
   }
   
// Returns the status of the current counter , If the counter is not equal to 0 Then return to -1
protected int tryAcquireShared(int acquires) {
    
           return (getState() == 0) ? 1 : -1;
 }
 
 // This careful look is also related to lock The lock lock() The method is the same 
private void doAcquireSharedInterruptibly(int arg)
     throws InterruptedException {
    
     // Create a new waiting node 
     final Node node = addWaiter(Node.SHARED);
     boolean failed = true;
     try {
    
         for (;;) {
    
             final Node p = node.predecessor();
             if (p == head) {
    
             	//  Only when the result of the counter is 0 And when the thread is awakened, it returns 1 In order to proceed 
                 int r = tryAcquireShared(arg); 
                 if (r >= 0) {
    
                     setHeadAndPropagate(node, r);
                     p.next = null; // help GC
                     failed = false;
                     return;
                 }
             }
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt()) // Here we enter a blocking state ,
                 throw new InterruptedException();
         }
     } finally {
    
         if (failed)
             cancelAcquire(node);
     }
 }
 
 // Create a new waiting node , The operation here is similar to lock The locks are almost identical, so I won't explain them in detail , It can also be seen from here CountDownLatch Our linked list is a two-way linked list 
private Node addWaiter(Node mode) {
    
     Node node = new Node(Thread.currentThread(), mode);
     Node pred = tail;
     if (pred != null) {
    
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {
    
             pred.next = node;
             return node;
         }
     }
     enq(node);
     return node;
 }

2.countDown() Count minus 1 operation

I don't want to see this , Also follow the front lock The operation of lock release is the same

 public void countDown() {
    
    sync.releaseShared(1);
  }

public final boolean releaseShared(int arg) {
    
	// Counter state Perform minus one operation , When minus 1 The operation after is 0 Back when true, Otherwise return to false
     if (tryReleaseShared(arg)) {
     
     	// Only when state The value of is reduced to 1 Wake up the thread of the header node 
         doReleaseShared();
         return true;
     }
     return false;
 }
// Perform wake-up thread operation 
 private void doReleaseShared() {
    
      for (;;) {
    
          Node h = head; 
          if (h != null && h != tail) {
    
              int ws = h.waitStatus;	// obtain head State of thread 
              if (ws == Node.SIGNAL) {
      // If the state is -1 Indicates waiting for wakeup 
                  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // Try to change the status of the thread from -1 The wait for wakeup status is changed to 0
                      continue;           
                  unparkSuccessor(h);  // Wake up the head Threads 
              }
              else if (ws == 0 &&
                       !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                  continue;               
          }
          if (h == head)                  
              break;
      }
  }

3、 ... and 、 Handwriting CountDownLatch Look at the execution logic

public class MyCountDownLatch {
    
    private Sync sync;

    public MyCountDownLatch(int count) {
    
        sync = new Sync(count);
    }

    /** *  Make the current thread blocked  */
    public void await() {
    
        sync.acquireShared(1);
    }
   /** *  Counter -1 operation  */
    public void countDown() {
    
        sync.releaseShared(1);
    }
	// be based on AQS The inner class of 
    class Sync extends AbstractQueuedSynchronizer {
    
        public Sync(int count) {
    
            setState(count);
        }

        /** *  If the return result is <0 Under the circumstances , Then put the current thread into aqs In a two-way list  * * @param arg * @return */
        @Override
        protected int tryAcquireShared(int arg) {
    
            //  If aqs The state of >0, Then put the current thread into aqs In a two-way list   Return equal to 0 -1;
            return getState() == 0 ? 1 : -1;
        }

        /** *  If the method returns true Under the circumstances , Just wake up the blocked thread  * getState() == 0 -1  If ==0 return true * >0 false * * @param arg * @return */
        @Override
        protected boolean tryReleaseShared(int arg) {
    
            for (; ; ) {
    
                int oldState = getState();
                if (oldState == 0) {
    
                    return false;
                }
                int newState = oldState - arg;
                if (compareAndSetState(oldState, newState)) {
    
                    return newState == 0;
                }
            }
        }
    }
}

perform demo Method validation :

    public static void main(String[] args) {
    
        MyCountDownLatch mayiktCountDownLatch = new MyCountDownLatch(2);
        new Thread(() -> {
    
            try {
    
                Thread.sleep(1000);
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " End of thread execution ");
            mayiktCountDownLatch.countDown();
        }).start();
        new Thread(() -> {
    
            try {
    
                Thread.sleep(1000);
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " End of thread execution ");
            mayiktCountDownLatch.countDown();
        }).start();
        System.out.println(" The main thread begins to enter the waiting process ");
        mayiktCountDownLatch.await();
    }

Finally, the execution result , The main thread enters the wait state , Until the two threads have finished executing countDown Then continue to execute :
 Insert picture description here

Four 、Semaphore Semaphore

Semaphore Used to restrict access to certain resources ( Physical or logical ) Number of threads for , He maintains a collection of licenses , Limit how many resources you need to maintain license sets , If there is N A resource , That corresponds to N A license , There can only be... At the same time N Thread access . When a thread obtains a license, it calls acquire Method , When you run out of free resources, call release Method .

It is simply understood as Semaphore Semaphores can limit the flow of threads ( The maximum number of threads executing at the same time ), The bottom layer is also based on AQS Don't read the specific source code , Let's talk about the usage .
**acquire():** Get a license , Get one license at a time Semaphore Available licenses for (state) The number of 1, by 0 When the following thread enters the blocking state .
**release():** Release a license , Release one license at a time Semaphore Available licenses for (state) The quantity of is added 1, Wake up a waiting thread .

public class Test001 {
    
	// Suppose the parking lot has 5 A parking space , Then the maximum can only stop 5 The line behind the car , Threads are cars 
    public static void main(String[] args) throws InterruptedException {
    
        //  Set up AQS Status as 5  There can only be 5 Thread execution code   Current limiting   Long can increase capacity 5 personal 
        Semaphore semaphore = new Semaphore(5);
        for (int i = 1; i <= 10; i++) {
    
            new Thread(() -> {
    
                try {
    
                    // AQS The status will subtract 1, Until state The status of is 0 Enter the two-way blocking queue 
                    semaphore.acquire(); // Enter the parking lot 
                    System.out.println(Thread.currentThread().getName() + ", Parking ");
                    // AQS  state +1  At the same time, wake up a thread in the blocking queue 
                    semaphore.release(); // Get out of the parking lot 
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Content sources : Ant class

原网站

版权声明
本文为[Jackli1123 (cross correlation second return)]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/02/202202160219092074.html