AbstractQueuedSynchronizer (AQS) source code in detail: Semaphore source code analysis

2022-06-21 08:33:00 *Wucongcong*

1、 Introduction

  • A Semaphore holds a set of permits. Each call to acquire() consumes one permit, and each call to release() returns one.
  • Semaphore is typically used to limit how many threads can access a shared resource at the same time, which is what is commonly called rate limiting (throttling).
  • Flowchart of acquiring a permit from a Semaphore:

[Figure: HoBNv9.png, Semaphore permit acquisition flowchart]
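To make the limiting behaviour concrete, here is a minimal sketch (not taken from the original article). The class name LimiterSketch, the method peakConcurrency and the two counters are hypothetical names introduced only for this illustration. It records the highest number of threads that were inside the guarded region at once, which should never exceed the number of permits.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LimiterSketch {

    /** Runs the tasks and returns the highest number of threads seen inside the guarded region. */
    static int peakConcurrency(int permits, int threads, int tasks) {
        Semaphore limiter = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger(); // threads currently holding a permit
        AtomicInteger peak = new AtomicInteger();   // highest value of "inside" observed
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                // Uninterruptible acquire keeps this sketch free of exception handling here.
                limiter.acquireUninterruptibly();
                try {
                    peak.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    TimeUnit.MILLISECONDS.sleep(5); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    limiter.release();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency = " + peakConcurrency(3, 8, 40));
    }
}
```

Because the counter is incremented only after acquire and decremented before release, the reported peak is bounded by the permit count even though eight worker threads compete.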

2、 Introductory examples

Example 1

import java.util.concurrent.Semaphore;

public class Pool {

    /** Maximum number of threads that may access the resources at the same time. */
    private static final int MAX_AVAILABLE = 100;

    /** Semaphore holding the available permits (fair mode). */
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    /**
     * Shared resources. Think of the array as a connection pool whose elements are
     * Connection objects (it must be populated with real objects before use).
     */
    protected Object[] items = new Object[MAX_AVAILABLE];

    /**
     * Usage flags, one per entry of items: used[i] == true means items[i]
     * is currently held by an external thread, otherwise used[i] == false.
     */
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    /** Gets a free object. If the pool has none, blocks until one becomes free. */
    public Object getItem() throws InterruptedException {
        // Each call to acquire() consumes one permit.
        available.acquire();
        return getNextAvailableItem();
    }

    /** Returns an object to the pool. */
    public void putItem(Object x) {
        // Only give the permit back if the object really was checked out.
        if (markAsUnused(x))
            available.release();
    }

    /**
     * Finds a free object in the pool. Returns the object on success, null on failure.
     * On success the matching flag is set: used[i] = true.
     */
    private synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null;
    }

    /**
     * Marks an object as free again. Returns true on success and false when
     * 1. the object reference is not in the pool, or
     * 2. the object is in the pool but is already marked as free.
     */
    private synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else
                    return false;
            }
        }
        return false;
    }
}

Example 2

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest02 {

    public static void main(String[] args) throws InterruptedException {
        // Create a semaphore with 2 initial permits.
        // Fair mode: fair is true.
        final Semaphore semaphore = new Semaphore(2, true);

        Thread tA = new Thread(() -> {
            try {
                // Each call to acquire() consumes one permit.
                semaphore.acquire();
            } catch (InterruptedException e) {
                // Nothing was acquired, so there is nothing to release.
                Thread.currentThread().interrupt();
                return;
            }
            try {
                System.out.println("Thread A acquired a permit");
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Each call to release() returns one permit.
                semaphore.release();
            }
        });
        tA.start();
        // Give thread A time to start and take its permit.
        TimeUnit.MILLISECONDS.sleep(200);

        Thread tB = new Thread(() -> {
            try {
                // acquire(2) consumes 2 permits.
                semaphore.acquire(2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try {
                System.out.println("Thread B acquired 2 permits");
            } finally {
                // release(2) returns 2 permits.
                semaphore.release(2);
            }
        });
        tB.start();
        // Give thread B time to start and enter the wait queue.
        TimeUnit.MILLISECONDS.sleep(200);

        Thread tC = new Thread(() -> {
            try {
                // Each call to acquire() consumes one permit.
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try {
                System.out.println("Thread C acquired a permit");
            } finally {
                // Each call to release() returns one permit.
                semaphore.release();
            }
        });
        tC.start();
    }
}

The output is as follows

Thread A acquired a permit
Thread B acquired 2 permits
Thread C acquired a permit

While thread A holds its permit only 1 permit is left, so thread B (which needs 2) has to queue. Because the semaphore is fair, thread C queues behind B even though 1 permit is available, so B and C only proceed after A releases about 10 seconds later.

3、 Source code analysis

3.1、 Inner class Sync

  • From the methods implemented by Sync we learn the following:
    • The number of permits is passed in through the constructor
    • The permits are stored in the AQS state variable, state
    • Acquiring permits decreases state by the number acquired (by 1 for acquire())
    • When state is 0, no permit can be acquired
    • Releasing permits increases state by the number released (by 1 for release())
    • The number of permits can be changed dynamically
abstract static class Sync extends AbstractQueuedSynchronizer {

    private static final long serialVersionUID = 1192457210091910933L;

    // Constructor: stores the number of permits in state.
    Sync(int permits) {
        setState(permits);
    }

    // Returns the number of permits currently available.
    final int getPermits() {
        return getState();
    }

    // Nonfair mode: try to acquire permits.
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // How many permits are there right now?
            int available = getState();
            // How many would be left after taking the requested permits?
            int remaining = available - acquires;
            // If fewer than 0 would remain, return the negative value immediately (failure).
            // Otherwise try to update state atomically; on success return the remaining count.
            // If the CAS fails, loop and retry.
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // Release permits.
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // How many permits are there right now?
            int current = getState();
            // Add the permits being released.
            int next = current + releases;
            // Detect integer overflow.
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // If the atomic update of state succeeds, the release succeeded: return true.
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // Reduce the number of permits.
    final void reducePermits(int reductions) {
        for (;;) {
            // How many permits are there right now?
            int current = getState();
            // Subtract the permits to be removed.
            int next = current - reductions;
            // Detect integer underflow.
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // Atomically update state and return once the CAS succeeds.
            if (compareAndSetState(current, next))
                return;
        }
    }

    // Drain (take) all available permits.
    final int drainPermits() {
        for (;;) {
            // How many permits are there right now?
            int current = getState();
            // If it is 0, return immediately.
            // Otherwise atomically set state to 0 and return the number drained.
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
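The state changes described above can be observed through the public Semaphore API. The following is a small sketch, not part of the original article; the class name PermitCountSketch and the method trace are hypothetical. It relies on availablePermits(), release(int) and drainPermits(), all of which are public methods of java.util.concurrent.Semaphore.

```java
import java.util.concurrent.Semaphore;

public class PermitCountSketch {

    /** Returns the permit count observed after each step. */
    static int[] trace() {
        Semaphore s = new Semaphore(3);
        int initial = s.availablePermits();       // value passed to the constructor

        // Same state change as acquire(), but without the checked exception.
        s.acquireUninterruptibly();
        int afterAcquire = s.availablePermits();  // one less

        // A release does not have to be paired with an earlier acquire,
        // so the count can grow beyond its initial value.
        s.release(2);
        int afterRelease = s.availablePermits();

        int drained = s.drainPermits();           // takes everything that is left
        int afterDrain = s.availablePermits();

        return new int[] {initial, afterAcquire, afterRelease, drained, afterDrain};
    }

    public static void main(String[] args) {
        for (int value : trace()) {
            System.out.println(value);
        }
    }
}
```

The third step is what "the number of permits can be changed dynamically" means in practice: release() simply adds to state, regardless of how many permits the semaphore started with.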

3.2、 Inner class NonfairSync

In nonfair mode, tryAcquireShared() simply calls the parent class's nonfairTryAcquireShared() to try to acquire permits.

static final class NonfairSync extends Sync {

    private static final long serialVersionUID = -2694183684443567898L;

    // Constructor: delegates to the parent class constructor.
    NonfairSync(int permits) {
        super(permits);
    }

    // Try to acquire permits by calling the parent's nonfairTryAcquireShared().
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
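The retry loop at the heart of nonfairTryAcquireShared() and tryReleaseShared() can be studied in isolation. The sketch below is an illustration only, not the JDK implementation: it swaps the AQS state field for an AtomicInteger and leaves out the wait queue entirely. The class name CasPermitSketch and the permits() accessor are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasPermitSketch {

    // Stand-in for the AQS state field (an assumption made for this sketch).
    private final AtomicInteger state;

    CasPermitSketch(int permits) {
        state = new AtomicInteger(permits);
    }

    /** Returns the remaining permits on success, or a negative number on failure. */
    int tryAcquireShared(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // Fail fast when not enough permits exist; otherwise retry until the CAS wins.
            if (remaining < 0 || state.compareAndSet(available, remaining))
                return remaining;
        }
    }

    /** Adds permits back, retrying until the CAS succeeds. */
    boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = state.get();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (state.compareAndSet(current, next))
                return true;
        }
    }

    int permits() {
        return state.get();
    }

    public static void main(String[] args) {
        CasPermitSketch sketch = new CasPermitSketch(2);
        System.out.println(sketch.tryAcquireShared(1)); // succeeds
        System.out.println(sketch.tryAcquireShared(2)); // fails, state is left unchanged
        System.out.println(sketch.permits());
    }
}
```

Note that a failed acquire never writes to the state: the negative result is computed from a snapshot and returned without a CAS, which is why a failing thread cannot push the permit count below zero.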

3.3、 Inner class FairSync

In fair mode, a thread first checks whether other threads are queued ahead of it. If so, the acquire attempt fails and the thread joins the queue; otherwise it tries to update state atomically.

Note: for ease of reading, some AQS methods have been pasted into this inner class. The comment above each such method marks where it really lives.

static final class FairSync extends Sync {

    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    // Located in AQS: acquires in shared mode and responds to interruption.
    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // True when the thread calling acquire() is already interrupted: throw immediately.
        if (Thread.interrupted())
            throw new InterruptedException();
        // Try to take permits by decreasing state. Returns >= 0 on success, < 0 on failure.
        if (tryAcquireShared(arg) < 0)
            // The attempt failed: add the thread to the AQS blocking queue.
            doAcquireSharedInterruptibly(arg);
    }

    // Located in AQS: queued shared acquire that responds to interruption.
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // Wrap the thread that called semaphore.acquire() in a node and add it to the AQS blocking queue.
        final Node node = addWaiter(Node.SHARED);
        // Stays true unless the acquire succeeds; triggers cancelAcquire() in finally.
        boolean failed = true;
        try {
            for (;;) {
                // Get the predecessor of the current thread's node.
                final Node p = node.predecessor();
                // True when the current thread's node is head.next.
                if (p == head) {
                    // Only head.next is entitled to try to acquire.
                    int r = tryAcquireShared(arg);
                    // r >= 0 means enough permits were available and have been taken.
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // Park if appropriate; if the thread was woken by an interrupt, respond by throwing.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // Try to take permits by decreasing state. Returns >= 0 on success, < 0 on failure.
    protected int tryAcquireShared(int acquires) {
        // Spin until the attempt clearly succeeds or fails.
        for (;;) {
            // If other threads are waiting ahead of us in the AQS blocking queue, return -1:
            // the thread performing acquire must join the queue and wait.
            if (hasQueuedPredecessors())
                return -1;
            // Reaching this point means one of two things:
            // 1. acquire was called while no other thread was waiting in the AQS queue
            // 2. the current thread's node is head.next (a queued thread retrying its acquire)

            // Read state, which holds the number of permits.
            int available = getState();
            // remaining: how many permits the semaphore would have left after this acquire.
            int remaining = available - acquires;
            // Condition 1, remaining < 0: not enough permits, the acquire fails.
            // Condition 2 (only evaluated when remaining >= 0):
            // compareAndSetState(available, remaining) succeeded, so the permits have been taken.
            // If the CAS fails, spin and try again.
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // Located in AQS: releases in shared mode.
    public final boolean releaseShared(int arg) {
        // True when the current thread returned its permits successfully.
        if (tryReleaseShared(arg)) {
            // Wake up threads that previously failed to acquire.
            doReleaseShared();
            return true;
        }
        return false;
    }

    // Located in Sync: return permits, that is, the current thread releases resources.
    protected final boolean tryReleaseShared(int releases) {
        // Spin until the CAS succeeds.
        for (;;) {
            // Read the current AQS state.
            int current = getState();
            // The new number of permits.
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // CAS the new permit count into state.
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // Located in AQS (Semaphore version of the notes). Which paths lead to doReleaseShared()?
    // 1. semaphore.release() -> releaseShared() -> tryReleaseShared() succeeds -> doReleaseShared(),
    //    which wakes the thread belonging to head.next in the blocking queue.
    // 2. A woken thread returns from parkAndCheckInterrupt() in doAcquireSharedInterruptibly(),
    //    acquires, and then calls setHeadAndPropagate() -> doReleaseShared().
    // Purpose: wake up waiting threads that previously failed to acquire.
    private void doReleaseShared() {
        for (;;) {
            // Snapshot of the queue head.
            Node h = head;
            // Proceed only if the queue is initialized and holds more than one node.
            // Condition 1, h != null: the blocking queue has been created.
            //   h == null happens when release() is called before any thread has ever had to queue.
            // Condition 2, h != tail: the queue contains nodes other than head.
            //   h == tail (head and tail are the same node) happens when
            //   a) waiters acquired one after another and the thread arriving here was the tail node, or
            //   b) the first waiter races with a releasing thread: the waiter found the queue empty,
            //      created the dummy head, and has not finished enqueuing its own node when the
            //      releasing thread gets here. Nothing is lost: once fully enqueued, the waiter spins
            //      in doAcquireSharedInterruptibly(), sees that it is head.next, acquires,
            //      makes itself head and returns from acquire().
            if (h != null && h != tail) {
                // Read the wait status of the head node.
                int ws = h.waitStatus;
                // SIGNAL on head means its successor has not been woken yet.
                if (ws == Node.SIGNAL) {
                    // Before waking the successor, change head's status from SIGNAL to 0.
                    // Why a CAS? Several threads may run doReleaseShared() at once and all try
                    // to wake head.next, so the CAS can fail.
                    // Example: t3 saw h == head fail below, so it keeps looping and takes part in
                    // waking the new head.next. t3 wins CAS waitStatus(h, Node.SIGNAL, 0). t4 entered
                    // this if before t3's CAS took effect, so t4's CAS fails because t3 already
                    // changed the value.
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // Wake up the successor node.
                    unparkSuccessor(h);
                }
                // Head's status is not SIGNAL: if it is 0, mark it PROPAGATE.
                else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // h == head holds when:
            // 1. The successor that was just woken has not yet run setHeadAndPropagate() to make
            //    itself head. The current thread simply leaves the loop. The wake-up chain does not
            //    break, because the woken thread will run doReleaseShared() itself sooner or later.
            // 2. h == null: release() was called before any thread had ever queued.
            // 3. h == tail: head and tail are the same node.
            // h == head fails when:
            // The woken node was quick and already made itself the new head. The thread that woke it
            // (its predecessor) then stays in doReleaseShared() and goes on to wake the successor
            // of the new head.
            if (h == head)                   // loop if head changed
                break;
        }
    }
}
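The acquire/park/release/unpark round trip traced above can be watched from the outside. The following is a minimal sketch, not from the original article; WakeUpSketch and waiterWokenByRelease are hypothetical names. It uses Semaphore.hasQueuedThreads() to wait until the waiter is in the AQS queue before releasing.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeUpSketch {

    /** Returns true if a queued waiter acquires only after another thread releases. */
    static boolean waiterWokenByRelease() throws InterruptedException {
        Semaphore s = new Semaphore(0, true); // no permits: the first acquire must queue
        AtomicBoolean acquired = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            try {
                s.acquire();          // fails tryAcquireShared, then queues and parks
                acquired.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        // Wait until the waiter's node is in the AQS queue.
        while (!s.hasQueuedThreads()) {
            Thread.sleep(1);
        }
        boolean acquiredBeforeRelease = acquired.get();

        s.release();                  // tryReleaseShared succeeds, the queued waiter is woken
        waiter.join(5000);

        return !acquiredBeforeRelease && acquired.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("woken by release: " + waiterWokenByRelease());
    }
}
```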

3.3、 Constructors

Creating a Semaphore requires the number of permits. A Semaphore is nonfair by default, but the second constructor lets you declare it as fair.

// Constructor: takes the number of permits; defaults to nonfair mode.
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// Constructor: takes the number of permits and whether the semaphore is fair.
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
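The practical difference between the two modes shows up when a thread is already queued. The sketch below is an illustration under stated assumptions, not part of the original article; FairnessSketch and timedTryAcquireWithWaiter are hypothetical names. A waiter asks for 2 permits while only 1 exists, so it queues. The main thread then calls tryAcquire(0, TimeUnit.SECONDS), which goes through tryAcquireShared() and therefore honours the fairness setting.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class FairnessSketch {

    /** Returns whether a zero-timeout tryAcquire succeeds while another thread is queued. */
    static boolean timedTryAcquireWithWaiter(boolean fair) throws InterruptedException {
        Semaphore s = new Semaphore(1, fair);

        // The waiter needs 2 permits but only 1 exists, so it parks in the AQS queue.
        Thread waiter = new Thread(() -> {
            try {
                s.acquire(2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        while (!s.hasQueuedThreads()) {
            Thread.sleep(1);
        }

        // Fair: hasQueuedPredecessors() is true, so the attempt fails.
        // Nonfair: the single free permit is taken straight away.
        boolean got = s.tryAcquire(0, TimeUnit.SECONDS);

        waiter.interrupt(); // clean up the parked waiter
        waiter.join();
        return got;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fair:    " + timedTryAcquireWithWaiter(true));
        System.out.println("nonfair: " + timedTryAcquireWithWaiter(false));
    }
}
```

The untimed tryAcquire() is deliberately not used here: according to the Semaphore documentation it takes a permit immediately if one is available, even on a fair semaphore.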

3.4、 acquire() method

Acquires one permit. The default is the interruptible mode. If the attempt to acquire a permit fails, the thread joins the AQS blocking queue and waits.

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// Acquires one permit in uninterruptible mode. If the attempt fails, the thread joins the AQS queue and waits.
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
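The difference between the two modes is easiest to see on a thread whose interrupt flag is already set. The sketch below is not from the original article; InterruptSketch and its two methods are hypothetical names. Each method interrupts the current thread itself, so no second thread is needed.

```java
import java.util.concurrent.Semaphore;

public class InterruptSketch {

    /** acquire() on an already interrupted thread throws before taking a permit. */
    static boolean acquireThrowsWhenInterrupted() {
        Semaphore s = new Semaphore(1);
        Thread.currentThread().interrupt();
        try {
            s.acquire();
            Thread.interrupted(); // not expected to get here; clear the flag anyway
            return false;
        } catch (InterruptedException e) {
            // The permit is untouched and the interrupt flag has been cleared.
            return s.availablePermits() == 1
                && !Thread.currentThread().isInterrupted();
        }
    }

    /** acquireUninterruptibly() takes the permit and leaves the interrupt flag set. */
    static boolean uninterruptiblyKeepsFlag() {
        Semaphore s = new Semaphore(1);
        Thread.currentThread().interrupt();
        s.acquireUninterruptibly();
        boolean flagStillSet = Thread.interrupted(); // reads and clears the flag
        return flagStillSet && s.availablePermits() == 0;
    }

    public static void main(String[] args) {
        System.out.println(acquireThrowsWhenInterrupted());
        System.out.println(uninterruptiblyKeepsFlag());
    }
}
```

The first result follows from the Thread.interrupted() check at the top of acquireSharedInterruptibly(), which runs before tryAcquireShared() is ever called.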

3.5、 acquire(int permits) method

Acquires several permits in one call, in interruptible mode.

public void acquire(int permits) throws InterruptedException {
    // A negative permit count is rejected.
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

// Acquires several permits in one call, in uninterruptible mode.
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
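A short sketch of the multi-permit behaviour follows. It is an illustration rather than part of the original article, and MultiPermitSketch with its two methods are hypothetical names. It shows that a request for several permits is all-or-nothing and that a negative argument is rejected by the check shown above.

```java
import java.util.concurrent.Semaphore;

public class MultiPermitSketch {

    /** A request for more permits than are available takes nothing at all. */
    static boolean allOrNothing() throws InterruptedException {
        Semaphore s = new Semaphore(5);
        s.acquire(3);                              // 2 permits left
        boolean tookThree = s.tryAcquire(3);       // only 2 available, so this fails
        int leftAfterFailure = s.availablePermits();
        s.release(3);                              // back to 5
        return !tookThree && leftAfterFailure == 2 && s.availablePermits() == 5;
    }

    /** A negative permit count triggers IllegalArgumentException. */
    static boolean negativeRejected() throws InterruptedException {
        try {
            new Semaphore(1).acquire(-1);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(allOrNothing());
        System.out.println(negativeRejected());
    }
}
```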

Summary

  • A Semaphore is typically used to limit concurrent access to shared resources, that is, in rate-limiting scenarios;
  • Internally, Semaphore is built on the shared mode (shared lock) of AQS
  • The number of permits must be specified when a Semaphore is created, and it is stored in state
  • Acquiring a permit decreases state by one
  • Releasing a permit increases state by one and wakes up the threads queued in the AQS blocking queue

Copyright notice
This article was written by [*Wucongcong*]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/02/202202221454047861.html