当前位置:网站首页>Abstractqueuedsynchronizer (AQS) source code detailed analysis - countdownlatch source code analysis
Abstractqueuedsynchronizer (AQS) source code detailed analysis - countdownlatch source code analysis
2022-06-21 08:33:00 【*Wucongcong*】
AbstractQueuedSynchronizer(AQS) Detailed analysis of source code - CountDownLatch Source code analysis
1、CountDownLatch brief introduction
CountDownLatch, Is a simple synchronizer , It means Allow one or more threads to wait for other threads' operations to complete before executing subsequent operations .
CountDownLatch The usual usage of and Thread.join() It's kind of similar , Wait for other threads to complete before executing the main task .
2、 Introduction case study
Case study 1:
- For students like me ,CountDownLatch The actual development and application of , Some students have not even touched it . But under concurrent conditions , The use of this class is still very common , So first introduce two cases to understand its purpose :
- With the help of CountDownLatch , Control the main thread to wait for the sub thread to complete before executing
/** * @author wcc * @date 2022/2/15 19:09 */
public class CountDownLatchTest01 {
private static final int TASK_COUNT = 8;
private static final int THREAD_CORE_SIZE = 10;
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(TASK_COUNT);
Executor executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < TASK_COUNT; i++) {
executor.execute(new WorkerRunnable(i, countDownLatch));
}
System.out.println(" The main thread is waiting for all subtasks to complete ...");
long mainWaitStartTimeMillis = System.currentTimeMillis();
countDownLatch.await();
long mainWaitEndTimeMillis = System.currentTimeMillis();
System.out.println(" Waiting time of main thread :"+ (mainWaitEndTimeMillis - mainWaitStartTimeMillis));
}
static class WorkerRunnable implements Runnable{
private int taskId;
private CountDownLatch latch;
@Override
public void run() {
doWorker();
}
public void doWorker(){
System.out.println(" Mission ID:"+ taskId + ", The task is in progress ...");
try {
TimeUnit.MILLISECONDS.sleep(500);
}catch (Exception e){
e.printStackTrace();
}finally {
latch.countDown();
}
}
public WorkerRunnable(int taskId, CountDownLatch latch) {
this.taskId = taskId;
this.latch = latch;
}
}
}
The operation results are as follows :

Case study 2:
- The thread that performs the task , It may also be a many to many relationship : Let's take a look at this case , With the help of CountDownLatch, After the main thread controls the sub threads to start at the same time , The main thread then blocks and waits for the child thread to end !
/** * @author wcc * @date 2022/2/15 19:09 */
public class CountDownLatchTest02 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(new Worker(startSignal, doneSignal, i)).start();
}
// Here let the main thread sleep 500 millisecond , Ensure that all child threads have been started , And blocked in startSignal At the fence
TimeUnit.MILLISECONDS.sleep(500);
// because startSignal Fence value is 1, So the main thread only needs to call once
// So all calls startSignal.await() Blocked child threads , You can pass through the fence at the same time
System.out.println(" Subtask fence has started ");
startSignal.countDown();
System.out.println(" Wait for the subtask to end ...");
long startTime = System.currentTimeMillis();
// Wait for all subtasks to end
doneSignal.await();
long endTime = System.currentTimeMillis();
System.out.println(" All subtasks have ended , Time consuming :" + (endTime - startTime));
}
static class Worker implements Runnable{
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
private int id;
@Override
public void run() {
try {
// To enable all threads to start the task at the same time , We're going to have all the lines blocked up here
// When everyone is ready , Open the threshold again
startSignal.await();
System.out.println(" The subtasks -" + id + ", Opening time :" + System.currentTimeMillis());
doWorker();
}catch (Exception e){
e.printStackTrace();
}finally {
doneSignal.countDown();
}
}
public void doWorker() throws InterruptedException{
TimeUnit.SECONDS.sleep(5);
}
public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, int id) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
this.id = id;
}
}
}
Execution results :

In the above code startSignal.await(); It's like a fence , Put all the child threads in their run Method , Wait for the main thread to execute startSignal.countDown(); That is, after closing the fence , All child threads continue to execute their own run() Method , Here's the picture :

