当前位置:网站首页>Semaphore source code analysis
Semaphore source code analysis
2022-06-25 11:30:00 【sermonlizhi】
One 、Semaphore Introduce
Semaphore, Commonly known as semaphore , It is the operating system PV Operation primitive in JDK In the implementation of , Again , It's also based on AbstractQueuedSynchronizer To achieve .
Semaphore It is commonly understood as a shared lock , It can define the number of shared resources , As long as we share resources and , Other threads can execute , Otherwise it will be blocked .
And about the operating system PV For the primitive of operation, please refer to 《 Explain process synchronization 》 The semaphores are introduced in detail in this article
Semaphore It's very powerful , The size is 1 The amount of signal , Its function is similar to mutex (ReentrantLock), Only one thread can get semaphores at the same time , And then execute the thread's own business logic . And the size is n The amount of signal , The current limiting function can be realized , That is, it can guarantee that only n Threads can simultaneously acquire semaphores to execute business logic .
Two 、Semaphore Use
Semaphore The current limiting function can be realized
The following code defines a shared resource with the number of 3 Of Semaphore, And created a thread pool
stay test() In the method , adopt sleep() Method to control only commit per second 10 A mission , And in the exec() In the method , You can appropriately increase the sleep time , In this way, you can see the implementation clearly
Since the semaphore is 3, So only three threads can call at the same time acquire() Method success , Then execute the following logic , When the fourth thread comes in , call acquire() Method will be blocked , Until the previous thread has finished executing and released the shared resources
private static Semaphore semaphore = new Semaphore(3);
private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(10,50,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(200));
@SneakyThrows
static void exec(){
try {
semaphore.acquire();
System.out.println(" perform exec Method ");
Thread.sleep(10000);
} finally {
semaphore.release();
}
}
@Test
@SneakyThrows
public void test(){
for (;;){
Thread.sleep(100);
executorService.execute(() -> exec());
}
}
3、 ... and 、Semaphore Source code analysis
3.1 Common methods
Semaphore Also use inner classes Sync Inherited AbstractQueuedSynchronizer, It also provides NonfairSync and FairSync Two fair and unfair inner classes , At this point with ReentrantLock cut from the same cloth .
Its construction method is as follows , The default is to use an unfair method , Parameters permits Indicates the number of licenses ( That is, the number of shared resources ), Constructor will call Sync Construction method of , Then call AQS Of setState() Method , Use state Property to record the number of resources
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Sync(int permits) {
setState(permits);
}
The common methods are as follows :
public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
- acquire(): License obtained successfully , Go straight back to ; If you fail , You need to add the synchronization queue to block
- tryAcquire(): Try to get permission , Return immediately regardless of success , If the acquisition fails, it returns false, It does not block the getting thread
- release(): Release the license and wake up the blocked thread in the synchronization queue
- availablePermits(): Returns the number of licenses currently available in the semaphore
- getQueueLength(): Returns the number of threads waiting for a license
- hasQueuedThreads(): Is there a thread waiting for a license
- reducePermits(): Reduce the specified number of licenses
3.2 Get a license
Semaphore Medium acquire() Method has two overloaded methods , You can specify the number of licenses to acquire , All of them will call AQS Of acquireSharedInterruptibly() Method
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
stay acquireSharedInterruptibly() In the method , Going to call tryAcquireShared() Method to try to obtain a specified number of licenses , If it fails, go back and call doAcquireSharedInterruptibly() Method to block
because Semaphore Of acquire() Method does not support thread interrupts , So when a thread interrupt is detected, an interrupt exception will be thrown directly
And it has another acquireUninterruptibly() Method , This method supports interrupts
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
3.2.1 Try to get permission
AQS There is no specific implementation in tryAcquireShared() Method , Are implemented on demand by subclasses , With Semaphore Inner class FairSync Of tryAcquireShared() Method to see the source code
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
The above code is also relatively simple , First, determine whether there are waiting threads in the synchronization queue , If there is , Then the current thread cannot obtain permission , Go straight back to -1, Continue to join the team later
without , Then calculate the number of remaining licenses , If greater than or equal to 0, Update the number of remaining licenses , Finally, the number of remaining licenses is returned , When you return a negative number , Description license acquisition failed
3.2.2 Threads queue and block
If a negative number is returned when trying to obtain a license , It means failure , Just call doAcquireSharedInterruptibly() Methods to queue and block
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireSharedInterruptibly() Methods and ReentrantLock in acquireQueued() The overall logic of the method is the same , There are only some differences in implementation , You can have a look at 《AQS&ReentrantLock The source code parsing 》 Chinese vs ReentrantLock Of acquireQueued() Introduction of methods
Calling addWaiter() When the method is used ,ReentrantLock A node of exclusive type is created in , and Semaphore A node of shared type is created in .
The biggest difference is where you try to get permission , stay ReentrantLock in , If the synchronization status is obtained successfully , Just remove the current node from the synchronization queue ; And in the Semaphore in , Not only will the current node be removed , It will also try to wake up the next thread blocking the node
Its implementation is in setHeadAndPropagate() In the method , The first is to call setHead() Method takes the current node as the head node first , Then if the next node of the current node is a node of shared type , I'm going to call it doReleaseShared() Try to wake up the thread of the next node
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
The reason why we adopt the method of spreading awakening is because , The thread that released the license earlier may have released multiple licenses , However, after the first shared node in the blocking queue obtains the number of licenses that are satisfied, there is still room left , Then the next shared node can continue to obtain permission to execute
When the second thread is awakened , It will carry out setHeadAndPropagate() Present the current node , And try to wake up the next node , Wake up backward in turn
3.3 Release license
And acquire() Methods corresponding to the , There are also two overloaded release() Method , Will call AQS Of releaseShared() Method
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
stay releaseShared() In the method , Also try to release the license first , Only if the release succeeds , To call doReleaseShared() Method to wake up the thread
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
3.3.1 Try to release the license
AQS No specific implementation in tryReleaseShared() Method , stay Semaphore The inner class of Sync in , You can see the concrete implementation
The logic for releasing licenses is also simple , By spinning +CAS To update state Property value , You will not exit until the update succeeds
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
3.3.2 Wake up the thread
Wake up threaded doReleaseShared() Method already in AQS To realize , Wake up the thread by spinning +CAS To achieve , First, get the head node of the synchronization queue , Only the head node has a status of SIGNAL It means that subsequent nodes can be awakened , And then through CAS Change the status of the head node to 0( The initial state ), If the modification fails, try again , If the modification is successful, call unparkSuccessor() Method to wake up the thread , The source code of this method is in ReentrantLock Has been mentioned in the unlocking logic of , I won't repeat it here .
So when does this spin end ? Look at the back h == head, When the call is finished unparkSucessor() After the method , Will continue to execute , And then it's out of the loop .
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
边栏推荐
- What is the development history, specific uses and structure of the chip
- Daily 3 questions (2) - find out the lucky numbers in the array
- 贝叶斯
- Comparator(用于Arrays.sort)
- Application of global route guard
- Kingbasees plug-in DBMS of Jincang database_ RANDOM
- Builder pattern
- What are the ways to simulate and burn programs? (including common tools and usage)
- Free access to the global human settlements layer (ghsl) dataset from Gee
- Handler、Message、Looper、MessageQueue
猜你喜欢

