Semaphore source code analysis

2022-06-25 11:30:00 sermonlizhi

1. Introduction to Semaphore

Semaphore is the JDK's implementation of the operating system's PV primitives (the P and V operations). Like ReentrantLock, it is built on AbstractQueuedSynchronizer (AQS).

A Semaphore is commonly understood as a shared lock. It defines a number of shared resources (permits): as long as permits remain, further threads can proceed; otherwise they block.

For the operating-system PV primitives themselves, see the article 《Explain process synchronization》, which covers semaphores in detail.

Semaphore is very versatile. A semaphore of size 1 behaves much like a mutex (ReentrantLock): only one thread can hold the permit at a time and run its own business logic. A semaphore of size n implements throttling: at most n threads can hold permits and run the business logic concurrently.
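As a rough illustration of the size-1 case, the sketch below guards a plain int counter with a one-permit semaphore. The class and method names (BinarySemaphoreDemo, runCounter) are made up for this example and do not come from the JDK.

```java
import java.util.concurrent.Semaphore;

class BinarySemaphoreDemo {
    private static final Semaphore MUTEX = new Semaphore(1); // one permit: mutex-like
    private static int counter = 0;                           // deliberately a plain int

    /** Starts `threads` workers that each increment the counter `incrementsPerThread` times. */
    static int runCounter(int threads, int incrementsPerThread) throws InterruptedException {
        counter = 0;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    MUTEX.acquireUninterruptibly(); // P operation: take the only permit
                    try {
                        counter++;                  // critical section
                    } finally {
                        MUTEX.release();            // V operation: hand the permit back
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runCounter(4, 10_000)); // prints 40000
    }
}
```

One difference from ReentrantLock is worth keeping in mind: a semaphore has no owner and is not reentrant, so a thread that calls acquire() twice on a one-permit semaphore blocks itself, and any thread may call release().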

2. Using Semaphore

Semaphore can be used to implement throttling.

The following code defines a Semaphore with 3 permits and creates a thread pool.

In test(), sleep() limits submission to about 10 tasks per second. In exec(), you can lengthen the sleep so that the behavior is easier to observe.

Since the semaphore has 3 permits, only three threads can return from acquire() at the same time and run the logic that follows. A fourth thread calling acquire() blocks until one of the earlier threads finishes and releases its permit.

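// 3 permits: at most three tasks run the guarded section at once
private static Semaphore semaphore = new Semaphore(3);
private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));

@SneakyThrows
static void exec() {
    // Acquire outside the try block: if acquire() throws InterruptedException,
    // no permit was taken, so finally must not release one
    semaphore.acquire();
    try {
        System.out.println("executing exec()");
        Thread.sleep(10000);
    } finally {
        semaphore.release();
    }
}

@Test
@SneakyThrows
public void test() {
    // Tasks arrive much faster than three permits can serve them, so the queue (200)
    // and the pool (50 threads) eventually fill up and execute() throws RejectedExecutionException
    for (;;) {
        Thread.sleep(100); // submit roughly 10 tasks per second
        executorService.execute(() -> exec());
    }
}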
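The effect of the permit limit can also be checked without watching console output. The sketch below is a self-contained variant that counts how many tasks are inside the guarded section at once. The names RateLimitDemo and maxConcurrent are hypothetical, and the pool size and sleep time are arbitrary choices for the demo.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RateLimitDemo {
    /** Runs `tasks` jobs and returns the largest number seen inside the guarded section at once. */
    static int maxConcurrent(int permits, int tasks) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();  // tasks currently holding a permit
        AtomicInteger maxSeen = new AtomicInteger(); // high-water mark of `inside`
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                semaphore.acquireUninterruptibly();
                try {
                    int now = inside.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    Thread.sleep(50); // simulated business logic
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet(); // leave the section before giving the permit back
                    semaphore.release();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return maxSeen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // The reported value never exceeds the number of permits
        System.out.println("max concurrent = " + maxConcurrent(3, 10));
    }
}
```

Because each task increments the counter only after acquiring and decrements it before releasing, the counter can never exceed the number of permits, whatever the thread scheduling.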

3. Semaphore source code analysis

3.1 Common methods

Semaphore also uses an inner class, Sync, that extends AbstractQueuedSynchronizer, and provides two subclasses of it, NonfairSync and FairSync, for the unfair and fair modes. In this respect it follows the same pattern as ReentrantLock.

Its constructors are shown below. The default is unfair mode. The permits parameter is the number of permits (that is, the number of shared resources). The constructor calls the Sync constructor, which in turn calls AQS's setState() so that the state field records the number of permits.

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Sync(int permits) {
    setState(permits);
}
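What the constructors store can be read back through the public API. This is a small sketch; the helper name describe is invented for the example.

```java
import java.util.concurrent.Semaphore;

class ConstructorDemo {
    /** Summarizes what the constructor stored: the permit count and the fairness mode. */
    static String describe(Semaphore s) {
        return "permits=" + s.availablePermits() + ", fair=" + s.isFair();
    }

    public static void main(String[] args) {
        System.out.println(describe(new Semaphore(3)));        // prints permits=3, fair=false
        System.out.println(describe(new Semaphore(3, true)));  // prints permits=3, fair=true
        System.out.println(describe(new Semaphore(3, false))); // prints permits=3, fair=false
    }
}
```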

The common methods are as follows:

public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)

  • acquire(): returns immediately if a permit is obtained; otherwise the thread joins the synchronization queue and blocks
  • tryAcquire(): tries to obtain a permit and returns immediately either way; returns false on failure and never blocks the calling thread
  • release(): releases a permit and wakes a blocked thread in the synchronization queue
  • availablePermits(): returns the number of permits currently available in the semaphore
  • getQueueLength(): returns an estimate of the number of threads waiting for a permit
  • hasQueuedThreads(): reports whether any threads are waiting for a permit
  • reducePermits(): reduces the number of available permits by the given amount
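The methods listed above can be tried out in a single thread. In the sketch below, ShrinkableSemaphore is a made-up subclass whose only purpose is to reach the protected reducePermits() method; it is not part of the JDK.

```java
import java.util.concurrent.Semaphore;

class ApiTourDemo {
    /** Hypothetical subclass that exposes the protected reducePermits(). */
    static class ShrinkableSemaphore extends Semaphore {
        ShrinkableSemaphore(int permits) {
            super(permits);
        }

        void shrink(int reduction) {
            reducePermits(reduction);
        }
    }

    public static void main(String[] args) {
        ShrinkableSemaphore s = new ShrinkableSemaphore(2);
        System.out.println(s.tryAcquire());       // prints true  (1 permit left)
        System.out.println(s.tryAcquire());       // prints true  (0 permits left)
        System.out.println(s.tryAcquire());       // prints false (returns at once, no blocking)
        System.out.println(s.availablePermits()); // prints 0
        System.out.println(s.hasQueuedThreads()); // prints false (tryAcquire never enqueues)
        System.out.println(s.getQueueLength());   // prints 0
        s.release();
        System.out.println(s.availablePermits()); // prints 1
        s.shrink(1);
        System.out.println(s.availablePermits()); // prints 0
    }
}
```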

3.2 Acquiring a permit

Semaphore's acquire() method has two overloads, one of which lets you specify the number of permits to acquire. Both call AQS's acquireSharedInterruptibly() method.

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

acquireSharedInterruptibly() first calls tryAcquireShared() to try to obtain the requested number of permits. If that fails, it calls doAcquireSharedInterruptibly() to enqueue the thread and block.

Semaphore's acquire() responds to thread interruption: when an interrupt is detected, it throws InterruptedException right away.

There is also an acquireUninterruptibly() method, which does not respond to interruption: an interrupted thread keeps waiting for a permit, and its interrupt status is set when the method returns.

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
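Both interrupt behaviors can be observed from the outside. The sketch below uses made-up names (InterruptDemo and its two helpers), and the 200 ms wait is an arbitrary grace period chosen for the demo.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {
    /** acquire() checks the interrupt flag first, so it throws even though a permit is free. */
    static boolean acquireRejectsInterruptedThread() {
        Semaphore s = new Semaphore(1);
        Thread.currentThread().interrupt(); // set the flag before calling acquire()
        try {
            s.acquire();
            s.release();
            return false;
        } catch (InterruptedException e) {
            // Thread.interrupted() cleared the flag before the exception was thrown;
            // the permit was never taken
            return s.availablePermits() == 1;
        }
    }

    /** acquireUninterruptibly() keeps waiting after interrupt() and returns with the flag set. */
    static boolean uninterruptibleKeepsWaiting() throws InterruptedException {
        Semaphore s = new Semaphore(0);
        AtomicBoolean flagOnReturn = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            s.acquireUninterruptibly();
            flagOnReturn.set(Thread.currentThread().isInterrupted());
        });
        waiter.start();
        while (!s.hasQueuedThreads()) {
            Thread.sleep(1);              // wait until the waiter is enqueued
        }
        waiter.interrupt();
        waiter.join(200);                 // give it time to (not) react
        boolean stillBlocked = waiter.isAlive();
        s.release();                      // only a permit lets it through
        waiter.join();
        return stillBlocked && flagOnReturn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(acquireRejectsInterruptedThread()); // prints true
        System.out.println(uninterruptibleKeepsWaiting());     // prints true
    }
}
```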

3.2.1 Trying to acquire a permit

AQS provides no concrete implementation of tryAcquireShared() (the base method just throws UnsupportedOperationException); subclasses implement it as needed. Here we look at the tryAcquireShared() method of Semaphore's inner class FairSync.

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

The code above is fairly simple. It first checks whether any threads are already waiting in the synchronization queue. If so, the current thread may not take a permit: the method returns -1 and the thread is enqueued afterwards.

If not, it computes the number of permits that would remain. If that number is greater than or equal to 0, it updates the count with CAS (retrying the loop if the CAS fails) and returns the remaining count. A negative return value means the acquisition failed.
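The subtract-then-CAS loop can be modeled outside AQS. The sketch below is a simplified stand-in, not JDK code: PermitCounter is a hypothetical class, an AtomicInteger plays the role of the AQS state field, and the queue check done by FairSync is left out, which makes it resemble the unfair path.

```java
import java.util.concurrent.atomic.AtomicInteger;

class PermitCounter {
    private final AtomicInteger state; // stands in for the AQS state field

    PermitCounter(int permits) {
        state = new AtomicInteger(permits);
    }

    /** Returns the permits left after the acquisition, or a negative number on failure. */
    int tryAcquireShared(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // A negative result short-circuits, so state is never driven below zero
            if (remaining < 0 || state.compareAndSet(available, remaining)) {
                return remaining;
            }
            // The CAS lost a race with another thread: re-read state and retry
        }
    }

    int available() {
        return state.get();
    }

    public static void main(String[] args) {
        PermitCounter c = new PermitCounter(3);
        System.out.println(c.tryAcquireShared(2)); // prints 1
        System.out.println(c.tryAcquireShared(2)); // prints -1
        System.out.println(c.available());         // prints 1 (the failed attempt changed nothing)
    }
}
```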

3.2.2 Queuing and blocking the thread

If the attempt to acquire a permit returns a negative number, it has failed, and doAcquireSharedInterruptibly() is called to enqueue the thread and block it.

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

The overall logic of doAcquireSharedInterruptibly() is the same as that of the acquireQueued() method used by ReentrantLock, with some differences in the details. See the discussion of acquireQueued() in 《AQS&ReentrantLock source code analysis》.

When addWaiter() is called, ReentrantLock creates a node of exclusive type, whereas Semaphore creates a node of shared type.

The biggest difference is in what happens after a successful acquisition. In ReentrantLock, a thread that obtains the synchronization state simply makes its node the new head, which takes it out of the waiting queue. In Semaphore, the node also becomes the new head, but the thread additionally tries to wake the thread of the next blocked node.

This is implemented in setHeadAndPropagate(). It first calls setHead() to make the current node the head. Then, if permits remain (propagate > 0) or the old or new head has a negative waitStatus, and the next node is of shared type (or is not yet visible), it calls doReleaseShared() to try to wake the thread of the next node.

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

Wake-ups are propagated because the thread that released earlier may have released several permits at once. If permits are still left after the first shared node in the queue has taken what it needs, the next shared node can go on to acquire permits and run.

When the second thread is woken and acquires its permits, it too runs setHeadAndPropagate(), making its own node the head and trying to wake the next node, and so on down the queue.
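The visible effect of this propagation is that one release call with several permits lets several waiters through. The sketch below shows only that outcome, not which internal code path woke each thread. PropagationDemo and releaseWakesAll are made-up names, and the 5 second timeout is an arbitrary safety margin.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class PropagationDemo {
    /** Parks `waiters` threads on an empty semaphore, releases that many permits in one call. */
    static boolean releaseWakesAll(int waiters) throws InterruptedException {
        Semaphore s = new Semaphore(0);
        CountDownLatch done = new CountDownLatch(waiters);
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                s.acquireUninterruptibly(); // blocks: no permits yet
                done.countDown();
            });
            t.setDaemon(true);
            t.start();
        }
        while (s.getQueueLength() < waiters) {
            Thread.sleep(1);                // wait until every waiter is enqueued
        }
        s.release(waiters);                 // a single release call
        return done.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(releaseWakesAll(2)); // prints true
    }
}
```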

3.3 Releasing a permit

Corresponding to acquire(), there are two overloads of release(), both of which call AQS's releaseShared() method.

public void release() {
    sync.releaseShared(1);
}

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

releaseShared() likewise first tries to release the permits, and calls doReleaseShared() to wake waiting threads only if the release succeeds.

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

3.3.1 Trying to release a permit

AQS provides no concrete implementation of tryReleaseShared(); the implementation is in Semaphore's inner class Sync.

The release logic is also simple: it updates the state value with a spin loop plus CAS and does not return until the update succeeds. The only other exit is the overflow check, which throws an Error if the permit count would exceed the maximum int value.

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
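Two consequences of this method can be seen through the public API: releasing simply adds to the count, with no check that the caller acquired anything, and releasing past Integer.MAX_VALUE hits the overflow branch. The sketch below uses invented names (ReleaseDemo and its helpers).

```java
import java.util.concurrent.Semaphore;

class ReleaseDemo {
    /** release() does not require a prior acquire(); it just adds to the permit count. */
    static int releaseWithoutAcquire() {
        Semaphore s = new Semaphore(0);
        s.release();
        return s.availablePermits();
    }

    /** Releasing past Integer.MAX_VALUE trips the overflow check in tryReleaseShared(). */
    static String overflowMessage() {
        Semaphore s = new Semaphore(Integer.MAX_VALUE);
        try {
            s.release();
            return "no error";
        } catch (Error e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseWithoutAcquire()); // prints 1
        System.out.println(overflowMessage());       // prints Maximum permit count exceeded
    }
}
```

Since a release is not tied to an earlier acquire, pairing acquire() with release() in a try/finally block is the caller's responsibility.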

3.3.2 Waking up threads

doReleaseShared(), which wakes threads, is implemented in AQS itself, again with a spin loop plus CAS. It first reads the head node of the synchronization queue. Only a head whose status is SIGNAL indicates that a successor needs to be woken. In that case it uses CAS to reset the head's status to 0 (the initial state), retrying if the CAS fails; once the CAS succeeds, it calls unparkSuccessor() to wake the thread. The source of unparkSuccessor() was covered in the discussion of ReentrantLock's unlock logic, so it is not repeated here. If the head's status is already 0, it is set to PROPAGATE instead, so that the release is still propagated.

So when does the spin end? Look at the h == head check at the bottom. If the head has not changed since the start of the iteration, the loop exits. If a woken thread has already become the new head in the meantime, the loop runs again against the new head.

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
Copyright notice
This article was written by [sermonlizhi]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/02/202202200537567775.html