LinkedBlockingQueue source code analysis: add and delete

2022-07-07 23:58:00 InfoQ


Adding

The put method adds e to the tail of the queue. If the queue has room, the element is added immediately; otherwise the current thread waits until space becomes available. Source code:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }

        enqueue(node);

        c = count.getAndIncrement();

        if (c + 1 < capacity)
            notFull.signal();

    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

private void enqueue(Node<E> node) {
    last = last.next = node;
}

 if (e == null) throw new NullPointerException();
If e is null, a NullPointerException is thrown; the queue does not accept null elements.

 int c = -1;
c is preset to -1. By convention a negative value means the operation has not succeeded; c is only overwritten once the element has been enqueued.

while (count.get() == capacity) { }
If the queue is full, the current thread blocks until another thread wakes it (a take that frees a slot in a full queue signals the blocked put threads). await waits with no timeout, and the while loop re-checks the condition after every wakeup.

 enqueue(node);
The queue is not full, so the node is appended directly to the tail of the queue.

 c = count.getAndIncrement();
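count is incremented atomically. getAndIncrement returns the value before the increment, so c holds the queue size before this insertion.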
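Because the later checks (c + 1 < capacity, c == 0) depend on c being the size before the insertion, it may help to see the return value of getAndIncrement in isolation. The following is a minimal sketch; the class name CountSnapshotDemo and the helper incrementOnce are made up for illustration and are not part of LinkedBlockingQueue.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK queue implementation.
class CountSnapshotDemo {

    // Returns {value seen by the caller, value stored afterwards}.
    static int[] incrementOnce(int start) {
        AtomicInteger count = new AtomicInteger(start);
        int c = count.getAndIncrement(); // c is the value BEFORE the increment
        return new int[] { c, count.get() };
    }

    public static void main(String[] args) {
        int[] r = incrementOnce(0);
        // An empty queue (count 0) receives its first element:
        // c stays 0 while count becomes 1.
        System.out.println("c = " + r[0] + ", count = " + r[1]);
    }
}
```

Starting from an empty queue, c is 0 while count is already 1, which is why put tests c == 0 to detect "the queue was empty until now".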

if (c + 1 < capacity)
c + 1 is the queue size after this insertion. If it is still less than the capacity, the queue is not full, so try to wake one thread waiting in put.

if (c == 0) signalNotEmpty();
c == 0 means the queue was empty before this put and now holds one element, so try to wake one thread waiting in take.
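The blocking behaviour described above can be observed from the outside with the public API. The sketch below assumes a queue of capacity 1 and polls the producer's thread state to see whether it is parked; the class name BlockingPutDemo, the helper name and the 5 second polling limit are illustrative choices, not anything taken from the source.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class showing put blocking on a full queue.
class BlockingPutDemo {

    // Returns true if the second put was seen waiting and completed after a take.
    static boolean secondPutWaitsForTake() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        try {
            queue.put("a"); // capacity is 1, so the queue is now full

            Thread producer = new Thread(() -> {
                try {
                    queue.put("b"); // waits on notFull until a slot is freed
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            // Poll (for at most about 5 seconds) until the producer is parked.
            boolean parked = false;
            for (int i = 0; i < 500 && !parked; i++) {
                parked = producer.getState() == Thread.State.WAITING;
                if (!parked) Thread.sleep(10);
            }

            String first = queue.take(); // frees the slot and signals notFull
            producer.join(5000);

            return parked
                    && "a".equals(first)
                    && "b".equals(queue.peek())
                    && !producer.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(secondPutWaitsForTake());
    }
}
```

Polling the thread state is only a convenient way to observe the wait in a demo; production code would normally not inspect thread states like this.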

Removing

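The take method removes and returns the element at the head of the queue. Source code (peek, which reads the head element without removing it, is shown as well):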

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();

        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

int c = -1;
c is preset to a negative value, which by convention means the operation has not succeeded yet.

final AtomicInteger count = this.count;
count holds the actual number of elements currently in the linked list.

while (count.get() == 0) { notEmpty.await(); }
When the queue is empty, the current thread blocks and waits for another thread to wake it.

 x = dequeue();
The queue is not empty, so one element is removed from the head of the queue.
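The pointer handling in dequeue is easier to follow once it is clear that head is a dummy node whose item is null and that the real first element is head.next. The following single-threaded sketch mirrors enqueue and dequeue under that assumption; the class SentinelListSketch is hypothetical, it has no locking, and the null check in dequeue is added here only because the sketch has no count guard.

```java
// Hypothetical, single-threaded sketch of the dummy-head linked list.
class SentinelListSketch<E> {

    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private Node<E> head = new Node<>(null); // dummy node, item is always null
    private Node<E> last = head;             // empty list: head == last

    void enqueue(E e) {
        // Link the new node after the current tail, then advance the tail.
        last = last.next = new Node<>(e);
    }

    E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        if (first == null) return null; // guard added for this sketch only
        h.next = h;        // self-link the old dummy so it can be collected
        head = first;      // the removed node becomes the new dummy
        E x = first.item;
        first.item = null; // a dummy node must not keep a reference to the item
        return x;
    }

    public static void main(String[] args) {
        SentinelListSketch<String> list = new SentinelListSketch<>();
        list.enqueue("a");
        list.enqueue("b");
        System.out.println(list.dequeue()); // a
        System.out.println(list.dequeue()); // b
        System.out.println(list.dequeue()); // null
    }
}
```

Note that the node holding the returned element is not unlinked; it stays in the list as the new dummy head, which is why its item is cleared.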

if (c > 1) notEmpty.signal();
c is the size before this removal. c > 1 means at least one element remains in the queue, so wake another thread waiting in take.

 if (c == capacity) signalNotFull();
c == capacity means the queue was full before this take and now has one free slot, so try to wake one thread waiting in put.
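The helpers signalNotEmpty and signalNotFull are called above but not listed. Since a Condition may only be signalled by a thread holding the lock it belongs to, a producer has to take takeLock briefly to signal notEmpty, and a consumer has to take putLock to signal notFull. The following is a simplified sketch of that two-lock arrangement, written for illustration; TwoLockQueueSketch and transferSum are made-up names and this is not the JDK source.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical simplified two-lock queue; only put and take are sketched.
class TwoLockQueueSketch<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null); // dummy head
    private Node<E> last = head;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueueSketch(int capacity) { this.capacity = capacity; }

    // Called by producers: notEmpty belongs to takeLock, so lock it first.
    private void signalNotEmpty() {
        takeLock.lock();
        try { notEmpty.signal(); } finally { takeLock.unlock(); }
    }

    // Called by consumers: notFull belongs to putLock, so lock it first.
    private void signalNotFull() {
        putLock.lock();
        try { notFull.signal(); } finally { putLock.unlock(); }
    }

    void put(E e) throws InterruptedException {
        int c = -1;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) notFull.await();
            last = last.next = new Node<>(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity) notFull.signal(); // still room: wake another producer
        } finally {
            putLock.unlock();
        }
        if (c == 0) signalNotEmpty(); // queue was empty: wake a consumer
    }

    E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) notEmpty.await();
            Node<E> first = head.next;
            head.next = head; // help GC
            head = first;
            x = first.item;
            first.item = null;
            c = count.getAndDecrement();
            if (c > 1) notEmpty.signal(); // elements remain: wake another consumer
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signalNotFull(); // queue was full: wake a producer
        return x;
    }

    // Demo helper: one producer pushes 0..n-1, the caller takes and sums them.
    static long transferSum(int n) {
        TwoLockQueueSketch<Integer> q = new TwoLockQueueSketch<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) q.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += q.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transferSum(1000)); // sum of 0..999
    }
}
```

With separate locks for the head and the tail, a producer and a consumer can work at the same time; the atomic count is the only state they share directly.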

Copyright notice
This article was written by [InfoQ]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/188/202207072153473414.html