Preface
The previous section walked through the source code of ArrayBlockingQueue, a bounded blocking queue backed by an array. From that reading we know that both its enqueue and dequeue operations rely on a ReentrantLock to ensure thread safety. This section looks at another blocking queue, LinkedBlockingQueue, which is optionally bounded.
Official account: 『Liu Zhihang』, where I record skills picked up at work and in study, development notes, and source-code notes, and occasionally share some life experience. Feedback and corrections are welcome!
Introduction
An optionally-bounded BlockingQueue based on linked nodes.
Elements are ordered FIFO (first-in, first-out). The head of the queue is the element that has been in the queue the longest, and the tail is the element that has been in the queue the shortest time. New elements are inserted at the tail, and retrieval operations obtain elements from the head.
Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.
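To make the FIFO ordering concrete, here is a minimal sketch. The class name FifoOrderDemo and the method insertThenDrain are made up for this illustration; only offer and poll come from LinkedBlockingQueue itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class FifoOrderDemo {
    // Inserts three elements at the tail, then removes everything from the head.
    static List<String> insertThenDrain() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
        queue.offer("first");
        queue.offer("second");
        queue.offer("third");

        List<String> removed = new ArrayList<>();
        String item;
        // poll returns null once the queue is empty, which ends the loop
        while ((item = queue.poll()) != null) {
            removed.add(item);
        }
        return removed;
    }

    public static void main(String[] args) {
        // Elements come out in the order they went in
        System.out.println(insertThenDrain()); // prints [first, second, third]
    }
}
```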
Basic usage
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueTest {
    private static final LinkedBlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(10);

    public static void main(String[] args) throws InterruptedException {
        // Enqueue
        QUEUE.put("put: blocks and waits when the queue is full");
        QUEUE.add("add: throws an exception when the queue is full");
        QUEUE.offer("offer: returns false when the queue is full");
        // Dequeue
        // Returns null when the queue is empty
        String poll = QUEUE.poll();
        // Blocks and waits when the queue is empty
        String take = QUEUE.take();
        // Only looks at the first element in the queue, without removing it
        String peek = QUEUE.peek();
    }
}
Questions
- What is the implementation principle of LinkedBlockingQueue?
- What is the difference between LinkedBlockingQueue and ArrayBlockingQueue?
Source code analysis
Basic structure
Fields
static class Node<E> {
    E item;
    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;
    Node(E x) { item = x; }
}
First, LinkedBlockingQueue has a static nested class, Node, which holds one element and a reference to the next node.
/** The capacity bound, or Integer.MAX_VALUE if none was specified */
private final int capacity;
/** The current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
 * Head of the linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;
/**
 * Tail of the linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;
/** Lock held by take, poll, and similar operations */
private final ReentrantLock takeLock = new ReentrantLock();
/** Condition queue for threads waiting to take */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, and similar operations */
private final ReentrantLock putLock = new ReentrantLock();
/** Condition queue for threads waiting to put */
private final Condition notFull = putLock.newCondition();
Constructors
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
// Specify the capacity at creation time
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
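The constructors show that when no capacity is passed in, LinkedBlockingQueue defaults to Integer.MAX_VALUE, which makes the queue effectively unbounded. A capacity less than or equal to zero is rejected with IllegalArgumentException. Both head and last initially point to the same empty node.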
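The constructor behavior can be checked from the outside with remainingCapacity(), which reports the capacity minus the current element count. The following is a small sketch; the class CapacityDemo and its method names are invented for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class CapacityDemo {
    // An empty queue created without a capacity reports Integer.MAX_VALUE free slots.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // An empty queue created with an explicit capacity reports exactly that capacity.
    static int boundedCapacity(int capacity) {
        return new LinkedBlockingQueue<String>(capacity).remainingCapacity();
    }

    // The constructor rejects a capacity that is zero or negative.
    static boolean rejectsNonPositiveCapacity() {
        try {
            new LinkedBlockingQueue<String>(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // prints true
        System.out.println(boundedCapacity(10));                    // prints 10
        System.out.println(rejectsNonPositiveCapacity());           // prints true
    }
}
```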
Adding elements
The add method is inherited directly from the parent class AbstractQueue. Internally it calls the offer method that LinkedBlockingQueue implements itself.
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
The methods worth reading closely are LinkedBlockingQueue's own put and offer:
public void put(E e) throws InterruptedException {
    // null elements are not allowed
    if (e == null) throw new NullPointerException();
    // Note: the convention in all put/take/etc. is to preset the local variable
    // holding the count to a negative value to indicate failure unless it is set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // Wait while the queue is at full capacity
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        // Increment the count; getAndIncrement returns the previous value
        c = count.getAndIncrement();
        // If there is still room after this insert, wake another waiting producer
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        // The queue was empty before this insert, so wake a thread blocked in notEmpty.await()
        signalNotEmpty();
}
public boolean offer(E e) {
    // null elements are not allowed
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // Fast path: if the queue is already full, return false without locking
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // Re-check under the lock, since another producer may have filled the queue
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    // c is still -1 if nothing was inserted
    return c >= 0;
}
The two listings above show that the biggest difference between put and offer is whether they block. When the queue has reached its capacity, put blocks and waits for an element to leave the queue, while offer returns false immediately.
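The difference can be observed with a queue of capacity 1. This is a sketch only: the class FullQueueDemo, its method names, and the 100 ms delay are assumptions chosen for the illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class FullQueueDemo {
    // offer on a full queue gives up immediately and returns false.
    static boolean offerOnFullQueue() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("a");
        return queue.offer("b");
    }

    // add on a full queue turns the failed offer into an IllegalStateException.
    static boolean addThrowsOnFullQueue() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("a");
        try {
            queue.add("b");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // put on a full queue blocks until a consumer frees a slot.
    static String putWaitsForSpace() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("a");
        Thread consumer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // arbitrary delay for the demo
                queue.take();                     // removes "a" and frees the slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put("b");   // blocks until the consumer has taken "a"
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return queue.poll();  // the only element left is "b"
    }

    public static void main(String[] args) {
        System.out.println(offerOnFullQueue());     // prints false
        System.out.println(addThrowsOnFullQueue()); // prints true
        System.out.println(putWaitsForSpace());     // prints b
    }
}
```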
Both methods insert the element by calling enqueue(node), so let's look at the enqueue method.
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
The enqueue method sets the next pointer of the current tail node to the new node, then moves last to point at the new node.
Retrieving elements
public E poll() {
    final AtomicInteger count = this.count;
    // Fast path: an empty queue returns null without locking
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    // Acquire the lock
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            // Decrement the element count; getAndDecrement returns the old value
            c = count.getAndDecrement();
            if (c > 1)
                // The old value was greater than 1, so the queue is still non-empty:
                // wake another thread waiting in notEmpty.await()
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        // The old value equals capacity, so the queue was full and now has a free slot
        signalNotFull();
    return x;
}
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // Block and wait while the queue is empty
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
As the code above shows, poll and take follow roughly the same logic. They differ in how they handle an empty queue: poll returns null, while take blocks and waits until the queue contains an element.
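A short sketch of the two behaviors on an empty queue follows. The class EmptyQueueDemo, its method names, and the 100 ms producer delay are assumptions made for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class EmptyQueueDemo {
    // poll on an empty queue returns null immediately.
    static String pollOnEmptyQueue() {
        return new LinkedBlockingQueue<String>().poll();
    }

    // take on an empty queue blocks until a producer inserts an element.
    static String takeWaitsForProducer() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // arbitrary delay for the demo
                queue.put("late element");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String taken = queue.take(); // blocks until the producer has put its element
            producer.join();
            return taken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pollOnEmptyQueue());     // prints null
        System.out.println(takeWaitsForProducer()); // prints late element
    }
}
```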
Both poll and take call the dequeue() method to remove the element from the queue.
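private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
The dequeue() method removes the current head, which is an empty placeholder node. The first real node becomes the new head, its item is returned, and its item field is cleared so that the invariant head.item == null still holds. The old head is linked to itself to help GC.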
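The placeholder-head technique used by enqueue and dequeue can be sketched in isolation, without any locking. The class DummyHeadList below is a single-threaded illustration written for this article, not JDK code; unlike the real dequeue, it checks for an empty list itself, because there is no count to check beforehand.

```java
// Hypothetical single-threaded sketch of a linked list with a placeholder head.
class DummyHeadList<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    // head is always a placeholder with item == null; last.next is always null
    private Node<E> head = new Node<>(null);
    private Node<E> last = head;

    // Links the new node after the current tail, then moves the tail forward.
    void enqueue(E item) {
        Node<E> node = new Node<>(item);
        last.next = node;
        last = node;
    }

    // Drops the old placeholder and turns the first real node into the new one.
    E dequeue() {
        Node<E> first = head.next;
        if (first == null) {
            return null;       // empty list (the real queue checks count instead)
        }
        head.next = head;      // self-link the old placeholder to help GC
        head = first;
        E item = first.item;
        first.item = null;     // restore the invariant head.item == null
        return item;
    }

    boolean isEmpty() {
        return head.next == null;
    }

    public static void main(String[] args) {
        DummyHeadList<String> list = new DummyHeadList<>();
        list.enqueue("a");
        list.enqueue("b");
        System.out.println(list.dequeue()); // prints a
        System.out.println(list.dequeue()); // prints b
        System.out.println(list.isEmpty()); // prints true
    }
}
```

Because the placeholder keeps head and last from both pointing at a real element, enqueue only touches last and dequeue only touches head, so each side can be guarded by its own lock as long as the queue is non-empty.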
Peeking at elements
public E peek() {
    // Fast path: an empty queue returns null without locking
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // head is a placeholder, so the first real element is head.next
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}
The peek() method is simple: while holding takeLock, it returns the item of head.next (the first real node) without removing it, or null if the queue is empty.
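A quick sketch confirms that peek leaves the queue unchanged. The class PeekDemo and its method names are invented for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class PeekDemo {
    // Peeks twice at a two-element queue and returns the size afterwards.
    static int sizeAfterPeeking() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("head");
        queue.offer("tail");
        queue.peek();
        queue.peek();
        return queue.size();
    }

    // Returns what peek sees on a two-element queue.
    static String peekedElement() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("head");
        queue.offer("tail");
        return queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(peekedElement());    // prints head
        System.out.println(sizeAfterPeeking()); // prints 2
    }
}
```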
Summary
Q&A
Q: What is the implementation principle of LinkedBlockingQueue?
A: LinkedBlockingQueue is based on a linked list and uses ReentrantLock mutexes internally to prevent conflicts when elements are inserted or removed concurrently.
- take, poll, peek, and other operations that read from the queue acquire takeLock.
- add, put, offer, and other operations that add elements to the queue acquire putLock.
- notEmpty and notFull are Condition objects. In take and put, if the queue is empty or full, the corresponding await call places the thread in the condition queue.
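The points above can be pulled together in a stripped-down sketch of the two-lock design. TwoLockQueue is a teaching class written for this article that supports only put and take; it mirrors the structure of the JDK listings shown earlier but is not the JDK implementation, and the helper names signal and transferSum are made up.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical teaching sketch of a two-lock blocking queue.
class TwoLockQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null); // placeholder, head.item == null
    private Node<E> last = head;
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();

    TwoLockQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) notFull.await(); // wait for a free slot
            last = last.next = new Node<>(e);                 // append at the tail
            c = count.getAndIncrement();
            if (c + 1 < capacity) notFull.signal();           // still room: wake a producer
        } finally {
            putLock.unlock();
        }
        if (c == 0) signal(takeLock, notEmpty);               // was empty: wake a consumer
    }

    E take() throws InterruptedException {
        E item;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) notEmpty.await();        // wait for an element
            Node<E> first = head.next;
            head.next = head;                                 // help GC
            head = first;
            item = first.item;
            first.item = null;
            c = count.getAndDecrement();
            if (c > 1) notEmpty.signal();                     // still elements: wake a consumer
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signal(putLock, notFull);          // was full: wake a producer
        return item;
    }

    // A condition may only be signalled while holding the lock it belongs to.
    private static void signal(ReentrantLock lock, Condition condition) {
        lock.lock();
        try { condition.signal(); } finally { lock.unlock(); }
    }

    // Demo: one producer puts 1..n, the caller takes n values and sums them.
    static int transferSum(int n) {
        TwoLockQueue<Integer> queue = new TwoLockQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) queue.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += queue.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transferSum(100)); // prints 5050
    }
}
```

The shared AtomicInteger count is the only state both sides touch, which is why a producer holding putLock and a consumer holding takeLock can work on opposite ends of the list at the same time.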
Q: What are the differences among the enqueue and dequeue methods?
Method | Behavior |
---|---|
add | Adds an element; if the queue is full, the add fails and an exception is thrown |
offer | Adds an element; if the queue is full, the add fails and false is returned |
put | Adds an element; if the queue is full, blocks and waits |
poll | Removes and returns the head element; if the queue is empty, returns null |
take | Removes and returns the head element; if the queue is empty, waits until an element is available |
peek | Returns the element that has been in the queue the longest, without removing it |
Conclusion
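LinkedBlockingQueue is used in the same way as ArrayBlockingQueue, and both are implemented internally with ReentrantLock, so the two are worth reading side by side. The main difference is that ArrayBlockingQueue guards enqueue and dequeue with a single lock, while LinkedBlockingQueue uses separate putLock and takeLock, so a producer and a consumer can usually proceed at the same time. Condition also deserves close study.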