当前位置:网站首页>Multithreading and high concurrency (8) -- summarize AQS shared lock from countdownlatch (punch in for the third anniversary)

Multithreading and high concurrency (8) -- summarize AQS shared lock from countdownlatch (punch in for the third anniversary)

2022-07-06 00:16:00 The green flowers of the Li Wang family

I have written an article unconsciously 3 Years. ,3 I only wrote more than 60 articles in , A little ashamed . I hope there will be a burst of creation in the coming period , Can also usher in the explosion of their own technology . Come on! , Cui Hua !

One 、 summary

Last article : Multithreading and high concurrency (7)—— from ReentrantLock To AQS Source code Explained in detail AQS Principle , We know how to use AQS The synchronizer can be constructed simply and efficiently ( rewrite tryAcquire、tryRelease You can easily achieve ). What about here? ,JDK It provides us with many efficient synchronizers , Such as CountDownLatch 、CyclicBarrier、Phaser、Semaphore、Exchanger、ReentrantReadWriteLock, We will explain and analyze the following articles one by one .
here , We will pass CountDownLatch summary AQS The principle of shared lock , At the same time, compare CyclicBarrier and Phaser.

Two 、CountDownLatch

CountDownLatch It literally means counting down the bolts , That is, count down how many threads have completed execution . It allows int A thread is blocked in one place , Until all threads have completed their tasks .

1、 Common methods

await(): The thread calling this method is waiting , until latch The value of is reduced to 0 Or the current thread is interrupted . Generally, it is called by the main thread .—— Open door
await(long timeout, TimeUnit unit): With timeout await.
countDown(): send latch The value of the reduction 1, If it's down to 0, Will wake up all waiting in this latch On the thread .—— Reciprocal
getCount(): get latch value .

2、 Two typical uses

1、 The main thread is executed only after all businesses are executed
take CountDownLatch The counter of is initialized to n (new CountDownLatch(n)), Every time a task thread finishes executing , Just subtract the counter 1 (countdownlatch.countDown()), When the value of the counter changes to 0 when , stay CountDownLatch On await() The thread will wake up . For example, Baidu's text to voice code , For long text , You need to turn in segments , Each paragraph needs to be converted to integrate the whole text .
Let's give you another example , Like an exam , Yes 30 Candidates ( Threads ), A invigilator ( The main thread ), Each candidate can hand in the test paper in advance after completing the test paper , And you don't need to worry about other exams after you hand in the paper , After the exam ( All students hand in their papers ,latch by 0), The invigilator can leave ( End of main thread ).
30 Student output results are too long , Let's change to 5 A student , The code is as follows :

 public static void main(String[] args) {
    
        CountDownLatch countDownLatch = new CountDownLatch(5);
        // Specified number of threads 
        ExecutorService threadPool = Executors.newFixedThreadPool(30);
        for (int i = 0; i < 5; i++) {
    
            int finalI = i;
            threadPool.execute(() -> {
    
                System.out.println(" Student "+ finalI +" Hand in papers ");
                countDownLatch.countDown();
            });
        }
        try {
    
            // Open door 
            countDownLatch.await();
        } catch (InterruptedException e) {
    
            e.printStackTrace();
        }
        threadPool.shutdown();
        System.out.println(" The exam is over , The invigilator left ");
    }

The operation results are as follows , It can be seen that students hand in their papers in disorder , But the teacher must wait for all the students to hand in their papers before leaving :

 Student 0 Hand in papers 
 Student 2 Hand in papers 
 Student 3 Hand in papers 
 Student 1 Hand in papers 
 Student 4 Hand in papers 
 The exam is over , The invigilator left 

