当前位置:网站首页>并发编程之阻塞队列
并发编程之阻塞队列
2022-07-25 13:14:00 【紫乾2014】
一、阻塞队列
队列是一种只允许在一端进行删除操作,在另一端进行插入操作的线性表,允许插入的一端称为队尾、允许删除的一端称为队头。那么阻塞队列,实际上是在队列的基础上增加了两个操作。
- 支持阻塞插入:队列满了的情况下,会阻塞继续往队列中添加数据的线程,直到队列元素被释放。
- 支持阻塞移除:队列为空的情况下,会阻塞从队列中获取元素的线程,直到队列添加了新的元素。
1.1 阻塞队列中的方法
- 添加元素
针对队列满了之后的不同的处理策略
add -> 如果队列满了,抛出异常
offer -> true/false , 添加成功返回true,否则返回false
put -> 如果队列满了,则一直阻塞
offer(timeout) , 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout时长,超过阻塞时长,返回false。 - 移除元素
element-> 队列为空,抛异常
peek -> true/false,移除成功返回true,否则返回false
take -> 一直阻塞
poll(timeout) -> 如果超时了,还没有元素,则返回null
dequeue -> LIFO , FIFO的队列.
1.2 J.U.C 中的阻塞队列
ArrayBlockingQueue
基于数组结构
LinkedTransferQueue
无界阻塞队列.
transfer 能力
LinkedBlockingQueue + TransferQueue
LinkedBlockingQueue
一个正常的基于链表结构的阻塞队列, 无界队列。
LinkedBlockingDeque
双向链表组成的队列.
支持双向插入和移除.
在一定程度上能够解决多线程的竞争问题。
Fork/Join -工作窃取
PriorityBlcokingQueue
基于优先级队列
DelayQueue
允许延时执行的队列
SynchronousQueue
没有任何存储结构的的队列
通过信息的传递来实现生产者和消费者的阻塞和唤醒。
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
存储处于阻塞状态下的生产者线程、消费者线程.
1000个请求丢入到线程池,必须要找到对应的消费者线程来处理. 1000个线程(生存周期是60s)。
/*可缓存的线程池。
可以处理非常大请求的任务。 1000个任务过来,那么线程池需要分配1000个线程来执行。*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
1.3 DelayQueue应用
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueExampleTask implements Delayed {
private String orderId;
private long start=System.currentTimeMillis();
private long time; //
public DelayQueueExampleTask(String orderId, long time){
this.orderId=orderId;
this.time=time;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((start+time)-System.currentTimeMillis(),TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return (int)(this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "DelayQueueExampleTask{" +
"orderId='" + orderId + '\'' +
", start=" + start +
", time=" + time +
'}';
}
}
import java.util.concurrent.DelayQueue;
public class DelayQueueMain {
private static DelayQueue<DelayQueueExampleTask> delayQueue=new DelayQueue();
public static void main(String[] args) {
delayQueue.offer(new DelayQueueExampleTask("1001",1000));
delayQueue.offer(new DelayQueueExampleTask("1002",5000));
delayQueue.offer(new DelayQueueExampleTask("1003",3000));
delayQueue.offer(new DelayQueueExampleTask("1004",6000));
delayQueue.offer(new DelayQueueExampleTask("1005",2000));
delayQueue.offer(new DelayQueueExampleTask("1006",8000));
delayQueue.offer(new DelayQueueExampleTask("1007",3000));
while(true){
try {
DelayQueueExampleTask task=delayQueue.take();
System.out.println(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
1.4 LinkedBlockingQueue应用(责任链模式)
业务场景:对请求参数进行校验
Request.java
public class Request {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Request{" +
"name='" + name + '\'' +
'}';
}
}
IRequestProcessor.java
public interface IRequestProcessor {
void processRequest(Request request);
}
ValidProcessor.java
public class ValidProcessor extends Thread implements IRequestProcessor {
protected IRequestProcessor nextProcessor;
protected BlockingQueue<Request> requests=new LinkedBlockingQueue<>();
public ValidProcessor(IRequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
@Override
public void processRequest(Request request) {
requests.add(request);
}
@Override
public void run() {
while(true){
try {
//异步进行请求处理
Request request=requests.take();
System.out.println("ValidProcessor:"+request);
if(null!=nextProcessor){
nextProcessor.processRequest(request);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
PrintProcessor.java
public class PrintProcessor extends Thread implements IRequestProcessor {
protected IRequestProcessor nextProcessor;
protected BlockingQueue<Request> requests=new LinkedBlockingQueue<>();
public PrintProcessor(IRequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
@Override
public void processRequest(Request request) {
//doSomething;
requests.add(request);
}
@Override
public void run() {
while(true){
try {
//异步进行请求处理
Request request=requests.take();
System.out.println("PrintProcessor:"+request);
if(null!=nextProcessor){
nextProcessor.processRequest(request);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
ChainExample.java
public class ChainExample {
public static void main(String[] args) {
PrintProcessor printProcessor=new PrintProcessor(null);
printProcessor.start();
ValidProcessor validProcessor=new ValidProcessor(printProcessor);
validProcessor.start();
Request request=new Request();
request.setName("心跳机制");
validProcessor.processRequest(request);
}
}
1.5 模拟阻塞队列使用condition
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionBlockedQueueExample {
//表示阻塞队列中的容器
private List<String> items;
//元素个数(表示已经添加的元素个数)
private volatile int size;
//数组的容量
private volatile int count;
private Lock lock=new ReentrantLock();
//让take方法阻塞 ->wait/notify
private final Condition notEmpty=lock.newCondition();
//放add方法阻塞
private final Condition notFull=lock.newCondition();
public ConditionBlockedQueueExample(int count){
this.count=count;
items=new ArrayList<>(count); //写死了
}
//添加一个元素,并且阻塞添加
public void put(String item) throws InterruptedException {
lock.lock();
try{
if(size>=count){
System.out.println("队列满了,需要先等一会");
notFull.await();
}
++size; //增加元素个数
items.add(item);
notEmpty.signal();
}finally {
lock.unlock();
}
}
public String take() throws InterruptedException {
lock.lock();
try{
if(size==0){
System.out.println("阻塞队列空了,先等一会");
notEmpty.await();
}
--size;
String item=items.remove(0);
notFull.signal();
return item;
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionBlockedQueueExample cbqe=new ConditionBlockedQueueExample(10);
//生产者线程
Thread t1=new Thread(()->{ Random random=new Random(); for (int i = 0; i < 1000; i++) { String item="item-"+i; try { cbqe.put(item); //如果队列满了,put会阻塞 System.out.println("生产一个元素:"+item); Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread.sleep(100);
Thread t2=new Thread(()->{ Random random=new Random(); for (;;) { try { String item=cbqe.take(); System.out.println("消费者线程消费一个元素:"+item); Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t2.start();
}
}
边栏推荐
- Arrays常用方法
- C#基础学习(二十三)_窗体与事件
- 备战2022 CSP-J1 2022 CSP-S1 初赛 视频集
- massCode 一款优秀的开源代码片段管理器
- Brpc source code analysis (III) -- the mechanism of requesting other servers and writing data to sockets
- Cyberspace Security penetration attack and defense 9 (PKI)
- 面试官问我:Mysql的存储引擎你了解多少?
- 并发编程之AQS
- R语言GLM广义线性模型:逻辑回归、泊松回归拟合小鼠临床试验数据(剂量和反应)示例和自测题
- Summary of Niuke forum project deployment
猜你喜欢

【GCN-RS】Towards Representation Alignment and Uniformity in Collaborative Filtering (KDD‘22)

Emqx cloud update: more parameters are added to log analysis, which makes monitoring, operation and maintenance easier

【服务器数据恢复】HP EVA服务器存储RAID信息断电丢失的数据恢复

Docekr学习 - MySQL8主从复制搭建部署

0716RHCSA

【GCN-RS】Region or Global? A Principle for Negative Sampling in Graph-based Recommendation (TKDE‘22)

Programmer growth chapter 27: how to evaluate requirements priorities?

R语言GLM广义线性模型:逻辑回归、泊松回归拟合小鼠临床试验数据(剂量和反应)示例和自测题

cv2.resize函数报错:error: (-215:Assertion failed) func != 0 in function ‘cv::hal::resize‘

【CSDN 年终总结】结束与开始,一直在路上—— “1+1=王”的2021总结
随机推荐
面试官问我:Mysql的存储引擎你了解多少?
Generate SQL script file by initializing the latest warehousing time of vehicle attributes
Date and time function of MySQL function summary
Masscode is an excellent open source code fragment manager
EMQX Cloud 更新:日志分析增加更多参数,监控运维更省心
Shell common script: check whether a domain name and IP address are connected
0713RHCSA
Convolutional neural network model -- googlenet network structure and code implementation
Jupyter Notebook介绍
[Video] Markov chain Monte Carlo method MCMC principle and R language implementation | data sharing
【GCN-RS】Towards Representation Alignment and Uniformity in Collaborative Filtering (KDD‘22)
Vim技巧:永远显示行号
0717RHCSA
【历史上的今天】7 月 25 日:IBM 获得了第一项专利;Verizon 收购雅虎;亚马逊发布 Fire Phone
The larger the convolution kernel, the stronger the performance? An interpretation of replknet model
[review SSM framework series] 15 - Summary of SSM series blog posts [SSM kill]
Mlx90640 infrared thermal imager temperature sensor module development notes (V)
Word style and multi-level list setting skills (II)
卷积神经网络模型之——VGG-16网络结构与代码实现
0710RHCSA