PrefaceJUC The following related source code continue to read , This shows a non blocking unbounded thread safe queue —— ConcurrentLinkedQueue, Let's have a look .
official account :『 Liu Zhihang 』, Record the skills in work study 、 Development and source notes ; From time to time to share some of the life experience . You are welcome to guide !
Introduce
Unbounded thread safe queue based on linked nodes , The element FIFO( fifo ) Sort . The head of the queue is the longest element in the queue , At the end of the queue is the element with the shortest time in the queue . Insert a new element at the end of the queue , The queue retrieval operation gets the elements of the queue header .
When many threads share access to a common collection ConcurrentLinkedQueue It's a good choice . Like most other concurrent collection implementations , This class is not allowed to use null Elements .
Basic use
public class ConcurrentLinkedQueueTest {
public static void main(String[] args) {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
// Inserts the specified element at the end of this queue .
queue.add("liuzhihang");
// Inserts the specified element at the end of this queue .
queue.offer("liuzhihang");
// Get but do not remove the header of this queue , Empty queue returns null.
queue.peek();
// Get and remove the header of this queue , This queue is empty and returns null.
queue.poll();
}
}
Source code analysis
The basic structure
Parameter Introduction
private static class Node<E> {
// The elements in the node
volatile E item;
// Next node
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
// CAS Set the node element in the way of
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// Set the next node
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// CAS Set the next node in the way of
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Omit ……
}
stay ConcurrentLinkedQueue Inner contains an inner class Node, As shown above , This inner class is used to identify a node in a linked list , You can see from the code that , stay ConcurrentLinkedQueue The linked list in is One way linked list
.
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
// Other omitted
// Head node
private transient volatile Node<E> head;
// Tail node
private transient volatile Node<E> tail;
}
Head and tail nodes use volatile
modification , Ensure memory visibility .
Constructors
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
When you create an object , The head and tail nodes point to an empty node .
Additive elements
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
// Verify that it is empty
checkNotNull(e);
// Create nodes
final Node<E> newNode = new Node<E>(e);
// Loop into the queue
// t Is the current tail node ,p For the initial t
for (Node<E> t = tail, p = t;;) {
// q Is the next node of the tail node
Node<E> q = p.next;
if (q == null) {
// It's empty , There is no node after it , be CAS Set tail node
if (p.casNext(null, newNode)) {
// here p.next yes newNode
// If p != t There is concurrency
if (p != t)
// Other threads have been updated tail
// q = p.next therefore q == null It's not true
// q Here we are t.next
// At this time will be tail Updated to New node
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// multithreading , poll , Operation to remove elements , May lead to p == q
// At this point, search again
else if (p == q)
//
p = (t != (t = tail)) ? t : head;
else
// Check tail And update the
p = (p != t && t != (t = tail)) ? t : q;
}
}
Drawing description :
- In the case of single thread :
- When executed
Node<E> q = p.next;
when , The current situation is shown in the figure :
- Judge
q == null
, Meet the conditions , At this point, it will executep.casNext(null, newNode)
Use CAS Set up p.next. - After setting successfully ,
p == t
There is no change , So the program exits .
- multithreading :
- When executed
Node<E> q = p.next;
when , The current situation is shown in the figure :
- Multiple threads execute
p.casNext(null, newNode)
Use CAS Set up p.next. - A Threads CAS Set up the success :
- B Threads CAS Execution failure , Recycle , Will perform to the
p = (p != t && t != (t = tail)) ? t : q
.
- Loop again and you can set it up successfully .
Get elements
public E poll() {
restartFromHead:
// Infinite loop
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// The first node iterm
E item = p.item;
// If the current node is not null CAS Set to null
if (item != null && p.casItem(item, null)) {
// CAS success Then the mark is removed
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// The current queue is not empty return null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// Self quoted , Repeat the cycle
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
The drawing process is as follows :
- When executing the inner loop , If the queue is empty :
E item = p.item;
here ,iterm by null, MeetingupdateHead(h, p)
And back to null. - Suppose there are concurrent inserts at the same time , Added an element , Now, as shown in the figure :
At this point, the final else take p = q
- Continue the loop to get item, And implement
p.casItem(item, null)
, And then determinep != h
, to update head And back to item.
The situation here is more complicated , Here is just a list of , If you need to, you can list a few more .
The code of viewing element is similar to that of getting element code .
size operation
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
CAS No locks , therefore size It's not accurate . also size I'll go through the list , It costs a lot of performance .
summary
ConcurrentLinkedQueue It's relatively less used at work , So when reading the relevant source code, I just looked at it , Learn about common API, And the underlying principles .
A simple summary is to use One way linked list To save the queue elements , Internal use of non blocking CAS Algorithm , No locks . So the calculation size It may not be accurate , Again size Will traverse the list , So it is not recommended to use .