当前位置:网站首页>【JUC系列】Fork/Join框架之概览
【JUC系列】Fork/Join框架之概览
2022-06-30 07:58:00 【顧棟】
JUC系列Fork/Join框架
Fork/Join框架是Java并发工具包中的一种可以将一个大任务拆分为很多小任务来异步执行的工具,自JDK1.7引入。
Fork/Join总览类图

主要模块
ForkJoinPool可以通过池中的ForkJoinWorkerThread来处理ForkJoinTask任务。
ForkJoinPool只接收ForkJoinTask任务(在实际使用中,也可以接收Runnable/Callable任务,但在真正运行时,也会把这些任务封装成ForkJoinTask类型的任务),RecursiveTask是ForkJoinTask的子类,是一个可以递归执行的ForkJoinTask,RecursiveAction是一个无返回值的RecursiveTask,CountedCompleter 在任务完成执行后会触发执行一个自定义的钩子函数。
在实际运用中,我们一般都会继承 RecursiveTask 、RecursiveAction 或 CountedCompleter 来实现我们的业务需求,而不会直接继承ForkJoinTask类。
核心类
任务对象:
ForkJoinTask
任务管理的线程池
ForkJoinPool
执行任务的线程
ForkJoinWorkerThread
核心算法:分治算法
核心算法:工作窃取法
work-stealing(工作窃取)算法: 线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。这种特性使得 ForkJoinPool 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。尤其是构建异步模型的 ForkJoinPool 时,对不需要合并(join)的事件类型任务也非常适用。
在 ForkJoinPool 中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务。
使用示例
RecursiveAction使用
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class SortTask extends RecursiveAction {
final long[] array;
final int low;
final int high;
private int THRESHOLD = 0;
public SortTask(long[] array) {
this.array = array;
this.low = 0;
this.high = array.length - 1;
}
public SortTask(long[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
@Override
protected void compute() {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] task is [low=" + low + ", high=" + high + "].");
if (low < high) {
int pivot = partition(array, low, high);
SortTask left = new SortTask(array, low, pivot - 1);
SortTask right = new SortTask(array, pivot + 1, high);
left.fork();
right.fork();
left.join();
right.join();
}
}
private int partition(long[] array, int low, int high) {
long x = array[high];
int i = low - 1;
for (int j = low; j < high; j++) {
if (array[j] <= x) {
i++;
swap(array, i, j);
}
}
swap(array, i + 1, high);
return i + 1;
}
private void swap(long[] array, int i, int j) {
if (i != j) {
long temp = array[i];
array[i] = array[j];
array[j] = temp;
}
}
public static void main(String[] args) throws Exception {
long[] array = {
10, 0, 21, 44, 52, 30};
SortTask sort = new SortTask(array);
ForkJoinPool pool = new ForkJoinPool();
pool.submit(sort);
boolean flag = true;
while (flag) {
if (sort.isDone()) {
StringBuilder a = new StringBuilder();
for (int i = 0; i < sort.array.length; i++) {
a.append(sort.array[i]).append(" ");
}
System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] a is [" + a.substring(0, a.length() - 1) + "].");
flag = false;
}
}
pool.shutdown();
}
}
执行结果
[18:06:06.143--ForkJoinPool-1-worker-1] task is [low=0, high=5].
[18:06:06.143--ForkJoinPool-1-worker-2] task is [low=0, high=2].
[18:06:06.143--ForkJoinPool-1-worker-2] task is [low=0, high=1].
[18:06:06.143--ForkJoinPool-1-worker-2] task is [low=0, high=-1].
[18:06:06.143--ForkJoinPool-1-worker-2] task is [low=1, high=1].
[18:06:06.143--ForkJoinPool-1-worker-2] task is [low=3, high=2].
[18:06:06.159--ForkJoinPool-1-worker-2] task is [low=4, high=5].
[18:06:06.159--ForkJoinPool-1-worker-2] task is [low=4, high=3].
[18:06:06.159--ForkJoinPool-1-worker-2] task is [low=5, high=5].
[18:06:06.159--main] a is [0 10 21 30 44 52].
RecursiveTask使用
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10;
private long start;
private long end;
public SumTask(long end) {
this(0, end);
}
private SumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if ((end - start) <= THRESHOLD) {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] task is [" + start + "," + end + "].");
for (long l = start; l <= end; l++) {
sum += l;
}
} else {
long mid = (start + end) >>> 1;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
left.fork();
right.fork();
sum = left.join() + right.join();
}
return sum;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
SumTask sumTask = new SumTask(100);
ForkJoinPool pool = new ForkJoinPool();
Future<Long> future = pool.submit(sumTask);
boolean flag = true;
while (flag) {
if (future.isDone()) {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sum = [" + future.get() + "].");
flag = false;
}
}
pool.shutdown();
}
}
执行结果
[18:09:09.123--ForkJoinPool-1-worker-2] task is [26,32].
[18:09:09.123--ForkJoinPool-1-worker-0] task is [39,44].
[18:09:09.123--ForkJoinPool-1-worker-3] task is [51,57].
[18:09:09.123--ForkJoinPool-1-worker-2] task is [33,38].
[18:09:09.123--ForkJoinPool-1-worker-1] task is [0,6].
[18:09:09.123--ForkJoinPool-1-worker-3] task is [58,63].
[18:09:09.123--ForkJoinPool-1-worker-0] task is [45,50].
[18:09:09.123--ForkJoinPool-1-worker-3] task is [64,69].
[18:09:09.123--ForkJoinPool-1-worker-0] task is [76,82].
[18:09:09.123--ForkJoinPool-1-worker-3] task is [70,75].
[18:09:09.123--ForkJoinPool-1-worker-0] task is [83,88].
[18:09:09.123--ForkJoinPool-1-worker-3] task is [89,94].
[18:09:09.123--ForkJoinPool-1-worker-0] task is [95,100].
[18:09:09.123--ForkJoinPool-1-worker-3] task is [13,19].
[18:09:09.123--ForkJoinPool-1-worker-2] task is [20,25].
[18:09:09.123--ForkJoinPool-1-worker-2] task is [7,12].
[18:09:09.123--main] sum = [5050].
参考链接:https://www.pdai.tech/md/java/thread/java-thread-x-juc-executor-ForkJoinPool.html
边栏推荐
- Digital white paper on total cost management in chain operation industry
- 2022.01.20 [bug note] | qiime2: an error was encoded while running dada2 in R (return code 1)
- 期末复习-PHP学习笔记2-PHP语言基础
- 你了解IP协议吗?
- Distance from point to line
- 想转行,却又不知道干什么?此文写给正在迷茫的你
- 深度学习——LSTM
- Construction of module 5 of actual combat Battalion
- Arm debug interface (adiv5) analysis (I) introduction and implementation [continuous update]
- Deloitte: investment management industry outlook in 2022
猜你喜欢

