Multithreading power tools: CountDownLatch & CyclicBarrier
2022-06-10 13:17:00 【Boring people in the early morning】
Multithreading is an unavoidable roadblock when learning Java, and to move that stone out of the way you need to master JUC (java.util.concurrent). Earlier articles have already covered some of the concurrency and synchronization features in JUC:
Multithreading basics
What JMM is and the characteristics of volatile
A detailed discussion of CAS and the ABA problem
Thread pools and blocking queues
AQS is an especially important topic. The following articles explain how AQS works under the hood:
Talking about AQS
Analysis of the core principles of AQS
In short, AQS plays the role of the brain in JUC's synchronizers: it keeps an int state, a reference to the thread that currently holds the lock, and a FIFO queue of waiting threads, and together these make locking thread-safe.
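To make the state-plus-queue idea concrete, here is a minimal sketch of a one-shot gate built directly on AbstractQueuedSynchronizer, in the spirit of the latch example in the AQS Javadoc. The class name OneShotGate and its methods are hypothetical and are not taken from the articles above. It uses shared mode, so the owner-thread field is not involved.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical illustration: a gate that starts closed and can be opened once.
class OneShotGate {
    // AQS state: 0 = closed, 1 = open.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A negative result makes AQS park the caller in its FIFO wait queue.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);   // open the gate
            return true;   // ask AQS to wake the queued waiters
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void open() {
        sync.releaseShared(1);
    }

    boolean isOpen() {
        return sync.isOpen();
    }

    // Starts a waiter thread, opens the gate, and reports whether the waiter got through.
    static boolean demo() {
        OneShotGate gate = new OneShotGate();
        AtomicBoolean passed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                gate.await();
                passed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.open();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter passed the gate: " + demo());
    }
}
```

CountDownLatch follows the same pattern, except that its state holds the remaining count and each countDown() decrements it.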
Now to today's topic: how CountDownLatch and CyclicBarrier coordinate concurrent threads, and how they differ.
CountDownLatch and CyclicBarrier can both make threads wait for each other, but they have different emphases. In addition, CountDownLatch cannot be reused, while CyclicBarrier can.
Both CountDownLatch.await() and CyclicBarrier.await() block the calling thread. So if you use a CyclicBarrier inside a thread pool and the pool's core size is smaller than the barrier's parties, the pool threads that reach the barrier stay blocked, because the remaining parties never get a thread to run on. By contrast, countDown() on a CountDownLatch never blocks: the calling thread decrements the counter and carries on. CountDownLatch.await(), however, blocks whichever thread calls it, so call it from a separate thread if the current thread must not be blocked.
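The pool-size pitfall can be reproduced in a few lines. The following is a minimal sketch with a hypothetical class name, not code from the examples below. It uses the timed await so that the demo ends instead of hanging: with one pool thread and two parties, the first task times out alone at the barrier, which breaks the barrier, and the second task then fails immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SmallPoolBarrierDemo {
    // Returns one outcome per task, in submission order.
    static List<String> run() {
        int parties = 2;
        CyclicBarrier barrier = new CyclicBarrier(parties);
        // Only one worker thread: fewer threads than parties.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < parties; i++) {
            futures.add(pool.submit(() -> {
                try {
                    // The untimed await() would block forever here.
                    barrier.await(200, TimeUnit.MILLISECONDS);
                    return "passed";
                } catch (TimeoutException e) {
                    return "timed out";
                } catch (BrokenBarrierException e) {
                    return "barrier broken";
                }
            }));
        }
        List<String> outcomes = new ArrayList<>();
        try {
            for (Future<String> future : futures) {
                outcomes.add(future.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [timed out, barrier broken]
    }
}
```

Sizing the pool to at least the number of parties lets every party reach the barrier, which is why the CyclicBarrier example uses newFixedThreadPool(count).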
If that is still unclear, compare the two concrete code examples below.
CyclicBarrier:
1. CyclicBarrier: a synchronization aid that coordinates multiple child threads. Each child thread waits at the barrier until all of them have reached it, and then they all continue together.
2. Example scenario:
At the end of the year the company organizes a team-building trip. Every employee must drive their own car to the company gate by 8 a.m. on Saturday, and then everyone drives to the destination.
In this example, the company is the main thread and the employees are the child threads.
package com.test.spring.support;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author javaloveiphone
 * @date Created: 2017-01-25 10:59:11 AM
 */
public class Company {
    public static void main(String[] args) throws InterruptedException {
        // Number of employees
        int count = 5;
        // Create the barrier: the employees plus the main thread all call await()
        CyclicBarrier barrier = new CyclicBarrier(count + 1);
        // Create the thread pool. It could also be built explicitly, but it needs at least
        // `count` threads, otherwise the employees can never all reach the barrier:
        //ThreadPoolExecutor threadPool = new ThreadPoolExecutor(count,count,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(count));
        ExecutorService threadPool = Executors.newFixedThreadPool(count);
        System.out.println("Company notice: every employee drives to the company gate by 8 a.m. on Saturday");
        for (int i = 0; i < count; i++) {
            // Hand each employee task to the thread pool
            threadPool.execute(new Employee(barrier, i + 1));
            Thread.sleep(10);
        }
        try {
            // Block the current thread until all employees have arrived at the company gate
            barrier.await();
            Thread.sleep(10);
            // A timed variant is also available; it waits until all parties arrive,
            // the thread is interrupted, or the timeout elapses:
            //barrier.await(long timeout, TimeUnit unit)
            System.out.println("All employees have arrived at the company gate; the company leaders drive to the destination with them.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } finally {
            // Finally shut the pool down: submitted tasks still run, new tasks are rejected
            threadPool.shutdown();
            // Shut down immediately: try to stop running tasks, skip the waiting ones,
            // and return the list of tasks that never started
            //threadPool.shutdownNow();
        }
    }
}

// Worker task handed to the pool
class Employee implements Runnable {
    private CyclicBarrier barrier;
    private int employeeIndex;

    public Employee(CyclicBarrier barrier, int employeeIndex) {
        this.barrier = barrier;
        this.employeeIndex = employeeIndex;
    }

    @Override
    public void run() {
        try {
            System.out.println("Employee " + employeeIndex + ": heading to the company gate...");
            Thread.sleep(10 * employeeIndex);
            System.out.println("Employee " + employeeIndex + ": arrived.");
            // Wait here until every other party has arrived
            barrier.await();
            Thread.sleep(10);
            System.out.println("Employee " + employeeIndex + ": driving to the destination");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
A child thread that has called await() must wait until all the other child threads (and, here, the main thread) have also called await() before any of them can continue with the work that follows. As in the example, every car must arrive at the company gate before they all set off for the destination together.
However, once the barrier trips, the main thread's work after await() and the child threads' work after await() do not affect each other. As soon as every party has called await(), the main thread carries on with its own work and does not need to care about what the child threads do after their await().
As the output shows, once everyone has reached the barrier each thread proceeds on its own, not in the order in which the threads arrived.
CountDownLatch:
1. CountDownLatch: a synchronization utility that lets one or more threads wait until operations in other threads have finished.
2. ThreadPoolExecutor/ExecutorService: thread pools. A pool reuses threads, which cuts the cost of creating threads frequently and makes creating, starting, stopping, and destroying threads easier to manage.
3. Example scenario:
At the end of the year the company organizes a team-building trip. Every employee must gather at the company gate by 8 a.m. on Saturday and take the bus rented by the company to the destination.
In this example, the company is the main thread and the employees are the child threads.
package com.test.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author javaloveiphone
 * @date Created: 2017-01-25 10:59:11 AM
 */
public class Company {
    public static void main(String[] args) throws InterruptedException {
        // Number of employees
        int count = 5;
        // Create the counter.
        // The constructor argument is the number of times latch.countDown() must be called
        CountDownLatch latch = new CountDownLatch(count);
        // Create the thread pool. It could also be built explicitly:
        //ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1,1,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(count));
        ExecutorService threadPool = Executors.newFixedThreadPool(count);
        System.out.println("Company notice: every employee gathers at the company gate by 8 a.m. on Saturday");
        for (int i = 0; i < count; i++) {
            // Hand each employee task to the thread pool
            Thread.sleep(10);
            threadPool.execute(new Employee(latch, i + 1));
        }
        try {
            // Block the current thread until all employees have arrived at the company gate
            latch.await();
            // A timed variant waits until the latch reaches zero, the thread is interrupted,
            // or the timeout elapses:
            //latch.await(long timeout, TimeUnit unit)
            System.out.println("All employees have arrived at the company gate; the bus sets off for the destination.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Finally shut the pool down: submitted tasks still run, new tasks are rejected
            threadPool.shutdown();
            // Shut down immediately: try to stop running tasks, skip the waiting ones,
            // and return the list of tasks that never started
            //threadPool.shutdownNow();
        }
    }
}

// Worker task handed to the pool
class Employee implements Runnable {
    private CountDownLatch latch;
    private int employeeIndex;

    public Employee(CountDownLatch latch, int employeeIndex) {
        this.latch = latch;
        this.employeeIndex = employeeIndex;
    }

    @Override
    public void run() {
        try {
            System.out.println("Employee " + employeeIndex + ": heading to the company gate...");
            Thread.sleep(10);
            System.out.println("Employee " + employeeIndex + ": arrived.");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // This employee's part is done: decrement the counter
            latch.countDown();
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // After countDown() the thread carries on with its own work, unaffected by the main thread
            System.out.println("Employee " + employeeIndex + ": eating, drinking water, taking photos.");
        }
    }
}
Each employee calls countDown() on arrival. When all employees have arrived the counter reaches 0 and the main thread resumes.
A child thread that has called countDown() simply carries on with its own work, such as the eating, drinking water, and taking photos above. It is not blocked by the main thread and is not affected by whether the other threads have called countDown() yet.
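The timed variant mentioned in the code comment, latch.await(long timeout, TimeUnit unit), returns a boolean instead of blocking indefinitely. A minimal sketch follows; the class name is hypothetical. The wait returns false while the count is still above zero when the timeout elapses, and true once the count has reached zero.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchTimeoutDemo {
    // Returns {result of the first timed wait, result of the second timed wait}.
    static boolean[] run() {
        CountDownLatch latch = new CountDownLatch(2);
        try {
            latch.countDown();                                         // one of two arrivals
            boolean first = latch.await(100, TimeUnit.MILLISECONDS);   // count is still 1
            latch.countDown();                                         // the last arrival
            boolean second = latch.await(100, TimeUnit.MILLISECONDS);  // count is 0
            return new boolean[] {first, second};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = run();
        System.out.println("first wait: " + results[0] + ", second wait: " + results[1]);
        // prints: first wait: false, second wait: true
    }
}
```

Checking the return value is what lets a caller decide to give up, for example by shutting the pool down, when not every task reports in on time.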
Differences between CyclicBarrier and CountDownLatch:
1) The constructor arguments differ. In these examples the CyclicBarrier is constructed with a value one larger than the CountDownLatch, because the CyclicBarrier argument is the number of threads that call await() (the employees plus the main thread), whereas the CountDownLatch argument is the number of countDown() calls.
2) After a child thread calls barrier.await(), it must wait until all the other threads have also called barrier.await() before they continue with their subsequent work together. After a child thread calls latch.countDown(), it continues with its own work straight away and does not wait for the other child threads to call latch.countDown().
3) A CyclicBarrier can be reused, whereas a CountDownLatch cannot.
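Point 3 can be checked with a short sketch; the class name and the run(parties, rounds) helper are hypothetical. The same barrier object is awaited in several consecutive rounds, and a barrier action counts how many times it tripped.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {
    // Runs `rounds` rendezvous rounds with `parties` threads and
    // returns how many times the barrier tripped.
    static int run(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await(); // the same barrier, reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + run(3, 4) + " times"); // prints: barrier tripped 4 times
    }
}
```

A CountDownLatch has no equivalent: once its count reaches zero it stays at zero, so a new latch is needed for every round.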
In plain terms:
CountDownLatch is like the 4×100 m relay at a school sports day. The count is 4 (four legs). When a runner finishes, he can rest and drink water (the child thread continues with its own work), and the same goes for everyone else. When everyone has finished, count = 0 and the referee announces the result (the main thread).
CyclicBarrier: one day the students have class. The teacher arrives early and waits for the students to trickle in. The teacher lectures (main thread) and the students listen and take notes (child threads). Until every student has arrived, the teacher cannot start (the main thread is blocked), and the students already in the classroom cannot take notes, because the lecture has not started (the child threads wait at the barrier). Once everyone is there, each gets on with their own thing: the teacher lectures, and the students take notes, pass notes, and so on, without disturbing each other.
Next, how to use ListeningExecutorService from Google's Guava library
An ordinary thread pool returns a Future, whose functionality is fairly limited. Guava defines the ListenableFuture interface, which extends the Future interface from the JDK's java.util.concurrent package. ListenableFuture lets you register callbacks that are invoked when the (multithreaded) operation completes.
Usage is as follows:
1. Create a thread pool
2. Decorate the thread pool
3. Submit the tasks
4. Handle the callbacks
5. Handle completion of all tasks
When a task finishes, its callback runs: onSuccess if the task succeeded, onFailure if it failed.
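The same five-step flow can also be sketched with the JDK alone. The example below deliberately swaps Guava's ListenableFuture for java.util.concurrent.CompletableFuture, so it is an analogue of the pattern and not Guava's API. There is no decoration step, because CompletableFuture accepts the pool directly. The class name and task names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CallbackLatchDemo {
    // Runs `count` tasks (count must be positive) and collects their results via callbacks.
    static List<String> run(int count) {
        List<String> results = new CopyOnWriteArrayList<>();  // callbacks may run on several threads
        CountDownLatch done = new CountDownLatch(count);
        ExecutorService pool = Executors.newFixedThreadPool(count);  // step 1: create the pool
        try {
            for (int i = 1; i <= count; i++) {
                String name = i + " name";
                CompletableFuture
                        .supplyAsync(() -> name, pool)       // step 3: submit the task
                        .whenComplete((value, error) -> {    // step 4: the callback
                            if (error == null) {
                                results.add(value);          // success branch
                            }
                            done.countDown();                // counted on success and on failure
                        });
            }
            // Step 5: wait for every callback, with a timeout as a safety net.
            if (!done.await(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println("results: " + run(5));
    }
}
```

As in the Guava version, the latch is counted down in both the success and the failure path, so a failing task cannot leave the waiting thread stuck until the timeout.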
If the wait timeout is shortened to 5 seconds, the wait on the latch gives up after five seconds, because some of the tasks sleep longer than that, and the thread pool is shut down with shutdownNow().
shutdownNow() interrupts the tasks that are still running and returns the tasks that were queued but never started as a List<Runnable>. The tasks in this example were submitted as Callable, so anything returned comes back as a Runnable wrapper, not as the original Callable object.
The full code of the above case is as follows :

pom dependencies:
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.15</version>
</dependency>
</dependencies>
import java.util.concurrent.Callable;

/**
 * @author Shuyu.Wang
 * @package com.ganinfo.test
 * @date 2018-10-28 19:35
 */
public class AuthCallable implements Callable<String> {
    private AuthType authType;

    /**
     * Simulates an authorization call whose duration depends on the auth type.
     *
     * @return the name of the auth type, or null for an unknown type
     * @throws Exception if interrupted while sleeping
     */
    @Override
    public String call() throws Exception {
        String type = authType.getType();
        if (type == null) {
            return null;
        }
        // Simulated duration of each authorization method, in milliseconds
        long sleepMillis;
        switch (type) {
            case "1": sleepMillis = 8000; break;
            case "2": sleepMillis = 7000; break;
            case "3": sleepMillis = 5000; break;
            case "4": sleepMillis = 3000; break;
            case "5": sleepMillis = 1000; break;
            default: return null;
        }
        System.out.println("Method " + type + " " + authType.getName() + " authorization start");
        Thread.sleep(sleepMillis);
        System.out.println("Method " + type + " authorization end");
        return authType.getName();
    }

    public void setAuthType(AuthType authType) {
        this.authType = authType;
    }
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author Shuyu.Wang
 * @package com.ganinfo.test
 * @date 2018-10-28 19:41
 */
public class AuthService {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Callbacks may run on different pool threads, so the result list must be thread-safe
        final List<String> list = Collections.synchronizedList(new ArrayList<String>());
        int count = 5;
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(count);
            // 1. Create the thread pool
            ExecutorService executorService = Executors.newFixedThreadPool(8);
            // 2. Decorate it so that submit() returns ListenableFuture
            ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
            for (int i = 1; i <= count; i++) {
                AuthCallable authCallable = new AuthCallable();
                AuthType authType = new AuthType();
                authType.setType(String.valueOf(i));
                authType.setName(String.valueOf(i) + " name");
                authCallable.setAuthType(authType);
                // 3. Submit the task
                ListenableFuture<String> listenableFuture = listeningExecutorService.submit(authCallable);
                // 4. Register the callback (two-argument form as in Guava 18.0;
                //    newer Guava releases expect an Executor as a third argument)
                Futures.addCallback(listenableFuture, new FutureCallback<String>() {
                    @Override
                    public void onSuccess(String name) {
                        System.out.println("Authorization result: " + name);
                        list.add(name);
                        countDownLatch.countDown();
                    }

                    @Override
                    public void onFailure(Throwable throwable) {
                        countDownLatch.countDown();
                        System.out.println("Processing error: " + throwable);
                    }
                });
            }
            try {
                // After shutdown() no new tasks can be submitted; tasks already submitted still run
                executorService.shutdown();
                // 5. Wait until all tasks have reported in, or give up after the timeout
                if (!countDownLatch.await(5, TimeUnit.MINUTES)) {
                    System.out.println("Timed out: interrupting all threads in the thread pool");
                    // On timeout, interrupt all threads in the thread pool
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                // shutdownNow() tries to stop the running tasks and returns the tasks that never started.
                // The tasks were submitted as Callable, so they come back wrapped as Runnable
                List<Runnable> listRun = executorService.shutdownNow();
            }
            System.out.println("Execution results: " + list.toString());
            long end = System.currentTimeMillis();
            System.out.println("Elapsed ms: " + (end - start));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
import lombok.Data;
/**
 * @author Shuyu.Wang
 * @package com.ganinfo.test
 * @date 2018-10-28 19:34
 */
@Data
public class AuthType {
private String type;
private String name;
}
In a word, the difference between shutdown and shutdownNow:
shutdown only sets the thread pool's state to SHUTDOWN: tasks that are running or already queued still run to completion, no new tasks are accepted, and only idle worker threads are interrupted. shutdownNow sets the state to STOP: it tries to stop the running tasks by interrupting their threads, and it returns the tasks that have not started yet without running them.
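The difference shows up clearly with a single-thread pool. In the following minimal sketch (hypothetical class name), one task keeps the only worker busy while a few quick tasks sit in the queue. The helper reports how many tasks shutdownNow() handed back and how many tasks ran to completion.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    // Submits one blocking task plus `queued` quick tasks to a single-thread pool.
    // Returns {tasks returned by shutdownNow (0 when shutdown is used), tasks that completed}.
    static int[] run(boolean useShutdownNow, int queued) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();             // keeps the only worker busy
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    // interrupted by shutdownNow(): the task ends without completing
                }
            });
            for (int i = 0; i < queued; i++) {
                pool.execute(completed::incrementAndGet);  // these wait in the queue
            }
            started.await();                     // make sure the first task is running
            int returned = 0;
            if (useShutdownNow) {
                List<Runnable> pending = pool.shutdownNow();  // interrupts and drains the queue
                returned = pending.size();
            } else {
                pool.shutdown();                 // queued tasks will still run
                release.countDown();             // let the first task finish
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] {returned, completed.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] now = run(true, 2);
        int[] graceful = run(false, 2);
        System.out.println("shutdownNow: returned=" + now[0] + ", completed=" + now[1]);
        System.out.println("shutdown:    returned=" + graceful[0] + ", completed=" + graceful[1]);
    }
}
```

With two queued tasks, shutdownNow() hands both back and nothing completes, while shutdown() lets all three tasks finish.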
Reference articles:
Java concurrent programming: CountDownLatch working together with a thread pool
Java multithreading: a CyclicBarrier usage example, letting threads start together
https://blog.csdn.net/weixin_39800144/article/details/82776523