当前位置:网站首页>线程池原理与实践|从入门到放弃,深度解析
线程池原理与实践|从入门到放弃,深度解析
2022-08-02 18:42:00 【InfoQ】
一、背景

二、什么场景下该使用线程池?
三、怎样设置核心参数?
/**
 * Creates a pool from the given sizing, keep-alive, queue, thread-factory
 * and rejection-handler settings.
 *
 * @param corePoolSize    must be {@code >= 0}
 * @param maximumPoolSize must be {@code > 0} and {@code >= corePoolSize}
 * @param keepAliveTime   must be {@code >= 0}; converted to nanoseconds before it is stored
 * @param unit            unit in which {@code keepAliveTime} is expressed
 * @param workQueue       must not be {@code null}
 * @param threadFactory   must not be {@code null}
 * @param handler         must not be {@code null}
 * @throws IllegalArgumentException if a numeric argument violates the bounds above
 * @throws NullPointerException     if {@code workQueue}, {@code threadFactory}
 *                                  or {@code handler} is {@code null}
 */
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// Numeric sanity checks come first, so a bad size is reported before a null argument.
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
// Stored in nanoseconds regardless of the unit the caller used.
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
四、线程池怎么运行调度的?
// ---- Pool control state: one int holds the run state and the worker count ----
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Number of low-order bits used for the worker count (Integer.SIZE - 3).
private static final int COUNT_BITS = Integer.SIZE - 3;
// Mask selecting the low COUNT_BITS bits.
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// ---- Worker declaration (body elided in this excerpt) ----
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{}

// ---- Usage: the two ways of handing a task to the pool ----
executorService.submit((Callable<String>) () -> null);

executorService.execute(() -> {});

// ---- Usage: building a pool and submitting a task ----
// corePoolSize must not exceed maximumPoolSize; the constructor throws
// IllegalArgumentException when maximumPoolSize < corePoolSize.
ExecutorService executorService = new ThreadPoolExecutor(
    8,   // corePoolSize
    16,  // maximumPoolSize
    60,
    TimeUnit.SECONDS,
    new LinkedBlockingDeque<>(100),
    new CustomThreadFactory(), new CustomRejectHandler());
executorService.submit((Callable<String>) () -> null);

/**
 * Wraps the callable in a RunnableFuture, passes it to execute() and
 * returns the future.
 *
 * @throws NullPointerException if task is null
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

/** Creates the FutureTask that submit() hands to execute(). */
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

/**
 * Dispatches a task in three steps: start a core worker if fewer than
 * corePoolSize workers exist, otherwise queue the task, otherwise try a
 * non-core worker and reject the task if that fails.
 *
 * @throws NullPointerException if command is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // addWorker failed, so re-read the state before the next step.
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        // Re-check after queuing: the pool may have stopped running, or may have no workers left.
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

/**
 * Creates a Worker for firstTask, registers it under mainLock and starts
 * its thread.
 *
 * NOTE(review): this excerpt never reads the {@code core} parameter and has
 * no worker-count check; the full JDK method presumably does both before
 * this point - confirm against the JDK source.
 *
 * @return true if the worker thread was started
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    // Declarations restored: the excerpt used both flags without declaring them.
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // The thread is started only after the lock has been released.
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

/** Stores the first task and asks the pool's thread factory for a thread that runs this worker. */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
    runWorker(this);
}

/**
 * Worker main loop: runs the worker's first task, then keeps taking tasks
 * from getTask() until it returns null, and finally calls processWorkerExit.
 * completedAbruptly stays true when the loop is left by an exception.
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Runs whether or not the task threw.
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

/**
 * Fetches the next task from the work queue, using a timed poll when
 * {@code timed} is true and a blocking take otherwise.
 *
 * NOTE(review): this excerpt contains no path that returns null, although
 * runWorker relies on null to end its loop; the full JDK method presumably
 * has exit checks at the top of the loop - confirm against the JDK source.
 */
private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        // Declaration restored from the JDK original: the excerpt used `timed` without declaring it.
        boolean timed = allowCoreThreadTimeOut || workerCountOf(ctl.get()) > corePoolSize;
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
        } catch (InterruptedException retry) {
            // Assign the existing flag; re-declaring it here would not compile.
            timedOut = false;
        }
    }
}
五、实践参数如何设计
/**
 * Replaces the core pool size at runtime.
 * If the pool then holds more workers than the new size, interruptIdleWorkers()
 * is called. If the size grew instead, additional core workers are started
 * for tasks already waiting in the queue.
 *
 * @param corePoolSize the new core size; must be {@code >= 0}
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
// delta is computed against the old value before the field is overwritten.
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
// The loop also ends as soon as addWorker reports failure.
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
/**
 * Replaces the maximum pool size at runtime. If the pool then holds more
 * workers than the new maximum, interruptIdleWorkers() is called.
 *
 * @param maximumPoolSize the new maximum; must be {@code > 0} and
 *                        {@code >= corePoolSize}
 * @throws IllegalArgumentException if the bound above is violated
 */
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
// NOTE(review): the field below looks like a separate excerpt from a deque
// class (presumably LinkedBlockingDeque), not part of the executor - confirm.
// Being final, it cannot be reassigned after construction.
/** Maximum number of items in the deque */
private final int capacity;
那么我们完全可以参照上面的队列实现,自定义一个CustomBlockingQueue,把capacity改为非final并提供setCapacity方法,这样队列容量就可以在运行时修改了。下面是实践代码,供大家参考。
/**
 * Builds the demo pool: 10 core threads, at most 15 threads, a 60-second
 * keep-alive, a CustomBlockingQueue created with 20 as its constructor
 * argument, the shared customThreadFactory, and AbortPolicy as the
 * rejection handler.
 *
 * @return a newly constructed executor
 */
