当前位置:网站首页>AQS explanation of concurrent programming
AQS explanation of concurrent programming
2022-06-11 00:36:00 【xujingyiss】
Java The core of concurrent programming is java.util.concurrent package ( abbreviation juc).
The author of this whole package is :doug lea
What is? AQS
AQS yes AbstractQueuedSynchronizer For short .AbstractQueuedSynchronizer Is an abstract class .
Although I won't use this class directly , But this class is Java The underlying implementation of many concurrency tools . stay juc in , Commonly used classes ( for example ReentrantLock、CountDownLatch、Semaphore), There is a... Maintained internally Sync attribute ( synchronizer ), and Sync Is an inheritance AbstractQueuedSynchronizer The inner class of .

AQS Two resource sharing methods are defined :
- Exclusive- Monopoly , Only one thread can execute , Such as ReentrantLock
- Share- share , Multiple threads can execute at the same time , Such as Semaphore/CountDownLatch
node Node And key attributes
AQS As defined in Node
AQS Internal maintenance of a two-way linked list , The elements of this two-way linked list are Node Express .
Node It is defined in AbstractQueuedSynchronizer in :
static final class Node {
// Tag node not shared mode
static final Node SHARED = new Node();
// Mark nodes in exclusive mode
static final Node EXCLUSIVE = null;
// The thread waiting in the synchronization queue timed out or was interrupted , Need to cancel waiting from synchronization queue
static final int CANCELLED = 1;
/**
* The thread of the successor node is waiting , If the current node releases the synchronization state or is cancelled ,
* Subsequent nodes will be notified , Enables subsequent nodes to run threads .
*/
static final int SIGNAL = -1;
/**
* The node is in the waiting queue , The thread of the node is waiting in Condition On , When other threads Condition Called signal() After the method ,
* The node moves from the waiting queue to the synchronization queue , Add to get synchronization state
*/
static final int CONDITION = -2;
// Indicates that the next shared synchronization state acquisition will be unconditionally propagated
static final int PROPAGATE = -3;
/**
* Flag the semaphore status of the current node (1,0,-1,-2,-3)5 States
* Use CAS Change state ,volatile Ensure thread visibility , High concurrency scenarios , After being modified by a thread , The state will immediately be visible to other threads .
*/
volatile int waitStatus;
// Precursor node , The current node is set to join the synchronization queue
volatile Node prev;
// The subsequent nodes
volatile Node next;
// Node synchronization state thread
volatile Thread thread;
/**
* Wait for the successor node in the queue , If the current node is shared , So this field is a SHARED Constant ,
* That is to say, node type ( Exclusive and shared ) Share the same field with subsequent nodes in the waiting queue .
*/
Node nextWaiter;
// If the node is shared , return true
final boolean isShared() {
return nextWaiter == SHARED;
}
// Return to the precursor node
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}AQS Key attributes
// Point to the head node of the synchronization waiting queue
private transient volatile Node head;
// Point to the tail node of the synchronization waiting queue
private transient volatile Node tail;
// Synchronize resource status . For example, in ReentrantLock in ,state Identify the number of locks
private volatile int state;Fair lock & Not fair lock
Fair lock

Fair lock , If there is already a thread waiting , Then the thread will directly enter the waiting queue . Except for the first thread in the waiting queue , All threads will not block , need CPU To wake up .
Only when no thread is currently waiting , To get the lock :

The advantage is that all threads can get resources ; The disadvantage is that there is only the first thread in the queue , Other threads will block ,cpu The cost of waking up blocked threads can be very high .
Not fair lock

Unfair lock , When a thread attempts to acquire a lock , Even if there are other threads in the waiting queue , The thread will also try to acquire the lock directly (compareAndSetState).
You can also see through the code , When locking, you can go directly through cas Attempt to acquire lock :

The advantage of unfair lock is , If the thread acquires the lock directly , You don't have to add it to the waiting thread , Sure Reduce CPU Wake up thread overhead , So the performance is higher than that of fair lock ; Shortcomings just as the name suggests , Unfair ! It may cause that the thread in the middle of the queue can't get the lock all the time or for a long time .
In the code is :FairSync( Fair lock ) And NonfairSync( Not fair lock ),ReentrantLock The default is " Not fair lock ".
stay ReentrantLock There is a constructor that can pass in whether it is a fair lock :

Reentrant lock & Do not reenter the lock
Reentrant means that a lock can be locked repeatedly , Otherwise, you cannot re-enter .
Reentrant lock
Reentrant lock refers to : In threads , When a thread acquires an object lock , This thread can get the lock on this object again ( Other threads are not allowed ).
ReentrantLock It's a typical reentry lock . Let's take an example :

