Multithreading and high concurrency III: AQS underlying source code analysis and implementation classes
2022-06-28 04:14:00 【smartjiang-java】
The previous article covered CAS and volatile. This article introduces AQS, which is built on top of them.
1:AQS
1.1: Introduction to AQS
AQS is short for AbstractQueuedSynchronizer.
It holds a shared resource variable, state: a volatile int, so updates to it are visible across threads. Its meaning depends on the subclass.
For example:
In ReentrantLock, state == 1 means a thread holds the lock; going from 1 to 2 means the same thread has re-entered the lock once more; state == 0 means the lock is free.
In CountDownLatch, state is the number of countDown() calls still required.
Alongside state there is a FIFO wait queue, implemented as a doubly linked list. Each node represents one thread and has a predecessor and a successor. A thread that fails to acquire state while competing with other threads is appended to this queue, and the enqueue itself is done with CAS. When a node in the wait queue successfully acquires state, the thread in that node holds the lock. This is why AQS is often summarized as AQS = CAS + volatile.
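The meaning of state described above can be observed through public accessors, without reading AQS internals. The sketch below is only an illustration: the class and method names (StateDemo and so on) are made up for this example, while getHoldCount() and getCount() are the standard JDK methods that expose the underlying counter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

class StateDemo {
    // Locks twice from the same thread and reports the hold count.
    static int holdCountAfterTwoLocks() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock(); // re-entry by the owning thread
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
            lock.unlock(); // every lock() needs a matching unlock()
        }
    }

    // Creates a latch of 3 and reports the count left after one countDown().
    static long remainingAfterOneCountDown() {
        CountDownLatch latch = new CountDownLatch(3);
        latch.countDown();
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("hold count: " + holdCountAfterTwoLocks());
        System.out.println("latch count: " + remainingAfterOneCountDown());
    }
}
```

For the lock the counter goes up on each re-entry, and for the latch it goes down on each countDown(), which matches the two interpretations of state given above.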
1.2: AQS source code analysis
You can read the source code yourself, but it is hard going. Some general advice: don't read code you can't run; read with a specific question in mind; follow one thread through to the end and skip unrelated details; prefer tracing the running code over reading it statically; and understand the overall framework first. Referring to blog posts is also a good approach, for example:
https://www.jianshu.com/p/0f876ead2846 - Talking about Java's AQS
Below are my notes on AQS (exclusive mode only), starting with how the lock is acquired:
// Start in lock() of NonfairSync, an inner class of ReentrantLock.
// Use CAS to change state from 0 to 1. If that succeeds, the lock has been acquired,
// and the owning thread is recorded with setExclusiveOwnerThread(Thread.currentThread()).
// If the CAS fails (the lock is already held), call acquire(1) to try again.
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// acquire(1), the method that obtains the lock.
// It first calls tryAcquire to try to take state. If that succeeds, the remaining steps are skipped.
// If it fails, addWaiter puts the thread into the CLH wait queue and acquireQueued waits there for the lock.
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
First, try to get the lock by stepping into tryAcquire(1):
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
Following the call further leads to nonfairTryAcquire.
This code shows the two main ways a lock can be obtained:
1: When state is 0, the lock is free, so the thread uses CAS to try to take it. If the CAS succeeds, the thread has won the lock and setExclusiveOwnerThread(current) records it as the owner. The CAS here also explains why this implementation is unfair: both queued threads and newly arriving threads can win the CAS, so a new thread does not have to queue.
2: When state is not 0, some thread already holds the lock. If the current thread is that owner (current == getExclusiveOwnerThread() is true), it is re-acquiring the lock (re-entrancy), so state is updated to record the number of holds. setState does not need CAS here, because the lock is held by the current thread and no other thread can reach this branch, so the update is thread-safe.
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c == 0 means the resource is idle (the lock is free), so try to take it with CAS
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// Otherwise the lock is held. If the holder is the current thread, it is re-acquiring the lock,
// so the hold count grows by acquires (1 here). This is what makes ReentrantLock re-entrant.
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
If tryAcquire(arg) fails, the lock was not obtained, so the thread is added to the FIFO wait queue and acquireQueued is executed.
First look at addWaiter(Node.EXCLUSIVE), which wraps the current thread in a Node and enqueues it. Node.EXCLUSIVE marks the node as being in exclusive mode.
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// If the tail node is not null, use CAS to append the node of the thread that failed to acquire the lock
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// If the tail is null (queue not initialized) or the CAS failed, fall back to enq
enq(node);
return node;
}
In enq(), first check whether tail is null. If it is, the head and tail of the FIFO queue have not been created yet, so a head node is created first, and then the thread's node is appended with CAS in the same way. The head node is a dummy node: it only indicates that some thread holds state. Which thread that is was recorded by setExclusiveOwnerThread(current) above, in the exclusiveOwnerThread field.
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// tail is null: the FIFO queue is not initialized, so create its head node first
if (t == null) {
// Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// tail is not null: append the waiting thread's node
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
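The effect of addWaiter and enq can be watched from outside through ReentrantLock's monitoring methods. The following is a minimal sketch under stated assumptions: QueueDemo and its helper are hypothetical names, and hasQueuedThread and getQueueLength are the JDK's own inspection methods (documented as estimates meant for monitoring, not for synchronization control).

```java
import java.util.concurrent.locks.ReentrantLock;

class QueueDemo {
    // Returns the queue length observed while exactly one thread is waiting.
    static int queueLengthWithOneWaiter() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // the caller owns the lock, so the waiter has to queue
        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.unlock();
        });
        waiter.start();
        try {
            // Spin until the waiter's node has been linked into the wait queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            return lock.getQueueLength();
        } finally {
            lock.unlock(); // releasing wakes the queued waiter
            join(waiter);
        }
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued threads: " + queueLengthWithOneWaiter());
    }
}
```

Only the waiting thread is counted; the dummy head node carries no thread, so it does not show up in the length.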
This raises the question of how a thread in the wait queue acquires the lock. AQS lets the queued thread spin to compete for it. Spinning forever would waste CPU, so after one or two failed attempts the thread is blocked (parked) until its predecessor releases the lock and wakes it. In addition, if the thread is interrupted or times out while waiting, its node should be put into the "cancelled" state.
First, look at the fields defined in Node:
static final class Node {
// Marker indicating that a node is waiting in shared mode
static final Node SHARED = new Node();
// Marker indicating that a node is waiting in exclusive mode
static final Node EXCLUSIVE = null;
// The node was cancelled because of a timeout or an interrupt
static final int CANCELLED = 1;
// A node may only block (park) once its predecessor's status is SIGNAL. A node in the SIGNAL state
// must wake its successor with unpark when it releases the lock or is cancelled.
static final int SIGNAL = -1;
// The thread is waiting on a condition variable (it acquired the lock, joined the condition queue,
// released the lock, and is waiting for the condition; it returns only after re-acquiring the lock)
static final int CONDITION = -2;
// The wake-up should be propagated to subsequent nodes; used in shared mode
static final int PROPAGATE = -3;
// Wait status: initialized to CONDITION for condition nodes, 0 otherwise; updated atomically with CAS
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
// (remaining members omitted)
}
From these status definitions we can guess how AQS handles a spinning thread: if the node's predecessor is not head (or the attempt to acquire fails), and the predecessor's status is SIGNAL, the node's thread is blocked.
Now look at how a queued thread tries to acquire the lock:
final boolean acquireQueued(final Node node, int arg) {
// Tracks whether acquisition failed
boolean failed = true;
try {
// Tracks whether the current thread was interrupted while waiting
boolean interrupted = false;
// Spin
for (;;) {
// Get the predecessor of the current node
final Node p = node.predecessor();
// If the predecessor is head, this node is first in line, so try to acquire the lock
if (p == head && tryAcquire(arg)) {
// On success, make the current node the new head; the old head leaves the queue
setHead(node);
// The old head has released the resource; clear its next reference so it can be garbage collected
p.next = null; // help GC
// Acquisition succeeded
failed = false;
// Return the interrupt flag
return interrupted;
}
// If the predecessor is not head, or the acquire attempt failed,
// shouldParkAfterFailedAcquire decides whether this node's thread should block.
// If it returns true, parkAndCheckInterrupt() blocks the thread and, once it wakes,
// reports whether it was interrupted; if so, interrupted is set to true.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// If the loop exits without acquiring the lock (for example because of an exception),
// the node gives up and is cancelled
if (failed)
cancelAcquire(node);
}
}
Next, look at shouldParkAfterFailedAcquire():
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 1. The predecessor's status is SIGNAL, so the current node can safely block
return true;
if (ws > 0) {
// 2. The predecessor is cancelled: skip over cancelled nodes until a live predecessor is found
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 3. The predecessor's status is 0 or PROPAGATE: set it to SIGNAL with CAS.
// The caller then retries the acquire once more before parking.
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
The thread is actually blocked by parkAndCheckInterrupt():
private final boolean parkAndCheckInterrupt() {
// Block the current thread
LockSupport.park(this);
// Return whether the thread was interrupted, clearing the interrupt status
// (the interrupt is re-asserted after the lock is acquired)
return Thread.interrupted();
}
Why check whether the thread was interrupted? If the thread receives an interrupt while blocked, then after it wakes up and acquires the lock (acquireQueued returns true), the interrupt has to be re-asserted:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// If the thread was interrupted while waiting, re-assert the interrupt once it holds the lock
selfInterrupt();
}
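The re-asserted interrupt can be checked with a small experiment. This is a sketch, not part of the JDK source: InterruptReplayDemo and its method names are invented for illustration. A thread blocked in the uninterruptible lock() is interrupted while queued, and its interrupt flag is read once it finally owns the lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptReplayDemo {
    // Returns the waiter's interrupt flag as seen right after lock() returns.
    static boolean interruptFlagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterLock = new AtomicBoolean(false);
        lock.lock(); // keep the lock so the waiter must queue
        Thread waiter = new Thread(() -> {
            lock.lock(); // lock() does not throw on interrupt
            try {
                flagAfterLock.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        // Wait until the waiter is in the AQS queue, then interrupt it.
        while (!lock.hasQueuedThread(waiter)) {
            Thread.yield();
        }
        waiter.interrupt();
        lock.unlock(); // only now can the waiter acquire the lock
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagAfterLock.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted after lock(): " + interruptFlagAfterLock());
    }
}
```

The interrupt does not abort lock(), but it is not lost either: the flag is set again by the time the waiter holds the lock, which is the job of selfInterrupt().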
That completes the lock acquisition path, but one question remains: cancelled nodes are skipped and removed, so when does a Node get set to cancelled?
For that, go back to cancelAcquire():
private void cancelAcquire(Node node) {
// Ignore if the node does not exist
if (node == null)
return;
node.thread = null;
// Point node.prev at the nearest preceding node that is not cancelled
// (skipping all cancelled nodes); waitStatus > 0 means a node is cancelled
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// Remember pred's current next node; it is the expected value for the CAS on pred.next below
Node predNext = pred.next;
// Mark the current node as cancelled
node.waitStatus = Node.CANCELLED;
// If the cancelled node is the tail, use CAS to make its predecessor the new tail;
// if that succeeds, set the new tail's next pointer to null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// This part is a little roundabout. When the current node is cancelled, we want its predecessor's
// next to point at the current node's successor. But as noted earlier, a successor is only woken
// if its predecessor's status is SIGNAL, so before relinking pred.next make sure
// pred's status is SIGNAL
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// If pred is head, or setting pred's status to SIGNAL failed, wake the successor so it can
// compete for the lock. As noted earlier, a SIGNAL node wakes its successor when it is
// cancelled or releases the lock
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
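One common way to reach cancelAcquire from application code is a timed tryLock that gives up. The sketch below assumes a 50 ms timeout chosen only for the demonstration, and CancelDemo is a hypothetical name; it shows that a waiter which times out reports failure and no longer counts as a queued thread afterwards.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class CancelDemo {
    // True if the timed waiter failed to lock and left no live node in the queue.
    static boolean timedOutWaiterLeavesQueue() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(true);
        lock.lock(); // held for the whole test, so the waiter must time out
        try {
            Thread waiter = new Thread(() -> {
                try {
                    boolean ok = lock.tryLock(50, TimeUnit.MILLISECONDS);
                    acquired.set(ok);
                    if (ok) {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiter.start();
            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return !acquired.get() && lock.getQueueLength() == 0;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelled cleanly: " + timedOutWaiterLeavesQueue());
    }
}
```

lockInterruptibly() behaves similarly when the waiting thread is interrupted: the acquire is abandoned and the node is cancelled.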
Releasing the lock:
Fair or unfair, releasing a lock ultimately calls the following AQS template method:
public final boolean release(int arg) {
// Did the release fully free the lock?
if (tryRelease(arg)) {
Node h = head;
// Once the lock is released, the node after head should be woken so that it can compete for the lock.
// 1: h == null. Either only one thread ever competed for the lock and has now released it,
// so there is no successor to wake; or other threads are competing right now but the head node
// has not been initialized yet. Those threads are running, so no wake-up is needed.
// 2: h != null and h.waitStatus == 0. The successor of head is still spinning for the lock,
// that is, its thread is running, so no wake-up is needed.
// 3: h != null and h.waitStatus < 0. waitStatus is SIGNAL or PROPAGATE;
// in both cases the successor is blocked and must be woken.
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease is defined in Sync, ReentrantLock's subclass of AQS:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// Only the thread holding the lock may release it; otherwise throw an exception
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// When every hold has been released, clear the owner recorded in exclusiveOwnerThread
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
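Both branches of tryRelease are visible from the public API. In this small sketch (ReleaseDemo is an illustrative name), unlocking without ownership throws, and a lock that was acquired twice stays locked until the second unlock brings state back to 0.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReleaseDemo {
    // unlock() by a thread that does not own the lock must throw.
    static boolean unlockWithoutOwnershipThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // After two lock() calls, one unlock() leaves the lock held; the second frees it.
    static boolean freedOnlyAfterLastUnlock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock(); // state 2 -> 1, owner unchanged
        boolean stillLocked = lock.isLocked();
        lock.unlock(); // state 1 -> 0, owner cleared
        return stillLocked && !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println(unlockWithoutOwnershipThrows());
        System.out.println(freedOnlyAfterLastUnlock());
    }
}
```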
Now look at the wake-up method, unparkSuccessor:
private void unparkSuccessor(Node node) {
// Read head's waitStatus (say it is SIGNAL) and reset it to 0 with CAS. Why? As discussed above,
// waitStatus = SIGNAL (-1) or PROPAGATE (-3) is only a flag saying that the successor may need waking.
// Since the successor is about to be woken, the flag can be cleared.
// If the CAS fails, the wake-up below still happens.
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// Find the first node in the queue that is not cancelled, and wake it
Node s = node.next;
// If s is null or cancelled, it is not a valid successor, so run the search in the if body
if (s == null || s.waitStatus > 0) {
s = null;
// Walk backwards from tail; the last match is the non-cancelled node closest to node
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
Why search for the first non-cancelled node from the back of the queue towards the front? Because enqueuing a node is not atomic, as addWaiter shows:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
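An enqueuing thread first executes node.prev = pred, then the CAS on tail, and only then pred.next = node. If unparkSuccessor runs between the CAS and the assignment to pred.next, a forward traversal from head would not find the new node, whereas a backward traversal from tail does.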
1.3: How to use AQS to build a custom mutex
By managing state and the FIFO queue, AQS provides a general foundation for implementing locks. A lock is typically written by defining an internal subclass of AQS and delegating to AQS methods. Because AQS already handles acquiring state and managing the queue, implementing a lock becomes much simpler. For example, a non-reentrant mutex can be built on AQS as follows:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class Mutex {
private final Sync sync = new Sync();
public void lock () {
sync.acquire(1);
}
public void unlock () {
sync.release(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire (int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease (int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
}
}
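To see such a mutex at work, the sketch below repeats a minimal copy of it so that the example is self-contained (SimpleMutex, MutexDemo and the tryLock helper are names introduced here, not part of the listing above). It protects a plain int counter across four threads and shows that the lock is not reentrant.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MutexDemo {
    // Minimal non-reentrant mutex in the spirit of the Mutex above.
    static final class SimpleMutex {
        private final Sync sync = new Sync();
        void lock() { sync.acquire(1); }
        boolean tryLock() { return sync.tryAcquire(1); } // single non-blocking attempt
        void unlock() { sync.release(1); }

        private static final class Sync extends AbstractQueuedSynchronizer {
            @Override protected boolean tryAcquire(int arg) { return compareAndSetState(0, 1); }
            @Override protected boolean tryRelease(int arg) { setState(0); return true; }
        }
    }

    private static int counter;

    // Four threads each add 1000 to a plain int, guarded only by the mutex.
    static int countWithFourThreads() {
        SimpleMutex mutex = new SimpleMutex();
        counter = 0;
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    mutex.lock();
                    try { counter++; } finally { mutex.unlock(); }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return counter;
    }

    // The owner itself cannot lock a second time: state is already 1.
    static boolean secondTryLockFails() {
        SimpleMutex mutex = new SimpleMutex();
        boolean first = mutex.tryLock();
        boolean second = mutex.tryLock();
        mutex.unlock();
        return first && !second;
    }

    public static void main(String[] args) {
        System.out.println("counter: " + countWithFourThreads());
        System.out.println("non-reentrant: " + secondTryLockFails());
    }
}
```

Because the mutex is non-reentrant, calling lock() twice from the same thread would block forever; the sketch uses tryLock to show that without hanging.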
2: Some AQS implementation classes and related tools
2.1: ReentrantLock
A reentrant, exclusive lock.
It is an alternative to synchronized: Lock lock = new ReentrantLock(); lock.lock(); lock.unlock(); synchronized releases its lock automatically, but ReentrantLock must be unlocked manually, and the unlock belongs in a finally block. In addition: passing true to the constructor creates a fair lock, which grants the lock in waiting order and checks whether the wait queue is empty before acquiring. tryLock() attempts to take the lock without blocking, whereas synchronized blocks as soon as the lock is unavailable. lockInterruptibly() allows a waiting thread to be interrupted.
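The usual idiom looks roughly like the following sketch. FairCounter is a made-up class for illustration; the lock()/try/finally/unlock() shape, the fair-lock constructor argument and tryLock() are standard ReentrantLock usage.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairCounter {
    private final ReentrantLock lock = new ReentrantLock(true); // true = fair lock
    private int value;

    int increment() {
        lock.lock();
        try {
            return ++value;
        } finally {
            lock.unlock(); // always release in finally
        }
    }

    // True if another thread's tryLock() fails while the caller holds the lock.
    static boolean tryLockFailsWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] otherGotLock = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                otherGotLock[0] = lock.tryLock(); // returns immediately, never blocks
                if (otherGotLock[0]) {
                    lock.unlock();
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return !otherGotLock[0];
    }

    public static void main(String[] args) {
        System.out.println(new FairCounter().increment());
        System.out.println(tryLockFailsWhileHeld());
    }
}
```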
2.2:CountDownLatch
A countdown latch: one or more threads wait until a given number of countDown() calls have been made before continuing. latch.await(); latch.countDown();
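A minimal sketch of the pattern, with LatchDemo as an illustrative name: the caller blocks in await() until every worker has called countDown().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Starts n workers and returns how many had finished when await() returned.
    static int finishedWorkers(int n) {
        CountDownLatch latch = new CountDownLatch(n);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // the worker's "work"
                latch.countDown();          // signal completion
            }).start();
        }
        try {
            latch.await(); // blocks until the count reaches 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished: " + finishedWorkers(3));
    }
}
```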
2.3: CyclicBarrier
A barrier: threads wait at the barrier until the required number of threads has arrived, then all proceed together.
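As a rough sketch (BarrierDemo is a hypothetical name), each thread calls await() and none of them continues until the last party arrives. The optional barrier action passed to the constructor runs once per trip, which the example uses to count trips.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierDemo {
    // Lets `parties` threads meet at the barrier once and returns the trip count.
    static int barrierTrips(int parties) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await(); // wait here until everyone has arrived
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("trips: " + barrierTrips(3));
    }
}
```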
2.4: Phaser
Runs threads through a sequence of phases, synchronizing them at the end of each phase.
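The phase counter can be seen with a deliberately tiny sketch that registers a single party, so each arrival completes a phase on its own (PhaserDemo is an illustrative name; real uses register several threads).

```java
import java.util.concurrent.Phaser;

class PhaserDemo {
    // With one registered party, every arrival advances the phaser by one phase.
    static int phaseAfterTwoStages() {
        Phaser phaser = new Phaser(1);  // one party: the calling thread
        phaser.arriveAndAwaitAdvance(); // end of phase 0
        phaser.arriveAndAwaitAdvance(); // end of phase 1
        return phaser.getPhase();
    }

    public static void main(String[] args) {
        System.out.println("current phase: " + phaseAfterTwoStages());
    }
}
```

With more parties, arriveAndAwaitAdvance() blocks each thread until all registered parties have arrived at the current phase.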
2.5:ReadWriteLock
A read-write lock combines a shared lock and an exclusive lock. While the read lock is held, other threads may read but not write; while the write lock is held, other threads may neither read nor write.
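The sharing rules can be probed with tryLock, as in this sketch (ReadWriteDemo is a hypothetical name; ReentrantReadWriteLock is the JDK's standard ReadWriteLock implementation). While the caller holds the read lock, a second thread tries to take first the read lock and then the write lock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteDemo {
    // result[0]: another reader got in; result[1]: a writer got in.
    static boolean[] probeWhileReadLocked() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];
        rw.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                result[0] = rw.readLock().tryLock();  // shared: expected to succeed
                if (result[0]) {
                    rw.readLock().unlock();
                }
                result[1] = rw.writeLock().tryLock(); // exclusive: expected to fail
                if (result[1]) {
                    rw.writeLock().unlock();
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            rw.readLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = probeWhileReadLocked();
        System.out.println("second reader: " + r[0] + ", writer: " + r[1]);
    }
}
```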
2.6:Semaphore
A semaphore, used for rate limiting: a thread must obtain a permit first, and the number of permits determines how many threads may run at the same time, like lanes at a toll booth.
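A small sketch of the permit accounting, assuming two permits (SemaphoreDemo is an illustrative name): the third non-blocking attempt is refused until permits are released.

```java
import java.util.concurrent.Semaphore;

class SemaphoreDemo {
    // With 2 permits, the third tryAcquire() must fail until a permit is released.
    static boolean thirdAcquireRejected() {
        Semaphore permits = new Semaphore(2);
        boolean first = permits.tryAcquire();
        boolean second = permits.tryAcquire();
        boolean third = permits.tryAcquire(); // no permits left
        permits.release(2);                   // hand both permits back
        return first && second && !third && permits.availablePermits() == 2;
    }

    public static void main(String[] args) {
        System.out.println("limit enforced: " + thirdAcquireRejected());
    }
}
```

In real code the blocking acquire() is paired with release() in a finally block, in the same way as lock() and unlock().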
2.7:Exchanger
Exchanges data between two threads. Thread one calls exchange with its value T1 and blocks; thread two calls exchange with its value T2; the two values are swapped and both threads continue.
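The swap can be sketched as follows, using the strings "T1" and "T2" as stand-ins for the two values (ExchangerDemo is a hypothetical name).

```java
import java.util.concurrent.Exchanger;

class ExchangerDemo {
    // received[0] is what the calling thread got, received[1] what the helper got.
    static String[] swap() {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];
        Thread helper = new Thread(() -> {
            try {
                received[1] = exchanger.exchange("T2"); // offers T2, receives T1
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        helper.start();
        try {
            received[0] = exchanger.exchange("T1");     // offers T1, receives T2
            helper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        String[] r = swap();
        System.out.println("main got " + r[0] + ", helper got " + r[1]);
    }
}
```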
2.8:LockSupport
Lock support. LockSupport.park() blocks the current thread; LockSupport.unpark(thread) unblocks the given thread so that it keeps running. unpark may be called before park. It can be used in place of wait and notify and is more flexible, because it can wake a specific thread. Underneath it uses the park method of Unsafe.
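Both properties, unpark before park and waking a specific thread, can be tried out with the sketch below. ParkDemo and the one-second bound are assumptions made for the demonstration, not anything LockSupport prescribes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    // unpark() issued first leaves a permit, so the later park() returns at once.
    static boolean unparkBeforeParkDoesNotBlock() {
        LockSupport.unpark(Thread.currentThread()); // grant the permit in advance
        long start = System.nanoTime();
        LockSupport.park();                         // consumes the permit
        return System.nanoTime() - start < 1_000_000_000L;
    }

    // unpark(worker) targets one particular thread.
    static boolean wakeSpecificThread() {
        AtomicBoolean woke = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            LockSupport.park(); // blocks unless a permit is already available
            woke.set(true);
        });
        worker.start();
        LockSupport.unpark(worker); // works whether or not the worker has parked yet
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return woke.get();
    }

    public static void main(String[] args) {
        System.out.println(unparkBeforeParkDoesNotBlock());
        System.out.println(wakeSpecificThread());
    }
}
```

park() is also allowed to return spuriously, so production code normally calls it in a loop that rechecks a condition, as acquireQueued does.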
Note: this article only represents the personal views of a novice blogger. If something is wrong or you have a better idea, please leave a comment. Sharing and discussion help us all improve. Thank you.