当前位置:网站首页>Deeply understand the underlying implementation principle of countdownlatch in concurrent programming
Deeply understand the underlying implementation principle of countdownlatch in concurrent programming
2022-07-01 05:25:00 【Jackli1123 (cross correlation second return)】
Deep understanding of concurrent programming CountDownLatch Underlying implementation principle
List of articles
One 、 What is? CountDownLatch
CountDownLatch It's a kind of java.util.concurrent Package the next synchronization tool class , It's like a counter , It allows one or more threads to wait until the execution of a set of operations in other threads is complete .
CountDownLatch The bottom layer is based on AQS Realized , Provides a pass int Construction method of type new CountDownLatch(int count), Indicates that the initial value of the counter is count, There are also two core approaches :countDown() Method indicates that the number of counters is subtracted 1 operation ; await() Method means to put the current thread into a waiting state until count The value of is 0 In order to proceed ; And an inheritance from AQS The inner class of Sync To do specific operations .
Two 、CountDownLatch Source code analysis
It is suggested that you should take a look at the source code here lock Lock source code , Because the two source code pairs AQS The logic used is almost identical , The same source code also Semaphore Semaphore .
Deep understanding of concurrent programming AQS Source code interpretation
1. Construction method
CountDownLatch The construction method of is a need to pass int A parametric construction of a type , The parameters to be passed must ≥0, because CountDownLatch It's a minus counter
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// Set the initial state Is the initial value of the counter , This is using AQS Of state,Lock This variable is used in the lock to indicate the number of reentries
Sync(int count) {
setState(count);
}
2.await() Execute the wait method
perform await() Method will enter the wait state , This is nothing to see , If you are interested, you can take a look at the above Lock Lock source code analysis , The code is almost the same
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// Returns the status of the current counter , If the counter is not equal to 0 Then return to -1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// This careful look is also related to lock The lock lock() The method is the same
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Create a new waiting node
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// Only when the result of the counter is 0 And when the thread is awakened, it returns 1 In order to proceed
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // Here we enter a blocking state ,
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// Create a new waiting node , The operation here is similar to lock The locks are almost identical, so I won't explain them in detail , It can also be seen from here CountDownLatch Our linked list is a two-way linked list
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
2.countDown() Count minus 1 operation
I don't want to see this , Also follow the front lock The operation of lock release is the same
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// Counter state Perform minus one operation , When minus 1 The operation after is 0 Back when true, Otherwise return to false
if (tryReleaseShared(arg)) {
// Only when state The value of is reduced to 1 Wake up the thread of the header node
doReleaseShared();
return true;
}
return false;
}
// Perform wake-up thread operation
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus; // obtain head State of thread
if (ws == Node.SIGNAL) {
// If the state is -1 Indicates waiting for wakeup
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // Try to change the status of the thread from -1 The wait for wakeup status is changed to 0
continue;
unparkSuccessor(h); // Wake up the head Threads
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
3、 ... and 、 Handwriting CountDownLatch Look at the execution logic
public class MyCountDownLatch {
private Sync sync;
public MyCountDownLatch(int count) {
sync = new Sync(count);
}
/** * Make the current thread blocked */
public void await() {
sync.acquireShared(1);
}
/** * Counter -1 operation */
public void countDown() {
sync.releaseShared(1);
}
// be based on AQS The inner class of
class Sync extends AbstractQueuedSynchronizer {
public Sync(int count) {
setState(count);
}
/** * If the return result is <0 Under the circumstances , Then put the current thread into aqs In a two-way list * * @param arg * @return */
@Override
protected int tryAcquireShared(int arg) {
// If aqs The state of >0, Then put the current thread into aqs In a two-way list Return equal to 0 -1;
return getState() == 0 ? 1 : -1;
}
/** * If the method returns true Under the circumstances , Just wake up the blocked thread * getState() == 0 -1 If ==0 return true * >0 false * * @param arg * @return */
@Override
protected boolean tryReleaseShared(int arg) {
for (; ; ) {
int oldState = getState();
if (oldState == 0) {
return false;
}
int newState = oldState - arg;
if (compareAndSetState(oldState, newState)) {
return newState == 0;
}
}
}
}
}
perform demo Method validation :
public static void main(String[] args) {
MyCountDownLatch mayiktCountDownLatch = new MyCountDownLatch(2);
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End of thread execution ");
mayiktCountDownLatch.countDown();
}).start();
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End of thread execution ");
mayiktCountDownLatch.countDown();
}).start();
System.out.println(" The main thread begins to enter the waiting process ");
mayiktCountDownLatch.await();
}
Finally, the execution result , The main thread enters the wait state , Until the two threads have finished executing countDown Then continue to execute :
Four 、Semaphore Semaphore
Semaphore Used to restrict access to certain resources ( Physical or logical ) Number of threads for , He maintains a collection of licenses , Limit how many resources you need to maintain license sets , If there is N A resource , That corresponds to N A license , There can only be... At the same time N Thread access . When a thread obtains a license, it calls acquire Method , When you run out of free resources, call release Method .
It is simply understood as Semaphore Semaphores can limit the flow of threads ( The maximum number of threads executing at the same time ), The bottom layer is also based on AQS Don't read the specific source code , Let's talk about the usage .
**acquire():** Get a license , Get one license at a time Semaphore Available licenses for (state) The number of 1, by 0 When the following thread enters the blocking state .
**release():** Release a license , Release one license at a time Semaphore Available licenses for (state) The quantity of is added 1, Wake up a waiting thread .
public class Test001 {
// Suppose the parking lot has 5 A parking space , Then the maximum can only stop 5 The line behind the car , Threads are cars
public static void main(String[] args) throws InterruptedException {
// Set up AQS Status as 5 There can only be 5 Thread execution code Current limiting Long can increase capacity 5 personal
Semaphore semaphore = new Semaphore(5);
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
try {
// AQS The status will subtract 1, Until state The status of is 0 Enter the two-way blocking queue
semaphore.acquire(); // Enter the parking lot
System.out.println(Thread.currentThread().getName() + ", Parking ");
// AQS state +1 At the same time, wake up a thread in the blocking queue
semaphore.release(); // Get out of the parking lot
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
Content sources : Ant class
边栏推荐
- AcWing 889. 01 sequence satisfying the condition (Cartland number)
- Data consistency between redis and database
- 0xc000007b应用程序无法正常启动解决方案(亲测有效)
- 液压滑环的特点讲解
- El cascade echo failed; El cascader does not echo
- Redis database deployment and common commands
- 导电滑环使用的注意事项
- Print stream and system setout();
- Understand several related problems in JVM - JVM memory layout, class loading mechanism, garbage collection
- Web Security (x) what is OAuth 2.0?
猜你喜欢

Actual combat: gateway api-2022.2.13

导电滑环使用的注意事项

Use and principle of wait notify

数字金额加逗号;js给数字加三位一逗号间隔的两种方法;js数据格式化

Day 05 - file operation function

Fluentd is easy to use. Combined with the rainbow plug-in market, log collection is faster

Numeric amount plus comma; JS two methods of adding three digits and a comma to numbers; JS data formatting

Solution: drag the Xib control to the code file, and an error setvalue:forundefined key:this class is not key value coding compliant for the key is reported
![[RootersCTF2019]babyWeb](/img/b4/aa8f8e107a9dacbace72d4717b1834.png)
[RootersCTF2019]babyWeb

0xc000007b the application cannot start the solution normally (the pro test is valid)
随机推荐
printk 调试总结
工业导电滑环的应用
云原生存储解决方案Rook-Ceph与Rainbond结合的实践
AcWing 885. Find the combination number I (recursive preprocessing)
0xc000007b the application cannot start the solution normally (the pro test is valid)
[data recovery in North Asia] a data recovery case of raid crash caused by hard disk drop during data synchronization of hot spare disk of RAID5 disk array
液压滑环的特点讲解
AcWing 888. Finding combinatorial number IV (the problem of finding combinatorial number with high precision)
Detailed explanation of set
Rainbond结合NeuVector实践容器安全管理
数字金额加逗号;js给数字加三位一逗号间隔的两种方法;js数据格式化
Programmers dig "holes" to get rich: if they find a loophole, they will be rewarded 12.72 million yuan
How to meet the requirements of source code confidentiality and source code security management
Txncoordsender of cockroachdb distributed transaction source code analysis
How to create a progress bar that changes color according to progress
Set集合详细讲解
Print stream and system setout();
AcWing 884. Gauss elimination for solving XOR linear equations
Set set detailed explanation
[RootersCTF2019]babyWeb