state Used to store the number of locks . Lock every time ,state It's worth adding 1; Every unlock ,state Value reduction 1;
Do not reenter the lock
When a thread acquires an object lock , This thread cannot acquire the lock again , The lock must be released before it can be acquired again .

JDK There is no implementation of non reentrant locks in .
Waiting in line
CLH Waiting in line , It's a Double linked list , It's based on Node Class . Every time a new node is added , Will be added to the end of the queue .

When a thread comes in , When waiting , Will join in CLH Waiting in the queue . The implementation is to create a Node object , Add to the double linked list .
Look at it in code , It's through Node Of prev and next Quantity connected .

Condition queue
Some threads wait for a condition together (Condition), Only when the condition is met , These wait threads will wake up , So we can fight for the lock again .
The conditional queue is also based on Node Class , But with CLH The waiting queue is different , It's a One way linked list .

Conditional queue ,Node Of waitStatus = Node.CONDITION = -2. And Node Connection between , adopt nextWaiter, instead of prev and next.
Before operating the condition queue, you need to acquire the exclusive lock successfully , After successfully acquiring the exclusive lock , If the current conditions are not met , Is suspended on the condition queue of the current lock , At the same time, release the lock resource currently acquired , Reawakening after the condition is met .
BlockingQueue It is implemented through the condition queue .
No more details , Condition queue at AQS in , Relatively independent of other functions , Write a separate article later .
Exclusive way
The exclusive way is : A thread gets the lock , Other threads can no longer take the lock .
adopt Node.EXCLUSIVE Indicates exclusive mode

AbstractOwnableSynchronizer Inherited the abstract class AbstractOwnableSynchronizer

In this class , Defines a variable exclusiveOwnerThread, It means that in exclusive mode , Which thread holds the synchronizer .

1) Get the lock
With ReentrantLock For example , It is exclusive mode .
First, the first thread passes through CAS Get the lock , Then set the exclusiveOwnerThread Is the current thread :

2) Join the blocking queue
When another thread wants to acquire a lock , If you find that the lock has been held by another thread , You cannot lock successfully :

If locking fails , Normally, it will be added to the waiting list and blocked .

addWaiter Method to create Node Node and put it at the end of the waiting queue :
private Node addWaiter(Node mode) {
// Build the current thread to Node type
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// At present tail Whether the node is null?
if (pred != null) {
node.prev = pred;
// CAS Insert the node into the end of the synchronization queue , then return Current node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// At present tail Node is empty , Indicates that the waiting queue is empty . At this point, create a waiting queue , And will Node Add to the end of the waiting queue
enq(node);
return node;
}acquireQueued Method is used when a lock is not acquired , Block the current thread , Before blocking, you will still try to acquire the lock again :
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// If prev Node is head node , Then try to acquire the lock again
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (
/**
* Judge whether the lock acquisition fails , Do you need to block
* The internal will be set to SIGNAL state , So it will return soon true, Then you can block the thread
* So we won't let this loop run all the time CPU
*/
shouldParkAfterFailedAcquire(p, node) &&
// Block the current thread
parkAndCheckInterrupt()
)
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}Take an example :
public class ExclusiveTest {
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
lock.lock();
System.out.println(" Threads 1 Locking success ");
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
System.out.println(" Threads 1 Unlock complete ");
});
Thread t2 = new Thread(() -> {
lock.lock();
System.out.println(" Threads 2 Locking success ");
lock.unlock();
System.out.println(" Threads 2 Unlock complete ");
});
t1.start();
Thread.sleep(500);
t2.start();
System.out.println("==============");
}
}Start two threads , Threads t1(Thread-0) Acquire the lock , But it has not been released . Threads 2(Thread-1) Trying to get the lock again will fail , Will be added to the waiting list .

