当前位置:网站首页>Deeply understand the underlying implementation principle of countdownlatch in concurrent programming
Deeply understand the underlying implementation principle of countdownlatch in concurrent programming
2022-07-01 05:25:00 【Jackli1123 (cross correlation second return)】
Deep understanding of concurrent programming CountDownLatch Underlying implementation principle
List of articles
One 、 What is? CountDownLatch
CountDownLatch It's a kind of java.util.concurrent Package the next synchronization tool class , It's like a counter , It allows one or more threads to wait until the execution of a set of operations in other threads is complete .
CountDownLatch The bottom layer is based on AQS Realized , Provides a pass int Construction method of type new CountDownLatch(int count), Indicates that the initial value of the counter is count, There are also two core approaches :countDown() Method indicates that the number of counters is subtracted 1 operation ; await() Method means to put the current thread into a waiting state until count The value of is 0 In order to proceed ; And an inheritance from AQS The inner class of Sync To do specific operations .
Two 、CountDownLatch Source code analysis
It is suggested that you should take a look at the source code here lock Lock source code , Because the two source code pairs AQS The logic used is almost identical , The same source code also Semaphore Semaphore .
Deep understanding of concurrent programming AQS Source code interpretation
1. Construction method
CountDownLatch The construction method of is a need to pass int A parametric construction of a type , The parameters to be passed must ≥0, because CountDownLatch It's a minus counter
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// Set the initial state Is the initial value of the counter , This is using AQS Of state,Lock This variable is used in the lock to indicate the number of reentries
Sync(int count) {
setState(count);
}
2.await() Execute the wait method
perform await() Method will enter the wait state , This is nothing to see , If you are interested, you can take a look at the above Lock Lock source code analysis , The code is almost the same
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// Returns the status of the current counter , If the counter is not equal to 0 Then return to -1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// This careful look is also related to lock The lock lock() The method is the same
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Create a new waiting node
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// Only when the result of the counter is 0 And when the thread is awakened, it returns 1 In order to proceed
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // Here we enter a blocking state ,
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// Create a new waiting node , The operation here is similar to lock The locks are almost identical, so I won't explain them in detail , It can also be seen from here CountDownLatch Our linked list is a two-way linked list
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
2.countDown() Count minus 1 operation
I don't want to see this , Also follow the front lock The operation of lock release is the same
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// Counter state Perform minus one operation , When minus 1 The operation after is 0 Back when true, Otherwise return to false
if (tryReleaseShared(arg)) {
// Only when state The value of is reduced to 1 Wake up the thread of the header node
doReleaseShared();
return true;
}
return false;
}
// Perform wake-up thread operation
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus; // obtain head State of thread
if (ws == Node.SIGNAL) {
// If the state is -1 Indicates waiting for wakeup
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // Try to change the status of the thread from -1 The wait for wakeup status is changed to 0
continue;
unparkSuccessor(h); // Wake up the head Threads
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
3、 ... and 、 Handwriting CountDownLatch Look at the execution logic
public class MyCountDownLatch {
private Sync sync;
public MyCountDownLatch(int count) {
sync = new Sync(count);
}
/** * Make the current thread blocked */
public void await() {
sync.acquireShared(1);
}
/** * Counter -1 operation */
public void countDown() {
sync.releaseShared(1);
}
// be based on AQS The inner class of
class Sync extends AbstractQueuedSynchronizer {
public Sync(int count) {
setState(count);
}
/** * If the return result is <0 Under the circumstances , Then put the current thread into aqs In a two-way list * * @param arg * @return */
@Override
protected int tryAcquireShared(int arg) {
// If aqs The state of >0, Then put the current thread into aqs In a two-way list Return equal to 0 -1;
return getState() == 0 ? 1 : -1;
}
/** * If the method returns true Under the circumstances , Just wake up the blocked thread * getState() == 0 -1 If ==0 return true * >0 false * * @param arg * @return */
@Override
protected boolean tryReleaseShared(int arg) {
for (; ; ) {
int oldState = getState();
if (oldState == 0) {
return false;
}
int newState = oldState - arg;
if (compareAndSetState(oldState, newState)) {
return newState == 0;
}
}
}
}
}
perform demo Method validation :
public static void main(String[] args) {
MyCountDownLatch mayiktCountDownLatch = new MyCountDownLatch(2);
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End of thread execution ");
mayiktCountDownLatch.countDown();
}).start();
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End of thread execution ");
mayiktCountDownLatch.countDown();
}).start();
System.out.println(" The main thread begins to enter the waiting process ");
mayiktCountDownLatch.await();
}
Finally, the execution result , The main thread enters the wait state , Until the two threads have finished executing countDown Then continue to execute :
Four 、Semaphore Semaphore
Semaphore Used to restrict access to certain resources ( Physical or logical ) Number of threads for , He maintains a collection of licenses , Limit how many resources you need to maintain license sets , If there is N A resource , That corresponds to N A license , There can only be... At the same time N Thread access . When a thread obtains a license, it calls acquire Method , When you run out of free resources, call release Method .
It is simply understood as Semaphore Semaphores can limit the flow of threads ( The maximum number of threads executing at the same time ), The bottom layer is also based on AQS Don't read the specific source code , Let's talk about the usage .
**acquire():** Get a license , Get one license at a time Semaphore Available licenses for (state) The number of 1, by 0 When the following thread enters the blocking state .
**release():** Release a license , Release one license at a time Semaphore Available licenses for (state) The quantity of is added 1, Wake up a waiting thread .
public class Test001 {
// Suppose the parking lot has 5 A parking space , Then the maximum can only stop 5 The line behind the car , Threads are cars
public static void main(String[] args) throws InterruptedException {
// Set up AQS Status as 5 There can only be 5 Thread execution code Current limiting Long can increase capacity 5 personal
Semaphore semaphore = new Semaphore(5);
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
try {
// AQS The status will subtract 1, Until state The status of is 0 Enter the two-way blocking queue
semaphore.acquire(); // Enter the parking lot
System.out.println(Thread.currentThread().getName() + ", Parking ");
// AQS state +1 At the same time, wake up a thread in the blocking queue
semaphore.release(); // Get out of the parking lot
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
Content sources : Ant class
边栏推荐
- Vmware workstation network card settings and three common network modes
- 0xc000007b the application cannot start the solution normally (the pro test is valid)
- Detailed explanation of set
- Distributed architecture system splitting principles, requirements and microservice splitting steps
- Intelligent operation and maintenance: visual management system based on BIM Technology
- Some common commands of podman
- Numeric amount plus comma; JS two methods of adding three digits and a comma to numbers; JS data formatting
- Distributed transactions - Solutions
- Series of improving enterprise product delivery efficiency (1) -- one click installation and upgrade of enterprise applications
- Web Security (IX) what is JWT?
猜你喜欢
How to create a progress bar that changes color according to progress
Implementation of distributed lock
Solution: drag the Xib control to the code file, and an error setvalue:forundefined key:this class is not key value coding compliant for the key is reported
tar命令
CockroachDB 分布式事务源码分析之 TxnCoordSender
LevelDB源码分析之LRU Cache
0xc000007b the application cannot start the solution normally (the pro test is valid)
Precautions for use of conductive slip ring
And search: the suspects (find the number of people related to the nth person)
第05天-文件操作函数
随机推荐
Cockroachdb: the resistant geo distributed SQL database paper reading notes
Global and Chinese market of digital badge 2022-2028: Research Report on technology, participants, trends, market size and share
SSGSSRCSR区别
Simple read / write verification of qdatastream
Some common commands of podman
AcWing 888. Finding combinatorial number IV (the problem of finding combinatorial number with high precision)
Fluentd is easy to use. Combined with the rainbow plug-in market, log collection is faster
Thread safety issues
Use and principle of Park unpark
Global and Chinese market of search engine optimization (SEO) software 2022-2028: Research Report on technology, participants, trends, market size and share
Tcp/ip explanation (version 2) notes / 3 link layer / 3.2 Ethernet and IEEE 802 lan/man standards
智慧运维:基于 BIM 技术的可视化管理系统
Web Security (IX) what is JWT?
Data consistency between redis and database
Summary of spanner's paper
eBPF Cilium实战(2) - 底层网络可观测性
Tar command
AcWing 886. Finding combinatorial number II (pretreatment factorial)
Global and Chinese market of enterprise wireless LAN 2022-2028: Research Report on technology, participants, trends, market size and share
Global and Chinese market of metal oxide semiconductor field effect transistors 2022-2028: Research Report on technology, participants, trends, market size and share