当前位置:网站首页>并发编程高级部分:并行流,Tasks和Executors以及CompletableFuture类
并发编程高级部分:并行流,Tasks和Executors以及CompletableFuture类
2022-06-21 09:28:00 【凄戚】
并发编程
这篇博客是对并发编程学习过程中总结的笔记。
这篇博客是对并发编程概念最基本的介绍,它的目标是为我们提供足够的基础知识,使你能够把握问题的额复杂性和危险性,从而避免一些错误。
这篇博客介绍了一些高级并发的概念,包括最新的 parallel Streams 和 CompletableFutures。
1、并发的新定义
并发通常表示:不止一个任务正在执行。而并行几乎总是表示:不止一个任务同时执行。
这两个概念混合在一起的一个主要原因是 Java 使用线程来实现并发和并行。
还有一些概念:
- 纯并发:在单个CPU上运行任务。纯并发系统比顺序系统更快的产生结果,但它的运行速度不会因为处理器的增加而变快。
- 并发-并行:使用并发技术,结果程序可以利用更多处理器更快地产生结果。
- 并行-并发:使用并行编程技术编写,如果只有一个处理器,结果程序仍可运行。(Java 8 流式编程就是这样)
- 纯并行:除非有多个处理器,否则实现不了。
在简单理解了以上这些概念后,对并发的更简单定义更好理解:
并发性是一系列性能技术,专注于减少等待
2、Java 的并发
Java采用了更加传统的方式实现并发,即在顺序语言之上添加对线程的支持。即线程机制是在由执行程序表示的单个进程中创建任务交换,而不是在多任务操作系统中叉入(fork)外部进程。
并发通常是提高运行在单处理器上的程序的性能。
并发性相比较顺序执行程序增加了额外开销,包括复杂性成本,但可以通过程序设计、资源平衡和用户的便利性来抵消。通常,并发性使你能够创建更低耦合的设计;否则的话你就必须特别小心那些使用了并发操作的代码。
实现并发最直接的方式就是在操作系统级别使用进程。进程是运行在它自己的地址空间内的自包容的程序。多任务操作系统可以通过周期性地将CPU 从一个进程切换到另一个进程来实现同时运行多个进程(程序)。而与此同时,像Java 所使用的这种并发系统会共享诸如内存和I/O 这样的资源,因此编写多线程程序最基本的困难在于协调不同线程驱动的任务之间对这些资源的使用,以使得这些资源不会同时被多个任务访问。
并发的四句格言:
- 不要这样做
- 没有什么是正确的,一切都可能有问题
- 它起作用,并不意味着它没有问题
- 你必须仍然理解
总结一下就是并发很不安全,但是你还必须得学它。
下面是将介绍的内容:最新的高级 Java 并发结构 :
- Parallel Streams(并行流):这是Java 8 Streams 提供的改进语法,你可以简单的将
parallel添加到表达式来实现并行化流。 - 创建和运行任务:任务是指可以一段独立运行的代码。
- 终止长时间运行的任务:任务不会一直独立运行,因此需要一种机制来关闭它们。典型的方法是使用一个标志,这引入了共享内存的问题,为了避免额外的问题,我们使用 Java 的Atomic 库。
- Completable Futrues :当你将衣服带到干洗店时,他们会给你一张收据。有了这张收据,你可以继续去做其他任务,当你的衣服洗好时你直接将其取走。收据就是你和干洗店在后台执行的任务的连接,Futrue 是Java 5 引入的。Future 比以前的方法更方便,但你仍然必须出现并用收据取出干洗,如果任务没有完成你还需要等待。对于一系列链式操作来说,Futures 并没有多大帮助。(很多场景下,通过 ExecutorService 获取线程运行的结果,使用execute方法去提交任务是无法获得结果的,这时候会改用submit方法去提交,以便获得线程运行的结果。而submit方法返回的就是Future。 使用
future.get()方法去获取线程执行结果,get()方法是阻塞的,当主线程执行到get()方法,当前线程会去等待异步任务执行完成,换言之,异步的效果在我们使用get()拿结果时,会变得无效。) - CompletableFutrue :这是 Java 8 提出的更好的解决方案:它允许你将操作链接在一起,因此你就不必将代码写入接口排序操作。有了 CompletableFutrue 完美的结合,你就可以容易的完成一系列的链式操作(在异步计算中,两个计算任务相互独立,但是任务一又依赖于任务一的结果,这种情况下,靠Future是解决不了,而CompletableFuture则可以实现)。
- 死锁:某些任务在执行时必须等待其他任务的结果。被阻塞的任务可能等待另一个被阻塞的任务,依次循环,如果被阻止 的任务循环到第一个,没有人可以取得任何进展,就会产生死锁。
3、并行流
Java 8 的一个显著优点是:在某些情况下,它们可以很容易的并行化。这来自库的仔细设计,特别是流使用了内部迭代的方式–也就是说,它们控制着自己的迭代器。这种特殊的迭代器是 Spliterator,它被限制为易于自动分割。只要使用.parallel就会使其分割开来并并行执行。但是如果你的代码是用Streams 编写的,那么并行化以提高速度是很寻常的。
例如,考虑来自 Streams 的 Prime.java。查找质数是一个耗时的过程,我们可以比较一下:
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.stream.Collectors;
import static java.util.stream.LongStream.*;
/** * 流使用了内部迭代的方式–也就是说,它们控制着自己的迭代器。 * @Date 2021/8/14 17:47 * @Created by gt136 */
public class ParallelPrime {
static final int COUNT = 100_000;
public static boolean isPrime(long n) {
//rangeClosed返回一个递增1的有序序列且包含最后一个节点,range的话不包含。如果第二个参数小于初始值,则返回空流
// sqrt()返回n的正平方根,如n=4,sqrt(n)=+2;如果n=6,sqrt(n)=+2.445,但是将其转化为long型后就会将2.445直接舍去小数位
//所以4以前返回空流但是结果为true,5-8都为2
return rangeClosed(2, (long) Math.sqrt(n))
.noneMatch(i -> n % i == 0);//所有数据不满足条件才返回true,空也返回true
}
public static void main(String[] args) throws IOException {
Timer timer = new Timer();
List<String> primes =
iterate(2,i->i+1)
.parallel() //[1]
.filter(ParallelPrime::isPrime)
.limit(COUNT)
.mapToObj(Long::toString)
.collect(Collectors.toList());
System.out.println(timer.duration());
//System.out.println((long) Math.sqrt(8));
Files.write(Paths.get("primes.txt"), primes, StandardOpenOption.CREATE);
}
}
/* output: 1110 */
我们将数据保存在硬盘上以防止编译器过激的优化,最后会打印在磁盘上。
当注释掉 //[1] 时,我的结果用时大约是并行化的3倍。
3.1 parallel有其使用限制
实现对序列求和,有很多方式实现它,我们这里用计时器进行比较。一切数字严格使用long;
从一个计时方法开始,它采用 LongSupplier ,测量 getAsLong()调用的长度,将结果与 checkValue 进行比较并显示结果。
import com.gui.demo.thingInJava.concurrency.share.Timer;
import java.util.function.LongSupplier;
import java.util.stream.LongStream;
/** * * @Date 2021/8/16 10:34 * @Created by gt136 */
public class Summing {
static void timeTest(String id, long checkValue, LongSupplier operation) {
System.out.print(id + ": ");
Timer timer = new Timer();
long result = operation.getAsLong();
if (result == checkValue) {
System.out.println(timer.duration() + "ms");
}else {
System.out.format("result: %d%ncheckVallue:%d%n",result,checkValue);
}
}
public static final int SZ = 100_000_000;
//SZ以内数字的求和
public static final long CHECK = (long) SZ * ((long)SZ + 1) / 2;
public static void main(String[] args) {
System.out.println(CHECK);
timeTest("Sum Stream",CHECK, () -> LongStream.rangeClosed(0, SZ).sum());//正常求和
timeTest("Sum Stream Parallel",CHECK,()->LongStream.rangeClosed(0,SZ).parallel().sum());//并行化求和
timeTest("Sum Iterate: ",CHECK,()->LongStream.iterate(0,i->i+1).limit(SZ+1).sum());//内部迭代求和
timeTest("Sum Iterated Parallel",CHECK,()->LongStream.iterate(0,i->i+1).parallel().limit(SZ+1).sum());//内部迭代并行求和
}
}
/* output = 5000000050000000 Sum Stream: 52ms Sum Stream Parallel: 21ms Sum Iterate: : 97ms Sum Iterated Parallel: 2759ms */
*/
main()的第一个版本使用直接生成 Stream 并调用 sum() 的方法。我们可以看到流的好处在于即使 SZ 为一亿这么大,程序也可以很好的处理而没有溢出。而使用parallel的基本范围操作明显更快。
但是使用 iterate()来产生序列的速度是很慢的,可能是因为每次生成数字都必须调用 lambda 的原因。但是当我们尝试并行化它的时候,结果不仅比非并行版本时间更长,甚至当SZ 超过100万时,会耗尽机器内存(在某些机器上)。应用parallel是一个合理的尝试,但是会产生这些令人惊异的结果。但是我们可以对流并行算法进行初步观察:
- 流并行性将输入数据分成多个部分,因此算法可以应用于那些单独的部分。
- 数组分割成本低,分割均匀且对分割的大小有着完美的控制。
- 链表没有这些属性,“拆分”一个链表仅仅意味着把它分成“第一个元素”和“其余元素”,这没有什么实用性。
- 无状态生成器的行为类似于数组,上面使用的
range()就是无状态的。 - 迭代生成器的行为就像链表,
iterate()是一个迭代生成器。
现在我们尝试通过在数组中填值并对数组求和来解决问题,因为数组只分配了一次,所以我们不太可能遇到垃圾收集时序问题。
import java.util.Arrays;
/** * 在数组中填值并对数组求和 * @Date 2021/8/16 14:36 * @Created by gt136 */
public class Summing2 {
/** * 对数组求和 * @param ia * @return */
static long basicSum(long[] ia) {
long sum = 0;
int size = ia.length;
for (int i = 0; i < size; i++) {
sum += ia[i];
}
return sum;
}
public static final int SZ = 20_000_000;
//求和
public static final long CHECK = (long)SZ * ((long) SZ + 1)/2;
public static void main(String[] args) {
System.out.println(CHECK);
long[] la = new long[SZ + 1];
//根据第二个参数generator来对生成一个数组长度大小的有序队列,并将值其数组值替换,如果generator为null,抛出异常
Arrays.parallelSetAll(la, i -> i);
Summing.timeTest("Array Stream Sum", CHECK, () -> Arrays.stream(la).sum());
Summing.timeTest("Parallel", CHECK, () -> Arrays.stream(la).parallel().sum());
Summing.timeTest("Basic Sum", CHECK, () -> basicSum(la));
Summing.timeTest("ParallelPrefix",CHECK,()->{
//这个方法会将数组的前几位的值求和并加上第i位的值赋值给la[i];la[la.length - 1]的值就是求和
Arrays.parallelPrefix(la, Long::sum);
return la[la.length - 1];
});
}
}
/* output 200000010000000 Array Stream Sum: 25ms Parallel: 16ms Basic Sum: 19ms ParallelPrefix: 98ms */
第一个限制是内存大小,因为数组是预先分配的,所以没有办法创建与之前版本一样大的的。并行化可以加快速度,甚至比使用Basic Sum 循环还快一点,但是有趣的是:ParallelPrefix 反而最慢(因为它将前几位的值都算过和了)。
由于我们正在处理线程,因此我们必须将任何跟踪信息捕获到并发数据结构中。
下面的代码演示一堆线程都从一个生成器中取值,然后以limit 选择有限的结果集。
public class ParallelStreamPuzzle {
//新的线程安全队列
public static final Deque<String> TRACE = new ConcurrentLinkedDeque<>();
/** * int整数的生成器,实现了supplier。 */
static class IntGenerator implements Supplier<Integer> {
//原子类,
private AtomicInteger currentValue = new AtomicInteger();
@Override
public Integer get() {
//这一步会有多个线程进来调用 add 方法,但是ConcurrentLinkedDeque 是线程安全的,所以会轮流执行
TRACE.add(currentValue.get() + ": " + Thread.currentThread().getName());
//会让等待的线程依次执行。
return currentValue.getAndIncrement();
}
}
public static void main(String[] args) throws IOException {
List<Integer> x = Stream.generate(new IntGenerator())
.limit(10)
.parallel()
.collect(Collectors.toList());
System.out.println(x);
Files.write(Paths.get("PSP.txt"), TRACE);
}
}
/* output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] */
上面代码的结果其实只有我第一次运行时是这样的,之后每次运行的每次结果都不一样。它的执行流程是这样的:一个流抽象出无限序列,按需生成。当你要求它产生并行流时,所有这些线程都尽可能的调用get(),加入limit()后,代表只需这几个。但是根据写出的 txt 可以观察到,尽管只需10个值,但是它也生产了1024个元素。
currentValue 是使用线程安全的 AtomicInteger 类定义的,可以防止竞争而对值产生的破坏。
如果你想生成一个int 流,你可以使用 IntStream.range().
public class ParallelStreamPuzzle2 {
public static void main(String[] args) {
List<Integer> x = IntStream.range(0,30)
.peek(e->System.out.println(e+": "+ Thread.currentThread().getName()))
.limit(10)
.parallel()
.boxed()
.collect(Collectors.toList());
System.out.println(x);
}
}
/* output: 8: main 2: ForkJoinPool.commonPool-worker-6 1: ForkJoinPool.commonPool-worker-3 7: ForkJoinPool.commonPool-worker-6 6: ForkJoinPool.commonPool-worker-5 4: ForkJoinPool.commonPool-worker-1 3: ForkJoinPool.commonPool-worker-7 0: ForkJoinPool.commonPool-worker-4 5: main 9: ForkJoinPool.commonPool-worker-2 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] */
添加的peek()可以验证parallel()确实有效。boxed()可以接受 int 流并将其封装打包为Integer 流。
现在我们可以得到多个线程产生的不同的值,但它只产生10个请求的值,而不是1024个再从里边取值。
至于打印的数据中 ForkJoinPoolcommonPool 是在实现多线程任务使用的默认线程池。
至此,parallel 的使用介绍就结束了,在这里总结一下,看到了大佬firefly_的总结这里借鉴一下:地址在此
parallelStream 适用的场景是CPU密集型的,只是做到别浪费CPU,假如本身电脑CPU的负载很大,那还到处用并行流,那并不能起到作用;
1. I/O密集型 磁盘I/O、网络I/O都属于I/O操作,这部分操作是较少消耗CPU资源,一般并行流中不适用于I/O密集型的操作,就比如使用并流行进行大批量的消息推送,涉及到了大量I/O,使用并行流反而慢了很多
2. CPU密集型 计算类型就属于CPU密集型了,这种操作并行流就能提高运行效率。
3. 不要在多线程中使用parallelStream,原因同上类似,大家都抢着CPU是没有提升效果,反而还会加大线程切换开销;
会带来不确定性,请确保每条处理无状态且没有关联;
4. 考虑NQ模型:N可用的数据量,Q针对每个数据元素执行的计算量,乘积 N*Q 越大,就越有可能获得并行提速。当N * Q > 10000(大概是集合大小1000以上)就会获得有效提升;
5. parallelStream是创建一个并行的Stream,而且它的并行操作是不具备线程传播性的,所以是无法获取ThreadLocal创建的线程变量的值;
在使用并行流的时候是无法保证元素的顺序的,也就是即使你用了同步集合也只能保证元素都正确但无法保证其中的顺序;
6. lambda的执行并不是瞬间完成的,所有使用parallel stream的程序都有可能成为阻塞程序的源头,并且在执行过程中程序中的其他部分将无法访问这些workers,这意味着任何依赖parallel streams的程序在什么别的东西占用着common ForkJoinPool时将会变得不可预知并且暗藏危机。
4、创建和运行任务
在上面最后的几个例子中可以看到,有的情况无法通过并行流实现并发,这时候就必须创建并运行自己的任务。稍后我们会学习 Java 8 新添加的 CompletableFutrue,但现在先使用一些更基本的概念。
4.1 Tasks 和 Executors
在Java 的早期版本中,你通过直接创建自己的 Thread 对象来使用线程(详见并发底层),甚至将它们子类化以创建你自己的特定“任务线程”对象。这种情况你手动调用了构造函数并自己启动了线程。
创建这些线程的开销是很重要的,所以不推荐手动操作方法。在 Java 5 中,添加了线程池,你可以将要执行的类的类型交给 ExecutorService 以运行该任务,而不是为每种不同类型的任务创建新的 Thread 子类型。ExecutorService 为你管理线程,并且在运行任务后重新循环线程而不是丢弃线程。
public class NapTask implements Runnable{
final int id;
public NapTask(int id) {
this.id = id;
}
@Override
public void run() {
new Nap(0.1);
System.out.println(this + " " + Thread.currentThread().getName());
}
@Override
public String toString() {
return "NapTask{" +
"id=" + id +
'}';
}
}
/***********************************************************/
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
IntStream.range(0, 10)
.mapToObj(NapTask::new)
.forEach(exec::execute);
System.out.println("All tasks submitted");
exec.shutdown();
while (!exec.isTerminated()) {
System.out.println(Thread.currentThread().getName() + " awaiting termination");
new Nap(0.1);//sleep一会儿
}
}
}
/* output All tasks submitted main awaiting termination main awaiting termination NapTask{id=0} pool-1-thread-1 NapTask{id=1} pool-1-thread-1 main awaiting termination main awaiting termination NapTask{id=2} pool-1-thread-1 main awaiting termination NapTask{id=3} pool-1-thread-1 main awaiting termination NapTask{id=4} pool-1-thread-1 main awaiting termination NapTask{id=5} pool-1-thread-1 NapTask{id=6} pool-1-thread-1 main awaiting termination main awaiting termination NapTask{id=7} pool-1-thread-1 main awaiting termination NapTask{id=8} pool-1-thread-1 NapTask{id=9} pool-1-thread-1 main awaiting termination */
Executors.newSingleThreadExecutor();是Executors 中的一个工厂方法,它创建特定类型的 ExecutorService。
创建了十个 NapTasks 并将它们提交给 ExecutorService,这意味着它们开始自己运行。但是,main 继续做自己的事。当运行exec.shutdown();时,它告诉 ExecutorService 完成并提交的事物,但不再接受任何新任务。此时,这些任务仍在运行,当所有执行完后,exec.isTerminated()变为true。
注意:main()中线程的名称是main,并且只有一个其他线程:pool-1-thread-1。此外,交错输出显示两个线程确实在同时运行。
其实,如果你只是调用exec.shutdown();程序就可以完成所有任务。也就是说,不需要后面的判断了。
4.2 使用更多线程
使用线程的重点是更快的完成任务,那么我们为什么要限制自己使用 SingleThreadExecutor 呢?查看Executors的JavaDoc,你可以发现更多选项。
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec
=Executors.newCachedThreadPool();
IntStream.range(0, 10)
.mapToObj(NapTask::new)
.forEach(exec::execute);
exec.shutdown();
}
}
/* output: NapTask[7] pool-1-thread-8 NapTask[4] pool-1-thread-5 NapTask[1] pool-1-thread-2 NapTask[3] pool-1-thread-4 NapTask[0] pool-1-thread-1 NapTask[8] pool-1-thread-9 NapTask[2] pool-1-thread-3 NapTask[9] pool-1-thread-10 NapTask[6] pool-1-thread-7 NapTask[5] pool-1-thread-6 */
当你运行这个程序时,你会发现它运行的时间更短一些。这是因为每个任务都有自己的线程,它们都并行运行,而不需要等待唯一一个线程来运行每个任务。但是,那为什么还要用到 SingleThreadExecutor呢?
看一个更复杂的任务:
public class InterferingTask implements Runnable{
final int id;
private static Integer val = 0;
public InterferingTask(int id) {
this.id = id;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
val++;
}
System.out.println(id + " " + Thread.currentThread().getName()+" "+ val);
}
}
/*=======================================*/
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
IntStream.range(0, 10)
.mapToObj(InterferingTask::new)
.forEach(exec::execute);
exec.shutdown();
}
}
/* output: 1 pool-1-thread-2 100 2 pool-1-thread-3 300 3 pool-1-thread-4 200 0 pool-1-thread-1 100 6 pool-1-thread-7 400 4 pool-1-thread-5 500 7 pool-1-thread-8 600 9 pool-1-thread-10 700 5 pool-1-thread-6 800 8 pool-1-thread-9 900 */
//===========================================
public class SingleThreadExecutor3 {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
IntStream.range(0, 10)
.mapToObj(InterferingTask::new)
.forEach(exec::execute);
exec.shutdown();
}
}
/* output = 0 pool-1-thread-1 100 1 pool-1-thread-1 200 2 pool-1-thread-1 300 3 pool-1-thread-1 400 4 pool-1-thread-1 500 5 pool-1-thread-1 600 6 pool-1-thread-1 700 7 pool-1-thread-1 800 8 pool-1-thread-1 900 9 pool-1-thread-1 1000 */
第一个类的任务就是使val 增加一百次。但是使用 CachedThreadPool 的输出并不是我们所期望的,并且从一次运行到下一次运行结果也不同。问题在于同时运行了多个任务,每个任务都试图写入 val 的单个实例,并且没有保护机制,我们称这样的类是线程不安全的。
SingleThreadExecutor 则输出了我们想要的结果,且多次运行结果都是一致的,尽管 InterferingTask 缺乏线程安全性。因为它一次运行一个任务,这些任务之间不会干扰,因此它加强了线程安全性。这种现象称为线程封闭,因为在单线程上运行任务限制了加速,但可以减少出现错误的调试和重写。
4.3 产生结果
InterferingTask 实现了Runnable ,所以没有返回值。在操作共享变量的过程中,多个任务同时修改同一个变量产生竞争。
而为了产生返回结果,我们创建 Callable 而不是 Runnable:
public class CountingTask implements Callable<Integer> {
final int id;
public CountingTask(int id) {
this.id = id;
}
@Override
public Integer call() throws Exception {
Integer val = 0;
for (int i = 0; i < 100; i++) {
val++;
}
System.out.println(id + " " + Thread.currentThread().getName()+" "+ val);
return val;
}
}
//===================================================
public class CachedThreadPool3 {
public static Integer extractResult(Future<Integer> future) {
try {
//返回计算的结果:会在所有任务执行完成后执行,用来检测可能发生的异常
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
List<CountingTask> tasks =
IntStream.range(0, 10)
.mapToObj(CountingTask::new)
.collect(Collectors.toList());
//一次性调用所有任务
List<Future<Integer>> futures = exec.invokeAll(tasks);
//reduce 合并方法:使用提供的标识值和关联累加函数对该流的元素执行归约,并返回归约后的值
Integer sum = futures.stream()
.map(CachedThreadPool3::extractResult)
.reduce(0, Integer::sum);//返回对每个参数运算后的结果
System.out.println("sum = " + sum);
exec.shutdown();
}
}
/* output: 0 pool-1-thread-1 100 3 pool-1-thread-4 100 1 pool-1-thread-2 100 6 pool-1-thread-7 100 2 pool-1-thread-3 100 7 pool-1-thread-8 100 5 pool-1-thread-6 100 4 pool-1-thread-5 100 8 pool-1-thread-9 100 9 pool-1-thread-10 100 sum = 1000 */
call()完全独立于所有其他 CountingTask 生成其结果,这意味着没有可变的共享状态(因为这里调用了new,构造了多个构造函数)。
ExecutorService 允许你使用 exec.invokeAll()启动集合中的每个 Callable。
只有在所有任务完成后,exec.invokeAll()才会返回一个 Future 列表(这是使用 future 的缺点,所以现在有了 CompletableFutrue),每个任务一个 Future。Future 是java 5 引入的,允许你提交任务而无需等待其完成。
future.get() 如果在尚未完成任务的 Future上调用,就会阻塞,直到结果可用(必须等待结果)。在这里,Future 似乎是多余的,因为 invokeAll() 甚至在所有任务完成前都不会返回。但是,这里的 Future 并不用于等待延迟结果,而是用于捕获任何可能发生的异常。
因为当你调用 get()时,Future 会阻塞,所以它只能等待任务完成时才暴露问题。最终,Futures 被认为是一种无效的解决方式。现在已经不鼓励使用它了,而是使用 Java 8 的 CompletableFuture。
public class Futures {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Integer> f = exec.submit(new CountingTask(99));
System.out.println(f.get());
exec.shutdown();
}
}
/* outputs: 99 pool-1-thread-1 100 100 */
这里使用了 ExecutorService.submit(),
此外,我们也可以使用并行 Stream,以更简单更优雅的方式解决这个问题:
public class CountingStream {
public static void main(String[] args) {
System.out.println(
IntStream.range(0,10)
.parallel()
.mapToObj(CountingTask::new)
.map(countingTask -> {
try {
return countingTask.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.reduce(0,Integer::sum)
);
}
}
/* output: 1 pool-1-thread-2 100 0 pool-1-thread-1 100 3 pool-1-thread-4 100 2 pool-1-thread-3 100 5 pool-1-thread-6 100 4 pool-1-thread-5 100 6 pool-1-thread-7 100 7 pool-1-thread-8 100 8 pool-1-thread-9 100 9 pool-1-thread-10 100 sum = 1000 */
4.4 Lambda和方法引用
在Java 8 有了lambdas 和方法引用后,你不需要受限于只能使用 Runnable 和 Callable。因为lambdas 和方法引用可以通过匹配方法签名来使用,所以我们将非 Runnable 或 Callable 的参数传递给 ExecutorService:
class NotRunnable {
public void go() {
System.out.println("NotRunnable");
}
}
class NotCallable {
public Integer get() {
System.out.println("NotCallable");
return 1;
}
}
public class LambdasAndMethodReferences {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
exec.submit(() -> System.out.println("lambda1"));
exec.submit(new NotRunnable()::go);
exec.submit(() -> {
System.out.println("lambda2");
return 1;
});
exec.submit(new NotCallable()::get);
exec.shutdown();
}
}
/* output: lambda1 NotRunnable lambda2 NotCallable */
5、终止耗时任务
并发程序通常使用长时间运行的任务。可调用任务在完成时返回值;虽然已经给它一个有限的寿命,但仍然很长。有的任务被设置为永远运行的后台任务。你需要一种方法在正常完成之前停止 Runnable 和 Callable 任务,例如当你关闭程序时。
最初的Java 设计提供了中断运行任务的机制。但是,中断机制包括阻塞问题,中断任务既乱又复杂,因为你必须了解可能发生中断的所有可能状态,以及可能导致的数据丢失。使用中断被视为反对模式。
任务中止的最佳方式是设置任务周期性检查的标志。然后任务可以通过自己的 shutdown 进程正常中止。不是在任务进行中随意关闭线程,而是要求任务在达到一个较好的时机时自行终止。这比中断可以产生更好的结果,以及更易理解更合理的代码。
设置任务时可以设置可视的 boolean flag,以便定期检查标志并执行正常终止。但是,这会涉及到共同的可变状态。
在之前的代码中,很多都使用了 volatile 关键字,这里将使用更简单的技术并避免所有异变的参数。
Java 5 引入了 Atomic 类,它提供了一组不必担心并发问题的类型。
public class QuittableTask implements Runnable {
final int id;
public QuittableTask(int id) {
this.id = id;
}
private AtomicBoolean running = new AtomicBoolean(true);
public void quit() {
running.set(false);
}
@Override
public void run() {
while (running.get()) {
//[1]
new Nap(0.1);
}
System.out.println(id + " ");
}
}
/*================================================*/
public class QuittingTasks {
public static final int COUNT = 150;
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
List<QuittableTask> tasks = IntStream.range(0, COUNT)
.mapToObj(QuittableTask::new)
.peek(qt -> exec.execute(qt))
.collect(Collectors.toList());
new Nap(1);
tasks.forEach(QuittableTask::quit);
exec.shutdown();
}
}
/* output: 71 42 39 35 21 24 30 18 38 50 33 46 41 36 40 52 53 65 76 68 16 77 84 58 63 43 12 47 114 99 98 79 91 100 87 101 95 113 90 104 83 74 82 93 78 96 75 66 54 55 88 92 59 89 67 85 115 62 70 105 80 72 73 81 69 60 61 56 57 49 44 64 45 48 37 25 109 28 111 17 108 29 32 107 110 103 15 102 106 112 7 22 97 0 86 2 5 9 94 6 8 3 10 4 11 23 14 27 1 20 34 19 26 13 31 51 139 117 134 127 125 129 131 124 135 120 132 146 136 138 133 137 143 140 142 144 141 145 147 148 149 118 121 126 116 130 128 119 123 122 */
虽然多个任务可以在同一个实例上成功调用 quit(),但是 AtomicBoolean 可以防止多个任务同时实际修改 running,从而使 quit 成为线程安全的。
第二个程序我们启动了很多 QuittableTasks 然后关闭它们,我们使用 peek 将QuittableTasks 传递给 ExecutorService,然后将这些任务收集到 List.main()中,只要任何任务仍在运行,就会阻止程序退出。即使为每个任务按照顺序调用quit(),任务也不会按它们创建的顺序关闭。所以独立运行的任务不会确定性地相应信号。
6、CompletableFuture 类
作为介绍,这里使用 CompletableFuture 来实现 QuitTasks 的任务:
public class QuittingCompletable {
public static void main(String[] args) {
List<QuittableTask> tasks = IntStream.range(0, QuittingTasks.COUNT)
.mapToObj(QuittableTask::new)
.collect(Collectors.toList());
List<CompletableFuture<Void>> cfutures =
tasks.stream()
/* * completableFuture和ExecutorService实现多线程的区别在于前者可以更好的适应流式编程,而 * runAsync可以实现异步执行这些任务,也就是一个CompletableFuture对应一个任务。 */
.map(CompletableFuture::runAsync)
.collect(Collectors.toList());
new Nap(1);
tasks.forEach(QuittableTask::quit);
cfutures.forEach(CompletableFuture::join);
}
}
/* output: 4 2 7 0 1 5 3 6 13 12 10 11 9 8 19 18 17 16 15 14 25 24 23 22 21 31 32 20 33 34 30 29 28 27 26 40 39 38 44 37 36 35 47 46 45 43 42 41 53 52 51 50 49 48 59 58 57 56 55 54 65 64 63 62 61 60 71 70 69 68 67 66 77 76 75 74 73 72 83 82 81 80 79 78 89 88 87 86 93 94 85 84 98 99 100 101 102 97 96 95 92 91 90 108 107 106 105 104 103 114 113 112 111 110 109 120 121 122 119 118 117 116 115 128 127 126 125 124 123 134 133 132 131 130 140 129 141 139 138 137 136 135 147 146 145 144 143 142 149 148 */
任务是List<QuittableTask>,就和在 QuittingTasks.java 中一样,但是在这个例子中,没有peek()将每个 QuittableTask 提交给 ExecutorService。而是在创建 cfutures 期间,每个任务都交给CompletableFuture::runAsync,并返回 CompletableFuture。因为底层的run()不返回任何内容,所以使用 CompletableFuture 调用join()来等待它完成。
在本例中需要注意的重要一点是:运行任务不需要使用 ExecutorService,而是直接交给 CompletableFuture 管理,你也不需要shutdown();事实上,除非你显式的调用 join(),否则程序将会尽快退出,而不必等待任务完成。
6.1 基本用法
下面将学习更多,以下是带有静态方法work()的类,它对该类的对象执行work():
//一个对象,但是没有实现Runnable或者Callable
public class Machina {
public enum State{
//State 初始
STATE,ONE,TWO,THREE,END;
State step() {
if (equals(END)) {
return END;
}
return values()[ordinal() + 1];
}
}
private State state = State.STATE;
private final int id;
public Machina(int id) {
this.id = id;
}
//将机器从一个状态移动到下一个状态。并且需要0.1 秒才能完成工作。
public static Machina work(Machina machina) {
if (machina.state.equals(State.END)) {
new Nap(0.1);
machina.state = machina.state.step();
}
System.out.println(machina);
return machina;
}
@Override
public String toString() {
return "Machina " + "id=" + id + (state.equals(State.END) ? "Complete" : state);
}
}
/***************************************************************/
public class CompletedMachina {
public static void main(String[] args) {
//completedFuture将会返回一个根据给的值而完成的CompletableFuture
CompletableFuture<Machina> cf = CompletableFuture.completedFuture(new Machina(0));
try {
Machina m = cf.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
work()方法将机器从一个状态移动到下一个状态。并且需要0.1 秒才能完成工作。
CompletableFuture 调用completedFuture()将传入的对象进行包装,它创建(返回)一个“已经完成”的 CompletableFuture 。
通常,get()在等待结果时会阻塞调用线程。但是此块不会再阻塞了,因为 CompletableFuture 已经完成了,所以它的结果立即可用。
import com.gui.demo.thingInJava.concurrency.share.Timer;
import java.util.concurrent.CompletableFuture;
/** * thenApply的链式使用 * @Date 2021/8/23 14:39 * @Created by gt136 */
public class CompletableApplyChained {
public static void main(String[] args) {
Timer timer = new Timer();
/* * thenApply用来接受一个输入并产生输出函数。在本例中,work()函数产生的类型与它所接收的相同(Machina), * 因此每个CompletableFuture 添加的操作的返回类型都为 Machina,但是(类似流中的`map()`)该函数也可以返回不同的类型, * 这将体现在返回类型中。 */
CompletableFuture<Machina> cf = CompletableFuture.completedFuture(new Machina(0))
.thenApply(Machina::work)//你可以在此处看到有关CompletableFutures 的重要信息,它们会在你执行操作时自动解包并重新包装它们所携带的对象。这使得编写和理解代码更容易。
.thenApply(Machina::work)
.thenApply(Machina::work)
.thenApply(Machina::work);
System.out.println(timer.duration());
}
}
/* outputs: Machina id=0: ONE Machina id=0: TWO Machina id=0: THREE Machina id=0: Complete 535 */
这里我们添加了一个Timer,它的功能在每一步都显性的增加100ms 等待时间,还将CompletableFuture 内部的 thenApply 带来的额外开销给体现出来了。CompletableFutures 的一个重要好处是它们鼓励使用私有子类原则(不共享任何东西)。默认情况下,使用 thenApply 来应对一个不对外通信的函数----它只需要一个参数并返回一个结果。这也是函数式编程的基础,并且它在并发特性方面非常有效。并行流和CompletableFutures旨在支持这些原则。只要你不决定共享数据你就可以写出相对安全的并发程序。
回调一个thenApply() 一旦开始一个操作,那么在完成所有任务之前,它就不会完成 CompletableFuture 的构建。虽然有时这很有用,但是启动所有的任务通常更有价值,以便你可以在它们运行时继续做其他事情。我们可以通过thenApplyAsync()来实现这个目的:
public class CompletableApplyAsync {
public static void main(String[] args) {
Timer timer = new Timer();
CompletableFuture<Machina> cf = CompletableFuture.completedFuture(new Machina(0))
.thenApplyAsync(Machina::work)
.thenApplyAsync(Machina::work)
.thenApplyAsync(Machina::work)
.thenApplyAsync(Machina::work);
System.out.println(timer.duration());
System.out.println(cf.join());
System.out.println(timer.duration());
}
}
/* outputs: 50 Machina id=0: ONE Machina id=0: TWO Machina id=0: THREE Machina id=0: Complete Machina id=0: Complete 484 */
同步调用(我们通常做的那样)意味着“当你工作结束时才返回”,而异步调用意味着“立即返回并继续后续工作”。正如你所看到的,cf 现在的创建速度非常快,每次调用thenApplyAsync()都会立即返回,因此可以进入下一个调用,整个方法调用完成的很快。
事实上,如果没有回调 cf.join() ,程序会在完成其工作之前就退出。而 cf.join() 可以直到 cf 操作完成之前,阻止 main 进程结束。我们也可以看到本例大部分时间消耗在cf.join() 上。
这种“立即返回”的异步能力是靠 CompletableFuture 库进行一些“秘密”地工作完成的。尤其是,它将你需要的操作链存储为一组回调。当操作的第一个链路(后台操作)完成并返回时,第二个链路(后台)必须获取生成的 Machina 并开始工作,以此类推。但是这种异步机制没有我们通过程序调用栈控制的序列,它的调用链路顺序会丢失,因此它使用一个函数地址表来存储回调解决这个问题。
这就是你需要了解的回调的全部信息,通过异步,CompletableFuture 帮你管理所有回调。
6.1.1 其他操作
下面的示例展示了所有“基本”操作,这些操作不涉及组合两个 CompletableFuture,也不涉及异常(后面介绍)。首先,为了提供简洁性和方便性,我们应该重用一下两个实用程序:
public class CompletableUtilities {
//获取和显示存储在cf中 的值
public static void showr(CompletableFuture<?> cf) {
try {
System.out.println(cf.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
//对于没有值的cf操作
public static void voidr(CompletableFuture<Void> cf) {
try {
cf.get();//返回void
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
showr()在 CompletableFuture 上调用 get(),并显示结果。voidr() 是 CompletableFuture 的showr 版本,也就是说,它只在任务完成或失败时显示信息。
import static com.gui.demo.thingInJava.concurrency.completablefutures.CompletableUtilities.*;
/** * @Classname CompletableOperations * @Description * @Date 2021/8/23 18:25 * @Created by gt136 */
public class CompletableOperations {
//`cfi()`是一个便利方法,它把一个整数包装在 CompletableFuture<Integer> 中
static CompletableFuture<Integer> cfi(int i) {
return CompletableFuture.completedFuture(
Integer.valueOf(i)
);
}
public static void main(String[] args) {
//测试showr() 可以使用
showr(cfi(1));
//调用runAsync,由于 Runnable 不产生返回值,因此使用了返回 CompletableFuture <Void> 的voidr() 方法。这里推荐使用方法引用(因为是静态方法)
voidr(cfi(2).runAsync(()-> System.out.println("runAsync")));
//runAsync和thenRunAsync功能似乎相同。
voidr(cfi(3).thenRunAsync(()-> System.out.println("thenRunAsync")));
//runAsync是静态方法,推荐这样使用
voidr(CompletableFuture.runAsync(()-> System.out.println("runAsync is static")));
//supplyAsync也是静态的,但是它需要一个 Supplier 而不是一个 Runnable,并产生一个CompletableFuture<Integer>而不是CompletableFuture<Void>。
showr(CompletableFuture.supplyAsync(() -> 99));
//与thenRunAsync不同,`cfi(4)`,`cfi(5)`和`cfi(6)`中的“then”方法的参数是未包装的 Integer。
//thenAcceptAsync 接受一个 Consumer,因此不会产生结果,返回一个 CompletionStage
voidr(cfi(4).thenAcceptAsync(i -> System.out.println("thenAcceptAsync" + i)));
//thenApplyAsync 接收一个Function,并产生一个CompletionStage(该结果的类型可以不同于其输入类型)
showr(cfi(5).thenApplyAsync(i -> i + 42));
//thenComposeAsync 与 thenApplyAsync 非常相似,唯一区别在于其 Function 必须产生已经包装在 Completable 中的结果。
showr(cfi(6).thenComposeAsync(i -> cfi(i + 99)));
//
CompletableFuture<Integer> c = cfi(7);
c.obtrudeValue(111);//重置值
showr(c);
//从CompletionStage 生成一个 CompletableFuture
showr(cfi(8).toCompletableFuture());
c = new CompletableFuture<>();
//通过给它一个结果来完成一个任务(Future),与obtrudeValue()相反,后者可能会迫使其结果替换该结果
c.complete(9);
showr(c);
//如果已经完成此任务,则正常结束,如果尚未完成,则使用CancellationException 完成此CompletableFuture
c.cancel(true);
System.out.println("canceled: " + c.isCancelled());
System.out.println("completed exceptionally: " + c.isCompletedExceptionally());
System.out.println("done: " + c.isDone());
System.out.println(c);
c = new CompletableFuture<>();
//如果任务(Future)完成,则返回CompletableFuture 的完成值,否则返回getNow() 的替换值。
System.out.println(c.getNow(777));
c = new CompletableFuture<>();
c.thenApplyAsync(i -> i + 42)
.thenApplyAsync(i -> i * 12);
//
System.out.println("dependents: " + c.getNumberOfDependents());
c.thenApplyAsync(i -> i / 2);
System.out.println("dependents: " + c.getNumberOfDependents());
}
}
/* outputs: 1 runAsync runRunAsync runAsync is static 99 thenAcceptAsync4 47 105 111 8 9 canceled: true completed exceptionally: true done: true [email protected][Completed exceptionally] 777 dependents: 1 dependents: 2 */
main()包含一系列可由其 int 值引用测试:
注意使用
cfi(3)的thenRunAsync 效果似乎与 2 相同,差异在于:runAsync()是一个 static 方法,所以你通常不会像在cfi(2)中那样调用它,而是像使用方法引用那样使用它。
后续测试的supplyAsync()也是静态方法,区别在于它需要一个 Supplier 而不是一个 Runnable,并产生一个CompletableFuture<Integer>而不是CompletableFuture<Void>。最后的依赖(dependence):如果我们将两个
thenApplyAsync()调用链路到 CompletableFuture 上,则依赖项的数量不会增加,还是1。但是,如果我们另外将一个thenApplyAsync()直接附加到 c,则现在有两个依赖项:两个一起的链路和另一个单独附加的链路。这表明你可以使用一个 CompletionStage,当其完成时,可以根据其结果派生多个新任务。
6.2 结合 CompletableFuture
CompletableFuture 的第二种类型的方法实现将两种 CompletableFuture 以不同的方式将它们组合在一起。就像两个人比赛一样,一个CompletableFuture 总是比另一个更早地到达终点。这些方法允许你以不同的方式处理结果。
为了测试这一点,我们将创建一个任务(Workable),它将完成的时间作为其参数之一,因此我们可以控制那个 CompletableFuture 先完成:
public class Workable {
String id;
final double duration;
public Workable(String id, double duration) {
this.id = id;
this.duration = duration;
}
@Override
public String toString() {
return "Workable{" + id + '}';
}
public static Workable work(Workable tt) {
//睡眠传入的参数的时间
new Nap(tt.duration);
tt.id = tt.id + "W";
System.out.println(tt);
return tt;
}
public static CompletableFuture<Workable> make(String id, double duration) {
//和上例一样不多赘述
return CompletableFuture.completedFuture(new Workable(id, duration))
.thenApplyAsync(Workable::work);
}
}
在make()中,work()方法应用于 CompletableFuture。work()需要一定的时间才能完成,然后它将字母 W 附加到 id 上,表示工作已经完成。
现在我们可以创建多个竞争的 CompletableFuture,并使用 CompletableFuture 中的各种方法来连接它们:
import static com.thingInJava.concurrency.completablefutures.CompletableUtilities.*;
/** * @Classname DualCompletableOperations * @Description * @Date 2021/8/26 15:21 * @Created by gt136 */
public class DualCompletableOperations {
static CompletableFuture<Workable> cfA, cfB;
static void init() {
cfA = Workable.make("A", 0.15);
cfB = Workable.make("B", 0.10);//总是赢
}
static void join() {
cfA.join();
cfB.join();
System.out.println("**********************");
}
public static void main(String[] args) {
init();
voidr(cfA.runAfterEitherAsync(cfB,()->
System.out.println("runAfterEitherAsync")));
join();
init();
voidr(cfA.runAfterBothAsync(cfB,()->{
System.out.println("runAfterBothAsync");
}));
join();
init();
showr(cfA.applyToEitherAsync(cfB,w->{
System.out.println("applyToEitherAsync:" + w);
return w;
}));
join();
init();
voidr(cfA.acceptEitherAsync(cfB,w->{
System.out.println("acceptEitherAsync: " + w);
}));
join();
init();
voidr(cfA.thenAcceptBothAsync(cfB,(w1,w2)->{
System.out.println("thenAcceptBothAsync: " + w1 + ", " + w2);
}));
join();
init();
showr(cfA.thenCombineAsync(cfB,(w1,w2)->{
System.out.println("thenCombineAsync: " + w1 + ", " + w2);
return w1;
}));
join();
init();
CompletableFuture<Workable>
cfC = Workable.make("C", 0.08),
cfD = Workable.make("D", 0.09);
CompletableFuture.anyOf(cfA, cfB, cfC, cfD)
.thenRunAsync(() -> System.out.println("anyOf"));
join();
init();
cfC = Workable.make("C", 0.08);
cfD = Workable.make("D", 0.09);
CompletableFuture.allOf(cfA, cfB, cfC, cfD)
.thenRunAsync(() -> System.out.println("allOf"));
join();
}
}
/* outputs: Workable{BW} runAfterEitherAsync Workable{AW} ********************** Workable{BW} Workable{AW} runAfterBothAsync ********************** Workable{BW} applyToEitherAsync:Workable{BW} Workable{BW} Workable{AW} ********************** Workable{BW} acceptEitherAsync: Workable{BW} Workable{AW} ********************** Workable{BW} Workable{AW} thenAcceptBothAsync: Workable{AW}, Workable{BW} ********************** Workable{BW} Workable{AW} thenCombineAsync: Workable{AW}, Workable{BW} Workable{AW} ********************** Workable{CW} Workable{DW} anyOf Workable{BW} Workable{AW} ********************** Workable{CW} Workable{DW} Workable{BW} Workable{AW} ********************** allOf */
- 为了方便访问,将 cfA 和cfB 定义为 static 的。
init()方法用于 A、B 初始化这两个变量,因为 B 总是给出比 A 较短的延迟,所以总是 win 的一方。join()是在两个方法上调用 并显示边框(打印*)的另一个便利方法。 - 所有这些“dual”(对偶)方法都以一个 CompletableFuture 作为调用该方法的对象,并将第二个 CompletableFuture 作为第一个参数,然后是要执行的操作。
- 通过使用
showr()和voidr()可以看到,“run” 和“accept” 是终端操作,而“apply”和“combine”则生成新的 payload-bearing(承载负载)的CompletableFuture。 - 方法的功能是不言自明的,你可以通过查看输出来验证这一点。一个特别有趣的方法是
combineAsync(),它等待两个 CompletableFuture 完成,然后将它们都交给 BiFunction 可以将结果加入到最终的 CompletableFuture 的有效负载中。
6.3 异常
7、死锁
由于任务可以被阻塞,因此一个任务有可能卡在等待另一个任务上,而后者又在等待别的任务,这样一直下去,直到这个链条上的任务又在等待第一个任务所持有的资源,使得大家都被锁住,没有那个线程能继续,这称为死锁。
真正问题在于,程序看起来良好,但是具有潜在的死锁风险。这时,死锁可能发生,事前却没有任何征兆且很难重现。因此在编写并发程序的时候,进行仔细的程序设计以防止死锁是关键部分。
“哲学家进餐问题”是经典的死锁例证,基本描述指定了五位哲学家,他们的餐具数量有限,每人只有一根筷子(哲学家数量与筷子数量相同),当一个哲学家进餐时,必须同时持有左边的和右边的筷子。如果任一侧的哲学家在使用筷子,则这个哲学家必须等待,直到得到必须的筷子。
public class StickHolder {
private static class Chopstick {
}
private Chopstick stick = new Chopstick();
private BlockingQueue<Chopstick> holders = new ArrayBlockingQueue<>(1);
public StickHolder() {
putDown();
}
public void pickUp() {
try {
holders.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void putDown() {
try {
holders.put(stick);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
StickHolder 类通过将单根筷子保持在大小为 1 的 BlockingQueue 中来管理它。BlookingQueue 是一个设计用于在并发编程中安全使用的集合,如果你调用 take()并且队列为空,则它将阻塞。将新元素放入队列后,将释放该块并返回该值。
为简单起见,ChopStick(static)实际上不是由 StickHolder 生成的,而是在其类中保持私有的。
如果你调用了pickUp(),而 stick 不可用,那么将阻塞该 stick,直到另一个哲学家调用 putDown()将 stick 返回。
注意:该类中的所有线程安全都是通过 BlockingQueue 实现的。
public class Philosopher implements Runnable{
private final int seat;
private final StickHolder left,right;
public Philosopher(int seat, StickHolder left, StickHolder right) {
this.seat = seat;
this.left = left;
this.right = right;
}
@Override
public String toString() {
return "P" + "seat=";
}
@Override
public void run() {
while (true) {
System.out.println("Thinking");
right.pickUp();
left.pickUp();
System.out.println(this + " eating");
right.putDown();
left.putDown();
}
}
}
每个哲学家都是一项任务,他们试图把筷子分别 pickUp()在左手和右手上,这样才能吃东西,然后通过putDown()放下 stick。
没有两个哲学家可以同时成功调用take()同一只筷子。且如果一个哲学家已经拿起筷子,那么下一个试图拿起同样筷子的哲学家将会阻塞,等待筷子被释放。
public class DiningPhilosophers {
private StickHolder[] sticks;
private Philosopher[] philosophers;
public DiningPhilosophers(int n) {
sticks = new StickHolder[n];
Arrays.setAll(sticks, i -> new StickHolder());
philosophers = new Philosopher[n];
Arrays.setAll(philosophers, i -> new Philosopher(i, sticks[i], sticks[(i + 1) % n]));//[1]
//
philosophers[1] = new Philosopher(0, sticks[0], sticks[1]);//[2]
Arrays.stream(philosophers)
.forEach(CompletableFuture::runAsync);//[3]
}
public static void main(String[] args) {
//
new DiningPhilosophers(5);//[4]
//
new Nap(3, "Shutdown");
}
}
结果就是一个看似无辜的线程陷入了死锁。我在这里使用数组而不是集合,只是因为这组语法更简介:
- 两核以上的计算机很容易产生死锁。
- 在 DiningPhilosophers.java 的构造方法中,每个哲学家都获得一个左右筷子的引用。除最后一个哲学家外,都是通过将哲学家置于下一双筷子之间来初始化,最后一位哲学家得到了第 0 根筷子作为他的右筷子。那是因为最后一位哲学家正坐在第一个哲学家的旁边,而且他们俩都共用 第 0 根筷子。[1] 显示了以 n 为模数选择的右筷子,将最后一个哲学家绕到第一个哲学家旁边。
- 现在,所有哲学家都可以尝试进食,每个哲学家都在等旁边哲学家放下筷子。为了让每个哲学家在[ 3] 上运行,调用
runAsync(),这意味着 DiningPhilosophers 的构造函数立即返回到 [4];如果没有任何东西阻止 main() 完成,程序就会退出,不会做太多事。 Nap 对象阻止 main() 退出,然后在 3 秒后强制退出死锁程序。在给的配置中,哲学家几乎不花时间来思考,因此,他们都争着用筷子,很快就会陷入僵局,你可以改变这个条件:①通过增加 [4] 的值来添加更多哲学家。②在DiningPhilosophers 中取消注释 [1]。
7.1 死锁产生的条件
要修正死锁问题,必须明白,当以下四个条件同时满足时,就会发生死锁:
- 互斥条件:任务使用的资源中至少有一个资源是不能共享的,这里,一根筷子每次只能被一个人用。
- 至少有一个任务它必须持有一个资源且正在等待获取另一个被当前别的任务所持有的资源。也就是说,要发生死锁,哲学家必有有一根筷子且在等另一根。
- 资源不能被任务抢占,任务必须把资源释放当成普通事件。哲学家很礼貌,不会从其他人手中抢筷子。
- 必须有循环等待。
因为必须满足所有条件才会死锁,所以要阻止死锁的话,只要破坏其中一个条件即可。
在此程序中,防止死锁的一种简单方法是打破第四个条件。之所以死锁,是因为每个哲学家都尝试按照特定顺序拾起自己的筷子:先右后左。因此,每个哲学家都有可能在等待左手的同时而握住右手的筷子,从而导致等待。但是,如果其中一位哲学家尝试先拿起左筷子,则该哲学家决不会组织紧邻右方的哲学家拿起筷子,从而排除了循环等待。
当然,避免并发问题的最简单,最好的方法是永远不要共享资源-不幸的是,这并不总是可能的。
边栏推荐
- Stm32mp1 cortex M4 development part 12: expansion board vibration motor control
- [practice] stm32mp157 development tutorial FreeRTOS system 3: FreeRTOS counting semaphore
- Electron checks the CPU and memory performance when the module is introduced
- The R language uses the sink function to export the dataframe data and save it as a CSV file in the specified directory. In practice, if no directory is specified, it will be output to the current wor
- Request and response must know
- The spring recruitment is also terrible. Ali asked at the beginning of the interview: how to design a high concurrency system? I just split
- ADO. Net - invalid size for size property, 0 - ado NET - The Size property has an invalid size of 0
- Style penetration of vant UI components -- that is, some styles in vant UI components cannot be modified
- Shortcut keys accumulated when using various software
- Unity中的地平面简介
猜你喜欢

Electron checks the CPU and memory performance when the module is introduced

Qsort sort string

Observation on the salary data of the post-90s: poor, counselled and serious

The spring recruitment is also terrible. Ali asked at the beginning of the interview: how to design a high concurrency system? I just split

Stm32mp1 cortex M4 development part 13: external interrupt of expansion board key

Abstractqueuedsynchronizer (AQS) source code detailed analysis - semaphore source code analysis

Float floating layout clear floating

Wechat applet
![[practice] stm32mp157 development tutorial FreeRTOS system 6: FreeRTOS list and list items](/img/28/51be35224959b1bf70f4edc8a54ff4.png)
[practice] stm32mp157 development tutorial FreeRTOS system 6: FreeRTOS list and list items

TC software detailed design document (mobile group control)
随机推荐
音视频格式简介、编解码、音视频同步
Source insight shortcut key cross reference
R language uses strptime function, format to format output parameters, and R environment options parameters to convert strings into time objects (the output time information includes time, minutes, se
如何选择嵌入式练手项目、嵌入式开源项目大全
C#中的list操作入门
Retrofit扩展阅读
Concurrency - condition variable
Merge sort of sorting
111. solve the problem of prohibiting scripts from running on vs code. For more information, see error reporting
JS resource disaster recovery
Summary of Web automated testing
109. use of usereducer in hooks (counter case)
Observation on the salary data of the post-90s: poor, counselled and serious
微信小程序
嵌入式软件项目流程、项目启动说明书(示例)
finally block can not complete normally
stm32mp1 Cortex M4开发篇12:扩展板震动马达控制
Storage of C language integer in memory
远程办公市场调查报告
Abstractqueuedsynchronizer (AQS) source code detailed analysis - condition queue process analysis