当前位置:网站首页>自定义线程池
自定义线程池
2022-08-05 00:44:00 【周雨彤的小迷弟】
1、自定义线程池的实现
自定义线程池应包括 线程池 + 阻塞队列
实现
/** * @author houChen * @date 2022/6/14 21:36 * @Description: 自定义线程池 */
@Slf4j(topic = "c.main")
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
log.debug("{}", j);
});
}
}
}
//线程池
@Slf4j(topic = "c.testPool")
class ThreadPool {
//阻塞队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet();
//核心线程数
private int coreSize;
//超时时间
private long timeOut;
private TimeUnit timeUnit;
//执行任务
public void execute(Runnable task) {
synchronized (workers) {
//【注意】 当任务个数大于核心线程数时,先向任务队列push, 当任务队列满了,再创建max - core的线程数,最后再是任务队列的丢弃策略
//如果任务数小于核心线程数,直接交给worker对象执行
//当任务数超过核心线程数,加入任务队列暂存
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增worker{},task{}", worker, task);
workers.add(worker);
worker.start();
} else {
log.debug("加入任务队列{}", task);
taskQueue.push(task);
}
}
}
//构造方法
public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int capcity) {
this.coreSize = coreSize;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(capcity);
}
class Worker extends Thread {
private Runnable task;
//构造器
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//执行任务
//1)task不为空时,执行任务
//2)task为空时,从任务队列取出任务并执行
if (task != null || (task = taskQueue.take()) != null) {
try {
log.debug("正在执行...{}", task);
this.task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker{} 被移除",this);
workers.remove(this);
}
}
}
}
//阻塞队列
class BlockingQueue<T> {
//1.任务队列
private Deque<T> queue;
//2.锁 当多个线程来取队列中的任务时,保证其互斥性
private ReentrantLock lock = new ReentrantLock();
//3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//5.容量
private int capcity;
public BlockingQueue(int capcity) {
this.queue = new ArrayDeque<>(capcity);
}
//阻塞获取 (从队列中获取一个任务)
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
//当队列为空时,阻塞
try {
emptyWaitSet.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//超时阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout统一转换成纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
//当队列为空时,阻塞
try {
if (nanos < 0) {
return null;
}
//返回值是剩余等待时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//阻塞添加
public void push(T element) {
lock.lock();
try {
while (queue.size() == capcity) {
//当队列为空时,阻塞
try {
fullWaitSet.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public void push(T element) {
}
//获取队列的大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
2、当任务数大于 core + 任务队列的长度时
当线程池需要执行的任务数 > 核心线程数 + 任务队列的容量时, 剩余的 任务应该怎么处理,由线程池的拒绝策略决定
3、任务队列push方法增强 - 带超时时间的阻塞添加
//带超时时间的阻塞添加
public boolean pushP(T element,long timeOut,TimeUnit timeUnit) {
lock.lock();
try {
//将timeout统一转换成纳秒
long nanos = timeUnit.toNanos(timeOut);
while (queue.size() == capcity) {
//当队列处于满
try {
if (nanos < 0) {
return false;
}
//返回值是剩余等待时间
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}", element);
queue.addLast(element);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
4、自定义线程池拒绝策略
1、定义一个函数式接口(也就是一个拒绝策略)
//线程池的拒绝策略
@FunctionalInterface
interface RejectPolicy<T> {
// 需要传入阻塞队列是因为拒绝这个行为是发生在队列上的
// 需要传入一个t,是因为阻塞队列对t进行处理
void reject(BlockingQueue<T> queue,T t);
}
2、线程池注入该策略
class ThreadPool {
//阻塞队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet();
//核心线程数
private int coreSize;
//超时时间
private long timeOut;
private TimeUnit timeUnit;
//线程池的拒绝策略
private RejectPolicy<Runnable> rejectPilicy;
//执行任务
public void execute(Runnable task) {
synchronized (workers) {
//【注意】 当任务个数大于核心线程数时,先向任务队列push, 当任务队列满了,再创建max - core的线程数,最后再是任务队列的丢弃策略
//如果任务数小于核心线程数,直接交给worker对象执行
//当任务数超过核心线程数,加入任务队列暂存
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增worker{},task{}", worker, task);
workers.add(worker);
worker.start();
} else {
//taskQueue.push(task);
//当任务队列满了,有以下几种方式
//1)死等
//2)超时等待 (加入任务队列超时,就放弃该任务)
//3)让调用者放弃任务执行
//4)让调用者抛出异常
//让任务队列来决定怎么处理该任务
taskQueue.tryPut(rejectPilicy,task);
}
}
}
//构造方法
public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int capcity, RejectPolicy<Runnable> rejectPilicy) {
this.coreSize = coreSize;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(capcity);
this.rejectPilicy = rejectPilicy;
}
class Worker extends Thread {
private Runnable task;
//构造器
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//执行任务
//1)task不为空时,执行任务
//2)task为空时,从任务队列取出任务并执行
if (task != null || (task = taskQueue.take()) != null) {
try {
log.debug("正在执行...{}", task);
this.task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker{} 被移除",this);
workers.remove(this);
}
}
}
}
3、在阻塞队列的方法 tryPut中,根据传入的策略对 task进行处理
class BlockingQueue<T> {
//1.任务队列
private Deque<T> queue;
//2.锁 当多个线程来取队列中的任务时,保证其互斥性
private ReentrantLock lock = new ReentrantLock();
//3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//5.容量
private int capcity;
public BlockingQueue(int capcity) {
this.queue = new ArrayDeque<>(capcity);
}
//阻塞获取 (从队列中获取一个任务)
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
//当队列为空时,阻塞
try {
emptyWaitSet.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//超时阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout统一转换成纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
//当队列为空时,阻塞
try {
if (nanos < 0) {
return null;
}
//返回值是剩余等待时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//阻塞添加
public void push(T element) {
lock.lock();
try {
while (queue.size() == capcity) {
//当队列为空时,阻塞
try {
fullWaitSet.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}", element);
queue.addLast(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
//带超时时间的阻塞添加
public boolean pushP(T element,long timeOut,TimeUnit timeUnit) {
lock.lock();
try {
//将timeout统一转换成纳秒
long nanos = timeUnit.toNanos(timeOut);
while (queue.size() == capcity) {
//当队列处于满
try {
if (nanos < 0) {
return false;
}
//返回值是剩余等待时间
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}", element);
queue.addLast(element);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
//获取队列的大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
//线程池任务队列的拒绝策略
public void tryPut(RejectPolicy<T> rejectPilicy, T task) {
lock.lock();
try {
if(queue.size() == capcity) {
//阻塞队列满了,就使用策略来处理任务
rejectPilicy.reject(this, task);
} else {
// 阻塞队列没满,则加入队列
log.debug("加入任务队列{}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
4、测试时,构造线程池时,需要传入策略的具体实现
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10, (queue,task) -> {
queue.push(task);
});
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
log.debug("{}", j);
});
}
}
}
边栏推荐
- 2022牛客多校第三场 J题 Journey
- Helm Chart
- 如何用 Solidity 创建一个“Hello World”智能合约
- node使用redis
- 2022牛客多校第三场 A Ancestor
- 仅3w报价B站up主竟带来1200w播放!品牌高性价比B站投放标杆!
- [FreeRTOS] FreeRTOS and stm32 built-in stack occupancy
- SV class virtual method of polymorphism
- Software testing interview questions: Have you used some tools for software defect (Bug) management in your past software testing work? If so, please describe the process of software defect (Bug) trac
- [230]连接Redis后执行命令错误 MISCONF Redis is configured to save RDB snapshots
猜你喜欢

QSunSync 七牛云文件同步工具,批量上传

JVM类加载简介

动态规划/背包问题总结/小结——01背包、完全背包

【FreeRTOS】FreeRTOS与stm32内置堆栈的占用情况
![[230]连接Redis后执行命令错误 MISCONF Redis is configured to save RDB snapshots](/img/fa/5bdc81b1ebfc22d31f42da34427f3e.png)
[230]连接Redis后执行命令错误 MISCONF Redis is configured to save RDB snapshots

oracle create user

Bit rate vs. resolution, which one is more important?

Lattice PCIe 学习 1

5. PCIe official example

gorm joint table query - actual combat
随机推荐
canvas 高斯模糊效果
tensor.nozero(), mask, [mask]
gorm联表查询-实战
could not build server_names_hash, you should increase server_names_hash_bucket_size: 32
软件测试面试题:请你分别画出 OSI 的七层网络结构图和 TCP/IP 的四层结构图?
面试汇总:为何大厂面试官总问 Framework 的底层原理?
If capturable=False, state_steps should not be CUDA tensors
Software Testing Interview Questions: About Automated Testing Tools?
Redis visual management software Redis Desktop Manager2022
软件测试面试题:LoadRunner 分为哪三个模块?
2022 Hangzhou Electric Power Multi-School Session 3 Question B Boss Rush
Software testing interview questions: Have you used some tools for software defect (Bug) management in your past software testing work? If so, please describe the process of software defect (Bug) trac
After the staged testing is complete, have you performed defect analysis?
Software Testing Interview Questions: What's the Difference Between Manual Testing and Automated Testing?
僵尸进程和孤儿进程
tiup uninstall
tiup status
软件测试面试题:软件测试类型都有哪些?
Raw and scan of gorm
Matlab uses plotting method for data simulation and simulation