当前位置:网站首页>多线程案例——阻塞式队列
多线程案例——阻塞式队列
2022-08-01 12:46:00 【Living_Amethyst】
本篇文章讲给大家带来有关 阻塞式队列 的有关知识
什么是阻塞式队列
阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则.
阻塞队列是一种线程安全的数据结构, 并且具有以下特性:
- 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素
阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.
生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取
- 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
- 比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,
- 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程).
- 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.
- 这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮
一张图帮助理解
而当我们应用了阻塞队列之后
这时即使由很大规模的用户同时访问A,那么压力也不会给到B上
多出来的压力就有阻塞队列承担了,只要把数据在队列中多存放一会就可以了
- 阻塞队列也能使生产者和消费者之间 解耦
- 比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包.
- 擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”.
- 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包),
- 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的)
我们仍然是用一张图来帮助我们理解
如果是A直接给B发送数据,那么就是 耦合性比较强
在开发A的代码时需要考虑B是如何接收的
在开发B的代码时需要考虑A是如何发送的
此时要是加入了C,就需要修改A
而且如果A挂了,B很可能也要出问题
B挂了,A也可能出问题
我们现在再加入阻塞队列
- AB不再直接交互
- 开发阶段: A只考虑自己和队列如何交互, B只考虑自己和队列如何交互. A B之间甚至不需要知道对方的存在
- 部署阶段,A和B中有一个挂了,对另一个也不影响
- 此时加入C,A也不需要做任何调整
从上面两点我们可以看出应用了阻塞队列的生产者消费者模型有很大的作用
标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可
BlockingQueue是一个接口. 真正实现的类是LinkedBlockingQueue.put方法用于阻塞式的入队列,take用于阻塞式的出队列.BlockingQueue也有offer, poll, peek等方法, 但是这些方法不带有阻塞特性
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
生产者消费者模型
public class Demo15 {
public static void main(String[] args) {
BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
Thread customer = new Thread(()->{
while(true){
try {
int val = queue.take();
System.out.println("消费元素:"+val);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
Thread producer = new Thread(()->{
int n = 0;
while(true){
try {
System.out.println("生产元素:"+n);
queue.put(n);
n++;
Thread.sleep(500); //每生产一个元素就休眠一会
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
}
}
阻塞队列模拟实现
这是一个循环队列
我们用一个数组来实现

我们先看看普通的队列怎么写
// 模拟实现一个阻塞队列
// 基于数组的方式实现
// 提供两个核心方法:
// 1.put入队列 2.take出队列
class MyBlockQueue {
// 假定最大1000个元素
private int[] items = new int[1000];
// 队首的位置
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
private int size = 0;
//入队列
private void put(int value){
synchronized (this){
if(size == items.length){
//队列已满 无法插入
return;
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail 达到数组末尾,就需要从头开始
tail = 0;
}
size++;
}
}
//出队列
public Integer take(){
int ret = 0;
synchronized (this){
if(size == 0){
//队列为空 无法出队列
return null;
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
}
return ret;
}
}
我们要写的是阻塞队列
阻塞队列的两个特点:
- 线程安全(我们可以通过加锁方式实现)
- 阻塞(用wait):队列为空就阻塞不为空时唤醒,队列满时阻塞不满时唤醒
// 模拟实现一个阻塞队列
// 基于数组的方式实现
// 提供两个核心方法:
// 1.put入队列 2.take出队列
class MyBlockingQueue {
// 假定最大1000个元素
private int[] items = new int[1000];
// 队首的位置
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
volatile private int size = 0;
//入队列
public void put(int value) throws InterruptedException {
synchronized (this){
while(size == items.length){
//队列已满 无法插入
this.wait();
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail 达到数组末尾,就需要从头开始
tail = 0;
}
size++;
//即使没人在等待 多调用几次 notify 也没啥副作用
this.notify(); //当队列不空的时候 就唤醒
}
}
//出队列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (this){
while (size == 0){
//队列为空 ,就等待
this.wait();
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
this.notify();//当队列不满的时候,就唤醒
}
return ret;
}
}
然后基于这个阻塞队列写一个生产者消费者模型
// 模拟实现一个阻塞队列
// 基于数组的方式实现
// 提供两个核心方法:
// 1.put入队列 2.take出队列
class MyBlockingQueue {
// 假定最大1000个元素
private int[] items = new int[1000];
// 队首的位置
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
volatile private int size = 0;
//入队列
public void put(int value) throws InterruptedException {
synchronized (this){
while(size == items.length){
//队列已满 无法插入
this.wait();
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail 达到数组末尾,就需要从头开始
tail = 0;
}
size++;
//即使没人在等待 多调用几次 notify 也没啥副作用
this.notify(); //当队列不空的时候 就唤醒
}
}
//出队列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (this){
while (size == 0){
//队列为空 ,就等待
this.wait();
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
this.notify();//当队列不满的时候,就唤醒
}
return ret;
}
}
public class Demo16 {
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue queue = new MyBlockingQueue();
Thread customer = new Thread(()->{
while(true){
try {
int value = queue.take();
System.out.println("消费:"+value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) ;
customer.start();
Thread producer = new Thread(()->{
int value = 0;
while(true){
try {
queue.put(value);
System.out.println("生产:"+value);
value++;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
}
}
边栏推荐
- MySQL调优
- pandas connects to the oracle database and pulls the data in the table into the dataframe, filters all the data from the current time (sysdate) to one hour ago (filters the range data of one hour)
- 安装apex报错
- 小程序插件如何帮助开发者受益?
- 【StoneDB Class】Introduction Lesson 2: Analysis of the Overall Architecture of StoneDB
- formatdatetime函数 mysql(date sub函数)
- 2022 Go ecosystem rpc framework Benchmark
- 大中型网站列表页翻页过多怎么优化?
- Envoy source code flow chart
- sql中ddl和dml(数据库表与视图的区别)
猜你喜欢

MySQL调优

10年稳定性保障经验总结,故障复盘要回答哪三大关键问题?|TakinTalks大咖分享

Favorites|Mechanical Engineer Interview Frequently Asked Questions

Audio and Video Technology Development Weekly | 256

那些利用假期学习的职场人,后来都怎么样了?

ECCV22|只能11%的参数就能优于Swin,微软提出快速预训练蒸馏方法TinyViT

MarkDown公式指导手册

bpmn-process-designer基础上进行自定义样式(工具、元素、菜单)

实现集中式身份认证管理的案例
![[5 days countdown] to explore the secret behind the great quality promotion, gift waiting for you to take of $one thousand](/img/de/1e6069e84183d1400c90a6ec574f72.png)
[5 days countdown] to explore the secret behind the great quality promotion, gift waiting for you to take of $one thousand
随机推荐
Dapr 与 NestJs ,实战编写一个 Pub & Sub 装饰器
SQL函数 STR
【讲座分享】“营收“看金融
【面试高频题】难度 1.5/5,二分经典运用题
Meshlab&Open3D SOR滤波
Istio Meetup China: Full Stack Service Mesh - Aeraki Helps You Manage Any Layer 7 Traffic in an Istio Service Mesh
markdown常用数学符号cov(markdown求和符号)
2022 Go生态圈 rpc 框架 Benchmark
How to get the address of WeChat video account (link address of WeChat public account)
态路小课堂丨浅谈优质光模块需要具备的条件!
Aeraki Mesh 正式成为 CNCF 沙箱项目
为什么最大值加一等于最小值
[Open class preview]: Research and application of super-resolution technology in the field of video image quality enhancement
Aeraki Mesh became CNCF sandbox project
数据湖 delta lake和spark版本对应关系
初级必备:单例模式的7个问题
一文带你彻底厘清 Isito 中的证书工作机制
CAN通信标准帧和扩展帧介绍
CloudCompare & PCL ICP registration (point to face)
Envoy 源码流程图