30 Write your own concurrency tool class (Semaphore, CyclicBarrier, CountDownLatch) What is the experience ?

Preface

In this article, we first introduce three tools Semaphore, CyclicBarrier, CountDownLatch How to use , Then carefully analyze the principles of the internal implementation of these three tools , Finally, it will be used with you ReentrantLock Implement these three tools .

Use of concurrent tool classes

CountDownLatch

CountDownLatch The main function is to allow one or more threads to wait for other threads to complete the operation . For example, we now have a task , Yes \(N\) Threads will go to the array data[N] The corresponding position puts data according to different tasks , After each thread puts the data , The main thread needs to sum all the data in this array , That is, the main thread needs to be blocked before each thread is put in ! In such a scenario , We can use CountDownLatch.

The code of the above problem :

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch; public class CountDownLatchDemo { public static int[] data = new int[10]; public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) {
int temp = i;
new Thread(() -> {
Random random = new Random();
data[temp] = random.nextInt(100001);
latch.countDown();
}).start();
} // Only function latch.countDown() At least be called 10 Time
// The main thread will not be blocked
// This 10 Is in CountDownLatch Initialize the passed 10
latch.await();
System.out.println(" The sum result is :" + Arrays.stream(data).sum());
}
}

In the code above , The main thread calls latch.await(); Block yourself , Then you need to wait for other threads to call methods latch.countDown() Only this method is called the number of times equal to that given during initialization CountDownLatch When passing parameters , The main thread will be released .

CyclicBarrier

CyclicBarrier What it has to do is , Let one Group threads reach a barrier ( It can also be called synchronization point ) When is blocked , Until the last thread reaches the barrier , The barrier will open , All threads blocked by the barrier will continue to run . We usually also CyclicBarrier Referred to as Roadblock .

Sample code :

public class CycleBarrierDemo {

    public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(5); for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " Start the waiting ");
// All threads will call this line of code
// The number of threads called in this line of code is insufficient 5
// All threads will be blocked here at the end of the month
// Only to 5 When , this 5 Threads will be released
// So this line of code is called synchronization point
barrier.await();
// If there is a sixth thread executing this line of code
// The sixth thread will also be blocked Know the first 10
// The thread executes this line of code 6-10 this 5 Threads
// Will be released
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " Waiting for completion ");
}).start();
}
}
}

We're initializing CyclicBarrier Object time , The number passed is 5, This number means only 5 When threads reach the synchronization point , that 5 Threads will be released at the same time , And if it comes 6 A thread , Threads that are not released for the first time must wait until the next time 5 Threads have reached the synchronization point barrier.await() when , It will be released 5 Threads .

  • Like at the beginning 5 The status of threads is as follows , Synchronization point Not yet 5 Threads arrive at , Therefore, it will not be released .

  • When there is 5 Threads or more arrive Synchronization point barrier.await() When , It will be released 5 Threads , Note that 5 Threads , If there are many threads, you must wait until the next set 5 Threads will be released again , That is to say, only release each time 5 Threads , This is also called CyclicBarrier( Circular barricade ) Why ( Because each release 5 Threads , Count again after release , Until another 5 A new thread is coming , Before it is released again ).

Semaphore

Semaphore Semaphore ) Generally speaking, it is to control the number of threads that can execute a certain piece of code , He can control the concurrency of the program !

semaphore.acquire

\(\mathcal{R}\)

semaphore.release

Like the one above acquire and release Code between \(\mathcal{R}\) Is the code we need to control , We can go through Semaphore Control how many threads can execute code at a certain time \(\mathcal{R}\). There is a counter inside the semaphore , When we initialize, it is set to \(N\), When a thread calls acquire Function time , The counter needs to be reduced by one , call release Function, the counter needs to be increased by one , Only when the counter is greater than 0 when , Thread calls acquire Can enter the code block \(\mathcal{R}\), Otherwise, it will be blocked , Only threads call release Function time , Only blocked threads can be awakened , When you wake up, the counter will decrease by one .

Sample code :

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore mySemaphore = new Semaphore(5);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " Ready to enter the critical zone ");
try {
mySemaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " Has entered the critical zone ");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " Prepare to leave the critical zone ");
mySemaphore.release();
System.out.println(Thread.currentThread().getName() + " Has left the critical zone ");
}).start();
}
}
}

Write the concurrency tool class by yourself

