LinkedBlockingQueue: a blocking queue based on a linked list

2020-11-08 23:46:00 Liu Zhihang

Preface

The previous section looked at the source code of ArrayBlockingQueue, the array-based bounded blocking queue. From reading it we know that in ArrayBlockingQueue both enqueue and dequeue operations use ReentrantLock to ensure thread safety. Now let's look at another blocking queue, which is optionally bounded: LinkedBlockingQueue.

Official account: 『Liu Zhihang』. It records skills from work and study, development and source code notes, and occasionally shares some life experience. Your guidance is welcome!

Introduction

LinkedBlockingQueue is an optionally bounded blocking queue (BlockingQueue) based on linked nodes.

It orders elements FIFO (first in, first out). The head of the queue is the element that has been in the queue the longest. The tail of the queue is the element that has been in the queue the shortest time. New elements are inserted at the tail of the queue, and retrieval operations obtain elements at the head of the queue.

Linked queues usually have higher throughput than array-based queues, but less predictable performance in most concurrent applications.

Basic usage

import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueTest {

    private static final LinkedBlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(10);

    public static void main(String[] args) throws InterruptedException {

        // Enqueue
        QUEUE.put("put: blocks and waits when the queue is full");

        QUEUE.add("add: throws an exception when the queue is full");

        QUEUE.offer("offer: returns false when the queue is full");

        // Dequeue
        // Returns null when the queue is empty
        String poll = QUEUE.poll();

        // Blocks and waits when the queue is empty
        String take = QUEUE.take();

        // Only looks at the head element, without removing it
        String peek = QUEUE.peek();

    }

}
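To make the behaviour on a full queue concrete, here is a small sketch that uses a queue with capacity 2. The class name FullQueueDemo and the method describeFullQueue are made up for this illustration; only the LinkedBlockingQueue calls come from the JDK.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class FullQueueDemo {

    // Fills a capacity-2 queue, then reports how offer, timed offer and add react.
    static String describeFullQueue() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("a");
        queue.offer("b"); // the queue is now full

        // Plain offer gives up immediately and returns false.
        boolean offered = queue.offer("c");

        boolean timedOffer;
        try {
            // Timed offer waits up to 50 ms for free space, then returns false.
            timedOffer = queue.offer("c", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timedOffer = false;
        }

        String addResult;
        try {
            queue.add("c");
            addResult = "added";
        } catch (IllegalStateException e) {
            // add turns the failed offer into an exception.
            addResult = "IllegalStateException";
        }
        return offered + "," + timedOffer + "," + addResult + "," + queue.size();
    }

    public static void main(String[] args) {
        System.out.println(describeFullQueue()); // false,false,IllegalStateException,2
    }
}
```

put is left out here on purpose: on a full queue it would block the calling thread until another thread removes an element.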

Questions

  1. What is the implementation principle of LinkedBlockingQueue?
  2. What is the difference between LinkedBlockingQueue and ArrayBlockingQueue?

Source code analysis

Basic structure

[Figure: LinkedBlockingQueue UML class diagram]

Fields

static class Node<E> {
    
    E item;
    /**
    * One of:
    * - the real successor node
    * - this node itself, meaning the successor is head.next
    * - null, meaning there is no successor (this is the last node)
    */
    Node<E> next;

    Node(E x) { item = x; }
}

First, LinkedBlockingQueue has a static inner class Node that supports generics. Let's look at the other fields:

/** The capacity bound, or Integer.MAX_VALUE if none was given */
private final int capacity;

/** The current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
* Head of the linked list
* Invariant: head.item == null
*/
transient Node<E> head;

/**
* Tail of the linked list
* Invariant: last.next == null
*/
private transient Node<E> last;

/** Lock that must be held by take, poll and similar operations */
private final ReentrantLock takeLock = new ReentrantLock();

/** Threads waiting to perform a take are placed in this condition queue */
private final Condition notEmpty = takeLock.newCondition();

/** Lock that must be held by put, offer and similar operations */
private final ReentrantLock putLock = new ReentrantLock();

/** Threads waiting to perform a put are placed in this condition queue */
private final Condition notFull = putLock.newCondition();

Constructors

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

// Specify the capacity at construction time
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

The constructors show that if no capacity is passed in when a LinkedBlockingQueue is created, the capacity defaults to Integer.MAX_VALUE. A capacity of zero or less is rejected with an IllegalArgumentException.
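The effect of the two constructors can be observed from outside through remainingCapacity(). The sketch below is only an illustration; CapacityDemo and rejects are names invented here.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original article.
class CapacityDemo {

    // Returns true if the constructor rejects the given capacity.
    static boolean rejects(int capacity) {
        try {
            new LinkedBlockingQueue<String>(capacity);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(10);

        // With no argument the capacity is Integer.MAX_VALUE.
        System.out.println(unbounded.remainingCapacity() == Integer.MAX_VALUE); // true
        System.out.println(bounded.remainingCapacity());                        // 10
        System.out.println(rejects(0));                                         // true
    }
}
```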

Adding elements

The add method is inherited directly from the parent class AbstractQueue. Internally it calls the offer method that LinkedBlockingQueue implements itself.

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

The methods worth reading closely are LinkedBlockingQueue's own put and offer:

public void put(E e) throws InterruptedException {
    // null elements are not allowed
    if (e == null) throw new NullPointerException();
    // Note: the convention in all put/take/etc. is to preset the local
    // variable holding the count to a negative value, which indicates failure unless it is set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // Wait while the queue is at full capacity
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        // Increment the count; the previous value is returned
        c = count.getAndIncrement();
        // If there is still room after this insert, wake another blocked producer
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // The queue was empty before this insert, so wake a thread
    // that is blocked in notEmpty.await()
    if (c == 0)
        signalNotEmpty();
}

public boolean offer(E e) {
    // null elements are not allowed
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // If the queue is at full capacity, return false without locking
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // Check again under the lock, since the count may have changed
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    // c is still -1 if nothing was inserted
    return c >= 0;
}

The two methods above show that the biggest difference between put and offer is whether they block. When the queue has reached its capacity, put blocks and waits for an element to leave the queue, while offer returns false immediately.
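The blocking behaviour of put can be seen with two threads. In this sketch (BlockingPutDemo and run are illustrative names, not from the source), a producer calls put on a full queue of capacity 1 and can only finish once the main thread takes an element.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original article.
class BlockingPutDemo {

    // Returns the element taken by the consumer and the element left in the queue.
    static String run() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("first"); // the queue is now full

        Thread producer = new Thread(() -> {
            try {
                queue.put("second"); // cannot complete until a slot is free
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            String head = queue.take(); // frees the only slot
            producer.join();            // put("second") has completed after this
            return head + "," + queue.peek();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // first,second
    }
}
```

Had the producer used offer instead of put, it could have returned false and dropped "second" if it ran before the take.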

Both methods insert the element by calling enqueue(node). Let's take a look at the enqueue method.

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

In the enqueue method, the incoming node is linked as the next of the current tail node, and last is then moved to point at it.

Getting elements

public E poll() {
    final AtomicInteger count = this.count;
    // An empty queue returns null without locking
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    // Acquire the lock
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            // Decrement the element count; the old value is returned
            c = count.getAndDecrement();
            // The old value was greater than 1, so the queue is still not empty:
            // wake another thread waiting in notEmpty.await()
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // If the old value equals capacity, the queue was full before this removal
    // and now has a free slot, so wake a blocked producer
    if (c == capacity)
        signalNotFull();
    return x;
}

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // Block and wait while the queue is empty
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

As the code above shows, poll and take follow roughly the same logic. The difference is how they handle an empty queue: poll returns null, while take blocks and waits until the queue contains an element.
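A short sketch of that difference, with invented names (PollVersusTakeDemo, run): poll and timed poll give up on an empty queue, while take waits for a producer that arrives about 100 ms later.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class PollVersusTakeDemo {

    static String run() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            // Empty queue: poll returns null at once.
            String immediate = queue.poll();
            // Timed poll waits up to 50 ms, then also returns null.
            String timed = queue.poll(50, TimeUnit.MILLISECONDS);

            // A producer that adds an element a little later.
            Thread producer = new Thread(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                queue.offer("late");
            });
            producer.start();

            // take blocks until the producer has inserted "late".
            String taken = queue.take();
            producer.join();
            return immediate + "," + timed + "," + taken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // null,null,late
    }
}
```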

Both poll and take call the dequeue() method to remove the element from the queue.

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

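The logic of dequeue() is to take the first real node after the dummy head node, return its item, and make that node the new head. Clearing first.item keeps the invariant head.item == null.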
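The dummy-head technique used by enqueue and dequeue can be tried in isolation. The following is a single-threaded sketch without any locking; SentinelList is a name chosen for this example, and unlike the JDK method its dequeue checks for an empty list itself instead of relying on the caller to check count first.

```java
// Hypothetical single-threaded sketch, not the JDK implementation.
class SentinelList<E> {

    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    // head is always a dummy node with item == null; last is the final node.
    private Node<E> head = new Node<>(null);
    private Node<E> last = head;

    // Links the new node behind the current tail and moves last to it.
    void enqueue(E e) {
        Node<E> node = new Node<>(e);
        last.next = node;
        last = node;
    }

    // Removes and returns the first element, or null if the list is empty.
    E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        if (first == null)
            return null;
        h.next = h;        // self-link the old dummy node, as the JDK code does to help GC
        head = first;      // the first real node becomes the new dummy node
        E x = first.item;
        first.item = null; // restore the invariant head.item == null
        return x;
    }

    boolean isEmpty() {
        return head.next == null;
    }

    public static void main(String[] args) {
        SentinelList<String> list = new SentinelList<>();
        list.enqueue("a");
        list.enqueue("b");
        System.out.println(list.dequeue()); // a
        System.out.println(list.dequeue()); // b
        System.out.println(list.dequeue()); // null
    }
}
```

Because of the dummy node, enqueue only ever touches last and dequeue only ever touches head, which is what lets the real class guard the two ends with different locks.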

Peeking at elements

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

The peek() method is simple: it returns the item of head.next, the first real element, without removing it, or null if the queue is empty.
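A minimal sketch of that behaviour, with PeekDemo and run as illustrative names: repeated calls to peek return the same element and leave the size unchanged.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original article.
class PeekDemo {

    static String run() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String onEmpty = queue.peek(); // null on an empty queue
        queue.offer("a");
        queue.offer("b");
        String first = queue.peek();   // head element, not removed
        String again = queue.peek();   // same element again
        return onEmpty + "," + first + "," + again + "," + queue.size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // null,a,a,2
    }
}
```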

Summary

Q&A

Q: What is the implementation principle of LinkedBlockingQueue?

A: LinkedBlockingQueue is based on a linked list and uses ReentrantLock mutexes internally to prevent conflicts when elements are inserted or removed concurrently.

  1. take, poll, peek and other operations that read from the queue acquire takeLock.
  2. add, put and offer, which add elements to the queue, acquire putLock.
  3. notEmpty and notFull are Condition objects. In take and put operations, if the queue is empty or full, the thread calls the corresponding await method and is placed in that condition's wait queue.
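The three points above can be put together in a compact sketch. This is a simplified two-lock queue written for illustration, assuming only put and take are needed; the names TwoLockQueue, signal and demo are invented here, and the real LinkedBlockingQueue has many more methods and details.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical simplified sketch, not the JDK implementation.
class TwoLockQueue<E> {

    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null); // dummy node, touched only under takeLock
    private Node<E> last = head;             // touched only under putLock

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) notFull.await(); // full: wait for a consumer
            last = last.next = new Node<>(e);
            c = count.getAndIncrement();                     // c is the count before the insert
            if (c + 1 < capacity) notFull.signal();          // still room: wake another producer
        } finally {
            putLock.unlock();
        }
        if (c == 0) signal(takeLock, notEmpty);              // was empty: wake a consumer
    }

    E take() throws InterruptedException {
        E x;
        int c;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) notEmpty.await();       // empty: wait for a producer
            Node<E> first = head.next;
            head = first;                                    // first becomes the new dummy node
            x = first.item;
            first.item = null;
            c = count.getAndDecrement();                     // c is the count before the removal
            if (c > 1) notEmpty.signal();                    // still elements: wake another consumer
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signal(putLock, notFull);         // was full: wake a producer
        return x;
    }

    // A condition may only be signalled while holding its own lock.
    private static void signal(ReentrantLock lock, Condition condition) {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    // One producer puts 1..5 into a capacity-2 queue while this thread takes them.
    static String demo() {
        TwoLockQueue<Integer> queue = new TwoLockQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) queue.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        StringBuilder seen = new StringBuilder();
        try {
            for (int i = 0; i < 5; i++) seen.append(queue.take());
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 12345
    }
}
```

The count field is an AtomicInteger because it is the one piece of state that both sides read and write without sharing a lock.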

Q: What is the difference between the enqueue and dequeue methods?

| Method | Effect |
| --- | --- |
| add | Adds an element; if the queue is full, the add fails and an exception is thrown |
| offer | Adds an element; if the queue is full, the add fails and false is returned |
| put | Adds an element; if the queue is full, blocks and waits |
| poll | Removes and returns the head element; returns null if the queue is empty |
| take | Removes and returns the head element; if the queue is empty, waits until an element is available |
| peek | Returns the element that was placed in the queue earliest, without removing it |

Conclusion

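LinkedBlockingQueue is used in the same way as ArrayBlockingQueue, and both are implemented internally with ReentrantLock, so the two are worth reading side by side. The main internal difference is that ArrayBlockingQueue guards both ends with a single lock, while LinkedBlockingQueue uses a separate putLock and takeLock, so producers and consumers do not block each other. Condition also deserves careful study.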

Copyright notice
This article was written by [Liu Zhihang]. Please include a link to the original when reposting. Thank you.