当前位置:网站首页>线程池原理与实践|从入门到放弃,深度解析
线程池原理与实践|从入门到放弃,深度解析
2022-08-02 18:42:00 【InfoQ】
一、背景

二、什么场景下该使用线程池?
三、怎样设置核心参数?
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
四、线程池怎么运行调度的?
// Pool control word, initialised to the RUNNING state combined with 0.
// NOTE(review): ctlOf is defined outside this excerpt; presumably it packs
// the run state and the worker count into one int - confirm against the JDK source.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE is 32, so 29 low-order bits remain below the three high-order bits.
private static final int COUNT_BITS = Integer.SIZE - 3;
// Mask with the low COUNT_BITS bits set (2^29 - 1).
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// RUNNING is the only negative value, so numerically
// RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED.
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
/**
 * Worker declaration as shown in this excerpt: an inner class that extends
 * {@link AbstractQueuedSynchronizer} and implements {@link Runnable}.
 * The class body is elided here.
 */
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{}
// Submit a Callable; the cast selects the Callable overload of submit().
executorService.submit((Callable<String>) () -> null);
// Hand a Runnable straight to execute().
executorService.execute(() -> {});
// Example pool: 8 core threads, up to 16 threads in total, 60-second
// keep-alive and a work queue bounded at 100 tasks.
// The maximum must not be smaller than the core size, otherwise the
// constructor throws IllegalArgumentException.
ExecutorService executorService = new ThreadPoolExecutor(
        8,
        16,
        60,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(100),
        new CustomThreadFactory(), new CustomRejectHandler());