In this section, we mainly use ReentrantLock Implement the three concurrency tool classes we mentioned above , So you first need to understand ReentrantLock This tool .ReentrantLock There are two main functions in lock and unlock, It is mainly used for the protection of critical areas , Only one thread can enter at a time lock and unlock Surrounded code blocks . In addition, you also need to know ReentrantLock.newCondition function , This function will return a condition variable Condition, This conditional variable has three main functions awaitsignal and signalAll, The functions and effects of these three functions are similar to Object Class waitnotify and notifyAll equally , Before reading the following , First of all, you need to understand their usage .

  • Which thread calls the function condition.await, That thread will be suspended .
  • If a thread calls a function conditon.signal, Will awaken a being condition.await Function blocked thread .
  • If a thread calls a function conditon.signalAll, It will wake up all the condition.await Function blocked thread .

CountDownLatch

We are using CountDownLatch when , There will be thread calls CountDownLatch Of await function , Other threads will call CountDownLatch Of countDown function . stay CountDownLatch There will be a counter inside , The value of the counter can be set during initialization , Every time a thread calls countDown The value of the function counter will be reduced by one .

  • If the thread is calling await Function before , The value of the counter has been less than or equal to 0 when , call await The thread of the function will not block , Direct release .
  • If the thread is calling await Function before , The value of the counter is greater than 0 when , call await The thread of the function will be blocked , When another thread reduces the value of the counter to 0 when , Then this will reduce the counter to 0 Threads need to use condition.signalAll() The function will all the others be await Blocked function wakes up .
  • Threads can use functions if they want to block themselves condition.await(), If a thread reaches the condition of waking up other threads after entering the critical zone , We can use functions condition.signalAll() Wake up all functions await Blocked threads .

The above rule has already CountDownLatch The overall function of is clearly described , In order to explain the code clearly , I put the corresponding text explanation in the code :

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; public class MyCountDownLatch {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private int curValue; public MyCountDownLatch(int targetValue) {
// We need a variable to hold the value of the counter
this.curValue = targetValue;
} public void countDown() {
// curValue It's a shared variable
// We need to protect it with a lock
// So only one thread enters at a time lock Protect
// Code area of
lock.lock();
try {
// Every time you execute countDown Counters need to be reduced by one
// And if the counter equals 0 We need to wake up which ones are
// await Function blocked thread
curValue--;
if (curValue <= 0)
condition.signalAll();
}catch (Exception ignored){}
finally {
lock.unlock();
}
} public void await() {
lock.lock();
try {
// If curValue The value is greater than 0
// shows countDown Not enough calls
// The thread needs to be suspended Otherwise, it will be released directly
if (curValue > 0)
// Using conditional variables condition Suspend the thread
condition.await();
}catch (Exception ignored){}
finally {
lock.unlock();
}
}
}

We can use the following code to test our own CountDownLatch

public static void main(String[] args) throws InterruptedException {
MyCountDownLatch latch = new MyCountDownLatch(5);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
latch.countDown();
System.out.println(Thread.currentThread().getName() + "countDown Execution completed ");
}).start();
} for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "latch Execution completed ");
}).start();
}
}

CyclicBarrier

CyclicBarrier There is a roadblock ( Synchronization point ), All threads will be blocked after reaching the roadblock , When the number of blocked threads reaches the specified number , You need to release the specified number of threads .

  • stay CyclicBarrier There will be a data threadCount, It means you need to reach this at the barricade threadCount Only when there are threads , And it needs to be released threadCount Threads , Here we can cycle through functions condition.signal() To wake up a specified number of threads , So that they can be released . If the thread needs to block itself , You can use functions condition.await().
  • stay CyclicBarrier There needs to be a variable currentThreadNumber, Used to record the number of currently blocked threads .
  • Users can also give CyclicBarrier Pass in a Runnable object , This needs to be implemented when releasing Runnable object , You can open a new thread to execute this Runnable object , Or let this thread that wakes up other threads execute Runnable object .

According to the above CyclicBarrier requirement , The code is as follows ( Analysis and explanation are in the notes ):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; public class MyCyclicBarrier { private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private int threadCount;
private int currentThreadNumber;
private Runnable runnable; public MyBarrier(int count) {
threadCount = count;
} /**
* Allow to pass in a runnable object
* This is executed when a batch of threads are released runnable function
* @param count
* @param runnable
*/
public MyBarrier(int count, Runnable runnable) {
this(count);
this.runnable = runnable;
} public void await() {
lock.lock();
currentThreadNumber++;
try {
// If the number of blocked threads is less than threadCount Blocking is required
// If this thread needs to wake up other threads
if (currentThreadNumber == threadCount) {
// Count again after release
// Because after release condition.await();
// The number of blocked threads is 0
currentThreadNumber = 0;
if (runnable != null) {
new Thread(runnable).start();
}
// Wake up the threadCount - 1 Threads Because the current thread
// It is already in the running state So just wake up threadCount - 1
// Blocked threads
for (int i = 1; i < threadCount; i++)
condition.signal();
}else {
// If the number is not reached, you need to block threads
condition.await();
}
}catch (Exception ignored){}
finally {
lock.unlock();
}
} }

