当前位置:网站首页>多线程模型下的生产者消费者模式
多线程模型下的生产者消费者模式
2022-06-12 21:32:00 【NGC73】
多线程模型下的生产者消费者模式
多线程生产者消费者模式
- 不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式

代码示例
1、生产者
package pro2cus;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
/** * @author tjj */
@Slf4j
public class NumberProducer implements Runnable{
private final int poisonPill;
private final int poisonPillPerProducer;
private BlockingQueue<Integer> numbersQueue;
public NumberProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
this.numbersQueue = numbersQueue;
}
@Override
public void run() {
try{
generateNumbers();
}catch (InterruptedException e){
Thread.currentThread ().interrupt ();
}
}
/** * generateNumbers() 将 100 个元素(生产100副药给武大郎吃)放入队列中。 * @throws InterruptedException */
private void generateNumbers() throws InterruptedException {
//泡药的消息
for (int i = 0; i < 100; i++) {
numbersQueue.put (ThreadLocalRandom.current ().nextInt (100));
log.info ("潘金莲-{}号,给武大郎的泡药!", Thread.currentThread ().getId ());
}
//毒 ( poison )丸( pill )(潘金莲给武大郎下毒)消息,以便知道在执行完成时放入队列的消息类型。
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put (poisonPill);
log.info ("潘金莲-{}号,往武大郎的药里放入第{}颗毒丸!", Thread.currentThread ().getId (), j + 1);
}
}
}
2、消费者
package pro2cus;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
/** * 每个消费者将使用 take() 方法从 BlockingQueue 获取一个元素,因此它将阻塞,直到队列中有一个元素 * @author tjj */
@Slf4j
public class NumberConsumer implements Runnable{
private final int poisonPill;
private BlockingQueue<Integer> queue;
public NumberConsumer(int poisonPill, BlockingQueue<Integer> queue) {
this.poisonPill = poisonPill;
this.queue = queue;
}
@Override
public void run() {
try{
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
log.info ("武大郎-{}号,喝药-编号:{}", Thread.currentThread ().getId (), number);
}
}catch (InterruptedException e){
Thread.currentThread ().interrupt ();
}
}
}
3、队列的使用
package pro2cus;
import java.util.ArrayList;
import java.util.concurrent.*;
/** * 需要注意的重要事项是队列的使用 * @author tjj */
public class Main {
//与生成器构造函数中的相同,队列作为参数传递。
//我们可以这样做,是因为 BlockingQueue 可以在线程之间共享而无需任何显式同步。
//既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 10个元素。
//我们创建4 个生产者线程,并且创建等于可用处理器数量的消费者线程:
public static void main(String[] args) {
int BOUND = 10;
int N_PRODUCERS = 16;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
//创建线程池进行管理
ThreadPoolExecutor customerExecutorService = new ThreadPoolExecutor(N_CONSUMERS,
16,
10L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(BOUND),
new ThreadPoolExecutor.AbortPolicy()
);
// 设置拒绝策略
customerExecutorService.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 空闲队列存活时间
customerExecutorService.setKeepAliveTime(20,TimeUnit.SECONDS);
ArrayList<Object> list = new ArrayList<>(2000);
//潘金莲给武大郎熬药
for (int i = 1; i < N_PRODUCERS; i++) {
customerExecutorService.execute(()->{
new Thread (new NumberProducer(queue, poisonPill, poisonPillPerProducer)).start ();
});
}
//武大郎开始喝药
for (int j = 0; j < N_CONSUMERS; j++) {
customerExecutorService.execute(()->{
new Thread (new NumberConsumer(poisonPill, queue)).start ();
});
}
//潘金莲开始投毒,武大郎喝完毒药GG
customerExecutorService.execute(()->{
new Thread (new NumberProducer (queue, poisonPill, poisonPillPerProducer + mod)).start ();
});
}
}
佛系笔记:
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YumyILTn-1654071116204)(C:\Users\25060\AppData\Roaming\Typora\typora-user-images\image-20220601103019125.png)]](/img/91/a802e35f42e80373b93ab197c9dbe5.png)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vOh7ASRp-1654071116204)(C:\Users\25060\AppData\Roaming\Typora\typora-user-images\image-20220601104515587.png)]](/img/34/813b703714fd62e17eb210394c5b09.png)
边栏推荐
- Kdd2022 | graphmae: self supervised mask map self encoder
- ZGC concurrent identity and multi view address mapping in concurrent transition phase
- CUDA out of memory
- linux备份mysql
- Experiment 7-2-6 print Yanghui triangle (20 points)
- #981 Time Based Key-Value Store
- USB mechanical keyboard changed to Bluetooth Keyboard
- VagrantBox重新安装vboxsf驱动
- Recursively call knowledge points - including example solving binary search, frog jumping steps, reverse order output, factorial, Fibonacci, Hanoi tower.
- Icml2022 | Galaxy: apprentissage actif des cartes de polarisation
猜你喜欢

JUC并发工具包使用指南
![Fill in the checklist & lt; int&gt; Have default values? [repeat] - fill list & lt; int&gt; with default values? [duplicate]](/img/65/a214d137e230b1a1190feb03660f2c.jpg)
Fill in the checklist & lt; int&gt; Have default values? [repeat] - fill list & lt; int&gt; with default values? [duplicate]

#141 Linked List Cycle

Risk control modeling X: Discussion on problems existing in traditional modeling methods and Exploration on improvement methods

User guide for JUC concurrency Toolkit

ASCII code comparison table

测试基础之:单元测试

leetcode:210. Schedule II

Introduction to the characteristics of balancer decentralized exchange market capitalization robot

torch. nn. Linear() function
随机推荐
Libmysqlclient A static library
Rearrangement exercises
nn. PReLU(planes)
实现从字符串中删除某个字符操作
冒泡排序
递归调用知识点-包含例题求解二分查找、青蛙跳台阶、逆序输出、阶乘、斐波那契、汉诺塔。
The ifeq, filter and strip of makefile are easy to use
Insert sort
Can tonghuashun open an account? Can the security of securities companies be directly opened on the app? How to open an account for securities accounts
torch. clamp_ min_ method
User guide for JUC concurrency Toolkit
Oracle数据库中查询执行计划的权限
The service did not report any errors MySQL
#886 Possible Bipartition
Cookies and sessions
SQL调优指南笔记13:Gathering Optimizer Statistics
二分查找
ASCII code comparison table
Linux backup MySQL
USB机械键盘改蓝牙键盘