当前位置:网站首页>线程池原理与实践|从入门到放弃,深度解析
线程池原理与实践|从入门到放弃,深度解析
2022-08-02 18:42:00 【InfoQ】
一、背景
data:image/s3,"s3://crabby-images/9e570/9e570bcb3ccc8eb1ccf16d550ffe9c4d5013a3ad" alt=""
二、什么场景下该使用线程池?
三、怎样设置核心参数?
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
四、线程池怎么运行调度的?
// Pool control word, initialised from ctlOf(RUNNING, 0). Elsewhere in this
// file it is read through workerCountOf(...), runStateOf(...) and isRunning(...).
// NOTE(review): ctlOf is not shown here; presumably it packs (runState, workerCount) — confirm.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Number of low-order bits left for the count: Integer.SIZE (32) - 3 = 29.
private static final int COUNT_BITS = Integer.SIZE - 3;
// Mask with the low COUNT_BITS bits set: (1 << 29) - 1.
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits (the 3 bits above COUNT_BITS).
// The values below are numerically ascending: RUNNING is negative, SHUTDOWN is 0,
// and STOP < TIDYING < TERMINATED are positive, so states can be compared with < and >=.
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
/**
 * Worker type used by the pool. It is itself a {@link Runnable} and extends
 * {@link AbstractQueuedSynchronizer}; runWorker in this file calls
 * {@code lock()} / {@code unlock()} on it around each task.
 * The class body is elided in this excerpt.
 */
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{}
// submit(...) accepts a Callable; the submit method shown in this file returns a Future for it.
executorService.submit((Callable<String>) () -> null);
// execute(...) accepts a plain Runnable; the execute method shown in this file returns void.
executorService.execute(() -> {});
// Build the pool used by the walkthrough below.
// corePoolSize must not exceed maximumPoolSize: the constructor rejects
// maximumPoolSize < corePoolSize with IllegalArgumentException, so the
// sizes are 8 core threads growing to at most 16.
ExecutorService executorService = new ThreadPoolExecutor(
        8,                                // corePoolSize
        16,                               // maximumPoolSize
        60,                               // keepAliveTime
        TimeUnit.SECONDS,                 // unit of keepAliveTime
        new LinkedBlockingDeque<>(100),   // bounded work queue (capacity 100)
        new CustomThreadFactory(), new CustomRejectHandler());
// Submit a Callable; the call hands back a Future for the task's result.
executorService.submit((Callable<String>) () -> null);
/**
 * Wraps {@code task} via {@link #newTaskFor(Callable)}, passes the wrapper to
 * {@code execute}, and returns that same wrapper as the task's future.
 *
 * @param task the computation to run
 * @return the future created for {@code task}
 * @throws NullPointerException if {@code task} is null
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) {
        throw new NullPointerException();
    }
    RunnableFuture<T> future = newTaskFor(task);
    execute(future);
    return future;
}

/**
 * Adapts a {@link Callable} to a {@link RunnableFuture} by wrapping it in a
 * {@link FutureTask}.
 *
 * @param callable the computation to wrap
 * @return a new {@code FutureTask} around {@code callable}
 */
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
/**
 * Dispatches {@code command} in three attempts, in order:
 * start a new core worker for it, otherwise enqueue it, otherwise start a
 * non-core worker for it. If none of these succeeds the command is rejected.
 *
 * @param command the task to run
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null) {
        throw new NullPointerException();
    }

    int state = ctl.get();

    // Attempt 1: below the core size, try to start a core worker with this task.
    if (workerCountOf(state) < corePoolSize) {
        if (addWorker(command, true)) {
            return;
        }
        // addWorker failed; re-read the control word before continuing.
        state = ctl.get();
    }

    // Attempt 2: enqueue while the pool is running.
    boolean queued = isRunning(state) && workQueue.offer(command);
    if (!queued) {
        // Attempt 3: start a non-core worker; reject if that fails too.
        if (!addWorker(command, false)) {
            reject(command);
        }
        return;
    }

    // The task is queued: re-check the state, since it may have changed meanwhile.
    int latest = ctl.get();
    if (!isRunning(latest) && remove(command)) {
        // No longer running and the task could be pulled back out: reject it.
        reject(command);
    } else if (workerCountOf(latest) == 0) {
        // No workers at all: start one with no initial task.
        addWorker(null, false);
    }
}
/**
 * Creates a {@code Worker} for {@code firstTask}, registers it in
 * {@code workers} while holding {@code mainLock}, and starts its thread.
 * If the thread does not get started, {@code addWorkerFailed} is invoked.
 *
 * NOTE(review): {@code core} is not consulted anywhere in this excerpt; the
 * step that checks and reserves a worker slot against the pool bounds appears
 * to have been elided before the code below — confirm against the JDK source.
 *
 * @param firstTask task the new worker should run first, or null
 * @param core      see the review note above
 * @return true if the worker's thread was started
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    // These flags were used but never declared in the excerpt.
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // Nothing can be started if no thread was created for the worker.
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // Allowed while running (rs < SHUTDOWN), or in SHUTDOWN when
                // the worker carries no task of its own.
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    // Track the largest size the worker set has reached.
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Start the thread only after the lock has been released.
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Covers both "not added" and "exception thrown" outcomes.
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() { runWorker(this);}
/**
 * Main loop of a worker: runs the worker's first task (if any), then keeps
 * running tasks returned by getTask() until it returns null. Each task runs
 * with the worker locked, bracketed by beforeExecute / afterExecute.
 * processWorkerExit is always called on the way out, with completedAbruptly
 * true unless the loop finished normally.
 *
 * @param w the worker whose tasks are run on the current thread
 */
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// Take the initial task and clear the field so the worker no longer references it.
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// Stays true if an exception escapes the loop below.
boolean completedAbruptly = true;
try {
// Run the task in hand first; afterwards fetch the next one from getTask().
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
// Whatever the task throws is remembered here and passed to afterExecute.
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// Any other Throwable is rethrown wrapped in an Error.
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// Runs even when the task threw: drop the task, count it, release the worker lock.
task = null;
w.completedTasks++;
w.unlock();
}
}
// Reached only when getTask() returned null, i.e. no exception escaped.
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
/**
 * Fetches the next task from {@code workQueue}: a timed {@code poll} bounded
 * by {@code keepAliveTime} when {@code timed} is set, otherwise a blocking
 * {@code take}. Loops until a non-null task is obtained.
 *
 * NOTE(review): {@code timed} is not declared in this excerpt, and nothing
 * here reads {@code timedOut} or returns null; the state checks that normally
 * sit at the top of the loop appear to have been elided — confirm against the
 * JDK source before relying on this snippet.
 *
 * @return the next task to run
 */
