AbstractQueuedSynchronizer (AQS) source code analysis in detail: Semaphore source code analysis
2022-06-21 08:33:00 【*Wucongcong*】
1. Introduction
- A Semaphore holds a set of permits. Each call to acquire() consumes one permit, blocking if none is available, and each call to release() returns one permit.
- A Semaphore is typically used to limit how many threads can access a shared resource at the same time, which is what is commonly called rate limiting.
- Figure: flowchart of acquiring a permit from a Semaphore.
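As a minimal sketch of the limiting behaviour described above: the class name `ConcurrencyLimitDemo`, the method `runWithLimit`, and the task and limit counts are all made up for illustration. Each task takes a permit before doing its simulated work and returns it afterwards, so the number of tasks inside the guarded region should never exceed the limit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrencyLimitDemo {
    /**
     * Runs `tasks` short jobs while allowing at most `limit` of them to hold
     * a permit at once. Returns the highest concurrency that was observed.
     */
    static int runWithLimit(int tasks, int limit) {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxObserved = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    permits.acquire();               // blocks while `limit` tasks are inside
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                          // nothing was acquired, nothing to release
                }
                try {
                    int now = running.incrementAndGet();
                    maxObserved.accumulateAndGet(now, Math::max);
                    TimeUnit.MILLISECONDS.sleep(20); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    permits.release();               // return the permit
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxObserved.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent tasks: " + runWithLimit(10, 3));
    }
}
```

The counter is decremented before the permit is released, so the observed maximum cannot overshoot the limit because of bookkeeping order.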

2. Introductory examples
Example 1
import java.util.concurrent.Semaphore;

public class Pool {
    /** Maximum number of threads that may hold a resource at the same time. */
    private static final int MAX_AVAILABLE = 100;
    /** The semaphore: one permit per available object. */
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    /** The shared resources. Think of the array as holding Connection objects, i.e. a simulated connection pool (filling the array is omitted here). */
    protected Object[] items = new Object[MAX_AVAILABLE];
    /** Usage flags, index-aligned with items: used[i] == true while items[i] is held by an external thread, false otherwise. */
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    /** Gets a free object. If the pool has no free object, waits until one becomes free. */
    public Object getItem() throws InterruptedException {
        // Each call to acquire() consumes one permit
        available.acquire();
        return getNextAvailableItem();
    }

    /** Returns an object to the pool. */
    public void putItem(Object x) {
        // Only give a permit back if x really was checked out from this pool
        if (markAsUnused(x))
            available.release();
    }

    /** Finds a free object in the pool and marks it as used (used[i] = true). Returns the object, or null if none is free. */
    private synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null;
    }

    /**
     * Marks an object as free again. Returns true on success.
     * Returns false when:
     * 1. the object reference does not exist in the pool, or
     * 2. the object exists in the pool but is already marked as free.
     */
    private synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else
                    return false;
            }
        }
        return false;
    }
}
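A pool like the one above blocks indefinitely when it is empty. As a hedged variation, the sketch below uses `Semaphore.tryAcquire(long, TimeUnit)` so that a caller can give up after a timeout. `TimedPool`, `borrow` and `giveBack` are hypothetical names, and unlike `Pool` this sketch does not verify that a returned item was actually checked out.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TimedPool<T> {
    private final Semaphore available;              // one permit per idle item
    private final Deque<T> idle = new ArrayDeque<>();

    TimedPool(Collection<T> items) {
        idle.addAll(items);
        available = new Semaphore(items.size(), true);
    }

    /** Returns an item, or null if none became free within the timeout. */
    T borrow(long timeout, TimeUnit unit) {
        try {
            if (!available.tryAcquire(timeout, unit))
                return null;                        // timed out, no permit consumed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        synchronized (idle) {
            return idle.pop();                      // a permit guarantees an idle item exists
        }
    }

    /** Puts an item back first, then releases the permit that covers it. */
    void giveBack(T item) {
        synchronized (idle) {
            idle.push(item);
        }
        available.release();
    }
}
```

Pushing the item before releasing the permit keeps the invariant that the number of permits never exceeds the number of idle items.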
Example 2
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest02 {
    public static void main(String[] args) throws InterruptedException {
        // Create a semaphore with 2 initial permits
        // Fair mode: fair is true
        final Semaphore semaphore = new Semaphore(2, true);

        Thread tA = new Thread(() -> {
            try {
                // Each call to acquire() consumes one permit
                semaphore.acquire();
            } catch (InterruptedException e) {
                return; // interrupted before acquiring, so there is nothing to release
            }
            try {
                System.out.println("Thread A acquired a permit");
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Each call to release() returns one permit
                semaphore.release();
            }
        });
        tA.start();
        // Give thread A time to run and take its permit
        TimeUnit.MILLISECONDS.sleep(200);

        Thread tB = new Thread(() -> {
            try {
                // acquire(2) consumes 2 permits
                semaphore.acquire(2);
            } catch (InterruptedException e) {
                return;
            }
            try {
                System.out.println("Thread B acquired 2 permits");
            } finally {
                // release(2) returns 2 permits
                semaphore.release(2);
            }
        });
        tB.start();
        // Give thread B time to run and enter the wait queue
        TimeUnit.MILLISECONDS.sleep(200);

        Thread tC = new Thread(() -> {
            try {
                // Each call to acquire() consumes one permit
                semaphore.acquire();
            } catch (InterruptedException e) {
                return;
            }
            try {
                System.out.println("Thread C acquired a permit");
            } finally {
                // Each call to release() returns one permit
                semaphore.release();
            }
        });
        tC.start();
    }
}
The output is:
Thread A acquired a permit
Thread B acquired 2 permits
Thread C acquired a permit
Thread B needs both permits, so it has to wait until thread A releases its permit after 10 seconds. One permit is still free when thread C starts, but because the semaphore is fair, C queues behind B and only acquires after B has released.
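To make the role of fairness in Example 2 more concrete, here is a hedged sketch that runs the same scenario in both modes and records who gets through first. The class `FairnessDemo` and its helpers are invented for this illustration, and the calling thread plays the part of thread A. It polls `hasQueuedThreads()` and `getQueueLength()` instead of sleeping, so it does not depend on timing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.function.BooleanSupplier;

class FairnessDemo {
    /** Returns the order in which B (needs 2 permits) and C (needs 1) got through. */
    static List<String> acquisitionOrder(boolean fair) {
        Semaphore sem = new Semaphore(2, fair);
        List<String> order = new CopyOnWriteArrayList<>();
        sem.acquireUninterruptibly();           // the caller plays thread A: 1 permit left

        Thread b = worker(sem, 2, "B", order);
        b.start();
        waitUntil(sem::hasQueuedThreads);       // B is now waiting in the AQS queue

        Thread c = worker(sem, 1, "C", order);
        c.start();
        // Fair: C queues behind B. Nonfair: C takes the free permit and finishes.
        waitUntil(() -> !c.isAlive() || sem.getQueueLength() == 2);

        sem.release();                          // "A" returns its permit
        joinQuietly(b);
        joinQuietly(c);
        return order;
    }

    private static Thread worker(Semaphore sem, int n, String name, List<String> order) {
        return new Thread(() -> {
            sem.acquireUninterruptibly(n);
            try {
                order.add(name);                // recorded while holding the permits
            } finally {
                sem.release(n);
            }
        });
    }

    private static void waitUntil(BooleanSupplier condition) {
        long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 seconds
        while (!condition.getAsBoolean() && System.nanoTime() < deadline) {
            Thread.yield();
        }
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In fair mode C waits its turn behind B. In nonfair mode C never looks at the queue, so it can take the single free permit while B is still waiting for two.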
3. Source code analysis
3.1. Inner class Sync
- From the methods implemented by Sync we can learn the following:
- The number of permits is passed in through the constructor
- The permits are stored in the AQS state variable state
- Acquiring permits decreases state by the number acquired (by 1 for acquire())
- When state holds fewer permits than requested (for example when it is 0), the acquisition fails
- Releasing permits increases state by the number released (by 1 for release())
- The number of permits can be changed dynamically
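The points above can be observed from the outside through the public `Semaphore` API. The following is a small sketch, with `PermitStateDemo` and `ResizableSemaphore` as made-up names. Because `Semaphore.reducePermits(int)` is protected, the sketch assumes a trivial subclass to reach it.

```java
import java.util.concurrent.Semaphore;

class PermitStateDemo {
    /** Semaphore.reducePermits is protected, so a subclass is needed to call it. */
    static class ResizableSemaphore extends Semaphore {
        ResizableSemaphore(int permits) {
            super(permits);
        }

        void shrinkBy(int n) {
            reducePermits(n);
        }
    }

    /** Returns the permit count observed after each step. */
    static int[] trace() {
        ResizableSemaphore s = new ResizableSemaphore(3);
        int initial = s.availablePermits();      // state set by the constructor
        s.acquireUninterruptibly(2);             // state decreases by 2
        int afterAcquire = s.availablePermits();
        s.release(4);                            // release is not tied to a prior acquire
        int afterRelease = s.availablePermits();
        s.shrinkBy(2);                           // dynamic reduction
        int afterShrink = s.availablePermits();
        int drained = s.drainPermits();          // takes everything that is left
        return new int[] {initial, afterAcquire, afterRelease, afterShrink,
                          drained, s.availablePermits()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(trace())); // [3, 1, 5, 3, 3, 0]
    }
}
```

Note that `release(4)` pushes the count above the initial value: the constructor argument is only a starting point, not an upper bound.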
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    // Constructor: stores the number of permits in state
    Sync(int permits) {
        setState(permits);
    }

    // Returns the number of permits currently available
    final int getPermits() {
        return getState();
    }

    // Nonfair attempt to acquire permits
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // Read how many permits are available
            int available = getState();
            // Compute how many would remain after taking the requested permits
            int remaining = available - acquires;
            // If remaining < 0 there are not enough permits: return the negative value without touching state
            // Otherwise try to update state atomically: on success return the remaining count, on CAS failure loop and retry
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // Releases permits
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // Read how many permits are available
            int current = getState();
            // Add the permits being released
            int next = current + releases;
            // Detect overflow
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // If the atomic update of state succeeds, the release succeeded: return true. Otherwise retry
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // Reduces the number of permits
    final void reducePermits(int reductions) {
        for (;;) {
            // Read how many permits are available
            int current = getState();
            // Subtract the permits to be removed
            int next = current - reductions;
            // Detect underflow
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // Atomically update state: return on success, otherwise retry
            if (compareAndSetState(current, next))
                return;
        }
    }

    // Drains all available permits and returns how many were taken
    final int drainPermits() {
        for (;;) {
            // Read how many permits are available
            int current = getState();
            // If it is 0, return immediately
            // Otherwise atomically set state to 0
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
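The retry loops in Sync all follow the same read, compute, compare-and-set pattern. The sketch below reproduces that pattern outside AQS, using an `AtomicInteger` in place of the AQS state field. It is an illustration only: `CasPermitCounter` is a hypothetical class, and it has no wait queue, so a failed acquire just returns a negative number.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasPermitCounter {
    private final AtomicInteger state;

    CasPermitCounter(int permits) {
        state = new AtomicInteger(permits);
    }

    /** Mirrors nonfairTryAcquireShared: returns the remaining count, negative on failure. */
    int tryAcquire(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // Not enough permits: report failure without modifying state.
            // Enough permits: publish the new value, or retry if another thread won the race.
            if (remaining < 0 || state.compareAndSet(available, remaining))
                return remaining;
        }
    }

    /** Mirrors tryReleaseShared, including the overflow check. */
    boolean release(int releases) {
        for (;;) {
            int current = state.get();
            int next = current + releases;
            if (next < current)                  // int overflow wrapped around
                throw new Error("Maximum permit count exceeded");
            if (state.compareAndSet(current, next))
                return true;
        }
    }

    int available() {
        return state.get();
    }
}
```

With 3 permits, `tryAcquire(2)` returns 1 and a second `tryAcquire(2)` returns -1 while leaving the count at 1. Calling `release(Integer.MAX_VALUE)` at that point trips the overflow check, because the sum wraps around to a negative value.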
3.2. Inner class NonfairSync
In nonfair mode, tryAcquireShared() simply delegates to the parent class's nonfairTryAcquireShared(), without checking whether other threads are already queued.
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    // Constructor: calls the parent class constructor
    NonfairSync(int permits) {
        super(permits);
    }

    // Tries to acquire permits by calling the parent class's nonfairTryAcquireShared()
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
3.3. Inner class FairSync
In fair mode, an acquiring thread first checks whether other threads are queued ahead of it. If so, the attempt fails and the thread joins the queue. Otherwise it tries to update state atomically.
Note: for ease of reading, several methods that actually live in AQS are pasted into this inner class. They are marked as such in their header comments.
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    // Located in AQS: acquires in shared mode and responds to interruption
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // If the thread calling acquire() is already interrupted, throw immediately
        if (Thread.interrupted())
            throw new InterruptedException();
        // Try to acquire permits (by decreasing state): returns a value >= 0 on success, < 0 on failure
        if (tryAcquireShared(arg) < 0)
            // The attempt failed: add the thread to the AQS blocking queue
            doAcquireSharedInterruptibly(arg);
    }

    // Located in AQS: queued acquisition in shared mode, interruptible
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // Wrap the thread that called semaphore.acquire() in a node and append it to the AQS blocking queue
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true; // stays true if we leave through an exception
        try {
            for (;;) {
                // Get the predecessor of the current thread's node
                final Node p = node.predecessor();
                // p == head means the current thread's node is head.next
                if (p == head) {
                    // Only head.next is entitled to try to acquire
                    int r = tryAcquireShared(arg);
                    // r >= 0: the permits were acquired, r is the number of permits left
                    if (r >= 0) {
                        // Make this node the new head and wake following shared waiters if appropriate
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // Park if the predecessor is set up to signal us; if we were woken by an interrupt, throw
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // Respond to the interrupt
                    throw new InterruptedException();
            }
        } finally {
            // On an exception, cancel the node so it no longer waits in the queue
            if (failed)
                cancelAcquire(node);
        }
    }
    // Try to acquire permits (by decreasing state): returns a value >= 0 on success, < 0 on failure
    protected int tryAcquireShared(int acquires) {
        // Spin
        for (;;) {
            // If other threads are already waiting in the AQS blocking queue, return -1:
            // the thread calling acquire() must enter the queue and wait
            if (hasQueuedPredecessors())
                return -1;
            // Which cases reach this point?
            // 1. acquire() was called while no other thread is waiting in the blocking queue
            // 2. The current thread's node is head.next, i.e. it is retrying from doAcquireSharedInterruptibly()
            // Read state, which holds the number of permits
            int available = getState();
            // remaining: the number of permits the semaphore would have left after this acquisition
            int remaining = available - acquires;
            // Condition 1, remaining < 0: there are not enough permits, the acquisition fails
            // Condition 2 (only evaluated when remaining >= 0, i.e. enough permits exist):
            //   compareAndSetState(available, remaining) succeeded: the current thread has acquired the permits
            //   If the CAS fails, spin and retry
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    // Located in AQS: releases in shared mode
    public final boolean releaseShared(int arg) {
        // tryReleaseShared() returned true: the current thread has returned its permits,
        // so wake the threads that previously failed to acquire
        if (tryReleaseShared(arg)) {
            // Wake the threads that failed to acquire
            doReleaseShared();
            return true;
        }
        return false;
    }

    // Defined in Sync (see 3.1): returns permits, i.e. the current thread releases resources
    protected final boolean tryReleaseShared(int releases) {
        // Spin
        for (;;) {
            // Read the current AQS state
            int current = getState();
            // The permit count after the release
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // CAS in the new permit count; retry on failure
            if (compareAndSetState(current, next))
                return true;
        }
    }
    // Located in AQS. Which paths lead to doReleaseShared()?
    // 1. semaphore.release() -> tryReleaseShared() succeeds -> doReleaseShared()
    //    wakes the thread of head.next in the blocking queue
    //    (for CountDownLatch the equivalent path is latch.countDown() -> state == 0 -> doReleaseShared())
    // 2. A woken thread -> returns from parkAndCheckInterrupt() in doAcquireSharedInterruptibly()
    //    -> setHeadAndPropagate() -> doReleaseShared()
    // Wakes the waiting threads that previously failed to acquire
    private void doReleaseShared() {
        for (;;) {
            // Read the head of the queue
            Node h = head;
            // Proceed only if the queue has been initialized and holds more than one node
            // Condition 1: h != null, the blocking queue has been initialized
            //   When is h == null? When no thread has ever had to queue (no acquire has blocked yet)
            //   and a thread calls release(), which triggers the wake-up logic
            // Condition 2: h != tail, the queue contains nodes other than head
            //   When is h == tail, i.e. head and tail are the same node?
            //   a) In a normal wake-up chain the waiters acquire one after another; when the last
            //      waiter gets here, its node is both head and tail
            //   b) The first thread that has to queue races with a releasing thread:
            //      the queue is empty, so the acquiring thread first creates an empty head node.
            //      Before it has linked its own node, the queue holds only that empty head.
            //      If another thread releases at this moment and gets here, it sees h == tail and has nothing to wake.
            //      Note: nothing is lost. Once the acquiring thread has finished enqueuing, it spins in
            //      doAcquireSharedInterruptibly(), finds that its predecessor is head, retries tryAcquireShared(),
            //      succeeds, sets itself as head and returns from acquire()
            if (h != null && h != tail) {
                // Read the wait status of the head node
                int ws = h.waitStatus;
                // SIGNAL on the head means its successor still needs to be woken
                if (ws == Node.SIGNAL) {
                    // Before waking the successor, reset the head's status to 0
                    // Why use CAS? Because several threads may be running doReleaseShared() and trying
                    // to wake head.next at the same time, so the CAS can fail
                    // Example:
                    //   t3 found h != head at the bottom of the loop, so it keeps spinning and takes part
                    //   in waking the successor of the new head
                    //   t3 and t4 both see ws == SIGNAL and enter this branch. t3's CAS (SIGNAL -> 0) succeeds first,
                    //   so t4's CAS fails because t3 has already changed the status, and t4 loops to recheck
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // Wake the successor node
                    unparkSuccessor(h);
                }
                // Reaching here means the head's status is not SIGNAL
                // If it is 0, mark it PROPAGATE so that the release keeps propagating
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // h == head holds when:
            // 1. The successor that was just woken has not yet run setHeadAndPropagate() to make itself the head.
            //    The current thread simply leaves the loop and is done.
            //    Does the wake-up chain break here? No: the woken thread will call doReleaseShared() itself
            //    (via setHeadAndPropagate()) once it has acquired
            // 2. h == null: no thread has ever queued, yet a release triggered the wake-up logic
            // 3. h == tail: head and tail are the same node
            // h == head does not hold when:
            //    The woken node was quick and has already made itself the new head. The thread that woke it
            //    (acting for the old head) sees h != head, stays in doReleaseShared()
            //    and goes on to wake the successor of the new head
            if (h == head)                   // loop if head changed
                break;
        }
    }
}
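To see how little a subclass has to provide when AQS supplies the queueing, the following sketch builds a tiny fair semaphore directly on `AbstractQueuedSynchronizer`. `MiniFairSemaphore` and its method names are invented for this illustration and it is not the JDK implementation. Only `tryAcquireShared` and `tryReleaseShared` are written by hand; enqueuing, parking and wake-up propagation come from the AQS methods discussed above.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MiniFairSemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())     // fair: never overtake queued threads
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    MiniFairSemaphore(int permits) {
        sync = new Sync(permits);
    }

    void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);      // AQS: interrupt check, try, then queue
    }

    void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    void release() {
        sync.releaseShared(1);                   // AQS: tryReleaseShared, then doReleaseShared
    }

    int availablePermits() {
        return sync.permits();
    }

    boolean hasWaiters() {
        return sync.hasQueuedThreads();
    }
}
```

A thread that calls `acquireUninterruptibly()` while no permit is free parks inside AQS, and a later `release()` from another thread wakes it through `doReleaseShared()`.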
3.4. Constructors
Creating a Semaphore requires the number of permits. A Semaphore is nonfair by default, but the second constructor lets you request fair mode.
// Constructor: takes the number of permits, defaults to nonfair mode
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// Constructor: takes the number of permits and whether the semaphore is fair
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
3.5. acquire() method
Acquires one permit. The default variant is interruptible. If the attempt to acquire fails, the thread is queued in the AQS blocking queue.
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
// Acquires one permit, uninterruptibly. If the attempt fails, the thread is queued in the AQS blocking queue.
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
3.6. acquire(int permits) method
Acquires several permits at once, interruptibly.
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
// Acquires several permits at once, uninterruptibly.
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
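The difference between the interruptible and uninterruptible variants is easiest to see with a thread whose interrupt flag is already set. The sketch below, with the made-up class `AcquireVariantsDemo`, relies on the fact that `acquireSharedInterruptibly()` checks `Thread.interrupted()` before it even tries to acquire, and it also exercises the argument check shown above.

```java
import java.util.concurrent.Semaphore;

class AcquireVariantsDemo {
    /** acquire() checks the interrupt flag first, even when a permit is free. */
    static boolean acquireThrowsWhenInterrupted() {
        Semaphore s = new Semaphore(1);
        Thread.currentThread().interrupt();      // set the flag on the calling thread
        try {
            s.acquire();
            return false;                        // not expected
        } catch (InterruptedException e) {
            // The exception cleared the flag, and no permit was consumed
            return s.availablePermits() == 1;
        }
    }

    /** acquireUninterruptibly() ignores the flag and leaves it set. */
    static boolean uninterruptibleIgnoresFlag() {
        Semaphore s = new Semaphore(1);
        Thread.currentThread().interrupt();
        s.acquireUninterruptibly();
        boolean stillInterrupted = Thread.interrupted(); // reads and clears the flag
        return stillInterrupted && s.availablePermits() == 0;
    }

    /** A negative permit count is rejected before AQS is involved. */
    static boolean negativeCountRejected() {
        try {
            new Semaphore(1).acquireUninterruptibly(-1);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(acquireThrowsWhenInterrupted());
        System.out.println(uninterruptibleIgnoresFlag());
        System.out.println(negativeCountRejected());
    }
}
```

Each helper restores the calling thread's interrupt status before returning, so the checks do not affect one another.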
Summary:
- A Semaphore is typically used to limit concurrent access to a shared resource, i.e. in rate-limiting scenarios;
- Internally, Semaphore is implemented on top of the shared mode (shared lock) of AQS
- The number of permits must be specified when a Semaphore is initialized, and it is stored in state
- Acquiring a permit decreases state by one
- Releasing a permit increases state by one and wakes the threads waiting in the AQS blocking queue