当前位置:网站首页>多线程模型下的生产者消费者模式
多线程模型下的生产者消费者模式
2022-06-12 21:32:00 【NGC73】
多线程模型下的生产者消费者模式
多线程生产者消费者模式
- 不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式

代码示例
1、生产者
package pro2cus;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
/** * @author tjj */
@Slf4j
public class NumberProducer implements Runnable{
private final int poisonPill;
private final int poisonPillPerProducer;
private BlockingQueue<Integer> numbersQueue;
public NumberProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
this.numbersQueue = numbersQueue;
}
@Override
public void run() {
try{
generateNumbers();
}catch (InterruptedException e){
Thread.currentThread ().interrupt ();
}
}
/** * generateNumbers() 将 100 个元素(生产100副药给武大郎吃)放入队列中。 * @throws InterruptedException */
private void generateNumbers() throws InterruptedException {
//泡药的消息
for (int i = 0; i < 100; i++) {
numbersQueue.put (ThreadLocalRandom.current ().nextInt (100));
log.info ("潘金莲-{}号,给武大郎的泡药!", Thread.currentThread ().getId ());
}
//毒 ( poison )丸( pill )(潘金莲给武大郎下毒)消息,以便知道在执行完成时放入队列的消息类型。
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put (poisonPill);
log.info ("潘金莲-{}号,往武大郎的药里放入第{}颗毒丸!", Thread.currentThread ().getId (), j + 1);
}
}
}
2、消费者
package pro2cus;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
/** * 每个消费者将使用 take() 方法从 BlockingQueue 获取一个元素,因此它将阻塞,直到队列中有一个元素 * @author tjj */
@Slf4j
public class NumberConsumer implements Runnable{
private final int poisonPill;
private BlockingQueue<Integer> queue;
public NumberConsumer(int poisonPill, BlockingQueue<Integer> queue) {
this.poisonPill = poisonPill;
this.queue = queue;
}
@Override
public void run() {
try{
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
log.info ("武大郎-{}号,喝药-编号:{}", Thread.currentThread ().getId (), number);
}
}catch (InterruptedException e){
Thread.currentThread ().interrupt ();
}
}
}
3、队列的使用
package pro2cus;
import java.util.ArrayList;
import java.util.concurrent.*;
/** * 需要注意的重要事项是队列的使用 * @author tjj */
public class Main {
//与生成器构造函数中的相同,队列作为参数传递。
//我们可以这样做,是因为 BlockingQueue 可以在线程之间共享而无需任何显式同步。
//既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 10个元素。
//我们创建4 个生产者线程,并且创建等于可用处理器数量的消费者线程:
public static void main(String[] args) {
int BOUND = 10;
int N_PRODUCERS = 16;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
//创建线程池进行管理
ThreadPoolExecutor customerExecutorService = new ThreadPoolExecutor(N_CONSUMERS,
16,
10L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(BOUND),
new ThreadPoolExecutor.AbortPolicy()
);
// 设置拒绝策略
customerExecutorService.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 空闲队列存活时间
customerExecutorService.setKeepAliveTime(20,TimeUnit.SECONDS);
ArrayList<Object> list = new ArrayList<>(2000);
//潘金莲给武大郎熬药
for (int i = 1; i < N_PRODUCERS; i++) {
customerExecutorService.execute(()->{
new Thread (new NumberProducer(queue, poisonPill, poisonPillPerProducer)).start ();
});
}
//武大郎开始喝药
for (int j = 0; j < N_CONSUMERS; j++) {
customerExecutorService.execute(()->{
new Thread (new NumberConsumer(poisonPill, queue)).start ();
});
}
//潘金莲开始投毒,武大郎喝完毒药GG
customerExecutorService.execute(()->{
new Thread (new NumberProducer (queue, poisonPill, poisonPillPerProducer + mod)).start ();
});
}
}
佛系笔记:
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YumyILTn-1654071116204)(C:\Users\25060\AppData\Roaming\Typora\typora-user-images\image-20220601103019125.png)]](/img/91/a802e35f42e80373b93ab197c9dbe5.png)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vOh7ASRp-1654071116204)(C:\Users\25060\AppData\Roaming\Typora\typora-user-images\image-20220601104515587.png)]](/img/34/813b703714fd62e17eb210394c5b09.png)
边栏推荐
- 如何自己动手写一个vscode插件,实现插件自由!
- Risk control modeling X: Discussion on problems existing in traditional modeling methods and Exploration on improvement methods
- 结构体知识点all in
- 测试基础之:单元测试
- Data batch writing
- Module 8: Design message queue MySQL table for storing message data
- Binary search
- 插入排序
- Graphics2d class basic use
- Select sort
猜你喜欢
随机推荐
求解一维数组前缀和
[target detection] |dive detector into box for object detection new training method based on fcos
A high-value MySQL management tool
ZGC concurrent identity and multi view address mapping in concurrent transition phase
#886 Possible Bipartition
SQL调优指南笔记15:Controlling the Use of Optimizer Statistics
C language learning notes (II)
ORM implements the mapping relationship between classes and tables, class attributes and fields
ORM 实现类与表,类属性与字段的映射关系
Cv2.lut() (populates the output array with values from the lookup table)
Yanghui triangle code implementation
Delphi XE7的蓝牙 Bluetooth
Rearrangement exercises
实现从字符串中删除某个字符操作
在同花顺开户安全么,买股票怎么网上开户
linux备份mysql
jsonUtils
大一下学年学期总结
Can flush open an account? Can you directly open the security of securities companies on the app
zgc 并发标识和并发转移阶段的多视图地址映射







