当前位置:网站首页>JUC(三):锁核心类AQS ing
JUC(三):锁核心类AQS ing
2022-08-03 11:36:00 【学到的心态】
一. AbstractQueuedSynchronizer简介
用来构建锁和同步器的框架,使用AQS能简单且高效的构造出大量的构造器,比如ReentrantLock,Semaphore。当然我们也可以利用AQS轻松构建出符合自己需求的同步器。
1. AQS 核心思想
核心思想是,如果被请求的共享资源空闲,则将当前请求的线程设置为有效的工作线程,并锁定资源。如果资源被占用,那就需要一套线程堵塞以及被唤醒时锁分配的机制(CLH),即将线程加入到队列中。
AQS是将线程封装成CLH锁队列的一个节点(Node)来实现锁的分配。
AQS使用一个int成员变量表示同步状态,通过FIFO队列来完成排队,AQS通过CAS完成对状态的修改。
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2. AQS 对资源的共享方式
AQS定义两种资源共享方式
- Exclusive(独占):只有一个线程能执行,如ReentrantLock。又分为公平锁和非公平锁:
- 公平锁:按照队列中的排队顺序去获取锁
- 非公平锁:无视队列顺序,谁抢到就是谁的
- share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。
二. AbstractQueuedSynchronizer数据结构
底层数据结构使用CLH队列(一个虚拟的双向队列:虚拟:即不存在队列实例,仅存在节点之间的关联关系)。
AQS是将(请求资源的)线程封装成CLH锁队列的一个节点来实现锁的分配。
- sync queue 同步队列,使用的是双向链表,其中head节点主要用作后续的调度
- condition queue 不是必须的,是一个单向链表,只有使用condition时才会使用此单向链表。
三. AbstractQueuedSynchronizer源码分析
内部类 - Node类
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 结点状态
volatile int waitStatus;
// 前驱和后继
volatile Node prev;
volatile Node next;
// 结点所对应的线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;
// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱结点,若前驱结点为空,抛出异常
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
if (p == null) // 前驱结点为空,抛出异常
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}
// 无参构造方法
Node() {
// Used to establish initial head or SHARED marker
}
// 构造方法
Node(Thread thread, Node mode) {
// Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 构造方法
Node(Thread thread, int waitStatus) {
// Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
内部类 - conditionObject类
此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下
public interface Condition {
// 等待,当前线程在接到信号或被中断之前一直处于等待状态
void await() throws InterruptedException;
// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
void awaitUninterruptibly();
//等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
void signal();
// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
void signalAll();
}
类的属性
属性中包含了头节点head,尾结点tail,状态state、自旋时间spinForTimeoutThreshold,还有AbstractQueuedSynchronizer抽象的属性在内存中的偏移地址,通过该偏移地址,可以获取和设置该属性的值,同时还包括一个静态初始化块,用于加载内存偏移地址。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
// 头/尾 节点
private transient volatile Node head;
private transient volatile Node tail;
// 状态
private volatile int state;
// 自旋时间
static final long spinForTimeoutThreshold = 1000L;
// Unsafe类实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
/** * 内存地址 */
// state内存偏移地址
private static final long stateOffset;
// head内存偏移地址
private static final long headOffset;
// state内存偏移地址
private static final long tailOffset;
// tail内存偏移地址
private static final long waitStatusOffset;
// next内存偏移地址
private static final long nextOffset;
}
类的核心方法 - acquire方法
该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。
若tryAcquire失败,则调用addWaiter方法,将调用此方法的线程封装成为一个结点并放入Sync queue。
调用acquireQueued方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。
首先分析addWaiter方法
private Node addWaiter(Node mode) {
// 新生成一个结点,默认为独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 保存尾结点
Node pred = tail;
if (pred != null) {
// 尾结点不为空,即已经被初始化
// 将node结点的prev域连接到尾结点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 比较pred是否为尾结点,是则将尾结点设置为node
// 设置尾结点的next域为node
pred.next = node;
return node; // 返回新生成的结点
}
}
enq(node); // 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列
return node;
}
//enq方法会使用无限循环来确保节点的成功插入。
private Node enq(final Node node) {
for (;;) {
// 无限循环,确保结点能够成功入队列
// 保存尾结点
Node t = tail;
if (t == null) {
// 尾结点为空,即还没被初始化
if (compareAndSetHead(new Node())) // 头节点为空,并设置头节点为新生成的结点
tail = head; // 头节点与尾结点都指向同一个新生结点
} else {
// 尾结点不为空,即已经被初始化过
// 将node结点的prev域连接到尾结点
node.prev = t;
if (compareAndSetTail(t, node)) {
// 比较结点t是否为尾结点,若是则将尾结点设置为node
// 设置尾结点的next域为node
t.next = node;
return t; // 返回尾结点
}
}
}
}
现在,分析acquireQueue方法。其源码如下
// sync队列中的结点在独占且忽略中断的模式下获取(资源)
final boolean acquireQueued(final Node node, int arg) {
// 标志
boolean failed = true;
try {
// 中断标志
boolean interrupted = false;
for (;;) {
// 无限循环
// 获取node节点的前驱结点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 前驱为头节点并且成功获得锁
setHead(node); // 设置头节点
p.next = null; // help GC
failed = false; // 设置标志
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先获取当前节点的前驱节点,如果前驱节点是头节点并且能够获取(资源),代表该当前节点能够占有锁,设置头节点为当前节点,返回。否则,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法,首先,我们看shouldParkAfterFailedAcquire方法,代码如下
// 当获取(资源)失败后,检查并且更新结点状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱结点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1
/* * This node has already set status asking a release * to signal it, so it can safely park. */
// 可以进行park操作
return true;
if (ws > 0) {
// 表示状态为CANCELLED,为1
/* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点
// 赋值pred结点的next域
pred.next = node;
} else {
// 为PROPAGATE -3 或者是0 表示无状态,(为CONDITION -2时,表示此节点在condition queue中)
/* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
// 比较并设置前驱结点的状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 不能进行park操作
return false;
}
只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。再看parkAndCheckInterrupt方法,源码如下
// 进行park操作并且返回该线程是否被中断
private final boolean parkAndCheckInterrupt() {
// 在许可可用之前禁用当前线程,并且设置了blocker
LockSupport.park(this);
return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
}
parkAndCheckInterrupt方法里的逻辑是首先执行park操作,即禁用当前线程,然后返回该线程是否已经被中断。再看final块中的cancelAcquire方法,其源码如下:
// 取消继续获取(资源)
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
// node为空,返回
if (node == null)
return;
// 设置node结点的thread为空
node.thread = null;
// Skip cancelled predecessors
// 保存node的前驱结点
Node pred = node.prev;
while (pred.waitStatus > 0) // 找到node前驱结点中第一个状态小于0的结点,即不为CANCELLED状态的结点
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 获取pred结点的下一个结点
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 设置node结点的状态为CANCELLED
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
// node结点为尾结点,则设置尾结点为pred结点
// 比较并设置pred结点的next节点为null
compareAndSetNext(pred, predNext, null);
} else {
// node结点不为尾结点,或者比较设置不成功
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// (pred结点不为头节点,并且pred结点的状态为SIGNAL)或者
// pred结点状态小于等于0,并且比较并设置等待状态为SIGNAL成功,并且pred结点所封装的线程不为空
// 保存结点的后继
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 后继不为空并且后继的状态小于等于0
compareAndSetNext(pred, predNext, next); // 比较并设置pred.next = next;
} else {
unparkSuccessor(node); // 释放node的前一个结点
}
node.next = node; // help GC
}
}
该方法完成的功能就是取消当前线程对资源的获取,即设置该结点的状态为CANCELLED,接着我们再看unparkSuccessor方法,源码如下
// 释放后继结点
private void unparkSuccessor(Node node) {
/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
// 获取node结点的等待状态
int ws = node.waitStatus;
if (ws < 0) // 状态值小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
// 比较并且设置结点等待状态,设置为0
compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
// 获取node节点的下一个结点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 下一个结点为空或者下一个节点的等待状态大于0,即为CANCELLED
// s赋值为空
s = null;
// 从尾结点开始从后往前开始遍历
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) // 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点
// 保存结点
s = t;
}
if (s != null) // 该结点不为为空,释放许可
LockSupport.unpark(s.thread);
}
该方法的作用就是为了释放node节点的后继结点。
对于cancelAcquire与unparkSuccessor方法,如下示意图可以清晰的表示
类的核心方法 - release方法
以独占模式释放对象,其源码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 释放成功
// 保存头节点
Node h = head;
if (h != null && h.waitStatus != 0) // 头节点不为空并且头节点状态不为0
unparkSuccessor(h); //释放头节点的后继结点
return true;
}
return false;
}
AbstractQueuedSynchronizer总结
对于AbstractQueuedSynchronizer的分析,最核心的就是sync queue的分析。
- 每一个结点都是由前一个结点唤醒
- 当结点发现前驱结点是head并且尝试获取成功,则会轮到该线程运行。
- condition queue中的结点向sync queue中转移是通过signal操作完成的。
- 当结点的状态为SIGNAL时,表示后面的结点需要运行
参考:https://pdai.tech/md/java/thread/java-thread-x-lock-AbstractQueuedSynchronizer.html
边栏推荐
- Dva.js 新手入门指南
- 优维低代码:Provider 构件
- LeetCode 899 有序队列[字典序] HERODING的LeetCode之路
- 永寿 永寿农特产品-苹果
- This article takes you to understand the principle of CDN technology
- 浅谈SVN备份
- [Bubble sort and odd-even sorting]
- Programmers architecture practice way: software architecture basic concepts and thinking
- [论文阅读] (23)恶意代码作者溯源(去匿名化)经典论文阅读:二进制和源代码对比
- Simple implementation of a high-performance clone of Redis using .NET (1)
猜你喜欢
asdn涨薪技术之apifox+Jenkins如何玩转接口自动化测试
FR9811S6 SOT-23-6 23V, 2A Synchronous Step-Down DC/DC Converter
Matlab学习13-图像处理之可视化GUI程序
微信小程序获取用户手机号码
Cross-chain bridge protocol Nomad suffers hacker attack, losing more than $150 million
【一起学Rust 基础篇】Rust基础——变量和数据类型
字节最爱问的智力题,你会几道?
GET 和 POST 有什么区别?
零信任架构分析【扬帆】
LeetCode 899 有序队列[字典序] HERODING的LeetCode之路
随机推荐
What is the ERC20 token standard?
[Detailed explanation of binary search plus recursive writing method] with all the code
【一起学Rust】Rust包管理工具Cargo初步了解
MySQL - 2059 - Authentication plugin ‘caching_sha2_password‘ cannot be loaded
Programmers architecture practice way: software architecture basic concepts and thinking
Matlab学习13-图像处理之可视化GUI程序
【JS 逆向百例】某网站加速乐 Cookie 混淆逆向详解
What is the relationship between The Matrix and 6G?
一文带你弄懂 CDN 技术的原理
用于发票处理的 DocuWare,摆脱纸张和数据输入的束缚,自动处理所有收到的发票
深度学习中数据到底要不要归一化?实测数据来说明!
Matlab学习11-图像处理之图像变换
OFDM 十六讲 4 -What is a Cyclic Prefix in OFDM
Cross-chain bridge protocol Nomad suffers hacker attack, losing more than $150 million
什么是Weex
从零开始Blazor Server(6)--基于策略的权限验证
LP流动性挖矿DAPP系统开发丨流动性挖矿功能原理及说明
进程内存
数据库一席谈:打造开源的数据生态,支撑产业数字化浪潮
【TypeScript】为什么要选择 TypeScript?