当前位置:网站首页>Linked blocking Queue Analysis of blocking queue

Linked blocking Queue Analysis of blocking queue

2020-11-06 01:18:00 Clamhub's blog

1、 Five kinds of blocking queues are introduced

  • ArrayBlockingQueue
    Bounded queues , The bottom layer is implemented by array , Concurrent control use ReentrantLock control , Whether it's an insert operation or a read operation , You need to get the lock before you can execute .
  • LinkedBlockingQueue
    The bottom layer is based on one-way linked list , It can be regarded as a bounded queue , It can also be used as an unbounded queue . Use two ReentrantLock Implement concurrency control :takelock and putlock.
  • SynchronousQueue
    The bottom layer uses one-way linked list , There's only one element , Synchronization means that a write operation must wait for a read operation to return , It refers to the synchronization of read-write threads .
  • PriorityBlockingQueue
    Implementation of blocking queue with sorting , Using arrays to implement . Concurrent control use ReentrantLock, The queue is unbounded .
    There are initialization parameters to specify the size of the queue , But it will automatically expand . Using the smallest heap for sorting .
  • DelayedQueue
    DelayedQueue It's using PriorityBlockingQueue and Delayed Realized , A priority queue is defined internally , When calling offer When , hold Delayed Objects are added to the queue , Use take The first first Take the object out (peek), If you don't reach the threshold , Conduct await Handle .

2、poll and peek The difference between

Are used to get the header of the queue ,poll The header node will be deleted ,peek The header node will not be deleted .

3、LinkedBlockingQueue

  • It's the first in, first out line FIFO.
  • use ReentrantLock Ensure thread safety

3.1、 function

3.1.1、 increase

There are three ways to increase , Premise : The queue is full

The way put add offer
characteristic Keep blocking Throw exceptions return false
3.1.2、 Delete

There are three ways to delete , Premise : The queue is empty

The way remove poll take
characteristic NoSuchElementException return false Blocking

3.2、 Simple analysis

  • LinkedBlockingQueue It's a blocking queue , The interior is made up of two ReentrantLock To achieve thread safety in and out of the queue , By their own Condition Object's await and signal To achieve the wait and wake function .
  • Based on one-way linked list 、 The range is arbitrary ( In fact, there is a boundary )、FIFO Blocking queues .
  • The head and tail nodes always point to a sentinel's node in the beginning , It doesn't hold the actual data , When there is data in the queue , The head node still points to the sentinel , The tail node points to the last node of valid data . The advantage of doing so is , And counter count After combining , The team leader 、 The visit at the end of the team can be done independently , It is not necessary to judge the relationship between the head node and the tail node .

LinkedBlockingQueue Inheritance diagram .png

3.2.1、 Nodes and properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Internal class of linked list node 
static class Node<E> {
// Node elements
E item;
Node<E> next;
Node(E x) {
item = x;
}
}
// Capacity limits , If not set , Then for Integer Maximum
private final int capacity;
// Number of current elements
private final AtomicInteger count = new AtomicInteger();
// The head of the chain :head.item == null
transient Node<E> head;
// The end of the list :last.next == null
private transient Node<E> last;
//take,poll Wait for the lock
private final ReentrantLock takeLock = new ReentrantLock();
// Waiting queue for waiting tasks
private final Condition notEmpty = takeLock.newCondition();
//put,offer Wait until the lock is inserted
private final ReentrantLock putLock = new ReentrantLock();
// Waiting queue waiting to be inserted
private final Condition notFull = putLock.newCondition();
3.2.2、 Insert thread and get mutual notification of thread

signalNotEmpty() Method , Called when the insertion thread finds that the queue is empty , Tell the fetch thread to wait .
signalNotFull() Method , Called when the fetch thread finds that the queue is full , Tell the insert thread to wait .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Said wait take.put/offer call , Otherwise, it's not usually locked takeLock.
private void signalNotEmpty() {
// obtain takeLock
final ReentrantLock takeLock = this.takeLock;
// lock takeLock
takeLock.lock();
try {
// Wake up the take Thread wait queue
notEmpty.signal();
} finally {
// Release the lock
takeLock.unlock();
}
}
// Said wait put,take/poll call
private void signalNotFull() {
// obtain putLock
final ReentrantLock putLock = this.putLock;
// lock putLock
putLock.lock();
try {
// Wake up insert thread wait queue
notFull.signal();
} finally {
// Release the lock
putLock.unlock();
}
}
3.2.3、 Entry and exit operations

enqueue() Methods can only be held in putLock Lock down execution ,dequeue() In the hold takeLock Lock down execution .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Insert... At the end of the queue 
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
//last.next Point to the present node
// Tail pointer back
last = last.next = node;
}
// Remove queue header
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// Save the head pointer
Node<E> h = head;
// Get the first element of the current linked list
Node<E> first = h.next;
// Head pointer next Point to your
h.next = h; // help GC
// The head pointer points to the first element
head = first;
// Get the value of the first element
E x = first.item;
// Leave the value of the first element empty
first.item = null;
// Returns the value of the first element
return x;
}
3.2.4、 Lock and release two locks

When two locks need to be locked at the same time , Encapsulate the sequence of lock and release into methods , Make sure that everything is consistent . Moreover, the lock is not responding to the interrupt , Until the lock is successful , This prevents the first lock from being locked successfully , And the second lock failed to lock, resulting in the risk that the lock will not be released .

1
2
3
4
5
6
7
8
9
10
// lock putLock and takeLock
void fullyLock() {
putLock.lock();
takeLock.lock();
}
// And fullyLock The order of locking is the opposite , Unlock first takeLock, Unlock again putLock
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

