Blocking queue for concurrent programming
2022-07-25 13:20:00 【Ziqian 2014】
1. Blocking queues
A queue is a linear list that only allows insertion at one end and deletion at the other. The end where elements are inserted is called the tail of the queue, and the end where elements are removed is called the head. A blocking queue adds two operations to an ordinary queue:
- Blocking insertion: when the queue is full, a thread that tries to add an element is blocked until space is freed in the queue.
- Blocking removal: when the queue is empty, a thread that tries to take an element is blocked until a new element is added to the queue.
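The blocking insertion described above can be observed directly. The following is a minimal sketch, assuming an ArrayBlockingQueue with capacity 1; the class name BlockingDemo and the method putBlocksUntilTake are made up for this illustration. It checks that a producer stays parked inside put() until another thread takes an element.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingDemo {
    // Returns true if put() on a full queue stays blocked until take() frees a slot.
    static boolean putBlocksUntilTake() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // capacity 1
        queue.put("a"); // the queue is now full

        Thread producer = new Thread(() -> {
            try {
                queue.put("b"); // blocks: there is no free slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Wait until the producer is parked inside put().
        while (producer.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
        boolean blocked = producer.isAlive();

        String first = queue.take(); // frees the slot and wakes the producer
        producer.join();
        String second = queue.take();
        return blocked && first.equals("a") && second.equals("b");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(putBlocksUntilTake());
    }
}
```

Blocking removal is symmetric: a take() on an empty queue parks the consumer until a producer adds an element.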
1.1 Methods of a blocking queue
- Adding elements
The methods differ in how they react when the queue is full:
add -> throws an IllegalStateException if the queue is full
offer -> returns true if the element was added, false if the queue is full
put -> blocks until space becomes available
offer(timeout) -> blocks for at most the given timeout if the queue is full, then returns false if there is still no space
- Removing elements
The methods differ in how they react when the queue is empty:
remove -> throws a NoSuchElementException if the queue is empty (element behaves the same way, but only reads the head without removing it)
poll -> returns the head element, or null if the queue is empty (peek behaves the same way, but only reads the head without removing it)
take -> blocks until an element becomes available
poll(timeout) -> blocks for at most the given timeout, then returns null if there is still no element
- Deque
A double-ended queue (deque) allows insertion and removal at both ends, so it can be used as a LIFO stack or as a FIFO queue.
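To make the differences concrete, here is a minimal sketch that calls the non-blocking and timed methods on a full queue and on an empty queue. It assumes an ArrayBlockingQueue of capacity 1; the class QueueMethodsDemo, its method names, and the 50 ms timeout are chosen only for this illustration.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueMethodsDemo {
    // Reports how add, offer and offer(timeout) react to a full queue.
    static String insertIntoFullQueue() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.add(1); // succeeds: the queue still had a free slot

        String addResult;
        try {
            queue.add(2);
            addResult = "added";
        } catch (IllegalStateException e) {
            addResult = "exception"; // add throws when the queue is full
        }
        boolean offered = queue.offer(2); // returns immediately
        boolean offeredInTime = queue.offer(2, 50, TimeUnit.MILLISECONDS); // waits up to 50 ms
        return addResult + "," + offered + "," + offeredInTime;
    }

    // Reports how remove, poll, poll(timeout) and peek react to an empty queue.
    static String removeFromEmptyQueue() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);

        String removeResult;
        try {
            queue.remove();
            removeResult = "removed";
        } catch (NoSuchElementException e) {
            removeResult = "exception"; // remove throws when the queue is empty
        }
        Integer polled = queue.poll(); // returns immediately
        Integer polledInTime = queue.poll(50, TimeUnit.MILLISECONDS); // waits up to 50 ms
        Integer peeked = queue.peek(); // reads the head without removing it
        return removeResult + "," + polled + "," + polledInTime + "," + peeked;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(insertIntoFullQueue());
        System.out.println(removeFromEmptyQueue());
    }
}
```

put and take are left out here because they would block the calling thread indefinitely in this single-threaded setup.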
1.2 Blocking queues in J.U.C
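ArrayBlockingQueue
- A bounded blocking queue based on an array.
LinkedTransferQueue
- An unbounded blocking queue based on a linked list.
- Adds transfer capability: a producer can wait until a consumer has received the element.
- Roughly LinkedBlockingQueue + TransferQueue.
LinkedBlockingQueue
- An ordinary blocking queue based on a linked list. The capacity is optional; without one it defaults to Integer.MAX_VALUE, which makes the queue effectively unbounded.
LinkedBlockingDeque
- A blocking queue backed by a doubly linked list.
- Supports insertion and removal at both ends.
- Working from both ends can reduce contention between threads to some extent. Work stealing in Fork/Join relies on the same double-ended idea.
PriorityBlockingQueue
- An unbounded blocking queue that orders its elements by priority.
DelayQueue
- A queue whose elements can only be taken once their delay has expired, which allows delayed execution.
SynchronousQueue
- A queue with no internal storage for elements.
- Each element is handed directly from a producer to a consumer, which is how producers and consumers block and wake each other.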
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
The transferer stores the producer and consumer threads that are blocked while waiting for a partner.
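The hand-off behaviour can be sketched as follows. This is an illustration only: the class HandoffDemo, the method handOff, and the element values are invented for the example. It shows that a non-blocking offer fails when no consumer is waiting, that put completes once a consumer is there to receive the element, and that the queue never reports any stored elements.

```java
import java.util.concurrent.SynchronousQueue;

class HandoffDemo {
    static String handOff() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>(true); // fair mode

        // No consumer is waiting, so a non-blocking offer cannot hand the element over.
        boolean offeredWithoutConsumer = queue.offer("lost");

        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks until a producer arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put("order-1"); // returns only after a consumer has received the element
        consumer.join();      // join() also makes received[0] visible to this thread

        // size() is always 0: the queue has no capacity of its own.
        return offeredWithoutConsumer + "," + queue.size() + "," + received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOff());
    }
}
```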
If 1000 requests are submitted to the thread pool at once, each one must be handed directly to a consumer thread, so the pool creates up to 1000 threads (an idle thread is kept alive for 60s).
/* Cached thread pool.
   It can absorb a very large burst of tasks: if 1000 tasks arrive and no worker is idle,
   the pool has to allocate 1000 threads to execute them. */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
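The effect of pairing a SynchronousQueue with an unlimited maximum pool size can be checked with a small experiment. The sketch below builds a pool with the same constructor arguments as newCachedThreadPool and submits tasks that all stay busy; the class CachedPoolDemo, the method threadsCreatedFor, and the latch used to hold the workers are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolDemo {
    // Submits `tasks` tasks that all stay busy and reports how many threads the pool created.
    static int threadsCreatedFor(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep this worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // No worker was idle, so every hand-off failed and each task got its own thread.
        int poolSize = pool.getPoolSize();

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return poolSize;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(threadsCreatedFor(5));
    }
}
```

Because the queue cannot buffer tasks, the thread count grows with the number of concurrently running tasks, which is why this pool suits many short-lived tasks better than long-running ones.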
1.3 Using DelayQueue
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueExampleTask implements Delayed {
    private String orderId;
    // Creation time in milliseconds
    private long start=System.currentTimeMillis();
    // Delay in milliseconds, counted from the creation time
    private long time;
    public DelayQueueExampleTask(String orderId, long time){
        this.orderId=orderId;
        this.time=time;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the task has expired
        return unit.convert((start+time)-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        // Long.compare avoids the overflow that casting the difference to int can cause
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),o.getDelay(TimeUnit.MILLISECONDS));
    }
    @Override
    public String toString() {
        return "DelayQueueExampleTask{" +
                "orderId='" + orderId + '\'' +
                ", start=" + start +
                ", time=" + time +
                '}';
    }
}
import java.util.concurrent.DelayQueue;
public class DelayQueueMain {
    private static DelayQueue<DelayQueueExampleTask> delayQueue=new DelayQueue<>();
    public static void main(String[] args) {
        delayQueue.offer(new DelayQueueExampleTask("1001",1000));
        delayQueue.offer(new DelayQueueExampleTask("1002",5000));
        delayQueue.offer(new DelayQueueExampleTask("1003",3000));
        delayQueue.offer(new DelayQueueExampleTask("1004",6000));
        delayQueue.offer(new DelayQueueExampleTask("1005",2000));
        delayQueue.offer(new DelayQueueExampleTask("1006",8000));
        delayQueue.offer(new DelayQueueExampleTask("1007",3000));
        while(true){
            try {
                // take() blocks until the delay of the head task has expired
                DelayQueueExampleTask task=delayQueue.take();
                System.out.println(task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
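The main method above keeps waiting in take() after the last task has been printed. As a variation, the following sketch drains the queue and then stops, which also makes the expiry order easy to inspect. The ExpiringOrder class, the method expiryOrder, and the short delays are hypothetical and chosen only to keep the example quick; it uses System.nanoTime() instead of currentTimeMillis(), which is a design choice of this sketch rather than something the original code requires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class OrderTimeoutDemo {
    // Hypothetical task type for this sketch: an order that expires after delayMillis.
    static final class ExpiringOrder implements Delayed {
        final String orderId;
        final long expiresAtNanos;

        ExpiringOrder(String orderId, long delayMillis) {
            this.orderId = orderId;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until expiry, converted to the requested unit
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns the order ids in the order in which they expired.
    static List<String> expiryOrder() throws InterruptedException {
        DelayQueue<ExpiringOrder> queue = new DelayQueue<>();
        queue.offer(new ExpiringOrder("1001", 100));
        queue.offer(new ExpiringOrder("1002", 300));
        queue.offer(new ExpiringOrder("1003", 200));

        List<String> expired = new ArrayList<>();
        while (!queue.isEmpty()) {            // isEmpty() also counts unexpired elements
            expired.add(queue.take().orderId); // take() waits for the next expiry
        }
        return expired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(expiryOrder());
    }
}
```

The elements come out ordered by expiry time, not by insertion order.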
1.4 Using LinkedBlockingQueue (chain of responsibility pattern)
Business scenario: validating request parameters
Request.java
public class Request {
    private String name;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public String toString() {
        return "Request{" +
                "name='" + name + '\'' +
                '}';
    }
}
IRequestProcessor.java
public interface IRequestProcessor {
    void processRequest(Request request);
}
ValidProcessor.java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ValidProcessor extends Thread implements IRequestProcessor {
    protected IRequestProcessor nextProcessor;
    // Requests waiting to be handled by this processor
    protected BlockingQueue<Request> requests=new LinkedBlockingQueue<>();
    public ValidProcessor(IRequestProcessor nextProcessor) {
        this.nextProcessor = nextProcessor;
    }
    @Override
    public void processRequest(Request request) {
        // Only enqueue; the actual work happens on this processor's own thread
        requests.add(request);
    }
    @Override
    public void run() {
        while(true){
            try {
                // Process requests asynchronously; take() blocks while the queue is empty
                Request request=requests.take();
                System.out.println("ValidProcessor:"+request);
                if(null!=nextProcessor){
                    nextProcessor.processRequest(request);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
PrintProcessor.java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class PrintProcessor extends Thread implements IRequestProcessor {
    protected IRequestProcessor nextProcessor;
    // Requests waiting to be handled by this processor
    protected BlockingQueue<Request> requests=new LinkedBlockingQueue<>();
    public PrintProcessor(IRequestProcessor nextProcessor) {
        this.nextProcessor = nextProcessor;
    }
    @Override
    public void processRequest(Request request) {
        //doSomething;
        requests.add(request);
    }
    @Override
    public void run() {
        while(true){
            try {
                // Process requests asynchronously; take() blocks while the queue is empty
                Request request=requests.take();
                System.out.println("PrintProcessor:"+request);
                if(null!=nextProcessor){
                    nextProcessor.processRequest(request);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
ChainExample.java
public class ChainExample {
    public static void main(String[] args) {
        PrintProcessor printProcessor=new PrintProcessor(null);
        printProcessor.start();
        ValidProcessor validProcessor=new ValidProcessor(printProcessor);
        validProcessor.start();
        Request request=new Request();
        request.setName("heartbeat");
        validProcessor.processRequest(request);
    }
}
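The processors above catch InterruptedException inside the loop and keep running, so their threads never exit and ChainExample does not terminate. One possible way to make the chain stoppable is to treat an interrupt as the signal to leave the loop. The sketch below is a simplified illustration under that assumption: the Stage class is hypothetical, it passes plain strings instead of Request objects, and it records what each stage handled in a shared list so the flow can be checked.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class StoppableChainDemo {
    // Hypothetical stage for this sketch: one thread, one inbox, an optional successor.
    static final class Stage extends Thread {
        private final String label;
        private final Stage next;
        private final List<String> log;
        private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

        Stage(String label, Stage next, List<String> log) {
            this.label = label;
            this.next = next;
            this.log = log;
        }

        void process(String request) {
            inbox.add(request); // only enqueue; the stage's own thread does the work
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String request = inbox.take(); // blocks while the inbox is empty
                    log.add(label + ":" + request);
                    if (next != null) {
                        next.process(request);
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: fall out of the loop so the thread can end.
            }
        }
    }

    static List<String> runChain() throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        Stage print = new Stage("print", null, log);
        Stage valid = new Stage("valid", print, log);
        print.start();
        valid.start();
        valid.process("heartbeat");

        while (log.size() < 2) { // wait until both stages have handled the request
            Thread.sleep(1);
        }
        valid.interrupt();
        print.interrupt();
        valid.join();
        print.join();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runChain());
    }
}
```

Interrupting a stage discards whatever is still in its inbox, so a real system would need to decide whether pending requests should be drained first.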
1.5 Simulating a blocking queue with Condition
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionBlockedQueueExample {
    // The container that backs the blocking queue
    private List<String> items;
    // Number of elements currently in the queue
    private volatile int size;
    // Capacity of the queue
    private volatile int count;
    private Lock lock=new ReentrantLock();
    // Blocks take() while the queue is empty (plays the role of wait/notify)
    private final Condition notEmpty=lock.newCondition();
    // Blocks put() while the queue is full
    private final Condition notFull=lock.newCondition();
    public ConditionBlockedQueueExample(int count){
        this.count=count;
        items=new ArrayList<>(count); // Capacity is fixed at construction time
    }
    // Adds an element, blocking while the queue is full
    public void put(String item) throws InterruptedException {
        lock.lock();
        try{
            // Re-check in a loop: await() may return on a spurious wakeup
            while(size>=count){
                System.out.println("The queue is full, waiting");
                notFull.await();
            }
            ++size; // Increase the number of elements
            items.add(item);
            notEmpty.signal();
        }finally {
            lock.unlock();
        }
    }
    // Removes the head element, blocking while the queue is empty
    public String take() throws InterruptedException {
        lock.lock();
        try{
            // Re-check in a loop: await() may return on a spurious wakeup
            while(size==0){
                System.out.println("The blocking queue is empty, waiting");
                notEmpty.await();
            }
            --size;
            String item=items.remove(0);
            notFull.signal();
            return item;
        }finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ConditionBlockedQueueExample cbqe=new ConditionBlockedQueueExample(10);
        // Producer thread
        Thread t1=new Thread(()->{
            Random random=new Random();
            for (int i = 0; i < 1000; i++) {
                String item="item-"+i;
                try {
                    cbqe.put(item); // If the queue is full, put blocks
                    System.out.println("Produced an element: "+item);
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        Thread.sleep(100);
        // Consumer thread
        Thread t2=new Thread(()->{
            Random random=new Random();
            for (;;) {
                try {
                    String item=cbqe.take();
                    System.out.println("The consumer thread consumed an element: "+item);
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t2.start();
    }
}
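The code comments above relate Condition to wait/notify. For comparison, here is a minimal sketch of the same bounded queue built on the intrinsic monitor of the object. The class MonitorBlockingQueue and the helper method transfer are invented for this illustration and are not part of the example above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class MonitorBlockingQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    MonitorBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T item) throws InterruptedException {
        while (items.size() >= capacity) { // loop guards against spurious wakeups
            wait();
        }
        items.addLast(item);
        notifyAll(); // a single wait set: wake everyone and let them re-check
    }

    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.removeFirst();
        notifyAll();
        return item;
    }

    // Moves the numbers 0..n-1 through a queue of capacity 2 and returns their sum.
    static int transfer(int n) throws InterruptedException {
        MonitorBlockingQueue<Integer> queue = new MonitorBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    queue.put(i); // blocks whenever two elements are already queued
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < n; i++) {
            sum += queue.take(); // blocks whenever the queue is empty
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(100));
    }
}
```

With one monitor there is only one wait set, so producers and consumers wake each other indiscriminately and notifyAll is the safe choice. Two Condition objects on one lock, as in ConditionBlockedQueueExample, let put signal only waiting consumers and take signal only waiting producers.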