private Runnable getTask() {
    boolean timedOut = false; // did the most recent poll() time out?
    for (;;) {
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // poll() returned null: the keep-alive wait elapsed without a task.
            timedOut = true;
        } catch (InterruptedException retry) {
            // Interrupted while waiting: this is not a timeout, so reset the
            // outer flag (the original redeclared it, which does not compile)
            // and retry.
            timedOut = false;
        }
    }
}
五、实践参数如何设计
/**
 * Replaces the core pool size. If the current worker count exceeds the new
 * value, idle workers are interrupted; if the value grew, up to that many
 * extra workers are started to serve tasks already waiting in the queue.
 *
 * @param corePoolSize the new core size
 * @throws IllegalArgumentException if {@code corePoolSize} is negative
 */
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0) {
        throw new IllegalArgumentException();
    }

    final int growth = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;

    if (workerCountOf(ctl.get()) > corePoolSize) {
        interruptIdleWorkers();
        return;
    }
    if (growth <= 0) {
        return;
    }

    // How many workers are really "needed" is unknown. Heuristic: start at
    // most `growth` workers, capped by the number of queued tasks, and stop
    // as soon as a start fails or the queue drains.
    int remaining = Math.min(growth, workQueue.size());
    while (remaining > 0) {
        remaining--;
        if (!addWorker(null, true) || workQueue.isEmpty()) {
            break;
        }
    }
}
/**
 * Replaces the maximum pool size, interrupting idle workers when the current
 * worker count exceeds the new limit.
 *
 * @param maximumPoolSize the new maximum
 * @throws IllegalArgumentException if the value is not positive or is smaller
 *                                  than the current core pool size
 */
public void setMaximumPoolSize(int maximumPoolSize) {
    final boolean rejected = maximumPoolSize <= 0 || maximumPoolSize < corePoolSize;
    if (rejected) {
        throw new IllegalArgumentException();
    }

    this.maximumPoolSize = maximumPoolSize;

    final int liveWorkers = workerCountOf(ctl.get());
    if (liveWorkers > maximumPoolSize) {
        interruptIdleWorkers();
    }
}
/**
 * Maximum number of items in the deque.
 * Declared {@code final}, so it cannot be reassigned after it has been set.
 */
private final int capacity;
那么我们完全可以自定义一个CustomBlockingQueue,把capacity改为非final的可修改字段,并提供setCapacity方法,这样就可以在运行时调整队列容量了。下面是实践的代码和运行截图,供大家参考。
/**
 * Builds the demo pool: 10 core threads, at most 15 threads, a 60-second
 * keep-alive, a {@code CustomBlockingQueue} created with 20 as its argument,
 * the shared {@code customThreadFactory}, and an abort rejection policy.
 *
 * @return a newly constructed pool
 */