The following is a test written by ourselves Roadblock Code for :

public static void main(String[] args) throws InterruptedException {
MyCyclicBarrier barrier = new MyCyclicBarrier(5, () -> {
System.out.println(Thread.currentThread().getName() + " Start a new thread ");
for (int i = 0; i < 1; i++) {
System.out.println(i);
}
}); for (int i = 0; i < 5; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " Get in the jam ");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
barrier.await();
System.out.println(Thread.currentThread().getName() + " Blocking complete ");
}).start();
}
}

Semaphore

Semaphore You can control the number of threads executing a critical section of code , stay Semaphore There will be two counters semCount and curCount.

  • semCount Indicates the number of threads that can execute critical area code .
  • curCount Indicates the number of threads executing critical zone code .

The implementation of this tool is not complicated , The specific analysis is in the notes :

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; public class MySemaphore { private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private int semCount;
private int curCount; public MySemaphore(int semCount) {
this.semCount = semCount;
} public void acquire() {
lock.lock();
try {
// Add one to the number of threads executing the code in the critical area
curCount++;
// If the number of threads is greater than the specified number of threads that can be executed
// You need to block the current thread
// Otherwise, it will be released directly
if (curCount > semCount) {
condition.await();
}
}catch (Exception ignored) {}
finally {
lock.unlock();
}
} public void release() {
lock.lock();
try {
// The thread executes the code in the critical area
// About to leave the critical zone therefore curCount
// It needs to be reduced by one
curCount--;
// If there is a thread blocking, you need to wake up the blocked thread
// If there are no blocked threads After this function is executed
// It will not affect the results Therefore, there is no need to
// if Judge
condition.signal();
// signal The function only calls signal Function before
// By await Function blocking threads have an impact If
// A thread calls await Function in signal function
// After performing , Then the previous time signal Function call
// It won't affect the next time await function
}catch (Exception ignored){}
finally {
lock.unlock();
}
}
}

Use the following code to test our own MySemaphore

public static void main(String[] args) {
MySemaphore mySemaphore = new MySemaphore(5);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
mySemaphore.acquire();
System.out.println(Thread.currentThread().getName() + " Has entered the critical zone ");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
mySemaphore.release();
System.out.println(Thread.currentThread().getName() + " Has left the critical zone ");
}).start();
}
}

summary

In this article, we mainly introduce how to use three commonly used tool classes in concurrency , Then we introduce the details of our own implementation of three tool classes , In fact, it's mainly the use of Condition variables, Realized , Because it can realize thread blocking and wakeup , In fact, as long as you understand Condition variables, How to use , And the requirements of the three tools can also be realized by yourself .

That's all the content of this article , I hope you can get something , I am a LeHung, See you next time !!!( Remember Like collection Oh !)


More wonderful content collections can be accessed to the project :https://github.com/Chang-LeHung/CSCore

Official account : Worthless research monk , Learn more about computers (Java、Python、 Fundamentals of computer system 、 Algorithm and data structure ) knowledge .