Final review -php learning notes 3-php process control statement

Combinatorial mathematics Chapter 2 Notes

Deep learning - networks in networks and 1x1 convolution

深度学习——循环神经网络

2022 Research Report on China's intelligent fiscal and tax Market: accurate positioning, integration and diversity

为什么大学毕业了还不知道干什么?

Deep learning - residual networks resnets

深度学习——语言模型和序列生成

Construction of energy conservation supervision system for campus buildings of ankery University

深度学习——卷积的滑动窗口实现
随机推荐
深度学习——使用词嵌入and词嵌入特征
Why don't you know what to do after graduation from university?
min_ max_ Gray operator understanding
2022.01.20 [bug note] | qiime2: an error was encoded while running dada2 in R (return code 1)
Final review -php learning notes 2-php language foundation
The counting tool of combinatorial mathematics -- generating function
Combinatorial mathematics Chapter 1 Notes
February 14, 2022 [reading notes] - life science based on deep learning Chapter 2 Introduction to deep learning (Part 1)
鲸探NFT数字臧品系统开发技术分享
More, faster, better and cheaper. Here comes the fastdeploy beta of the low threshold AI deployment tool!
mysql无法连接内网的数据库
Global digital industry strategy and policy observation in 2021 (China Academy of ICT)
C. Fishingprince Plays With Array
深度学习——残差网络ResNets
2022 retail industry strategy: three strategies for consumer goods gold digging (in depth)
Digital white paper on total cost management in chain operation industry
Permutation and combination of probability
深度学习——LSTM
ACM. HJ48 从单向链表中删除指定值的节点 ●●
想问问,炒股怎么选择证券公司?网上开户安全么?