Case study 3:
/** * @author wcc * @date 2022/2/16 14:14 */
public class CountDownLatchTest03 {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(2);
Thread t1 = new Thread(() -> {
try {
Thread.sleep(5000);
}catch (Exception e){
}
// rest 5 Seconds later ( The simulated worker thread works 5 second ), call countDown()
latch.countDown();
}, "t1");
Thread t2 = new Thread(() -> {
try {
Thread.sleep(10000);
}catch (Exception e){
}
// rest 10 Seconds later ( The simulated worker thread works 10 second ), call countDown()
latch.countDown();
}, "t2");
t1.start();
t2.start();
Thread t3 = new Thread(() -> {
try {
// Blocking , wait for state Reduced to 0
latch.await();
System.out.println(" Threads t3 from await In the back ");
}catch (Exception e){
System.out.println(" Threads t3 await Interrupted ");
Thread.currentThread().interrupt();
}
}, "t3");
Thread t4 = new Thread(() -> {
try {
// Blocking , wait for state Reduced to 0
latch.await();
System.out.println(" Threads t4 from await In the back ");
}catch (Exception e){
System.out.println(" Threads t4 await Interrupted ");
Thread.currentThread().interrupt();
}
}, "t4");
t3.start();
t4.start();
}
}
The results are as follows :

3、 Source code analysis
3.1、Sync Inner class
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// Incoming initial count frequency
Sync(int count) {
// call setState() Method setting AQS Medium state Value
setState(count);
}
// Get the rest count frequency
int getCount() {
return getState();
}
// Attempt to acquire the Shared lock
protected int tryAcquireShared(int acquires) {
// Be careful , here state be equal to 0 When I came back 1
// state It's not equal to 0 When I came back -1, in other words state It's not equal to 0 Always have to line up when
return (getState() == 0) ? 1 : -1;
}
/** * Try to release the shared lock * to update AQS.state Value , Every call ,state Value reduction 1, When state - 1 Just for 0 When , return true */
protected boolean tryReleaseShared(int releases) {
// Spin operation , Make sure AQS.state The value of was updated successfully
for (;;) {
// Get current state Value
int c = getState();
// Conditions established : This indicates that a thread has triggered the wake-up operation ( Shared lock has been released , Can no longer be released ), Return here false
if (c == 0)
return false;
// So let's go over here , explain state > 0
// If c Value > 0, Will c value -1
int nextc = c-1;
// Atomic updates state CAS success : Indicates that the current thread is executing tryReleaseShared Method c-1 Before , No other thread has modified state Value
// Atomic updates state Value :
if (compareAndSetState(c, nextc))
// nextc == 0:true: Description of the current call countDown() Method is the thread that needs to trigger the wake-up operation , It will return to true Wake up operation
return nextc == 0;
}
}
Sync Inner classes override tryReleaseShared(int releases) and tryAcquireShared(int acquires) Method , And put count Deposit in state Go to variables . Pay attention here , The parameters of the above two methods are not used .
3.2、 Construction method
// The constructor needs to pass in a count, That's the initial number
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
3.3、await() Method
await() Method is a method that waits for other threads to complete , It will try to acquire the shared lock first , If it fails, enter AQS In the blocking queue waiting to be awakened .
According to the above Sync Source code , We know ,state It's not equal to 0 When tryAcquireShared() The return is -1, in other words count Not reduced to 0 When , All calls await() The threads of the method are all queued .
public void await() throws InterruptedException {
// call AQS Of acquireSharedInterruptibly() Method
sync.acquireSharedInterruptibly(1);
}
AQS Medium acquireSharedInterruptibly Method :
// be located AQS in : Methods that can acquire shared locks in response to interrupts
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// Conditions established : Description of the current call await The thread of the method is already in the interrupt state , Throw an exception directly
if (Thread.interrupted())
throw new InterruptedException();
// Conditions established : Show the current AQS Of state It is greater than 0 Of , At this point, the thread is queued , Then go to the wake-up operation
// Conditions not established : AQS.state == 0, At this point, the thread will not be blocked ...
// The thread corresponding to the task execution at the business level has latch To break the , Then others are calling latch.await The thread will not be blocked here
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Will call latch.await() The thread of the method is wrapped as node Add to AQS In the blocking queue
final Node node = addWaiter(Node.SHARED);
// false: Indicates that the current thread is not interrupted , No interrupt exception thrown , There is no need to respond to interrupt the logic of dequeue
// true: Indicates that the current thread has been interrupted , And throw an interrupt exception , You need to cancel the assignment node The logic of competition
boolean failed = true;
try {
// Spin operation
for (;;) {
// Get the predecessor node of the current node
final Node p = node.predecessor();
// Conditions established : This indicates that the node corresponding to the current thread is head.next node
if (p == head) {
// head.next The node has the right to acquire the shared lock
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire(): Will find a good father for the current thread , Finally, set the status of the parent node to -1(SIGNAL), Finally, this method returns true
if (shouldParkAfterFailedAcquire(p, node) &&
// parkAndCheckInterrupt(): Suspends the current thread , And returns the interrupt flag of the current thread
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// Conditions established : This indicates that the current thread has been interrupted , The current... Needs to be unassigned node Threads compete
if (failed)
cancelAcquire(node);
}
}
/** * AQS Of setHeadAndPropagate Method Set the current node to head node , And spread backwards ( Wake up in turn ) * @param node * @param propagate 1: Represents the current shared lock state==0,-1: Represents the current shared lock status state != 0 */
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// Set the current node as new head node , And set up thread、prev by null
setHead(node);
// call setHeadAndPropagate When propagate == 1 It must be true
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// Get the successor node of the current node
Node s = node.next;
// Conditions for a :s== null When was it established : At present node Node is already tail 了 , At this time, the conditions will be established , call doReleaseShared Will deal with this situation
// Condition 2 : precondition :s != null Subsequent nodes are required s The mode of is shared mode SHARED
if (s == null || s.isShared())
// Basically, all cases will be implemented to doReleaseShared Method
doReleaseShared();
}
}
}
Graphic analysis :

