当前位置:网站首页>[Part VI] source code analysis and application details of countdownlatch [key]
[Part VI] source code analysis and application details of countdownlatch [key]
2022-06-12 22:23:00 【__ Struggling Kaka】
1.1 background
CountDownLatch It's a kind of java.util.concurrent Package next Synchronization tool class , The tool classes introduced with it are CyclicBarrier、Semaphore、concurrentHashMap and BlockingQueue.
1.2 Concept
countDownLatch This class enables a thread to wait for other threads to execute after they have finished executing .
Realization principle : It is realized by a counter , The initial value of the counter is the number of threads . Every time a thread finishes executing , The value of the counter is -1, When the counter value is 0 when , Indicates that all threads have finished executing , Then the thread waiting on the lock can resume work .
1.3 Implementation principle analysis
1.3.1 CountDownLatch structure
As can be seen from the above figure ,CountDownLatch There is only one way to construct CountDownLatch(int count) This is to set the number of threads that the shared lock can hold at the same time .
CountDownLatch Internally, a Sync,Sync Inherited from AQS, And it is a shared lock mode .
1.3.1、 Create counters ( initialization )
When we call CountDownLatch countDownLatch=new CountDownLatch(4); When , A... Is created AQS The synchronization queue of , And create CountDownLatch The counter passed in is assigned to AQS Queued state, therefore state The value of also represents CountDownLatch The number of counts remaining ;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);// Initialize shared lock AQS, Create a synchronization queue , And set the initial counter value
}
// take Sync from AQS inherited count, Set to count、
Sync(int count) {
setState(count);
}
1.3.2、 Blocking threads
When we call countDownLatch.wait() When , Will create a node , Add to AQS Blocking queues , And suspend the current thread at the same time .
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
Determine whether the counter has been executed , If it is not finished, the current thread will be added Blocking queues
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// If interrupted , Throw an exception
if (Thread.interrupted())
throw new InterruptedException();
// Application for Shared lock , If the application is successful, it will return
if (tryAcquireShared(arg) < 0)
// If the application fails , Just try to join the team and hang up
doAcquireSharedInterruptibly(arg);
}
-------------------------------------------------------------------------
protected int tryAcquireShared(int acquires) {
// be equal to 0 return 1, It's not equal to 0 return -1
return (getState() == 0) ? 1 : -1;
}
Build a two-way linked list of blocking queues , Suspends the current thread
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Add a new node to the blocking queue
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// Get the current node pre node
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);// Back to the lock state
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// Reorganize the two-way linked list , Empty invalid nodes , Suspends the current thread
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1.3.3、 Counter decrement
When we call countDownLatch.countDown() Method time , The counter will be decremented 1 operation ,AQS Inside is through The way the lock is released , Yes state Make a reduction 1 operation , When state=0 It is proved that the counter has been decremented , At this point AQS All node threads in the blocking queue wake up .
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// Release the lock
if (tryReleaseShared(arg)) {
// When all threads call countDown() after ,state be equal to 0, Wakes up the waiting thread ,
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// utilize CAS The spin , know CAS take state-1, If state by 0, return true, Not for 0, return false.
// When all threads call countDown() after ,state be equal to 0.
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
// If the queue is not empty , Wake up the second node in the queue
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// Wake up the
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; 、
}
if (h == head)
break;
}
}
unparkSuccessor(h)
LockSupport.unpark(s.thread);
Wake up the thread , Will go back to before await() Where it's blocked .
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Before AQS I said , take thread The packing is Node The team , I won't go into details
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// Attempt to acquire lock
int r = tryAcquireShared(arg);
if (r >= 0) {
// Successful lock snatching , return
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// If you don't get the lock , Just try to hang , If the lock grabbing process is interrupted , Throw an exception
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// Here, here, here, here
// The thread is awakened , Stop blocking if the thread is interrupted , Throw an exception
// If the thread is not interrupted , Just spin to ask for a lock , The lock request succeeds and then returns
//await Call complete , return
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1.3.4 summary
As can be seen from the above code , One thread call countDown() Will state-1.
await The role of is to be state!=0 When , The thread will block , That is to say count Threads successfully called countDown() after ,state It will be equal to 0,await() Will stop blocking .
So CountDownLatch It can be applied to the scenario where a thread waits for several threads .
Tips
1、 When a thread calls countDown() after , It won't block , Will continue to execute the code .
2、CountDownLatch Can only be used once , Do not reuse .
1.4 Sample code
1.4.1 Common examples
public class CountDownLatchTest {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);
System.out.println(" The main thread starts executing …… ……");
// The first sub thread executes
ExecutorService es1 = Executors.newSingleThreadExecutor();
es1.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println(" Sub thread :"+Thread.currentThread().getName()+" perform ");
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
}
});
es1.shutdown();
// The second sub thread executes
ExecutorService es2 = Executors.newSingleThreadExecutor();
es2.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" Sub thread :"+Thread.currentThread().getName()+" perform ");
latch.countDown();
}
});
es2.shutdown();
System.out.println(" Wait for two threads to finish executing …… ……");
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" Both sub threads are finished , Resume main thread ");
}
}
Execution results
The main thread starts executing …… ……
Wait for two threads to finish executing …… ……
Sub thread :pool-1-thread-1 perform
Sub thread :pool-2-thread-1 perform
Both sub threads are finished , Resume main thread
1.4.2 Simulation concurrency example
public class Parallellimit {
public static void main(String[] args) {
ExecutorService pool = Executors.newCachedThreadPool();
CountDownLatch cdl = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
CountRunnable runnable = new CountRunnable(cdl);
pool.execute(runnable);
}
}
}
class CountRunnable implements Runnable {
private CountDownLatch countDownLatch;
public CountRunnable(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
synchronized (countDownLatch) {
/*** One capacity reduction at a time */
countDownLatch.countDown();
System.out.println("thread counts = " + (countDownLatch.getCount()));
}
countDownLatch.await();
System.out.println("concurrency counts = " + (100 - countDownLatch.getCount()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
CountDownLatch and CyclicBarrier difference
1、countDownLatch It's a counter , Thread completes one record one , Counter decrement , Only once
2、CyclicBarrier The counter is more like a valve , All threads are required to arrive , And then go ahead and do it , Counter increment , Provide reset function , It can be used multiple times
边栏推荐
- June training (day 11) - matrix
- leetcodeSQL:574. Elected
- 基于51单片机的酒精检测仪
- The programmer dedicated to promoting VIM has left. Father of vim: I will dedicate version 9.0 to him
- Unity commonly used 3D mathematical calculation
- 证券开户有风险吗?怎么开户安全呢?
- JVM foundation - > three ⾊ mark
- Es6+ new content
- Is it safe to open an account in flush? How to open an account online to buy stocks
- 微信小程序提现功能
猜你喜欢

Redis optimization

Have you really learned the common ancestor problem recently?

be careful! Your Navicat may have been poisoned

Mr. Sun's version of JDBC (21:34:25, June 12, 2022)

JVM Basics - > What are the thread shared areas in the JVM

Database daily question --- day 10: combine two tables

JVM Basics - > how to troubleshoot JVM problems in your project

接口测试工具apipost3.0版本对于流程测试和引用参数变量

Prefix sum and difference

Configuring Dingding notification of SQL audit platform archery
随机推荐
2022-02-28 incluxdb high availability planning
[web technology] 1348- talk about several ways to implement watermarking
How to perform disaster recovery and recovery for kubernetes cluster? (22)
[probability theory and mathematical statistics] final review: formula summary and simple examples (end)
[data analysis] data clustering and grouping based on kmeans, including Matlab source code
【LeetCode】5. 最长回文子串
【图像去噪】基于三边滤波器实现图像去噪附matlab代码
Su embedded training day13 - file IO
flutter系列之:flutter中常用的GridView layout详解
Hostvars in ansible
[sword finger offer] sword finger offer 58 - ii Rotate string left
【Web技术】1348- 聊聊水印实现的几种方式
MySQL architecture and basic management (II)
项目里面的traceID的设计
Qt Quick 3D学习:使用鼠标键盘控制节点位置和方向
USB mechanical keyboard changed to Bluetooth Keyboard
QT quick 3D learning: use mouse and keyboard to control node position and direction
【LeetCode】数组中第K大的元素
Yyds dry goods inventory solution Huawei machine test: weighing weight
RAID disk array