当前位置:网站首页>Multi-threaded cases - blocking queue
Multi-threaded cases - blocking queue
2022-08-01 13:04:00 【Living_Amethyst】
This article tells you about it 阻塞式队列 的有关知识
什么是阻塞式队列
阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则.
阻塞队列是一种线程安全的数据结构, 并且具有以下特性:
- 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素
阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.
生产者消费者模型
The producer-consumer pattern is through one容器来解决生产者和消费者的强耦合问题.
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取
- 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
- 比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,
- 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程).
- 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.
- 这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮
A diagram to help understand
And when we applied the blocking queue
At this time even by a very large user access at the same timeA,Then there will be no pressureB上
There is extra pressure阻塞队列承担了,Just keep the data in the queue for a while longer
- 阻塞队列也能使生产者和消费者之间 解耦
- 比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包.
- 擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”.
- 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包),
- 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的)
We still use a picture to help us understand
如果是A直接给B发送数据,那么就是 耦合性比较强
在开发Acode needs to be consideredB是如何接收的
在开发Bcode needs to be consideredA是如何发送的
If at this time加入了C,就需要修改A
而且如果A挂了,BThere is likely to be a problem too
B挂了,A也可能出问题
Let's join the blocking queue now
- ABNo more direct interaction
- 开发阶段: AJust think about how you interact with the queue, BJust think about how you interact with the queue. A BThey don't even need to know each other's existence
- 部署阶段,A和BOne of them hung up,Does not affect the other
- 此时加入C,ANo adjustments are required either
From the above two points, we can see that the producer-consumer model that applies the blocking queue has a great effect
标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可
BlockingQueue是一个接口. 真正实现的类是LinkedBlockingQueue.put方法用于阻塞式的入队列,take用于阻塞式的出队列.BlockingQueue也有offer, poll, peek等方法, 但是这些方法不带有阻塞特性
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
生产者消费者模型
public class Demo15 {
public static void main(String[] args) {
BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
Thread customer = new Thread(()->{
while(true){
try {
int val = queue.take();
System.out.println("消费元素:"+val);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
Thread producer = new Thread(()->{
int n = 0;
while(true){
try {
System.out.println("生产元素:"+n);
queue.put(n);
n++;
Thread.sleep(500); //Sleeps for a while each time an element is produced
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
}
}
阻塞队列模拟实现
这是一个循环队列
We do this with an array

Let's first look at how to write a normal queue
// Simulates implementing a blocking queue
// Array-based implementation
// Provides two core methods:
// 1.put入队列 2.take出队列
class MyBlockQueue {
// Assumed max1000个元素
private int[] items = new int[1000];
// The position of the team leader
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
private int size = 0;
//入队列
private void put(int value){
synchronized (this){
if(size == items.length){
//队列已满 无法插入
return;
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail reached the end of the array,就需要从头开始
tail = 0;
}
size++;
}
}
//出队列
public Integer take(){
int ret = 0;
synchronized (this){
if(size == 0){
//队列为空 无法出队列
return null;
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
}
return ret;
}
}
What we're going to write is a blocking queue
Two characteristics of blocking queues:
- 线程安全(我们可以通过加锁方式实现)
- 阻塞(用wait):If the queue is empty, it will block and wake up when it is not empty,Wake up when the queue is full and the block is not full
// Simulates implementing a blocking queue
// Array-based implementation
// Provides two core methods:
// 1.put入队列 2.take出队列
class MyBlockingQueue {
// Assumed max1000个元素
private int[] items = new int[1000];
// The position of the team leader
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
volatile private int size = 0;
//入队列
public void put(int value) throws InterruptedException {
synchronized (this){
while(size == items.length){
//队列已满 无法插入
this.wait();
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail reached the end of the array,就需要从头开始
tail = 0;
}
size++;
//Even if no one is waiting 多调用几次 notify There are no side effects
this.notify(); //当队列不空的时候 就唤醒
}
}
//出队列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (this){
while (size == 0){
//队列为空 ,就等待
this.wait();
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
this.notify();//当队列不满的时候,就唤醒
}
return ret;
}
}
Then write a producer-consumer model based on this blocking queue
// Simulates implementing a blocking queue
// Array-based implementation
// Provides two core methods:
// 1.put入队列 2.take出队列
class MyBlockingQueue {
// Assumed max1000个元素
private int[] items = new int[1000];
// The position of the team leader
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
volatile private int size = 0;
//入队列
public void put(int value) throws InterruptedException {
synchronized (this){
while(size == items.length){
//队列已满 无法插入
this.wait();
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail reached the end of the array,就需要从头开始
tail = 0;
}
size++;
//Even if no one is waiting 多调用几次 notify There are no side effects
this.notify(); //当队列不空的时候 就唤醒
}
}
//出队列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (this){
while (size == 0){
//队列为空 ,就等待
this.wait();
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
this.notify();//当队列不满的时候,就唤醒
}
return ret;
}
}
public class Demo16 {
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue queue = new MyBlockingQueue();
Thread customer = new Thread(()->{
while(true){
try {
int value = queue.take();
System.out.println("消费:"+value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) ;
customer.start();
Thread producer = new Thread(()->{
int value = 0;
while(true){
try {
queue.put(value);
System.out.println("生产:"+value);
value++;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
}
}
边栏推荐
- 《MySQL核心知识》第6章:查询语句
- 快速理解拉格朗日乘子法
- This article will take you to thoroughly clarify the working mechanism of certificates in Isito
- tensorflow2.0 handwritten digit recognition (tensorflow handwriting recognition)
- 初级必备:单例模式的7个问题
- Meshlab & Open3D SOR filtering
- 动态库、静态库浅析
- Feign 从注册到调用原理分析
- 四足机器人软件架构现状分析
- CloudCompare & PCL ICP registration (point to face)
猜你喜欢

Efficiency tools to let programmers get off work earlier

Apex installation error

态路小课堂丨浅谈优质光模块需要具备的条件!

8. How does the SAP ABAP OData service support the Create operation

小程序插件如何帮助开发者受益?

安全又省钱,“15岁”老小区用上管道燃气

华盛顿大学、Allen AI 等联合 | RealTime QA: What's the Answer Right Now?(实时 QA:现在的答案是什么?)

找出相同属性值的对象 累加数量 汇总

力扣160题,相交链表

8. SAP ABAP OData 服务如何支持创建(Create)操作
随机推荐
AI目标分割能力,无需绿幕即可实现快速视频抠图
SQL函数 STR
消息中间件解析 | 如何正确理解软件应用系统中关于系统通信的那些事?
AI目标分割能力,无需绿幕即可实现快速视频抠图
MMF的初步介绍:一个规范化的视觉-语言多模态任务框架
SQL function SQUARE
Aeraki Mesh Joins CNCF Cloud Native Panorama
计算器:中缀表达式转后缀表达式
HMS Core音频编辑服务音源分离与空间音频渲染,助力快速进入3D音频的世界
shell 中的 分发系统 expect脚本 (传递参数、自动同步文件、指定host和要传输的文件、(构建文件分发系统)(命令批量执行))
pandas connects to the oracle database and pulls the data in the table into the dataframe, filters all the data from the current time (sysdate) to one hour ago (filters the range data of one hour)
多线程案例——阻塞式队列
关于亚马逊测评,你了解多少?
Simulation implementation of new of Js handwritten function
tensorflow2.0手写数字识别(tensorflow手写体识别)
一文带你彻底厘清 Isito 中的证书工作机制
动态库、静态库浅析
Js手写函数之new的模拟实现
什么是一致性哈希?可以应用在哪些场景?
深入解析volatile关键字