3.4、countDown() Method
countDown() Method , Will release the shared lock , That is to say count The number of times will decrease 1.
According to the above Sync Source code , We know ,tryReleaseShared() Every time I put count Less times 1, When it is reduced to 0 Back when true, Only then will the waiting thread wake up .
Be careful ,doReleaseShared() Is to wake up the waiting thread , We analyzed this method in the previous chapters .
public void countDown() {
sync.releaseShared(1);
}
// How to release the shared lock
public final boolean releaseShared(int arg) {
// Conditions established : Description of the current call latch.countDown() Method thread , Is precisely state - 1 == 0 The thread of , Need to trigger wakeup await Thread in state
if (tryReleaseShared(arg)) {
// call countDown() Method only one thread will enter this if Inside the block , call doReleaseShared(), Logic to wake up a thread in a blocking state
doReleaseShared();
return true;
}
return false;
}
/** * What kinds of paths will be called to doReleaseShared Methods? ? * 1.latch.countDown() -> AQS.state == 0 -> doReleaseShared() Wake up the current blocking queue head.next The corresponding thread * 2. Awakened thread -> doAcquireSharedInterruptibly() -> setHeadAndPropagate() -> doReleaseShared() */
// AQS Of doReleaseShared() Method
private void doReleaseShared() {
for (;;) {
// Get current AQS Head node in
Node h = head;
// Conditions for a :h != null establish : Description: the blocking queue is not empty
// Don't set up :h == null, When will it be like this ?
// latch When it's created , No thread has ever called await() Before method , There are thread calls latch.countDown() operation , And trigger the logic of waking up the blocking node
// Condition 2 :h != tail, Currently in the blocking queue except head There are other nodes besides nodes
// h == tail -> head and tail Point to the same node object When will this happen ?
// 1. Normal wakeup conditions , Obtain the shared lock in turn , When the current thread reaches this point ( Threads are tail node )
// 2. The first call await() Method thread and call countDown And the thread that triggers the wake-up blocking node is concurrent
// because await() The thread is the first to call latch.await() The thread of , There is nothing in the queue at this time , It needs to be supplemented by creating a head node , Then spin in again
// stay await() Before the thread queue completes , Assume that there are only empty elements just created in the current queue head
// At the same time , There is an external call countDown() The thread of , It will state From 1 It is amended as follows 0 了 , This thread needs to do the logic to wake up the elements in the blocking queue
// Be careful : call await() Method thread , Because when the team is completely joined, it returns to the upper level method again doAcquireSharedInterruptibly in , Will go into spin ...
// Spin will get the precursor of the current element , Judge who you are head.next, All subsequent threads will set themselves to head, Then the current thread is not interrupted
if (h != null && h != tail) {
// To this if Inside , Show the current head There must be a successor node
// Get the header head Waiting state
int ws = h.waitStatus;
// If at present head The node status is signal This indicates that the current successor node has not been awakened
if (ws == Node.SIGNAL) {
// Before waking up the successor node , Will the current head The state of the node is changed to 0
// here , Why use CAS Operation? ?
// here , This is because the current node wakes up the subsequent nodes , Subsequent nodes have updated themselves as head node , The current node cannot exit the spin , Then it will participate in the logic of the successor node that wakes up its successor node again
// therefore , At this time, there are concurrent , To use CAS Logic
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// Wake up the successor node
unparkSuccessor(h);
}
// So let's go over here , This indicates that the waiting state of the current header node is not SIGNAL
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// Conditions established :
// 1. It indicates that the successor node that has just woken up has not been executed setHeadAndPropagate Method , Set the current wake-up node to head The logic of
// This is the time , The current thread directly jumps out and ends ...
// here , Don't worry about the wake-up logic being broken here ?
// Don't worry , Because the awakened thread will execute sooner or later doReleaseShared In the method
// 2.head == null
// latch When it's created , No thread has ever called await() Before method , There are thread calls latch.countDown() operation , And trigger the logic of waking up the blocking node
// 3.head == tail The first call await() Method thread and call countDown And the thread that triggers the wake-up blocking node is concurrent head and tail It's pointing to the same object
// Conditions not established :
// The awakened node is very active , Then directly in the upper layer method doAcquireSharedInterruptibly Be awakened in , Set yourself directly as a new head node
// At this point, wake up its node ( Precursor node ) perform h == head Cause the condition not to hold
// here head The precursor of the node will not jump out doReleaseShared, Will continue to participate in the awakening of new head The subsequent node logic of the node
if (h == head) // loop if head changed
break;
}
}
/** * Try to release the shared lock * to update AQS.state Value , Every call ,state Value reduction 1, When state - 1 Just for 0 When , return true */
protected boolean tryReleaseShared(int releases) {
// Spin operation , Make sure AQS.state The value of was updated successfully
for (;;) {
// Get current state Value
int c = getState();
// Conditions established : This indicates that a thread has triggered the wake-up operation ( Shared lock has been released , Can no longer be released ), Return here false
if (c == 0)
return false;
// So let's go over here , explain state > 0
// If c Value > 0, Will c value -1
int nextc = c-1;
// Atomic updates state CAS success : Indicates that the current thread is executing tryReleaseShared Method c-1 Before , No other thread has modified state Value
// Atomic updates state Value :
if (compareAndSetState(c, nextc))
// nextc == 0:true: Description of the current call countDown() Method is the thread that needs to trigger the wake-up operation , It will return to true Wake up operation
return nextc == 0;
}
}
CountDowmnLatch.countDown() Execution flow diagram :

summary :
CountDownLatchIndicates that one or more threads are allowed to wait for the operation of other threads to complete before executing subsequent operationsCountDownLatchUse AQS The implementation of the shared lock mechanismCountDownLatchThe number of incoming times is required during initializationcount( The number of lock levels of the shared lock )- Every time you call
countDown()Method timecountLess times 1 - Every time you call
await()Method will try to obtain the lock , The acquisition lock here is actually a check AQS Medium state Value is not 0 - When count value ( That is to say state Value ) Reduced to 0 Will wake up when AQS Blocking threads in the queue , These thread calls
await()Method to join the team
边栏推荐
- showCTF Web入门题系列
- 日記(C語言總結)
- Client construction and Optimization Practice
- Unity中.Meta文件作用详解
- 函数声明和函数表达式的区别
- Summary of command execution knowledge points in CTF
- 4.6 lodash usage documents
- Decrypt FTP
- Give two strings S and T, and judge whether T is the word formed after rearrangement of S
- 给两个字符串s和t,判断t是否为s的重新排列后组成的单词
猜你喜欢

STL教程3-类型转换static_cast、dynamic_cast、const_cast、reinterpret_cast方法
![[early knowledge of activities] list of recent activities of livevideostack](/img/8c/f8007931b1a5944f3a0a243a5afcc4.png)
[early knowledge of activities] list of recent activities of livevideostack

【活动早知道】LiveVideoStack近期活动一览

2022-2028 global after sales spark plug industry research and trend analysis report

2022-2028 global hydrogen internal combustion engine industry research and trend analysis report

LeetCode数组经典题目(一)

This article takes you to interpret the advertising advantages of tiktok

Gql+nodejs+mysql database

Summary of mobile application development

Summary of problems and errors encountered in tidb4.0.0 (tiup deployment)
随机推荐
For hand drawn graphics, covering multiple topics, CVPR 2022 sketchdl workshop begins to solicit contributions!
Tsinghua University | van: visual attention network
[early knowledge of activities] list of recent activities of livevideostack
2022-2028 global section valve industry research and trend analysis report
2022-2028 global boom cylinder industry research and trend analysis report
Difference between function declaration and function expression
Requirements for setting up points sign in tasks and common problems in the process of building points mall
【MGT】代码解读之model-MGT
Visual studio code annotation plug-in: korofileheader
What should I do if a white page appears during MySQL installation
Global and Chinese market of horizontal drilling rigs 2022-2028: Research Report on technology, participants, trends, market size and share
Zhongyi Antu submitted for registration: proposed to raise 600million yuan, with annual revenue of nearly 1.2 billion yuan
CTF show WEB10
Dumpling備份數據庫
PS prompts "script error -50 general Photoshop error, how to solve it?
2.19 simulation summary
Linux安装达梦数据库/DM8(附带客户端工具安装完整版)
CTF show WEB10
Unity development related blog collection
【C】【时间操作】C语言中关于时间的操作