当前位置:网站首页>Come and have a look! What is the relationship between AQS and countdownlatch?

Come and have a look! What is the relationship between AQS and countdownlatch?

2020-11-08 18:35:00 Liu Zhihang

Preface

CountDownLatch A synchronization aid , Again based on AQS Realization , This paper mainly introduces CountDownLatch Use , And source code .

official account :『 Liu Zhihang 』, Record the skills in work study 、 Development and source notes ; From time to time to share some of the life experience . You are welcome to guide !

Introduce

A synchronization aid , Allow one or more threads to wait , Until a set of operations performed in other threads is complete

One CountDownLatch Initialize to the given count . stay await Methods block , call countDown Method will reduce the count until it reaches zero , After that, all waiting threads are released , Any subsequent calls await Will return to . It's a one-off phenomenon - The count cannot be reset . If you need a version to reset the count , Please consider using CyclicBarrier .

CountDownLatch Is a general synchronization tool , It can be used for many purposes .

  1. Used as a simple opening / Turn off the latch , Or the door : All thread calls await Wait at the door , Until it's called countDown The thread opens .
  2. The initialization count is N , Wait with a thread , until N Threads complete an operation , Or something has been done N Time .

CountDownLatch One useful attribute is , It doesn't require calling countDown The thread waits until the count reaches zero , It just prevents any thread from passing through await , Until all threads can pass through .

Basic use

Before me CAS That article 《 from JUC Source code to see CAS, I took a note ......》 Described in the CAS For example, the CountDownLatch, The code is as follows :

public class CasTest {

    private static final CountDownLatch LATCH = new CountDownLatch(10);

    private static int NUM_I = 0;
    private static volatile int NUM_J = 0;
    private static final AtomicInteger NUM_K = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {

            threadPool.execute(new Runnable() {
                public void run() {
                    for (int j = 0; j < 10000; j++) {
                        NUM_I++;
                        NUM_J++;
                        NUM_K.incrementAndGet();
                    }
                    LATCH.countDown();
                }
            });
        }
        LATCH.await();

        System.out.println("NUM_I = " + NUM_I);
        System.out.println("NUM_J = " + NUM_J);
        System.out.println("NUM_K = " + NUM_K.get());
        threadPool.shutdown();
    }

}

Briefly introduce the main logic and function of this code :

  1. CountDownLatch The initialization count is 10 .
  2. open 10 Three threads to process business logic , The end of business logic calls LATCH.countDown() To count -1 operation .
  3. stay LATCH.await() It's blocking and waiting , until LATCH The value of is 0 , namely 10 All thread services are finished .
  4. Then the main thread continues to execute .

Question question

  1. CountDownLatch and AQS What does it matter ?
  2. CountDownLatch What is the implementation principle of ?

Source code analysis

The basic structure

uml-VAGlMb

You can see from the class diagram that ,CountDownLatch There is a static class inside Sync, and Sync Inherited AbstractQueuedSynchronizer. How is the specific internal realization , Then the following through the source code and drawing step by step to introduce .

initialization

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

By initializing the constructor, you can see that , stay new When you create an object, you must pass a int Non negative number of type . The implementation logic shows that , Is to create a Sync object .

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

}   

I was introducing AQS The source code has introduced state The meaning of ,state It means different meanings in different subclasses .

  1. stay ReentrantLock in state It stands for lock state ,0 No thread gets lock , Greater than or equal to 1 A thread has already acquired a lock , Greater than 1 Indicates that the thread that obtained the lock has re entered multiple times .
  2. stay ReentrantReadWriteLock in state Represents the state of the lock .state by 0 , No thread holds lock ,state The height of 16 Represents the read lock state , low 16 Represents the write lock state . The actual value of the read-write lock can be obtained by bit operation .
  3. And in here, (CountDownLatch) It represents the value of the latch or count .

countDown

public void countDown() {
    sync.releaseShared(1);
}

Decrement of latch count :

  • If the current count is greater than zero , Then decrease .
  • If the count reaches zero , Then release all waiting threads .
  • If then the current count equals zero, there's no reaction .

Called here AQS Of releaseShard() Method , Free up shared resources .

// AQS  Code 
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

stay AQS In the method of releasing shared resources tryReleaseShared(arg) Part of it is in CountDownLatch The inner class of Sync Implemented in , The code part is as follows :

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

Decreasing update state , If state by 0 Then return to false, Otherwise return to true .

At this point, check with the above AQS Code , Find out : If tryReleaseShared return true , The subsequent nodes will be awakened to start the operation . So that is to say , If state Not for 0, The subsequent nodes will not be awakened , until state by 0 .

await

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

Causes the current thread to wait , Until the latch counts down to zero , Unless the thread is interrupted .

  • If the current count is zero , This method immediately returns .
  • If the current count is greater than zero , The current thread is used for thread scheduling purposes , One of the two things that you disable and remain dormant occurs :

    • Because of the call countDown Method to make the count reach 0;
    • Some other threads interrupt the current thread .
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

AQS Defined tryAcquireShared The return value is divided into 3 Kind of :

  1. Less than 0: It means failure ;
  2. be equal to 0: Indicates that the shared mode has successfully obtained resources , However, subsequent nodes cannot succeed in shared mode ;
  3. Greater than 0: Indicates that the shared mode has successfully obtained resources , Subsequent nodes may also succeed in sharing mode , under these circumstances , Subsequent waiting threads must check availability .

among tryAcquireShared By the same CountDownLatch The inner class of Sync To realize , The internal logic is mainly judgment state Value , Go back .

In the internal implementation, the only value returned is 1 and -1 , Description in state == 0 when , return 1 , Wake up the subsequent nodes . It's not equal to 0 when , It will block .

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

summary

Q: CountDownLatch and AQS What does it matter ?

A: CountDownLatch Is based on AQS The shared mode of .

Q: CountDownLatch What is the implementation principle of ?

A: You can refer to the source code analysis above , Summarize and introduce . CountDownLatch Is based on AQS Sharing mode realizes , The count must be passed in at initialization time , The count is actually AQS Of state value . stay countDown When the state Go down , stay When state by 0 when Will wake up AQS All the waiting nodes in the queue ( Because it's sharing mode ). and await The way is to judge state Value , If not 0 , Then all threads are blocked in the queue , Waiting to wake up .

Q: state What is the meaning of "Representation" ?
A:

  1. stay ReentrantLock in state It stands for lock state ,0 No thread gets lock , Greater than or equal to 1 A thread has already acquired a lock , Greater than 1 Indicates that the thread that obtained the lock has re entered multiple times .
  2. stay ReentrantReadWriteLock in state Represents the state of the lock .state by 0 , No thread holds lock ,state The height of 16 Represents the read lock state , low 16 Represents the write lock state . The actual value of the read-write lock can be obtained by bit operation .
  3. And in here, (CountDownLatch) It represents the value of the latch or count .

Conclusion

This paper mainly introduces CyclicBarrier In a common way , Through source code , Analyze how to achieve the barrier and loop effect . Wrong place , Please correct more .

Related information

  1. Java SE API :https://docs.oracle.com/javas...

版权声明
本文为[Liu Zhihang]所创,转载请带上原文链接,感谢