Preface
After working through CountDownLatch, I came across CyclicBarrier, literally a "cyclic barrier". What do "cyclic" and "barrier" mean here? Compared with CountDownLatch it adds the ability to cycle, so let's take a look.
Official account: 『Liu Zhihang』, where I record skills picked up in work and study, development and source code notes, and occasionally share some life experience. Feedback is welcome!
Introduction
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
CyclicBarrier is useful in programs involving a fixed-size party of threads that must occasionally wait for each other.
The barrier is called cyclic because it can be reused after the waiting threads are released.
CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives but before any threads are released.
This barrier action is useful for updating shared state before any of the parties continue.
From the source code comments above, we can basically draw the following conclusions:
- CyclicBarrier is similar to CountDownLatch, but here a set of threads wait for each other, whereas with CountDownLatch threads wait until a set of operations performed in other threads completes.
- CountDownLatch only counts down once: after the count reaches zero, calls to await or countDown return immediately. CyclicBarrier, by contrast, can be reset and reused.
- CyclicBarrier also accepts a Runnable parameter, which is executed before the waiting threads are released.
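To make the "update shared state before anyone continues" idea concrete, here is a minimal sketch. The class name BarrierActionSumDemo and the method sumWithBarrier are made up for illustration and do not come from the JDK: each worker sums into its own slot, and the barrier action merges the slots once everyone has arrived.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class: workers fill partial sums, the barrier action merges them.
class BarrierActionSumDemo {

    static long sumWithBarrier(int workers, int perWorker) {
        long[] partial = new long[workers];   // one slot per worker
        AtomicLong total = new AtomicLong();

        // The action runs once per barrier point, after the last worker arrives
        // and before any worker is released.
        CyclicBarrier barrier = new CyclicBarrier(workers, () -> {
            long sum = 0;
            for (long p : partial) {
                sum += p;
            }
            total.set(sum);
        });

        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final int id = w;
            threads[w] = new Thread(() -> {
                for (int i = 1; i <= perWorker; i++) {
                    partial[id] += i;         // each worker only touches its own slot
                }
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(sumWithBarrier(4, 100)); // prints 20200 (4 * 5050)
    }
}
```

The action can read the partial array safely because, according to the CyclicBarrier documentation, actions before await() happen-before the barrier action.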
Basic usage
Following the three conclusions above, usage is demonstrated from three angles:
- Barrier function
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// ThreadFactoryBuilder comes from Guava
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class CyclicBarrierTest {
    // 11 parties: 10 pool threads plus the main thread
    private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(11);

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 10; i++) {
            pool.submit(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " starts executing");
                    Thread.sleep(5000);
                    System.out.println(Thread.currentThread().getName() + " finished, about to call await");
                    CYCLIC_BARRIER.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        System.out.println("Main thread executing ------ >>>");
        // The main thread is the 11th party
        CYCLIC_BARRIER.await();
        System.out.println("Main thread continues ------ >>>");
        pool.shutdown();
    }
}
The code above actually simulates something like CountDownLatch: all threads wait until every one of them has called await, and then each pool thread continues, as does the main thread.
However, for the case where one or more designated threads wait until other threads have finished and only then continue, CyclicBarrier is a less natural fit than CountDownLatch.
The differences are summarized below:
- With CountDownLatch, designated threads wait while other threads call countDown; when the count reaches 0, the waiting threads continue.
- With CyclicBarrier, a group of threads each call await and wait; once all of them are waiting, the whole group passes the barrier together and continues.
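For contrast, here is a small CountDownLatch sketch of the first bullet. The class name LatchContrastDemo and the method runWorkers are hypothetical names chosen for this example. The workers never block on the latch; only the calling thread waits, and once the count has reached 0 a second await returns immediately.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: one waiting thread, several counting-down threads.
class LatchContrastDemo {

    static int runWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet();
                done.countDown();   // does not block the worker
            }).start();
        }

        try {
            // Only the calling thread waits for the count to reach 0.
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
            // The latch is one-shot: with the count at 0, this returns immediately.
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(5)); // prints 5
    }
}
```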
- Cyclic function
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// ThreadFactoryBuilder comes from Guava
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class CyclicBarrierTest2 {
    private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(5);

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 5; i++) {
            pool.submit(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " starts executing");
                    // The same barrier instance is awaited three times in a row
                    CYCLIC_BARRIER.await();
                    System.out.println(Thread.currentThread().getName() + " passed the barrier >>> 1");
                    CYCLIC_BARRIER.await();
                    System.out.println(Thread.currentThread().getName() + " passed the barrier >>>>> 2");
                    CYCLIC_BARRIER.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        pool.shutdown();
    }
}
The code above shows the barrier being reused: the same five threads pass through the same barrier three times.
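Reuse can also be checked without reading console output. The sketch below uses hypothetical names (BarrierReuseDemo, countTrips) and treats the optional barrier action as a trip counter, since the action runs exactly once each time the barrier trips.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: counts how many times one barrier instance trips.
class BarrierReuseDemo {

    static int countTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, so it doubles as a trip counter.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await();   // the same barrier is reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(countTrips(5, 3)); // prints 3
    }
}
```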
- Barrier action (Runnable)
Only the declaration of the CyclicBarrier needs to change, as follows:
private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(5, new Runnable() {
    @Override
    public void run() {
        System.out.println("Runnable executed once");
    }
});
Running it shows that the Runnable is executed each time, before the next round of counting starts. Whether it runs before the waiting threads are released is easy to check: step through with a debugger.
Debugging shows that the Runnable is executed before the threads are released.
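The same observation can be made without a debugger. In this sketch (ActionBeforeReleaseDemo and allSawAction are hypothetical names), the barrier action sets a flag and every thread checks the flag right after await() returns. If the action really runs before the release, every thread must see the flag already set.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: checks that the action has run by the time await() returns.
class ActionBeforeReleaseDemo {

    static boolean allSawAction(int parties) {
        AtomicBoolean actionDone = new AtomicBoolean(false);
        AtomicInteger sawIt = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> actionDone.set(true));

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await();
                    // Released from the barrier: the action must already have run.
                    if (actionDone.get()) {
                        sawIt.incrementAndGet();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return sawIt.get() == parties;
    }

    public static void main(String[] args) {
        System.out.println(allSawAction(5)); // prints true
    }
}
```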
Questions
- How is CyclicBarrier related to AQS?
- What is the implementation principle of CyclicBarrier?
- How does CyclicBarrier implement the cycle?
Let's read the source code with these questions in mind and find out!
Source code analysis
The basic structure
Judging from the UML diagram, at first glance CyclicBarrier has no direct relationship with AQS. Let's go through the source code in three parts: the fields, the constructor, and the await() method.
Fields
public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * broken indicates whether the barrier has been broken.
     */
    private static class Generation {
        boolean broken = false;
    }
    /** The lock guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until the barrier is tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();
    /** Number of parties still awaited; counts down to 0, then is reset to parties */
    private int count;
    // (excerpt: constructors and methods omitted)
}
As can be seen from the above:
A static nested class Generation is used internally. What is it for? The comment tells us that each use of the barrier is represented by one Generation instance, and its broken field indicates whether the barrier has been broken in that generation.
There is also parties, the number of threads that must arrive, and count, the number of parties that have not yet arrived in the current generation.
Read on!
Constructors
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
There are two parameters here:
- parties: how many threads must call await before the barrier trips and they all pass together.
- barrierAction: the action to run when the barrier trips.
- Note that both parties and count are initialized to the passed-in parties.
The one-argument constructor simply passes null as barrierAction.
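A quick sketch of the constructor contract described above. The class BarrierConstructorDemo and its helper methods are hypothetical; CyclicBarrier, getParties() and IllegalArgumentException are standard JDK API.

```java
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class: probes the constructor's argument check.
class BarrierConstructorDemo {

    // Returns true if the constructor rejects the given party count.
    static boolean rejects(int parties) {
        try {
            new CyclicBarrier(parties);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    // A null barrier action is allowed; it is what the one-argument constructor passes.
    static int partiesOf(int parties) {
        return new CyclicBarrier(parties, null).getParties();
    }

    public static void main(String[] args) {
        System.out.println(rejects(0));    // prints true
        System.out.println(rejects(1));    // prints false
        System.out.println(partiesOf(3));  // prints 3
    }
}
```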
await() Method
The examples call the await() method, so let's start from await():
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
await() is the main event. First, let's use the source code comments to understand what it does, in the author's own words:
- Waits until all parties have invoked await on this barrier.
If the current thread is not the last to arrive, it is disabled for thread scheduling purposes and lies dormant until one of the following happens:
- The last thread arrives;
- Some other thread interrupts the current thread;
- Some other thread interrupts one of the other waiting threads;
- Some other thread times out while waiting for the barrier;
- Some other thread invokes reset on this barrier.
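The timeout case in the list above can be reproduced with the timed overload await(long, TimeUnit), which the article does not otherwise use. In this sketch (BarrierTimeoutDemo and awaitAlone are hypothetical names), a single thread waits on a two-party barrier, so the wait can only end by timing out, and the barrier is left broken afterwards.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: a lone thread times out on a two-party barrier.
class BarrierTimeoutDemo {

    static String awaitAlone() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        try {
            // Nobody else will ever arrive, so this can only time out.
            barrier.await(100, TimeUnit.MILLISECONDS);
            return "tripped";
        } catch (TimeoutException e) {
            // A timeout breaks the barrier for everyone.
            return barrier.isBroken() ? "timeout, broken" : "timeout, not broken";
        } catch (InterruptedException | BrokenBarrierException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitAlone()); // prints "timeout, broken"
    }
}
```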
Of these, the case we care about most is of course the first one: waiting for the last thread to reach the barrier, after which all threads continue together.
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // Acquire the lock
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Capture the current generation
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();
        // If the current thread has been interrupted, break the barrier and throw
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // Decrement the number of parties still awaited
        int index = --count;
        // 0 means this thread is the last one to arrive
        if (index == 0) { // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // Run the barrier action first, if there is one.
                // It runs in the current thread; no new thread is started.
                if (command != null)
                    command.run();
                ranAction = true;
                // Wake the waiting threads and start the next generation
                nextGeneration();
                return 0;
            } finally {
                // If the action threw, ranAction is still false: break the barrier
                if (!ranAction)
                    breakBarrier();
            }
        }
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                // No timeout: block on the condition until signalled
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            // The generation has changed: the barrier tripped normally
            if (g != generation)
                return index;
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
A chunk of code this large is not inviting to read, but it still needs to be read. Let's take it piece by piece (the exception flows are omitted):
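- A ReentrantLock mutex is used, so modifications to count and broken are made under mutual exclusion.
- --count is applied to count, which explains why count is described as the number of parties still awaited, that is, how many more threads must arrive before the barrier trips.
When count reaches 0, the barrier point has been reached:
- If command is not null, command.run() is executed first. Note that this does not start a new thread; it runs in the last-arriving thread.
- nextGeneration() starts a new generation and resets count to parties.
- In finally, if the action did not complete successfully (ranAction is still false), breakBarrier() breaks the barrier.
When count is not 0:
- The thread does not busy-spin. It blocks on the Condition (trip.await()) inside a loop and leaves the loop only when the barrier trips (the generation changes), the barrier is broken, the thread is interrupted, or the wait times out.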
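The "not a new thread" point can be checked from the outside. await() returns the arrival index of the calling thread, with 0 for the last to arrive, so the sketch below compares the name of the thread that got index 0 with the name of the thread that ran the action. ActionThreadDemo and actionRanInLastArriver are hypothetical names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: which thread runs the barrier action?
class ActionThreadDemo {

    static boolean actionRanInLastArriver(int parties) {
        AtomicReference<String> actionThread = new AtomicReference<>();
        AtomicReference<String> lastArriver = new AtomicReference<>();
        CyclicBarrier barrier = new CyclicBarrier(parties,
                () -> actionThread.set(Thread.currentThread().getName()));

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // await() returns 0 for the last thread to arrive.
                    if (barrier.await() == 0) {
                        lastArriver.set(Thread.currentThread().getName());
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            }, "worker-" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        String ranIn = actionThread.get();
        return ranIn != null && ranIn.equals(lastArriver.get());
    }

    public static void main(String[] args) {
        System.out.println(actionRanInLastArriver(4)); // prints true
    }
}
```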
Two helper methods are involved:
private void nextGeneration() {
    // Wake up all waiting threads
    trip.signalAll();
    // Reset count to parties
    count = parties;
    // Start a new generation
    generation = new Generation();
}
// Mark the current generation as broken and wake up all waiting threads
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}
reset()
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}
reset() returns the barrier to its initial state. It simply calls breakBarrier() and then nextGeneration(): the former breaks the current generation, the latter starts a new one.
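The effect of reset() on a thread that is already waiting can be sketched as follows. BarrierResetDemo and resetWhileWaiting are hypothetical names; the polling on getNumberWaiting() is just a simple way for this demo to know the waiter has parked. The waiter belongs to the old generation, which reset() marks as broken, so it gets a BrokenBarrierException, while the barrier itself (now on a fresh generation) reports isBroken() == false.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: reset() while one thread is waiting.
class BarrierResetDemo {

    static String resetWhileWaiting() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicReference<String> outcome = new AtomicReference<>("none");

        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
                outcome.set("tripped");
            } catch (BrokenBarrierException e) {
                outcome.set("broken");
            } catch (InterruptedException e) {
                outcome.set("interrupted");
            }
        });
        waiter.start();

        // Poll until the waiter is parked at the barrier.
        while (barrier.getNumberWaiting() < 1) {
            Thread.yield();
        }
        barrier.reset();   // breaks the old generation, then starts a new one

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return outcome.get() + ", isBroken=" + barrier.isBroken();
    }

    public static void main(String[] args) {
        System.out.println(resetWhileWaiting()); // prints "broken, isBroken=false"
    }
}
```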
Summary
Q: How is CyclicBarrier related to AQS?
A: Reading the source shows that CyclicBarrier does not use AQS directly. It uses the mutual exclusion of ReentrantLock and the wait/wake-up mechanism of Condition, both of which are built on AQS.
Q: What is the implementation principle of CyclicBarrier?
A: There are two counts internally, parties and count, which start out equal. Each call to await() decrements count. While count is not 0, the calling thread blocks. When count is decremented to 0, all threads are released together and count is reset to parties.
Q: How does CyclicBarrier implement the cycle?
A: It uses the two counts: count is decremented, and when it reaches 0 it is reset to parties and a new Generation is created, which produces the cyclic effect.
Q: Why does the --count operation on count not use CAS?
A: Because lock.lock() has already been called; the ReentrantLock guarantees that count is updated atomically.
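The answers above can be condensed into a stripped-down barrier. This is only a sketch of the principle, not the JDK implementation: MiniBarrier is a made-up class that keeps the lock, the condition, the two counts and the generation, and deliberately leaves out the barrier action, interruption, timeouts and the broken state.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified barrier: no action, no interruption, no timeout, no broken state.
class MiniBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private int count;                        // parties still awaited
    private Object generation = new Object(); // identity marks the current round

    MiniBarrier(int parties) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
    }

    /** Returns the arrival index: parties - 1 for the first arrival, 0 for the last. */
    int await() {
        lock.lock();
        try {
            Object g = generation;
            int index = --count;              // plain decrement: the lock is held
            if (index == 0) {
                trip.signalAll();             // release everyone in this round
                count = parties;              // reset for the next round
                generation = new Object();
                return 0;
            }
            while (g == generation) {         // re-check guards against spurious wake-ups
                trip.awaitUninterruptibly();
            }
            return index;
        } finally {
            lock.unlock();
        }
    }

    // Runs `rounds` rounds and counts how often some thread was the last arrival.
    static int demo(int parties, int rounds) {
        MiniBarrier barrier = new MiniBarrier(parties);
        AtomicInteger lastArrivals = new AtomicInteger();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                for (int r = 0; r < rounds; r++) {
                    if (barrier.await() == 0) lastArrivals.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return lastArrivals.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 3)); // prints 3: one last arrival per round
    }
}
```

Comparing the generation reference instead of the count is what makes reuse safe: a fast thread may already be waiting in the next round while slower threads from the previous round are still waking up.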
The difference between CyclicBarrier and CountDownLatch
- Cycle: CyclicBarrier can cycle and count again; CountDownLatch works for one round only.
- Counter: CyclicBarrier decrements its counter itself inside await(); with CountDownLatch, maintaining the counter is left to the user, who calls countDown().
- Blocked threads: CyclicBarrier blocks the very threads that arrive at it, and releases all of them together once the barrier is reached. With CountDownLatch you choose which threads block, namely those that call await().
Conclusion
This article introduced the common usage of CyclicBarrier and, through the source code, analyzed how the barrier and cyclic effects are implemented. If anything is wrong, corrections are welcome.