// Submit a Callable; the cast selects the Callable overload of submit().
executorService.submit((Callable<String>) () -> null);
/**
 * Wraps the callable in a {@link RunnableFuture}, passes it to
 * {@code execute}, and returns that same future to the caller.
 *
 * @param task the computation to run
 * @return the future wrapping {@code task}
 * @throws NullPointerException if {@code task} is null
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) {
        throw new NullPointerException();
    }
    RunnableFuture<T> future = newTaskFor(task);
    execute(future);
    return future;
}

/**
 * Creates the {@link RunnableFuture} used by {@code submit}; this
 * implementation returns a {@link FutureTask}.
 *
 * @param callable the computation to wrap
 * @return a new {@code FutureTask} around {@code callable}
 */
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
/**
 * Schedules {@code command} using three steps in order: start a new core
 * worker, enqueue the task, or start a non-core worker. The task is
 * rejected when none of these succeeds.
 *
 * @param command the task to run
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null) {
        throw new NullPointerException();
    }

    int state = ctl.get();

    // Step 1: below the core size, try to start a worker with this task.
    if (workerCountOf(state) < corePoolSize) {
        if (addWorker(command, true)) {
            return;
        }
        // addWorker failed; re-read the control word before continuing.
        state = ctl.get();
    }

    // Step 2: while running, try to enqueue the task.
    if (isRunning(state) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command)) {
            // The pool stopped running after the enqueue: undo and reject.
            reject(command);
        } else if (workerCountOf(recheck) == 0) {
            // No worker is left: start one without a first task.
            addWorker(null, false);
        }
    } else if (!addWorker(command, false)) {
        // Step 3: enqueue failed and a non-core worker could not be added.
        reject(command);
    }
}
/**
 * Creates a {@code Worker} for {@code firstTask}, registers it in
 * {@code workers} under {@code mainLock}, and starts its thread.
 *
 * <p>NOTE: this is an abridged excerpt. The upstream JDK method begins
 * with a retry loop that checks the run state and reserves a worker slot
 * with a compare-and-set on {@code ctl}; that loop is the only place
 * {@code core} is consulted, so {@code core} is unused here.
 *
 * @param firstTask task the new worker runs first, or null
 * @param core      selects the core or maximum bound in the elided
 *                  reservation loop
 * @return true if the worker thread was started
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    // Both flags must be declared before the try block: the finally
    // clause and the return statement read them.
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Start the thread only after the lock has been released.
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Roll back if the thread never started.
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() { runWorker(this);}
/**
 * Main worker loop: runs the worker's first task, then keeps taking tasks
 * from getTask() until it returns null, and finally calls
 * processWorkerExit.
 *
 * @param w the worker whose loop is being run
 */
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// Take ownership of the initial task and clear the field on the worker.
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// Stays true unless the loop below exits normally.
boolean completedAbruptly = true;
try {
// Run the initial task first, then whatever getTask() supplies.
while (task != null || (task = getTask()) != null) {
// The worker lock is held for the whole run of one task.
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// Hook invoked before the task runs.
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// Any other Throwable is rethrown wrapped in an Error.
thrown = x; throw new Error(x);
} finally {
// Hook invoked after the task; receives the throwable, or null.
afterExecute(task, thrown);
}
} finally {
// Per-task bookkeeping runs even when the task threw.
task = null;
w.completedTasks++;
w.unlock();
}
}
// Reached only when the loop ended because no task was available.
completedAbruptly = false;
} finally {
// Always invoked, on both normal and abrupt exit.
processWorkerExit(w, completedAbruptly);
}
}
/**
 * Fetches the next task for a worker, using a timed {@code poll} or a
 * blocking {@code take} depending on whether the worker may time out.
 *
 * <p>This restores the logic of the upstream JDK 8 method. It relies on
 * {@code allowCoreThreadTimeOut}, {@code decrementWorkerCount} and
 * {@code compareAndDecrementWorkerCount}, which belong to
 * {@code ThreadPoolExecutor} but are not shown in this excerpt.
 *
 * @return the next task, or null if this worker must exit
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // Retire this worker if the pool is over its maximum or the last
        // timed poll expired, but never drop the last worker while the
        // queue still holds tasks.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // poll() returned null, so the keep-alive time elapsed.
            timedOut = true;
        } catch (InterruptedException retry) {
            // An interrupt is not a timeout: reset the flag and loop to
            // re-check the pool state.
            timedOut = false;
        }
    }
}
五、实践参数如何设计
/**
 * Updates the core pool size. If more workers exist than the new core
 * size, idle workers are interrupted; if the core size grew, workers are
 * prestarted for tasks already waiting in the queue.
 *
 * @param corePoolSize the new core size
 * @throws IllegalArgumentException if {@code corePoolSize} is negative
 */
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0) {
        throw new IllegalArgumentException();
    }

    final int growth = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;

    if (workerCountOf(ctl.get()) > corePoolSize) {
        interruptIdleWorkers();
        return;
    }
    if (growth <= 0) {
        return;
    }

    // The number of threads really needed is unknown. Heuristic: start at
    // most as many workers as the core size grew by, capped by the current
    // queue length, and stop early when a worker cannot be added or the
    // queue has drained.
    for (int remaining = Math.min(growth, workQueue.size()); remaining > 0; remaining--) {
        if (!addWorker(null, true)) {
            break;
        }
        if (workQueue.isEmpty()) {
            break;
        }
    }
}
/**
 * Updates the maximum pool size and interrupts idle workers when the
 * current worker count exceeds the new maximum.
 *
 * @param maximumPoolSize the new maximum
 * @throws IllegalArgumentException if the value is not positive or is
 *         smaller than the core pool size
 */
public void setMaximumPoolSize(int maximumPoolSize) {
    final boolean acceptable = maximumPoolSize > 0 && maximumPoolSize >= corePoolSize;
    if (!acceptable) {
        throw new IllegalArgumentException();
    }

    this.maximumPoolSize = maximumPoolSize;

    final int liveWorkers = workerCountOf(ctl.get());
    if (liveWorkers > maximumPoolSize) {
        interruptIdleWorkers();
    }
}
/**
 * Maximum number of items in the deque.
 * Declared {@code final}, so it cannot be reassigned after construction.
 */
private final int capacity;
那么我们完全可以自定义一个CustomBlockingQueue,并把capacity定义为可修改的字段,这样就可以在运行时调整队列容量了。下面是实践的示例代码,供大家参考。
/**
 * Builds the demo pool: 10 core threads, at most 15 threads, a 60-second
 * keep-alive, a {@code CustomBlockingQueue} created with 20, the shared
 * {@code customThreadFactory}, and the abort rejection policy.
 *
 * @return a newly created executor
 */
public static ThreadPoolExecutor getThreadPoolInstance() {
    final int coreThreads = 10;
    final int maxThreads = 15;
    final long idleSeconds = 60;
    final int queueCapacity = 20;
    return new ThreadPoolExecutor(
            coreThreads,
            maxThreads,
            idleSeconds,
            TimeUnit.SECONDS,
            new CustomBlockingQueue<>(queueCapacity),
            customThreadFactory,
            new ThreadPoolExecutor.AbortPolicy());
}
// Shared demo pool, created once by getThreadPoolInstance().
static ThreadPoolExecutor executor = getThreadPoolInstance();
/**
 * Submits 30 tasks to the shared executor. Each task prints a snapshot of
 * the pool's settings and queue state, then sleeps for 10 seconds.
 *
 * @param name label inserted into every printed line
 */
