当前位置:网站首页>Countdowncatch and completabilefuture and cyclicbarrier
Countdowncatch and completabilefuture and cyclicbarrier
2022-06-30 18:38:00 【Jingling cat】
Multithreaded tool class CompletableFuture
Multithread synchronization tool class countDownLatch
Preface
Future and Promise.Future Equivalent to one Place holder , Representing one Operate on future results . commonly adopt get Sure Direct blocking results , Or let it Execute asynchronously and then pass callback Callback results .
get Do not operate in Task distribution The loop body Inside , Otherwise the whole operation will No Multithreading asynchronous Operation
But if The callback is embedded in the callback Well ? If the level is deep , It's hell .
Java Medium CompletableFuture In fact, that is Promise, be used for Solve the problem of callback to hell .Promise In order to Make the code beautiful and exist
How beautiful ? Let me put it this way , Once you use CompletableFuture, I can't help it , Just like the first girlfriend , Thinking of her every day
CountDownLatch Countdown
CountDownLatch be used for The main thread waits for other sub thread tasks to complete before executing , It allows the One or more threads always wait for , Do not execute until the operation of other threads is finished
CountDownLatch Through a Counter To achieve , Counter Of Initial value yes Number of threads .
whenever When a thread has finished executing , The value of the counter is -1 , When the counter value is 0 when , Express All threads have finished executing
then On locking The waiting thread can Carry on 了