30 Write your own concurrency tool class (Semaphore, CyclicBarrier, CountDownLatch) What is the experience ? More articles about

  1. Concurrent tool class ——Semaphore

    This blog series is a summary of the process of learning concurrent programming . Because there are many articles , The time of writing is also relatively scattered , So I put together a catalog ( Portal ), Convenient access . Concurrent programming series blog portal Semaphore([' seməf :(r)]) The main ...

  2. Use of concurrent tool classes CountDownLatch,CyclicBarrier,Semaphore,Exchanger

    1.CountDownLatch Synchronization assist that allows one or more threads to wait until a set of operations performed in other threads complete . A CountDownLatch Initialize... With the given count . await Methods block , Until because of countDo ...

  3. Java Concurrent tool class Semaphore Application example

    package com.thread.test.thread; import java.util.Random; import java.util.concurrent.*; /** * Semaph ...

  4. j.u.c series (09)--- The concurrency tool class :CyclicBarrier

    Write it at the front CyclicBarrier Is a synchronization helper class , Allow a group of threads to wait for each other , Until reaching a public barrier point (common barrier point). Because it's time to barrier Can be reused after releasing the waiting thread , therefore ...

  5. JUC Concurrent tool class CyclicBarrier Synchronization barrier

    First look at it. CyclicBarrier Usage scenarios of : 10 An engineer came to the company to apply for , Recruitment methods are divided into written examination and interview . First , Wait for others to arrive , Start the written test : After the written examination , Go to the interview again . hold 10 As a person 10 Threads ,10 Between threads ...

  6. Java Multithreading concurrent tool class - Semaphore Semaphore Object explanation

    Java Multithreading concurrent tool class -Semaphore Object explanation Through the previous study , We already know that Java In the multithreading concurrency scenario, there are two more tool classes : Do addition CycliBarrier Object and subtractive CountDownL ...

  7. JUC Commonly used 4 Large concurrency utility class

    What is? JUC? JUC Namely java.util.concurrent package , This bag is commonly known as JUC, It's all about concurrency The package is located in java Below rt.jar Under the bag 4 Big common concurrency tool class : Count ...

  8. 【 Relearning Java】 Multi thread advanced ( Thread pool 、 Atomicity 、 Concurrent tool class )

    Thread pool Thread state Introduction When the thread is created and started , It doesn't go into execution as soon as it starts up , It's not always in execution . Thread objects have different states at different times . that Java What kinds of states exist in threads in ?Java Thread in The state is determined ...

  9. Java Concurrent programming - Concurrency tool classes and thread pools

    JUC Several commonly used concurrency tool classes are provided in , such as CountDownLatch.CyclicBarrier.Semaphore. CountDownLatch: countdownlatch Is a synchronization tool class ...

  10. Java Concurrent ( 13、 ... and ): Concurrent tool class —— Synchronization barrier CyclicBarrier

    Do summary 1.CyclicBarrier What is it? ? CyclicBarrier Literally means recyclable (Cyclic) The barrier (Barrier). What it has to do is , Let a group of threads reach a barrier ( It can also be called synchronization point ) ...

Random recommendation

  1. About python Learning records of functions

    1. Default parameters must point to immutable objects ! 2. extra = {'city': 'Beijing', 'job': 'Engineer'} Be careful kw To obtain the dict yes extra A copy of , Yes kw The change of will not affect the letter ...

  2. 【 turn 】 Seven years IT Seven summaries of experience

    http://www.unitymanual.com/thread-30000-1-1.html?_dsign=ebe6a043 1. Share the first lesson :“ Education represents the past . Ability represents the present . Learning power represents the future .” ...

  3. 【 Security 】requests and BeautifulSoup A profound

    web The question of safety , In order to find key A program written casually , Nowhere to put , Stick it up for now . # -*- coding: UTF-8 -*- __author__ = 'weimw' import requests from B ...

  4. CentOS 7 Installation of Mono&amp;MonoDevelop

    I read an article in the garden before that said that in CentOS 7 Installation on MonoDevelop Of , I've been trying to do it again, but I haven't done it , It rained heavily outside Shenzhen today , Just get this at home , Try in the future Linux It says C# play . This try , It's true ...

  5. password hdu

    Time Limit : 2000/1000ms (Java/Other)   Memory Limit : 65536/32768K (Java/Other) Total Submission(s) ...

  6. JAVA Lightweight file monitoring

    Original address :http://blog.csdn.net/three_man/article/details/31012903?utm_source=tuicool Introduce This paper mainly introduces a lightweight file monitor ...

  7. PHP in public、protected、private Permission modifier

    PHP There are three access modifiers in The default is public public( Public . Default ) protected( The protected ) private( Private ) Access right public protected private Within class ...

  8. 73. Set Matrix Zeroes( secondary )

    Given a m x n matrix, if an element is 0, set its entire row and column to 0. Do it in place. The focus is on spatial complexity ...

  9. Android UI( Two )DridView Menu for

    Jeff Lee blog:   http://www.cnblogs.com/Alandre/  ( Mud, brick, tile, pulp, carpenter ),retain the url when reproduced ! Thanks ...

  10. Sketch webView Summary of plug-in development technology

    I believe everyone is right Sketch Have a certain understanding of . In addition to the basic vector design function , Plugins make Sketch Keep a strong unique script .Sketch Open the third-party plug-in interface , Designers can easily find suitable plug-ins for their own way of working in hundreds of plug-ins ...