当前位置:网站首页>AQS之BlockingQueue源码解析
AQS之BlockingQueue源码解析
2022-06-29 09:28:00 【玄郭郭】
目录
BlockingQueue介绍
BlockingQueue 是一个先进先出(FIFO)的阻塞队列。
BlockingQueue,java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的机制。
BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于数据的入队,出队的原理,却是大同小异。
队列类型
1. 无限队列 (unbounded queue ) - 几乎可以无限增长
2. 有限队列 ( bounded queue ) - 定义了最大容量
通常用链表或者数组实现
一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级
队列主要操作:入队(enqueue)与出队(dequeue)
常见的5种阻塞队列
ArrayBlockingQueue 由数组支持的有界队列
LinkedBlockingQueue 由链接节点支持的可选有界队列
PriorityBlockingQueue 由优先级堆支持的无界优先级队列
DelayQueue 由优先级堆支持的、基于时间的调度队列
SynchronousQueue 没有容量,不存储元素的阻塞队列,也即单个元素的队列
BlockingQueue中API
offer(E e): 将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
offer(E e, long timeout, TimeUnit unit): 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
add(E e): 将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
put(E e): 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
take(): 从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
poll(long timeout, TimeUnit unit): 在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
remainingCapacity():获取队列中剩余的空间。
remove(Object o): 从队列中移除指定的值。
contains(Object o): 判断队列中是否拥有该值。
drainTo(Collection c): 将队列中值,全部移除,并发设置到给定的集合中。
BlockingQueue源码解析
下面的源码以ArrayBlockingQueue为例。ArrayBlockingQueue是一个典型的生产者-消费者的模式。带着生产者-消费者的思想去阅读源码
参数解析:
//保存数据的数组,可以看出队列的底层一个数组的数据结构 final Object[] items; //下一次获取元素的数组下标 int takeIndex; //下一次添加元素的数组下标 int putIndex; //队列中元素的数量 int count; //可重入锁 final ReentrantLock lock; //队列不为空的信号量 private final Condition notEmpty; //队列不满的信号量 private final Condition notFull; //当前迭代器的共享状态,如果已知不存在任何迭代器,则为null。允许队列操作更新迭代器状态。 transient Itrs itrs = null;
//构造器
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化数组
this.items = new Object[capacity];
//创建非公平锁
lock = new ReentrantLock(fair);
//初始化不为空的条件锁
notEmpty = lock.newCondition();
//初始化不满的条件锁
notFull = lock.newCondition();
}//实例化一个ArrayBlockingQueue static BlockingQueue<String> queue = new ArrayBlockingQueue(16); //其他业务代码 //往队列中添加数据 queue.put(data);
将添加数据的操作看成一个生产者,往队列中添加数据的常用的2种方式,offer和put
//往队列插入一条数据,成功返回true,否则返回false,其实add()最终调的也是offer()
public boolean offer(E e) {
//校验参数是否为null
checkNotNull(e);
//开始加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列里的元素已经放满了,返回false
if (count == items.length)
return false;
else {
//队列还未满,元素入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}//往队列插入一条数据,如果队列满了,就进行阻塞,等到队列有空间的时候再入队
public void put(E e) throws InterruptedException {
//校验参数是否为null
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
//如果队列里的元素已经放满了,则进行阻塞,最终会调用LockSupport.park(this); 进行阻塞,然后等待元素出队时进行唤醒
notFull.await();
//队列还未满,元素入队
enqueue(e);
} finally {
lock.unlock();
}
}入队操作:
//元素入队
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//将参数赋值,这个不知道怎么解释,根据图来理解吧
items[putIndex] = x;
if (++putIndex == items.length)
//如果队列已经满了,那么下一个参数就是从头(数组下标为0的)开始入队
putIndex = 0;
//统计队列中元素的数量
count++;
//一旦有数据入队,说明队列不为空,即通知消费者去从队列中拿数据,标识队列不为空的信号量
notEmpty.signal();
}public final void signal() {
//判断当前线程是不是获取锁线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//从条件队列中获取第一个节点
Node first = firstWaiter;
if (first != null)
//第一个节点不为空时,去唤醒第二个节点中的线程(此时这个线程应该是消费者),最终会调用LockSupport.unpark(node.thread);
//为什么是取去唤醒第二个节点中的线程?因为真实存放线程的是从第二个节点开始的,详情可见ReentrantLock的源码解析
doSignal(first);
}此时数据已经入队成功,将获取数据看成一个消费者从队列中获取数据,获取数据常用的2种方式,poll和take
//直接从队列头中去取,有就取出来,并且删除队列中的元素,队列中没有元素就返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//有元素的话,直接从队列头中去取
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}//从队列头中去取元素,有就取出来,并且删除队列中的元素,队列中没有元素则阻塞等待,一直等到有元素就取出来
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//队列中没有元素则阻塞等待,最终会调用LockSupport.park(this); 进行阻塞,然后等待元素入队时进行唤醒
notEmpty.await();
//有元素的话,直接从队列头中去取
return dequeue();
} finally {
lock.unlock();
}
}元素出队操作:
//取队列元素
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取到获取元素的指针的那个数据
E x = (E) items[takeIndex];
//并且将该数据从队列中删除
items[takeIndex] = null;
if (++takeIndex == items.length)
//如果该元素已经是队列的最后一个元素了,那么下一个参数就是从头(数组下标为0的)开始获取
takeIndex = 0;
//队列元素减一
count--;
if (itrs != null)
itrs.elementDequeued();
//一旦有数据出队,说明队列不满,即通知生产者去往队列中补充数据,(标识队列不满的信号量)
notFull.signal();
return x;
}public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//从条件队列中获取第一个节点
Node first = firstWaiter;
if (first != null)
//第一个节点不为空时,去唤醒第二个节点中的线程(此时这个线程应该是生产者),最终会调用LockSupport.unpark(node.thread);
//为什么是取去唤醒第二个节点中的线程?因为真实存放线程的是从第二个节点开始的,详情可见ReentrantLock的源码解析
doSignal(first);
}至此,一个元素从生产者入队列,再到消费者出队列的一个完整的流程就走完了。源码解析中的 doSignal(first); 这块代码的解析我没有给贴出来,因为这里面的逻辑比较绕,是将一个Node节点在CLH队列和条件队列相互转移,比较复杂,有些地方我一时还表达不好,所以就不详细说明了,这里面的逻辑大家可以自己跟进去看一下。
ArrayBlockingQueue简单图解




