
Custom thread pool rejection policy

2022-07-07 13:29:00 Bug Commander

1. Default rejection policies
        ThreadPoolExecutor.AbortPolicy: discards the task and throws a RejectedExecutionException. This is the policy used when none is specified.
        ThreadPoolExecutor.DiscardPolicy: discards the task silently, without throwing an exception.
        ThreadPoolExecutor.DiscardOldestPolicy: discards the oldest task at the head of the queue, then resubmits the rejected task.
        ThreadPoolExecutor.CallerRunsPolicy: runs the task on the calling thread (the thread that submitted it).

        Generally speaking, these four built-in rejection policies do not fit every scenario. For example, if we do not want to discard any task, the first three policies are ruled out.

        The fourth policy is a special case. When it is triggered, the task rejected by the thread pool is handed back to the thread that submitted it. For example, if the main thread submits the tasks, the rejected task runs on the main thread. Under high concurrency this blocks the main thread, so this policy is not a good fit either. The following sections use pseudocode to build a custom rejection policy.
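The difference between the built-in policies is easy to observe on a deliberately tiny pool. The following is a minimal sketch, not part of the original article: the class name BuiltInPolicyDemo, the helper submitToSaturatedPool, and the pool size (one thread, one queue slot) are all illustrative assumptions. DiscardOldestPolicy is left out because the task it re-queues only runs later, which this simple check cannot show.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BuiltInPolicyDemo {

    /**
     * Saturates a pool with 1 thread and 1 queue slot, submits one more task,
     * and reports what the given policy did with that extra task.
     */
    public static String submitToSaturatedPool(RejectedExecutionHandler policy) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keeps the worker busy until the check is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the only queue slot
            try {
                // This third task cannot be accepted, so the policy decides its fate
                pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            } catch (RejectedExecutionException e) {
                return "rejected with exception";
            }
            return ranOn[0] == null ? "silently dropped" : "ran on " + ranOn[0];
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitToSaturatedPool(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(submitToSaturatedPool(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println(submitToSaturatedPool(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

With CallerRunsPolicy the reported thread is the submitting thread itself, which is exactly why a slow rejected task stalls the submitter.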

2. Custom rejection policy
      If we want neither to discard the task nor to run it on the main thread, can we collect the rejected tasks instead? That calls for a buffer container, and it must be safe for concurrent use. The JUC package provides java.util.concurrent.LinkedBlockingQueue for this purpose, so let us introduce this class first.

        LinkedBlockingQueue is a blocking queue backed by a singly linked list. It orders elements FIFO (first in, first out): new elements are inserted at the tail of the queue, and retrieval operations return the element at the head. Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.
        In addition, LinkedBlockingQueue is optionally bounded (to prevent excessive growth). You can specify the capacity of the queue; if you do not, the capacity defaults to Integer.MAX_VALUE. No capacity is specified for the queue used here.
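The FIFO ordering and the optional capacity can be checked with a few lines of plain Java. This is a small sketch for illustration only; the class name QueueCapacityDemo and its method names are made up for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class QueueCapacityDemo {

    /** Capacity of a queue created with the no-argument constructor. */
    public static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    /** Fills a queue of capacity 2, then attempts a third non-blocking insert. */
    public static boolean thirdOfferAccepted() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("a");
        queue.offer("b");
        return queue.offer("c"); // offer() does not block; it reports failure instead
    }

    /** Inserts three elements and returns them concatenated in retrieval order. */
    public static String retrievalOrder() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("a");
        queue.offer("b");
        queue.offer("c");
        return queue.poll() + queue.poll() + queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
        System.out.println(thirdOfferAccepted());                   // false
        System.out.println(retrievalOrder());                       // abc
    }
}
```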

        To give multiple threads mutually exclusive access to the shared queue, LinkedBlockingQueue uses different locks for put and take. Insertions are synchronized with the insert lock putLock, and removals are synchronized with the take lock takeLock.

        If a thread (SchedulerTask in the code below) tries to take an element while the queue is empty, it waits in notEmpty.await(). When another thread (the thread pool's rejection handler in the code below) inserts an element, it calls notEmpty.signal() to wake a thread waiting on notEmpty, and the consumer resumes. The consumer acquires takeLock before taking and releases it once the take completes.

        If a thread (the thread pool's rejection handler) tries to insert an element while the queue is full, it waits in notFull.await(). When another thread (SchedulerTask) removes an element, it calls notFull.signal() to wake a thread waiting on notFull, and the producer resumes. The producer acquires putLock before inserting and releases it only after the insert completes.
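The waiting behavior described above can be observed from the outside without touching the queue's internal locks. The sketch below is illustrative only: the class name BlockingHandoffDemo and both method names are assumptions, and the 100 ms sleep merely gives the other thread time to start waiting. The result does not depend on that timing.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

public class BlockingHandoffDemo {

    /** A consumer waits in take() on an empty queue until the caller inserts an element. */
    public static String consumerWaitsForElement(String value) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take()); // waits while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();
        try {
            Thread.sleep(100);   // let the consumer reach take()
            queue.put(value);    // the insert wakes the waiting consumer
            consumer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.get();
    }

    /** A producer waits in put() on a full queue until the caller frees a slot. */
    public static String producerWaitsForSpace() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("first"); // the queue is now full
        Thread producer = new Thread(() -> {
            try {
                queue.put("second"); // waits until "first" has been removed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();
        try {
            Thread.sleep(100);   // let the producer reach put()
            queue.take();        // removing "first" wakes the waiting producer
            producer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(consumerWaitsForElement("task")); // task
        System.out.println(producerWaitsForSpace());         // second
    }
}
```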

2.1 Implement the custom rejection policy
        Define a class that implements the java.util.concurrent.RejectedExecutionHandler interface.

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class CustomizeRejectionPolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r != null) {
            // Buffer the task the thread pool could not accept so that it can be resubmitted later
            ThreadReader.put(r);
        }
    }
}

2.2 Implement an intermediate class that holds the LinkedBlockingQueue

@Component
public class ThreadReader {
    
    private static final Logger logger = LoggerFactory.getLogger(ThreadReader.class);
    
    private static final BlockingQueue<Runnable> BLOCKING_QUEUE = new LinkedBlockingQueue<>();
 
    @Qualifier("thread-pool")
    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;
 
    public static void put(Runnable runnable) {
        BlockingQueue<Runnable> blockingQueue = getBlockingQueue();
        try {
            blockingQueue.put(runnable);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }
    }
 
    public void take() {
        BlockingQueue<Runnable> blockingQueue = getBlockingQueue();
        if (CollectionUtils.isEmpty(blockingQueue)) {
            return;
        }
        try {
            Runnable runnable = blockingQueue.take();
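            logger.info(LogUtils.format("Fetched a task the thread pool could not run earlier, runnable=<{0}>", runnable));
            // Hand the task back to the pool; if the pool is still saturated, it is buffered again
            threadPoolExecutor.execute(runnable);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();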
        }
 
    }
    
    public static BlockingQueue<Runnable> getBlockingQueue() {
        return BLOCKING_QUEUE;
    }
}

2.3 Define the class that takes tasks from the queue, using a dedicated single thread instead of the main thread

@Component
public class CustomizeScheduler implements ApplicationListener<ContextRefreshedEvent> {
 
    @Autowired
    private ThreadReader threadReader;
 
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        ThreadFactory threadFactory =
                new ThreadFactoryBuilder().setThreadFactory(new NamedThreadFactory("SchedulerTask")).build();
        ScheduledExecutorService scheduledExecutor =
                new ScheduledThreadPoolExecutor(1, threadFactory, new CustomizeRejectionPolicy());
        // Every second, take one buffered task from the queue and resubmit it to the thread pool
        scheduledExecutor.scheduleAtFixedRate(() -> threadReader.take(), 1, 1, TimeUnit.SECONDS);
    }
}

2.4 Simulation test

         First, configure the thread pool as follows, then use JMeter to simulate 2000 requests.

@Configuration
public class ThreadPoolConfig {
    
    @Bean("thread-pool")
    public static ThreadPoolExecutor threadPoolExecutor() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 1000, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(100), new NamedThreadFactory("common-service"),
                new CustomizeRejectionPolicy());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        return threadPoolExecutor;
    }
}

  Then check the console log. You can see that a single thread takes a task rejected by the thread pool from the queue every second and adds it back to the thread pool for execution.
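The same flow can be approximated in plain Java, without Spring or JMeter, to check that no task is lost. This is a rough sketch under stated assumptions rather than the test used above: the class name RejectionBufferDemo, the small pool (2 to 4 threads, queue of 10), and the 10 ms tick (instead of 1 second, so the demo finishes quickly) are all chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectionBufferDemo {

    /** Submits taskCount short tasks to a deliberately small pool and returns how many completed. */
    public static int runAll(int taskCount) {
        // Buffer for rejected tasks, playing the role of ThreadReader's queue
        BlockingQueue<Runnable> rejected = new LinkedBlockingQueue<>();
        // Same idea as CustomizeRejectionPolicy: keep the task instead of dropping it
        RejectedExecutionHandler bufferPolicy = (r, executor) -> rejected.offer(r);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10), bufferPolicy);

        // Single consumer thread: resubmits one buffered task per tick
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            Runnable r = rejected.poll();
            if (r != null) {
                pool.execute(r); // if rejected again, the task simply returns to the buffer
            }
        }, 10, 10, TimeUnit.MILLISECONDS);

        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                completed.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(30, TimeUnit.SECONDS); // wait until every task has run, or give up
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
            pool.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(200)); // 200
    }
}
```

Because only one buffered task is resubmitted per tick, the buffer drains at a fixed rate. A sustained burst of rejections therefore takes proportionally longer to work off, which is worth keeping in mind when choosing the interval.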

 

3. Related design pattern
        This uses a classic multithreading design pattern, the Producer-Consumer pattern. The thread pool, which hands off the tasks it refused to execute, acts as the producer. The single thread that processes those tasks acts as the consumer. The intermediate class ThreadReader acts as the Channel between them. This design pattern is explained in detail in the blog post: Multithreading based design patterns Producer-Consumer Pattern

        Why not have the rejection policy hand the rejected task directly to a single thread for execution?

        Doing so would reduce the program's performance. With the queue in between, the thread pool only needs to add the rejected task to the queue. It does not need to care how the single thread is scheduled or wait for that thread to process the task, and it can immediately put the next task in the queue.

Copyright notice
This article was written by [Bug Commander]. Please include the original link when reposting. Thank you.
https://yzsam.com/2022/188/202207071104519533.html