Scenarios used : Such as Multi module data accelerated loading 、 Manage the downstream interface timeout of mass data etc.
CountDownLatch The main method :
await() : call await() Methodical Threads Meeting suspended , It will wait until count The value is 0 only Carry on
await(long timeout,TimeUnit unit) : It's just After waiting for a certain time count Value hasn't changed to 0 If you do, you'll Carry on ( Applicable to scenarios where partial data loss is allowed )
countDown() : take count Value reduction 1
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class Test020 {
/** * Thread pool */
private static final ExecutorService QUERY_POOL = new ThreadPoolExecutor(
10, 10,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10000),
new ThreadPoolExecutor.DiscardPolicy());
public static void main(String[] args) {
long start = System.currentTimeMillis();
try {
List<Long> resultList = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
QUERY_POOL.execute(() -> {
try {
resultList.addAll(dohandler());
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
System.out.println(" result :" + resultList + " Time consuming :" + (System.currentTimeMillis() - start));
} catch (Exception e) {
System.out.println(" Something goes wrong ");
}
}
public static List<Long> dohandler() {
try {
Thread.sleep(2500);
return Lists.newArrayList(123L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}

// Create initialization 3 A pool of threads
private ExecutorService threadPool = Executors.newFixedThreadPool(3);
// Save the average grade of each student
private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(); // Note that concurrency is used here map
private CountDownLatch countDownLatch = new CountDownLatch(3);
private void count() {
for (int i = 0; i < 3; i++) {
threadPool.execute(() -> {
// Calculate the average score of each student , The code is slightly () Assuming that 60~100 The random number
int score = (int) (Math.random() * 40 + 60);
try {
Thread.sleep(Math.round(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName() + " The average grade of the students is " + score);
countDownLatch.countDown();
});
}
this.run();
threadPool.shutdown();
}
@Override
public void run() {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = 0;
Set<String> set = map.keySet();
for (String s : set) {
result += map.get(s);
}
System.out.println(" The average score of the three is :" + (result / 3) + " branch ");
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
CyclicBarrier1 cb = new CyclicBarrier1();
cb.count();
Thread.sleep(100);
long end = System.currentTimeMillis();
System.out.println(end - now);
}
CompletableFuture
CompletableFuture It's right Future Application of pattern namely Realization , Support Stream call 、 Asynchronous execution , It supports Be notified after completion CompletableFuture By default, it will use ForkJoinPool Chi Lai Provide threads to perform tasks
From its source code , We can see ,CompletableFuture Directly provided Several convenient static method portals . Among them is run and supply Two groups

ForkJoinPool The thread pool is in JDK 8 Join in , The main usage is the same as the previous thread pool , It's also Give the task to the thread pool to execute , Thread pool There are also Task queue to store tasks , Different from the previous five thread pools , it Ideal for performing tasks that can be decomposed into subtasks , such as Tree traversal , Merge sort , Or some other recursive scenario

run The parameter is Runnable no return value
supply The parameter is Supplier There is a return value
runAsync(Runnable runnable) : Synchronous execution , Use the default thread pool
runAsync(Runnable runnable, Executor executor) : Synchronous execution , Manual thread pool
supplyAsync(Supplier supplier) : Asynchronous execution , Use the default thread pool
supplyAsync(Supplier supplier, Executor executor): Asynchronous execution , Manual thread pool
These two groups Static functions , provide Pass in the function of custom thread pool . If you Not using an external thread pool , Then it will use default ForkJoin Thread pool . The default thread pool , Size and use are beyond your control , So I suggest you pass one .
Typical code , It looks like this :
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "test";
});
String result = future.join();
CompletableFuture Main role of , Namely Make the code look good . coordination Java8 After that Stream flow , The whole computing process can be abstracted into a flow .
The calculation result of the previous task , It can be directly used as the input of the following tasks , It's like a pipe
api And function
thenApply
thenApplyAsync
thenAccept
thenAcceptAsync
thenRun
thenRunAsync
thenCombine
thenCombineAsync
thenCompose
thenComposeAsync
The following code executes as a result 99, It is not because it is asynchronous that the sequence of code execution is disrupted
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
.thenApplyAsync((e) -> {
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
return e * 10;
}).thenApplyAsync(e -> e - 1);
cf.join();
System.out.println(cf.get());
The function also depends on then The following verb :
apply There are input parameters and return values , The input parameter is the output of the predecessor task
accept There is an input parameter but no return value , Returns the CompletableFuture
run There is no input parameter and no return value , It will also return to CompletableFuture
combine Form a composite structure , Connect two CompletableFuture, And put their 2 Output results , As combine The input of
compose Will be nested CompletableFuture Lay it flat , Used to connect two in series CompletableFuture
The function list above , In fact, there are many
when
handle
when It means , It is the callback when the task is completed .
Let's take our example above , Intend to finish the task , Output one done. It also belongs to the category of only input parameters but no output parameters , Suitable for observation in the last step .
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
.thenApplyAsync((e) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
return e * 10;
}).thenApplyAsync(e -> e - 1)
.whenComplete((r, e)->{
System.out.println("done");
})
;
cf.join();
System.out.println(cf.get());
handle and exceptionally The role of , and whenComplete It's very similar
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture The task of is tandem , If an exception occurs in one of its steps , Will affect the operation of subsequent code .
exceptionally You can tell by the name , It is specialized in dealing with this kind of situation . such as , We force a step to divide by 0, Something goes wrong , Return after capture -1, It will be able to continue to run .
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
.thenApplyAsync(e->e/0)
.thenApplyAsync(e -> e - 1)
.exceptionally(ex->{
System.out.println(ex);
return -1;
});
cf.join();
System.out.println(cf.get());
handle More advanced , Because it has an exception parameter , There is also a normal input . The treatment methods are similar , I won't repeat .
replace CountDownLatch
A business interface , need Processing hundreds of requests , After the request Then sum up these results .
If it's sequential , Assume that each interface takes 100ms, that 100 Interface , Time consuming 10 second . If we If you get them in parallel , Then the efficiency will be improved .
Use CountDownLatch Can solve :
ExecutorService executor = Executors.newFixedThreadPool(5);
CountDownLatch countDown = new CountDownLatch(requests.size());
for(Request request:requests){
executor.execute(()->{
try{
//some opts
}finally{
countDown.countDown();
}
});
}
countDown.await(200,TimeUnit.MILLISECONDS);
Use CompletableFuture To replace CountDownLatch:
ExecutorService executor = Executors.newFixedThreadPool(5);
List<CompletableFuture<Result>> futureList = requests.stream()
.map(request->
CompletableFuture.supplyAsync(e->{
//some opts
},executor))
.collect(Collectors.toList());
CompletableFuture<Void> allCF = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
allCF.join();
allOf, Used to put all the CompletableFuture Put together ; also anyOf, Indicates that only one of the .
frequently-used , There are three more functions :
thenAcceptBoth: Dealing with two tasks , There are two task results entered , No return value
thenCombine: Dealing with two tasks , There are input parameters and return values , like best
runAfterBoth: Dealing with two tasks , Without the participation , No return value
CyclicBarrier
CyclicBarrier and CountDownLatch Both of these tools are in java.util.concurrent package Next
CyclicBarrier Literally, it means It can be recycled (Cyclic) The barrier (Barrier). What it has to do is , Give Way A set of threads arrive A barrier ( It can also be called synchronization point ) when Blocked , straight When the last thread reaches the barrier , The barrier will open , all The thread intercepted by the barrier will continue to work .
The reason why this barrier is decorated with cycles , Because after all threads release each other
This barrier can be reused (reset() Method to reset the barrier point ), This is related to CountDownLatch Different
CyclicBarrier It's a kind of Synchronization mechanism Allow a group of threads to wait for each other , wait until All threads are arrive A barrier point only sign out await Method , it No direct implementation AQS It is With the help of ReentrantLock To realize the synchronization mechanism
CyclicBarrier It's recyclable
CountDownLatch It's disposable
In addition, its semantics is also similar to CountDownLatch Different
CountDownLatch Reduce Count The arrival condition is release The way
CyclicBarrier Strike barrier point (await) It's using Acquire The way
Acquire It will block , This also achieved CyclicBarrier Another feature of , as long as There is a thread interrupt
that Barrier point Just Broken , All threads All will Awakened (CyclicBarrier Be responsible for the implementation of this part , Not by AQS The scheduling of )
This also avoids other threads waiting because one thread is interrupted and can never reach the barrier point .
Barrier point Broken Of CyclicBarrier Will no longer be available ( Will throw out BrokenBarrierException) Unless you execute reset operation
Method
CyclicBarrier Yes Two constructors
CyclicBarrier(int parties) int Parameters of type Express Several threads To participate in this Barrier interception ,( Take the example above , That is, a few people travel with a group );
CyclicBarrier(int parties,Runnable barrierAction) When all threads Reach a barrier point when , priority barrierAction This thread
await(): Call per thread await(), Indicates that we have arrived at Barrier point , then Current thread Blocked
private static ExecutorService executor = Executors.newFixedThreadPool(10);
private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
private void count() {
for (int i = 0; i < 3; i++) {
executor.execute(() -> {
// Calculate the average score of each student , The code is slightly () Assuming that 60~100 The random number
int score = (int) (Math.random() * 40 + 60);
try {
Thread.sleep(Math.round(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName() + " The average grade of the students is " + score);
try {
// Run... After execution await(), Wait until the average scores of all students have been calculated
cyclicBarrier.await();
this.run();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
public void run() {
int result = 0;
Set<String> set = map.keySet();
for (String s : set) {
result += map.get(s);
}
System.out.println(" The average score of the three is :" + (result / 3) + " branch ");
}

and countDownLatch difference
CountDownLatch Of Counter Can only be used once
CyclicBarrier The counter of can be used reset() Method reset
therefore CyclicBarrier Can deal with More complex Business scenario of , For example, if you calculate An error occurred , Sure Reset counter , And let the threads do it again
CyclicBarrier Also provide Other useful methods
getNumberWaiting Method can obtain CyclicBarrier Obstructed Number of threads
isBroken Method to know Whether the blocked thread is interrupted
CountDownLatch Will block the main thread ,CyclicBarrier It won't block The main thread , It's just blocking Sub thread
A thread interrupt CyclicBarrier It throws an exception , Avoided All threads Wait indefinitely
CountDownLatch: One or more threads , wait for Other multiple threads Do something before you do it
CyclicBarrier: Multiple threads Waiting for each other , until arrive Same synchronization point , Again Continue to work together
about CountDownLatch Come on , The key is “ One thread ( Multiple threads ) wait for ”, While the rest of the N Threads stay complete “ Something ” after , Can terminate , You can also wait
And for CyclicBarrier, The key is Multiple threads , stay Any thread Not completed , All threads have to wait
CountDownLatch yes Counter , Threads Complete a record , It's just Count Not incremental but Decline
CyclicBarrier More like a valve , All threads are required All arrived , valve To open , And then go ahead and do it
summary
about Nesting of various callbacks ,CompletableFuture For us More intuitive 、 More beautiful API. stay “ Multiple tasks waiting to be completed ” This application scenario ,CompletableFuture useful
边栏推荐
- Lenovo's "dual platform" operation and maintenance solution helps to comprehensively improve the intelligent management ability of the intelligent medical industry
- Classic problem of leetcode dynamic programming (I)
- MySQL advanced - basic index and seven joins
- ASP. Net authentication code login
- Elastic 8.0: opening a new era of speed, scale, relevance and simplicity
- Post office - post office issues (dynamic planning)
- 使用excel快速生成sql语句
- Redis (IV) - delete policy
- Only black-and-white box test is required for test opening post? No, but also learn performance test
- 腾讯持久化框架MMKV原理探究
猜你喜欢

Dropout: immediate deactivation

基于eNSP的校园网设计的仿真模拟

It's not easy to say I love you | use the minimum web API to upload files

基於SSH的網上商城設計

ASP. Net generate verification code

清华只能排第3?2022软科中国大学AI专业排名发布
![The company was jailed for nonstandard bug during the test ~ [cartoon version]](/img/cd/42ab3fc0000fa7dfe2ac89de3486e4.jpg)
The company was jailed for nonstandard bug during the test ~ [cartoon version]

Tsinghua only ranks third? 2022 release of AI major ranking of Chinese Universities of soft science

医院在线问诊小程序源码 互联网医院源码 智慧医院源码

News management system based on SSM
随机推荐
MySQL n'a pas pu trouver MySQL. Solution temporaire pour le fichier Sock
ASP. Net password encryption and password login
剑指 Offer 16. 数值的整数次方
「杂谈」对数据分析未来的几点思考
MySQL找不到mysql.sock文件的臨時解
OneFlow源码解析:算子签名的自动推断
Communication network electronic billing system based on SSH
ASP. Net generate verification code
使用excel快速生成sql语句
电子元器件行业在线采购系统精准匹配采购需求,撬动电子产业数字化发展
Deep understanding of JVM (VI) -- garbage collection (III)
如何做好软件系统的需求调研,七种武器让你轻松搞定
Redis (III) - transaction
Tide - 基于 async-std 的 Rust-web 框架
Redis (IV) - delete policy
基于eNSP的校园网设计的仿真模拟
Rust 文件系统处理之文件读写 - Rust 实践指南
电子元器件招标采购商城:优化传统采购业务,提速企业数字化升级
Grep output with multiple colors- Grep output with multiple Colors?
Hcip (Huawei Senior Network Security Engineer) (Experiment 8) (MPLS basic experiment)