public static void submitTask(String name) {
    final int batchSize = 30;
    int submitted = 0;
    while (submitted < batchSize) {
        executor.submit((Callable<String>) () -> {
            // Queue size plus remaining capacity gives the total capacity.
            String snapshot = LocalDateTime.now() + "-" + Thread.currentThread().getName() + name + "核心线程数:"
                    + executor.getCorePoolSize() + ",最大线程数:"
                    + executor.getMaximumPoolSize() + ",当前排队数:"
                    + executor.getQueue().size() + ",任务完成数:"
                    + executor.getCompletedTaskCount() + ",队列大小"
                    + (executor.getQueue().size() + executor.getQueue().remainingCapacity());
            System.out.println(snapshot);
            TimeUnit.SECONDS.sleep(10);
            return null;
        });
        submitted++;
    }
}
/**
 * Demo entry point: submits a batch of tasks, resizes the pool and its
 * queue at runtime, submits a second batch, then shuts the pool down.
 *
 * @param args unused
 * @throws InterruptedException if the pause between batches is interrupted
 */
public static void main(String[] args) throws InterruptedException {
    submitTask("参数改变前-");
    TimeUnit.SECONDS.sleep(1);
    // Raise the maximum before the core size: on JDK 9+ setCorePoolSize
    // rejects a core size above the current maximum.
    executor.setMaximumPoolSize(20);
    executor.setCorePoolSize(20);
    // Wildcard type instead of the raw type; setCapacity does not depend
    // on the element type.
    CustomBlockingQueue<?> queue = (CustomBlockingQueue<?>) executor.getQueue();
    queue.setCapacity(100);
    submitTask("参数改变后-");
    // Stop accepting new tasks and let submitted ones finish, so the
    // worker threads end and the JVM can exit.
    executor.shutdown();
}

关于领创集团(Advance Intelligence Group)
往期回顾 BREAK AWAY
边栏推荐
- Golang sync/atomic 包的原子操作说明
- From the technical panorama to the actual scene, analyze the evolutionary breakthrough of "narrowband high-definition"
- WPF login with Prism
- 3年半测试经验,20K我都没有,看来是时候跳槽了
- codeforces:E. Add Modulo 10【状态压缩 + 找规律】
- 备战无人机配送:互联网派To C、技术派To B
- 详解卡尔曼滤波原理
- 洛谷P1502 窗口的星星
- 【C语言刷题】牛客网刷题——替换空格
- 下载mysql的源码包
猜你喜欢
I have 8 years of experience in the Ali test, and I was able to survive by relying on this understanding.
Mobile Banking Experience Test: How to Get the Real User Experience
arcgis 分子式标注
“12306”的架构到底有多牛逼?
WIFi 开关控制实现-ESP8266 物联网 android studio arduino QT多线程服务器
JVM内存和垃圾回收-04.程序计数器(PC寄存器)
【C语言刷题】牛客JZ65——不用四则运算作加法
7.25 - 每日一题 - 408
固态硬盘接口类型介绍
Mysql基础篇(视图)
随机推荐
流量分析第二题
简单有效又有用的关闭antimalware service executable的方法·备份记录
LSB利器-zsteg
WPF使用Prism登录
流量分析第一题
JVM内存和垃圾回收-03.运行时数据区概述及线程
药品研发--检验记录与检验报告书的书写细则
情景剧《重走长征路》上演
就刚刚,鸿蒙3.0发布了,华为还一口气发布了十一款产品
洛谷P5094 MooFest G 加强版
指针常量和常量指针概述
js Fetch返回数据res.json()报错问题
荐号 | 当一个人不联系你,不拉黑你,原因只有一个……!
大事务故障案例
如何获取EasyCVR平台设备通道的RTMP视频流地址?
阿里35+老测试员生涯回顾,自动化测试真的有这么吃香吗?
微服务-gateway【服务网关入门】
LeetCode 2353. 设计食物评分系统(sortedcontainers)
Mysql基础篇(视图)
Unity 打包和切换平台|Build Settings窗口介绍