当前位置:网站首页>[Part 7] source code analysis and application details of cyclicbarrier [key]
[Part 7] source code analysis and application details of cyclicbarrier [key]
2022-06-12 22:23:00 【__ Struggling Kaka】
1.1 summary
CyclicBarrier Literally, it means It can be recycled (Cyclic) The barrier (Barrier). What it has to do is , Let a group of threads reach a barrier ( It can also be called synchronization point ) When is blocked , Until the last thread reaches the barrier , The barrier will open , All threads blocked by the barrier will continue to work .
CyclicBarrier The default constructor is CyclicBarrier(int parties), Its parameters represent the number of threads intercepted by the barrier , Call per thread await The method tells CyclicBarrier I've reached the barrier , Then the current thread is blocked .
1.2 The source code parsing
First ,CyclicBarrier The source code and CountDownLatch Be the same in essentials while differing in minor points ,CountDownLatch be based on AQS The use of Shared patterns , and CyclicBarrier be based on Condition To achieve .
1.2.1 chart
From the above structure diagram ,CyclicBarrier Internal use ReentrantLock and Condition Two classes to ensure synchronization
1.2.2 initialization
final CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
System.out.println(" Thread collection completed ");
}
});
--------------------------------------------------------------------
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// Number of corresponding threads
this.parties = parties;
// The number of threads that are not currently blocked
this.count = parties;
// Thread of execution
this.barrierCommand = barrierAction;
}
private static class Generation {
boolean broken = false;
}
// An exclusive lock
private final ReentrantLock lock = new ReentrantLock();
// because CyclicBarrier It is reusable , A round represents a generation
private Generation generation = new Generation();
// When calling await after , Will be blocked .
private final Condition trip = lock.newCondition();
The constructor called by the initialization code needs to pass in a count And a Runnable object .
count Corresponding to the number of threads ,Runnable Corresponding to the last thread to be synchronized !
await()
Before reading the source code , Friends first understand CyclicBarrier A mechanism of :CyclicBarrier It's recyclable , Each cycle corresponds to an age , Corresponding CyclicBarrier There's one built in Generation class , Used to describe the state of the current era .
private static class Generation {
//n+1 Only one of the threads is interrupted or has a timeout , will broken Set to true
boolean broken = false;
}
Read the source code with this concept , The train of thought is very clear
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// Lock
lock.lock();
try {
final Generation g = generation;
// A thread timed out , Or it was interrupted
if (g.broken)
throw new BrokenBarrierException();
// If the thread is interrupted , Wake up all threads , interruptible
if (Thread.interrupted()) {
// Wakes up all waiting threads , Reset count, And will g.broken=true
breakBarrier();
// Throw an exception
throw new InterruptedException();
}
//await After calling ,count--
int index = --count;
if (index == 0) {
//count==0, All threads call await() 了 , It can be executed barrierCommand 了 .
// Whether the last thread has been executed
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// Start execution
command.run();
// Execute the last thread
ranAction = true;
// Wake up all waiting trip in condition queue Thread in , Start a new round of thread synchronization
nextGeneration();
return 0;
} finally {
// If you execute ranAction Failure , Indicates that an exception occurred
if (!ranAction)
// The above code has an exception , perform breakBarrier
breakBarrier();
}
}
// The code that runs here , All are n One of the threads n-1 Threads
for (;;) {
try {
// Blocking call
if (!timed)
// Blocking suspends the thread until condition queue
trip.await();
// It's called await(timeout), Set the timeout
else if (nanos > 0L)
// It's blocked at most nanos Time to suspend thread to condition queue
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// In the process of waiting , If the thread is interrupted , If other threads have not called breakBarrier, Then the current thread calls
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// If other threads have breakBarrier, Just interrupt yourself directly ok 了
Thread.currentThread().interrupt();
}
}
// If the normal process is interrupted , Throw out BrokenBarrierException
if (g.broken)
throw new BrokenBarrierException();
// If the normal upgrading of the words , return index
if (g != generation)
return index;
// If await If the timeout , It will be breakBarrier, Throw out TimeoutException
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void breakBarrier() {
//n+1 One of the threads was interrupted , perhaps await The request timed out , Just interrupt the current generation , Wake up all threads , Each thread handles this exception independently
// Set up broken by true
generation.broken = true;
// Reset count
count = parties;
// Wake up waiting for trip Of condition queue All threads in
trip.signalAll();
}
private void nextGeneration() {
// All threads in the current round are successfully executed , Reset year , Start the next synchronization
// Wake up waiting for trip Of condition queue All threads in
trip.signalAll();
// count Reset to parties
count = parties;
//generation Also reinitialize
generation = new Generation();
}
1.2.3 summary
1、CyclicBarrier The bottom layer is through a ReentrantLock、 One int Variable count、 One Condition、 One Generation To achieve .
technological process :
1、 First, initialize a ReentranLock object 、 take count A variable is assigned to parties, Initialize a Condition、 Initialize a Generation,Generation Only one of them broken Boolean two corresponds to whether the current synchronization state is broken , take Runnable Object saved to barrierCommand.
2、 When a thread calls await() When . Will snatch first lock lock , And then count- -. If count == 0, This indicates that all threads have been executed , You can start executing barrierCommand 了 . And through SignalAll() To wake up all waiting threads , And will Generation Reinitialize , take count Reassign to zero parties, Start the next synchronization .
3、 If count != 0, It indicates that there are other threads not completed , Just call condition.await() Method , Suspend the thread .
Tips
Generation.broken
When a thread is interrupted or suspended for timeout , take generation Of broken Set to true, The synchronization is broken , Will immediately wake up all previously suspended threads .
The new thread is operating count– Before , Will judge broken, If broken=true, It will throw an exception directly !
1.3 Code example
public class CyclicBarrierDemo {
static class TaskThread extends Thread {
CyclicBarrier barrier;
public TaskThread(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(getName() + " Get to the fence A");
barrier.await();
System.out.println(getName() + " Break through the fence A");
Thread.sleep(2000);
System.out.println(getName() + " Get to the fence B");
barrier.await();
System.out.println(getName() + " Break through the fence B");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int threadNum = 5;
CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Finish the final task ");
}
});
for(int i = 0; i < threadNum; i++) {
new TaskThread(barrier).start();
}
}
}
Execution results
Thread-1 Get to the fence A
Thread-3 Get to the fence A
Thread-0 Get to the fence A
Thread-4 Get to the fence A
Thread-2 Get to the fence A
Thread-2 Finish the final task
Thread-2 Break through the fence A
Thread-1 Break through the fence A
Thread-3 Break through the fence A
Thread-4 Break through the fence A
Thread-0 Break through the fence A
Thread-4 Get to the fence B
Thread-0 Get to the fence B
Thread-3 Get to the fence B
Thread-2 Get to the fence B
Thread-1 Get to the fence B
Thread-1 Finish the final task
Thread-1 Break through the fence B
Thread-0 Break through the fence B
Thread-4 Break through the fence B
Thread-2 Break through the fence B
Thread-3 Break through the fence B
边栏推荐
- 设计消息队列存储消息数据的 MySQL 表格
- 孙老师版本JDBC(2022年6月12日21:34:25)
- [Jianzhi offer] Jianzhi offer 09 Implementing queues with two stacks
- be careful! Your Navicat may have been poisoned
- MySQL introduction and installation (I)
- Implementation of master-slave replication and master-master replication for MySQL and MariaDB databases
- Is it safe to open an account in tonghuashun? How to open an account
- MySQL architecture and basic management (II)
- Things about the kotlin collaboration process - pipeline channel
- 【LeetCode】102. 二叉树的层序遍历
猜你喜欢
Design a MySQL table for message queue to store message data
C#读取word中表格数据
SQL query list all views in SQL Server 2005 database - SQL query to list all views in an SQL Server 2005 database
Flutter series part: detailed explanation of GridView layout commonly used in flutter
[image denoising] image denoising based on trilateral filter with matlab code
Ansible基础和常用模块(一)
MySQL introduction and installation (I)
"Oracle database parallel execution" technical white paper reading notes
接口测试工具apipost3.0版本对于流程测试和引用参数变量
Dolphin-2.0.3 cluster deployment document
随机推荐
Flutter series part: detailed explanation of GridView layout commonly used in flutter
反走样/抗锯齿技术
经济学人聚焦WTO MC12:数字经济或成重要议题
Why is pain rating important?
be careful! Your Navicat may have been poisoned
JVM Basics - > What are the thread shared areas in the JVM
Is it safe to open an account in tonghuashun? How to open an account
Mysql concat_ws、concat函数使用
孙老师版本JDBC(2022年6月12日21:34:25)
talent showing itself! Oceanbase was selected into the 2021 "sci tech innovation China" open source innovation list
Prefix sum and difference
【Proteus仿真】简易数码管定时器时钟
The kotlin coroutine -- coroutine context and exception propagation
PCB package download website recommendation and detailed usage
JVM foundation > CMS garbage collector
C#读取word中表格数据
USB机械键盘改蓝牙键盘
微信小程序提现功能
Hostvars in ansible
3.5 测试类的setup和teardown