当前位置:网站首页>Multi-threaded cases - blocking queue
Multi-threaded cases - blocking queue
2022-08-01 13:04:00 【Living_Amethyst】
This article tells you about it 阻塞式队列 的有关知识
什么是阻塞式队列
阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则.
阻塞队列是一种线程安全的数据结构, 并且具有以下特性:
- 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素
阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.
生产者消费者模型
The producer-consumer pattern is through one容器来解决生产者和消费者的强耦合问题.
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取
- 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
- 比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,
- 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程).
- 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.
- 这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮
A diagram to help understand
And when we applied the blocking queue
At this time even by a very large user access at the same timeA,Then there will be no pressureB上
There is extra pressure阻塞队列承担了,Just keep the data in the queue for a while longer
- 阻塞队列也能使生产者和消费者之间 解耦
- 比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包.
- 擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”.
- 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包),
- 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的)
We still use a picture to help us understand
如果是A直接给B发送数据,那么就是 耦合性比较强
在开发Acode needs to be consideredB是如何接收的
在开发Bcode needs to be consideredA是如何发送的
If at this time加入了C,就需要修改A
而且如果A挂了,BThere is likely to be a problem too
B挂了,A也可能出问题
Let's join the blocking queue now
- ABNo more direct interaction
- 开发阶段: AJust think about how you interact with the queue, BJust think about how you interact with the queue. A BThey don't even need to know each other's existence
- 部署阶段,A和BOne of them hung up,Does not affect the other
- 此时加入C,ANo adjustments are required either
From the above two points, we can see that the producer-consumer model that applies the blocking queue has a great effect
标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可
BlockingQueue是一个接口. 真正实现的类是LinkedBlockingQueue.put方法用于阻塞式的入队列,take用于阻塞式的出队列.BlockingQueue也有offer, poll, peek等方法, 但是这些方法不带有阻塞特性
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
生产者消费者模型
public class Demo15 {
public static void main(String[] args) {
BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
Thread customer = new Thread(()->{
while(true){
try {
int val = queue.take();
System.out.println("消费元素:"+val);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
Thread producer = new Thread(()->{
int n = 0;
while(true){
try {
System.out.println("生产元素:"+n);
queue.put(n);
n++;
Thread.sleep(500); //Sleeps for a while each time an element is produced
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
}
}
阻塞队列模拟实现
这是一个循环队列
We do this with an array

Let's first look at how to write a normal queue
// Simulates implementing a blocking queue
// Array-based implementation
// Provides two core methods:
// 1.put入队列 2.take出队列
class MyBlockQueue {
// Assumed max1000个元素
private int[] items = new int[1000];
// The position of the team leader
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
private int size = 0;
//入队列
private void put(int value){
synchronized (this){
if(size == items.length){
//队列已满 无法插入
return;
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail reached the end of the array,就需要从头开始
tail = 0;
}
size++;
}
}
//出队列
public Integer take(){
int ret = 0;
synchronized (this){
if(size == 0){
//队列为空 无法出队列
return null;
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
}
return ret;
}
}
What we're going to write is a blocking queue
Two characteristics of blocking queues:
- 线程安全(我们可以通过加锁方式实现)
- 阻塞(用wait):If the queue is empty, it will block and wake up when it is not empty,Wake up when the queue is full and the block is not full
// Simulates implementing a blocking queue
// Array-based implementation
// Provides two core methods:
// 1.put入队列 2.take出队列
class MyBlockingQueue {
// Assumed max1000个元素
private int[] items = new int[1000];
// The position of the team leader
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
volatile private int size = 0;
//入队列
public void put(int value) throws InterruptedException {
synchronized (this){
while(size == items.length){
//队列已满 无法插入
this.wait();
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail reached the end of the array,就需要从头开始
tail = 0;
}
size++;
//Even if no one is waiting 多调用几次 notify There are no side effects
this.notify(); //当队列不空的时候 就唤醒
}
}
//出队列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (this){
while (size == 0){
//队列为空 ,就等待
this.wait();
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
this.notify();//当队列不满的时候,就唤醒
}
return ret;
}
}
Then write a producer-consumer model based on this blocking queue
// Simulates implementing a blocking queue
// Array-based implementation
// Provides two core methods:
// 1.put入队列 2.take出队列
class MyBlockingQueue {
// Assumed max1000个元素
private int[] items = new int[1000];
// The position of the team leader
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
volatile private int size = 0;
//入队列
public void put(int value) throws InterruptedException {
synchronized (this){
while(size == items.length){
//队列已满 无法插入
this.wait();
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail reached the end of the array,就需要从头开始
tail = 0;
}
size++;
//Even if no one is waiting 多调用几次 notify There are no side effects
this.notify(); //当队列不空的时候 就唤醒
}
}
//出队列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (this){
while (size == 0){
//队列为空 ,就等待
this.wait();
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
this.notify();//当队列不满的时候,就唤醒
}
return ret;
}
}
public class Demo16 {
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue queue = new MyBlockingQueue();
Thread customer = new Thread(()->{
while(true){
try {
int value = queue.take();
System.out.println("消费:"+value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) ;
customer.start();
Thread producer = new Thread(()->{
int value = 0;
while(true){
try {
queue.put(value);
System.out.println("生产:"+value);
value++;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
}
}
边栏推荐
- 初级必备:单例模式的7个问题
- 线上问题排查常用命令,总结太全了,建议收藏!!
- Beyond Compare 4 trial period expires
- Fault 007: The dexp derivative is inexplicably interrupted
- Do wildcard SSL certificates not support multiple domains?
- formatdatetime函数 mysql(date sub函数)
- Software designer test center summary (interior designer personal summary)
- 一文带你读懂云原生、微服务与高可用
- PyTorch 进阶之路:在 GPU 上训练深度神经网络
- SQL函数 SQUARE
猜你喜欢
随机推荐
ddl and dml in sql (the difference between database table and view)
Feign 从注册到调用原理分析
MySQL调优
Data Mining-04
34、树莓派进行人体姿态检测并进行语音播报
【讲座分享】“营收“看金融
Find objects with the same property value Cumulative number Summarize
高仿项目协作工具【Worktile】,从零带你一步步实现组织架构、网盘、消息、项目、审批等功能
六石编程学:问题要面对,办法要技巧,做不好的功能要想办法
芝加哥丰田技术学院 | Leveraging Natural Supervision for Language Representation Learning and Generation(利用自然监督进行语言表示学习和生成)
华盛顿大学、Allen AI 等联合 | RealTime QA: What's the Answer Right Now?(实时 QA:现在的答案是什么?)
How does the SAP ABAP OData service support the Create operation trial version
[Unity3D Plugin] AVPro Video Plugin Share "Video Player Plugin"
CloudCompare&PCL ICP配准(点到面)
字体反爬之好租
windows IDEA + PHP+xdebug 断点调试
Deep understanding of Istio - advanced practice of cloud native service mesh
How to integrate 3rd party service center registration into Istio?
如何使用OpenCV测量图像中物体之间的距离
ECCV22|只能11%的参数就能优于Swin,微软提出快速预训练蒸馏方法TinyViT