3) After the thread holding the lock releases the lock , Wake up the thread in the waiting queue
When the thread holding the lock releases the lock , Will wake up the thread in the waiting queue .
When calling unlock() Method to release the lock ,
/**
* Release the lock held by exclusive mode
*/
public final boolean release(int arg) {
// tryRelease(arg):arg = 1, Release the lock once ,state - 1
if (tryRelease(arg)) {
Node h = head;
// If head The node is in SIGNAL state (waitStatus = -1), It indicates that there are threads that need to be awakened
if (h != null && h.waitStatus != 0)
// Wake up the subsequent (next) Threads in nodes
unparkSuccessor(h);
return true;
}
return false;
}First , The thread holding the lock attempts to release the lock (tryRelease), Inside it will put (state-1), That is, release “ once ” lock , Only when state Reduced to 0, This thread will finally release the lock , return true, Otherwise return to false
protected final boolean tryRelease(int releases) {
// state reduce 1
int c = getState() - releases;
// If the lock is not held by the current thread , Throw an exception
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// If state It has been reduced to 0 了 , This indicates that this thread has completely released the lock
free = true;
// Because the current thread has released the lock , therefore exclusiveOwnerThread empty
setExclusiveOwnerThread(null);
}
// Set up the latest state value
setState(c);
return free;
}If the thread successfully releases the lock , Then look at head Whether the node is in SIGNAL state (waitStatus=-1), Then wake up the thread in the waiting queue (head Successor node ), The concrete realization is unparkSuccessor() In the method :
/**
* Wakes up the corresponding thread of the node . Under normal circumstances , What wakes up is head Thread in the successor node of
*/
private void unparkSuccessor(Node node) {
// this node yes head node
int ws = node.waitStatus;
if (ws < 0)
// Will wait for status waitStatus Set to initial value 0. In fact, the most common node in the waiting queue waitStatus Namely 0
compareAndSetWaitStatus(node, ws, 0);
// head Successor node
Node s = node.next;
// If the subsequent node is empty , Or the status is expired (CANCEL=1)
if (s == null || s.waitStatus > 0) {
s = null;
// Traverse forward from the back tail to find the first node in the normal blocking state to wake up
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// Call local (native) Method wake up thread
LockSupport.unpark(s.thread);
}After the thread is awakened , You can get the lock .
How to share
The sharing method is : Support multiple threads to acquire locks at the same time , Access shared resources .
With Semaphore For example ( The default is to create an unfair lock ).Semaphore The function of is to limit current . stay Semaphore in ,state Indicates the total number of resources .
For example, only... Are allowed at the same time 2 A thread can access resources , that state The beginning is 2.
for instance :
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 1; i < 6; i++) {
Thread thread = new Thread(() -> {
try {
semaphore.acquire();
System.out.println(" Threads " + Thread.currentThread().getName()
+ " Locking success ==>" + System.currentTimeMillis());
Thread.sleep(5000);
semaphore.release();
System.out.println(" Threads " + Thread.currentThread().getName()
+ " Unlock complete ==>" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, " Threads " + i);
thread.start();
}
}
}rise 5 Threads , Sleep in thread 5 second , It can be seen that there can only be 2 Threads get locks ,5 Seconds later , After a thread releases the lock , Other threads can continue to acquire locks . But there can only be 2 Threads can acquire locks at the same time .

Next, let's take a look at how it is implemented in the source code .
1) Get the lock
If you can get the lock , Just go back to normal . Don't throw exceptions , It doesn't block .
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// Attempt to acquire the Shared lock , If tryAcquireShared return >=0, Then it means that it is successful
if (tryAcquireShared(arg) < 0)
// If you don't get the lock , Normally, the thread should be added to the waiting queue
doAcquireSharedInterruptibly(arg);
}Determine whether the shared lock can be obtained :
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// state, The initial value is passed in the constructor permits, That is, the number of threads supported at the same time
int available = getState();
// Number of executable threads remaining = state - 1
int remaining = available - acquires;
// If remaining<0, It means that the locks have been occupied . otherwise cas modify state value
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}2) If the lock cannot be acquired , Add the current thread to the wait queue , And block
If you don't get the lock ( Back to remaining Less than 0), Generally, it will be put in the waiting queue , And block the current thread .
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// establish Node node , And join the waiting queue
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// prev node
final Node p = node.predecessor();
if (p == head) {
// If at present Node Of prev Node is head, Then try again to get the lock
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (
/**
* Judge whether the lock acquisition fails , Do you need to block
* The internal will be set to SIGNAL state , So it will return soon true, Then you can block the thread
* So we won't let this loop run all the time CPU
*/
shouldParkAfterFailedAcquire(p, node) &&
// call UNSAFE Methods , Block thread
parkAndCheckInterrupt()
)
throw new InterruptedException();
}
} finally {
if (failed)
// Cancel acquisition lock
cancelAcquire(node);
}
}addWaiter Method to create Node Node and put it at the end of the waiting queue . The source code is explained in exclusive mode .
enq Method is used when the wait queue is empty , Create a waiting queue , And will Node Add to the end of the waiting queue :
private Node enq(final Node node) {
// Dead cycle , Ensure that the node is successfully added to the end of the wait list , Create a list without a list
for (;;) {
Node t = tail;
if (t == null) {
// The queue is empty and needs to be initialized , Create an empty header node head And tail nodes tail
if (compareAndSetHead(new Node()))
tail = head;
} else {
// After the queue is not empty , Put the present Node Add to the end of the list
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}shouldParkAfterFailedAcquire Method is used to judge whether the lock acquisition fails , Do you need to block :
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// if prev The state of the node is SIGNAL, This means that the current node can be safely park
return true;
if (ws > 0) {
// If prev The node status is canceled (waitStatus=1), Then move out of the queue
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// If prev The node is in other states , Set it to SIGNAL state , It can be safely park
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}shouldParkAfterFailedAcquire Method is internally set to SIGNAL state , So it will return soon true. And then it calls parkAndCheckInterrupt() Blocking the current thread . So it doesn't always spin CPU!
3) Thread release lock
When a thread ends using a common resource , Will release the lock .state The value will be added back .
First, check whether it can be released once (releases=1) Shared lock , If you can ,state First plus 1, Back again true:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// Get current state value
int current = getState();
// releases = 1,
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
// adopt cas Way update state,state += 1
if (compareAndSetState(current, next))
return true;
}
}tryReleaseShared return true, Indicates that the lock can be released , Start waking up the thread in the waiting queue :
/**
* Set the current node to SIGNAL perhaps PROPAGATE
* Wake up the head.next(B node ),B After the node wakes up, it can compete for locks , After success head->B.next, And then it wakes up B.next, Repeat until the shared nodes wake up
* head The node status is SIGNAL, Reset head.waitStatus->0, Wake up the head Node thread , After wake-up, threads compete for shared locks
* head The node status is 0, take head.waitStatus->Node.PROPAGATE Communication status , Indicates that the state needs to be propagated to the next node
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// head yes SIGNAL state , Note that the following node threads can be awakened
if (ws == Node.SIGNAL) {
/**
* head Status is SIGNAL, Reset head node waitStatus by 0, It's not directly set to Node.PROPAGATE,
* Because unparkSuccessor(h) in , If ws<0 Will be set to 0, therefore ws Set to 0, Set it to Node.PROPAGATE
*/
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// waitStatus Set up 0 Failure words , Recycle
continue;
/**
* call native Method wake up head.next Node thread , The awakened thread will compete for the lock
* After success head Will point to the next node , That is to say head There is a change
*/
unparkSuccessor(h);
}
/**
* If head Node waitStatus Is in reset state (waitStatus==0) Of , Set it to “ spread ” state .
* It means that the state needs to be propagated to the next node
*/
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// If head Changed , Recycle , Keep waking up head The next node of
if (h == head)
break;
}
}Be careful , It wakes up all the node threads in the waiting queue , Then they go back and compete for the lock .
边栏推荐
猜你喜欢

