当前位置:网站首页>并发编程之阻塞队列
并发编程之阻塞队列
2022-07-25 13:14:00 【紫乾2014】
一、阻塞队列
队列是一种只允许在一端进行删除操作,在另一端进行插入操作的线性表,允许插入的一端称为队尾、允许删除的一端称为队头。那么阻塞队列,实际上是在队列的基础上增加了两个操作。
- 支持阻塞插入:队列满了的情况下,会阻塞继续往队列中添加数据的线程,直到队列元素被释放。
- 支持阻塞移除:队列为空的情况下,会阻塞从队列中获取元素的线程,直到队列添加了新的元素。
1.1 阻塞队列中的方法
- 添加元素
针对队列满了之后的不同的处理策略
add -> 如果队列满了,抛出异常
offer -> true/false , 添加成功返回true,否则返回false
put -> 如果队列满了,则一直阻塞
offer(timeout) , 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout时长,超过阻塞时长,返回false。 - 移除元素
element-> 队列为空,抛异常
peek -> true/false,移除成功返回true,否则返回false
take -> 一直阻塞
poll(timeout) -> 如果超时了,还没有元素,则返回null
dequeue -> LIFO , FIFO的队列.
1.2 J.U.C 中的阻塞队列
ArrayBlockingQueue
基于数组结构
LinkedTransferQueue
无界阻塞队列.
transfer 能力
LinkedBlockingQueue + TransferQueue
LinkedBlockingQueue
一个正常的基于链表结构的阻塞队列, 无界队列。
LinkedBlockingDeque
双向链表组成的队列.
支持双向插入和移除.
在一定程度上能够解决多线程的竞争问题。
Fork/Join -工作窃取
PriorityBlcokingQueue
基于优先级队列
DelayQueue
允许延时执行的队列
SynchronousQueue
没有任何存储结构的的队列
通过信息的传递来实现生产者和消费者的阻塞和唤醒。
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
存储处于阻塞状态下的生产者线程、消费者线程.
1000个请求丢入到线程池,必须要找到对应的消费者线程来处理. 1000个线程(生存周期是60s)。
/*可缓存的线程池。
可以处理非常大请求的任务。 1000个任务过来,那么线程池需要分配1000个线程来执行。*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
1.3 DelayQueue应用
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueExampleTask implements Delayed {
private String orderId;
private long start=System.currentTimeMillis();
private long time; //
public DelayQueueExampleTask(String orderId, long time){
this.orderId=orderId;
this.time=time;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((start+time)-System.currentTimeMillis(),TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return (int)(this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "DelayQueueExampleTask{" +
"orderId='" + orderId + '\'' +
", start=" + start +
", time=" + time +
'}';
}
}
import java.util.concurrent.DelayQueue;
public class DelayQueueMain {
private static DelayQueue<DelayQueueExampleTask> delayQueue=new DelayQueue();
public static void main(String[] args) {
delayQueue.offer(new DelayQueueExampleTask("1001",1000));
delayQueue.offer(new DelayQueueExampleTask("1002",5000));
delayQueue.offer(new DelayQueueExampleTask("1003",3000));
delayQueue.offer(new DelayQueueExampleTask("1004",6000));
delayQueue.offer(new DelayQueueExampleTask("1005",2000));
delayQueue.offer(new DelayQueueExampleTask("1006",8000));
delayQueue.offer(new DelayQueueExampleTask("1007",3000));
while(true){
try {
DelayQueueExampleTask task=delayQueue.take();
System.out.println(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
1.4 LinkedBlockingQueue应用(责任链模式)
业务场景:对请求参数进行校验
Request.java
public class Request {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Request{" +
"name='" + name + '\'' +
'}';
}
}
IRequestProcessor.java
public interface IRequestProcessor {
void processRequest(Request request);
}
ValidProcessor.java
public class ValidProcessor extends Thread implements IRequestProcessor {
protected IRequestProcessor nextProcessor;
protected BlockingQueue<Request> requests=new LinkedBlockingQueue<>();
public ValidProcessor(IRequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
@Override
public void processRequest(Request request) {
requests.add(request);
}
@Override
public void run() {
while(true){
try {
//异步进行请求处理
Request request=requests.take();
System.out.println("ValidProcessor:"+request);
if(null!=nextProcessor){
nextProcessor.processRequest(request);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
PrintProcessor.java
public class PrintProcessor extends Thread implements IRequestProcessor {
protected IRequestProcessor nextProcessor;
protected BlockingQueue<Request> requests=new LinkedBlockingQueue<>();
public PrintProcessor(IRequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
@Override
public void processRequest(Request request) {
//doSomething;
requests.add(request);
}
@Override
public void run() {
while(true){
try {
//异步进行请求处理
Request request=requests.take();
System.out.println("PrintProcessor:"+request);
if(null!=nextProcessor){
nextProcessor.processRequest(request);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
ChainExample.java
public class ChainExample {
public static void main(String[] args) {
PrintProcessor printProcessor=new PrintProcessor(null);
printProcessor.start();
ValidProcessor validProcessor=new ValidProcessor(printProcessor);
validProcessor.start();
Request request=new Request();
request.setName("心跳机制");
validProcessor.processRequest(request);
}
}
1.5 模拟阻塞队列使用condition
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionBlockedQueueExample {
//表示阻塞队列中的容器
private List<String> items;
//元素个数(表示已经添加的元素个数)
private volatile int size;
//数组的容量
private volatile int count;
private Lock lock=new ReentrantLock();
//让take方法阻塞 ->wait/notify
private final Condition notEmpty=lock.newCondition();
//放add方法阻塞
private final Condition notFull=lock.newCondition();
public ConditionBlockedQueueExample(int count){
this.count=count;
items=new ArrayList<>(count); //写死了
}
//添加一个元素,并且阻塞添加
public void put(String item) throws InterruptedException {
lock.lock();
try{
if(size>=count){
System.out.println("队列满了,需要先等一会");
notFull.await();
}
++size; //增加元素个数
items.add(item);
notEmpty.signal();
}finally {
lock.unlock();
}
}
public String take() throws InterruptedException {
lock.lock();
try{
if(size==0){
System.out.println("阻塞队列空了,先等一会");
notEmpty.await();
}
--size;
String item=items.remove(0);
notFull.signal();
return item;
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionBlockedQueueExample cbqe=new ConditionBlockedQueueExample(10);
//生产者线程
Thread t1=new Thread(()->{ Random random=new Random(); for (int i = 0; i < 1000; i++) { String item="item-"+i; try { cbqe.put(item); //如果队列满了,put会阻塞 System.out.println("生产一个元素:"+item); Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread.sleep(100);
Thread t2=new Thread(()->{ Random random=new Random(); for (;;) { try { String item=cbqe.take(); System.out.println("消费者线程消费一个元素:"+item); Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t2.start();
}
}
边栏推荐
- Convolutional neural network model -- googlenet network structure and code implementation
- 录制和剪辑视频,如何解决占用空间过大的问题?
- Simple understanding of flow
- B tree and b+ tree
- How to realize the configuration method of user password free login?
- web安全入门-UDP测试与防御
- Mid 2022 review | latest progress of large model technology Lanzhou Technology
- 基于百问网IMX6ULL_PRO开发板移植LCD多点触摸驱动(GT911)
- Redis visualizer RDM installation package sharing
- 从输入网址到网页显示
猜你喜欢

【重温SSM框架系列】15 - SSM系列博文总结【SSM杀青篇】

Common operations for Yum and VIM

0713RHCSA
![[six articles talk about scalablegnn] around www 2022 best paper PASCA](/img/ac/150f6397724593a30aab2805ba5084.png)
[six articles talk about scalablegnn] around www 2022 best paper PASCA
![[Video] Markov chain Monte Carlo method MCMC principle and R language implementation | data sharing](/img/20/bb43ab1bc447b519c3b1de0f809b31.png)
[Video] Markov chain Monte Carlo method MCMC principle and R language implementation | data sharing

【CSDN 年终总结】结束与开始,一直在路上—— “1+1=王”的2021总结

web安全入门-UDP测试与防御

How to understand metrics in keras

Generate SQL script file by initializing the latest warehousing time of vehicle attributes

Atcoder beginer contest 261e / / bitwise thinking + DP
随机推荐
【GCN-RS】Region or Global? A Principle for Negative Sampling in Graph-based Recommendation (TKDE‘22)
Migrate PaloAlto ha high availability firewall to panorama
AtCoder Beginner Contest 261E // 按位思考 + dp
卷积神经网络模型之——AlexNet网络结构与代码实现
[机器学习] 实验笔记 – 表情识别(emotion recognition)
手写一个博客平台~第一天
并发编程之并发工具集
【GCN-RS】Towards Representation Alignment and Uniformity in Collaborative Filtering (KDD‘22)
The programmer's father made his own AI breast feeding detector to predict that the baby is hungry and not let the crying affect his wife's sleep
【AI4Code】《Unified Pre-training for Program Understanding and Generation》 NAACL 2021
Online Learning and Pricing with Reusable Resources: Linear Bandits with Sub-Exponential Rewards: Li
Excel import and export source code analysis
Common operations for Yum and VIM
Substance Designer 2021软件安装包下载及安装教程
vim基础操作汇总
Simple understanding of flow
【GCN-CTR】DC-GNN: Decoupled GNN for Improving and Accelerating Large-Scale E-commerce Retrieval WWW22
R language GLM generalized linear model: logistic regression, Poisson regression fitting mouse clinical trial data (dose and response) examples and self-test questions
全球都热炸了,谷歌服务器已经崩掉了
【历史上的今天】7 月 25 日:IBM 获得了第一项专利;Verizon 收购雅虎;亚马逊发布 Fire Phone