边栏推荐
- 2019.11.17 training summary
- Related problems of pointer array, array pointer and parameter passing
- C language library function --strstr()
- Power strings [KMP cycle section]
- 1146 topological order (25 points)
- September 17, 2020 gateway business process has two tasks: referer certification and non commodity Templating
- C语言库函数--strstr()
- 函数指针、函数指针数组、计算器+转移表等归纳总结
- BUUCTF--reverse1
- MySQL innodb每行数据长度的限制
猜你喜欢
随机推荐
Rikka with cake (segment tree + segment tree)
CLR via C reading notes - single instance application
IIS服务器相关错误
2019.10.23 training summary
Reading notes of CLR via C -clr boarding and AppDomain
Nacos environmental isolation
查看CSDN的博客排名
拼图小游戏中学到的Graphics.h
I would like to know how to open an account for free online stock registration? In addition, is it safe to open a mobile account?
std::unique_ptr<T>与boost::scoped_ptr<T>的特殊性
To 3 --- 最后的编程挑战
C#中Attribute(特性)
To 3 -- the last programming challenge
C语言库函数--strstr()
&和&&的区别
如何快速完成磁盤分區
MySQL innodb每行数据长度的限制
Virtual machine port scanning
Arc view and arc viewpager
Application of Pgp in encryption technology