Ladder Side-Tuning:预训练模型的“过墙梯”

Double buffer transparent encryption and decryption driven course paper + project source code based on minifilter framework

基于C语言的图书信息管理系统 课程论文+代码及可执行exe文件

金太阳教育美股上市:市值3.6亿美元 成小盘中概股

Shen Ying, China Academy of communications and communications: font open source protocol -- Introduction to ofl v1.1 and analysis of key points of compliance

Vulnérabilité à l'injection SQL (contournement)

Ladder side tuning: the "wall ladder" of the pre training model

Course paper + code and executable EXE file of library information management system based on C language

Upload and modify the use of avatars

牛客网:分糖果问题
随机推荐
Vulnérabilité à l'injection SQL (contournement)
Handler、Message、Looper、MessageQueue
Coscon'22 lecturer solicitation order
一个数学难题,难倒两位数学家
MySQL synchronous data configuration and shell script implementation
Golden sun education listed in the U.S.: a small cap medium cap stock with a market value of USD 360million
Use of three-level linkage plug-ins selected by provinces and cities
C disk uses 100% cleaning method
2022 PMP project management examination agile knowledge points (2)
Gaussdb others scenarios with high memory
Jincang KFS data centralized scenario (many to one) deployment
Causes and solutions of over fitting
MySQL and Oracle processing CLOB and blob fields
Detailed explanation of spark specification
Getting started with Apache Shenyu
查询法,中断法实现USART通信
Design and implementation of university laboratory goods management information system based on SSH
Simple use of SVN
Jincang database kingbasees plug-in identity_ pwdexp
Upload and modify the use of avatars