当前位置:网站首页>自定义线程池拒绝策略

自定义线程池拒绝策略

2022-07-07 11:05:00 BUG指挥官

一. 默认的拒绝策略
        ThreadPoolExceutor.AbortPolicy : 丢弃任务并抛出RejectedExecutionException异常。
        ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。         ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务

        ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

        一般来说, 默认的这4种拒绝策略, 在一些场景是不适用的, 比如我们不想丢弃任何任务, 那么前3种拒绝策略就不适用了

         第4种拒绝策略比较特殊, 当执行该策略后, 会把当前线程交给 提交任务的线程 去执行这个任务, 比如说你是通过 主线程 开启的线程任务, 那么这个拒绝策略就是把 被线程池拒绝执行的任务, 反手交给 主线程执行, 并发量高的场景下, 会阻塞主线程, 所以也不是很适用, 下面使用伪代码自定义拒绝策略

二. 自定义拒绝策略
      如果不想丢弃该任务和不想交给主线程来执行该任务, 我们是不是可以把这些被拒绝执行的任务收集起来? 所以可以考虑使用缓存容器, 并且一定要是并发安全的缓存容器, 所以可以使用 juc 包下的 java.util.concurrent.LinkedBlockingQueue, 先介绍下这个类

        LinkedBlockingQueue是一个单向链表实现的阻塞队列, 该队列按 FIFO(先进先出)排序元素, 新元素插入到队列的尾部, 并且队列获取操作会获得位于队列头部的元素, 链接队列的吞吐量通常要高于基于数组的队列, 但是在大多数并发应用程序中, 其可预知的性能要低
        此外, LinkedBlockingQueue还是可选容量的(防止过度膨胀), 即可以指定队列的容量, 如果不指定, 默认容量大小等于Integer.MAX_VALUE, 我这里就不指定队列的容量了

        LinkedBlockingQueue在实现 "多线程对竞争资源的互斥访问" 时, 对于该类中的 put 和 take  方法操作分别使用了不同的锁, 对于 put 操作, 通过 "插入锁 putLock" 进行同步, 对于取出操作, 通过 "取出锁takeLock" 进行同步

        若某线程(代码中模拟是 SchedulerTask)要取出数据时, 队列正好为空, 则该线程会执 notEmpty.await()进行等待, 当其它某个线程(代码中模拟是线程池的拒绝任务)向队列中插入了数据之后, 会调用 notEmpty.signal() 唤醒 "notEmpty上的等待线程”, 此时, (代码中模拟是 SchedulerTask)会被唤醒从而得以继续运行, 此外, (代码中模拟是 SchedulerTask)在执行取操作前, 会获取 takeLock, 在取操作执行完毕再释放 takeLock

        若某线程(代码中模拟是线程池的拒绝任务)要插入数据时, 队列已满, 则该线程会它执行   notFull.await() 进行等待, 当其它某个线程(代码中模拟是 SchedulerTask)取出数据之后,会调用notFull.signal() 唤醒 "notFull上的等待线程", 此时, (代码中模拟是线程池的拒绝任务)就会被唤醒从而得以继续运行, 此外, (代码中模拟是线程池的拒绝任务)在执行插入操作前,会获取 putLock, 在插入操作执行完毕才释放 putLock

2.1 首先自定义拒绝策略实现
        自定义一个类实现 java.util.concurrent.RejectedExecutionHandler 接口

public class CustomizeRejectionPolicy implements RejectedExecutionHandler {
 
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r != null) {
            // 线程池没来得及执行的任务先放入队列
            ThreadReader.put(r);
        }
    }
}

2.2 实现一个中间类, 用于声明 LinkedBlockingQueue

@Component
public class ThreadReader {
    
    private static final Logger logger = LoggerFactory.getLogger(ThreadReader.class);
    
    private static final BlockingQueue<Runnable> BLOCKING_QUEUE = new LinkedBlockingQueue<>();
 
    @Qualifier("thread-pool")
    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;
 
    public static void put(Runnable runnable) {
        BlockingQueue<Runnable> blockingQueue = getBlockingQueue();
        try {
            blockingQueue.put(runnable);
        } catch (InterruptedException e) {
            // 忽略
        }
    }
 
    public void take() {
        BlockingQueue<Runnable> blockingQueue = getBlockingQueue();
        if (CollectionUtils.isEmpty(blockingQueue)) {
            return;
        }
        try {
            Runnable runnable = blockingQueue.take();
            logger.info(LogUtils.format("取出当前线程池没来得及执行的任务, runnable=<{0}>", runnable));
            threadPoolExecutor.execute(runnable);
        } catch (InterruptedException e) {
            // 
        }
 
    }
    
    public static BlockingQueue<Runnable> getBlockingQueue() {
        return BLOCKING_QUEUE;
    }
}

2.3 定义该队列的取出类, 这里使用的是单线程, 不再是主线程

@Component
public class CustomizeScheduler implements ApplicationListener<ContextRefreshedEvent> {
 
    @Autowired
    private ThreadReader threadReader;
 
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        ThreadFactory threadFactory =
                new ThreadFactoryBuilder().setThreadFactory(new NamedThreadFactory("SchedulerTask")).build();
        ScheduledExecutorService scheduledExecutor =
                new ScheduledThreadPoolExecutor(1, threadFactory, new CustomizeRejectionPolicy());
        // 之后每隔1秒执行队列中没有来得及执行的任务
        scheduledExecutor.scheduleAtFixedRate(() -> threadReader.take(), 1, 1, TimeUnit.SECONDS);
    }
}

2.4 模拟测试

        首先线程池配置如下, 然后使用 jmeter 模拟2000个请求

@Configuration
public class ThreadPoolConfig {
    
    @Bean("thread-pool")
    public static ThreadPoolExecutor threadPoolExecutor() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16,1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100),
                new NamedThreadFactory("common-service"), new CustomizeRejectionPolicy());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        return threadPoolExecutor;
    }
}

 之后查看控制台, 可以看到日志打印情况, 可以发现 单线程 每隔1秒从队列中捞取一个被线程池拒绝执行的任务, 并把该任务重新加入到线程池中, 再执行

 

三. 相关设计模式
        这里使用到了多线程的经典设计模式, Producer-Consumer模式, 线程池提交被拒绝执行的任务, 他相当于生产者, 单线程相当于消费者, 处理这些任务, 中间类 ThreadReader 相当于 Channel 通道, 此设计模式详解可以查看博客 : 多线程基础之设计模式Producer-Consumer模式

        为什么不能直接在拒绝策略中引入单线程去执行被拒绝的任务?

        这样程序的性能会降低, 线程池只需要把被拒绝的任务往队列中添加, 无需关心单线程怎么调度的, 也无需等待单线程对这些任务的处理, 就可以立即把下一个任务放入到队列中

原网站

版权声明
本文为[BUG指挥官]所创,转载请带上原文链接,感谢
https://blog.csdn.net/WXF_Sir/article/details/125619421