当前位置:网站首页>LinkedBlockingQueue源码分析-新增和删除

LinkedBlockingQueue源码分析-新增和删除

2022-07-07 21:54:00 InfoQ


LinkedBlockingQueue源码分析-新增和删除

新增

put方法,把e新增到队列的尾部,如果有可以新增的空间的话,直接新增成功,否则当前线程陷入等待,源码:

public void put(E e) throws InterruptedException {
 if (e == null) throw new NullPointerException();
 int c = -1;
 Node<E> node = new Node<E>(e);
 final ReentrantLock putLock = this.putLock;
 final AtomicInteger count = this.count;
 putLock.lockInterruptibly();
 try {
 while (count.get() == capacity) {
 notFull.await();
 }

 enqueue(node);

 c = count.getAndIncrement();

 if (c + 1 < capacity)
 notFull.signal();

 } finally {
 putLock.unlock();
 }
 if (c == 0)
 signalNotEmpty();
}
private void enqueue(Node<E> node) {
 last = last.next = node;
}

 if (e == null) throw new NullPointerException();
 如果 e 为空,则抛出异常

 int c = -1;
 预先设置 c 为 -1,如果负数为则会新增失败

while (count.get() == capacity) { }
队列满了则线程阻塞,等待其他线程的唤醒(其他线程 take 成功后就会唤醒此处被阻塞的线程,await 无限等待

 enqueue(node);
 队列没有满,直接新增到队列的尾部

 c = count.getAndIncrement();
 新增计数赋值

if (c + 1 < capacity)
 如果链表现在的大小 小于链表的容量,说明队列未满,尝试唤醒一个 put 的等待线程

if (c == 0) signalNotEmpty();
c==0,代表队列里面有一个元素,尝试唤醒一个take的等待线程

删除

take方法(阻塞拿数据)源码:

public E take() throws InterruptedException {
 E x;
 int c = -1;
 final AtomicInteger count = this.count;
 final ReentrantLock takeLock = this.takeLock;
 takeLock.lockInterruptibly();
 try {
 while (count.get() == 0) {
 notEmpty.await();
 }
 x = dequeue();
 c = count.getAndDecrement();
 
 if (c > 1)
 notEmpty.signal();
 } finally {
 takeLock.unlock();
 }
 if (c == capacity)
 signalNotFull();
 return x;
}
private E dequeue() {
 Node<E> h = head;
 Node<E> first = h.next;
 h.next = h; // help GC
 head = first;
 E x = first.item;
 first.item = null;
 return x;
}

public E peek() {
 if (count.get() == 0)
 return null;
 final ReentrantLock takeLock = this.takeLock;
 takeLock.lock();
 try {
 Node<E> first = head.next;
 if (first == null)
 return null;
 else
 return first.item;
 } finally {
 takeLock.unlock();
 }
}

private E dequeue() {
 Node<E> h = head;
 Node<E> first = h.next;
 h.next = h; // help GC
 head = first;
 E x = first.item;
 first.item = null;
 return x;
}

int c = -1;
 默认负数,代表失败

final AtomicInteger count = this.count;
 count 代表当前链表数据的真实大小

while (count.get() == 0) { notEmpty.await(); }
 空队列时,阻塞,等待其他线程唤醒

 x = dequeue();
 非空队列,从队列的头部拿一个出来

if (c > 1) notEmpty.signal();
如果队列里面有值,从 take 的等待线程里面唤醒

 if (c == capacity) signalNotFull();
 如果队列空闲还剩下一个,尝试从 put 的等待线程中唤醒
原网站

版权声明
本文为[InfoQ]所创,转载请带上原文链接,感谢
https://xie.infoq.cn/article/7e48355889f20c31c40ab26e7