CountDownLatch, CompletableFuture, and CyclicBarrier
2022-06-30 18:38:00 【Jingling cat】
Multithreading utility class: CompletableFuture
Multithreaded synchronization utility class: CountDownLatch
Preface
Future and Promise: a Future is a placeholder that represents the result of an operation that will complete later. You usually either call get() to block until the result is ready, or let the task run asynchronously and receive the result through a callback.
Do not call get() inside the loop that submits the tasks; otherwise each task blocks before the next one is submitted, and the whole operation is no longer multithreaded or asynchronous.
But what if a callback is nested inside another callback? Once the nesting gets deep, you are in callback hell.
CompletableFuture in Java is in effect a Promise, and it is meant to solve the callback hell problem. Promises exist to make the code look good.
How good? Let me put it this way: once you start using CompletableFuture you can't stop, just like a first girlfriend you think about every day.
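The warning about calling get() inside the submission loop can be sketched as follows. This is a minimal illustration, not code from the original article: the class name GetInLoopDemo, the helper names, and the 200 ms task are all assumptions made up for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class GetInLoopDemo {

    // Hypothetical task: sleeps 200 ms, then returns its input.
    static int slowTask(int n) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return n;
    }

    // Wraps the checked exceptions of Future.get() to keep the demo short.
    static int getUnchecked(Future<Integer> f) {
        try {
            return f.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Anti-pattern: get() inside the submission loop, so the tasks run one after another.
    static long getInsideLoop(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        long start = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            final int n = i;
            Callable<Integer> task = () -> slowTask(n);
            getUnchecked(pool.submit(task)); // blocks before the next task is submitted
        }
        pool.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Submit every task first, then collect the results.
    static long submitThenGet(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        long start = System.nanoTime();
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            final int n = i;
            Callable<Integer> task = () -> slowTask(n);
            futures.add(pool.submit(task));
        }
        for (Future<Integer> f : futures) {
            getUnchecked(f);
        }
        pool.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("get() inside the loop: " + getInsideLoop(4) + " ms");
        System.out.println("submit, then get():    " + submitThenGet(4) + " ms");
    }
}
```

With four tasks, the first variant should take roughly four times as long as the second, because it never has more than one task in flight.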
CountDownLatch (countdown latch)
CountDownLatch lets the main thread wait for other worker threads to finish their tasks before it continues. It allows one or more threads to wait until the operations of other threads have completed.
CountDownLatch is implemented with a counter whose initial value is the number of threads to wait for.
Each time a thread finishes its work, the counter is decremented by 1. When the counter reaches 0, all threads have finished,
and the threads waiting on the latch can continue.

Typical scenarios: loading data from several modules in parallel to speed things up, or bounding the timeout of downstream interfaces when handling large volumes of data.
The main methods of CountDownLatch:
await(): the thread that calls await() is suspended and continues only when count reaches 0
await(long timeout, TimeUnit unit): the same, except that the thread also continues if count has still not reached 0 after the given time (suitable for scenarios where partial data loss is acceptable)
countDown(): decrements count by 1
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

public class Test020 {
    /** Thread pool */
    private static final ExecutorService QUERY_POOL = new ThreadPoolExecutor(
            10, 10,
            60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10000),
            // Note: a task discarded by this policy never calls countDown()
            new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        try {
            // Worker threads write concurrently, so the list must be thread-safe
            List<Long> resultList = Collections.synchronizedList(new ArrayList<>());
            CountDownLatch countDownLatch = new CountDownLatch(4);
            for (int i = 0; i < 4; i++) {
                QUERY_POOL.execute(() -> {
                    try {
                        resultList.addAll(dohandler());
                    } finally {
                        // Always count down, even if the task fails
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
            System.out.println("Result: " + resultList + " Time taken: " + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            System.out.println("Something went wrong");
        } finally {
            // Core threads never time out, so shut the pool down to let the JVM exit
            QUERY_POOL.shutdown();
        }
    }

    public static List<Long> dohandler() {
        try {
            Thread.sleep(2500);
            return Collections.singletonList(123L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Return an empty list rather than null so addAll() cannot throw
        return Collections.emptyList();
    }
}
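The timed variant await(long timeout, TimeUnit unit) listed among the main methods returns a boolean, which the caller can use to tell a complete result from a partial one. The sketch below is only an illustration under assumed names (TimedAwaitDemo, awaitWorkers) and made-up delays.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TimedAwaitDemo {

    // Starts one fast and one slow worker, waits at most timeoutMillis,
    // and returns true only if both workers counted down in time.
    static boolean awaitWorkers(long slowWorkerMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(2);
        long[] delays = {10, slowWorkerMillis};
        try {
            for (long delay : delays) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        latch.countDown();
                    }
                });
            }
            // Returns false if the count is still above zero when the timeout elapses
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // interrupt any worker that is still sleeping
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitWorkers(50, 1000));  // both workers finish in time
        System.out.println(awaitWorkers(2000, 100)); // the slow worker misses the deadline
    }
}
```

When await returns false, the caller continues with whatever results have arrived, which is the "partial data loss is acceptable" case.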

A second example: three workers each compute one student's average score, and the calling thread waits on the latch before averaging the three results.
import java.util.Set;
import java.util.concurrent.*;

public class CountDownLatchDemo implements Runnable {
    // Create a thread pool with 3 threads
    private ExecutorService threadPool = Executors.newFixedThreadPool(3);
    // Stores each student's average score; note that a concurrent map is used here
    private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    private CountDownLatch countDownLatch = new CountDownLatch(3);

    private void count() {
        for (int i = 0; i < 3; i++) {
            threadPool.execute(() -> {
                // Compute the student's average score (code omitted); assume a random number in 60~100
                int score = (int) (Math.random() * 40 + 60);
                try {
                    Thread.sleep(Math.round(Math.random() * 1000));
                    map.put(Thread.currentThread().getName(), score);
                    System.out.println(Thread.currentThread().getName() + ": the student's average score is " + score);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // Count down in finally so the waiting thread is never left hanging
                    countDownLatch.countDown();
                }
            });
        }
        this.run();
        threadPool.shutdown();
    }

    @Override
    public void run() {
        try {
            // Block until all three workers have counted down
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int result = 0;
        Set<String> set = map.keySet();
        for (String s : set) {
            result += map.get(s);
        }
        System.out.println("The average score of the three students is: " + (result / 3));
    }

    public static void main(String[] args) throws InterruptedException {
        long now = System.currentTimeMillis();
        CountDownLatchDemo demo = new CountDownLatchDemo();
        demo.count();
        Thread.sleep(100);
        long end = System.currentTimeMillis();
        System.out.println(end - now);
    }
}
CompletableFuture
CompletableFuture is an implementation of the Future pattern. It supports chained (fluent) calls, asynchronous execution, and notification on completion. By default, CompletableFuture uses the ForkJoinPool to provide the threads that run its tasks.
From its source code we can see that CompletableFuture directly provides several convenient static entry methods. They come in two groups: run and supply.

ForkJoinPool was added in JDK 7 (the shared common pool that CompletableFuture uses by default arrived in JDK 8). Its basic usage is the same as the earlier thread pools: you hand tasks to the pool, and the pool keeps task queues to store them. Unlike the earlier thread pools, it is best suited to tasks that can be split into subtasks, such as tree traversal, merge sort, or other recursive scenarios.

run: the parameter is a Runnable, and there is no return value
supply: the parameter is a Supplier, and there is a return value
runAsync(Runnable runnable): asynchronous execution, using the default thread pool
runAsync(Runnable runnable, Executor executor): asynchronous execution, using the supplied thread pool
supplyAsync(Supplier supplier): asynchronous execution, using the default thread pool
supplyAsync(Supplier supplier, Executor executor): asynchronous execution, using the supplied thread pool
Both groups of static functions let you pass in a custom thread pool. If you do not pass one, the default ForkJoinPool (the common pool) is used. The size and usage of the default pool are shared and out of your control, so I suggest you pass your own.
Typical code looks like this:
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "test";
});
String result = future.join();
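The typical code uses the default pool. As a minimal sketch of the other entry points, the example below passes an explicit executor to both runAsync and supplyAsync. The class name AsyncEntryPointsDemo and the pool size of 2 are assumptions for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncEntryPointsDemo {

    // Runs one task through each entry point on a caller-owned pool.
    static String runBoth() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            StringBuffer log = new StringBuffer(); // thread-safe, written by a pool thread

            // runAsync takes a Runnable: no result, so the future is CompletableFuture<Void>
            CompletableFuture<Void> run = CompletableFuture.runAsync(() -> log.append("ran"), pool);

            // supplyAsync takes a Supplier: the returned value becomes the future's result
            CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> "test", pool);

            run.join();
            return log + ":" + supply.join();
        } finally {
            pool.shutdown(); // a pool you create is a pool you must shut down
        }
    }

    public static void main(String[] args) {
        System.out.println(runBoth());
    }
}
```

Owning the pool means its size and lifetime are explicit, which is the reason for passing one instead of relying on the shared default.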
The main role of CompletableFuture is to make the code look good. Together with the Stream API introduced in Java 8, the whole computation can be abstracted into a flow.
The result of the previous task can be used directly as the input of the next task, like a pipeline.
API and functions
thenApply
thenApplyAsync
thenAccept
thenAcceptAsync
thenRun
thenRunAsync
thenCombine
thenCombineAsync
thenCompose
thenComposeAsync
The following code prints 99. The stages still run in order; being asynchronous does not scramble the execution sequence.
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
        .thenApplyAsync((e) -> {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return e * 10;
        }).thenApplyAsync(e -> e - 1);
cf.join();
System.out.println(cf.get());
What each function does depends on the verb that follows then:
apply: takes an input and returns a value; the input is the output of the preceding task
accept: takes an input but has no return value; it returns a CompletableFuture<Void>
run: takes no input and has no return value; it also returns a CompletableFuture<Void>
combine: forms a composite structure that joins two CompletableFutures and passes both of their results to the combine function as input
compose: flattens a nested CompletableFuture; it is used to chain two CompletableFutures in series
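The five verbs can be seen side by side in one small pipeline. This is a sketch with made-up numbers; the class ThenVerbsDemo and the helper lookupBonus are hypothetical names introduced only for this example.

```java
import java.util.concurrent.CompletableFuture;

class ThenVerbsDemo {

    // Hypothetical lookup that itself returns a future, used to show thenCompose.
    static CompletableFuture<Integer> lookupBonus(int base) {
        return CompletableFuture.supplyAsync(() -> base + 1);
    }

    static int pipeline() {
        CompletableFuture<Integer> ten = CompletableFuture.supplyAsync(() -> 10);
        CompletableFuture<Integer> five = CompletableFuture.supplyAsync(() -> 5);

        CompletableFuture<Integer> result = ten
                .thenApply(x -> x * 2)                    // apply: 10 becomes 20
                .thenCombine(five, (a, b) -> a + b)       // combine: 20 + 5 = 25
                .thenCompose(ThenVerbsDemo::lookupBonus); // compose: flattens the nested future, 25 + 1 = 26

        // accept: consumes the value and yields CompletableFuture<Void>
        CompletableFuture<Void> printed = result.thenAccept(v -> System.out.println("value = " + v));
        // run: takes no input at all
        printed.thenRun(() -> System.out.println("finished")).join();

        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(pipeline());
    }
}
```

Using thenApply with lookupBonus instead of thenCompose would produce a CompletableFuture<CompletableFuture<Integer>>, which is the nesting that compose removes.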
Beyond the functions listed above, there are quite a few more, for example:
when (whenComplete)
handle
when is the callback that runs when the task completes.
Taking our example above, suppose we want to print done once the task has finished. whenComplete receives the result and the exception but returns nothing, so it suits observation in the last step.
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
        .thenApplyAsync((e) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return e * 10;
        }).thenApplyAsync(e -> e - 1)
        .whenComplete((r, e) -> {
            System.out.println("done");
        });
cf.join();
System.out.println(cf.get());
handle and exceptionally play a role very similar to whenComplete.
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
The tasks of a CompletableFuture run in series, so an exception in one step affects the steps that follow it.
As the name suggests, exceptionally deals with exactly this situation. For example, if we force one step to divide by 0, an exception occurs; after catching it we return -1, and the chain can continue to run.
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 10)
        .thenApplyAsync(e -> e / 0)
        .thenApplyAsync(e -> e - 1)
        .exceptionally(ex -> {
            System.out.println(ex);
            return -1;
        });
cf.join();
System.out.println(cf.get());
handle is more general, because it receives both the exception and the normal result. It is used in much the same way, so I won't repeat it.
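For completeness, here is a minimal sketch of handle applied to the divide-by-zero case. The class HandleDemo and the method divideTenBy are illustrative names, not part of the original article.

```java
import java.util.concurrent.CompletableFuture;

class HandleDemo {

    // Divides 10 by the divisor asynchronously; handle sees either the result or the exception.
    static int divideTenBy(int divisor) {
        return CompletableFuture.supplyAsync(() -> 10 / divisor)
                .handle((value, ex) -> {
                    if (ex != null) {
                        // The ArithmeticException usually arrives wrapped in a CompletionException
                        return -1;
                    }
                    return value * 10;
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(divideTenBy(2)); // normal path
        System.out.println(divideTenBy(0)); // exceptional path, mapped to -1
    }
}
```

Unlike exceptionally, the handle callback runs on both paths, so it can transform the normal result as well as recover from a failure.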
Replacing CountDownLatch
Suppose a business interface has to process hundreds of requests and then aggregate their results.
If the requests run sequentially and each call takes 100 ms, 100 calls take 10 seconds. Running them in parallel improves throughput.
CountDownLatch can solve this:
ExecutorService executor = Executors.newFixedThreadPool(5);
CountDownLatch countDown = new CountDownLatch(requests.size());
for (Request request : requests) {
    executor.execute(() -> {
        try {
            //some opts
        } finally {
            countDown.countDown();
        }
    });
}
countDown.await(200, TimeUnit.MILLISECONDS);
Using CompletableFuture to replace CountDownLatch:
ExecutorService executor = Executors.newFixedThreadPool(5);
List<CompletableFuture<Result>> futureList = requests.stream()
        .map(request ->
                // supplyAsync takes a Supplier, so the lambda has no parameter
                CompletableFuture.supplyAsync(() -> {
                    //some opts (must return a Result)
                }, executor))
        .collect(Collectors.toList());
CompletableFuture<Void> allCF = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
allCF.join();
allOf combines all the CompletableFutures and completes when every one of them has completed. There is also anyOf, which completes as soon as any one of them completes.
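Because allOf produces a CompletableFuture<Void>, the individual results still have to be read from the original futures. The sketch below shows one way to do that, plus anyOf. The request helper, its ids, and its delays are assumptions standing in for real calls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfAnyOfDemo {

    // Hypothetical request: returns its id after sleeping for delayMillis.
    static CompletableFuture<Integer> request(int id, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return id;
        });
    }

    // Waits for all futures, then reads each result from the original list.
    static List<Integer> waitForAll() {
        List<CompletableFuture<Integer>> futures =
                Arrays.asList(request(1, 30), request(2, 10), request(3, 20));
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // Every future is already complete here, so join() returns immediately
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    // anyOf completes with the result of whichever future finishes first.
    static Object waitForFirst() {
        return CompletableFuture.anyOf(request(1, 1000), request(2, 10)).join();
    }

    public static void main(String[] args) {
        System.out.println(waitForAll());
        System.out.println(waitForFirst());
    }
}
```

The list returned by waitForAll follows the order of the futures in the list, not the order in which they finished.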
Three more functions are commonly used:
thenAcceptBoth: handles two tasks; takes both task results as input, no return value
thenCombine: handles two tasks; takes both results as input and has a return value; this is my favorite
runAfterBoth: handles two tasks; takes no input and has no return value
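The three two-task functions differ only in what the callback receives and returns, which a short sketch makes concrete. The class BothDemo and the price and tax values are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class BothDemo {

    static String run() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> tax = CompletableFuture.supplyAsync(() -> 2);
        AtomicInteger seen = new AtomicInteger();
        StringBuffer log = new StringBuffer();

        // thenAcceptBoth: receives both results, returns nothing
        price.thenAcceptBoth(tax, (p, t) -> seen.set(p + t)).join();

        // thenCombine: receives both results and produces a new value
        int total = price.thenCombine(tax, (p, t) -> p + t).join();

        // runAfterBoth: no input and no return value; it only runs after both are done
        price.runAfterBoth(tax, () -> log.append("both done")).join();

        return seen.get() + "," + total + "," + log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```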
CyclicBarrier
Both CyclicBarrier and CountDownLatch live in the java.util.concurrent package.
Literally, CyclicBarrier means a barrier (Barrier) that can be used cyclically (Cyclic). It makes a group of threads block when they reach a barrier (also called a synchronization point). Only when the last thread reaches the barrier does it open, and all the threads held by the barrier continue to run.
The barrier is called cyclic because, after all the waiting threads have been released,
it can be reused (the reset() method also resets the barrier point explicitly). This differs from CountDownLatch.
CyclicBarrier is a synchronization mechanism that lets a group of threads wait for each other; a thread leaves the await method only once all threads have reached the barrier point. It does not build on AQS directly; it implements synchronization with the help of ReentrantLock.
CyclicBarrier is reusable.
CountDownLatch is one-shot.
Its semantics also differ from CountDownLatch:
CountDownLatch counts down toward the arrival condition in release mode.
CyclicBarrier hits the barrier point (await) in acquire mode.
An acquire blocks, and this gives CyclicBarrier another property: as soon as one waiting thread is interrupted,
the barrier point is broken and all waiting threads are woken up (CyclicBarrier implements this part itself; it is not done by AQS scheduling).
This prevents the other threads from waiting forever because one interrupted thread can never reach the barrier point.
A CyclicBarrier whose barrier point is broken is no longer usable (await throws BrokenBarrierException) unless you call reset.
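The broken state and its recovery through reset() can be observed directly. The following is a sketch under assumptions: the class BrokenBarrierDemo is a made-up name, and the polling loop is just a simple way to wait until the worker is parked at the barrier.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class BrokenBarrierDemo {

    static String run() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                // Interrupted while waiting, which breaks the barrier
            }
        });
        waiter.start();

        StringBuilder log = new StringBuilder();
        try {
            while (barrier.getNumberWaiting() == 0) {
                Thread.sleep(10); // poll until the waiter has reached the barrier
            }
            waiter.interrupt();
            waiter.join();
            log.append("broken=").append(barrier.isBroken());

            try {
                barrier.await(); // fails immediately because the barrier is broken
            } catch (BrokenBarrierException e) {
                log.append(", await threw BrokenBarrierException");
            }

            barrier.reset(); // makes the barrier usable again
            log.append(", afterReset=").append(barrier.isBroken());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```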
Methods
CyclicBarrier has two constructors:
CyclicBarrier(int parties): the int parameter is the number of threads that take part in the barrier (in the tour-group analogy, how many people travel together);
CyclicBarrier(int parties, Runnable barrierAction): when all threads have reached the barrier point, barrierAction runs first, before the waiting threads are released
await(): each thread calls await() to signal that it has reached the barrier point, and then the calling thread blocks
import java.util.Set;
import java.util.concurrent.*;

public class CyclicBarrier1 {
    private static ExecutorService executor = Executors.newFixedThreadPool(10);
    private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

    private void count() {
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> {
                // Compute the student's average score (code omitted); assume a random number in 60~100
                int score = (int) (Math.random() * 40 + 60);
                try {
                    Thread.sleep(Math.round(Math.random() * 1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                map.put(Thread.currentThread().getName(), score);
                System.out.println(Thread.currentThread().getName() + ": the student's average score is " + score);
                try {
                    // Call await() when done and wait until every student's average has been computed
                    cyclicBarrier.await();
                    // Every worker calls run() after the barrier opens, so the average is printed three times
                    this.run();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }

    public void run() {
        int result = 0;
        Set<String> set = map.keySet();
        for (String s : set) {
            result += map.get(s);
        }
        System.out.println("The average score of the three students is: " + (result / 3));
    }

    public static void main(String[] args) {
        CyclicBarrier1 cb = new CyclicBarrier1();
        cb.count();
    }
}
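The two-argument constructor CyclicBarrier(int parties, Runnable barrierAction) offers a way to run the summary once instead of in every worker. The variant below is a sketch of that idea, not the author's code: the class BarrierActionDemo, the student keys, and the fixed scores are assumptions.

```java
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierActionDemo {

    // Returns how many times the summary ran for three workers.
    static int run() {
        Map<String, Integer> scores = new ConcurrentHashMap<>();
        AtomicInteger summaries = new AtomicInteger();

        // The action runs once, in the last thread to arrive, before the others are released
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            int sum = scores.values().stream().mapToInt(Integer::intValue).sum();
            System.out.println("average = " + sum / scores.size());
            summaries.incrementAndGet();
        });

        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            final int id = i;
            pool.execute(() -> {
                scores.put("student-" + id, 60 + id * 10); // made-up scores: 60, 70, 80
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    // The barrier was broken or this thread was interrupted; give up
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return summaries.get();
    }

    public static void main(String[] args) {
        System.out.println("summary ran " + run() + " time(s)");
    }
}
```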

Differences from CountDownLatch
The counter of a CountDownLatch can be used only once.
The counter of a CyclicBarrier can be reset with the reset() method.
So CyclicBarrier can handle more complex business scenarios. For example, if a calculation fails, you can reset the counter and have the threads run again.
CyclicBarrier also provides other useful methods:
getNumberWaiting returns the number of threads currently blocked at the CyclicBarrier
isBroken tells whether the barrier is broken, for example because a blocked thread was interrupted
CountDownLatch typically blocks the main thread; CyclicBarrier does not block the main thread, only the worker threads that call await.
When a thread is interrupted, CyclicBarrier throws an exception in the waiting threads, which keeps all threads from waiting indefinitely.
CountDownLatch: one or more threads wait for several other threads to finish something before continuing.
CyclicBarrier: several threads wait for each other until they all reach the same synchronization point, then continue together.
For CountDownLatch, the point is that one thread (or several threads) waits, while the other N threads, once they have completed "something", may either terminate or wait.
For CyclicBarrier, the point is that while any one of the threads has not finished, all threads have to wait.
CountDownLatch is a counter: each finished thread is recorded, except that the count goes down instead of up.
CyclicBarrier is more like a valve: it opens only when all threads have arrived, and then they all proceed.
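The reusability that separates CyclicBarrier from CountDownLatch shows up when the same barrier object is hit in several rounds. This is a minimal sketch; the class ReuseBarrierDemo and its parameters are assumptions for illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class ReuseBarrierDemo {

    // Runs the given number of worker threads through several rounds on one barrier
    // and returns how many times the barrier tripped.
    static int run(int workers, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action counts each time all workers have arrived
        CyclicBarrier barrier = new CyclicBarrier(workers, trips::incrementAndGet);

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // same barrier object, ready again after each trip
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    // Stop this worker if the barrier breaks or the thread is interrupted
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + run(3, 4) + " times");
    }
}
```

No call to reset() is needed between rounds here; the barrier starts a new generation by itself each time it trips. A CountDownLatch that has reached 0 stays at 0.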
Summary
For all kinds of nested callbacks, CompletableFuture gives us a more intuitive and better-looking API. It is also useful in the "wait for multiple tasks to complete" scenario.