当前位置:网站首页>并发编程系列之FutureTask源码学习笔记
并发编程系列之FutureTask源码学习笔记
2022-07-01 21:45:00 【51CTO】
并发编程系列之FutureTask源码学习笔记
1、什么是FutureTask类?
在上一章节的学习中,我们知道了Future类的基本用法,知道了Future其实就是为了监控线程任务执行的,接着本博客继续学习FutureTask。然后什么是FutureTask类?
Future是1.5版本引入的异步编程的顶层抽象接口,FutureTask则是Future的基础实现类。同时FutureTask还实现了Runnable接口,所以FutureTask也可以作为一个独立的Runnable任务。
2、使用FutureTask封装Callable任务
线程中是不能直接传入Callable任务的,所以需要借助FutureTask,FutureTask可以用来封装Callable任务,下面给出一个例子:
package com.example.concurrent.future;
import java.util.Random;
import java.util.concurrent.*;
/**
* <pre>
* FutureTask例子
* </pre>
* <p>
* <pre>
* @author nicky.ma
* 修改记录
* 修改后版本: 修改人: 修改日期: 2021/08/28 18:04 修改内容:
* </pre>
*/
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(new CallableTask());
Thread t = new Thread(futureTask);
t.start();
System.out.println(futureTask.get());
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception{
Thread.sleep(1000L);
return new Random().nextInt();
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
3、FutureTask UML类图
翻下FutureTask的源码,可以看出实现了RunnableFuture接口
RunnableFuture接口是怎么样的?可以看出其实是继承了Runnable,Future
在idea里画出FutureTask的uml类图:
在这里插入图片描述
所以,可以说FutureTask本质就是一个Runnable任务
4、FutureTask源码学习
- FutureTask类属性
public class FutureTask<V> implements RunnableFuture<V> {
// 状态:存在以下7中状态
private volatile int state;
// 新建
private static final int NEW = 0;
// 任务完成中
private static final int COMPLETING = 1;
// 任务正常完成
private static final int NORMAL = 2;
// 任务异常
private static final int EXCEPTIONAL = 3;
// 任务取消
private static final int CANCELLED = 4;
// 任务中断中
private static final int INTERRUPTING = 5;
// 任务已中断
private static final int INTERRUPTED = 6;
// 支持结果返回的Callable任务
private Callable<V> callable;
// 任务执行结果:包含正常和异常的结果,通过get方法获取
private Object outcome;
// 任务执行线程
private volatile Thread runner;
// 栈结构的等待队列,该节点是栈中的最顶层节点
private volatile WaitNode waiters;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 构造方法
// 传入callable任务
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
// 传入runnable任务、结果变量result
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 是一个Runnable任务,run方法实现
public void run() {
// 两种情况直接返回
// 1:状态不是NEW,说明已经执行过,获取已经取消任务,直接返回
// 2:状态是NEW,将当前执行线程保存在runner字段(runnerOffset)中,如果赋值失败,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行了给如的Callable任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 异常的情况,设置异常
setException(ex);
}
if (ran)
// 任务正常执行,设置结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 任务被中断,执行中断处理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
setException方法:
protected void setException(Throwable t) {
// CAS,将状态由NEW改为COMPLETING(中间状态)
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 返回结果
outcome = t;
// 将状态改为EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- get获取执行结果
get超时的方法
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// unit是时间单位,必须传
if (unit == null)
throw new NullPointerException();
int s = state;
// 超过阻塞时间timeout,抛出TimeoutException
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
重点看下awaitDone方法:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 计算截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
//
boolean queued = false;
// 无限循环,判断条件是否符合
for (;;) {
// 1、线程是否被中断,是的情况,移除节点,同时抛出InterruptedException
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 2、获取当前状态,如果状态大于COMPLETING
// 说明任务完成了,有可能正常执行完成,也有可能是取消了任务
int s = state;
if (s > COMPLETING) {
if (q != null)
// thread置为null 等待JVM gc
q.thread = null;
//返回结果
return s;
}
//3、如果状态处于中间状态COMPLETING
//表示任务已经结束但是任务执行线程还没来得及给outcome赋值
else if (s == COMPLETING) // cannot time out yet
// 这种情况线程yield让出执行权,给其它线程先执行
Thread.yield();
// 4、如果等待节点为空,则构造一个等待节点
else if (q == null)
q = new WaitNode();
// 5、如果还没有入队列,则把当前节点加入waiters首节点并替换原来waiters
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
//如果需要等待特定时间,则先计算要等待的时间
// 如果已经超时,则删除对应节点并返回对应的状态
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞等待特定时间
LockSupport.parkNanos(this, nanos);
}
else
// 让线程等待,阻塞当前线程
LockSupport.park(this);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- cancel取消任务
public boolean cancel(boolean mayInterruptIfRunning) {
// 如果任务已经结束,则直接返回false
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 需要中断任务的情况
if (mayInterruptIfRunning) {
try {
Thread t = runner;
// 调用线程的interrupt来停止线程
if (t != null)
t.interrupt();
} finally { // final state
// 修改状态为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
finishCompletion方法:
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 无限循环,遍历waiters列表,唤醒节点中的线程,然后将Callable置为null
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
// 置为null,让JVM gc
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
边栏推荐
- Can I choose to open an account for stock trading on flush? Is it safe?
- 旁路由设置的正确方式
- Matlab traverses images, string arrays and other basic operations
- The correct way to set the bypass route
- 从零开始学 MySQL —数据库和数据表操作
- List announced | outstanding intellectual property service team in China in 2021
- Medium pen test questions: flip the string, such as ABCD, print out DCBA
- Mask wearing detection method based on yolov5
- 基于K-means的用户画像聚类模型
- Several ways of writing main function in C
猜你喜欢
mysql 学习笔记-优化之SQL优化
K-means based user portrait clustering model
Application of real estate management based on 3D GIS
【juc学习之路第9天】屏障衍生工具
100年仅6款产品获批,疫苗竞争背后的“佐剂”江湖
In the past 100 years, only 6 products have been approved, which is the "adjuvant" behind the vaccine competition
[intelligent QBD risk assessment tool] Shanghai daoning brings you leanqbd introduction, trial and tutorial
按照功能对Boost库进行分类
Training on the device with MIT | 256Kb memory
指标陷阱:IT领导者易犯的七个KPI错误
随机推荐
Pytest Collection (2) - mode de fonctionnement pytest
Airserver mobile phone third-party screen projection computer software
PCB线路板塞孔工艺的那些事儿~
[monomer] recommended configuration of streaming information i-bpsv3 server
Yan Rong looks at how to formulate a multi cloud strategy in the era of hybrid cloud
MySQL之MHA高可用配置及故障切换
[commercial terminal simulation solution] Shanghai daoning brings you Georgia introduction, trial and tutorial
EasyExcel 复杂数据导出
收到一封CTO来信,邀约面试机器学习工程师
微信小程序,连续播放多段视频。合成一个视频的样子,自定义视频进度条
idea中类中显示成员变量和方法
What is the difference between consonants and Initials? (difference between initials and consonants)
What is the difference between PMP and NPDP?
NIO与传统IO的区别
信标委云原生专题组组长,任重道远!
Burpsuite simple packet capturing tutorial [easy to understand]
Chapter 9 Yunji datacanvas company has been ranked top 3 in China's machine learning platform market
Getting started with the lockust series
"The silk road is in its youth and looks at Fujian" is in the hot collection of works in the Fujian foreign youth short video competition
Training on the device with MIT | 256Kb memory