当前位置:网站首页>AQS 之 ReentrantReadWriteLock 源码分析
AQS 之 ReentrantReadWriteLock 源码分析
2022-06-09 05:11:00 【smartjiang-java】
文章目录
1:ReentrantReadWriteLock 基础知识
1:读锁不支持条件变量,写锁支持
2:重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
3:重入时降级支持:即持有写锁的情况下允许去获取读锁(获取写锁前必须释放读锁)
4:拥有两把锁,共用 AQS 的 state ,不同的是写锁状态占了 state 的低16位,而读锁使用的是 state 的高16位
2:读写锁原理
2.1:thread-0 加写锁:成功
thread-0 调用 WriteLock 的 lock() 方法
public void lock() {
// 进入 acquire(1) 方法
sync.acquire(1);
}
进入 acquire(1) 方法
public final void acquire(int arg) {
// 进入 tryAcquire(1) 方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
进入 tryAcquire(1) 方法
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
//从 state 取到表示写锁的部分:低16 位
int w = exclusiveCount(c);
// 这里是已经加锁的:但是不能判断是加了读锁还是写锁。显然我们不进入这个 if 语句
if (c != 0) {
// w 等于0 表示加了读锁,现在还想加写锁,返回 false
// w 不等于 0 表示加了写锁,在判断加锁线程是否是当前线程,不是,返回 false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果写锁 +1 超过了 65546,抛出异常,说明重入锁的可重入次数有限制
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// state 加 1 ,并更新
setState(c + acquires);
return true;
}
// writerShouldBlock():如果是非公平锁,返回 false;公平锁,且等待队列中存在元素,返回 true
// writerShouldBlock()返回了 false,,尝试用 cas 去获取写锁,低16 位+1 ,和整体加 1 没区别
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
2.2:thread-0 加读锁:成功
我们假设 thread-0 获取写锁成功,再次获取读锁,调用 ReadLock.lock ()
public void lock() {
// 进入到 acquireShared(1) 方法
sync.acquireShared(1);
}
进入到 acquireShared(1) 方法
public final void acquireShared(int arg) {
// 进入 tryAcquireShared(1) 方法
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
进入 tryAcquireShared(1) 方法
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 判断写锁部分是不是 0,而且获得锁的线程是不是当前线程,thread-0 不走这里
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 表示读锁的高 16 位
int r = sharedCount(c);
// 判断读锁是否应该阻塞,且 读锁没有超过数值范围 ,尝试给读锁 +1 ,其实是加了 65536,返回1,加锁成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果读锁标志是 0,将当前线程设置成 第一个获得读锁的线程,计数设置成 1
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 表示读锁的可重入,计数加1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 最后一个线程获取读锁的保持次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
上面返回 1,表示加读锁成功。
2.3:thread-1 加读锁:失败
假设此时 来了 thread-1,尝试加读锁。
调用 ReadLock.lock ()
public void lock() {
// 进入到 acquireShared(1) 方法
sync.acquireShared(1);
}
进入到 acquireShared(1) 方法
public final void acquireShared(int arg) {
// 进入 tryAcquireShared(1) 方法
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
进入 tryAcquireShared(1) 方法
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 判断写锁部分是不是 0,而且获得锁的线程是不是当前线程,返回 -1 表示获取失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
thread-1 加读锁失败,回到 acquireShared(1) 方法
public final void acquireShared(int arg) {
// tryAcquireShared(1) 返回 -1,进入 doAcquireShared(1) 方法
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
进入 doAcquireShared(1) 方法
private void doAcquireShared(int arg) {
// 添加节点作为尾节点,和 ReentrantLock 类似,只不过节点类型是 Node.SHARED 而非 Node.EXCLUSIVE 模式
// 这里如果是首个入队元素,会添加一个亚元节点作为头节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 首次进入循环:判断该当前节点是不是除头节点之外的第二个节点
final Node p = node.predecessor();
if (p == head) {
// 是的话再次尝试获取一下读锁,这里肯定是不成功的,返回 -1
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire() :cas 将亚元节点的 waitStatus 变成 -1,返回 false
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
二次循环,进for 循环
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 二次进入循环:判断该当前节点是不是除头节点之外的第二个节点
final Node p = node.predecessor();
if (p == head) {
// 是的话再次尝试获取一下读锁,这里肯定是不成功的,返回 -1
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire() :cas 将亚元节点的 waitStatus已经变成-1了,这次 返回 true
// 执行 parkAndCheckInterrupt() 方法,将当前线程阻塞住。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
thread-1(share) 在等待队列阻塞住了
2.4:thread-2 加写锁:失败
假设此时 来了 thread-2,尝试加写锁,WriteLock.lock()
public void lock() {
// 进入 acquire(1) 方法
sync.acquire(1);
}
进入 acquire(1) 方法
public final void acquire(int arg) {
// 进入 tryAcquire(1) 方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
进入 tryAcquire(1) 方法
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
//从 state 取到表示写锁的部分:低16 位
int w = exclusiveCount(c);
// 这里是已经加锁的:但是不能判断是加了读锁还是写锁。进入这个语句
if (c != 0) {
// w 等于0 表示加了读锁,现在还想加写锁,返回 false
// w 不等于 0 表示加了写锁,在判断加锁线程是否是当前线程,不是,返回 false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
thread -2 返回 false,回到 acquire(1) 方法
public final void acquire(int arg) {
// tryAcquire(1)返回 false,进入 addWaiter(Node.EXCLUSIVE)方法,将当前线程节点加到等待队列尾端,并返回
// 进入 acquireQueued(thread-2,1) 方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
进入 acquireQueued(thread-2,1) 方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 首次循环:获取当前节点的上一个节点
final Node p = node.predecessor();
// 上一个节点是 thread-1,不是头节点,不尝试获取
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 进入 shouldParkAfterFailedAcquire()方法,将 thread-1 的 waitStatus 从 0 变成 -1,返回 false
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
二次循环
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 二次循环:获取当前节点的上一个节点
final Node p = node.predecessor();
// 上一个节点是 thread-1,不是头节点,不尝试获取
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 进入 shouldParkAfterFailedAcquire()方法,将 thread-1 的 waitStatus 是 -1,返回 true
// 进入 parkAndCheckInterrupt() 方法,将当前线程 thread-2 阻塞住。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
thread-2(write) 在等待队列阻塞住了
2.5:解锁:thread-0先解写锁
thread-0 调用 WriteLock.unlock() 方法
public void unlock() {
// 进入 release(1) 方法
sync.release(1);
}
进入 release(1) 方法
public final boolean release(int arg) {
// 进入 tryRelease(1)方法
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
进入 tryRelease(1)方法
protected final boolean tryRelease(int releases) {
// 如果解锁线程不是锁的持有者,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// state 的 低16位 -1 ,和 state -1 一样
int nextc = getState() - releases;
// 判断 写锁的低16 位是否为0 ,是的话将 写锁的持有者置为 null,修改 state 状态,返回 true
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
回到 release(1) 方法
public final boolean release(int arg) {
// tryRelease(1) 返回 true
if (tryRelease(arg)) {
Node h = head;
// 判断等待队列的头节点的状态。进入 unparkSuccessor(h) 方法
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
进入 unparkSuccessor(h) 方法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// waitStatus<0 .用 cas 修改成 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 找到下一个节点,我们知道是 thread-1
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// thread-1 的 waitStatus 是 -1. 解除 thread-1 的阻塞状态
if (s != null)
LockSupport.unpark(s.thread);
}
2.6:thead-1 加读锁被唤醒,加读锁
线程 thread-1 回到 doAcquireShared (1) 方法,再次循环
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 恢复运行进入循环:判断该当前节点是不是除头节点之外的第二个节点
final Node p = node.predecessor();
if (p == head) {
// 进入 tryAcquireShared(1)方法
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquireShared(1)方法
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 判断写锁的低 16 位不为0,我们知道是 0 ,不满足
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 取到读锁的高 16 位 ,1
int r = sharedCount(c);
// readerShouldBlock () 是否需要阻塞,返回 false,!readerShouldBlock () 就是 true
// cas 将 读锁部分加 1
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// firstReader 是 thread-0,r=1 ,进入这部分
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
// 返回 1
return 1;
}
return fullTryAcquireShared(current);
}
回到 doAcquireShared (1) 方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 恢复运行进入循环:判断该当前节点是不是除头节点之外的第二个节点
final Node p = node.predecessor();
if (p == head) {
// 进入 tryAcquireShared(1)方法.,尝试获取读锁,成功,返回1
int r = tryAcquireShared(arg);
if (r >= 0) {
// 进入 setHeadAndPropagate(node,1) 方法
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
进入 setHeadAndPropagate(node,1) 方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 把 thread-1 所在节点置位头节点
setHead(node);
// propagate 表示有共享资源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 找到 thread-1 后的下一个节点,判断这个节点是加的是不是读锁,如果是继续唤醒下一个
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
被唤醒的请求读锁的线程继续 2.3:请求读锁的线程被唤醒 的一系列操作
2.7:解锁:thead-0 解读锁
thread-0 调用 ReadLock.unlock() 方法
public void unlock() {
// 进入 releaseShared(1) 方法
sync.releaseShared(1);
}
进入 releaseShared(1) 方法
public final boolean releaseShared(int arg) {
// 进入 tryReleaseShared(1) 方法
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
进入 tryReleaseShared(1) 方法
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 读锁的第一个线程是当前线程 thread-0 ,加锁次数是 1,将 读锁第一个线程置为 null
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 进入循环,进入读锁解锁过程,读锁的高16位-1, 因为还有 thread-1 持有读锁,不为 0,返回 false
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
回到 releaseShared(1) 方法
public final boolean releaseShared(int arg) {
// tryReleaseShared(1) 返回 false,流程结束
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
2.8:解锁:thead-1 解读锁
thread-1 调用 ReadLock.unlock() 方法
public void unlock() {
// 进入 releaseShared(1) 方法
sync.releaseShared(1);
}
进入 releaseShared(1) 方法
public final boolean releaseShared(int arg) {
// 进入 tryReleaseShared(1) 方法
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
进入 tryReleaseShared(1) 方法
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 读锁的第一个线程是当前线程 thread-1 ,加锁次数是 1,将 读锁第一个线程置为 null
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 进入循环,进入读锁解锁过程,读锁的高16位-1,为 0,返回 true
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
回到 releaseShared(1) 方法
public final boolean releaseShared(int arg) {
// tryReleaseShared(1) 返回 true,进入 doReleaseShared()方法
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
进入 doReleaseShared()方法
private void doReleaseShared() {
for (;;) {
Node h = head;
// 首次循环:先判断等待队列是是否有除亚元之外的其他节点,有 thread-2
if (h != null && h != tail) {
int ws = h.waitStatus;
// 头节点的 waitStatus 是-1,cas 将 -1 变成 0,进入 unparkSuccessor(h) 方法
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
进入 unparkSuccessor(h) 方法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// waitStatus<0 .用 cas 修改成 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 找到下一个节点,我们知道是 thread-2,waitStatus 是 0
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// thread-2 的 waitStatus 是 0. 解除thread-2 的阻塞状态
if (s != null)
LockSupport.unpark(s.thread);
}
2.9:thead-2 加写锁被唤醒,加写锁
回到 acquireQueued()方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 唤醒循环
for (;;) {
// thread-2 的前一个节点找到是头节点,尝试去获取写锁,很明显会成功
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 将 thread-2 所在节点设置为 头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
边栏推荐
- Typescript learning [7] advanced types: Union type and cross type
- Three paradigms of database
- MySQL scheduled backup restore
- Example of name call obtained by ID (Tencent IM)
- The 27th issue of product weekly report | members' new interests of black users; CSDN app v5.1.0 release
- Openstack Learning Series 1: openstack introduction, installation and deployment of basic environment
- 2021 national vocational skills competition Liaoning "Cyberspace Security Competition" and its analysis (ultra detailed)
- Differences between tinyint and int
- 1030. 距离顺序排列矩阵单元格●
- oracle网吧设计用程序实现插入更新删除的问题
猜你喜欢

Faster RCNN
![[005] [ESP32开发笔记] ADF基本框架](/img/4a/45a3e467615be4b32531af64549cf2.png)
[005] [ESP32开发笔记] ADF基本框架

Troubleshooting: MySQL containers in alicloud lightweight application servers stop automatically

软键盘出现搜索

The half year revenue of mushroom street was 168million yuan: a year-on-year decrease of 29% and an operating loss of 240million yuan

Faster RCNN

Design owlook network novel recommendation system

记录一次将dmp文件导入oracle数据库(本地导线上),所遇到的问题及解决方法
![[Django学习笔记 - 12]:数据库操作](/img/d8/2c4b6c036532c213477754b6f9758e.png)
[Django学习笔记 - 12]:数据库操作

Listing of Yubang new material on Shenzhen Stock Exchange: market value of 4.7 billion, net profit deducted in the first quarter decreased by 18%
随机推荐
2022年茶艺师(中级)考试题模拟考试题库及模拟考试
Program implementation of inserting, updating and deleting in Oracle Internet cafe design
Pattern recognition big job PCA & Fisher & KNN & kmeans
P1743 Audiophobia
The website is frequently suspended, leading to a decline in ranking
Test question bank and online simulation test for operation certificate of main principals of hazardous chemical business units in 2022
Openstack Learning Series 12: installing CEPH and docking openstack
Openstack Learning Series 12: installing CEPH and docking openstack
2022 "Cyberspace Security" event module B of Jiangxi secondary vocational group - SQL injection test
Typescript learning [8] enumeration type
Record the problems encountered in importing the DMP file into Oracle database (on the local wire) and the solutions
Camtasia studio2022 activation code serial number
宇邦新材深交所上市:市值47亿 第一季扣非净利降18%
Hengyuan cloud (gpushare)_ Beyond the model of pre training NLP
^25进程与线程
myql报错 Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column
How to understand CTF information security competition
Example of name call obtained by ID (Tencent IM)
Pull down the new project code and make it red
application. Properties mysql. cj. jdbc. Driver red