1、五种阻塞队列介绍
ArrayBlockingQueue 有界队列,底层使用数组实现,并发控制使用ReentrantLock控制,不管是插入操作还是读取操作,都需要获取锁之后才能执行。
LinkedBlockingQueue 底层基于单向链表实现,既可以当做有界队列,也可以当做无界队列使用。使用两个ReentrantLock实现并发控制:takelock和putlock。
SynchronousQueue 底层使用单向链表实现,只有一个元素,同步的意思是一个写操作必须等到一个读操作之后才返回,指的是读写线程的同步。
PriorityBlockingQueue 带排序的阻塞队列的实现,使用数组进行实现。并发控制使用ReentrantLock,队列为无界队列。 有初始化参数指定队列大小,但是会自动扩容。使用最小堆来实现排序。
DelayedQueue DelayedQueue是使用PriorityBlockingQueue和Delayed实现的,内部定义了一个优先级队列,当调用offer的时候,把Delayed对象加入队列中,使用take先把first对象拿出来(peek),如果没有到达阈值,进行await处理。
2、poll和peek的区别
都用于取队列的头结点,poll会删除头结点,peek不会删除头结点。
3、LinkedBlockingQueue
是先进先出队列FIFO。
采用ReentrantLock保证线程安全
3.1、功能
3.1.1、增加
增加有三种方式,前提:队列满
方式
put
add
offer
特点
一直阻塞
抛异常
返回false
3.1.2、删除
删除有三种方式,前提:队列为空
方式
remove
poll
take
特点
NoSuchElementException
返回false
阻塞
3.2、简单分析
LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。
基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。
头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据,当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。
3.2.1、节点与属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
static class Node <E > { E item; Node<E> next; Node(E x) { item = x; } } private final int capacity; private final AtomicInteger count = new AtomicInteger();transient Node<E> head;private transient Node<E> last;private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();
3.2.2、插入线程与获取线程的相互通知
signalNotEmpty()方法,在插入线程发现队列为空时调用,告知获取线程需要等待。 signalNotFull()方法,在获取线程发现队列已满时调用,告知插入线程需要等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
private void signalNotEmpty () { final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } private void signalNotFull () { final ReentrantLock putLock = this .putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
3.2.3、入队与出队操作
enqueue()方法只能在持有 putLock 锁下执行,dequeue()在持有 takeLock 锁下执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
private void enqueue (Node<E> node) { last = last.next = node; } private E dequeue () { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null ; return x; }
3.2.4、对两把锁的加锁与释放
在需要对两把锁同时加锁时,把加锁的顺序与释放的顺序封装成方法,确保所有地方都是一致的。而且获取锁时都是不响应中断的,一直获取直到加锁成功,这就避免了第一把锁加锁成功,而第二把锁加锁失败导致锁不释放的风险。
1 2 3 4 5 6 7 8 9 10
void fullyLock () { putLock.lock(); takeLock.lock(); } void fullyUnlock () { takeLock.unlock(); putLock.unlock(); }
3.3、源码解读
简单介绍一下LinkedBlockingQueue中API的源码,如构造方法,新增,获取,删除,drainTo。
3.3.1、构造函数
LinkedBlockingQueue有三个构造方法,其中无参构造尽量少用,因为容量为Integer的最大值,操作不当会出现内存溢出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
public LinkedBlockingQueue () { this (Integer.MAX_VALUE); } public LinkedBlockingQueue (int capacity) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .capacity = capacity; last = head = new Node<E>(null ); } public LinkedBlockingQueue (Collection<? extends E> c) { this (Integer.MAX_VALUE); final ReentrantLock putLock = this .putLock; putLock.lock(); try { int n = 0 ; for (E e : c) { if (e == null ) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full" ); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
3.3.2、offer(E e)
将给定的元素设置到队列中,如果设置成功返回true, 否则返回false。 e的值不能为空,否则抛出空指针异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
public boolean offer (E e) { if (e == null ) throw new NullPointerException(); final AtomicInteger count = this .count; if (count.get() == capacity) return false ; int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); return c >= 0 ; }
3.3.3、put(E e)
将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException(); int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); }
3.3.4、peek()
非阻塞的获取队列中的第一个元素,不出队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
public E peek () { if (count.get() == 0 ) return null ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null ) return null ; else return first.item; } finally { takeLock.unlock(); } }
3.3.5、poll()
非阻塞的获取队列中的值,未获取到返回null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
public E poll () { final AtomicInteger count = this .count; if (count.get() == 0 ) return null ; E x = null ; int c = -1 ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { if (count.get() > 0 ) { x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
3.3.6、remove(Object o)
从队列中移除指定的值。将两把锁都锁定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
public boolean remove (Object o) { if (o == null ) return false ; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null ; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true ; } } return false ; } finally { fullyUnlock(); } } void unlink (Node<E> p, Node<E> trail) { p.item = null ; trail.next = p.next; if (last == p) last = trail; if (count.getAndDecrement() == capacity) notFull.signal(); }
3.3.7、clear()
清空队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
public void clear () { fullyLock(); try { for (Node<E> p, h = head; (p = h.next) != null ; h = p) { h.next = h; p.item = null ; } head = last; if (count.getAndSet(0 ) == capacity) notFull.signal(); } finally { fullyUnlock(); } }
3.3.8、drainTo(Collection c)
将队列中值,全部移除,并发设置到给定的集合中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
public int drainTo (Collection<? super E> c, int maxElements) { if (c == null ) throw new NullPointerException(); if (c == this ) throw new IllegalArgumentException(); if (maxElements <= 0 ) return 0 ; boolean signalNotFull = false ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { int n = Math.min(maxElements, count.get()); Node<E> h = head; int i = 0 ; try { while (i < n) { Node<E> p = h.next; c.add(p.item); p.item = null ; h.next = h; h = p; ++i; } return n; } finally { if (i > 0 ) { head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); } }