当前位置:网站首页>线程池

线程池

2022-06-09 21:09:00 InfoQ


线程池

合理地使用线程池能够带来3个好处。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

null
ThreadPoolExecutor执行execute方法分下面4种情况。1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

public void execute(Runnable command) {
 if (command == null)
 throw new NullPointerException();
 int c = ctl.get();
 // 如果线程数小于基本线程数,则创建线程并执行当前任务
 if (workerCountOf(c) < corePoolSize) {
 if (addWorker(command, true))
 return;
 c = ctl.get();
 }
 // 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
 if (isRunning(c) && workQueue.offer(command)) {
 int recheck = ctl.get();
 if (! isRunning(recheck) && remove(command))
 reject(command);
 else if (workerCountOf(recheck) == 0)
 addWorker(null, false);
 }
 // 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量, 则创建一个线程执行任务。
 else if (!addWorker(command, false))
 // 抛出RejectedExecutionException异常
 reject(command);
}

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

线程池中的线程执行任务分两种情况,如下。

1)在execute()方法中创建一个线程时,会让这个线程执行当前任务。

2)这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行。

工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到:

final void runWorker(Worker w) {
 Thread wt = Thread.currentThread();
 Runnable task = w.firstTask;
 w.firstTask = null;
 w.unlock(); // allow interrupts
 boolean completedAbruptly = true;
 try {
 while (task != null || (task = getTask()) != null) {
 w.lock();
 // If pool is stopping, ensure thread is interrupted;
 // if not, ensure thread is not interrupted. This
 // requires a recheck in second case to deal with
 // shutdownNow race while clearing interrupt
 if ((runStateAtLeast(ctl.get(), STOP) ||
 (Thread.interrupted() &&
 runStateAtLeast(ctl.get(), STOP))) &&
 !wt.isInterrupted())
 wt.interrupt();
 try {
 beforeExecute(wt, task);
 Throwable thrown = null;
 try {
 task.run();
 } catch (RuntimeException x) {
 thrown = x; throw x;
 } catch (Error x) {
 thrown = x; throw x;
 } catch (Throwable x) {
 thrown = x; throw new Error(x);
 } finally {
 afterExecute(task, thrown);
 }
 } finally {
 task = null;
 w.completedTasks++;
 w.unlock();
 }
 }
 completedAbruptly = false;
 } finally {
 processWorkerExit(w, completedAbruptly);
 }
}

原网站

版权声明
本文为[InfoQ]所创,转载请带上原文链接,感谢
https://xie.infoq.cn/article/857346cf2384acd9ff944a565