Multithreading and High Concurrency (8): AQS Shared Locks Through CountDownLatch (Third-Anniversary Check-In)
2022-07-06 00:16:00 【The green flowers of the Li Wang family】
Without quite noticing it, I have been writing articles for 3 years. In those 3 years I have written only a little over 60 articles, which is a bit embarrassing. I hope the coming period brings a burst of writing, and a breakthrough in my own technical skills as well. Come on, Cui Hua!
I. Overview
The previous article, Multithreading and High Concurrency (7): From ReentrantLock to the AQS Source Code, explained the principles of AQS in detail. We saw that AQS lets us build a synchronizer simply and efficiently (for an exclusive one, overriding tryAcquire and tryRelease is enough). The JDK also ships many efficient synchronizers, such as CountDownLatch, CyclicBarrier, Phaser, Semaphore, Exchanger and ReentrantReadWriteLock, which the following articles will explain and analyze one by one.
Here, we use CountDownLatch to summarize how the AQS shared lock works, and compare it with CyclicBarrier and Phaser along the way.
II. CountDownLatch
CountDownLatch literally means a countdown latch: it counts down as threads finish their work. It lets one or more threads block at one point until a given number of tasks (the count, an int) have all completed.
1. Common methods
await(): The calling thread waits until the latch value drops to 0 or the thread is interrupted. It is usually called by the main thread. Think of it as the door that opens.
await(long timeout, TimeUnit unit): await with a timeout.
countDown(): Decrements the latch value by 1. If it reaches 0, all threads waiting on this latch are woken up. This is the countdown.
getCount(): Returns the current latch value.
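As a small sketch of these four methods working together (the class name LatchMethodsDemo and the helper run() are made up for illustration, and everything runs on a single thread to keep the result predictable):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative class name; not part of the original article.
class LatchMethodsDemo {

    // Returns "<first timed await>,<count after one countDown>,<second timed await>".
    static String run() {
        CountDownLatch latch = new CountDownLatch(2);
        try {
            // Nobody has counted down yet, so the timed await gives up and returns false.
            boolean first = latch.await(50, TimeUnit.MILLISECONDS);

            latch.countDown();                 // 2 -> 1
            long remaining = latch.getCount(); // 1

            latch.countDown();                 // 1 -> 0
            // The count is already 0, so await returns true immediately.
            boolean second = latch.await(50, TimeUnit.MILLISECONDS);

            return first + "," + remaining + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // false,1,true
    }
}
```

The timed await reports a timeout through its boolean return value rather than through an exception, which is why the first call simply yields false.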
2. Two typical uses
1. The main thread continues only after all tasks have finished
Initialize the CountDownLatch counter to n (new CountDownLatch(n)). Each time a task thread finishes, it decrements the counter by 1 (countDownLatch.countDown()). When the counter reaches 0, the threads blocked in await() on the CountDownLatch are woken up. For example, with Baidu's text-to-speech API, a long text has to be converted in segments, and the whole text can be assembled only after every segment has been converted.
Here is another example. In an exam there are 30 candidates (threads) and one invigilator (the main thread). Each candidate may hand in the paper early, and after handing it in no longer cares about the other candidates. Once the exam is over (all students have handed in their papers, so the latch is 0), the invigilator can leave (the main thread ends).
The output for 30 students would be too long, so we use 5 students instead. The code is as follows:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExamDemo { // class name is illustrative
    public static void main(String[] args) {
        // One count per student
        CountDownLatch countDownLatch = new CountDownLatch(5);
        // Fixed-size thread pool
        ExecutorService threadPool = Executors.newFixedThreadPool(30);
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            threadPool.execute(() -> {
                System.out.println("Student " + finalI + " hands in the paper");
                countDownLatch.countDown();
            });
        }
        try {
            // Block until all 5 students have handed in their papers
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadPool.shutdown();
        System.out.println("The exam is over, the invigilator leaves");
    }
}
The output of one run is shown below. The students hand in their papers in no particular order, but the invigilator always waits for all of them before leaving:
Student 0 hands in the paper
Student 2 hands in the paper
Student 3 hands in the paper
Student 1 hands in the paper
Student 4 hands in the paper
The exam is over, the invigilator leaves
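One thing the exam example does not cover: if a task throws before it reaches countDown(), the count never reaches 0 and await() blocks forever. A common precaution is to call countDown() in a finally block and, optionally, to await with a timeout. The sketch below assumes a made-up failure (student 2 loses the paper); the class name SafeCountDownDemo is also illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative class; the failing student is a made-up scenario.
class SafeCountDownDemo {

    // Returns true if every task was accounted for before the timeout.
    static boolean run() {
        int students = 5;
        CountDownLatch latch = new CountDownLatch(students);
        ExecutorService pool = Executors.newFixedThreadPool(students);
        for (int i = 0; i < students; i++) {
            int id = i;
            pool.execute(() -> {
                try {
                    if (id == 2) {
                        throw new IllegalStateException("student " + id + " lost the paper");
                    }
                    System.out.println("Student " + id + " hands in the paper");
                } catch (RuntimeException e) {
                    System.out.println("Task failed: " + e.getMessage());
                } finally {
                    // Runs on success and on failure, so the count always reaches 0.
                    latch.countDown();
                }
            });
        }
        try {
            // The timeout is a second safety net against waiting forever.
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("All tasks accounted for: " + run());
    }
}
```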
2. All threads start executing at the same moment
This use achieves maximum parallelism by having multiple threads begin their tasks together.
Parallelism here means that multiple threads start executing at the same moment. With concurrency alone, the tasks may still be executed one after another.
It is like a race: the threads line up at the starting line, wait for the starting gun, and then run at the same time.
The code is as follows:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RaceDemo { // class name is illustrative
    public static void main(String[] args) {
        // Starting gun: a single count, released by the main thread
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Finish line: one count per athlete
        CountDownLatch countDownLatch2 = new CountDownLatch(5);
        // Fixed-size thread pool
        ExecutorService threadPool = Executors.newFixedThreadPool(30);
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            threadPool.execute(() -> {
                try {
                    System.out.println("Athlete " + finalI + " is ready");
                    // Block until the starting gun fires
                    countDownLatch.await();
                    Thread.sleep(100);
                    System.out.println("Athlete " + finalI + " sprints");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Athlete " + finalI + " finishes the race");
                // Count down only after the finish has been reported
                countDownLatch2.countDown();
            });
        }
        try {
            // Give the athletes time to get ready
            Thread.sleep(1000);
            System.out.println("All the athletes are ready, start running");
            // Fire the starting gun: releases every thread waiting above
            countDownLatch.countDown();
            // Wait until every athlete has finished the race
            countDownLatch2.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadPool.shutdown();
    }
}
The output of one run is as follows:
Athlete 1 is ready
Athlete 4 is ready
Athlete 3 is ready
Athlete 0 is ready
Athlete 2 is ready
All the athletes are ready, start running
Athlete 0 sprints
Athlete 1 sprints
Athlete 4 sprints
Athlete 3 sprints
Athlete 2 sprints
Athlete 3 finishes the race
Athlete 4 finishes the race
Athlete 1 finishes the race
Athlete 0 finishes the race
Athlete 2 finishes the race
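The Thread.sleep(1000) in the main thread only guesses that every athlete is ready by then. A possible variation, sketched here with made-up names (RaceWithReadyLatch, ready, start, done), adds a third latch so the starter knows for certain that all runners are at the line. Events are recorded in a list instead of being printed so the ordering can be checked.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative variation on the race example; all names are hypothetical.
class RaceWithReadyLatch {

    static List<String> run() {
        int runners = 5;
        CountDownLatch ready = new CountDownLatch(runners); // runners tell the starter they are ready
        CountDownLatch start = new CountDownLatch(1);       // starter releases the runners
        CountDownLatch done = new CountDownLatch(runners);  // runners tell the starter they finished
        List<String> events = new CopyOnWriteArrayList<>();
        // The pool must have at least one thread per runner, because every runner blocks on start.
        ExecutorService pool = Executors.newFixedThreadPool(runners);
        for (int i = 0; i < runners; i++) {
            pool.execute(() -> {
                try {
                    events.add("ready");
                    ready.countDown();
                    start.await();
                    events.add("sprint");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            ready.await();          // no sleep needed: returns once all runners are ready
            events.add("start");
            start.countDown();      // fire the starting gun
            done.await();           // wait for everyone to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because each runner records "ready" before counting down, and "sprint" only after the gun, the list always holds five "ready" entries, then "start", then five "sprint" entries.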
3. Source code analysis
With the previous article as a foundation, the shared-lock flow is much easier to follow.
CountDownLatch is an implementation of a shared lock. Its constructor sets the AQS state to count. When a thread calls countDown(), it actually goes through tryReleaseShared, which uses CAS to decrement state until state reaches 0. When await() is called and state is not 0, the tasks are not all finished yet, so await() keeps blocking and the statements after it do not run. Each waiting thread then loops, parking between attempts, and re-checks whether state == 0. Once state == 0, all waiting threads are released and the statements after await() are executed.
First, let's look at the constructor:
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
...
Sync(int count) {
    setState(count);
}
As you can see, when the object is created with new, count becomes the initial value of state.
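Two edge cases follow directly from the constructor: a negative count is rejected with the message shown in the source, and a count of 0 produces a latch that is already open. A minimal sketch (the class name LatchConstructionDemo is illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative class name; not part of the original article.
class LatchConstructionDemo {

    static String run() {
        String negative;
        try {
            new CountDownLatch(-1);
            negative = "accepted";
        } catch (IllegalArgumentException e) {
            // The constructor rejects negative counts.
            negative = e.getMessage();
        }

        // state starts at 0, so the latch is open from the beginning.
        CountDownLatch zero = new CountDownLatch(0);
        boolean immediate;
        try {
            immediate = zero.await(0, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            immediate = false;
        }
        return negative + " | " + immediate;
    }

    public static void main(String[] args) {
        System.out.println(run()); // count < 0 | true
    }
}
```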
Next, let's look at the await() method:
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
It calls acquireSharedInterruptibly on sync, which is an AQS method:
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // If the thread has already been interrupted, throw
    if (Thread.interrupted())
        throw new InterruptedException();
    // Try to acquire the synchronization state
    if (tryAcquireShared(arg) < 0)
        // Acquire failed: enqueue and loop, parking between attempts
        doAcquireSharedInterruptibly(arg);
}
tryAcquireShared() is overridden by CountDownLatch as follows:
protected int tryAcquireShared(int acquires) {
    // Succeeds (returns 1) only when state is 0, i.e. the latch is open
    return (getState() == 0) ? 1 : -1;
}
The code of doAcquireSharedInterruptibly is as follows:
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Add the current thread to the tail of the sync queue as a shared-mode node
    // (addWaiter is covered in the previous article)
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // Spin
        for (;;) {
            // The predecessor of the current node
            final Node p = node.predecessor();
            // Only the node right behind the head may try to acquire
            if (p == head) {
                // The current node tries to acquire the synchronization state
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // On success, make the current node the head and wake up the next thread
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // The acquire did not succeed: decide whether to park the current thread
            // (same as the exclusive lock); if it was interrupted while parked, throw
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // Cancel the acquire, same as the exclusive lock
            cancelAcquire(node);
    }
}
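Both interruption paths above (the Thread.interrupted() check on entry and the parkAndCheckInterrupt() check inside the loop) surface to the caller as an InterruptedException from await(). A minimal sketch of that behaviour, using an illustrative class AwaitInterruptDemo and a latch that is deliberately never counted down:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative class name; not part of the original article.
class AwaitInterruptDemo {

    // Returns true if the waiting thread left await() through InterruptedException.
    static boolean run() {
        CountDownLatch never = new CountDownLatch(1); // deliberately never counted down
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                never.await();
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        waiter.start();
        // Whether this lands before await() is entered or while the thread is parked,
        // await() ends with InterruptedException either way.
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("await() was interrupted: " + run());
    }
}
```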
Now let's look at the source of countDown():
public void countDown() {
    sync.releaseShared(1);
}
It also calls an AQS method, releaseShared():
public final boolean releaseShared(int arg) {
    // Try to release the synchronization state
    if (tryReleaseShared(arg)) {
        // On success, spin and try to wake up the successor of the head node in the sync queue
        doReleaseShared();
        return true;
    }
    return false;
}
The code of tryReleaseShared() is as follows:
protected boolean tryReleaseShared(int releases) {
    // Spin until the CAS that decrements state succeeds;
    // return true only for the decrement that brings state to 0
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
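Putting together the pieces seen so far (state initialised to count, tryAcquireShared, tryReleaseShared), a latch can be rebuilt on AbstractQueuedSynchronizer in a few lines. This is a teaching sketch under the name SimpleLatch, not the JDK class itself, and it leaves out the timed await.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Teaching sketch of a latch built on AQS; SimpleLatch is a hypothetical name.
class SimpleLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        int count() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Acquire succeeds only when the count has reached 0.
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false;          // already open, nothing to release
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0;      // only the final decrement wakes the waiters
                }
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        this.sync = new Sync(count);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void countDown() {
        sync.releaseShared(1);
    }

    int getCount() {
        return sync.count();
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleLatch latch = new SimpleLatch(2);
        Thread worker = new Thread(() -> {
            latch.countDown();
            latch.countDown();
        });
        worker.start();
        latch.await();                          // returns once both countDown() calls are done
        System.out.println(latch.getCount());   // 0
    }
}
```

Note that an extra countDown() on an open latch is harmless: tryReleaseShared sees state == 0 and returns false, so the count never goes negative.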
The doReleaseShared() method is as follows:
private void doReleaseShared() {
    // Spin
    for (;;) {
        // Get the head node
        Node h = head;
        if (h != null && h != tail) {
            // Status of the head node
            int ws = h.waitStatus;
            // If it is SIGNAL, try to wake up the successor
            if (ws == Node.SIGNAL) {
                // Once head has been changed from SIGNAL to 0, the thread of
                // head's successor will be woken up; on a failed CAS, retry
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Unpark the next non-null successor
                unparkSuccessor(h);
            }
            // If the status is 0, nobody needs waking right now; mark head as PROPAGATE
            // so that a later acquire keeps passing the wake-up on to the nodes behind it
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // If head has not changed, exit the loop; otherwise go around again
        if (h == head)                   // loop if head changed
            break;
    }
}
When the successor of the head node is woken up, its thread resumes from the point where it was parked and goes around the loop again. If tryAcquireShared now returns a value >= 0 (which for CountDownLatch means state is 0), the current node is set as the head node. The code of setHeadAndPropagate() is as follows:
private void setHeadAndPropagate(Node node, int propagate) {
    // The current head node
    Node h = head; // Record old head for check below
    // Make the current node the head
    setHead(node);
    // For CountDownLatch, propagate is always 1 here,
    // because tryAcquireShared returns 1 on success
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // The node after the current node
        Node s = node.next;
        // Wake up the following node if it is waiting in shared mode
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
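The effect of this chain (one release wakes the first waiter, which becomes head and wakes the next, and so on) can be observed with a tiny AQS subclass. The sketch below uses a hypothetical one-shot Gate with the same shared-mode rules as the latch, and relies on the public AQS method getQueueLength() to wait until all four threads are queued before a single releaseShared call is made.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Illustrative demo of shared-mode propagation; Gate and PropagationDemo are hypothetical names.
class PropagationDemo {

    // A one-shot gate: state 1 means closed, state 0 means open.
    static final class Gate extends AbstractQueuedSynchronizer {
        Gate() {
            setState(1);
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            // Only the call that actually opens the gate reports success.
            return compareAndSetState(1, 0);
        }
    }

    // Returns how many queued threads got through after ONE release.
    static int run() {
        int waiters = 4;
        Gate gate = new Gate();
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                gate.acquireShared(1);   // queues up and parks while the gate is closed
                woken.incrementAndGet();
            });
            threads[i].start();
        }
        try {
            // getQueueLength() is an estimate in general, but no thread can leave
            // the queue before the release below, so it can only grow here.
            while (gate.getQueueLength() < waiters) {
                Thread.sleep(1);
            }
            gate.releaseShared(1);       // a single release...
            for (Thread t : threads) {
                t.join();                // ...lets every queued thread through
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println("threads released by one releaseShared: " + run());
    }
}
```

With an exclusive lock the same single release would let only one waiter proceed; here all four get through because every successful shared acquire passes the wake-up along.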
Well, that walkthrough is a little messy, so a diagram helps to settle things down:
In the diagram, blue is CountDownLatch and yellow is the AQS source code.
III. CyclicBarrier
CyclicBarrier literally means a reusable (cyclic) barrier. Threads in a group block when they reach the barrier (also called a synchronization point). When the last thread reaches it, the barrier opens and all the blocked threads continue working.
The difference from CountDownLatch can be seen in the exam example: with CountDownLatch, the first candidate does not care what the second candidate does. With CyclicBarrier, everyone must arrive. For example, on a primary school trip, every pupil must be on the bus before it can head home.
The core methods:
await(): Waits until all parties have called await on this barrier.
CyclicBarrier(int parties, Runnable barrierAction): When the threads reach the barrier, barrierAction is executed first, before the waiting threads are released, which helps with more complex business scenarios.
reset(): Resets the barrier to its initial state. If any parties are currently waiting at the barrier, they return with a BrokenBarrierException.
The usage code is as follows:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BusDemo { // class name is illustrative
    public static void main(String[] args) {
        // The bus leaves once 5 students are on board
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        // Fixed-size thread pool
        ExecutorService threadPool = Executors.newFixedThreadPool(30);
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            threadPool.execute(() -> {
                System.out.println("Student " + finalI + " gets on the bus");
                try {
                    // Wait (up to 60 seconds) until all 5 students are on the bus
                    cyclicBarrier.await(60, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                System.out.println("Student " + finalI + " sets off");
            });
        }
        threadPool.shutdown();
    }
}
The output of one run is as follows:
Student 1 gets on the bus
Student 3 gets on the bus
Student 4 gets on the bus
Student 2 gets on the bus
Student 0 gets on the bus
Student 0 sets off
Student 3 sets off
Student 1 sets off
Student 2 sets off
Student 4 sets off
As you can see, the students set off only after all of them have arrived.
CountDownLatch is a counter: each thread that finishes is recorded, except that the count goes down rather than up. CyclicBarrier is more like a valve: it opens only when all threads have arrived, and then they all carry on.
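The "cyclic" part and the barrierAction constructor mentioned earlier can be sketched together: the same barrier is used for two rounds, and the action runs once each time the barrier trips. The class name BarrierActionDemo and the trips counter are made up for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative class name; not part of the original article.
class BarrierActionDemo {

    // Returns how many times the barrier action ran.
    static int run() {
        int parties = 3;
        int rounds = 2;
        AtomicInteger trips = new AtomicInteger();
        // The action runs once per trip, in the last thread to arrive,
        // before any of the waiting threads are released.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier is reused in every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + run() + " times"); // barrier tripped 2 times
    }
}
```

A CountDownLatch could not be used this way: once its count reaches 0 it stays open and cannot be reset.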
Further reading:
What is the Phaser class?
Phaser is a reusable synchronization barrier. Its function is similar to CountDownLatch and CyclicBarrier, but it also covers the scenario of coordinating multiple threads that complete their tasks in phases. In short, it is a phased barrier.
It is a little like CyclicBarrier's barrierAction. This is only a brief summary.
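To make the phased idea concrete, here is a minimal sketch with three workers going through two stages. Overriding onAdvance plays a role similar to barrierAction: it runs once at the end of each phase, before the waiting threads are released. The class name PhaserStagesDemo, the log list and the choice to stop after phase 1 are all assumptions made for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;

// Illustrative class name; not part of the original article.
class PhaserStagesDemo {

    static List<String> run() {
        int workers = 3;
        List<String> log = new CopyOnWriteArrayList<>();
        Phaser phaser = new Phaser(workers) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Called once per phase by the last party to arrive.
                log.add("phase " + phase + " done");
                // Returning true terminates the phaser; here that happens after phase 1.
                return phase >= 1 || registeredParties == 0;
            }
        };

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                log.add("stage 0");
                phaser.arriveAndAwaitAdvance(); // wait for everyone to finish stage 0
                log.add("stage 1");
                phaser.arriveAndAwaitAdvance(); // wait for everyone to finish stage 1
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log always contains three "stage 0" entries, then "phase 0 done", then three "stage 1" entries, then "phase 1 done", because no worker can start stage 1 until the phase-0 advance has run.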