AQS details

2022-07-06 14:12:00 Thorn in the moonlight tonight

AQS

Basic introduction

Attributes

AQS is built on four fields:

  • head: the head node. The wait queue is a linked list, so it needs a head:
private transient volatile Node head;
  • tail: the tail node. The queue is doubly linked, and new waiters are appended here:
private transient volatile Node tail;
  • state: the synchronization state. For ReentrantLock, 0 means no thread holds the lock and a positive value is the number of holds (re-entry). Other synchronizers, such as shared locks, give this value their own meaning:
private volatile int state;
  • exclusiveOwnerThread: the thread that currently holds the lock in exclusive mode. A reentrant lock needs it to check whether the acquiring thread is already the owner. This field is declared in the parent class, AbstractOwnableSynchronizer:
private transient Thread exclusiveOwnerThread;
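
To make the roles of state and exclusiveOwnerThread concrete, here is a minimal sketch of a custom lock built on AQS. The names SimpleMutex and Sync are hypothetical and not part of the JDK, and the lock is deliberately non-reentrant (a thread that locks it twice would block itself), so treat it as an illustration rather than a production lock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class, not part of the JDK.
public class SimpleMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // state: 0 = free, 1 = held. CAS from 0 to 1 wins the lock.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // Only the owner may release.
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }   // queues and parks on failure
    public void unlock()      { sync.release(1); }   // wakes the next waiter
    public boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println(mutex.isLocked());
        mutex.unlock();
        System.out.println(mutex.isLocked());
    }
}
```

The subclass only decides how state changes; queuing, parking, and waking are inherited from AQS.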

Concept

AQS is essentially a blocking wait queue, implemented as a doubly linked list. Its main job is to coordinate multiple threads competing for a lock: a thread that fails to acquire the lock is wrapped in a node (Node) and appended to the queue, where it blocks until it is woken up to try again. Here is the Node data structure:

    static final class Node {
        // Marker for a node waiting in shared mode (e.g. a read lock)
        static final Node SHARED = new Node();
        // Marker for a node waiting in exclusive mode (e.g. a write lock)
        static final Node EXCLUSIVE = null;

        // The waitStatus constants
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        // Wait status of this node; takes one of the constants above (or 0).
        // CANCELLED means the thread has given up competing for the lock
        // (for example, after a timeout)
        volatile int waitStatus;

        // Predecessor node (the list is doubly linked)
        volatile Node prev;

        // Successor node
        volatile Node next;

        // The waiting thread. Threads are what compete for the lock,
        // so each node wraps one thread
        volatile Thread thread;

        // Next node in a condition queue; not needed for the main flow here
        Node nextWaiter;

        ...
        // The remaining methods are not important here
    }

As you can see, Node has only four fields that matter here: two links (prev and next), the thread itself (thread), and a status (waitStatus). The first three are self-explanatory, so the status field deserves the most attention.

Looking at the AQS implementation through ReentrantLock

AQS is infrastructure, so in day-to-day work we rarely use it directly unless we need a custom lock. When we need a lock, we can usually use ReentrantLock, which the JDK already implements on top of AQS. Here is an example:

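    import java.util.concurrent.locks.ReentrantLock;

    public class ProductServiceImpl {

        private final ReentrantLock lock = new ReentrantLock();

        public void subSafety(int num) {
            lock.lock();
            try {
                // deduct num from the stock
            } finally {
                // always release in finally, so an exception cannot leak the lock
                lock.unlock();
            }
        }
    }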

This guarantees that only one thread at a time executes the stock-deduction code. Next, the source code of ReentrantLock.
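
Before reading the source, the mutual-exclusion claim can be checked with a small sketch. StockDemo and its methods are hypothetical names; the thread and iteration counts are arbitrary. Several threads deduct from a shared counter under the lock, and no deduction should be lost.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: several threads deduct stock under one ReentrantLock.
public class StockDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private int stock;

    StockDemo(int initial) {
        this.stock = initial;
    }

    void subSafety(int num) {
        lock.lock();
        try {
            stock -= num;          // read-modify-write, protected by the lock
        } finally {
            lock.unlock();
        }
    }

    int stock() {
        lock.lock();
        try {
            return stock;
        } finally {
            lock.unlock();
        }
    }

    // Starts `threads` workers that each deduct 1 unit `perThread` times,
    // and returns the remaining stock.
    static int run(int threads, int perThread) {
        StockDemo demo = new StockDemo(threads * perThread);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    demo.subSafety(1);
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return demo.stock();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // remaining stock, 0 when nothing is lost
    }
}
```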

ReentrantLock: acquiring the lock

ReentrantLock is implemented by its inner class Sync. The constructor creates exactly one Sync object, either a FairSync or a NonfairSync:

	public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
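
As a quick illustration of that constructor, the fairness choice can be read back through the public isFair() method. FairnessFlagDemo is a hypothetical class name; the no-argument constructor creates a nonfair lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class showing which Sync flavour each constructor selects.
public class FairnessFlagDemo {

    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();      // no-arg constructor: nonfair
    }

    static boolean explicitFairIsFair() {
        return new ReentrantLock(true).isFair();  // fair = true: FairSync
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());      // false
        System.out.println(explicitFairIsFair()); // true
    }
}
```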

Now look at the source of the locking method (lock), taking the fair lock (FairSync) as the example. Skipping the intermediate steps, go straight to AbstractQueuedSynchronizer#acquire. Every exclusive lock built on AQS goes through this method to acquire the lock.

  • AbstractQueuedSynchronizer#acquire
	public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire attempts to acquire the lock and returns true on success. AQS does not implement it (the default implementation throws UnsupportedOperationException); subclasses must provide their own, which is the template method pattern at work. Here is the tryAcquire implementation in FairSync:

  • ReentrantLock#FairSync#tryAcquire
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // Read the lock state
        int c = getState();
        // 0 means no thread holds the lock
        if (c == 0) {
            // This is what makes the lock fair: if another thread is
            // already queued ahead of us, do not try; go and queue up
            if (!hasQueuedPredecessors() &&
                // Try to take the lock with a CAS
                compareAndSetState(0, acquires)) {
                // Success: record the current thread as the exclusive owner
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // Re-entry: the owner acquires again, so add to the hold count and return
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Neither path succeeded: the acquisition failed
        return false;
    }

The acquisition code is short and its logic is clear. One thing to note: the nonfair lock's implementation is the same except that it lacks the !hasQueuedPredecessors() check. In other words, a nonfair lock is unfair only in that its first attempt ignores whether other nodes are already queued and tries to grab the lock immediately. If that attempt fails, the thread queues up like any other.
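
The difference can be sketched in a single synchronizer with a fairness flag. BargingSketch is a hypothetical class that only mirrors the shape of the code above; the JDK splits the two variants into FairSync and NonfairSync and differs in detail.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical sketch, not JDK code: one tryAcquire covering both policies.
public class BargingSketch extends AbstractQueuedSynchronizer {
    private final boolean fair;

    public BargingSketch(boolean fair) {
        this.fair = fair;
    }

    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Only the fair policy looks at the queue before the CAS;
            // the nonfair policy goes straight for the lock.
            boolean mayTry = !fair || !hasQueuedPredecessors();
            if (mayTry && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;          // re-entry by the owner
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        int c = getState() - releases;
        boolean free = (c == 0);
        if (free)
            setExclusiveOwnerThread(null);
        setState(c);
        return free;
    }

    public int holds() {
        return getState();
    }
}
```

With this sketch, new BargingSketch(false).acquire(1) may overtake queued threads when the lock happens to be free, while new BargingSketch(true) always respects the queue.
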
Back to AbstractQueuedSynchronizer#acquire:

	public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

If tryAcquire(arg) gets the lock (returns true), acquire is done. If it fails, the thread has to be added to the queue and then suspended. Start with addWaiter(Node.EXCLUSIVE), which is implemented in AQS:

    private Node addWaiter(Node mode) {
        // Create a Node for the current thread
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // Append to the tail of the queue
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // The code gets here in two cases:
        // the queue is empty || the CAS on tail failed.
        // So enq initializes the queue if needed and retries in a loop
        // until the node has been enqueued
        enq(node);
        return node;
    }

The logic is simple: create a Node and enqueue it. The enq method needs a closer look, so here is its source.

    private Node enq(final Node node) {
        // Spin until the node is enqueued
        for (;;) {
            Node t = tail;
            // The queue is empty
            if (t == null) { // Must initialize
                // Create an empty Node as head, then go around the loop again
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // The normal path
                node.prev = t;
                // The loop exits only once the node has been enqueued
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

To follow this method, just remember the two conditions under which it is reached: the queue is empty, or the fast-path CAS failed. Note that when the queue is empty, enq first creates an empty Node as the head. We call this node the sentinel node, and it matters for everything that follows.
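
The effect of enqueuing can be observed from outside through ReentrantLock's monitoring methods. In this sketch (QueueInspectionDemo is a hypothetical name), one thread holds the lock while a second one tries to take it; once the second thread's node is linked in, hasQueuedThread reports it and getQueueLength counts it. The count covers only nodes that carry a thread, so the empty sentinel node is not included.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: observe a waiter's node in the AQS queue.
public class QueueInspectionDemo {

    // Returns the queue length seen while one thread is blocked behind the holder.
    static int queuedWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                          // the calling thread holds the lock
        Thread waiter = new Thread(() -> {
            lock.lock();                      // tryAcquire fails: enqueue, then park
            lock.unlock();
        });
        int observed;
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();               // wait until the waiter's node is linked in
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();                    // release wakes the waiter
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(queuedWhileHeld()); // 1: a single thread is waiting
    }
}
```
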
Back in acquire, look at acquireQueued next. addWaiter has already put the thread's node in the queue, so the job of acquireQueued is to suspend the thread until it can acquire the lock.

    // node is the node that addWaiter just enqueued
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // Loop until the lock has been acquired
            for (;;) {
                // Get the predecessor of node.
                // As noted earlier, head is not a waiting thread but the sentinel,
                // so it is node's predecessor that is compared against head
                final Node p = node.predecessor();
                // p == head means node is first in line, so it may try to
                // acquire the lock by calling tryAcquire again
                if (p == head && tryAcquire(arg)) {
                    // Lock acquired: node becomes the new head;
                    // setHead also sets node.thread (and node.prev) to null
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

                // The code gets here in two cases:
                // node is not first in line || tryAcquire failed
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

This method relies on the sentinel node described earlier. The logic is fairly simple: if node is the first waiting node (it is not the head node itself, but the head's successor), it tries to acquire the lock. The harder part is shouldParkAfterFailedAcquire, which handles a failed acquisition. If it returns true, parkAndCheckInterrupt is called to suspend the thread.

    // pred is the predecessor of node
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // SIGNAL on the predecessor means it will unpark node when it
        // releases the lock, so it is safe to suspend the thread
        if (ws == Node.SIGNAL)
            return true;
        // Greater than 0 means the predecessor is cancelled and no longer wants the lock
        if (ws > 0) {
            do {
                // A cancelled predecessor will never wake node up, so walk
                // backwards until a non-cancelled predecessor is found
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // A node's status is 0 when it is enqueued, so set the predecessor
            // to Node.SIGNAL to request a wake-up. The first call normally takes
            // this branch and returns false; the loop in acquireQueued then
            // calls this method again
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

This method decides whether the thread should be suspended and returns a boolean. One concept is needed to read it: a queued node is woken up by its predecessor. When the thread that owns the predecessor releases the lock, it wakes up the first waiting node so that node can try to acquire the lock.
That completes the lock-acquisition half of ReentrantLock. It comes down to a few steps: try to acquire the lock; on failure, create a node and enqueue it; if the queue is empty, create an empty node (the sentinel) first; then suspend the thread and wait to be woken up.
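
The last step, suspending a thread and waking it later, is built on LockSupport.park and LockSupport.unpark. The sketch below shows only that primitive; ParkDemo and the permitted flag are hypothetical stand-ins for the waiting node and the lock state. Because park may return without a matching unpark, the waiter re-checks its condition in a loop, much as acquireQueued calls tryAcquire again after every wake-up.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: the park/unpark handshake on its own.
public class ParkDemo {

    static boolean parkThenWake() {
        AtomicBoolean permitted = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() can return spuriously, so always re-check the condition.
            while (!permitted.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();

        permitted.set(true);          // plays the role of "the lock is free now"
        LockSupport.unpark(waiter);   // safe even if the waiter has not parked yet

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(parkThenWake());
    }
}
```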

ReentrantLock: releasing the lock

Once acquisition is understood, release is easy. Instead of stepping through every call, go straight to AbstractQueuedSynchronizer#release.

  • AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

The pattern mirrors acquisition. Next, the tryRelease method of ReentrantLock.

  • ReentrantLock#Sync#tryRelease
    protected final boolean tryRelease(int releases) {
        // state is the lock state: 0 means not held, >0 is the number of holds.
        // This is a release, so subtract the number being released
        int c = getState() - releases;
        // A lock can only be released by the thread that holds it
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // Result: true means fully released, false means still held
        boolean free = false;
        // A remaining count of 0 means the lock is fully released; otherwise
        // only one level of re-entry has been undone
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
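
The counting behaviour of tryRelease is visible through ReentrantLock's public getHoldCount() and isLocked() methods. In this sketch (HoldCountDemo is a hypothetical name), the lock is taken twice by the same thread and stays locked until the second unlock; unlocking a lock the thread does not own throws IllegalMonitorStateException, matching the ownership check above.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: re-entry counting and the ownership check.
public class HoldCountDemo {

    // Returns {holds after 2 locks, holds after 1 unlock,
    //          locked after 1 unlock (1/0), locked after 2 unlocks (1/0)}.
    static int[] trace() {
        ReentrantLock lock = new ReentrantLock();
        int[] out = new int[4];
        lock.lock();
        lock.lock();                        // re-entry: state 1 -> 2
        out[0] = lock.getHoldCount();
        lock.unlock();                      // state 2 -> 1, tryRelease returns false
        out[1] = lock.getHoldCount();
        out[2] = lock.isLocked() ? 1 : 0;
        lock.unlock();                      // state 1 -> 0, owner cleared
        out[3] = lock.isLocked() ? 1 : 0;
        return out;
    }

    static boolean unlockWithoutOwnershipThrows() {
        try {
            new ReentrantLock().unlock();   // current thread is not the owner
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        int[] t = trace();
        System.out.println(t[0] + " " + t[1] + " " + t[2] + " " + t[3]); // 2 1 1 0
        System.out.println(unlockWithoutOwnershipThrows());               // true
    }
}
```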

The logic of this method is clear enough that it needs little comment. Back to release, to see what happens after the lock has been fully released, namely waking up the successor node:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            // The lock has been fully released
            Node h = head;
            // head is the dummy node at the front of the queue, not a waiting thread.
            // A non-zero waitStatus means a successor has asked to be woken up
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

To stress the point: the head node is a dummy node that holds no waiting thread. It is either the initial sentinel or the node of the thread that last acquired the lock, whose thread field setHead cleared. Now look at unparkSuccessor, the method that wakes up the successor node:

    // node here is the head node
    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // The node actually operated on is the successor of head
        Node s = node.next;
        // If there is no visible successor, or it has cancelled its wait
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk backwards from the tail and keep the non-cancelled
            // node closest to head
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }

        if (s != null)
            // Wake up the thread of that node
            LockSupport.unpark(s.thread);
    }

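In AQS, many queue traversals run backwards from the tail. The reason is concurrency: as addWaiter and enq show, a new node's prev is set before the CAS that publishes it as tail, while the predecessor's next is assigned only afterwards. A traversal through next can therefore miss a node that has just been enqueued, whereas following prev from the tail reaches every enqueued node.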
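
The link ordering can be imitated with a toy queue. BackwardLinkSketch is a hypothetical class that copies only the order of the three steps in enq (set prev, CAS tail, set next) and the backward scan from unparkSuccessor; it has no cancellation, parking, or wait status.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical toy queue; it imitates only the link ordering used by enq.
public class BackwardLinkSketch {

    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final Node head = new Node("sentinel");
    private final AtomicReference<Node> tail = new AtomicReference<>(head);

    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            node.prev = t;                        // 1. prev is set before publication
            if (tail.compareAndSet(t, node)) {    // 2. node becomes reachable from tail
                t.next = node;                    // 3. next is set last and may lag behind
                return;
            }
        }
    }

    // Walk from tail through prev and return the node closest to head,
    // or null if nothing is queued.
    Node firstFromTail() {
        Node found = null;
        for (Node t = tail.get(); t != null && t != head; t = t.prev) {
            found = t;
        }
        return found;
    }

    static String demo() {
        BackwardLinkSketch q = new BackwardLinkSketch();
        q.enqueue(new Node("a"));
        q.enqueue(new Node("b"));
        q.enqueue(new Node("c"));
        return q.firstFromTail().name;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // a
    }
}
```

Between steps 2 and 3 another thread can already see the new tail, and its prev link is valid, but the old tail's next is still null. That window is why the backward scan is the reliable one.
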
That covers the core of the AQS implementation as used by ReentrantLock. There is not much code, and its logic is clear, so no further summary is needed.

Copyright notice
This article was written by [Thorn in the moonlight tonight]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/02/202202131413288830.html