当前位置:网站首页>多线程模型下的生产者消费者模式
多线程模型下的生产者消费者模式
2022-06-12 21:32:00 【NGC73】
多线程模型下的生产者消费者模式
多线程生产者消费者模式
- 不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
代码示例
1、生产者
package pro2cus;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
/** * @author tjj */
@Slf4j
public class NumberProducer implements Runnable{
private final int poisonPill;
private final int poisonPillPerProducer;
private BlockingQueue<Integer> numbersQueue;
public NumberProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
this.numbersQueue = numbersQueue;
}
@Override
public void run() {
try{
generateNumbers();
}catch (InterruptedException e){
Thread.currentThread ().interrupt ();
}
}
/** * generateNumbers() 将 100 个元素(生产100副药给武大郎吃)放入队列中。 * @throws InterruptedException */
private void generateNumbers() throws InterruptedException {
//泡药的消息
for (int i = 0; i < 100; i++) {
numbersQueue.put (ThreadLocalRandom.current ().nextInt (100));
log.info ("潘金莲-{}号,给武大郎的泡药!", Thread.currentThread ().getId ());
}
//毒 ( poison )丸( pill )(潘金莲给武大郎下毒)消息,以便知道在执行完成时放入队列的消息类型。
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put (poisonPill);
log.info ("潘金莲-{}号,往武大郎的药里放入第{}颗毒丸!", Thread.currentThread ().getId (), j + 1);
}
}
}
2、消费者
package pro2cus;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
/** * 每个消费者将使用 take() 方法从 BlockingQueue 获取一个元素,因此它将阻塞,直到队列中有一个元素 * @author tjj */
@Slf4j
public class NumberConsumer implements Runnable{
private final int poisonPill;
private BlockingQueue<Integer> queue;
public NumberConsumer(int poisonPill, BlockingQueue<Integer> queue) {
this.poisonPill = poisonPill;
this.queue = queue;
}
@Override
public void run() {
try{
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
log.info ("武大郎-{}号,喝药-编号:{}", Thread.currentThread ().getId (), number);
}
}catch (InterruptedException e){
Thread.currentThread ().interrupt ();
}
}
}
3、队列的使用
package pro2cus;
import java.util.ArrayList;
import java.util.concurrent.*;
/** * 需要注意的重要事项是队列的使用 * @author tjj */
public class Main {
//与生成器构造函数中的相同,队列作为参数传递。
//我们可以这样做,是因为 BlockingQueue 可以在线程之间共享而无需任何显式同步。
//既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 10个元素。
//我们创建4 个生产者线程,并且创建等于可用处理器数量的消费者线程:
public static void main(String[] args) {
int BOUND = 10;
int N_PRODUCERS = 16;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
//创建线程池进行管理
ThreadPoolExecutor customerExecutorService = new ThreadPoolExecutor(N_CONSUMERS,
16,
10L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(BOUND),
new ThreadPoolExecutor.AbortPolicy()
);
// 设置拒绝策略
customerExecutorService.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 空闲队列存活时间
customerExecutorService.setKeepAliveTime(20,TimeUnit.SECONDS);
ArrayList<Object> list = new ArrayList<>(2000);
//潘金莲给武大郎熬药
for (int i = 1; i < N_PRODUCERS; i++) {
customerExecutorService.execute(()->{
new Thread (new NumberProducer(queue, poisonPill, poisonPillPerProducer)).start ();
});
}
//武大郎开始喝药
for (int j = 0; j < N_CONSUMERS; j++) {
customerExecutorService.execute(()->{
new Thread (new NumberConsumer(poisonPill, queue)).start ();
});
}
//潘金莲开始投毒,武大郎喝完毒药GG
customerExecutorService.execute(()->{
new Thread (new NumberProducer (queue, poisonPill, poisonPillPerProducer + mod)).start ();
});
}
}
佛系笔记:
边栏推荐
猜你喜欢
Image processing 12- image linear blending
leetcode:210. Schedule II
The salted fish has been transmitted for 5W times, and the latest spring recruit face-to-face test questions of bytes have been leaked
Teamwork collaboration application experience sharing | community essay solicitation
torch. nn. Linear() function
ICML2022 | GALAXY:極化圖主動學習
利用ADG Standby克隆PDB
SQL调优指南笔记18:Analyzing Statistics Using Optimizer Statistics Advisor
风控建模十:传统建模方法存在的问题探讨及改进方法探索
#113 Path Sum II
随机推荐
NPOI 创建Word
Oracle 19C installation documentation
NIO使用指南
Icml2022 | Galaxy: apprentissage actif des cartes de polarisation
Image processing 12- image linear blending
Simple understanding of cap and base theory
求解一维数组前缀和
Mxnet record IO details
Leetcode: 210. Programme II
SQL调优指南笔记11:Histograms
Digraph deep copy
gzip压缩解压缩
Zip压缩解压缩
Gather function in pytorch_
Solve one-dimensional array prefix sum
#886 Possible Bipartition
Vs2017 environmental issues
Shell script Basics
Test basis: unit test
Delphi XE7的蓝牙 Bluetooth