An Introduction to J.U.C's FutureTask, Fork/Join Framework, and BlockingQueue
2022-06-10 19:31:00 【51CTO】
Abstract: J.U.C is a very important toolkit for concurrent programming in Java. This article focuses on three of its components: FutureTask, the Fork/Join framework, and BlockingQueue.
This article is shared from the Huawei Cloud community post "[High Concurrency] J.U.C Component Extension", author: Binghe (冰河).
FutureTask
FutureTask lives in J.U.C (java.util.concurrent), but it is not a subclass of AQS (AbstractQueuedSynchronizer). The way it handles a thread's result is worth studying and reusing in your own projects.
With Thread and Runnable, there is no way to obtain a result once the task has run. Java 1.5 introduced Callable and Future, which let you retrieve the result of a task after it completes.
Callable versus Runnable
Callable: a generic interface with a single call() method. It may throw checked exceptions and it returns a value.
Runnable: an interface with a single run() method. It cannot throw checked exceptions and it returns no value.
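The difference between the two interfaces can be sketched as follows. This is a minimal illustration rather than code from the original post: the class name CallableVsRunnableSketch and its helper methods are hypothetical, and it prints with System.out instead of the Lombok logger used in the article's samples.

```java
import java.util.concurrent.Callable;

// Hypothetical demo class, not part of the original article's code.
class CallableVsRunnableSketch {

    // Callable: returns a value and is allowed to throw a checked exception.
    static Callable<Integer> parseTask(String text) {
        return () -> {
            if (text == null) {
                throw new Exception("no input"); // checked exception, legal in call()
            }
            return Integer.parseInt(text.trim());
        };
    }

    // Runnable: no return value, so any result must travel through shared state.
    static Runnable appendTask(StringBuilder out) {
        return () -> out.append("ran");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parseTask(" 42 ").call());

        StringBuilder out = new StringBuilder();
        appendTask(out).run();
        System.out.println(out);
    }
}
```

Here both tasks are invoked directly on the calling thread only to show the method signatures; in practice they would be handed to a thread or an executor.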
The Future interface
For a given Callable or Runnable task, Future lets you cancel the task, query whether it was cancelled, query whether it has completed, and retrieve its result.
Future lets you observe how the call() invocation on the target thread is going. Calling get() on the Future returns the result. If the task has not finished yet, the calling thread blocks until call() returns, and then continues. In short, Future gives you the return value of a task executed by another thread.
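The operations listed above (cancel, query cancellation, query completion, get the result) can be sketched in one short example. It is an illustration under assumptions: the class name FutureStateSketch, the 10-second simulated task, and the 100 ms timeout are all made up for the demo.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original article's code.
class FutureStateSketch {

    // Submits a slow task, gives up waiting after a short timeout, and cancels it.
    static String cancelSlowTask() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(10_000); // simulated slow work
                return "Done";
            });
            try {
                future.get(100, TimeUnit.MILLISECONDS); // bounded wait instead of blocking forever
                return "finished";
            } catch (TimeoutException e) {
                future.cancel(true); // true = interrupt the thread running the task
            }
            try {
                future.get(); // a cancelled Future has no result
                return "unexpected";
            } catch (CancellationException e) {
                return "cancelled=" + future.isCancelled() + ", done=" + future.isDone();
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(cancelSlowTask()); // cancelled=true, done=true
    }
}
```

Note that a cancelled task also counts as done: isDone() reports completion of any kind, whether normal, exceptional, or by cancellation.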
The FutureTask class
FutureTask implements the RunnableFuture interface, which in turn extends both Runnable and Future. FutureTask ultimately always executes a Callable: if a Runnable is passed to the constructor, it is converted into a Callable.
Because the class implements both Runnable and Future, it can be executed by a thread as a Runnable and can also be used as a Future to obtain the Callable's return value. The benefit of this design is as follows:
Suppose some time-consuming logic has to compute and return a value, but the value is not needed right away. By combining Runnable and Future, another thread can compute the value while the current thread does other work, and the current thread fetches the value through the Future only when it actually needs it.
The Future sample code is as follows:
package io.binghe.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Slf4j
public class FutureExample {
    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Submit the task; it runs on a pool thread while main continues
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do something in main");
        Thread.sleep(1000);
        // Blocks until call() has returned
        String result = future.get();
        log.info("result: {}", result);
        executorService.shutdown();
    }
}
The FutureTask sample code is as follows:
package io.binghe.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

@Slf4j
public class FutureTaskExample {
    public static void main(String[] args) throws Exception {
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do something in callable");
                Thread.sleep(5000);
                return "Done";
            }
        });
        // FutureTask is a Runnable, so a plain Thread can execute it
        new Thread(futureTask).start();
        log.info("do something in main");
        Thread.sleep(1000);
        // FutureTask is also a Future; get() blocks until call() has returned
        String result = futureTask.get();
        log.info("result: {}", result);
    }
}
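FutureTask can also be built from a Runnable plus a fixed result, which is the conversion to a Callable mentioned earlier. The following sketch assumes a hypothetical class name, FutureTaskRunnableSketch, and uses a counter only to make the effect of the Runnable visible.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article's code.
class FutureTaskRunnableSketch {

    static String runOnce() throws Exception {
        AtomicInteger counter = new AtomicInteger();

        // FutureTask(Runnable, V): the Runnable is adapted to a Callable
        // that runs it and then returns the supplied result.
        FutureTask<String> task =
                new FutureTask<>(() -> counter.incrementAndGet(), "finished");

        new Thread(task).start();
        String result = task.get(); // blocks until the Runnable has run

        task.run(); // a FutureTask that has already completed does not run again

        return result + ":" + counter.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnce()); // finished:1
    }
}
```

The second call to run() is a no-op, which is why the counter stays at 1: a FutureTask computes its result at most once.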
The Fork/Join framework
The Fork/Join framework is also part of J.U.C (java.util.concurrent). Introduced in Java 7, it is a framework for running tasks in parallel: it splits a large task into several small tasks and then combines the results of the small tasks into the final result. The basic idea is similar to Hadoop's MapReduce.
It relies mainly on the work-stealing algorithm (a thread steals tasks from other threads' queues and executes them), a strategy for parallel divide-and-conquer computation.
Why use a work-stealing algorithm?
Suppose we have a large task that can be split into several independent subtasks. To reduce contention between threads, we put the subtasks into different queues and create one thread per queue to run the tasks in it, so threads and queues correspond one to one; for example, thread A handles the tasks in queue A. Some threads, however, finish their own queue while other threads' queues still hold pending tasks. Rather than sit idle, a finished thread helps out by stealing a task from another thread's queue. At that point two threads access the same queue, so to reduce contention between the stealing thread and the thread being stolen from, a double-ended queue (deque) is normally used: the owning thread always takes tasks from the head of the deque, and the stealing thread always takes tasks from the tail.
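The two-ended access pattern described above can be sketched with a plain ConcurrentLinkedDeque. This is only an illustration of the idea, following the head/tail convention used in the paragraph; it is not how ForkJoinPool implements its internal queues, and the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical demo class: one worker's task deque, shared with a thief.
class WorkStealingDequeSketch {
    private final ConcurrentLinkedDeque<String> tasks = new ConcurrentLinkedDeque<>();

    // New work is appended to the deque.
    void push(String task) {
        tasks.addLast(task);
    }

    // The owning thread takes work from the head of its own deque.
    String takeOwn() {
        return tasks.pollFirst();
    }

    // A stealing thread takes work from the tail, away from the owner's end.
    String steal() {
        return tasks.pollLast();
    }

    public static void main(String[] args) {
        WorkStealingDequeSketch deque = new WorkStealingDequeSketch();
        deque.push("a");
        deque.push("b");
        deque.push("c");
        System.out.println(deque.takeOwn()); // a
        System.out.println(deque.steal());   // c
        System.out.println(deque.takeOwn()); // b
        System.out.println(deque.steal());   // null, nothing left to steal
    }
}
```

Because the owner and the thief work on opposite ends, they only touch the same element when the deque is down to a single task.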
Advantages of the work-stealing algorithm:
It makes full use of the threads for parallel computation and reduces contention between them.
Disadvantages of the work-stealing algorithm:
Contention still occurs in some cases, for example when a deque holds only one task. The algorithm also consumes more system resources, because it creates multiple threads and multiple deques.
In the Fork/Join framework, when a task is waiting at a Join operation for the subtasks it created to finish, the worker thread running that task looks for other tasks that have not yet run and starts executing them. In this way threads make the most of their running time, which improves application performance. To make this possible, the framework places some restrictions on the tasks it runs.
Fork/Join framework limitations:
(1) Tasks should use only the Fork and Join operations for synchronization. If a task uses any other synchronization mechanism, the worker thread cannot run other tasks while it is blocked. For example, if a task sleeps inside the Fork/Join framework, the worker thread running it executes no other task during the sleep.
(2) Split tasks should not perform I/O operations, such as reading or writing data files.
(3) Tasks cannot throw checked exceptions; they must include the code needed to handle them.
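Limitation (3) follows from the signature of compute(), which declares no checked exceptions. One common way to deal with it is to catch the checked exception inside compute() and rethrow it wrapped in an unchecked one. The sketch below assumes a made-up checked exception, ValidationException, and a hypothetical task class.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo task, not part of the original article's code.
class CheckedExceptionTaskSketch extends RecursiveTask<Integer> {

    // Made-up checked exception used only for this illustration.
    static class ValidationException extends Exception {
        ValidationException(String message) {
            super(message);
        }
    }

    private final int value;

    CheckedExceptionTaskSketch(int value) {
        this.value = value;
    }

    private static int validate(int v) throws ValidationException {
        if (v < 0) {
            throw new ValidationException("negative input: " + v);
        }
        return v;
    }

    @Override
    protected Integer compute() {
        try {
            return validate(value);
        } catch (ValidationException e) {
            // compute() cannot declare checked exceptions, so wrap and rethrow
            throw new IllegalStateException(e);
        }
    }

    // Walks the cause chain, since the pool may re-wrap the exception when rethrowing it.
    static boolean causedByValidation(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof ValidationException) {
                return true;
            }
        }
        return false;
    }

    static boolean failsWithValidation(int input) {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            pool.invoke(new CheckedExceptionTaskSketch(input));
            return false;
        } catch (RuntimeException e) {
            return causedByValidation(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithValidation(-1)); // true
        System.out.println(failsWithValidation(5));  // false
    }
}
```

The caller sees an unchecked exception from invoke() and can recover the original checked exception from its cause chain.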
Core classes of the Fork/Join framework
The core of the framework consists of two classes: ForkJoinPool and ForkJoinTask. ForkJoinPool implements the work-stealing algorithm, manages the worker threads, and provides information about task status and execution. ForkJoinTask mainly provides the mechanism for performing the Fork and Join operations within a task.
The sample code is as follows:
package io.binghe.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // If the task is small enough, compute it directly
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // If the task is larger than the threshold, split it into two subtasks
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
            // Schedule the subtasks for execution
            leftTask.fork();
            rightTask.fork();
            // Wait for the subtasks to finish and collect their results
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            // Merge the subtask results
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();
        // Create a task that computes 1 + 2 + ... + 100
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
        // Submit the task to the pool
        Future<Integer> result = forkjoinPool.submit(task);
        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}
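A common variation of the sample above forks only one half and computes the other half in the current thread, then calls invoke() on the pool to wait for the result directly. The sketch below is an assumption-laden rewrite rather than the author's code: the class name RangeSumTask, the threshold of 1,000, and the use of long for the sum are all choices made for the illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variation of the article's sample, not the original code.
class RangeSumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // made-up cut-off for splitting

    private final int start;
    private final int end;

    RangeSumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        int middle = start + (end - start) / 2;
        RangeSumTask left = new RangeSumTask(start, middle);
        RangeSumTask right = new RangeSumTask(middle + 1, end);
        left.fork();                        // schedule the left half asynchronously
        long rightResult = right.compute(); // run the right half in the current thread
        long leftResult = left.join();      // wait for the left half
        return leftResult + rightResult;
    }

    static long sum(int start, int end) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSumTask(start, end)); // blocks until done
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 100));       // 5050
        System.out.println(sum(1, 1_000_000)); // 500000500000
    }
}
```

Computing one half directly keeps the current worker busy instead of having it fork two subtasks and immediately wait on both.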
BlockingQueue
A BlockingQueue is a thread-safe queue whose operations can block.
Blocking occurs in the following cases:
(1) An element is inserted while the queue is full.
(2) An element is removed while the queue is empty.
Usage scenario:
Blocking queues are used mainly in producer-consumer scenarios.
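A producer-consumer hand-off through a blocking queue can be sketched as follows. The example is illustrative only: the class name, the capacity of 2, and the end-of-stream marker POISON_PILL are assumptions made for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original article's code.
class ProducerConsumerSketch {
    private static final int POISON_PILL = -1; // made-up end-of-stream marker

    // Sends 0..count-1 from a producer thread to a consumer thread.
    static List<Integer> transfer(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // small capacity forces blocking
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int item = queue.take(); // blocks while the queue is empty
                    if (item == POISON_PILL) {
                        break;
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join, the consumer's writes are visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(5)); // [0, 1, 2, 3, 4]
    }
}
```

Neither thread needs explicit locks or wait/notify calls; the queue's put() and take() provide the coordination.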
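BlockingQueue methods
BlockingQueue has four groups of methods for inserting, removing, and examining elements in the queue. The groups differ in how they behave when the requested operation cannot be carried out immediately. The methods are as follows:
Insert: add(e) throws an exception, offer(e) returns a special value, put(e) blocks, offer(e, timeout, unit) times out.
Remove: remove() throws an exception, poll() returns a special value, take() blocks, poll(timeout, unit) times out.
Examine: element() throws an exception, peek() returns a special value; there is no blocking or timed variant.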
The four kinds of behavior are:
- Throws an exception
If the attempted operation cannot be performed immediately, an exception is thrown.
- Special value
If the attempted operation cannot be performed immediately, a special value is returned (false for a failed insert, null for a failed removal or examination).
- Blocks
If the attempted operation cannot be performed immediately, the call blocks until it can proceed.
- Times out
If the attempted operation cannot be performed immediately, the call blocks until it can proceed, but waits no longer than the given timeout. It returns a special value indicating whether the operation succeeded (false for a timed-out insert, null for a timed-out removal).
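Three of the four behaviors can be observed on a queue of capacity 1 without hanging the program, as the sketch below shows. The blocking group (put and take) is left out of the demo because it would wait indefinitely here. The class name and the 50 ms timeout are assumptions made for the illustration.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article's code.
class QueueBehaviourSketch {

    // Insert operations against a queue that is already full.
    static String describeFullQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first"); // the queue is now full

        StringBuilder report = new StringBuilder();
        try {
            queue.add("second"); // throws IllegalStateException when full
        } catch (IllegalStateException e) {
            report.append("add:exception ");
        }
        report.append("offer:").append(queue.offer("second")).append(' '); // special value
        report.append("timedOffer:")
              .append(queue.offer("second", 50, TimeUnit.MILLISECONDS));   // gives up after the timeout
        return report.toString();
    }

    // Remove operations against a queue that is empty.
    static String describeEmptyQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        StringBuilder report = new StringBuilder();
        try {
            queue.remove(); // throws NoSuchElementException when empty
        } catch (NoSuchElementException e) {
            report.append("remove:exception ");
        }
        report.append("poll:").append(queue.poll()).append(' ');                     // special value
        report.append("timedPoll:").append(queue.poll(50, TimeUnit.MILLISECONDS));   // gives up after the timeout
        return report.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(describeFullQueue());  // add:exception offer:false timedOffer:false
        System.out.println(describeEmptyQueue()); // remove:exception poll:null timedPoll:null
    }
}
```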
The main BlockingQueue implementations are as follows:
- ArrayBlockingQueue: a bounded blocking queue (the capacity must be specified at construction and cannot be changed afterwards). It is backed by an array and stores elements in FIFO order: new elements are inserted at the tail, and elements are removed from the head.
- DelayQueue: an element can be taken only after its delay has expired. Elements in a DelayQueue must implement the Delayed interface (also in J.U.C). Delayed extends Comparable because the elements need to be ordered; typically they are ordered by expiration time, with the element that expires soonest first. Typical applications include closing connections on a timer, expiring cached objects, and timeout handling. Internally it uses a PriorityQueue and a ReentrantLock.
- LinkedBlockingQueue: the capacity is optional. If a capacity is given at construction, the queue is bounded; if not, it is effectively unbounded (the default capacity is Integer.MAX_VALUE). It is backed by a linked list and stores elements in FIFO order: new elements are inserted at the tail, and elements are removed from the head.
- PriorityBlockingQueue: an unbounded blocking queue with priority ordering. It does not permit null elements. Elements must implement Comparable unless a Comparator is supplied at construction, and that ordering determines the priority. An Iterator can be obtained from a PriorityBlockingQueue, but it does not guarantee traversal in priority order.
- SynchronousQueue: a queue with no internal capacity. A thread that inserts an element blocks until another thread takes that element, which is why it is called a synchronous queue. Strictly speaking it stores no elements at all: each insert must wait for a matching remove before the next element can be handed over.
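Of the implementations listed above, DelayQueue needs the most supporting code, because its elements must implement Delayed. The following sketch assumes a hypothetical element type, ExpiringItem, with made-up delays of 200 ms and 400 ms; it shows that elements come out in order of expiration rather than in insertion order.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article's code.
class DelayQueueSketch {

    // Made-up element type: it becomes available once its deadline has passed.
    static class ExpiringItem implements Delayed {
        final String name;
        private final long deadlineNanos;

        ExpiringItem(String name, long delayMillis) {
            this.name = name;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        // Remaining delay; zero or negative means the element has expired.
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        // Orders elements so that the one expiring soonest comes first.
        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    static String takeOrder() throws InterruptedException {
        DelayQueue<ExpiringItem> queue = new DelayQueue<>();
        queue.put(new ExpiringItem("slow", 400)); // inserted first, expires last
        queue.put(new ExpiringItem("fast", 200));

        String first = queue.take().name;  // blocks until the earliest deadline has passed
        String second = queue.take().name; // blocks until the next deadline has passed
        return first + "," + second;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(takeOrder()); // fast,slow
    }
}
```

The same pattern underlies the use cases mentioned in the list, such as expiring cache entries: the consumer thread simply calls take() in a loop and handles each element as its delay runs out.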