当前位置:网站首页>Multithreading and high concurrency (8) -- summarize AQS shared lock from countdownlatch (punch in for the third anniversary)
Multithreading and high concurrency (8) -- summarize AQS shared lock from countdownlatch (punch in for the third anniversary)
2022-07-06 00:16:00 【The green flowers of the Li Wang family】
I have written an article unconsciously 3 Years. ,3 I only wrote more than 60 articles in , A little ashamed . I hope there will be a burst of creation in the coming period , Can also usher in the explosion of their own technology . Come on! , Cui Hua !
One 、 summary
Last article : Multithreading and high concurrency (7)—— from ReentrantLock To AQS Source code Explained in detail AQS Principle , We know how to use AQS The synchronizer can be constructed simply and efficiently ( rewrite tryAcquire、tryRelease You can easily achieve ). What about here? ,JDK It provides us with many efficient synchronizers , Such as CountDownLatch 、CyclicBarrier、Phaser、Semaphore、Exchanger、ReentrantReadWriteLock, We will explain and analyze the following articles one by one .
here , We will pass CountDownLatch summary AQS The principle of shared lock , At the same time, compare CyclicBarrier and Phaser.
Two 、CountDownLatch
CountDownLatch It literally means counting down the bolts , That is, count down how many threads have completed execution . It allows int A thread is blocked in one place , Until all threads have completed their tasks .
1、 Common methods
await(): The thread calling this method is waiting , until latch The value of is reduced to 0 Or the current thread is interrupted . Generally, it is called by the main thread .—— Open door
await(long timeout, TimeUnit unit): With timeout await.
countDown(): send latch The value of the reduction 1, If it's down to 0, Will wake up all waiting in this latch On the thread .—— Reciprocal
getCount(): get latch value .
2、 Two typical uses
1、 The main thread is executed only after all businesses are executed
take CountDownLatch The counter of is initialized to n (new CountDownLatch(n)), Every time a task thread finishes executing , Just subtract the counter 1 (countdownlatch.countDown()), When the value of the counter changes to 0 when , stay CountDownLatch On await() The thread will wake up . For example, Baidu's text to voice code , For long text , You need to turn in segments , Each paragraph needs to be converted to integrate the whole text .
Let's give you another example , Like an exam , Yes 30 Candidates ( Threads ), A invigilator ( The main thread ), Each candidate can hand in the test paper in advance after completing the test paper , And you don't need to worry about other exams after you hand in the paper , After the exam ( All students hand in their papers ,latch by 0), The invigilator can leave ( End of main thread ).
30 Student output results are too long , Let's change to 5 A student , The code is as follows :
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(5);
// Specified number of threads
ExecutorService threadPool = Executors.newFixedThreadPool(30);
for (int i = 0; i < 5; i++) {
int finalI = i;
threadPool.execute(() -> {
System.out.println(" Student "+ finalI +" Hand in papers ");
countDownLatch.countDown();
});
}
try {
// Open door
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPool.shutdown();
System.out.println(" The exam is over , The invigilator left ");
}
The operation results are as follows , It can be seen that students hand in their papers in disorder , But the teacher must wait for all the students to hand in their papers before leaving :
Student 0 Hand in papers
Student 2 Hand in papers
Student 3 Hand in papers
Student 1 Hand in papers
Student 4 Hand in papers
The exam is over , The invigilator left
2、 At some point , All threads execute together .
This function is to realize the maximum parallelism of multiple threads starting to execute tasks .
Parallel , It is emphasized that multiple threads start executing at a certain time . Concurrency is still executed in sequence .
It's like a race , Put multiple threads in the starting point , Waiting for the start of the shooting , And run at the same time .
The code is as follows :
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(1);
CountDownLatch countDownLatch2 = new CountDownLatch(5);
// Specified number of threads
ExecutorService threadPool = Executors.newFixedThreadPool(30);
for (int i = 0; i < 5; i++) {
int finalI = i;
threadPool.execute(() -> {
try {
System.out.println(" Athletes "+ finalI +" Preparing for ");
// Thread blocking , Waiting to be released
countDownLatch.await();
Thread.sleep(100);
System.out.println(" Athletes "+ finalI +" sprint ");
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch2.countDown();
System.out.println(" Athletes "+ finalI +" Finish the game ");
});
}
try {
Thread.sleep(1000);
System.out.println(" All the athletes are ready , Start running ");
// Unlock and release all threads above
countDownLatch.countDown();
// All threads complete the race , Go down
countDownLatch2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPool.shutdown();
}
The results are as follows :
Athletes 1 Preparing for
Athletes 4 Preparing for
Athletes 3 Preparing for
Athletes 0 Preparing for
Athletes 2 Preparing for
All the athletes are ready , Start running
Athletes 0 sprint
Athletes 1 sprint
Athletes 4 sprint
Athletes 3 sprint
Athletes 2 sprint
Athletes 3 Finish the game
Athletes 4 Finish the game
Athletes 1 Finish the game
Athletes 0 Finish the game
Athletes 2 Finish the game
3、 The source code parsing
With the basis of the last article , Let's talk about the process of sharing locks here, which will be more handy .
CountDownLatch Is an implementation of shared locks , It is constructed by default AQS Of state The value is count. When threads use countDown() When the method is used , Actually used tryReleaseShared Methods to CAS To reduce state, until state by 0 . When calling await() Method time , If state Not for 0, That proves that the task has not been completed ,await() The method will be blocked all the time , in other words await() The statement after the method will not be executed . then ,CountDownLatch Can spin CAS Judge state == 0, If state == 0 Words , All waiting threads will be released ,await() The statement after the method is executed .
First , Let's take a look at how it's constructed :
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
...
Sync(int count) {
setState(count);
}
You can see it , It's going on new When the object ,count Namely state The initial value of the .
then , Let's see await() Method :
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
The call sync Of acquireSharedInterruptibly Method , It's also AQS Methods :
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// If interrupted , Throw an exception
if (Thread.interrupted())
throw new InterruptedException();
// Try to get synchronization status
if (tryAcquireShared(arg) < 0)
// Failed to get synchronization status , The spin
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared() Method is CountDownLatch Rewrite method , as follows :
protected int tryAcquireShared(int acquires) {
// Whether the current status is 0 Releasable lock
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly The code is as follows :
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Add the current thread to the end of the synchronization queue ,addWaiter You can read the last article
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// The spin
for (;;) {
// The predecessor node of the current node
final Node p = node.predecessor();
// If the precursor node is the head node , Then try to get the synchronization status
if (p == head) {
// The current node attempts to get the synchronization status
int r = tryAcquireShared(arg);
if (r >= 0) {
// If successful , Then set the current node as the head node and wake up the next thread
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// If the precursor of the current node is not the head node , Try to suspend the current thread , Same as exclusive lock
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// Cancel acquisition lock , Same as exclusive lock
cancelAcquire(node);
}
}
below , Let's see countDown() Method source code :
public void countDown() {
sync.releaseShared(1);
}
It also calls AQS Of releaseShared() Method :
public final boolean releaseShared(int arg) {
// Get release synchronization status
if (tryReleaseShared(arg)) {
// If it works , Into spin , Try to wake up the successor node of the head node in the synchronization queue
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared() The code is as follows :
protected boolean tryReleaseShared(int releases) {
// Just one meaning , The spin , until state state -1 by 0
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
doReleaseShared() The method is as follows :
private void doReleaseShared() {
// Spin first
for (;;) {
// Get the header node
Node h = head;
if (h != null && h != tail) {
// Head node state
int ws = h.waitStatus;
// If it is SIGNAL, Try to wake up subsequent nodes
if (ws == Node.SIGNAL) {
// as long as head Successful from SIGNAL It is amended as follows 0, that head The thread corresponding to the successor node of will be awakened .
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// Put the next node that is not empty unpark
unparkSuccessor(h);
}
// Don't wake up at other times
else if (ws == 0 &&
// The predecessor node will not only wake up its successor node , At the same time, it may wake up the subsequent nodes . Without stop for
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// If head There is no change , Call break Exit loop
if (h == head) // loop if head changed
break;
}
}
When the successor node of the head node is awakened , The thread will wake up from where it was suspended , Carry on . If the current status is still >0, Then set the current node as the head node .setHeadAndPropagate() The code is as follows :
private void setHeadAndPropagate(Node node, int propagate) {
// Current header node
Node h = head; // Record old head for check below
// Set the current node as the head node
setHead(node);
// If you execute this function , that propagate It must be equal to 1
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// Get the next node of the current node
Node s = node.next;
// Wake up subsequent nodes
if (s == null || s.isShared())
doReleaseShared();
}
}
Um. , The writing is a little messy , Draw a picture and calm down :
Blue CountDownLatch, Yellow is AQS Source code .
3、 ... and 、CyclicBarrier
CyclicBarrier Literally means recyclable (Cyclic) My fence (Barrier). A group of threads reach a fence ( It can also be called synchronization point ) When is blocked , Until the last thread reaches the fence , The fence will open , All intercepted threads will continue to work .
It and CountDownLatch Is the difference between the , For example, examination. ,CountDownLatch The first candidate in the middle school doesn't care what the second candidate does . however CyclicBarrier in , Everyone must arrive , For example, primary school students travel , When you go home , Everyone must get on the bus to go home .
The core approach :
await() : All participants are already here barrier On the call await Before method , Will be waiting for .
CyclicBarrier(int parties, Runnable barrierAction): Used when a thread reaches a barrier , priority barrierAction, Easy to handle more complex business scenarios .
reset(): Reset the barrier to its original state . If all participants are currently waiting at the barrier , Then they will return , Throw one at the same time BrokenBarrierException.
The usage code is as follows :
public static void main(String[] args) {
// Every time 5 Personal departure
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
// Specified number of threads
ExecutorService threadPool = Executors.newFixedThreadPool(30);
for (int i = 0; i < 5; i++) {
int finalI = i;
threadPool.execute(() -> {
System.out.println(" Student "+ finalI +" Get on the train ");
try {
// Wait to ensure that the execution of the child thread ends
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println(" Student "+ finalI +" set out ");
});
}
threadPool.shutdown();
}
The results are as follows :
Student 1 Get on the train
Student 3 Get on the train
Student 4 Get on the train
Student 2 Get on the train
Student 0 Get on the train
Student 0 set out
Student 3 set out
Student 1 set out
Student 2 set out
Student 4 set out
It can be seen that , All the students arrived before they set out .
CountDownLatch Is the counter , Thread completes one record one , It's just that the count is not increasing but decreasing , and CyclicBarrier It's more like a valve , All threads are required to arrive , The valve will open , And then go ahead and do it .
Expand :
Phaser What is a class ?
Phaser Is a reusable synchronization fence , Its function and CountDownLatch、CyclicBarrier be similar , But it can be used to solve the scenario problem of controlling multiple threads to complete tasks in stages .—— Segmented fence
It's a little bit like CyclicBarrier Of barrierAction, This is just a summary .
边栏推荐
- Gd32f4xx UIP protocol stack migration record
- Chapter 16 oauth2authorizationrequestredirectwebfilter source code analysis
- QT -- thread
- Search (DFS and BFS)
- [Chongqing Guangdong education] Chongqing Engineering Vocational and Technical College
- Choose to pay tribute to the spirit behind continuous struggle -- Dialogue will values [Issue 4]
- LeetCode 6005. The minimum operand to make an array an alternating array
- VBA fast switching sheet
- Priority queue (heap)
- 多普勒效應(多普勒頻移)
猜你喜欢
What are the functions of Yunna fixed assets management system?
单商户V4.4,初心未变,实力依旧!
Gd32f4xx UIP protocol stack migration record
亲测可用fiddler手机抓包配置代理后没有网络
上门预约服务类的App功能详解
notepad++正则表达式替换字符串
Miaochai Weekly - 8
After summarizing more than 800 kubectl aliases, I'm no longer afraid that I can't remember commands!
[online chat] the original wechat applet can also reply to Facebook homepage messages!
N1 # if you work on a metauniverse product [metauniverse · interdisciplinary] Season 2 S2
随机推荐
Atcoder beginer contest 258 [competition record]
Search (DFS and BFS)
【GYM 102832H】【模板】Combination Lock(二分图博弈)
Extracting profile data from profile measurement
2022.7.5-----leetcode. seven hundred and twenty-nine
选择致敬持续奋斗背后的精神——对话威尔价值观【第四期】
FFMPEG关键结构体——AVFormatContext
Ffmpeg learning - core module
Shardingsphere source code analysis
MySql——CRUD
PV static creation and dynamic creation
[Luogu cf487e] tours (square tree) (tree chain dissection) (line segment tree)
云呐|公司固定资产管理系统有哪些?
Choose to pay tribute to the spirit behind continuous struggle -- Dialogue will values [Issue 4]
NSSA area where OSPF is configured for Huawei equipment
Go learning --- structure to map[string]interface{}
How to solve the problems caused by the import process of ecology9.0
Teach you to run uni app with simulator on hbuilderx, conscience teaching!!!
[noi simulation] Anaid's tree (Mobius inversion, exponential generating function, Ehrlich sieve, virtual tree)
Mysql - CRUD