当前位置:网站首页>Countdownlatch explodes instantly! Based on AQS, why can cyclicbarrier be so popular?

Countdownlatch explodes instantly! Based on AQS, why can cyclicbarrier be so popular?

2020-11-08 18:11:00 Liu Zhihang

Preface

After watching CountDownLatch I'm going to show you , Suddenly I saw a CyclicBarrier —— Loopback barrier . Walter ? Loop back and barrier ? Comparison CountDownLatch One more loop , Let's have a look , Have a look !

official account :『 Liu Zhihang 』, Record the skills in work study 、 Development and source notes ; From time to time to share some of the life experience . You are welcome to guide !

Introduce

A synchronization aid , It allows all the waits of a group of threads to reach a common barrier to each other .

CyclicBarrier It's useful in programs that involve a fixed number of threads and have to wait for each other .

This barrier is called a loopback barrier , Because it can be reused after the waiting thread is released .

CyclicBarrier Support an optional Runnable command , The command comes after the last thread in the barrier arrives , But before releasing any threads , Once per barrier point .

This barrier operation is useful for updating the shared state before either party continues .

Through the above source code annotation, we can basically draw the following conclusions :

  1. CyclicBarrier and CountDownLatch similar , But it's a set of threads waiting , Until a set of operations performed in other threads is complete .
  2. CountDownLatch It's counting down , Call when it's over await perhaps countdown Will return to , however CyclicBarrier You can reset the barrier .
  3. CyclicBarrier You can also pass in parameters Runnable ,Runnable Will be executed before the thread is released .

Basic use

Since the above three conclusions are summed up , Of course, the following three aspects demonstrate how to use :

- Barrier function

public class CyclicBarrierTest {

    private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(11);

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {

        ExecutorService pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 10; i++) {

            pool.submit(() -> {

                try {
                    System.out.println(Thread.currentThread().getName() + "  Start execution ");
                    Thread.sleep(5000);
                    System.out.println(Thread.currentThread().getName() + "  end of execution , Ready to call  await");
                    CYCLIC_BARRIER.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }

            });

        }

        System.out.println(" Main thread execution  —————————————— >>>");

        CYCLIC_BARRIER.await();

        System.out.println(" The main thread continues to execute  —————————————— >>>");

        pool.shutdown();

    }
}

Through the above code, it actually simulates a similar CountDownLatch The function of , Let all threads wait , Until all call await after , Each thread continues to execute , At the same time, the main thread continues to execute .

But relative CountDownLatch To specify a thread or multiple waits , Until the execution of other threads ends , Wait for the thread to continue to execute ,CyclicBarrier Relatively speaking, it is still inferior .

The differences are summarized below :

  1. CountDownLatch Is the specified waiting thread , Other threads do countDown, So the count is 0 when , The waiting thread continues to execute .
  2. CyclicBarrier It's a set of thread calls await Wait for , When all goes into waiting , Together, this group will break through the barrier and continue to perform .

- Loopback function

public class CyclicBarrierTest2 {

    private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(5);

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 5; i++) {

            pool.submit(() -> {

                try {
                    System.out.println(Thread.currentThread().getName() + "  Start execution ");
                    CYCLIC_BARRIER.await();

                    System.out.println(Thread.currentThread().getName() + "  Break through the barrier  >>> 1");
                    CYCLIC_BARRIER.await();

                    System.out.println(Thread.currentThread().getName() + "  Break through the barrier  >>>>> 2");
                    CYCLIC_BARRIER.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }

            });

        }

        pool.shutdown();
    }
}

carbon-gzpBD4

The use of the loop shown above .

- Loop Runnable

This only needs to be stated in the statement CyclicBarrier It can be amended as follows :

private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(5, new Runnable() {
    @Override
    public void run() {
        System.out.println(" Do it once  Runnable ");
    }
});

The results are as follows :

carbon1-lHnKnA

As you can see, it's just before the next count starts , Execute first Runnable . As to whether it's before releasing the barrier , It's easy , direct Debug Just walk around and you'll know ! There was a special video :

cyclicBarrier-vl-5Bz3Xa

adopt debug It can be seen that Runnable Will be executed before the thread is released .

Question question ?

  1. CyclicBarrier and AQS What does it matter ?
  2. CyclicBarrier What is the implementation principle of ?
  3. CyclicBarrier How to realize loopback ?

The following with questions to read the source code , To find out !

Source code analysis

The basic structure

CleanShot-2020-09-12-KFzaCR0G@2x-seVhre

adopt UML At first glance ,CyclicBarrier and AQS It doesn't matter , Then start with Parameters Constructors await() Method Look at the source code respectively .

Parameters

public class CyclicBarrier {

    /**
     *  Each use of the barrier is represented as a build instance .
     * broken  Indicates whether the barrier has been broken .
     */
    private static class Generation {
        boolean broken = false;
    }

