当前位置:网站首页>并发编程系列之什么是ForkJoin框架?

并发编程系列之什么是ForkJoin框架?

2022-07-01 15:40:00 51CTO

并发编程系列之什么是ForkJoin框架?

1、什么是ForkJoin框架

ForkJoin框架是java的JUC包里提供的,用于处理一些比较繁重的任务,会将这个大任务分为多个小任务,多个小任务处理完成后会将结果汇总给Result,体现的是一种“分而治之”的思想。第一步,拆分fork任务,将大任务分为多个小任务;第二步,归并join,会将小任务的处理结果进行归并为一个结果。


并发编程系列之什么是ForkJoin框架?_斐波那契数列

在这里插入图片描述

2、ForkJoinTask

​ForkJoinTask​​​是​​ForkJoin​​​框架的提供的任务API,​​ForkJoinTask​​​是一个抽象类,有两个主要的实现类,​​RecursiveTask​​​和​​RecursiveAction​​​,其中​​RecursiveTask​​​和​​RecursiveAction​​​的主要区别是,​​RecursiveAction​​​没有返回值,而​​RecursiveTask​​是有返回值的

并发编程系列之什么是ForkJoin框架?_线程池_02

在这里插入图片描述

3、ForkJoinPool

ForkJoinPool类是forkjoin框架的线程池实现,基于ExecutorService接口。这个线程池是jdk1.7才加入的,用于管理线程,执行forkjoin的任务。对于线程池的使用,我们使用ThreadPoolExecutor比较多,可以在idea里看一下uml类图,可以看出ForkJoinPool和ThreadPoolExecutor实现差不多的。


并发编程系列之什么是ForkJoin框架?_java_03

在这里插入图片描述

      
      
ForkJoinPool()
ForkJoinPool(int parallelism)
ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler, boolean asyncMode)
  • 1.
  • 2.
  • 3.
  • 4.

几个重要的参数:

  • parallelism:并行度,并行执行线程,可用指定,也可以不指定,不指定的情况,是根据cpu核数创建可用的线程
  • ForkJoinWorkerThreadFactory:创建线程的工厂实现
  • UncaughtExceptionHandler :因为未知异常中断的回调处理
  • asyncMode:是否异步,默认情况是false

使用时候,可以直接创建ForkJoinPool,可以不传参,不传参的情况,默认指定的线程并行数为​​Runtime.getRunTime().availableProcessors();​​,表示根据cpu核数创建可用线程数

      
      
ForkJoinPool forkJoinPool = new ForkJoinPool();
ArraySortTask task = new ArraySortTask(array , 0 , size);
forkJoinPool.submit(task);
task.get();
  • 1.
  • 2.
  • 3.
  • 4.

也是可用传参,对并行度进行指定的​​public ForkJoinPool(int parallelism)​​, parallelism并行度,并行执行几个线程

将forkjoin任务加入到FrokJoinPool线程池有几种方式

  • execute():调用其 fork 方法在多个线程之间拆分工作。
  • invoke():在ForkJoinPool线程池上调用invoke方法
  • submit():返回一个Future对象,Future可以进行监控,任务完成后返回结果

4、打印斐波那契数列

ForkJoin框架可以用于一些递归遍历的场景,对于斐波那契数列,你可以比较熟悉,因为在面试中有时候经常问到,斐波那契数列的特点就是最后一项的结果等于前面两项的和

      
      
package com.example.concurrent.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
* <pre>
* 斐波那契数列
* </pre>
* <p>
* <pre>
* @author nicky.ma
* 修改记录
* 修改后版本: 修改人: 修改日期: 2021/10/12 16:22 修改内容:
* </pre>
*/
public class Fibonacci extends RecursiveTask<Integer>{

private int n;

public Fibonacci(int n) {
this.n = n;
}

@Override
protected Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
f2.fork();
return f1.join() + f2.join();
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
for (int i = 0; i< 10; i++) {
ForkJoinTask task = pool.submit(new Fibonacci(i));
System.out.println(task.get());
}
}
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.

5、ForkJoin归并排序

面试题:快速实现对一个长度百万的数组的排序

难点:可以使用归并排序,多线程如何组织实现归并排序

      
      
package com.example.concurrent.forkjoin;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
* <pre>
* 大数组排序
* </pre>
* <p>
* <pre>
* @author mazq
* 修改记录
* 修改后版本: 修改人: 修改日期: 2021/10/12 17:04 修改内容:
* </pre>
*/
public class ArraySortTask extends RecursiveAction{

final long[] array; final int lo, hi;
ArraySortTask(long[] array, int lo, int hi) {
this.array = array; this.lo = lo; this.hi = hi;
}
ArraySortTask(long[] array) { this(array, 0, array.length); }

@Override
protected void compute() {
if (hi - lo < THRESHOLD)
// 少于阀值,使用Arrays.sort 快排
sortSequentially(lo, hi);
else {
/* 归并排序 */
// 取中间值
int mid = (lo + hi) >>> 1;
// 拆分任务
invokeAll(new ArraySortTask(array, lo, mid),
new ArraySortTask(array, mid, hi));
// 归并结果
merge(lo, mid, hi);
}
}

// implementation details follow:
static final int THRESHOLD = 1000;
void sortSequentially(int lo, int hi) {
Arrays.sort(array, lo, hi);
}

void merge(int lo, int mid, int hi) {
long[] buf = Arrays.copyOfRange(array, lo, mid);
for (int i = 0, j = lo, k = mid; i < buf.length; j++)
array[j] = (k == hi || buf[i] < array[k]) ?
buf[i++] : array[k++];
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
int size = 10_000;
long[] array = new long[size];
Random random = new Random();
for (int i = 0; i< size; i++) {
array[i] = random.nextInt();
}

ForkJoinPool forkJoinPool = new ForkJoinPool();
ArraySortTask task = new ArraySortTask(array , 0 , size);
forkJoinPool.submit(task);
task.get();

for (long a : array) {
System.out.println(a);
}
}
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.

参考资料

原网站

版权声明
本文为[51CTO]所创,转载请带上原文链接,感谢
https://blog.51cto.com/u_15704340/5434537