当前位置:网站首页>ThreadPoolExecutor源码分析
ThreadPoolExecutor源码分析
2022-07-23 16:35:00 【编程小白的笔记分享和感悟】
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
一、ThreadPoolExecutor应用方式
为什么使用线程池而不是线程
在java中,如果每个请求到达就创建一个新线程,开销是相当大的。在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源要多的多。除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个jvm里创建太多的线程,可能会使系统由于过度消耗内存或“切换过度”而导致系统资源不足。为了防止资源不足,服务器应用程序需要采取一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源耗费比较大的线程的创建和销毁,尽量利用已有对象来进行服务,这就是“池化资源”技术产生的原因。
线程池主要用来解决线程生命周期开销问题和资源不足问题。通过对多个任务重复使用线程,线程创建的开销就被分摊到了多个任务上了,而且由于在请求到达时线程已经存在,所以消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使用应用程序响应更快。另外,通过适当的调整线程中的线程数目可以防止出现资源不足的情况。
为什么阿里规范中要求使用ThreadPoolExecutor,而不是Executors?
自己new线程池的操作是阿里的一个规范、如果不去手动的去控制核心参数,就没办法让线程池的性能达到最优的状态,毕竟Executors中提供的线程池,很多都是定死的
可能并不适合现在的业务和我们的CPU硬件,我们的业务有的是IO密集的,有的是CPU密集(什么是IO密集型什么是CPU密集型)的
(总结就是Executors工具中提供的线程池的参数都是定死的,创建的线程池不一定是适合我们的业务或者硬件资源的)
ThreadPoolExecutor创建线程池,可以达到定制化,创建更适合我们自己业务和CPU硬件资源的线程池
二、ThreadPoolExecutor核心参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

- corePoolSize 核心线程数
- maximumPoolSize 最大线程数, 最大线程数-核心线程数等于救急线程数
- keepAliveTime 救急线程的空闲时间
- unit :keepAliveTime的时间单位
- workQueue 工作队列
- threadFactory 线程工厂
- handler :拒绝策略
JDK默认四个拒绝策略
- AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
- CallerRunsPolicy 让调用者运行任务
- DiscardPolicy 放弃本次任务
- DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
可以实现RejectedExecutionHandler接口,自定义拒绝策略
executor和submit的区别
executor只能传Runnable的任务
submit可以传Runnable和Callable,也就说submit可以通过Callable拿到future,获取到执行任务的结果
三、ThreadPoolExecutor执行流程
我们通过execute方法看线程池的执行流程

四、ThreadPoolExecutor状态
4.1 线程池的核心属性ctl


4.2 线程池的状态变换