public static ThreadPoolExecutor getThreadPoolInstance() {
    final int coreThreads = 10;
    final int maxThreads = 15;
    final int keepAliveSeconds = 60;
    final int queueArgument = 20;

    return new ThreadPoolExecutor(
            coreThreads,
            maxThreads,
            keepAliveSeconds,
            TimeUnit.SECONDS,
            new CustomBlockingQueue<>(queueArgument),
            customThreadFactory,
            new ThreadPoolExecutor.AbortPolicy());
}
// Pool shared by the demo methods; created once by getThreadPoolInstance() when the class is initialised.
static ThreadPoolExecutor executor = getThreadPoolInstance();
/**
 * Submits 30 tasks to the shared {@code executor}. Each task prints one line
 * with the current time, its thread name, {@code name}, and the pool's core
 * size, maximum size, queue length, completed-task count and total queue
 * capacity (size + remaining capacity), then sleeps for 10 seconds.
 *
 * @param name label inserted into every printed line
 */
public static void submitTask(String name) {
    // The task captures only `name`, so one instance can be submitted repeatedly.
    final Callable<String> reportAndSleep = () -> {
        StringBuilder line = new StringBuilder();
        line.append(LocalDateTime.now()).append("-");
        line.append(Thread.currentThread().getName()).append(name);
        line.append("核心线程数:").append(executor.getCorePoolSize());
        line.append(",最大线程数:").append(executor.getMaximumPoolSize());
        line.append(",当前排队数:").append(executor.getQueue().size());
        line.append(",任务完成数:").append(executor.getCompletedTaskCount());
        line.append(",队列大小")
            .append(executor.getQueue().size() + executor.getQueue().remainingCapacity());
        System.out.println(line.toString());

        TimeUnit.SECONDS.sleep(10);
        return null;
    };

    for (int remaining = 30; remaining > 0; remaining--) {
        executor.submit(reportAndSleep);
    }
}
/**
 * Demo driver: submits one batch of tasks, waits a second, enlarges the pool
 * and its queue at runtime, then submits a second batch.
 *
 * @param args unused
 * @throws InterruptedException if the one-second pause is interrupted
 */
public static void main(String[] args) throws InterruptedException {
    submitTask("参数改变前-");
    TimeUnit.SECONDS.sleep(1);

    // Raise the maximum BEFORE the core size. With the pool still capped at
    // 15, setCorePoolSize(20) would momentarily leave core > maximum, and
    // JDK 9+ rejects that with IllegalArgumentException.
    executor.setMaximumPoolSize(20);
    executor.setCorePoolSize(20);

    // Wildcard instead of a raw type; setCapacity does not depend on the element type.
    CustomBlockingQueue<?> queue = (CustomBlockingQueue<?>) executor.getQueue();
    queue.setCapacity(100);

    submitTask("参数改变后-");

    // Already-submitted tasks still run to completion; afterwards the worker
    // threads can terminate so the JVM is able to exit.
    executor.shutdown();
}
data:image/s3,"s3://crabby-images/755a4/755a4e52c2a4567418a7909b2ddd68711b91fc96" alt="null"
关于领创集团(Advance Intelligence Group)
往期回顾 BREAK AWAY
边栏推荐
猜你喜欢
阿里35+老测试员生涯回顾,自动化测试真的有这么吃香吗?
被审稿人吐槽没有novelty!深度学习方向怎么找创新点?
WIFi 开关控制实现-ESP8266 物联网 android studio arduino QT多线程服务器
I have 8 years of experience in the Ali test, and I was able to survive by relying on this understanding.
Openharmony - 基于ArkUI(TS)开发颜色选择器
下载mysql的源码包
JVM内存和垃圾回收-05.虚拟机栈
【C语言刷题】牛客JZ65——不用四则运算作加法
实例034:调用函数
研发了 5 年的时序数据库,到底要解决什么问题?
随机推荐
Boyun Selected as Gartner China DevOps Representative Vendor
请教一个数据库连接池的问题,目前已知是事务未设置超时,又有一块代码事务没有提交,一直把连接给耗尽了,
有什么好用的IT资产管理软件
How to deal with security risks posed by machine identities
LeetCode 2353. 设计食物评分系统(sortedcontainers)
浅谈混迹力扣和codeforces上的几个月
Mppt photovoltaic maximum power point tracking control matlab simulation
详解卡尔曼滤波原理
仿制药的未来商机--个人研发的体会
What are the useful real-time network traffic monitoring software
实例033:列表转字符串
Mysql基础篇(视图)
T31开发笔记:metaipc测试
Win11主题下载一直转圈怎么办?Win11主题下载一直转圈的解决方法
Detailed explanation of AtomicInteger
Jellyfin 打造家庭影院 & 视频硬解 (威联通 QNAP)
洛谷P4799 世界冰球锦标赛
7.21 - 每日一题 - 408
Functional test points for time, here is a comprehensive summary for you
JVM内存和垃圾回收-05.虚拟机栈