当前位置:网站首页>自定义线程池
自定义线程池
2022-08-05 00:44:00 【周雨彤的小迷弟】
1、自定义线程池的实现
自定义线程池应包括 线程池 + 阻塞队列
实现
/** * @author houChen * @date 2022/6/14 21:36 * @Description: 自定义线程池 */
@Slf4j(topic = "c.main")
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
log.debug("{}", j);
});
}
}
}
//线程池
@Slf4j(topic = "c.testPool")
class ThreadPool {
//阻塞队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet();
//核心线程数
private int coreSize;
//超时时间
private long timeOut;
private TimeUnit timeUnit;
//执行任务
public void execute(Runnable task) {
synchronized (workers) {
//【注意】 当任务个数大于核心线程数时,先向任务队列push, 当任务队列满了,再创建max - core的线程数,最后再是任务队列的丢弃策略
//如果任务数小于核心线程数,直接交给worker对象执行
//当任务数超过核心线程数,加入任务队列暂存
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增worker{},task{}", worker, task);
workers.add(worker);
worker.start();
} else {
log.debug("加入任务队列{}", task);
taskQueue.push(task);
}
}
}
//构造方法
public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int capcity) {
this.coreSize = coreSize;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(capcity);
}
class Worker extends Thread {
private Runnable task;
//构造器
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//执行任务
//1)task不为空时,执行任务
//2)task为空时,从任务队列取出任务并执行
if (task != null || (task = taskQueue.take()) != null) {
try {
log.debug("正在执行...{}", task);
this.task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker{} 被移除",this);
workers.remove(this);
}
}
}
}
//阻塞队列
class BlockingQueue<T> {
//1.任务队列
private Deque<T> queue;
//2.锁 当多个线程来取队列中的任务时,保证其互斥性
private ReentrantLock lock = new ReentrantLock();
//3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//5.容量
private int capcity;
public BlockingQueue(int capcity) {
this.queue = new ArrayDeque<>(capcity);
}
//阻塞获取 (从队列中获取一个任务)
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
//当队列为空时,阻塞
try {
emptyWaitSet.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//超时阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout统一转换成纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
//当队列为空时,阻塞
try {
if (nanos < 0) {
return null;
}
//返回值是剩余等待时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//阻塞添加
public void push(T element) {
lock.lock();
try {
while (queue.size() == capcity) {
//当队列为空时,阻塞
try {
fullWaitSet.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public void push(T element) {
}
//获取队列的大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
2、当任务数大于 core + 任务队列的长度时
当线程池需要执行的任务数 > 核心线程数 + 任务队列的容量时, 剩余的 任务应该怎么处理,由线程池的拒绝策略决定
3、任务队列push方法增强 - 带超时时间的阻塞添加
//带超时时间的阻塞添加
public boolean pushP(T element,long timeOut,TimeUnit timeUnit) {
lock.lock();
try {
//将timeout统一转换成纳秒
long nanos = timeUnit.toNanos(timeOut);
while (queue.size() == capcity) {
//当队列处于满
try {
if (nanos < 0) {
return false;
}
//返回值是剩余等待时间
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}", element);
queue.addLast(element);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
4、自定义线程池拒绝策略
1、定义一个函数式接口(也就是一个拒绝策略)
//线程池的拒绝策略
@FunctionalInterface
interface RejectPolicy<T> {
// 需要传入阻塞队列是因为拒绝这个行为是发生在队列上的
// 需要传入一个t,是因为阻塞队列对t进行处理
void reject(BlockingQueue<T> queue,T t);
}
2、线程池注入该策略
class ThreadPool {
//阻塞队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet();
//核心线程数
private int coreSize;
//超时时间
private long timeOut;
private TimeUnit timeUnit;
//线程池的拒绝策略
private RejectPolicy<Runnable> rejectPilicy;
//执行任务
public void execute(Runnable task) {
synchronized (workers) {
//【注意】 当任务个数大于核心线程数时,先向任务队列push, 当任务队列满了,再创建max - core的线程数,最后再是任务队列的丢弃策略
//如果任务数小于核心线程数,直接交给worker对象执行
//当任务数超过核心线程数,加入任务队列暂存
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增worker{},task{}", worker, task);
workers.add(worker);
worker.start();
} else {
//taskQueue.push(task);
//当任务队列满了,有以下几种方式
//1)死等
//2)超时等待 (加入任务队列超时,就放弃该任务)
//3)让调用者放弃任务执行
//4)让调用者抛出异常
//让任务队列来决定怎么处理该任务
taskQueue.tryPut(rejectPilicy,task);
}
}
}
//构造方法
public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int capcity, RejectPolicy<Runnable> rejectPilicy) {
this.coreSize = coreSize;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(capcity);
this.rejectPilicy = rejectPilicy;
}
class Worker extends Thread {
private Runnable task;
//构造器
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//执行任务
//1)task不为空时,执行任务
//2)task为空时,从任务队列取出任务并执行
if (task != null || (task = taskQueue.take()) != null) {
try {
log.debug("正在执行...{}", task);
this.task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker{} 被移除",this);
workers.remove(this);
}
}
}
}
3、在阻塞队列的方法 tryPut中,根据传入的策略对 task进行处理
class BlockingQueue<T> {
//1.任务队列
private Deque<T> queue;
//2.锁 当多个线程来取队列中的任务时,保证其互斥性
private ReentrantLock lock = new ReentrantLock();
//3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//5.容量
private int capcity;
public BlockingQueue(int capcity) {
this.queue = new ArrayDeque<>(capcity);
}
//阻塞获取 (从队列中获取一个任务)
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
//当队列为空时,阻塞
try {
emptyWaitSet.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//超时阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout统一转换成纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
//当队列为空时,阻塞
try {
if (nanos < 0) {
return null;
}
//返回值是剩余等待时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//阻塞添加
public void push(T element) {
lock.lock();
try {
while (queue.size() == capcity) {
//当队列为空时,阻塞
try {
fullWaitSet.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}", element);
queue.addLast(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
//带超时时间的阻塞添加
public boolean pushP(T element,long timeOut,TimeUnit timeUnit) {
lock.lock();
try {
//将timeout统一转换成纳秒
long nanos = timeUnit.toNanos(timeOut);
while (queue.size() == capcity) {
//当队列处于满
try {
if (nanos < 0) {
return false;
}
//返回值是剩余等待时间
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}", element);
queue.addLast(element);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
//获取队列的大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
//线程池任务队列的拒绝策略
public void tryPut(RejectPolicy<T> rejectPilicy, T task) {
lock.lock();
try {
if(queue.size() == capcity) {
//阻塞队列满了,就使用策略来处理任务
rejectPilicy.reject(this, task);
} else {
// 阻塞队列没满,则加入队列
log.debug("加入任务队列{}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
4、测试时,构造线程池时,需要传入策略的具体实现
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10, (queue,task) -> {
queue.push(task);
});
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
log.debug("{}", j);
});
}
}
}
边栏推荐
- Inter-process communication and inter-thread communication
- Pytorch usage and tricks
- 《WEB安全渗透测试》(28)Burp Collaborator-dnslog外带技术
- 内存取证系列1
- 2022杭电多校第三场 K题 Taxi
- could not build server_names_hash, you should increase server_names_hash_bucket_size: 32
- 软件测试面试题:BIOS, Fat, IDE, Sata, SCSI, Ntfs windows NT?
- Difference between MBps and Mbps
- Software Testing Interview Questions: What aspects should be considered when designing test cases, i.e. what aspects should different test cases test against?
- EL定时刷新页面中的皕杰报表实例
猜你喜欢
oracle create tablespace
2022 Hangzhou Electric Power Multi-School Session 3 K Question Taxi
TinyMCE禁用转义
CNI(Container Network Plugin)
gorm联表查询-实战
Matlab uses plotting method for data simulation and simulation
Memory Forensics Series 1
TinyMCE disable escape
### Error querying database. Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionExcep
国内网站用香港服务器会被封吗?
随机推荐
Knowledge Points for Network Planning Designers' Morning Questions in November 2021 (Part 2)
E - Many Operations (bitwise consideration + dp thought to record the result after the operation
TinyMCE disable escape
The method of freely controlling concurrency in the sync package in GO
tiup status
gorm的Raw与scan
[FreeRTOS] FreeRTOS and stm32 built-in stack occupancy
JUC线程池(一): FutureTask使用
matlab 采用描点法进行数据模拟和仿真
Raw and scan of gorm
Software test interview questions: BIOS, Fat, IDE, Sata, SCSI, Ntfs windows NT?
Software testing interview questions: the difference and connection between black box testing, white box testing, and unit testing, integration testing, system testing, and acceptance testing?
2022牛客多校第三场 J题 Journey
【FreeRTOS】FreeRTOS与stm32内置堆栈的占用情况
torch.autograd.grad finds the second derivative
软件基础的理论
2022 Hangzhou Electric Power Multi-School Session 3 Question B Boss Rush
Pytorch使用和技巧
TinyMCE禁用转义
[230] Execute command error after connecting to Redis MISCONF Redis is configured to save RDB snapshots