3.3、 Source code interpretation

A brief introduction LinkedBlockingQueue in API Source code , Such as the construction method , newly added , obtain , Delete ,drainTo.

3.3.1、 Constructors

LinkedBlockingQueue There are three construction methods , Among them, the nonparametric structure should be used as little as possible , Because the capacity is Integer The maximum of , Improper operation will cause memory overflow .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
// Parameter checking
if (capacity <= 0) throw new IllegalArgumentException();
// Set capacity
this.capacity = capacity;
// The first and last nodes point to an empty node
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
// obtain putLock
final ReentrantLock putLock = this.putLock;
// lock
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
3.3.2、offer(E e)

Set the given element to the queue , If the setting is successful, return to true, Otherwise return to false. e The value of cannot be empty , Otherwise, a null pointer exception is thrown .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// If you can insert the specified element to the end of the queue immediately without exceeding the queue capacity , Return to... After success true, If the queue is full , return false. When using a queue with limited capacity , This method is usually better than method BlockingQueue#add preferable , The latter can only insert elements by throwing exceptions .
public boolean offer(E e) {
// Judge not empty
if (e == null) throw new NullPointerException();
// Counter
final AtomicInteger count = this.count;
// If the queue is full , Direct return insert failure
if (count.get() == capacity)
return false;
int c = -1;
// The new node
Node<E> node = new Node<E>(e);
// Get the insert lock
final ReentrantLock putLock = this.putLock;
// lock
putLock.lock();
try {
// If the queue is not full
if (count.get() < capacity) {
// Insert queue
enqueue(node);
// Count
c = count.getAndIncrement();
// There's still room
if (c + 1 < capacity)
// Wake up insert thread
notFull.signal();
}
} finally {
// Unlock
putLock.unlock();
}
// If the queue is empty
if (c == 0)
// Notification get thread block
signalNotEmpty();
// Return success or insert failure
return c >= 0;
}
3.3.3、put(E e)

Set the element to the queue , If there is no extra space in the queue , This method will always block , Until there's extra space in the queue .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void put(E e) throws InterruptedException {
// You can't insert empty elements
if (e == null) throw new NullPointerException();

// all put/take/etc The conventions in are all preset local var
// Unless set , Otherwise, keeping the count negative means failure .
int c = -1;
// The new node
Node<E> node = new Node<E>(e);
// obtain putLock
final ReentrantLock putLock = this.putLock;
// Get counter
final AtomicInteger count = this.count;
// Interruptible locking , In other words, the interrupt state is not handled during lock acquisition , Instead, it throws an interrupt exception directly , Interrupts are handled by upper callers .
putLock.lockInterruptibly();
try {
/*
* Be careful count stay wait The guard thread uses , Even if it's not protected by a lock .
* This is because count It can only be reduced at this time ( All the others put All locked off ),
* If it changes from capacity , We ( Or some other waiting put) Will receive a signal .
* Similarly ,count The same is true for all other uses in other waiting guard threads .
*/
// As long as the current queue is full
while (count.get() == capacity) {
// Notify the insert thread to wait
notFull.await();
}
// Insert queue
enqueue(node);
// Quantity plus 1
c = count.getAndIncrement();
// If the queue increases 1 The elements are not yet full
if (c + 1 < capacity)
// Wake up the insertion process
notFull.signal();
} finally {
// Unlock
putLock.unlock();
}
// If there are no more elements in the queue
if (c == 0)
// Notify the fetch thread to wait
signalNotEmpty();
}
3.3.4、peek()

Non blocking gets the first element in the queue , Not out of line .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public E peek() {
// The queue is empty , Go straight back to
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// Get first element , Non sentinels
Node<E> first = head.next;
// Element is empty , return null
if (first == null)
return null;
else
// Returns the first element value
return first.item;
} finally {
takeLock.unlock();
}
}
3.3.5、poll()

Non blocking access to values in the queue , Return not obtained null.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E poll() {
final AtomicInteger count = this.count;
// The queue is empty , Go straight back to
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// The queue is not empty , Get the elements in the queue
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
3.3.6、remove(Object o)

Removes the specified value from the queue . Lock both locks .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public boolean remove(Object o) {
// I won't support it null
if (o == null) return false;
// Lock two locks
fullyLock();
try {
// Iteration queue
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
// adopt equals Method matches the element to be deleted
if (o.equals(p.item)) {
// remove p node
unlink(p, trail);
// success
return true;
}
}
// Failure
return false;
} finally {
// Unlock
fullyUnlock();
}
}
// Put the internal nodes p Disconnect from the previous trace
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
//p The node content is empty
p.item = null;
//trail Node next Point to p Of next
trail.next = p.next;
// If p It's the end of the team
if (last == p)
//trail To the end of the team
last = trail;
// If the queue is full
if (count.getAndDecrement() == capacity)
// Notification insert thread blocking
notFull.signal();
}
3.3.7、clear()

Clear queue .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Atomically removes all elements from the queue . When this call returns , The queue will be empty .
public void clear() {
// lock
fullyLock();
try {
// Empty data , Help with garbage collection
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
// If the capacity is 0
if (count.getAndSet(0) == capacity)
// Wake up insert thread
notFull.signal();
} finally {
// Unlock
fullyUnlock();
}
}
3.3.8、drainTo(Collection c)

Put the median in the queue , Remove all , Set concurrency to a given set .

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public int drainTo(Collection<? super E> c, int maxElements) {
// All kinds of judgments
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
// lock
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// Get the quantity to transfer
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
// Assemble the assembly
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}

tencent.jpg

版权声明
本文为[Clamhub's blog]所创,转载请带上原文链接,感谢