public static ThreadPoolExecutor getThreadPoolInstance() {
    final int coreThreads = 10;
    final int maxThreads = 15;
    final long keepAliveSeconds = 60;
    final ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
    return new ThreadPoolExecutor(
            coreThreads,
            maxThreads,
            keepAliveSeconds,
            TimeUnit.SECONDS,
            new CustomBlockingQueue<>(20),
            customThreadFactory,
            abortPolicy);
}
// Shared pool used by submitTask and main; built once by getThreadPoolInstance()
// when the class is initialized.
static ThreadPoolExecutor executor = getThreadPoolInstance();
/**
 * Submits 30 tasks to the shared executor. Each task prints one status line
 * (timestamp, thread name, the given label, core size, maximum size, queued
 * count, completed count, and queue size plus remaining capacity), then
 * sleeps for 10 seconds so its worker thread stays busy.
 *
 * @param name label inserted into every printed line
 */
public static void submitTask(String name) {
    final int taskCount = 30;
    final Callable<String> reportThenSleep = () -> {
        StringBuilder line = new StringBuilder();
        line.append(LocalDateTime.now()).append("-")
            .append(Thread.currentThread().getName()).append(name)
            .append("核心线程数:").append(executor.getCorePoolSize())
            .append(",最大线程数:").append(executor.getMaximumPoolSize())
            .append(",当前排队数:").append(executor.getQueue().size())
            .append(",任务完成数:").append(executor.getCompletedTaskCount())
            .append(",队列大小")
            // Queued items plus free slots.
            .append(executor.getQueue().size() + executor.getQueue().remainingCapacity());
        System.out.println(line.toString());
        TimeUnit.SECONDS.sleep(10);
        return null;
    };
    for (int submitted = 0; submitted < taskCount; submitted++) {
        executor.submit(reportThenSleep);
    }
}
/**
 * Demo entry point: submits one batch of tasks, waits one second, raises
 * the pool's maximum size, core size and queue capacity at runtime, then
 * submits a second batch so the printed status lines show the new settings.
 *
 * @param args unused
 * @throws InterruptedException if the one-second pause is interrupted
 */
public static void main(String[] args) throws InterruptedException {
    submitTask("参数改变前-");
    TimeUnit.SECONDS.sleep(1);
    // Raise the maximum before the core size. On Java 9+ setCorePoolSize throws
    // IllegalArgumentException when the new core size exceeds the current
    // maximum (15 here), so the original order fails on newer JDKs.
    executor.setMaximumPoolSize(20);
    executor.setCorePoolSize(20);
    // Wildcard cast instead of a raw type; setCapacity does not depend on the element type.
    CustomBlockingQueue<?> queue = (CustomBlockingQueue<?>) executor.getQueue();
    queue.setCapacity(100);
    submitTask("参数改变后-");
    // Stop accepting new work; tasks already submitted still run. Without this,
    // idle core threads can keep the JVM alive after the demo has finished.
    executor.shutdown();
}
关于领创集团(Advance Intelligence Group)
往期回顾 BREAK AWAY
边栏推荐
- Electronic Industry Inventory Management Pain Points and WMS Warehouse Management System Solutions
- Mysql基础篇(视图)
- 进程与线程
- What is the use of IT assets management software
- Go----Go 语言快速体验之开发环境搭建及第一个项目HelloWord
- 监控易火星版即将亮相:分布式运维帮助TOP3000大企业跨越管理鸿沟
- 动态折线图,制作原来是这么简单
- 快手web did可用生成
- 深度学习-学习笔记(持续更新)
- Mppt photovoltaic maximum power point tracking control matlab simulation
猜你喜欢

监控易火星版即将亮相:分布式运维帮助TOP3000大企业跨越管理鸿沟

AtomicInteger详解
![[论文分享] VideoFlow: A Flow-Based Generative Model for Video](/img/da/eac862ab2457384846a0b6b20ea3a9.png)
[论文分享] VideoFlow: A Flow-Based Generative Model for Video

实例033:列表转字符串

【C语言刷题】牛客网刷题——替换空格

VSTO踩坑记录(1)- 从零开始开发outlook插件

I have 8 years of experience in the Ali test, and I was able to survive by relying on this understanding.

Three components of NIO foundation

T31开发笔记:metaipc测试

流量分析四—蓝牙
随机推荐
VSTO踩坑记录(1)- 从零开始开发outlook插件
实例034:调用函数
【C语言刷题】牛客JZ65——不用四则运算作加法
流量分析三—远程登陆
「日志」深度学习 CUDA环境配置
LeetCode 2353. 设计食物评分系统(sortedcontainers)
AI智能剪辑,仅需2秒一键提取精彩片段
86.(cesium之家)cesium叠加面接收阴影效果(gltf模型)
请教一个数据库连接池的问题,目前已知是事务未设置超时,又有一块代码事务没有提交,一直把连接给耗尽了,
指针常量和常量指针概述
sed 命令
说一件事
阿里35+老测试员生涯回顾,自动化测试真的有这么吃香吗?
ETH Zurich重磅综述 | 人脸-素描合成:一个新的挑战
元旦快乐(2022)
How can services start smoothly under tens of millions of QPS
C#里如何简单的校验时间格式
固态硬盘接口类型介绍
NIO基础之三大组件
EasyCVR平台通过国标GB28181接入柯达NVR显示注册失败,该如何解决?