当前位置:网站首页>Multi-threaded cases - blocking queue
Multi-threaded cases - blocking queue
2022-08-01 13:04:00 【Living_Amethyst】
This article tells you about it 阻塞式队列
的有关知识
什么是阻塞式队列
阻塞队列是一种特殊的队列. 也遵守 "先进先出"
的原则.
阻塞队列是一种线程安全的数据结构, 并且具有以下特性:
- 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
- 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素
阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.
生产者消费者模型
The producer-consumer pattern is through one容器
来解决生产者和消费者的强耦合问题
.
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取
- 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
- 比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,
- 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程).
- 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.
- 这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮
A diagram to help understand
And when we applied the blocking queue
At this time even by a very large user access at the same timeA,Then there will be no pressureB上
There is extra pressure阻塞队列
承担了,Just keep the data in the queue for a while longer
- 阻塞队列也能使生产者和消费者之间 解耦
- 比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包.
- 擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”.
- 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包),
- 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的)
We still use a picture to help us understand
如果是A直接给B发送数据,那么就是 耦合性比较强
在开发Acode needs to be consideredB是如何接收的
在开发Bcode needs to be consideredA是如何发送的
If at this time加入了C
,就需要修改A
而且如果A挂了,BThere is likely to be a problem too
B挂了,A也可能出问题
Let's join the blocking queue now
- ABNo more direct interaction
- 开发阶段: AJust think about how you interact with the queue, BJust think about how you interact with the queue. A BThey don't even need to know each other's existence
- 部署阶段,A和BOne of them hung up,Does not affect the other
- 此时加入C,ANo adjustments are required either
From the above two points, we can see that the producer-consumer model that applies the blocking queue has a great effect
标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可
BlockingQueue
是一个接口. 真正实现的类是LinkedBlockingQueue.
put
方法用于阻塞式的入队列,take
用于阻塞式的出队列.BlockingQueue
也有offer, poll, peek
等方法, 但是这些方法不带有阻塞特性
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();
生产者消费者模型
public class Demo15 {
public static void main(String[] args) {
BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
Thread customer = new Thread(()->{
while(true){
try {
int val = queue.take();
System.out.println("消费元素:"+val);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
Thread producer = new Thread(()->{
int n = 0;
while(true){
try {
System.out.println("生产元素:"+n);
queue.put(n);
n++;
Thread.sleep(500); //Sleeps for a while each time an element is produced
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
}
}
阻塞队列模拟实现
这是一个循环队列
We do this with an array
Let's first look at how to write a normal queue
// Simulates implementing a blocking queue
// Array-based implementation
// Provides two core methods:
// 1.put入队列 2.take出队列
class MyBlockQueue {
// Assumed max1000个元素
private int[] items = new int[1000];
// The position of the team leader
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
private int size = 0;
//入队列
private void put(int value){
synchronized (this){
if(size == items.length){
//队列已满 无法插入
return;
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail reached the end of the array,就需要从头开始
tail = 0;
}
size++;
}
}
//出队列
public Integer take(){
int ret = 0;
synchronized (this){
if(size == 0){
//队列为空 无法出队列
return null;
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
}
return ret;
}
}
What we're going to write is a blocking queue
Two characteristics of blocking queues:
- 线程安全(我们可以通过加锁方式实现)
- 阻塞(用wait):If the queue is empty, it will block and wake up when it is not empty,Wake up when the queue is full and the block is not full
// Simulates implementing a blocking queue
// Array-based implementation
// Provides two core methods:
// 1.put入队列 2.take出队列
class MyBlockingQueue {
// Assumed max1000个元素
private int[] items = new int[1000];
// The position of the team leader
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
volatile private int size = 0;
//入队列
public void put(int value) throws InterruptedException {
synchronized (this){
while(size == items.length){
//队列已满 无法插入
this.wait();
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail reached the end of the array,就需要从头开始
tail = 0;
}
size++;
//Even if no one is waiting 多调用几次 notify There are no side effects
this.notify(); //当队列不空的时候 就唤醒
}
}
//出队列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (this){
while (size == 0){
//队列为空 ,就等待
this.wait();
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
this.notify();//当队列不满的时候,就唤醒
}
return ret;
}
}
Then write a producer-consumer model based on this blocking queue
// Simulates implementing a blocking queue
// Array-based implementation
// Provides two core methods:
// 1.put入队列 2.take出队列
class MyBlockingQueue {
// Assumed max1000个元素
private int[] items = new int[1000];
// The position of the team leader
private int head = 0;
// 队尾的位置
private int tail = 0;
// 队列的元素个数
volatile private int size = 0;
//入队列
public void put(int value) throws InterruptedException {
synchronized (this){
while(size == items.length){
//队列已满 无法插入
this.wait();
}
items[tail] = value;
tail++;
if(tail == items.length){
// 如果 tail reached the end of the array,就需要从头开始
tail = 0;
}
size++;
//Even if no one is waiting 多调用几次 notify There are no side effects
this.notify(); //当队列不空的时候 就唤醒
}
}
//出队列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (this){
while (size == 0){
//队列为空 ,就等待
this.wait();
}
ret = items[head];
head++;
if(head == items.length){
head=0;
}
size--;
this.notify();//当队列不满的时候,就唤醒
}
return ret;
}
}
public class Demo16 {
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue queue = new MyBlockingQueue();
Thread customer = new Thread(()->{
while(true){
try {
int value = queue.take();
System.out.println("消费:"+value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) ;
customer.start();
Thread producer = new Thread(()->{
int value = 0;
while(true){
try {
queue.put(value);
System.out.println("生产:"+value);
value++;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
}
}
边栏推荐
- 数字孪生北京故宫,元宇宙推进旅游业进程
- Apex installation error
- 消息中间件解析 | 如何正确理解软件应用系统中关于系统通信的那些事?
- 华盛顿大学、Allen AI 等联合 | RealTime QA: What's the Answer Right Now?(实时 QA:现在的答案是什么?)
- Detailed explanation of table join
- Process sibling data into tree data
- bpmn-process-designer基础上进行自定义样式(工具、元素、菜单)
- 批量替换Word中的表格为图片并保存
- 态路小课堂丨浅谈优质光模块需要具备的条件!
- Simulation implementation of new of Js handwritten function
猜你喜欢
随机推荐
LeetCode_位运算_简单_405.数字转换为十六进制数
态路小课堂丨浅谈优质光模块需要具备的条件!
数据挖掘-03
Multithreading Case - Timer
50W+小程序开发者背后的数据库降本增效实践
MMF的初步介绍:一个规范化的视觉-语言多模态任务框架
MVVM响应式
【2022蓝帽杯】file_session && 浅入opcode
字体反爬之好租
Apex installation error
Six Stones Programming: Problems must be faced, methods must be skillful, and functions that cannot be done well must be solved
《MySQL核心知识》第6章:查询语句
SQL函数 SQRT
PanGu-Coder:函数级的代码生成模型
formatdatetime function mysql (date sub function)
四足机器人软件架构现状分析
快速幂---学习笔记
一文带你读懂云原生、微服务与高可用
Alibaba Cloud Official Redis Development Specification
ddl and dml in sql (the difference between database table and view)