五、execute方法
第一点核心:通过execute方法,查看线程池整体执行流程,以及一些避免并发情况的判断
第二点核心:为什么线程池会添加一个空任务的非核心线程(救急线程)到线程池
(为了处理工作线程数为0,但是任务队列还有任务的情况)
public void execute(Runnable command) {
//非空判断
if (command == null)
throw new NullPointerException();
//获取ctl的值
int c = ctl.get();
//工作线程个数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 通过addWorker方法添加一个核心线程数去执行command任务
if (addWorker(command, true))
//添加核心线程数成功,返回true,直接返回
return;
// 如果再·并发情况下,添加核心线程失败的线程,需要重新获取一次ctl值
c = ctl.get();
}
//创建核心线程数失败
//判断当前线程池状态是否是RUNNING
//如果是RUNNING,执行offer方法将任务添加到工作队列
if (isRunning(c) && workQueue.offer(command)) {
//添加任务到工作队列成功
//再次重新获取ctl值
int recheck = ctl.get();
//判断当前线程池状态是否是RUNNING状态,如果不是,需要移除工作队列中的任务
if (! isRunning(recheck) && remove(command))
//执行拒绝策略(线程池状态不正确,执行拒绝策略)
reject(command);
//判断工作线程数是否为0
else if (workerCountOf(recheck) == 0)
//工作线程数为0,但是工作队列中有任务在排队
//添加一个空任务,救急线程,处理工作队列中排队的任务
addWorker(null, false);
}
//添加任务到工作队列失败,添加救急线程执行当前任务
else if (!addWorker(command, false))
//添加救急线程失败,执行reject拒绝策略
reject(command);
}
1、首先判断任务是否是空任务,如果是直接抛出异常NullPointerException
2、判断当亲核心线程数用没用完,如果还有核心线程,就把尝试任务交给核心线程,如果交给核心线程成功,直接返回
这个【尝试】这个次很重要,因为并发情况下,核心线程数可能在一瞬间就被抢完了
3、如果在交给核心线程的过程中,核心线程被别人占用了完了,那就重新获取ctl的值,通过高3三位判断线程池是否是RUNNING状态,如果是,就尝试把任务交队列
4、如果线程池不是RUNNING状态,或者任务队列满了,就尝试交给救急线程,如果尝试失败了,直接执行决绝策略
5、如果线程池池RUNNING状态而且尝试把任务交队列成功了,会再次获取ctl的值,再次判断线程池的状态是不是RUNNING,如果不是就尝试把之前添加的任务移除,成功后执行拒绝策略
6、如果不是RUNNING,或者是RUNNING但是尝试移除任务失败了,就判断线程池中是否还有工作的线程,如果没有,就创建一个空任务的救急线程把任务队列的任务清空
六、 addWorker方法
添加工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
//对线程池状态的判断,以及对工作线程数的判断
//外层for循环的标识
retry:
for (;;) {
//获取ctl的值
int c = ctl.get();
// 拿到线程池的状态
int rs = runStateOf(c);
//如果线程池的状态不是RUNNING 而且 没有同时满足【线程池的状态是SHUTDOWN并且任务是空并且队列不为空】的条件就直接返回false (对应 addWorker(null, false))
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
return false;
for (;;) {
//获取当前工作线程数
int wc = workerCountOf(c);
//判断当前工作线程数是否大于最大值
if (wc >= CAPACITY ||
//如果是核心线程,是否大于corePoolSize ,如果是救急线程,是否大于maximumPoolSize
wc >= (core ? corePoolSize : maximumPoolSize))
// 当前工作线程已经达到最大值了
return false;
//以CAS的方式,对工作线程数+1,如果成功,直接跳出外层for循环
if (compareAndIncrementWorkerCount(c))
break retry;
//重新获取ctl值
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) //基于新获取的ctl拿到线程池状态,判断和之前的rs状态是否一致
continue retry; // 说明并发操作导致线程池状态变化,需要重新判断状态
}
}
//添加工作线程并启动工作线程
//工作线程是否启动
boolean workerStarted = false;
//工作线程是否添加成功
boolean workerAdded = false;
//Worker 就是工作线程
Worker w = null;
try {
// new Worker构建工作线程,将任务扔到Worker中
w = new Worker(firstTask);
//拿到Worker中绑定的Thread的线程
final Thread t = w.thread;
if (t != null) {
//肯定不为null ,健壮性判断
//加锁。。。。(并发情况下,别人可能会执行shutDown)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//基于重新获取的ctl,拿到线程池的状态
int rs = runStateOf(ctl.get());
//如果满足线程池状态为RUNNING,就添加工作线程
if (rs < SHUTDOWN ||
//如果线程池的状态为SHUTDOWN并且传入的任务是null(对应 addWorker(null, false))
(rs == SHUTDOWN && firstTask == null)) {
//添加工作线程
//判断当前线程是否处于run状态(健壮性判断)
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将构建好的Worker对象添加到workers(HashSet)中
workers.add(w);
//获取工作线程个数
int s = workers.size();
//如果现在的工作线程数,大于历史最大的工作线程数,就重新复制给largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
//将工作线程添加的标识设置为true
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
if (workerAdded) {
//添加工作任务成功,启动线程
t.start();
//将工作线程启动的标识设置为true
workerStarted = true;
}
}
} finally {
//如果工作线程启动失败(可能启动时线程池的状态发生了变化)
if (! workerStarted)
//移除workers中的工作线程,将工作线程数-1,尝试修改线程池状态为TIDYING
addWorkerFailed(w);
}
return workerStarted;
}
//启动线程失败后,做的补救操作
private void addWorkerFailed(Worker w) {
//加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//判断之前创建的工作线程是否成功
if (w != null)
//如果成功,将workers中的当前工作线程移除
workers.remove(w);
//将工程线程数-1
decrementWorkerCount();
//尝试将线程池状态变化为TIDYING
tryTerminate();
} finally {
mainLock.unlock();
}
}
七、 Worker类(工作线程类)的简单认识
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//工作线程的Thread对象,初始化时构建出来
final Thread thread;
//需要执行的任务
Runnable firstTask;
volatile long completedTasks;
//刚刚初始化的工作线程不允许被中断
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
//第一次new的时候会把任务赋值给firstTask
this.firstTask = firstTask;
//给Worker构建Thread对象
this.thread = getThreadFactory().newThread(this);
}
//调用t.start(),执行run方法
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1); }
public boolean tryLock() {
return tryAcquire(1); }
public void unlock() {
release(1); }
public boolean isLocked() {
return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
1、Worker类继承AbstractQueuedSynchronizer并且实现Runnable
2、为什么Worker自己实现AQS锁?
不允许可重入,如果别的线程执行了shutdown的话,工作线程是不会被中断的,但是如果是stop就强制中断了
中断线程不是立即停止线程,而是将thread的中断标识设置为true
下面源码也可以看出Worker是不支持重入锁的
public void lock() {
acquire(1); }
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
八、 runWorker方法
execute()—>addWorker()——>t.run()——>run() (Worker中的run方法)
public void run() {
runWorker(this);
}
执行任务的流程,并做了中断线程相关的lock操作
final void runWorker(Worker w) {
//拿到当前线程
Thread wt = Thread.currentThread();
//获取Worker中的任务
Runnable task = w.firstTask;
//把Worker中的任务置为null
w.firstTask = null;
//unlock:把Worker中state置为0,同时把ExclusiveOwnerThread置为null(state由-1变为0,代表可以被中断)
w.unlock(); // allow interrupts
//执行任务时,勾子函数是否出现异常的标记
boolean completedAbruptly = true;
try {
//获取任务的第一个方式,就是执行execute、submit时,传入的任务直接处理
//获取任务的第二个方式,就是从工作队列中获取任务执行
while (task != null || (task = getTask()) != null) {
//加锁,在SHUTDOWN状态下,当前线程不允许被中断
//在Worker内部实现的锁,并不是可重入锁,因为在中断时,也需要对worker进行lock,不能获取就代表当前工作线程正在执行任务
w.lock();
//如果线程池状态变为了STOP状态,必须将当前线程中断
//第一个状态,判断当前线程是否为STOP状态
//第二个判断:查看中断标记位,并归位,如果为false,说明不是STOP,如果变为true,需要再次查看是否并发操作导致线程池状态为STOP
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted()&&runStateAtLeast(ctl.get(), STOP)))
//查看当前线程中断标记是否为false,如果为false,就执行wt.interrupt()
&&!wt.isInterrupted())
//将中断标记设置为true
wt.interrupt();
try {
//执行任务的勾子函数,前置增强,不是动态代理(其实就是个空方法,程序员可以重写)
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务的勾子函数,后置增强,不是动态代理(其实就是个空方法,程序员可以重写)
afterExecute(task, thrown);
}
} finally {
//将task置为null
task = null;
//执行成功的任务+1
w.completedTasks++;
//将state标记置为0
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
九、getTask方法
如何从工作队列,workQueue中获取到任务
从工作i队列中获取任务
private Runnable getTask() {
//标识(非核心线程可以干掉)
boolean timedOut = false; // Did the last poll() time out?
//死循环
for (;;) {
//获取ctl值
int c = ctl.get();
//拿到线程池的状态
int rs = runStateOf(c);
// =============================判断线程池状态=======================
// Check if queue empty only if necessary.
//如果进入if,需要干掉当前工作线程
//线程池状态进入SHUTDOWN、STOP
//如果线程池状态大于STOP,需要移除掉当前工作线程
//如果线程池状态为SHUTDOWN,并且工作队列为空,需要移除掉当前工作线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 移除当前工作线程
decrementWorkerCount();
//返回null,交给processWorkerExit移除当前工作线程
return null;
}
// =============================判断工作线程数量=======================
//拿到工作线程数
int wc = workerCountOf(c);
//allowCoreThreadTimeOut :是否允许核心线程超时(一般为false)
//工作线程是否大于核心线程
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//工作线程是否已经大于最大线程了
//工作线程数大于核心线程数,并且当前线程已经超时
//尝试干掉当前线程
if ((wc > maximumPoolSize || (timed && timedOut))
//如果工作线程数大于1,或者工作队列为空
//如果工作队列为空,我就干掉我自己
//如果工作线程数大于1,我就干掉我自己
&& (wc > 1 || workQueue.isEmpty())) {
//基于CAS操作移除当前线程,只有一个线程会CAS成功
if (compareAndDecrementWorkerCount(c))
//返回null,交给processWorkerExit移除当前工作线程
return null;
continue;
}
// =============================从工作队列中获取任务=======================
try {
Runnable r = timed ?
//阻塞一定时间从工作队列拿任务(可以理解为非核心走这个方法)
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//一直阻塞,(可以理解为核心走这个方法)
workQueue.take();
if (r != null)
//如果拿到任务,就直接返回
return r;
//从队列中取任务超时了(达到了当前工作线程最大生存时间)
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
十、processWorkerExit方法
//移除当前工作线程的操作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果执行了processWorkerExit方法的操作不睡getTask中的操作,是直接因为异常引起的(一般时勾子)
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
//因为执行方式不合法,手动扣减工作线程数
decrementWorkerCount();
//加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//记录当前线程池一共处理了多少任务
completedTaskCount += w.completedTasks;
//移除工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试将线城池关系(过度状态——销毁状态)
tryTerminate();
//重新获取ctl
int c = ctl.get();
//当前线程池状态,讲到这,说明是SHUTDOWN、RUNNING
if (runStateLessThan(c, STOP)) {
//如果正常状态移除当前工作线程
if (!completedAbruptly) {
//核心线程数最小值,允许多少
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果工作队列中任务不为空,设置工作线程的最小值
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//还有工作线程在线程池中
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//说明不是正常的方式移除的工作线程,再添加一个工作线程
//线程池工作队列不为空,并且没有工作线程,再添加一个工作线程
addWorker(null, false);
}
}
线程池常见面试题:
1、线程池的7个参数?
2、线程池的执行流程?
3、线程池的决绝策略?
4、线程池的核心属性ctl是什么?
5、线程池的状态以及转变的方式是什么?
6、线程池的execute和submit的区别
7、工作线程在线程池中是如何表述的
8、工作线程存储在什么位置
边栏推荐
- 关于分组查询的一道sql题
- Past weaving of zero one-2022
- 1259. Programmation dynamique de poignée de main disjointe
- [heavyweight] focusing on the terminal business of securities companies, Borui data released a new generation of observable platform for the core business experience of securities companies' terminals
- JS convert pseudo array to array
- Gradle【图文安装及使用演示 精讲】
- ResponseBodyAdvice接口使用导致的报错及解决
- 如何成为建模师?工业建模和游戏建模哪一个更加吃香?
- Build a PHP development environment (apache+php+mysql) "suggestions collection"
- String length function strlen().. String function header file string.h "suggestions collection"
猜你喜欢

大佬在线复盘:我在训练 DALL·E 时犯过的错

一文了解 NebulaGraph 上的 Spark 项目

Rhcsa notes 7

LeetCode 剑指 Offer II 115.重建序列:图解 - 拓扑排序

Deepstream learning notes (II): description of GStreamer and deepstream-test1

JUC并发编程【详解及演示】

多线程【全面学习 图文精讲】
![[the whole process of Game Modeling and model production] create the game soldier character with ZBrush](/img/35/3be94833b6ff1cd251561fb6d92b1e.png)
[the whole process of Game Modeling and model production] create the game soldier character with ZBrush

What is the current situation of the next generation industry? 90% of career changing modelers are learning this process

UAV circulating an unknown target under a GPS deniedenvironment with range only measurements translation
随机推荐
UAV circulating an unknown target under a GPS deniedenvironment with range only measurements translation
[the whole process of Game Modeling and model production] create the game soldier character with ZBrush
Is 3D modeling promising? Is employment guaranteed with high salary or is it more profitable to take orders in sideline industry
80 + guests took the stage, users from more than 10 countries attended the meeting, and 70000 + viewers watched the end of "Gwei 2022 Singapore"
OSI模型第一层:物理层,基石般的存在!
Spark installation and startup
Opencv (13): brief introduction to cv2.findcontours, cv:: findcontours and description of cv2.findcontours function in various versions of opencv
[sharing 3D modeling and production skills] how ZBrush turns pictures into relief models
80+嘉宾登台,10余国用户参会,7万+观众收看,「GWEI 2022-新加坡」落幕
Installation and use of flame graphs
Behind the recovery of the B-END Market: who stands in front of the stage?
Can self-study 3D modeling succeed? Can self-study lead to employment?
一文了解 NebulaGraph 上的 Spark 项目
ros(27):rosparam简单使用与一种通过launch传递参数不成功与解决
入门学习3D建模一般会遇到哪些问题?实在是太多了
错误“ Failed to fetch “xxx”Temporary failure resolvingW: Some index files failed to download“解决办法
Paddlenlp之UIE分类模型【以情感倾向分析新闻分类为例】含智能标注方案)
建模刚开始学习很迷茫,次世代角色建模该怎么学习?
ROS (27): the simple use of rosparam and the unsuccessful transfer of parameters through launch and its solution
Analysis on the implementation of Flink exactly once delivery