Application and principle of ThreadPoolExecutor thread pool
2022-07-01 05:10:00 【People below two meters are mortals】
1、 Custom thread pool
A thread is a system resource: every creation consumes resources. In a high-concurrency scenario, creating a new thread for every task would eat up a considerable amount of the system's resources and could even cause an out-of-memory error. The number of threads also cannot grow without bound, because too many threads means frequent context switching, which is itself a heavy burden. For these two reasons, we should reuse existing threads instead of creating a new one for every task.
First, the blocking queue:
@Slf4j
class BlockingQueue<T> {
/** Default capacity of the blocking task queue */
private static final Integer DEFAULT_QUEUE_CAPACITY = 5;
/** The underlying task queue */
private Queue<T> queue;
/** The queue may be operated on by multiple threads, so a lock is needed for thread safety */
private final ReentrantLock lock = new ReentrantLock();
/** When the queue is full no more tasks can be added; the producing thread waits on this condition */
private final Condition putWaitSet = lock.newCondition();
/** When the queue is empty no task can be taken; the consuming thread waits on this condition */
private final Condition getWaitSet = lock.newCondition();
/** Capacity of the blocking task queue */
private final int capacity;
/** No-arg constructor */
public BlockingQueue() {
this.capacity = DEFAULT_QUEUE_CAPACITY;
initQueue();
}
/**
 * Constructor with an initial capacity
 *
 * @param capacity custom capacity
 */
public BlockingQueue(int capacity) {
this.capacity = capacity;
initQueue();
}
/** Create the task queue */
private void initQueue() {
queue = new ArrayDeque<>(this.capacity);
}
/**
 * Take a task, blocking without a timeout
 *
 * @return T the task object
 */
public T get() {
lock.lock();
try {
while (queue.size() == 0) {
try {
getWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.poll();
putWaitSet.signalAll();
return t;
} finally {
lock.unlock();
}
}
/**
 * Take a task, blocking with a timeout
 *
 * @param timeout  timeout duration
 * @param timeUnit timeout unit
 * @return T the task object, or null if the wait timed out
 */
public T get(long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanosTime = timeUnit.toNanos(timeout);
while (queue.size() == 0) {
try {
if (nanosTime <= 0) {
return null;
}
nanosTime = getWaitSet.awaitNanos(nanosTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.poll();
putWaitSet.signalAll();
return t;
} finally {
lock.unlock();
}
}
/**
 * Add a task, blocking if the queue is full
 *
 * @param t the task object
 */
public void put(T t) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
log.info(" Mission :{} Waiting to join the blocking task queue ", t);
putWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(t);
getWaitSet.signalAll();
log.info(" Mission :{} Join the blocking task queue ", t);
} finally {
lock.unlock();
}
}
/**
 * Get the current task queue size
 *
 * @return int the number of queued tasks
 */
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
Next comes the custom thread pool:
@Slf4j
class ThreadPool {
/** Blocking task queue */
private final BlockingQueue<Runnable> blockingQueue;
/** Worker threads */
private final Set<Worker> workers;
/** Default number of worker threads */
private static final int DEFAULT_POOL_SIZE = 5;
/** Default capacity of the blocking task queue */
private static final int DEFAULT_QUEUE_SIZE = 5;
/** Default wait time unit */
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
/** Default blocking wait time */
private static final int DEFAULT_TIMEOUT = 5;
/** Number of worker threads */
private final int poolSize;
/** Maximum number of tasks in the task queue */
private final int queueSize;
/** Wait time unit */
private final TimeUnit timeUnit;
/** Blocking wait time */
private final int timeout;
/** No-arg constructor */
public ThreadPool() {
poolSize = DEFAULT_POOL_SIZE;
queueSize = DEFAULT_QUEUE_SIZE;
timeUnit = DEFAULT_TIME_UNIT;
timeout = DEFAULT_TIMEOUT;
blockingQueue = new BlockingQueue<>(queueSize);
workers = new HashSet<>(poolSize);
}
/**
 * Parameterized constructor
 *
 * @param queueSize maximum number of tasks in the blocking queue
 * @param poolSize  number of worker threads
 * @param timeUnit  wait time unit
 * @param timeout   blocking wait time
 */
public ThreadPool(int queueSize, int poolSize, TimeUnit timeUnit, int timeout) {
this.poolSize = poolSize;
this.queueSize = queueSize;
this.timeUnit = timeUnit;
this.timeout = timeout;
this.blockingQueue = new BlockingQueue<>(queueSize);
this.workers = new HashSet<>(poolSize);
}
/**
 * Execute a task
 *
 * @param task the task object
 */
public void execute(Runnable task) {
synchronized (workers) {
if (workers.size() < poolSize) {
Worker worker = new Worker(task);
log.info(" Create a new task worker thread : {} , Perform tasks :{}", worker, task);
workers.add(worker);
worker.start();
} else {
blockingQueue.put(task);
}
}
}
/** Wrapper class for a worker thread */
@Data
@EqualsAndHashCode(callSuper = false)
@NoArgsConstructor
@AllArgsConstructor
class Worker extends Thread {
private Runnable task;
@Override
public void run() {
while (task != null || (task = blockingQueue.get(timeout, timeUnit)) != null) {
try {
log.info(" Being implemented :{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.info(" The worker thread is removed :{}", this);
workers.remove(this);
}
}
}
}
Test:
@Slf4j
public class TestPool {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool(10, 2, TimeUnit.SECONDS, 2);
for (int i = 0; i < 15; i++) {
int j = i;
pool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(String.valueOf(j));
});
}
}
}
[main] INFO com.phz.juc.ThreadPool - Create a new task worker thread : ThreadPool.Worker(task=com.phz.juc.TestPool$$Lambda$2/1225358173@76fb509a) , Perform tasks :com.phz.juc.TestPool$$Lambda$2/1225358173@76fb509a
[main] INFO com.phz.juc.ThreadPool - Create a new task worker thread : ThreadPool.Worker(task=com.phz.juc.TestPool$$Lambda$2/1225358173@2e817b38) , Perform tasks :com.phz.juc.TestPool$$Lambda$2/1225358173@2e817b38
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@c4437c4 Join the blocking task queue
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@76fb509a
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@433c675d Join the blocking task queue
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@2e817b38
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@3f91beef Join the blocking task queue
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@1a6c5a9e Join the blocking task queue
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@37bba400 Join the blocking task queue
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@179d3b25 Join the blocking task queue
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@254989ff Join the blocking task queue
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@5d099f62 Join the blocking task queue
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@37f8bb67 Join the blocking task queue
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@49c2faae Join the blocking task queue
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@20ad9418 Waiting to join the blocking task queue
[Thread-0] INFO com.phz.juc.TestPool - 0
[Thread-1] INFO com.phz.juc.TestPool - 1
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@c4437c4
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@20ad9418 Join the blocking task queue
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@439f5b3d Waiting to join the blocking task queue
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@433c675d
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@439f5b3d Join the blocking task queue
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@1d56ce6a Waiting to join the blocking task queue
[Thread-1] INFO com.phz.juc.TestPool - 3
[Thread-0] INFO com.phz.juc.TestPool - 2
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@3f91beef
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@1a6c5a9e
[main] INFO com.phz.juc.BlockingQueue - Mission :com.phz.juc.TestPool$$Lambda$2/1225358173@1d56ce6a Join the blocking task queue
[Thread-1] INFO com.phz.juc.TestPool - 4
[Thread-0] INFO com.phz.juc.TestPool - 5
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@37bba400
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@179d3b25
[Thread-1] INFO com.phz.juc.TestPool - 6
[Thread-0] INFO com.phz.juc.TestPool - 7
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@254989ff
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@5d099f62
[Thread-0] INFO com.phz.juc.TestPool - 9
[Thread-1] INFO com.phz.juc.TestPool - 8
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@37f8bb67
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@49c2faae
[Thread-0] INFO com.phz.juc.TestPool - 10
[Thread-1] INFO com.phz.juc.TestPool - 11
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@20ad9418
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@439f5b3d
[Thread-0] INFO com.phz.juc.TestPool - 12
[Thread-1] INFO com.phz.juc.TestPool - 13
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$2/1225358173@1d56ce6a
[Thread-1] INFO com.phz.juc.ThreadPool - The worker thread is removed :ThreadPool.Worker(task=null)
[Thread-0] INFO com.phz.juc.TestPool - 14
[Thread-0] INFO com.phz.juc.ThreadPool - The worker thread is removed :ThreadPool.Worker(task=null)
2、 Rejection policy
What happens if the tasks to be executed take a very long time?
@Slf4j
public class TestPool {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool(10, 2, TimeUnit.SECONDS, 2);
for (int i = 0; i < 20; i++) {
int j = i;
pool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(String.valueOf(j));
});
}
}
}
You can see that the main thread keeps waiting here and cannot move on to anything else. Should we throw an exception, refuse to execute, wait forever, or let the caller run the task itself? With so many possibilities, coding them directly would produce a pile of if branches, so we can adopt the strategy pattern to abstract all the rejection policies.
@FunctionalInterface
interface RejectPolicy<T> {
/**
 * Rejection policy method
 *
 * @param queue the blocking queue
 * @param task  the task object
 */
void reject(BlockingQueue<T> queue, T task);
}
ThreadPool is modified accordingly:
@Slf4j
class ThreadPool {
/** Blocking task queue */
private final BlockingQueue<Runnable> blockingQueue;
/** Worker threads */
private final Set<Worker> workers;
/** Default number of worker threads */
private static final int DEFAULT_POOL_SIZE = 5;
/** Default capacity of the blocking task queue */
private static final int DEFAULT_QUEUE_SIZE = 5;
/** Default wait time unit */
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
/** Default blocking wait time */
private static final int DEFAULT_TIMEOUT = 5;
/** Number of worker threads */
private final int poolSize;
/** Maximum number of tasks in the task queue */
private final int queueSize;
/** Wait time unit */
private final TimeUnit timeUnit;
/** Blocking wait time */
private final int timeout;
/** Rejection policy */
private final RejectPolicy<Runnable> rejectPolicy;
/** No-arg constructor */
public ThreadPool() {
poolSize = DEFAULT_POOL_SIZE;
queueSize = DEFAULT_QUEUE_SIZE;
timeUnit = DEFAULT_TIME_UNIT;
timeout = DEFAULT_TIMEOUT;
blockingQueue = new BlockingQueue<>(queueSize);
workers = new HashSet<>(poolSize);
// By default the calling thread runs the task itself
rejectPolicy = (queue, task) -> task.run();
}
/**
 * Parameterized constructor
 *
 * @param queueSize    maximum number of tasks in the blocking queue
 * @param poolSize     number of worker threads
 * @param timeUnit     wait time unit
 * @param timeout      blocking wait time
 * @param rejectPolicy rejection policy
 */
public ThreadPool(int queueSize, int poolSize, TimeUnit timeUnit, int timeout, RejectPolicy<Runnable> rejectPolicy) {
this.poolSize = poolSize;
this.queueSize = queueSize;
this.timeUnit = timeUnit;
this.timeout = timeout;
this.blockingQueue = new BlockingQueue<>(queueSize);
this.workers = new HashSet<>(poolSize);
this.rejectPolicy = rejectPolicy;
}
/**
 * Execute a task
 *
 * @param task the task object
 */
public void execute(Runnable task) {
synchronized (workers) {
if (workers.size() < poolSize) {
Worker worker = new Worker(task);
log.info(" Create a new task worker thread : {} , Perform tasks :{}", worker, task);
workers.add(worker);
worker.start();
} else {
blockingQueue.tryPut(rejectPolicy, task);
}
}
}
/** Wrapper class for a worker thread */
@Data
@EqualsAndHashCode(callSuper = false)
@NoArgsConstructor
@AllArgsConstructor
class Worker extends Thread {
private Runnable task;
@Override
public void run() {
while (task != null || (task = blockingQueue.get(timeout, timeUnit)) != null) {
try {
log.info(" Being implemented :{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.info(" The worker thread is removed :{}", this);
workers.remove(this);
}
}
}
}
The blocking queue also needs corresponding changes:
@Slf4j
class BlockingQueue<T> {
/** Default capacity of the blocking task queue */
private static final Integer DEFAULT_QUEUE_CAPACITY = 5;
/** The underlying task queue */
private ArrayDeque<T> queue;
/** The queue may be operated on by multiple threads, so a lock is needed for thread safety */
private final ReentrantLock lock = new ReentrantLock();
/** When the queue is full no more tasks can be added; the producing thread waits on this condition */
private final Condition putWaitSet = lock.newCondition();
/** When the queue is empty no task can be taken; the consuming thread waits on this condition */
private final Condition getWaitSet = lock.newCondition();
/** Capacity of the blocking task queue */
private final int capacity;
/** No-arg constructor */
public BlockingQueue() {
this.capacity = DEFAULT_QUEUE_CAPACITY;
initQueue();
}
/**
 * Constructor with an initial capacity
 *
 * @param capacity custom capacity
 */
public BlockingQueue(int capacity) {
this.capacity = capacity;
initQueue();
}
/** Create the task queue */
private void initQueue() {
queue = new ArrayDeque<>(this.capacity);
}
/**
 * Take a task, blocking without a timeout
 *
 * @return T the task object
 */
public T get() {
lock.lock();
try {
while (queue.size() == 0) {
try {
getWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.poll();
putWaitSet.signalAll();
return t;
} finally {
lock.unlock();
}
}
/**
 * Take a task, blocking with a timeout
 *
 * @param timeout  timeout duration
 * @param timeUnit timeout unit
 * @return T the task object, or null if the wait timed out
 */
public T get(long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanosTime = timeUnit.toNanos(timeout);
while (queue.size() == 0) {
try {
if (nanosTime <= 0) {
return null;
}
nanosTime = getWaitSet.awaitNanos(nanosTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.poll();
putWaitSet.signalAll();
return t;
} finally {
lock.unlock();
}
}
/**
 * Add a task, blocking if the queue is full
 *
 * @param t the task object
 */
public void put(T t) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
log.info(" Mission :{} Waiting to join the blocking task queue ", t);
putWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(t);
getWaitSet.signalAll();
log.info(" Mission :{} Join the blocking task queue ", t);
} finally {
lock.unlock();
}
}
/**
 * Add a task, blocking with a timeout
 *
 * @param t        the task object
 * @param timeout  timeout duration
 * @param timeUnit timeout unit
 * @return boolean true if the task was added, false if the wait timed out
 */
public boolean put(T t, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
log.info(" Join task timeout , Cancel adding ");
return false;
}
log.info(" Mission :{} Waiting to join the blocking task queue ", t);
nanos = putWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(t);
getWaitSet.signalAll();
log.info(" Mission :{} Join the blocking task queue ", t);
return true;
} finally {
lock.unlock();
}
}
/**
 * Get the current task queue size
 *
 * @return int the number of queued tasks
 */
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
/**
 * Add a task, falling back to a rejection policy when the queue is full
 *
 * @param rejectPolicy the rejection policy
 * @param task         the task object
 */
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity) {
rejectPolicy.reject(this, task);
} else {
log.info(" Join the task queue : {}", task);
queue.add(task);
getWaitSet.signalAll();
}
} finally {
lock.unlock();
}
}
}
Test:
ThreadPool pool = new ThreadPool(10, 2, TimeUnit.SECONDS, 2,
// Give up execution
(blockingQueue, task) -> {
log.info(" give up {}", task);
}
);
for (int i = 0; i < 20; i++) {
int j = i;
pool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(String.valueOf(j));
});
}
[main] INFO com.phz.juc.ThreadPool - Create a new task worker thread : ThreadPool.Worker(task=com.phz.juc.TestPool$$Lambda$3/2121055098@4d405ef7) , Perform tasks :com.phz.juc.TestPool$$Lambda$3/2121055098@4d405ef7
[main] INFO com.phz.juc.ThreadPool - Create a new task worker thread : ThreadPool.Worker(task=com.phz.juc.TestPool$$Lambda$3/2121055098@3f91beef) , Perform tasks :com.phz.juc.TestPool$$Lambda$3/2121055098@3f91beef
[main] INFO com.phz.juc.BlockingQueue - Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@1a6c5a9e
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@4d405ef7
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@3f91beef
[main] INFO com.phz.juc.BlockingQueue - Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@37bba400
[main] INFO com.phz.juc.BlockingQueue - Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@179d3b25
[main] INFO com.phz.juc.BlockingQueue - Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@254989ff
[main] INFO com.phz.juc.BlockingQueue - Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@5d099f62
[main] INFO com.phz.juc.BlockingQueue - Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@37f8bb67
[main] INFO com.phz.juc.BlockingQueue - Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@49c2faae
[main] INFO com.phz.juc.BlockingQueue - Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@20ad9418
[main] INFO com.phz.juc.BlockingQueue - Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@31cefde0
[main] INFO com.phz.juc.BlockingQueue - Join the task queue : com.phz.juc.TestPool$$Lambda$3/2121055098@439f5b3d
[main] INFO com.phz.juc.TestPool - give up com.phz.juc.TestPool$$Lambda$3/2121055098@1d56ce6a
[main] INFO com.phz.juc.TestPool - give up com.phz.juc.TestPool$$Lambda$3/2121055098@5197848c
[main] INFO com.phz.juc.TestPool - give up com.phz.juc.TestPool$$Lambda$3/2121055098@17f052a3
[main] INFO com.phz.juc.TestPool - give up com.phz.juc.TestPool$$Lambda$3/2121055098@2e0fa5d3
[main] INFO com.phz.juc.TestPool - give up com.phz.juc.TestPool$$Lambda$3/2121055098@5010be6
[main] INFO com.phz.juc.TestPool - give up com.phz.juc.TestPool$$Lambda$3/2121055098@685f4c2e
[main] INFO com.phz.juc.TestPool - give up com.phz.juc.TestPool$$Lambda$3/2121055098@7daf6ecc
[main] INFO com.phz.juc.TestPool - give up com.phz.juc.TestPool$$Lambda$3/2121055098@2e5d6d97
[Thread-0] INFO com.phz.juc.TestPool - 0
[Thread-1] INFO com.phz.juc.TestPool - 1
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@1a6c5a9e
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@37bba400
[Thread-1] INFO com.phz.juc.TestPool - 3
[Thread-0] INFO com.phz.juc.TestPool - 2
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@179d3b25
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@254989ff
[Thread-0] INFO com.phz.juc.TestPool - 5
[Thread-1] INFO com.phz.juc.TestPool - 4
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@5d099f62
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@37f8bb67
[Thread-1] INFO com.phz.juc.TestPool - 7
[Thread-0] INFO com.phz.juc.TestPool - 6
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@49c2faae
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@20ad9418
[Thread-1] INFO com.phz.juc.TestPool - 8
[Thread-0] INFO com.phz.juc.TestPool - 9
[Thread-1] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@31cefde0
[Thread-0] INFO com.phz.juc.ThreadPool - Being implemented :com.phz.juc.TestPool$$Lambda$3/2121055098@439f5b3d
[Thread-1] INFO com.phz.juc.TestPool - 10
[Thread-0] INFO com.phz.juc.TestPool - 11
[Thread-1] INFO com.phz.juc.ThreadPool - The worker thread is removed :ThreadPool.Worker(task=null)
[Thread-0] INFO com.phz.juc.ThreadPool - The worker thread is removed :ThreadPool.Worker(task=null)
Other rejection policies:
- Caller runs the task
(blockingQueue, task) -> {
task.run();
}
- Wait indefinitely
BlockingQueue::put
- Throw an exception
(blockingQueue, task) -> {
throw new RuntimeException("Task execution failed: " + task);
}
- Wait with a timeout
(blockingQueue, task) -> {
blockingQueue.put(task, 1500, TimeUnit.MILLISECONDS);
}
- Give up execution
(blockingQueue, task) -> {
log.info(" give up {}", task);
}
3、ThreadPoolExecutor
3.1、 Thread pool state
ThreadPoolExecutor uses the high 3 bits of an int to represent the thread pool state and the low 29 bits to represent the number of threads.
Status name | High 3 bits | Accepts new tasks | Processes queued tasks | Explanation |
---|---|---|---|---|
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | Accepts no new tasks, but the remaining tasks in the blocking queue are still processed |
STOP | 001 | N | N | Interrupts tasks in progress and abandons the tasks in the blocking queue |
TIDYING | 010 | - | - | All tasks have finished and the active thread count is 0; about to enter the terminated state |
TERMINATED | 011 | - | - | Terminal state |
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
Compared numerically, TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING (111 has its highest bit set, so it is negative). This information is stored in a single atomic variable, ctl, so that the pool state and the thread count are combined and can both be assigned with one CAS operation.
// rs is the high 3 bits representing the state, wc is the low 29 bits representing the thread count; merge them
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
// c is the old value; the return value of ctlOf is the expected new value
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)));
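For reference, the companion bit-manipulation helpers in the JDK 8 source look roughly like this (COUNT_BITS is 29 for a 32-bit int):
private static final int COUNT_BITS = Integer.SIZE - 3;      // 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;   // low 29 bits all set to 1
// Unpack the state (high 3 bits) and the worker count (low 29 bits) from ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }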
3.2、 Constructor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize: number of core threads (the maximum number of threads kept alive permanently)
- maximumPoolSize: maximum number of threads (maximumPoolSize - corePoolSize = number of emergency threads)
- keepAliveTime: keep-alive time of the emergency threads
- unit: time unit of keepAliveTime, also for the emergency threads
- workQueue: the blocking queue
- threadFactory: thread factory (lets you give created threads meaningful names, which is convenient for debugging)
- handler: rejection policy
The emergency threads come into play when the core threads are all still busy and the blocking queue is full: the pool checks whether an emergency thread can be created and, if so, runs the task on it. After an emergency thread has been idle for the keep-alive time it is destroyed, whereas a core thread is not destroyed after finishing its work and keeps waiting in the background. If the core threads, the blocking queue, and the emergency threads are all exhausted, the rejection policy is executed.
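As a small illustration (the parameter values, queue size and thread name prefix below are made up for the example), a pool with 2 core threads, at most 2 emergency threads and a bounded queue could be constructed like this:
AtomicInteger threadId = new AtomicInteger(1);
ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2,                                 // corePoolSize
        4,                                 // maximumPoolSize, i.e. at most 2 emergency threads
        60, TimeUnit.SECONDS,              // keepAliveTime + unit, for the emergency threads
        new ArrayBlockingQueue<>(10),      // bounded work queue
        r -> new Thread(r, "biz-pool-" + threadId.getAndIncrement()),  // naming thread factory
        new ThreadPoolExecutor.AbortPolicy()                           // default rejection policy
);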
It works as follows:
At first there are no threads in the pool. When a task is submitted, the pool creates a new thread to run it.
When the thread count has reached corePoolSize and no thread is idle, newly submitted tasks are queued in workQueue until some thread becomes idle.
If a bounded queue was chosen, then once the tasks exceed the queue capacity, up to maximumPoolSize - corePoolSize emergency threads are created to help out.
If the thread count has reached maximumPoolSize and new tasks keep arriving, the rejection policy is executed. The JDK provides 4 implementations, and other well-known frameworks provide their own:
- AbortPolicy throws RejectedExecutionException to the caller; this is the default policy
- CallerRunsPolicy lets the caller run the task
- DiscardPolicy silently discards the task
- DiscardOldestPolicy discards the oldest task in the queue and enqueues this task in its place
- Dubbo's implementation logs and dumps the thread stack before throwing RejectedExecutionException, making it easier to locate the problem
- Netty's implementation creates a new thread to run the task
- ActiveMQ's implementation tries to enqueue the task with a timeout (60s), similar to the policy we customized earlier (see the sketch after this list)
- PinPoint's (distributed tracing) implementation uses a chain of rejection policies and tries each policy in the chain in turn
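As a rough sketch of the ActiveMQ-style policy above (not the actual ActiveMQ code), a custom RejectedExecutionHandler that waits with a timeout before giving up might look like this:
RejectedExecutionHandler timedWait = (r, executor) -> {
    try {
        // Try to enqueue the rejected task, waiting at most 60 seconds
        if (!executor.getQueue().offer(r, 60, TimeUnit.SECONDS)) {
            throw new RejectedExecutionException("Timed out while waiting to enqueue task " + r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException(e);
    }
};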
When the peak has passed, threads beyond corePoolSize that have had nothing to do for a while need to be terminated to save resources; that idle time is controlled by keepAliveTime and unit.
On top of this constructor, the JDK's Executors class provides many factory methods to create thread pools for various purposes.
3.3、 Several factory methods
The relevant methods can be found in the Executors class.
3.3.1、newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- core thread count == maximum thread count (no emergency threads are ever created), so no keep-alive timeout is needed
- the blocking queue is unbounded and can hold any number of tasks
Suitable when the amount of work is known and the tasks are relatively time-consuming.
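A minimal usage sketch, assuming the @Slf4j log used elsewhere in this article and the newFixedThreadPool overload that also accepts a ThreadFactory (the pool name is illustrative):
AtomicInteger counter = new AtomicInteger(1);
ExecutorService pool = Executors.newFixedThreadPool(2,
        r -> new Thread(r, "fixed-pool-" + counter.getAndIncrement()));
pool.execute(() -> log.info("task 1"));
pool.execute(() -> log.info("task 2"));
pool.execute(() -> log.info("task 3"));  // waits in the unbounded queue until one of the 2 threads is free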
3.3.2、newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- the core thread count is 0, the maximum thread count is Integer.MAX_VALUE, and the idle keep-alive time of the emergency threads is 60s, which means:
- all threads are emergency threads (reclaimed after 60s of idleness)
- emergency threads can be created without limit
- the queue is a SynchronousQueue, which has no capacity: a put cannot complete until another thread takes the element (a hand-off, one-in one-out delivery)
SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
try {
log.info("putting {} ", 1);
integers.put(1);
log.info("{} putted...", 1);
log.info("putting...{} ", 2);
integers.put(2);
log.info("{} putted...", 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
try {
log.info("taking {}", 1);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
try {
log.info("taking {}", 2);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t3").start();
Overall, this pool grows its thread count with the number of tasks, with no upper bound, and releases threads after 1 minute of idleness once the tasks are done. It suits situations with many tasks where each task runs only briefly.
3.3.3、newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Suitable when you want tasks to be queued and executed one after another. The thread count is fixed at 1; when there is more than one task, the extra tasks wait in an unbounded queue. After the tasks finish, the single thread is not released.
With only one thread, can it still be called a thread pool?
If you create a single thread yourself to run tasks serially and a task fails and kills it, there is no remedy; the thread pool, however, creates a replacement thread so the pool keeps working normally.
With Executors.newSingleThreadExecutor() the thread count is always 1 and cannot be changed:
- it returns a FinalizableDelegatedExecutorService that wraps the ThreadPoolExecutor created from the parameters; the decorator pattern is applied here, exposing only the ExecutorService interface, which restricts us to the interface's methods and hides the methods specific to ThreadPoolExecutor
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
Executors.newFixedThreadPool(1) starts at 1 as well, but can be changed later:
- it exposes a ThreadPoolExecutor object, so after a cast you can call setCorePoolSize and similar methods
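A small sketch of the "replacement thread" behaviour described above (log output is illustrative):
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
    log.info("task 1 runs in {}", Thread.currentThread().getName());
    int i = 1 / 0;  // the uncaught exception kills the current worker thread
});
pool.execute(() -> {
    // still executed: the pool creates a replacement thread
    log.info("task 2 runs in {}", Thread.currentThread().getName());
});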
3.4、 Submit tasks
// Execute a task
void execute(Runnable command);
// Submit a task; use the returned Future to obtain its result
<T> Future<T> submit(Callable<T> task);
// Submit all tasks in the collection
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// Submit all tasks in the collection, with a timeout
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// Submit all tasks in the collection; return the result of the first task to complete successfully and cancel the others
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// Submit all tasks in the collection; return the result of the first task to complete successfully and cancel the others, with a timeout
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
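A brief usage sketch of submit and invokeAll (exception handling and imports such as java.util.Arrays are omitted for brevity):
ExecutorService pool = Executors.newFixedThreadPool(2);
// submit: the Future carries the Callable's return value (or its exception)
Future<String> future = pool.submit(() -> "hello");
log.info("submit result: {}", future.get());
// invokeAll: blocks until every task has completed, then returns one Future per task
List<Callable<Integer>> tasks = Arrays.asList(() -> 1, () -> 2, () -> 3);
for (Future<Integer> f : pool.invokeAll(tasks)) {
    log.info("invokeAll result: {}", f.get());
}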
3.5、 Shutting down the thread pool
- shutdown
// The pool state becomes SHUTDOWN: no new tasks are accepted, but already-submitted tasks (both running and queued) are allowed to finish; this method does not block the calling thread
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// Modify thread pool state
advanceRunState(SHUTDOWN);
// Only idle threads will be interrupted
interruptIdleWorkers();
onShutdown(); // extension point, for the subclass ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// Try to end
tryTerminate();
}
- shutdownNow
// The pool state becomes STOP: no new tasks are accepted, the tasks left in the queue are returned, and running tasks are interrupted via interrupt
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// Modify thread pool state
advanceRunState(STOP);
// Interrupt all worker threads
interruptWorkers();
// Get the remaining tasks in the queue
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// Try to end
tryTerminate();
return tasks;
}
- Other methods
// Returns true if the pool is not in the RUNNING state
boolean isShutdown();
// Whether the thread pool state is TERMINATED
boolean isTerminated();
// After calling shutdown, the calling thread does not wait for all tasks to finish; if it wants to do something after the pool reaches TERMINATED, it can use this method to wait
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
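A typical shutdown sequence, sketched under the same assumptions as the earlier examples (exception handling omitted):
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(() -> log.info("task"));
pool.shutdown();                                  // stop accepting new tasks; submitted tasks may finish
if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
    // still not terminated after the timeout: force it and collect the tasks that never started
    List<Runnable> dropped = pool.shutdownNow();
    log.info("{} queued tasks never started", dropped.size());
}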
4、 Task scheduling thread pool
4.1、 Delayed execution
Before the 『task scheduling thread pool』 was added, java.util.Timer could be used for scheduling. Timer's advantage is that it is simple to use, but all of its tasks are scheduled by the same single thread, so they run strictly serially: only one task can run at a time, and a delay or an exception in one task affects the tasks after it.
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@SneakyThrows
@Override
public void run() {
log.info("task 1");
TimeUnit.SECONDS.sleep(2);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.info("task 2");
}
};
// Add two tasks with timer, hoping both run after 1s
// But timer has only one thread executing tasks in sequence, so the delay in 『task 1』 affects the execution of 『task 2』
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
Reworked with the task scheduling thread pool:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// Add two tasks, hoping both run after 1s
executor.schedule(() -> {
System.out.println("Task 1, execution time: " + new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
System.out.println("Task 2, execution time: " + new Date());
}, 1000, TimeUnit.MILLISECONDS);
Even if one task throws an exception, the other tasks still execute:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
// Add two tasks, hoping both run after 1s
executor.schedule(() -> {
System.out.println("Task 1, execution time: " + new Date());
int i = 1 / 0;
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
System.out.println("Task 2, execution time: " + new Date());
}, 1000, TimeUnit.MILLISECONDS);
But the expected exception output never appears. What is going on? For exceptions, we have the following options:
- Catch it yourself
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
// Add two tasks, hoping both run after 1s
executor.schedule(() -> {
try {
System.out.println("Task 1, execution time: " + new Date());
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
}
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
System.out.println("Task 2, execution time: " + new Date());
}, 1000, TimeUnit.MILLISECONDS);
- Use a Future: calling f.get() rethrows the task's exception wrapped in an ExecutionException, so the error is no longer silently swallowed
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {
log.info("task1");
int i = 1 / 0;
return true;
});
log.info("result:{}", f.get());
4.2、 Periodic execution
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
log.info("running...");
}, 1, 1, TimeUnit.SECONDS);
What if the task itself takes longer than the interval?
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
log.info("running..." + new Date());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 1, TimeUnit.SECONDS);
- Initially there is a 1s delay; after that, because the task's execution time is greater than the interval, the interval is "stretched" to 2s, i.e. the effective period is the maximum of the configured interval and the actual execution time
If you want a fixed gap between the end of one run and the start of the next, use the scheduleWithFixedDelay method:
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(() -> {
log.info("running..." + new Date());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 1, TimeUnit.SECONDS);
- The initial delay is 1s; with scheduleWithFixedDelay the cycle is: previous task ends <-> delay <-> next task starts, so here the interval between two starts is 3s (2s of execution plus the 1s delay)
5、Tomcat Thread pool
- LimitLatch is used for flow control; it limits the maximum number of connections, similar to Semaphore in JUC
- Acceptor is only responsible for accepting new socket connections
- Poller only monitors socket channels for readable I/O events; once a channel is readable it wraps a SocketProcessor task object and submits it to the Executor thread pool
- the worker threads in the Executor are ultimately responsible for processing the requests
Tomcat's thread pool is also called ThreadPoolExecutor; it is adapted from the JDK one, so it differs slightly from the JDK version:
- if the total number of threads has reached maximumPoolSize, it does not throw RejectedExecutionException immediately; instead it tries once more to put the task into the queue, and only throws the exception if that also fails
Source: tomcat-catalina 9.0.58
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
// If the threads are all busy, executeInternal throws RejectedExecutionException
executeInternal(command);
} catch (RejectedExecutionException rx) {
if (getQueue() instanceof TaskQueue) {
// If the Executor is close to its maximum pool size, concurrent calls to execute() may cause some tasks to be rejected rather than queued (because Tomcat uses TaskQueue). If that happens, add them to the queue here.
// The blocking queue is also an extended implementation
final TaskQueue queue = (TaskQueue) getQueue();
try {
// Try to join the queue
if (!queue.force(command, timeout, unit)) {
// If enqueueing still fails, throw the exception
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
Connector configuration items, corresponding to the Connector element in Tomcat's server.xml file:
Configuration item | Default value | Explanation |
---|---|---|
acceptorThreadCount | 1 | number of acceptor threads |
pollerThreadCount | 1 | number of poller threads |
minSpareThreads | 10 | core thread count, i.e. corePoolSize |
maxThreads | 200 | maximum thread count, i.e. maximumPoolSize |
executor | - | name of an Executor, referencing an Executor element below |
- acceptor: 1 thread is enough; it blocks when there are no requests, so more threads are unnecessary
- poller: it uses multiplexing and can monitor read/write events on many channels, so 1 thread is also enough
- executor: this attribute references an Executor element configured below; if present, it overrides minSpareThreads and maxThreads
Executor thread configuration, corresponding to the Executor element in Tomcat's server.xml file (commented out by default):
Configuration item | Default value | Explanation |
---|---|---|
threadPriority | 5 | thread priority |
daemon | true | whether the threads are daemon threads |
minSpareThreads | 25 | core thread count, i.e. corePoolSize |
maxThreads | 200 | maximum thread count, i.e. maximumPoolSize |
maxIdleTime | 60000 | thread keep-alive time in milliseconds, default 1 minute |
maxQueueSize | Integer.MAX_VALUE | queue length |
prestartminSpareThreads | false | whether the core threads are started when the server starts |
- daemon: all threads in Tomcat are daemon threads; once the main service shuts down, the remaining threads shut down with it
- maxIdleTime: the keep-alive time of the emergency threads
- maxQueueSize: the length of the blocking queue; by default it is unbounded, so under heavy load tasks may pile up
- prestartminSpareThreads: whether the core threads are created eagerly at startup (false means they are created lazily)
Since the blocking queue is unbounded, are the emergency threads useless? In the JDK pool, emergency threads are only created once a bounded blocking queue is full; with an unbounded queue like this one, emergency threads would indeed never be created.
In fact, in the Tomcat source code TaskQueue has been extended: Tomcat counts the tasks currently submitted, incrementing on submit and decrementing on completion. If the number of submitted tasks is still below the core thread count, the task goes straight into the blocking queue for execution. If it exceeds the core thread count, the task is not queued immediately: Tomcat first checks whether the number of submitted tasks is greater than the maximum thread count; if it is smaller, an emergency thread is created to run the task, and only if it is larger is the task put into the blocking queue.
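Simplified from the Tomcat source, the offer logic of TaskQueue that produces this behaviour looks roughly like this (parent is the Tomcat ThreadPoolExecutor the queue belongs to):
@Override
public boolean offer(Runnable o) {
    if (parent == null) {
        return super.offer(o);
    }
    // All of the maximum threads already exist: the task has to wait in the queue
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
        return super.offer(o);
    }
    // Fewer submitted tasks than live threads: some thread is idle, so queue the task
    if (parent.getSubmittedCount() <= parent.getPoolSize()) {
        return super.offer(o);
    }
    // Still below the maximum: return false so the executor creates a new (emergency) thread
    if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
        return false;
    }
    return super.offer(o);
}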