当前位置:网站首页>03 多线程与高并发 - ReentrantLock 源码解析
03 多线程与高并发 - ReentrantLock 源码解析
2022-08-04 12:24:00 【小刘说】
文章目录
ReentrantLock 介绍
- ReentrantLock 是互斥锁,跟 synchronized 一样
- ReentrantLock 实现了 Lock 接口。内部类 Sync,FairSync,NonfairSync 继承 AQS。
- lock 锁的使用相对 synchronized 成本更高(需要开发者手动解锁)
- synchronized 是非公平锁,lock 是公平+非公平锁
- lock 功能更加完善,提供 tryLock() 指定等待锁的时间,lockInterruptibly() 允许线程在获取锁的期间被中断 等方法
- synchronized 基于对象实现,lock 锁基于 AQS+CAS 实现
- 几乎没有竞争用 synchronized,竞争比较激烈用 lock 锁(synchronized只有锁升级,当升级到重量级锁后,无法降级到轻量级、偏向锁)
// 简单使用
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
lock.lock();
try{
// 业务代码
}finally{
lock.unlock();
}
}
AQS
AQS 就是一个并发包的基础组件,用来实现各种锁,各种同步组件的。它包含了state变量、加锁线程、等待队列等并发中的核心组件。
AQS中的双向链表是基于内部类 Node 在维护,Node中包含prev,next,thread属性,并且在AQS中还有三个关键属性,分别是head,tail,state
大白话聊聊Java并发面试问题之谈谈你对AQS的理解?
//AQS 主要属性
public abstract class AbstractQueuedSynchronizer{
private transient volatile Node head;//头节点
private transient volatile Node tail;//尾节点
private volatile int state;//资源State变量,默认值为0
private static final Unsafe unsafe = Unsafe.getUnsafe();//直接操作内存的unsafe工具包
static final class Node {
volatile int waitStatus;//线程Node节点状态
volatile Node prev;//前节点
volatile Node next;//后节点
volatile Thread thread;//Node节点的线程
}
}
ReentrantLock的 lock() 源码
整体脉络
// 公平锁的sync的lock方法
final void lock() {
acquire(1);
}
// 非公平锁的sync的lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire()
tryAcquire 分为公平和非公平,主要是尝试获取锁资源(重入和非重入)
// 非公平锁实现
final boolean nonfairTryAcquire(int acquires) {
// 拿到当前线程
final Thread current = Thread.currentThread();
// 拿到AQS的state
int c = getState();
// 如果state == 0,说明没有线程占用着当前的锁资源
if (c == 0) {
// 没人占用锁资源,我直接抢一波(不管有没有线程在排队)
if (compareAndSetState(0, acquires)) {
// 将当前占用这个互斥锁的线程属性设置为当前线程
setExclusiveOwnerThread(current);
// 返回true,拿锁成功
return true;
}
}
// 当前state != 0,说明有线程占用着锁资源
// 判断拿着锁的线程是不是当前线程(锁重入)
else if (current == getExclusiveOwnerThread()) {
// 将state再次+1
int nextc = c + acquires;
// 锁重入是否超过最大限制
// 01111111 11111111 11111111 11111111 + 1
// 10000000 00000000 00000000 00000000
// 抛出error
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 将值设置给state
setState(nextc);
// 返回true,拿锁成功
return true;
}
return false;
}
// 公平锁实现
protected final boolean tryAcquire(int acquires) {
// 拿到当前线程!
final Thread current = Thread.currentThread();
// 拿到AQS的state
int c = getState();
// 如果state == 0,说明没有线程占用着当前的锁资源
if (c == 0) {
// 判断是否有线程在排队,如果有线程排队,返回true,配上前面的!,那会直接不执行返回最外层的false
if (!hasQueuedPredecessors() &&
// 如果没有线程排队,直接CAS尝试获取锁资源
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter()
在获取锁资源失败后,需要将当前线程封装为Node对象,并且插入到AQS队列的末尾
// 将当前线程封装为Node对象,并且插入到AQS队列的末尾
private Node addWaiter(Node mode) {
// 将当前线程封装为Node对象,mode为null,代表互斥锁
Node node = new Node(Thread.currentThread(), mode);
// pred是tail节点
Node pred = tail;
// 如果pred不为null,有线程正在排队
if (pred != null) {
// 将当前节点的prev,指定tail尾节点
node.prev = pred;
// 以CAS的方式,将当前节点变为tail节点
if (compareAndSetTail(pred, node)) {
// 之前的tail的next指向当前节点
pred.next = node;
return node;
}
}
// 添加的流程为, 自己prev指向pred 、tail变成自己、pred.next指向自己
// 如果上述方式,CAS操作失败,导致加入到AQS末尾失败,如果失败,就基于enq的方式添加到AQS队列
enq(node);
return node;
}
// enq,无论怎样都添加进入
private Node enq(final Node node) {
for (;;) {
// 拿到tail
Node t = tail;
// 如果tail为null,说明当前没有Node在队列中
if (t == null) {
// 创建一个新的Node作为head,并且将tail和head指向一个Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 和上述代码一致!
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued()
查看当前排队的Node是否是head的next,如果是,尝试获取锁资源,如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起
在挂起线程前,需要确认当前节点的上一个节点的状态必须是小于等于0,
如果为1,代表是取消的节点,不能挂起
如果为-1,代表挂起当前线程
如果为-2,-3,需要将状态改为-1之后,才能挂起当前线程
final boolean acquireQueued(final Node node, int arg) {
// 标识。
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 拿到上一个节点
final Node p = node.predecessor();
if (p == head && // 说明当前节点是head的next
tryAcquire(arg)) {
// 竞争锁资源,成功:true,失败:false
// 进来说明拿到锁资源成功
// 将当前节点置位head,thread和prev属性置位null
setHead(node);
// 帮助快速GC
p.next = null;
// 设置获取锁资源成功
failed = false;
// 线程中断
return interrupted;
}
// 如果不是或者获取锁资源失败,尝试将线程挂起
// 第一个事情,当前节点的上一个节点的状态正常!
// 第二个事情,挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 通过LockSupport将当前线程挂起
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**这个方法的主要作用是,通过Node的状态来判断,ThreadA竞争锁失败以后是否应该被挂起。 1. 如果ThreadA的pred节点状态为SIGNAL,那就表示可以放心挂起当前线程 2. 通过循环扫描链表把CANCELLED状态的节点移除 3. 修改pred节点的状态为SIGNAL,返回false. 返回false时,也就是不需要挂起,返回true,则需要调用parkAndCheckInterrupt挂起当前线程 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//前置节点的
waitStatus
if (ws == Node.SIGNAL)//如果前置节点为SIGNAL,意味着只需要等待其他前置节点的线程被释放,
return true;//返回true,意味着可以直接放心的挂起了
if (ws > 0) {
//ws大于0,意味着prev节点取消了排队,直接移除这个节点就行
do {
node.prev = pred = pred.prev;
//相当于: pred=pred.prev;
node.prev=pred;
} while (pred.waitStatus > 0); //这里采用循环,从双向列表中移除CANCELLED的节点
pred.next = node;
} else {
//利用cas设置prev节点的状态为SIGNAL(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
ReentrantLock的 unlock() 源码
release()
// 真正释放锁资源的方法
public final boolean release(int arg) {
// 核心的释放锁资源方法
if (tryRelease(arg)) {
// 释放锁资源释放干净了。 (state == 0)
Node h = head;
// 如果头节点不为null,并且头节点的状态不为0,唤醒排队的线程
if (h != null && h.waitStatus != 0)、
// 唤醒线程
unparkSuccessor(h);
return true;
}
// 释放锁成功,但是state != 0
return false;
}
tryRelease()
// 核心的释放锁资源方法
protected final boolean tryRelease(int releases) {
// 获取state - 1
int c = getState() - releases;
// 如果释放锁的线程不是占用锁的线程,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否成功的将锁资源释放利索 (state == 0)
boolean free = false;
if (c == 0) {
// 锁资源释放干净。
free = true;
// 将占用锁资源的属性设置为null
setExclusiveOwnerThread(null);
}
// 将state赋值
setState(c);
// 返回true,代表释放干净了
return free;
}
unparkSuccessor()
// 唤醒节点
private void unparkSuccessor(Node node) {
// 拿到头节点状态
int ws = node.waitStatus;
// 如果头节点状态小于0,换为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 拿到当前节点的next
Node s = node.next;
// 如果s == null ,或者s的状态为1
if (s == null || s.waitStatus > 0) {
// next节点不需要唤醒,需要唤醒next的next
s = null;
// 从尾部往前找,找到状态正常的节点。(小于等于0代表正常状态)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 经过循环的获取,如果拿到状态正常的节点,并且不为null
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
为什么唤醒线程时,为啥从尾部往前找,而不是从前往后找?
// 从尾部往前找,找到状态正常的节点。(小于等于0代表正常状态)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
因为在 addWaiter() 操作时
- 将当前 Node 的 prev 指针指向前面的节点
- 然后是将 tail 指向当前 Node
- 将上一个节点的 next 指向当前 Node。
如果从前往后,通过next去找,可能会丢失某个节点,导致这个节点不会被唤醒(可能出现 第三步还未完成,前节点就释放掉了,此时 node.next 为空
)
如果从后往前找,肯定可以找到全部的节点。
边栏推荐
猜你喜欢
随机推荐
活动报名:如何高效应对当下的实时场景需求?
划重点!2022面试必刷461道大厂架构面试真题汇总+面经+简历模板
AI 助力双碳目标:让每一度电都是我们优化的
A Survey of Multi-Label Classification under Supervised and Semi-Supervised Learning
如何治理资源浪费?百度云原生成本优化最佳实践
形态学(膨胀、腐蚀)
树莓派入门
ES 节点2G内存分析
一分钟认识 IndexedDB 数据库,太强大了!
中电资讯 - 一路“标”升,喜迎Q3开门红
DC-DC电源中前馈电容的选择
Hit the interview!The latest interview booklet of Ali Jin, nine silver and ten is stable!
num_workers
String is a reference type
【RISC-V】Trap和Exception
从数学角度和编码角度解释 熵、交叉熵、KL散度
【水一个徽章】
Hands-on Deep Learning_LeNet
matlab串口读写
MySQL - Explain详解