Multithreading power tools: CountDownLatch & CyclicBarrier

2022-06-10 13:17:00 Boring people in the early morning

Multithreading is a roadblock nobody learning Java can avoid, and getting past it means mastering JUC (java.util.concurrent). Earlier articles already covered some of JUC's concurrency and synchronization features:
Multithreading basics

What JMM is, and the characteristics of volatile
A detailed look at CAS and the ABA problem

Thread pools and blocking queues

AQS is an important topic. The following articles can help you understand how AQS works under the hood:
My understanding of AQS

Analysis of the core principles of AQS

High-frequency Java concurrency interview question: tell me about your understanding of AQS

In short, AQS is to concurrency what the brain is to a person: it holds a state field, a reference to the thread that currently owns the lock, and a FIFO queue of blocked threads, and together these solve the thread-safety problem.
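To make the state-plus-queue idea concrete, here is a minimal sketch of a one-shot gate built on AbstractQueuedSynchronizer, in the spirit of the latch example in the AQS Javadoc. The names SimpleGate, Sync, open and demo are hypothetical and chosen only for illustration. Because the gate uses shared mode, it relies on the state field and the wait queue but not on the owner-thread field.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot gate built on AQS: state 0 = closed, state 1 = open.
class SimpleGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result lets the caller proceed; a negative one
            // makes AQS park the caller in its FIFO wait queue.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);  // open the gate
            return true;  // true tells AQS to wake the queued threads
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    // Blocks until open() has been called.
    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void open() {
        sync.releaseShared(1);
    }

    boolean isOpen() {
        return sync.isOpen();
    }

    // Starts a waiting thread, opens the gate, and reports whether the
    // waiter got through.
    static boolean demo() {
        SimpleGate gate = new SimpleGate();
        boolean[] passed = {false};
        Thread waiter = new Thread(() -> {
            try {
                gate.await();
                passed[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.open();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed[0];
    }
}
```

CountDownLatch follows the same pattern, except that its state starts at the count and each countDown() decrements it until it reaches zero.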

That brings us to today's topic: how CountDownLatch and CyclicBarrier coordinate concurrent threads, and how the two differ.
Both CountDownLatch and CyclicBarrier let threads wait for one another, but each has a different emphasis. In addition, a CountDownLatch cannot be reused, while a CyclicBarrier can.
Looking at the two classes, you will find that await() blocks the calling thread in both. So if you use a CyclicBarrier inside a thread pool whose core size is smaller than parties, the pool's threads stay blocked forever. By contrast, countDown() on a CountDownLatch returns immediately and does not block the calling thread. CountDownLatch.await(), however, blocks whichever thread calls it, so call it from a child thread if the current thread must not be blocked.
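The thread-pool pitfall can be reproduced with a short sketch. The class SmallPoolBarrierDemo and its run method are hypothetical names. A timed await() is used so the demo ends instead of hanging forever, which is what the untimed await() would do.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class SmallPoolBarrierDemo {
    // Runs `parties` barrier tasks on a pool of `poolSize` threads and
    // returns how many of them got past the barrier.
    static int run(int poolSize, int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger passed = new AtomicInteger();
        for (int i = 0; i < parties; i++) {
            pool.execute(() -> {
                try {
                    // The timeout only keeps this demo from blocking forever.
                    barrier.await(1, TimeUnit.SECONDS);
                    passed.incrementAndGet();
                } catch (TimeoutException | BrokenBarrierException e) {
                    // Too few parties arrived in time: the barrier is broken.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }
}
```

With run(2, 3), the two pool threads block at the barrier while the third task is still queued, so no task ever passes. With run(3, 3), all three arrive and pass together.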

If the above is unclear, compare the two concrete code examples below.

CyclicBarrier:

1. CyclicBarrier: a synchronization aid used to coordinate multiple child threads. The threads wait at the barrier until all of them have reached it, and then continue with their subsequent work together.

2. Example scenario:
For the year-end team-building event, the company asks every employee to drive their own car to the company gate by 8 a.m. on Saturday, and then everyone drives to the destination in their own car.
Here the company is the main thread and the employees are the child threads.

package com.test.spring.support;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author javaloveiphone
 * @date Created: 2017-01-25 10:59:11 AM
 * @Description:
 */
public class Company {

    public static void main(String[] args) throws InterruptedException {

        // Number of employees
        int count = 5;
        // Create the barrier: the five employees plus the main thread all call await()
        CyclicBarrier barrier = new CyclicBarrier(count + 1);

        // Create the thread pool. It could also be built explicitly, but a pool with
        // fewer threads than employees would never trip the barrier:
        //ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1,1,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(count));
        ExecutorService threadPool = Executors.newFixedThreadPool(count);

        System.out.println("Company notice: every employee drives to the company gate by 8 a.m. on Saturday");
        for (int i = 0; i < count; i++) {
            // Hand each employee task to the thread pool
            threadPool.execute(new Employee(barrier, i + 1));
            Thread.sleep(10);
        }
        try {
            // Block the main thread until every employee has reached the company gate
            barrier.await();
            Thread.sleep(10);
            // A timed variant gives up waiting once the timeout elapses:
            //barrier.await(long timeout, TimeUnit unit)
            System.out.println("All employees have reached the company gate; the company leaders drive to the destination along with them.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } finally {
            // Shut the pool down: previously submitted tasks still run, new tasks are rejected
            threadPool.shutdown();
            // Alternatively: try to stop running tasks, skip the waiting ones,
            // and return the list of tasks that never started
            //threadPool.shutdownNow();
        }
    }
}

// Worker task, one per employee
class Employee implements Runnable {

    private CyclicBarrier barrier;
    private int employeeIndex;

    public Employee(CyclicBarrier barrier, int employeeIndex) {
        this.barrier = barrier;
        this.employeeIndex = employeeIndex;
    }

    @Override
    public void run() {
        try {
            System.out.println("Employee " + employeeIndex + ": heading to the company gate...");
            Thread.sleep(10 * employeeIndex);
            System.out.println("Employee " + employeeIndex + ": arrived.");
            // Wait here until all parties (employees and main thread) have arrived
            barrier.await();
            Thread.sleep(10);
            System.out.println("Employee " + employeeIndex + ": driving to the destination");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

After a child thread calls await(), it must wait until all the other parties have also called await() before it can continue with its follow-up work (the code after await()). In the example above, every car must reach the company gate before they can all drive on to the destination.
However, the main thread's work after await() and the child threads' work after await() do not affect each other. Once every party has called await(), the main thread proceeds with its own follow-up work and does not care what the child threads do after their await().

As the output shows, once the barrier is reached each thread goes its own way, and the threads do not resume in the order in which they arrived.

CountDownLatch:

1. CountDownLatch: a synchronization utility that lets one or more threads wait until a set of operations in other threads has finished.

2. ThreadPoolExecutor/ExecutorService: thread pools. A pool reuses threads, which reduces the cost of creating them repeatedly, and makes creating, starting, stopping, and destroying threads more convenient.

3. Example scenario:
For the year-end team-building event, the company asks every employee to gather at the company gate by 8 a.m. on Saturday and then take the bus rented by the company to the destination.
Here the company is the main thread and the employees are the child threads.

package com.test.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author javaloveiphone
 * @date Created: 2017-01-25 10:59:11 AM
 * @Description:
 */
public class Company {

    public static void main(String[] args) throws InterruptedException {

        // Number of employees
        int count = 5;
        // Create the latch; the constructor argument is the number of
        // latch.countDown() calls needed to release the waiting thread
        CountDownLatch latch = new CountDownLatch(count);

        // Create the thread pool. It could also be built explicitly:
        //ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1,1,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(count));
        ExecutorService threadPool = Executors.newFixedThreadPool(count);

        System.out.println("Company notice: every employee gathers at the company gate by 8 a.m. on Saturday");
        for (int i = 0; i < count; i++) {
            // Hand each employee task to the thread pool
            Thread.sleep(10);
            threadPool.execute(new Employee(latch, i + 1));
        }
        try {
            // Block the main thread until every employee has reached the company gate
            latch.await();
            // A timed variant waits until the latch reaches zero, the thread is
            // interrupted, or the timeout elapses:
            //latch.await(long timeout, TimeUnit unit)
            System.out.println("All employees have reached the company gate; the bus sets off for the destination.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Shut the pool down: previously submitted tasks still run, new tasks are rejected
            threadPool.shutdown();
            // Alternatively: try to stop running tasks, skip the waiting ones,
            // and return the list of tasks that never started
            //threadPool.shutdownNow();
        }
    }
}

// Worker task, one per employee
class Employee implements Runnable {

    private CountDownLatch latch;
    private int employeeIndex;

    public Employee(CountDownLatch latch, int employeeIndex) {
        this.latch = latch;
        this.employeeIndex = employeeIndex;
    }

    @Override
    public void run() {
        try {
            System.out.println("Employee " + employeeIndex + ": heading to the company gate...");
            Thread.sleep(10);
            System.out.println("Employee " + employeeIndex + ": arrived.");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // This employee's part is done: decrement the counter
            latch.countDown();
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // After countDown() the thread carries on with its own work,
            // unaffected by the main thread
            System.out.println("Employee " + employeeIndex + ": eating, drinking water, taking pictures.");
        }
    }
}

Each employee calls countDown() on arrival. Once all employees have arrived, the counter reaches 0 and the main thread continues.

A child thread that has called countDown() simply carries on with its own work, such as the eating, drinking, and picture taking above. It is not blocked by the main thread and is not affected by whether other threads have called countDown() yet.
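The timed variant mentioned in the code comments, latch.await(long timeout, TimeUnit unit), reports through its return value whether the count reached zero in time. The following is a small sketch of that behavior; the class name LatchTimeoutDemo and the 100 ms timeout are arbitrary choices for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchTimeoutDemo {
    // Returns {result of the first timed await, result of the second}.
    static boolean[] run() {
        CountDownLatch latch = new CountDownLatch(2);
        boolean first;
        boolean second;
        try {
            // countDown() never blocks; the count drops from 2 to 1.
            latch.countDown();
            // The count is still 1, so this wait gives up after the timeout.
            first = latch.await(100, TimeUnit.MILLISECONDS);
            // The count drops to 0, which opens the latch for good.
            latch.countDown();
            // The latch is already open, so this returns without waiting.
            second = latch.await(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] {first, second};
    }
}
```

A false return value is the signal a caller can use to decide what to do when the workers are too slow, for example logging the timeout or shutting the pool down.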

Differences between CyclicBarrier and CountDownLatch:

1) The two constructors take different arguments. In the examples above, the CyclicBarrier is constructed with a value 1 larger than the CountDownLatch. The CyclicBarrier argument is the number of threads that call await() (the five employees plus the main thread), while the CountDownLatch argument is the number of countDown() calls.

2) After a child thread calls barrier.await(), it must wait until all parties have called barrier.await() before they continue with their subsequent work together. After a child thread calls latch.countDown(), it carries on with its own work without waiting for the other child threads to call latch.countDown().

3) A CyclicBarrier can be reused, while a CountDownLatch cannot.
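The reuse difference in point 3 can be checked with a short sketch. ReuseDemo and its method names are hypothetical. The optional barrier action passed to the CyclicBarrier constructor runs once each time the barrier trips, which makes the trips easy to count.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class ReuseDemo {
    // Two threads meet at the same barrier three times in a row.
    // Returns how many times the barrier tripped.
    static int barrierRounds() {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(2, trips::incrementAndGet);
        Runnable worker = () -> {
            try {
                for (int round = 0; round < 3; round++) {
                    barrier.await(); // the barrier resets itself after each trip
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                // The other party failed or timed out; nothing more to do here.
            }
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trips.get();
    }

    // A latch has no reset: once the count reaches zero it stays there.
    static long latchAfterExtraCountDown() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown(); // no effect, the count is already 0
        return latch.getCount();
    }
}
```

To wait for a second batch of work with a latch, a new CountDownLatch has to be created, whereas the same CyclicBarrier instance serves every round.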

In plain terms:
CountDownLatch is like a school 4×100 m relay race. The count is 4 (four legs). When a runner finishes a leg, they can rest and drink water (the child thread keeps running), and the same goes for everyone else. When everyone has finished, count = 0 and the referee announces the result (the main thread).

CyclicBarrier: one day the students have class, and the teacher arrives early and waits for the students to come in one after another. The teacher lectures (main thread), and the students listen and take notes (child threads). While some students are still missing, the teacher cannot start the lecture (the main thread is blocked), and the students already in the classroom cannot take notes because the lecture has not started (the child threads wait at the barrier). Once everyone has arrived, each starts doing their own thing: the teacher lectures, and the students take notes, pass slips of paper, and so on, without disturbing one another.

Next, how to use Guava's ListeningExecutorService

An ordinary thread pool returns a plain Future, which offers only basic functionality. Guava defines the ListenableFuture interface, which extends the Future interface from the JDK's java.util.concurrent package. ListenableFuture lets you register callbacks that are invoked when the (multithreaded) operation completes.

Usage is as follows:

1. Create a thread pool
2. Decorate the thread pool
3. Submit the tasks
4. Handle the results in callbacks
5. Do the follow-up processing once all tasks have completed

When a task finishes, its callback runs: onSuccess if the task succeeded, onFailure if it failed.
[Figure: screenshot of a successful run]

With the wait timeout changed to 5 seconds (the full code below uses 5 minutes):
[Figure: the modified timeout]
the main thread waits on the latch for 5 seconds and then ends the pool's threads automatically.
[Figure: output after the timeout]
shutdownNow() is called and the running tasks are interrupted. shutdownNow() also returns, as a List<Runnable>, the tasks that were queued but never started; a submitted Callable is wrapped in a task object that implements Runnable. In this example all five tasks are already running on the eight-thread pool, so the returned list is empty.

The complete code for the example above is as follows:

pom dependencies

 <dependencies>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.3.15</version>
        </dependency>
        
    </dependencies>


import java.util.concurrent.Callable;

/**
 * @author Shuyu.Wang
 * @package:com.ganinfo.test
 * @className:
 * @description:
 * @date 2018-10-28 19:35
 **/
public class AuthCallable implements Callable<String> {

    private AuthType authType;

    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    @Override
    public String call() throws Exception {
        // Each type simulates an authorization call of a different duration
        if ("1".equals(authType.getType())) {
            System.out.println("Method 1 " + authType.getName() + " authorization start");
            Thread.sleep(8000);
            System.out.println("Method 1 authorization end");
            return authType.getName();
        }
        if ("2".equals(authType.getType())) {
            System.out.println("Method 2 " + authType.getName() + " authorization start");
            Thread.sleep(7000);
            System.out.println("Method 2 authorization end");
            return authType.getName();
        }
        if ("3".equals(authType.getType())) {
            System.out.println("Method 3 " + authType.getName() + " authorization start");
            Thread.sleep(5000);
            System.out.println("Method 3 authorization end");
            return authType.getName();
        }
        if ("4".equals(authType.getType())) {
            System.out.println("Method 4 " + authType.getName() + " authorization start");
            Thread.sleep(3000);
            System.out.println("Method 4 authorization end");
            return authType.getName();
        }
        if ("5".equals(authType.getType())) {
            System.out.println("Method 5 " + authType.getName() + " authorization start");
            Thread.sleep(1000);
            System.out.println("Method 5 authorization end");
            return authType.getName();
        }
        return null;
    }

    public void setAuthType(AuthType authType) {
        this.authType = authType;
    }
}

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author Shuyu.Wang
 * @package:com.ganinfo.test
 * @className:
 * @description:
 * @date 2018-10-28 19:41
 **/
public class AuthService {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // The callbacks run on pool threads, so the result list must be thread-safe
        final List<String> list = Collections.synchronizedList(new ArrayList<String>());
        int count = 5;
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(count);
            // 1. Create the thread pool
            ExecutorService executorService = Executors.newFixedThreadPool(8);
            // 2. Decorate it so that submit() returns a ListenableFuture
            ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
            for (int i = 1; i <= count; i++) {
                AuthCallable authCallable = new AuthCallable();
                AuthType authType = new AuthType();
                authType.setType(String.valueOf(i));
                authType.setName(String.valueOf(i) + " name");
                authCallable.setAuthType(authType);
                // 3. Submit the task
                ListenableFuture<String> listenableFuture = listeningExecutorService.submit(authCallable);
                // 4. Register the callback (two-argument form, as in Guava 18.0;
                //    newer Guava releases require an Executor as a third argument)
                Futures.addCallback(listenableFuture, new FutureCallback<String>() {
                    @Override
                    public void onSuccess(String name) {
                        System.out.println("Authorization result: " + name);
                        list.add(name);
                        countDownLatch.countDown();
                    }

                    @Override
                    public void onFailure(Throwable throwable) {
                        countDownLatch.countDown();
                        System.out.println("Processing error: " + throwable);
                    }
                });
            }
            try {
                // After shutdown() no new tasks can be submitted;
                // tasks already submitted continue to run
                executorService.shutdown();
                // 5. Wait until all tasks have completed, or until the timeout elapses
                if (!countDownLatch.await(5, TimeUnit.MINUTES)) {
                    System.out.println("Timed out; interrupting all threads in the pool");
                    // On timeout, interrupt all threads in the thread pool
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                // shutdownNow() attempts to stop the running tasks and returns the
                // tasks that were queued but never started. Here all five tasks are
                // already running on the eight-thread pool, so the list is empty.
                List<Runnable> listRun = executorService.shutdownNow();
            }
            System.out.println("Execution results: " + list.toString());
            long end = System.currentTimeMillis();
            System.out.println("Elapsed ms: " + (end - start));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

import lombok.Data;

/**
 * @author Shuyu.Wang
 * @package:com.ganinfo.test
 * @className:
 * @description:
 * @date 2018-10-28 19:34
 **/
@Data
public class AuthType {

    private String type;
    private String name;
}

The difference between shutdown and shutdownNow, in short

shutdown sets the state of the thread pool to SHUTDOWN: no new tasks are accepted, tasks that are running or already queued continue to completion, and only idle worker threads are interrupted. shutdownNow sets the state of the thread pool to STOP: it interrupts the tasks that are running and returns the queued tasks that never started.
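The difference can be observed with a single-thread pool, where one task runs while the others sit in the queue. This is a sketch only; ShutdownDemo, its run method, and the 500 ms task duration are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    // Submits three tasks to a single-thread pool, then shuts it down.
    // Returns {tasks that ran to completion, tasks returned by shutdownNow()}.
    static int[] run(boolean now) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch firstStarted = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        Runnable task = () -> {
            firstStarted.countDown();
            try {
                Thread.sleep(500);
                completed.incrementAndGet();
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(): give up without completing.
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 3; i++) {
            pool.execute(task);
        }
        int neverStarted = 0;
        try {
            firstStarted.await(); // make sure one task is running, two are queued
            if (now) {
                List<Runnable> pending = pool.shutdownNow();
                neverStarted = pending.size();
            } else {
                pool.shutdown();
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), neverStarted};
    }
}
```

With run(false) all three tasks complete and nothing is returned. With run(true) the running task is interrupted and the two queued tasks come back in the list without ever starting.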