2、 At some point , All threads execute together .
This function is to realize the maximum parallelism of multiple threads starting to execute tasks .
Parallel , It is emphasized that multiple threads start executing at a certain time . Concurrency is still executed in sequence .
It's like a race , Put multiple threads in the starting point , Waiting for the start of the shooting , And run at the same time .
The code is as follows :

 public static void main(String[] args) {
    
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(5);
        // Specified number of threads 
        ExecutorService threadPool = Executors.newFixedThreadPool(30);
        for (int i = 0; i < 5; i++) {
    
            int finalI = i;
            threadPool.execute(() -> {
    
                try {
    
                    System.out.println(" Athletes "+ finalI +" Preparing for ");
                    // Thread blocking , Waiting to be released 
                    countDownLatch.await();
                    Thread.sleep(100);
                    System.out.println(" Athletes "+ finalI +" sprint ");
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
                countDownLatch2.countDown();
                System.out.println(" Athletes "+ finalI +" Finish the game ");

            });
        }
        try {
    
            Thread.sleep(1000);
            System.out.println(" All the athletes are ready , Start running ");
            // Unlock and release all threads above 
            countDownLatch.countDown();
            // All threads complete the race , Go down 
            countDownLatch2.await();
        } catch (InterruptedException e) {
    
            e.printStackTrace();
        }
        threadPool.shutdown();
    }

The results are as follows :

 Athletes 1 Preparing for 
 Athletes 4 Preparing for 
 Athletes 3 Preparing for 
 Athletes 0 Preparing for 
 Athletes 2 Preparing for 
 All the athletes are ready , Start running 
 Athletes 0 sprint 
 Athletes 1 sprint 
 Athletes 4 sprint 
 Athletes 3 sprint 
 Athletes 2 sprint 
 Athletes 3 Finish the game 
 Athletes 4 Finish the game 
 Athletes 1 Finish the game 
 Athletes 0 Finish the game 
 Athletes 2 Finish the game 

3、 The source code parsing

With the basis of the last article , Let's talk about the process of sharing locks here, which will be more handy .

CountDownLatch Is an implementation of shared locks , It is constructed by default AQS Of state The value is count. When threads use countDown() When the method is used , Actually used tryReleaseShared Methods to CAS To reduce state, until state by 0 . When calling await() Method time , If state Not for 0, That proves that the task has not been completed ,await() The method will be blocked all the time , in other words await() The statement after the method will not be executed . then ,CountDownLatch Can spin CAS Judge state == 0, If state == 0 Words , All waiting threads will be released ,await() The statement after the method is executed .

First , Let's take a look at how it's constructed :

public CountDownLatch(int count) {
    
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    ...
 Sync(int count) {
    
            setState(count);
        }

You can see it , It's going on new When the object ,count Namely state The initial value of the .
then , Let's see await() Method :

 public void await() throws InterruptedException {
    
        sync.acquireSharedInterruptibly(1);
    }

The call sync Of acquireSharedInterruptibly Method , It's also AQS Methods :

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    
            //  If interrupted , Throw an exception 
        if (Thread.interrupted())
            throw new InterruptedException();
            //  Try to get synchronization status 
        if (tryAcquireShared(arg) < 0)
        	//  Failed to get synchronization status , The spin 
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared() Method is CountDownLatch Rewrite method , as follows :

protected int tryAcquireShared(int acquires) {
    
			// Whether the current status is 0 Releasable lock 
            return (getState() == 0) ? 1 : -1;
        }

doAcquireSharedInterruptibly The code is as follows :

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    
        // Add the current thread to the end of the synchronization queue ,addWaiter You can read the last article 
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
    
        	// The spin 
            for (;;) {
    
            // The predecessor node of the current node 
                final Node p = node.predecessor();
                // If the precursor node is the head node , Then try to get the synchronization status 
                if (p == head) {
    
                	// The current node attempts to get the synchronization status 
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
    
                    	// If successful , Then set the current node as the head node and wake up the next thread 
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // If the precursor of the current node is not the head node , Try to suspend the current thread , Same as exclusive lock 
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
    
            if (failed)
            	// Cancel acquisition lock , Same as exclusive lock 
                cancelAcquire(node);
        }
    }

below , Let's see countDown() Method source code :

public void countDown() {
    
        sync.releaseShared(1);
    }

It also calls AQS Of releaseShared() Method :

public final boolean releaseShared(int arg) {
    
		// Get release synchronization status 
        if (tryReleaseShared(arg)) {
    
        	//  If it works , Into spin , Try to wake up the successor node of the head node in the synchronization queue 
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared() The code is as follows :

 protected boolean tryReleaseShared(int releases) {
    
            //  Just one meaning , The spin , until state state -1 by 0
            for (;;) {
    
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

doReleaseShared() The method is as follows :

 private void doReleaseShared() {
    
 		// Spin first 
        for (;;) {
    
        	// Get the header node 
            Node h = head;
            if (h != null && h != tail) {
    
            	// Head node state 
                int ws = h.waitStatus;
                // If it is SIGNAL, Try to wake up subsequent nodes 
                if (ws == Node.SIGNAL) {
    
                	// as long as head Successful from SIGNAL It is amended as follows 0, that head The thread corresponding to the successor node of will be awakened .
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // Put the next node that is not empty unpark
                    unparkSuccessor(h);
                }
                // Don't wake up at other times 
                else if (ws == 0 &&
                // The predecessor node will not only wake up its successor node , At the same time, it may wake up the subsequent nodes . Without stop for
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //  If head There is no change , Call break Exit loop 
            if (h == head)                   // loop if head changed
                break;
        }
    }

When the successor node of the head node is awakened , The thread will wake up from where it was suspended , Carry on . If the current status is still >0, Then set the current node as the head node .setHeadAndPropagate() The code is as follows :

 private void setHeadAndPropagate(Node node, int propagate) {
    
 		// Current header node 
        Node h = head; // Record old head for check below
        // Set the current node as the head node 
        setHead(node);
        // If you execute this function , that propagate It must be equal to 1
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
    
            // Get the next node of the current node 
            Node s = node.next;
            // Wake up subsequent nodes 
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

Um. , The writing is a little messy , Draw a picture and calm down :
 Insert picture description here
Blue CountDownLatch, Yellow is AQS Source code .

3、 ... and 、CyclicBarrier

CyclicBarrier Literally means recyclable (Cyclic) My fence (Barrier). A group of threads reach a fence ( It can also be called synchronization point ) When is blocked , Until the last thread reaches the fence , The fence will open , All intercepted threads will continue to work .
It and CountDownLatch Is the difference between the , For example, examination. ,CountDownLatch The first candidate in the middle school doesn't care what the second candidate does . however CyclicBarrier in , Everyone must arrive , For example, primary school students travel , When you go home , Everyone must get on the bus to go home .
The core approach :
await() : All participants are already here barrier On the call await Before method , Will be waiting for .
CyclicBarrier(int parties, Runnable barrierAction): Used when a thread reaches a barrier , priority barrierAction, Easy to handle more complex business scenarios .
reset(): Reset the barrier to its original state . If all participants are currently waiting at the barrier , Then they will return , Throw one at the same time BrokenBarrierException.
The usage code is as follows :

 public static void main(String[] args) {
    
        // Every time 5 Personal departure 
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        // Specified number of threads 
        ExecutorService threadPool = Executors.newFixedThreadPool(30);
        for (int i = 0; i < 5; i++) {
    
            int finalI = i;
            threadPool.execute(() -> {
    
                System.out.println(" Student "+ finalI +" Get on the train ");
                try {
    
                    // Wait to ensure that the execution of the child thread ends 
                    cyclicBarrier.await(60, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
    
                    e.printStackTrace();
                } catch (TimeoutException e) {
    
                    e.printStackTrace();
                }
                System.out.println(" Student "+ finalI +" set out ");
            });
        }
        threadPool.shutdown();

    }

The results are as follows :

 Student 1 Get on the train 
 Student 3 Get on the train 
 Student 4 Get on the train 
 Student 2 Get on the train 
 Student 0 Get on the train 
 Student 0 set out 
 Student 3 set out 
 Student 1 set out 
 Student 2 set out 
 Student 4 set out 

It can be seen that , All the students arrived before they set out .
CountDownLatch Is the counter , Thread completes one record one , It's just that the count is not increasing but decreasing , and CyclicBarrier It's more like a valve , All threads are required to arrive , The valve will open , And then go ahead and do it .
Expand :
Phaser What is a class ?
Phaser Is a reusable synchronization fence , Its function and CountDownLatch、CyclicBarrier be similar , But it can be used to solve the scenario problem of controlling multiple threads to complete tasks in stages .—— Segmented fence
It's a little bit like CyclicBarrier Of barrierAction, This is just a summary .

原网站

版权声明
本文为[The green flowers of the Li Wang family]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/187/202207060009599407.html