ArrayBlockingQueue
文章目录
1. 成员
ArrayBlockingQueue
是基于数组这种数据结构实现的阻塞队列
final
Object[]
items;
// 存放队列元素的数组
int
takeIndex;
// 指向出队元素的指针
int
putIndex;
// 入队指针
int
count;
// 队列元素数量
阻塞队列实现的关键:一把锁 + 两个等待队列
final
ReentrantLock
lock;
// 入队出队要加锁
private
final
Condition
notEmpty;
//
private
final
Condition
notFull;
//
2. 基本操作
2.1 构造方法
指定容量构造,默认是非公平锁
public
ArrayBlockingQueue(
int
capacity) {
this(
capacity,
false);
}
实际构造方法:
public
ArrayBlockingQueue(
int
capacity,
boolean
fair) {
if (
capacity
<=
0)
throw
new
IllegalArgumentException();
this.
items
=
new
Object[
capacity];
lock
=
new
ReentrantLock(
fair);
notEmpty
=
lock.
newCondition();
notFull
=
lock.
newCondition();
}
2.2 入队
元素入队需要加锁,当队列满了则阻塞当前线程,offer()
入队成功返回true
,失败直接返回false
public
boolean
offer(
E
e) {
checkNotNull(
e);
final
ReentrantLock
lock
=
this.
lock;
lock.
lock();
// 每次入队都需要加锁
try {
if (
count
==
items.
length)
//如果队列已满,直接返回false
return
false;
else {
enqueue(
e);
return
true;
}
}
finally {
lock.
unlock();
//入队成功则解锁
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
在看一个add()
方法,实际调的还是offer,但是如果入队失败了,是直接抛出一个【队列已满】的异常
public
boolean
add(
E
e) {
return
super.
add(
e);
}
public
boolean
add(
E
e) {
if (
offer(
e))
//实际调的还是offer,但是如果入队失败了,是直接抛出一个【队列已满】的异常
return
true;
else
throw
new
IllegalStateException(
"Queue full");
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
再看一个put()
方法
public
void
put(
E
e)
throws
InterruptedException {
checkNotNull(
e);
final
ReentrantLock
lock
=
this.
lock;
lock.
lockInterruptibly();
try {
while (
count
==
items.
length)
// 跟offer方法不同的是,如果队列已满,则会一直阻塞
notFull.
await();
enqueue(
e);
}
finally {
lock.
unlock();
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
实际入队方法:
private
void
enqueue(
E
x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final
Object[]
items
=
this.
items;
items[
putIndex]
=
x;
// 入队就是在putindex指针对应位置添加元素后移动putIndex指针
if (
++
putIndex
==
items.
length)
// 如果putIndex到了数组末端,则重新从数组头部开始
putIndex
=
0;
count
++;
notEmpty.
signal();
// 入队成功后,唤醒在等待的队列消费者
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
综上所述:
-
offer()
:入队成功失败都有一定的返回值 -
add()
:实际调的就是offer()
,但是入队成功返回的是true
,失败的时候是直接抛出异常 -
put()
:put()
如果队列已满,则会一直阻塞
2.3 出队
出队和入队一样,都需要操作前先加锁
poll()
:如果队列是空的,直接返回null,否则执行出队
public
E
poll() {
final
ReentrantLock
lock
=
this.
lock;
lock.
lock();
try {
return (
count
==
0)
?
null :
dequeue();
//如果队列是空的,直接返回null,否则执行出队
}
finally {
lock.
unlock();
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
在看一个出队方法:take()
public
E
take()
throws
InterruptedException {
final
ReentrantLock
lock
=
this.
lock;
lock.
lockInterruptibly();
try {
while (
count
==
0)
notEmpty.
await();
// 和put一样,队列为空时,一直阻塞
return
dequeue();
}
finally {
lock.
unlock();
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
实际出队方法:
private
E
dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final
Object[]
items
=
this.
items;
@SuppressWarnings(
"unchecked")
E
x
= (
E)
items[
takeIndex];
// 跟入队一样,出队操作的是takeIndex指向的元素
items[
takeIndex]
=
null;
// null to help GC
if (
++
takeIndex
==
items.
length)
takeIndex
=
0;
count
--;
if (
itrs
!=
null)
itrs.
elementDequeued();
notFull.
signal();
// 出队完成,唤醒在等待的生产者
return
x;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
3. 注意
enqueue()
,dequeue()
操作的时候,调用signal
而不是调用signalAll
是因为什么?
-
signalAll()
唤醒全部等待的线程,signal()
随机唤醒一个等待的线程,因为每次只能一个线程获取到锁执行操作,如果唤醒全部线程则会增加竞争锁的概率从而降低了效率
原网站版权声明
本文为[51CTO]所创,转载请带上原文链接,感谢
https://blog.51cto.com/u_13716384/5525945