    /**  lock  */
    private final ReentrantLock lock = new ReentrantLock();
    /**  Conditions wait , Until the barrier  */
    private final Condition trip = lock.newCondition();
    /**  Waiting to count  */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /**  At present  generation  The newly created */
    private Generation generation = new Generation();
    /**  Still waiting  parties  Number , Decline   by  0  Reset  */
    private int count; 
}

As can be seen from the above :

A static class is used internally Generation , What's the function of it ? We can learn from the notes that , Every time you use a barrier, it generates , What's the use , It's actually used to indicate whether the barrier has been broken .

There's another one inside parties It means waiting count ,count Indicates the count that is still waiting .

Read on !

Constructors

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

There are two entries here :

  • parties( Waiting to count ): Record how many thread calls await after , To break the barrier together .
  • barrierAction: The act of breaking through a barrier .
  • But at the same time parties and count Assigned to the passed in parties.

One parameter construction , It's really just the barrierAction The assignment is null.

await() Method

In the example await() Method , Then from await() How to start :

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

await() It's the play , First, according to the source code annotation , Understand what it is , See what the author says :

  1. Wait until all parties call on this obstacle await.
  2. If the current thread is not the last one to arrive , It is disabled for thread scheduling purposes , And make it dormant , Until one of the following happens :
    1. The last thread arrived ;
    2. Other threads interrupt the current thread ;
    3. Some other threads interrupt one of the other waiting threads ;
    4. Other threads timed out while waiting for the barrier ;
    5. Other threads call on this barrier reset.

See these , What we want to see most is of course 2.1 , Wait for the last thread to reach the barrier , After that, all the threads continue to execute together .


private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    
    //  Lock 
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {

        //  This generation is used here 
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();
        //  Thread end interrupt flag 
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        //  Decrement the count 
        int index = --count;
        //  If it is  0  be 
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                //  No  null  Act first 
                if (command != null)
                    //  This is not a new thread 
                    command.run();
                ranAction = true;
                //  The next generation 
                nextGeneration();
                return 0;
            } finally {
                //  When the mission is not successful , namely  ranAction  still  false  Break the barrier 
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        //  The spin 
        for (;;) {
            try {
                //  No timeout set 
                if (!timed)
                //  Has reached the awaited 
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();
            //  It's the next generation 
            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

This big chunk of code , There's no desire to see , Go straight to it !

therefore …… Look right here .

The code still needs to be read , Look at it separately ( The exception flow is omitted ):

  1. Used ReentrantLock The mutex , So right. count、broken The modification is atomic .
  2. Yes count Conduct --count operation , So we can understand why we say count It's the count that's still waiting , Or how much more to reach the barrier .
  3. When count by 0 , It means that we have reached the barrier point
    1. cyclicbarrier-amQMu4
    2. command Not for null, Will execute first command.run(), It's worth noting that this is not a new thread .
    3. nextGeneration() Start a new generation , Reset count by parties.
    4. stay finally It uses breakBarrier() Break the barrier .
  4. When count No 0
    1. The spin , Until it was 0.

There are two ways to do it :

private void nextGeneration() {
    //  Wake up the thread 
    trip.signalAll();
    //  to update  count  by  parties
    count = parties;
    //  to update  Generation
    generation = new Generation();
}
//  Break the barrier , And wake up all of 
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

reset()


public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

Reset the barrier to its original state ,reset() Method is actually called breakBarrier() and nextGeneration(), The former breaks the current generation , The latter is the beginning of a new round .

summary

Q: CyclicBarrier and AQS What does it matter ?
A: By reading the source code , In fact, it was found that ReentrantLock The mutex as well as Condition Wait for wake-up function of .

Q: CyclicBarrier What is the implementation principle of ?
A: Inside there are two counts , Namely parties and count , Initially, the two are equal , When a thread calls await() when ,count Decline , as long as count Not for 0 , Will block the thread , until count Decrement to 0 when , All threads are released together , At the same time count Reset to parties.

Q: CyclicBarrier How to realize loopback ?
A: Use two counts ,count Decline , When count by 0 when , Will reset to parties, So as to achieve the loop effect .

Q: Why? count Of --count The operation is not used CAS?
A: Because already lock.lock() 了 , Used ReentrantLock The lock can guarantee count The atomicity of .

CyclicBarrier and CountDownLatch The difference between

  1. Loop :CyclicBarrier You can go back , Recount .CountDownLatch Only one round .
  2. Counter :CyclicBarrier The counter maintains its own decrement , CountDownLatch The maintenance of the counter is given to the user .
  3. Blocking threads :CyclicBarrier What's blocking is itself , When you reach the barrier , All blocked threads are released together .CountDownLatch You can specify blocking threads .

Conclusion

This paper mainly introduces CyclicBarrier In a common way , Through source code , Analyze how to achieve the barrier and loop effect . Wrong place , Please correct more .

版权声明
本文为[Liu Zhihang]所创,转载请带上原文链接,感谢