当前位置:网站首页>Analysis of semaphore source code of AQS

Source-code analysis of Semaphore (built on AQS)

2022-06-09 05:15:00 smartjiang-java

1:Semaphore Basics

Semaphore: limits the number of threads that can access shared resources at the same time.
Unlike a lock, which guards a single shared resource, a semaphore covers multiple shared resources and caps the number of threads that may access them concurrently.

Uses:
Rate limiting, on a single machine only: it limits the number of threads, not the number of resources (for example, Tomcat's connection count).
It can back a database connection pool; compared with a flyweight-pattern implementation, performance and readability are better.

2: Create semaphores

Semaphore semaphore = new Semaphore(n);

   // Takes an int: the number of permits (shared resources).
    public Semaphore(int permits) {
    
        // Builds the synchronizer through the NonfairSync(permits) constructor.
        sync = new NonfairSync(permits);
    }

Step into the NonfairSync(permits) constructor:

       NonfairSync(int permits) {
    
          // Passes permits up to the superclass constructor (shown in the article as Sync(int permits)).
            super(permits);
        }

This calls the superclass constructor, Sync(permits):

        Sync(int permits) {
    
           // Stores the incoming permit count as the state value, which serves as the shared counter.
            setState(permits);
        }

3: Acquiring a permit

Call the acquire() method:

    public void acquire() throws InterruptedException {
    
        // Requests 1 permit from the synchronizer via acquireSharedInterruptibly(1).
        sync.acquireSharedInterruptibly(1);
    }

Step into the acquireSharedInterruptibly(1) method:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    
         // If the current thread has been interrupted, throw instead of trying to acquire.
        if (Thread.interrupted())
            throw new InterruptedException();
          // Try to acquire via tryAcquireShared(arg); a negative result means the attempt failed,
          // in which case control moves on to doAcquireSharedInterruptibly(arg).
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

Step into the tryAcquireShared(1) method.
For the fair version:

        protected int tryAcquireShared(int acquires) {
    
            // Retry loop: repeats until a value is returned.
            for (;;) {
    
                // Fairness check: if hasQueuedPredecessors() reports true, give up and return -1.
                // Only the fair version has this step; the non-fair version omits it.
                if (hasQueuedPredecessors())
                    return -1;
                 // Read the current state (the number of available permits).
                int available = getState();
                // What would be left after taking `acquires` permits (state - 1 when called from acquire()).
                int remaining = available - acquires;
                // If remaining < 0 there are not enough permits: return that negative value, state untouched.
                // If remaining >= 0, CAS state from available to remaining and return remaining on success;
                // if the CAS returns false, loop and try again.
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

For the non-fair version:

        protected int tryAcquireShared(int acquires) {
    
            // Non-fair version: hands the request straight to nonfairTryAcquireShared(acquires).
            return nonfairTryAcquireShared(acquires);
        }
        final int nonfairTryAcquireShared(int acquires) {
    
            // Retry loop: repeats until the permits run short or the CAS succeeds.
            for (;;) {
    
                // Read the current state (the number of available permits).
                int available = getState();
                // What would be left after taking `acquires` permits.
                int remaining = available - acquires;
                // If remaining < 0, return that negative value without modifying state.
                // Otherwise CAS state from available to remaining and return remaining on success;
                // if the CAS returns false, loop and try again.
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

Back in the acquireSharedInterruptibly(1) method:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    
        // If the current thread has been interrupted, throw instead of trying to acquire.
        if (Thread.interrupted())
            throw new InterruptedException();
          // tryAcquireShared(arg) returns the remaining state value; >= 0 means success and the
          // thread simply continues.
         // If the result is < 0, enter doAcquireSharedInterruptibly(arg) to join the wait queue.
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

Step into the doAcquireSharedInterruptibly(1) method:

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    
        // Enqueue a shared-mode node for the current thread. addWaiter is not shown here; per the
        // article, the first thread to wait also causes a dummy (sentinel) node to be created as
        // the head, and the current thread's node is appended as the tail.
        final Node node = addWaiter(Node.SHARED);
        // Stays true unless the acquire succeeds, so the finally block knows to cancel the node.
        boolean failed = true;
        try {
    
            for (;;) {
    
               // Fetch this node's predecessor and check whether it is the head node.
                final Node p = node.predecessor();
                if (p == head) {
    
                 // Directly behind the head: try to acquire again.
                    int r = tryAcquireShared(arg);
                 // On success (r >= 0), hand node and r to setHeadAndPropagate (not shown;
                 // per the article the current node becomes the new head), then return.
                    if (r >= 0) {
    
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // shouldParkAfterFailedAcquire() (not shown): per the article, after a failed acquire
                // it changes the predecessor's waitStatus from 0 to -1.
                // parkAndCheckInterrupt() (not shown): per the article, it blocks the current thread.
                // If both calls return true an InterruptedException is thrown; otherwise the for loop
                // runs again and the thread competes for a permit once more.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
    
            // Reached with failed == true only when the method exits without acquiring
            // (e.g. via an exception): cancel the queued node.
            if (failed)
                cancelAcquire(node);
        }
    }

4: Releasing a permit

Call the release() method:

    public void release() {
    
        // Returns 1 permit to the synchronizer via releaseShared(1).
        sync.releaseShared(1);
    }

Step into the releaseShared(1) method:

    public final boolean releaseShared(int arg) {
    
       // Try to release via tryReleaseShared(arg); only on success run doReleaseShared()
       // and report true.
        if (tryReleaseShared(arg)) {
    
            doReleaseShared();
            return true;
        }
        return false;
    }

Step into the tryReleaseShared(1) method:

        protected final boolean tryReleaseShared(int releases) {
    
            // Retry loop: repeats until the CAS succeeds or an overflow is detected.
            for (;;) {
    
               // Read the current state value.
                int current = getState();
               // New state = current + releases (i.e. state + 1 when called from release()).
                int next = current + releases;
                // A sum smaller than the starting value means the int addition wrapped around.
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // CAS state from current to next; return true on success, otherwise loop and retry.
                if (compareAndSetState(current, next))
                    return true;
            }
        }

Back in the releaseShared(1) method:

    public final boolean releaseShared(int arg) {
    
       // tryReleaseShared(arg) returned true, so the branch below is taken.
        if (tryReleaseShared(arg)) {
    
            // Enter doReleaseShared() to wake up waiting threads.
            doReleaseShared();
            return true;
        }
        return false;
    }

Step into the doReleaseShared() method:

    private void doReleaseShared() {
    
        for (;;) {
    
            // Snapshot the head; it is compared against the live head at the end of the iteration.
            Node h = head;
            // Proceed only if the wait queue exists (head is non-null) and contains at least one
            // node besides the head (head != tail).
            if (h != null && h != tail) {
    
                int ws = h.waitStatus;
                // Is the head node's waitStatus SIGNAL (-1 per the article)?
                if (ws == Node.SIGNAL) {
    
                    // CAS the head node's waitStatus from SIGNAL to 0; if the CAS fails, restart the loop.
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // Enter unparkSuccessor(h) to wake the head's successor.
                    unparkSuccessor(h); 
                }
                // waitStatus is already 0: try to CAS it to PROPAGATE; if the CAS fails, restart the loop.
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // If the head is still the node snapshotted above, the work is done: exit.
            // If the head changed during this iteration, go around again.
            if (h == head)                   // loop if head changed
                break;
        }
    }

Step into the unparkSuccessor(h) method:

    private void unparkSuccessor(Node node) {
    
        int ws = node.waitStatus;
        // When reached from doReleaseShared() as shown in the article, the head's waitStatus was
        // just CAS-ed to 0, so this condition is normally false on that path.
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // Fetch the node right after the given node (the head, on the release path).
        Node s = node.next;
        // If there is no direct successor, or its waitStatus is > 0 (presumably cancelled),
        // scan backwards from the tail and keep the node closest to `node` whose waitStatus is <= 0.
        if (s == null || s.waitStatus > 0) {
    
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // If such a node exists, unpark its thread; per the article, that thread resumes inside
        // doAcquireSharedInterruptibly() and tries to acquire again.
        if (s != null)
            LockSupport.unpark(s.thread);
    }
原网站

版权声明
本文为[smartjiang-java]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/160/202206090511357898.html