当前位置:网站首页>【详解】线程池及其自定义线程池的实现
【详解】线程池及其自定义线程池的实现
2022-08-02 03:33:00 【Money、坤】
1. 为什么使用线程池?
线程池是运用场景最多的并发框架,几乎所有需要一步或者并发执行任务的程序都可以使用线程池。使用线程池一般有以下三个好处:
①降低资源的消耗,通过重复利用已经创建的线程降低线程创建和销毁造成的消耗。
②提高相应速度,当任务到达的时候,任务可以不需要等到线程创建就能立刻执行。
③提高线程的可管理性,线程是稀缺资源,使用线程池可以统一的分配、调优和监控。
2. Java中的线程池ThreadPool
- jdk中的线程池构造方法
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...}
- 线程池参数介绍
1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
2)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几 个阻塞队列。
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原 则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。吞吐量通常要高于Linked-BlockingQueue。
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
3)maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
4)keepAliveTime(线程活动保持时间): 线程池的工作线程空闲后,保持存活的时间。所以, 如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
5)ThreadFactory(线程工厂):用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
6)RejectedExecutionHandler(拒绝策略): 当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法 处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了以下4种策略。
AbortPolicy:直接抛出异常。
CallerRunsPolicy:只用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy:不处理,丢弃掉。
3. 简单实现一个自定义线程池
- 创建流程
* 创建流程:
* 1.优先创建正式员工去处理;
* 2.加入阻塞队列
* 3.创建零时工去处理
* 4.拒绝策略
- 处理模型
- 主类
import java.util.concurrent.*;
/** * 自定义线程池 */
public class MyThreadPoolExecutor implements Executor {
//创建线程的工厂对象
private final ThreadFactory threadFactory;
//临时工的空闲时间上限
private final long keepAliveTime;
private final TimeUnit unit;
//当前正式员工的数量
private int currentCoreSize;
//最大正式员工的数量
private final int corePoolSize;
//当前临时工的数量
private int currentTemporarySize;
//临时员工的上限
private int temporaryPoolSize;
//传递任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//构造方法
/** * @param corePoolSize 正式员工的数量 * @param maximumPoolSize 员工上限 * @param keepAliveTime 临时工执行时间上限 * @param unit 时间单位 * @param workQueue 工作队列 * @param threadFactory 线程工厂 * @param handler 拒绝策略 */
public MyThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
){
this.corePoolSize=corePoolSize;
this.temporaryPoolSize=maximumPoolSize-corePoolSize;
this.workQueue=workQueue;
this.threadFactory=threadFactory;
this.keepAliveTime =keepAliveTime;
this.unit=unit;
}
//向线程池中提交任务
@Override
public void execute(Runnable command) {
//1.当正式员工的数量小于员工上限时,则优先创建正式员工处理
if (currentCoreSize <corePoolSize){
//优先创建正式员工
//创建一个线程,这个线程中的任务就是不断的取任务-做任务,不考虑退出的问题
CoreJob job =new CoreJob(workQueue,command);
//线程工厂创建线程
Thread thread =threadFactory.newThread(job);
String name=String.format("正式员工-%d",currentCoreSize);
thread.setName(name);
thread.start();
currentCoreSize++;
return;
}
//走到这里,表示正式员工的数量==正式员工的上限
//2.将任务放入优先级队列中,如果放入成功,execute执行结束,否则还需要继续
//带阻塞的放入,需要立即看到结果
boolean success = workQueue.offer(command);
if (success==true){
//说明放入成功
return;
}
//队列已满
//3.继续判断临时工的数量是否到达上限
if (currentTemporarySize<temporaryPoolSize){
//创建临时工进行处理
//创建一个线程,这个线程中的任务就是不断的取任务-做任务,不考虑退出的问题
TemporaryJob job =new TemporaryJob(keepAliveTime,unit,workQueue,command);
//线程工厂创建线程
Thread thread =threadFactory.newThread(job);
String name=String.format("临时员工-%d",currentTemporarySize);
thread.setName(name);
thread.start();
currentTemporarySize++;
return;
}
//4.临时员工到达上限,执行拒绝策略
//为实现方便,暂不考虑其他策略
throw new RejectedExecutionException();
}
}
- 核心线程任务
import java.util.concurrent.BlockingQueue;
//一个正式员工要要完成的任务
public class CoreJob implements Runnable{
//需要阻塞队列
private final BlockingQueue<Runnable> workQueue;
private Runnable firstCommand;
public CoreJob(BlockingQueue<Runnable> workQueue,Runnable firstCommand){
this.workQueue=workQueue;
this.firstCommand =firstCommand;
}
@Override
public void run() {
try {
//优先把刚提交的任务执行掉
firstCommand.run();
firstCommand =null; //设置为空的目的是为不影响firstCommand对象被GC回收
while (true){
//不断的从队列中去取任务,执行
Runnable command =workQueue.take();
command.run();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 临时线程任务
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
//一个临时员工要要完成的任务
public class TemporaryJob implements Runnable{
//需要阻塞队列
private final BlockingQueue<Runnable> workQueue;
//临时工的空闲时间上限
private final long keepAliveTime;
private final TimeUnit unit;
private Runnable firstCommand;
public TemporaryJob(long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Runnable firstCommand){
this.keepAliveTime=keepAliveTime;
this.unit=unit;
this.workQueue=workQueue;
this.firstCommand =firstCommand;
}
@Override
public void run() {
try {
//优先把刚提交的任务执行掉
firstCommand.run();
firstCommand =null; //设置为空的目的是为不影响firstCommand对象被GC回收
//与正式工不同的是,临时工在一定时间内没有任务时会退出
//keepAliveTime+unit 记录起来
//如果一定时间内无法从任务队列中取出任务,则认为到达时间上限
while (true){
//不断的从队列中去取任务,执行
final Runnable command = workQueue.poll(keepAliveTime, unit);
if (command==null){
//表明没有取到任务,超时时间已到,退出
break;
}
command.run();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 测试
import java.util.concurrent.*;
public class Test {
static class Task implements Runnable{
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class MyThreadFactory implements ThreadFactory{
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
}
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue =new ArrayBlockingQueue<>(5);
//3正式,2临时,队列中5,最多任务上限10,第11个任务执行拒绝策略
MyThreadPoolExecutor executor =new MyThreadPoolExecutor(
3,5,20, TimeUnit.SECONDS,
workQueue,new MyThreadFactory(),new ThreadPoolExecutor.AbortPolicy()
);
//测试,创建线程任务
for (int i = 0; i <50 ; i++) {
System.out.println("提交任务:"+i);
Task task =new Task();
executor.execute(task);
}
}
}
- 测试结果
执行第11个任务时,会执行拒绝策略。
边栏推荐
- 【MQ-2 可燃气体和烟雾传感器与 Arduino 配合使用】
- AD8361检波器
- TC358860XBG BGA65 东芝桥接芯片 HDMI转MIPI
- 使用批处理脚本修改hosts文件
- openwrt RK3568_EVB移植
- 【Arduino连接时钟模块在LCD1602上显示时间】
- Pylon CLI 低成本的本地环境管控工具应用实例
- Modify hosts file using batch script
- [Popular Science Post] I2C Communication Protocol Detailed Explanation - Partial Software Analysis and Logic Analyzer Example Analysis
- GM7150 CVBS转BT656视频解码芯片详细内容及设计要求
猜你喜欢
物联网方案
[DS3231 RTC real-time clock module and Arduino interface to build a digital clock]
【Popular Science Post】UART Interface Communication Protocol
MPU6050 加速度计和陀螺仪传感器与 Arduino 连接
联阳IT6561|IT6561FN方案电路|替代IT6561方案设计DP转HDMI音视频转换器资料
MOS管开关原理及应用详解
uniCloud use
【NTC 热敏电阻与 Arduino 读取温度】
【nRF24L01 与 Arduino 连接实现无线通信】
[Arduino connects the clock module to display the time on LCD1602]
随机推荐
MQ-5 可燃气体传感器与 Arduino 接口
汇编语言跳转指令总结
AD PCB导出Gerber文件(非常详细的步骤)
【plang 1.4.4】编写贪吃蛇脚本
bluez5.50蓝牙文件传输
[DS3231 RTC real-time clock module and Arduino interface to build a digital clock]
移动云物联网预研及阿里云开发对比分析
功率计,物联网,智能插座电路设计【毕业设计】
LT9211芯片资料分享
本地数据库 sqlite3 编译和使用
引擎开发日志:场景编辑器开发难点
Host your own website with Vercel
【科普贴】MDIO接口详解
【nRF24L01 connects with Arduino to realize wireless communication】
振芯GM7123C:功能RGB转VGA芯片方案简介
【MQ-3 酒精检测器与 Arduino检测酒精】
振芯科技GM8285C:功能TTL转LVDS芯片简介
使用Vercel托管自己的网站
Comparative analysis of OneNET Studio and IoT Studio
uniCloud使用