【无标题】4555

Bluetooth development (6) -- literacy of Bluetooth protocol architecture

Njuptn Nanyou Discrete Mathematics_ Experiment 4

Detailed decomposition of the shortest path problem in Figure
![[network planning] 2.5 brief introduction to P2P architecture](/img/a8/74a1b44ce4d8b0b1a85043a091a91d.jpg)
[network planning] 2.5 brief introduction to P2P architecture

【JVM】垃圾回收机制
![[network planning] 1.3 packet switching and circuit switching in the network core](/img/a8/74a1b44ce4d8b0b1a85043a091a91d.jpg)
[network planning] 1.3 packet switching and circuit switching in the network core

Bluetooth development (3) -- look at the air bag

Njupt Nanyou Discrete Mathematics_ Experiment 3

How word inserts a guide (dot before page number) into a table of contents
随机推荐
Dual wing layout
微信小程序实现OCR扫描识别
[JVM] memory model
电脑录屏免费软件gif等格式视频
Exemple VTK - - trois plans qui se croisent
Unity自定义文件夹图标颜色 个性化Unity编译器
array_ column() expects parameter 1 to be array, array given
three hundred and thirty-three thousand three hundred and thirty-three
市值215亿,这个四川80后会让电视机成为历史?
博文推荐|构建 IoT 应用——FLiP 技术栈简介
Blog recommendation | building IOT applications -- Introduction to flip technology stack
[network planning] 1.3 packet switching and circuit switching in the network core
String time sorting, sorting time format strings
Shengteng AI development experience based on target detection and identification of Huawei cloud ECS [Huawei cloud to jianzhiyuan]
Multipass中文文档-教程
763. 划分字母区间
Philips coo will be assigned to solve the dual crisis of "supply chain and product recall" in the face of crisis due to personnel change
Test it first
Room第一次使用
Review of software architecture in Harbin Institute of technology -- LSP principle, covariance and inversion
