AQS explanation of concurrent programming

2022-06-11 00:36:00 xujingyiss

The core of Java concurrent programming is the java.util.concurrent package (abbreviated JUC).

The principal author of this package is Doug Lea.

What is AQS?

AQS is short for AbstractQueuedSynchronizer, which is an abstract class.

Although you rarely use this class directly, it is the underlying implementation of many Java concurrency tools. In JUC, commonly used classes (for example ReentrantLock, CountDownLatch, and Semaphore) each maintain an internal Sync field (the synchronizer), and Sync is an inner class that extends AbstractQueuedSynchronizer.

AQS defines two ways of sharing a resource:

  1. Exclusive: only one thread can execute at a time, as in ReentrantLock
  2. Shared: multiple threads can execute at the same time, as in Semaphore and CountDownLatch

Node and key attributes

Node as defined in AQS

AQS internally maintains a doubly linked list whose elements are represented by Node.

Node is defined inside AbstractQueuedSynchronizer:

static final class Node {
	// Marker indicating that a node is waiting in shared mode
	static final Node SHARED = new Node();
	// Marker indicating that a node is waiting in exclusive mode
	static final Node EXCLUSIVE = null;
	// The thread waiting in the synchronization queue timed out or was interrupted, so its wait must be cancelled
	static final int CANCELLED = 1;
	/**
	 * The successor's thread is (or will soon be) blocked. When the current node releases
	 * the synchronization state or is cancelled, it must wake its successor so that thread can run.
	 */
	static final int SIGNAL = -1;
	/**
	 * The node is in a condition queue and its thread is waiting on a Condition. After another thread
	 * calls signal() on that Condition, the node is transferred from the condition queue to the
	 * synchronization queue, where it competes for the synchronization state.
	 */
	static final int CONDITION = -2;
	// Indicates that the next shared acquisition should propagate unconditionally
	static final int PROPAGATE = -3;
	/**
	 * Wait status of this node: one of five values (1, 0, -1, -2, -3).
	 * It is changed with CAS; volatile guarantees visibility, so under high concurrency
	 * an update by one thread is immediately visible to other threads.
	 */
	volatile int waitStatus;
	// Predecessor node, assigned when this node is enqueued in the synchronization queue
	volatile Node prev;
	// Successor node
	volatile Node next;
	// The thread that enqueued this node and is waiting for the synchronization state
	volatile Thread thread;
	/**
	 * Next node waiting in the condition queue, or the SHARED constant if this node waits in shared mode.
	 * In other words, the node mode (exclusive or shared) and the condition-queue successor
	 * share the same field.
	 */
	Node nextWaiter;
	// Returns true if the node is waiting in shared mode
	final boolean isShared() {
		return nextWaiter == SHARED;
	}
	// Returns the predecessor node, or throws NullPointerException if there is none
	final Node predecessor() throws NullPointerException {
		Node p = prev;
		if (p == null)
			throw new NullPointerException();
		else
			return p;
	}
	Node() {    // Used to establish initial head or SHARED marker
	}
	Node(Thread thread, Node mode) {     // Used by addWaiter
		this.nextWaiter = mode;
		this.thread = thread;
	}
	Node(Thread thread, int waitStatus) { // Used by Condition
		this.waitStatus = waitStatus;
		this.thread = thread;
	}
}

Key attributes of AQS

// Head node of the synchronization wait queue
private transient volatile Node head;

// Tail node of the synchronization wait queue
private transient volatile Node tail;

// Synchronization state. For example, in ReentrantLock, state counts how many times the lock is held
private volatile int state;

Fair locks & nonfair locks

Fair lock

With a fair lock, if other threads are already waiting, a newly arriving thread goes straight into the wait queue. Every thread except the first one in the queue is blocked and has to be woken up later.

A thread tries to take the lock only when no thread is queued ahead of it. In the JDK 8 source of ReentrantLock, FairSync.tryAcquire checks hasQueuedPredecessors() before attempting the CAS.

The advantage is that every waiting thread eventually gets the resource. The disadvantage is that every thread except the first in the queue blocks, and the cost of waking blocked threads can be high.

Nonfair lock

With a nonfair lock, a thread that attempts to acquire the lock tries to grab it directly (compareAndSetState), even if other threads are already in the wait queue.

The code shows the same thing: when locking, the thread first attempts to acquire the lock directly with CAS and only queues if that fails.

The advantage of a nonfair lock is that a thread that acquires the lock directly never has to be queued and woken up, which reduces wake-up overhead, so throughput is higher than with a fair lock. The disadvantage is just what the name suggests: it is unfair. A thread in the queue may be unable to get the lock for a long time, or at all.

In the code these are FairSync (fair lock) and NonfairSync (nonfair lock). ReentrantLock defaults to the nonfair lock.

ReentrantLock has a constructor, ReentrantLock(boolean fair), that lets you choose whether the lock is fair.
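
As a small illustration of the two constructors, the sketch below creates one lock of each kind and queries it with isFair(). The class name FairnessDemo is made up for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK
class FairnessDemo {
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();      // no-arg constructor: nonfair
    }

    static boolean explicitFairIsFair() {
        return new ReentrantLock(true).isFair();  // fair = true
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());      // prints false
        System.out.println(explicitFairIsFair()); // prints true
    }
}
```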

Reentrant locks & non-reentrant locks

A lock is reentrant if the thread that holds it can lock it again; otherwise it is non-reentrant.

Reentrant lock

A reentrant lock means that once a thread has acquired a lock, the same thread can acquire that lock again (other threads cannot).

ReentrantLock is a typical reentrant lock.

state stores the number of holds. Each lock() increments state by 1; each unlock() decrements it by 1.
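
The hold count can be observed from the outside with ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. The following is only a sketch; the class and method names are invented for the example.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK
class ReentrancyDemo {
    // Records the current thread's hold count at four points in time
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        try {
            counts[0] = lock.getHoldCount();     // first acquisition
            lock.lock();                         // same thread locks again
            try {
                counts[1] = lock.getHoldCount(); // re-entered
            } finally {
                lock.unlock();
            }
            counts[2] = lock.getHoldCount();     // one unlock so far
        } finally {
            lock.unlock();
        }
        counts[3] = lock.getHoldCount();         // fully released
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [1, 2, 1, 0]
    }
}
```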

Non-reentrant lock

Once a thread has acquired the lock, it cannot acquire it again until it has released it.

The JDK has no non-reentrant counterpart to ReentrantLock among its Lock implementations (StampedLock is not reentrant, but it is a different kind of lock with its own API).
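
A non-reentrant lock is easy to sketch on top of AQS. The class below is a hypothetical example loosely modeled on the Mutex sample in the AbstractQueuedSynchronizer Javadoc; it is not a JDK class and leaves out features such as Condition support. state is 0 when the lock is free and 1 when it is held, and the owner gets no special treatment on a second attempt.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant lock built on AQS; not a JDK class
class NonReentrantMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        public boolean tryAcquire(int ignored) {
            // Succeeds only when state goes 0 -> 1, even if the caller already owns the lock
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }              // blocks (queues) if the lock is held
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }

    public static void main(String[] args) {
        NonReentrantMutex mutex = new NonReentrantMutex();
        System.out.println(mutex.tryLock()); // prints true
        System.out.println(mutex.tryLock()); // prints false: the owner cannot re-enter
        mutex.unlock();
        System.out.println(mutex.tryLock()); // prints true
    }
}
```

Note that calling lock() twice from the same thread on such a mutex would block forever, which is exactly the behavior reentrancy avoids.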

Wait queue

The CLH wait queue is a doubly linked list built from Node objects. Each new node is appended to the tail of the queue.

When a thread arrives and has to wait, it joins the CLH wait queue: a Node object is created for it and appended to the doubly linked list.

In code terms, the nodes are linked through Node's prev and next fields.

Condition queue

Some threads wait together for a condition (Condition). Only when the condition is met are these waiting threads woken up so they can compete for the lock again.

The condition queue is also built from Node objects, but unlike the CLH wait queue it is a singly linked list.

In a condition queue, a Node has waitStatus = Node.CONDITION = -2, and nodes are linked through nextWaiter rather than prev and next.

A thread must hold the exclusive lock before operating on the condition queue. If, after acquiring the lock, the condition is not met, the thread is suspended on that lock's condition queue and releases the lock it holds; it is woken up again once the condition is met.

BlockingQueue implementations such as ArrayBlockingQueue are built on condition queues.
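
To make the lock/await/signal cycle concrete, here is a minimal sketch of a one-slot buffer that uses two Condition objects from the same ReentrantLock. It is a simplified illustration, not how any JDK queue is actually written, and the names OneSlotBuffer and sumThroughBuffer are invented.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-element buffer; not a JDK class
class OneSlotBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private Integer slot; // null means the buffer is empty

    void put(int value) throws InterruptedException {
        lock.lock();                       // the lock must be held before awaiting
        try {
            while (slot != null)
                notFull.await();           // releases the lock while waiting
            slot = value;
            notEmpty.signal();             // moves one waiter to the synchronization queue
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (slot == null)
                notEmpty.await();
            int value = slot;
            slot = null;
            notFull.signal();
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Sends 1..n through the buffer from a producer thread and sums what the caller takes
    static int sumThroughBuffer(int n) {
        OneSlotBuffer buffer = new OneSlotBuffer();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buffer.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }
}
```

The while loops around await() guard against waking up when the condition no longer holds.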

The condition queue is relatively independent of the rest of AQS, so it is not covered further here and will get a separate article later.

Exclusive mode

Exclusive mode means that once one thread has taken the lock, no other thread can take it.

Node.EXCLUSIVE denotes exclusive mode.

AbstractQueuedSynchronizer extends the abstract class AbstractOwnableSynchronizer.

That class defines a field, exclusiveOwnerThread, which records which thread holds the synchronizer in exclusive mode.

1) Acquiring the lock

Take ReentrantLock as an example; it works in exclusive mode.

The first thread acquires the lock with CAS and then sets exclusiveOwnerThread to itself.

2) Joining the blocking queue

When another thread wants to acquire the lock and finds that it is already held by a different thread, it cannot lock successfully.

If locking fails, the thread is normally added to the wait queue and blocked.

The addWaiter method creates a Node and puts it at the tail of the wait queue:

private Node addWaiter(Node mode) {
	// Wrap the current thread in a Node
	Node node = new Node(Thread.currentThread(), mode);
	Node pred = tail;
	// Fast path: the queue already exists (tail is not null)
	if (pred != null) {
		node.prev = pred;
		// CAS the node in as the new tail of the synchronization queue, then return it
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	// Either tail is null (the queue has not been initialized) or the CAS above lost a race.
	// enq initializes the queue if necessary and retries until the node is appended
	enq(node);
	return node;
}

The acquireQueued method blocks the current thread when the lock has not been acquired. Before blocking, it tries once more to acquire the lock:

final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		boolean interrupted = false;
		for (;;) {
			// If the predecessor is the head node, try to acquire the lock again
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			if (
				/**
				 * Decide whether the thread should block after the failed acquisition.
				 * The first call sets the predecessor to SIGNAL and returns false; the next
				 * iteration returns true, and the thread can then be blocked,
				 * so this loop does not keep spinning on the CPU.
				 */
				shouldParkAfterFailedAcquire(p, node) &&
				// Block the current thread
				parkAndCheckInterrupt()
			)
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

Take an example:

import java.util.concurrent.locks.ReentrantLock;

public class ExclusiveTest {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();

        Thread t1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread 1 acquired the lock");
                // Hold the lock for a long time so that t2 has to queue
                Thread.sleep(1000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // Release in finally so the lock is freed even if an exception is thrown
                lock.unlock();
            }
            System.out.println("Thread 1 released the lock");
        });

        Thread t2 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread 2 acquired the lock");
            } finally {
                lock.unlock();
            }
            System.out.println("Thread 2 released the lock");
        });

        t1.start();
        Thread.sleep(500);
        t2.start();
        System.out.println("==============");
    }
}

Two threads are started. Thread t1 (Thread-0) acquires the lock and does not release it for a long time. When thread t2 (Thread-1) then tries to acquire the lock, it fails and is added to the wait queue.
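
The queued state can also be checked programmatically instead of with a debugger. The sketch below relies on ReentrantLock's monitoring methods hasQueuedThread(Thread) and getQueueLength(); the class name and the polling loop are assumptions made for this example, and here the calling thread plays the role of t1.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK
class QueueInspection {
    // Returns true if exactly one thread (t2) is queued while the caller holds the lock
    static boolean secondThreadIsQueued() {
        ReentrantLock lock = new ReentrantLock();
        Thread t2 = new Thread(() -> {
            lock.lock();   // blocks until the caller releases the lock
            lock.unlock();
        });
        boolean queued;
        lock.lock();
        try {
            t2.start();
            // Poll until t2 has failed to acquire the lock and has been enqueued
            while (!lock.hasQueuedThread(t2)) {
                Thread.yield();
            }
            queued = lock.getQueueLength() == 1;
        } finally {
            lock.unlock(); // wakes t2 through release() and unparkSuccessor()
        }
        try {
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queued;
    }

    public static void main(String[] args) {
        System.out.println(secondThreadIsQueued());
    }
}
```

getQueueLength() is documented as an estimate, so it is suitable for monitoring rather than for synchronization control.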

3) After the holder releases the lock, a thread in the wait queue is woken up

When the thread holding the lock releases it, a thread in the wait queue is woken up.

Calling unlock() to release the lock invokes release(1):

/**
 * Releases a lock held in exclusive mode
 */
public final boolean release(int arg) {
	// tryRelease(arg): arg = 1, releases the lock once, state - 1
	if (tryRelease(arg)) {
		Node h = head;
		// A non-zero waitStatus on head (normally SIGNAL, -1) means a successor may need to be woken
		if (h != null && h.waitStatus != 0)
			// Wake up the thread in the successor (next) node
			unparkSuccessor(h);
		return true;
	}
	return false;
}

First, the thread holding the lock attempts to release it (tryRelease), which computes state - 1, that is, it releases the lock "once". Only when state drops to 0 has the thread fully released the lock, and tryRelease returns true; otherwise it returns false.

protected final boolean tryRelease(int releases) {
	// state minus 1
	int c = getState() - releases;
	// If the lock is not held by the current thread, throw an exception
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	if (c == 0) {
		// state has dropped to 0, so this thread has completely released the lock
		free = true;
		// The current thread no longer holds the lock, so clear exclusiveOwnerThread
		setExclusiveOwnerThread(null);
	}
	// Store the new state value
	setState(c);
	return free;
}
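
The ownership check in tryRelease is easy to observe: calling unlock() from a thread that does not hold the lock throws IllegalMonitorStateException. A minimal sketch, with an invented class name:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK
class UnlockWithoutOwnership {
    static boolean unlockByNonOwnerThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock(); // the current thread never acquired the lock
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;   // tryRelease rejected the non-owner
        }
    }

    public static void main(String[] args) {
        System.out.println(unlockByNonOwnerThrows()); // prints true
    }
}
```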

If the lock was fully released, release() checks whether head's waitStatus is non-zero (normally SIGNAL, waitStatus = -1) and then wakes up a thread in the wait queue (head's successor). The details are in the unparkSuccessor() method:

/**
 * Wakes up the thread of a node's successor. Normally the thread woken is the one in head's successor
 */
private void unparkSuccessor(Node node) {
	// node here is the head node
	int ws = node.waitStatus;
	if (ws < 0)
		// Reset waitStatus to its initial value 0
		compareAndSetWaitStatus(node, ws, 0);
	// Successor of head
	Node s = node.next;
	// If the successor is null or has been cancelled (CANCELLED = 1)
	if (s == null || s.waitStatus > 0) {
		s = null;
		// Traverse backward from tail and keep the non-cancelled node closest to head
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	if (s != null)
		// Wake the thread; LockSupport.unpark delegates to a native method
		LockSupport.unpark(s.thread);
}

After being woken up, the thread resumes its loop in acquireQueued and tries to acquire the lock again.

Shared mode

Shared mode lets multiple threads acquire the lock at the same time and access the shared resource.

Take Semaphore as an example (by default it creates a nonfair synchronizer). A Semaphore is used to throttle concurrent access. In Semaphore, state holds the number of available permits.

For example, if at most 2 threads may access the resource at the same time, state starts at 2.

For instance:

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 1; i < 6; i++) {
            Thread thread = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return; // no permit was acquired, so there is nothing to release
                }
                try {
                    System.out.println(Thread.currentThread().getName()
                            + " acquired a permit ==>" + System.currentTimeMillis());
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // Always return the permit, even if sleep is interrupted
                    semaphore.release();
                }
                System.out.println(Thread.currentThread().getName()
                        + " released its permit ==>" + System.currentTimeMillis());
            }, "Thread " + i);
            thread.start();
        }
    }
}

This starts 5 threads, each of which sleeps for 5 seconds. At first only 2 threads can get a permit. After 5 seconds, once a thread releases its permit, another thread can acquire it, but at any moment at most 2 threads hold permits.
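
Rather than reading timestamps, the cap can be measured directly. The sketch below tracks how many threads are inside the guarded section at once and returns the maximum observed; the class name, the counters, and the 20 ms sleep are all assumptions made for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK
class ConcurrencyCap {
    // Returns the largest number of threads observed holding a permit at the same time
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = inside.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet(); // leave before handing the permit back
                    semaphore.release();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrent(2, 5)); // never more than 2
    }
}
```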

Next, let's look at how this is implemented in the source code.

1) Acquiring the lock

If the lock can be acquired, the method simply returns without blocking. The only exception it throws is InterruptedException, when the thread has been interrupted.

public final void acquireSharedInterruptibly(int arg)
		throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	// Try to acquire the shared lock; a return value >= 0 from tryAcquireShared means success
	if (tryAcquireShared(arg) < 0)
		// The lock was not acquired, so the thread is normally added to the wait queue
		doAcquireSharedInterruptibly(arg);
}

Determining whether the shared lock can be acquired:

final int nonfairTryAcquireShared(int acquires) {
	for (;;) {
		// state is initialized from the permits passed to the constructor, i.e. the number of threads allowed in at once
		int available = getState();
		// Permits remaining after this acquisition = state - acquires
		int remaining = available - acquires;
		// If remaining < 0, not enough permits are left, so return without changing state; otherwise CAS state to the new value
		if (remaining < 0 || compareAndSetState(available, remaining))
			return remaining;
	}
}
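
The effect of this loop on state is visible through Semaphore.availablePermits() and the non-blocking tryAcquire(). A small single-threaded sketch, with an invented class name:

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the JDK
class PermitCounting {
    static String trace() {
        Semaphore semaphore = new Semaphore(2);
        int before = semaphore.availablePermits(); // state starts at 2
        boolean first = semaphore.tryAcquire();    // 2 -> 1
        boolean second = semaphore.tryAcquire();   // 1 -> 0
        boolean third = semaphore.tryAcquire();    // remaining would be -1, so it fails
        int after = semaphore.availablePermits();
        return before + " " + first + " " + second + " " + third + " " + after;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints 2 true true false 0
    }
}
```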

2) If the lock cannot be acquired, the current thread is added to the wait queue and blocked

If the lock is not acquired (the returned remaining is less than 0), the thread is normally put into the wait queue and blocked.

private void doAcquireSharedInterruptibly(int arg)
	throws InterruptedException {
	// Create a Node and add it to the wait queue
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true;
	try {
		for (;;) {
			// Predecessor node
			final Node p = node.predecessor();
			if (p == head) {
				// If the predecessor of the current Node is head, try again to acquire the lock
				int r = tryAcquireShared(arg);
				if (r >= 0) {
					setHeadAndPropagate(node, r);
					p.next = null; // help GC
					failed = false;
					return;
				}
			}
			if (
				/**
				 * Decide whether the thread should block after the failed acquisition.
				 * The first call sets the predecessor to SIGNAL and returns false; the next
				 * iteration returns true, and the thread can then be blocked,
				 * so this loop does not keep spinning on the CPU.
				 */
				shouldParkAfterFailedAcquire(p, node) &&
				// Block the thread via LockSupport.park; returns true if it was interrupted
				parkAndCheckInterrupt()
			)
				throw new InterruptedException();
		}
	} finally {
		if (failed)
			// Cancel the acquisition attempt
			cancelAcquire(node);
	}
}

addWaiter creates a Node and puts it at the tail of the wait queue. Its source is explained in the exclusive mode section above.

enq initializes the wait queue if it is empty and then appends the Node to its tail, retrying until it succeeds:

private Node enq(final Node node) {
	// Spin until the node is appended to the tail of the wait queue, creating the queue first if it does not exist
	for (;;) {
		Node t = tail;
		if (t == null) {
			// The queue is empty and must be initialized: create a dummy head node and point tail at it
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
			// The queue is not empty: append the current Node to the tail
			node.prev = t;
			if (compareAndSetTail(t, node)) {
				t.next = node;
				return t;
			}
		}
	}
}

shouldParkAfterFailedAcquire decides whether the thread should block after a failed acquisition:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	int ws = pred.waitStatus;
	if (ws == Node.SIGNAL)
		// The predecessor is already SIGNAL, so the current node can safely park
		return true;
	if (ws > 0) {
		// The predecessor is cancelled (waitStatus = 1): skip over cancelled predecessors, unlinking them from the queue
		do {
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		// The predecessor is in another state (0 or PROPAGATE): CAS it to SIGNAL.
		// The caller retries the acquisition once more before parking
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}

shouldParkAfterFailedAcquire sets the predecessor's status to SIGNAL, so the next loop iteration returns true. parkAndCheckInterrupt() is then called to block the current thread, so the loop does not keep spinning on the CPU.

3) Releasing the lock

When a thread finishes using the shared resource, it releases the lock, and the released permits are added back to state.

First, tryReleaseShared tries to release the shared lock once (releases = 1): it adds 1 to state and then returns true:

protected final boolean tryReleaseShared(int releases) {
	for (;;) {
		// Read the current state value
		int current = getState();
		// releases = 1
		int next = current + releases;
		// An int overflow means too many permits were released
		if (next < current)
			throw new Error("Maximum permit count exceeded");
		// Update state with CAS: state += 1
		if (compareAndSetState(current, next))
			return true;
	}
}
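
Because tryReleaseShared simply adds to state, a Semaphore does not require that a release is matched by an earlier acquire: releasing on a fresh semaphore raises the permit count above its initial value. A minimal sketch, with an invented class name:

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the JDK
class ReleaseAddsPermits {
    static int permitsAfterExtraRelease() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.release(); // no prior acquire: state goes 1 -> 2
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterExtraRelease()); // prints 2
    }
}
```

This is why releasing in a finally block should only happen after acquire() has actually succeeded.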

When tryReleaseShared returns true, the release succeeded, and doReleaseShared starts waking up threads in the wait queue:

/**
 * Sets head's status to 0 (from SIGNAL) or to PROPAGATE (from 0).
 * Wakes head.next (node B). Once awake, B competes for the lock; if it succeeds, B becomes the new head
 * and in turn wakes B.next, and so on until the waiting shared nodes have been woken.
 * If head's status is SIGNAL: reset head.waitStatus to 0 and wake the thread of head's successor, which then competes for the shared lock.
 * If head's status is 0: set head.waitStatus to Node.PROPAGATE, meaning the release must be propagated to the next node.
 */
private void doReleaseShared() {
	for (;;) {
		Node h = head;
		if (h != null && h != tail) {
			int ws = h.waitStatus;
			// head is SIGNAL, so the thread of its successor needs to be woken
			if (ws == Node.SIGNAL) {
				/**
				 * head is SIGNAL: reset its waitStatus to 0 rather than setting it directly to Node.PROPAGATE,
				 * because unparkSuccessor(h) resets any negative waitStatus to 0 anyway. A later pass or call
				 * that finds ws == 0 sets it to Node.PROPAGATE.
				 */
				if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
					// If resetting waitStatus to 0 fails, loop again
					continue;
				/**
				 * Wake the thread of head.next; the awakened thread competes for the lock.
				 * If it succeeds, head moves to that node, i.e. head changes.
				 */
				unparkSuccessor(h);
			}
			/**
			 * If head's waitStatus is in the reset state (waitStatus == 0), set it to PROPAGATE,
			 * meaning the release must be propagated to the next node.
			 */
			else if (ws == 0 &&
					!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
				continue;
		}
		// If head changed in the meantime, loop again and keep waking the successor of the new head; otherwise stop
		if (h == head)
			break;
	}
}

Note that the wake-up propagates along the queue: a single call wakes head's successor, and each awakened shared node that acquires successfully becomes the new head and may wake its own successor in turn. The waiting shared nodes are therefore woken one after another and then compete for the lock.
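
Propagation can be seen with a tiny shared-mode synchronizer. The sketch below is a hypothetical one-shot latch, loosely modeled on the BooleanLatch sample in the AbstractQueuedSynchronizer Javadoc; it is not a JDK class. All waiters are queued first, then a single releaseShared call lets every one of them through.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch built on AQS shared mode; not a JDK class
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A positive result means success and that later waiters may succeed too
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // open the latch
            return true; // lets releaseShared call doReleaseShared
        }
    }

    private final Sync sync = new Sync();

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    void open() { sync.releaseShared(1); }

    // Queues the given number of waiters, opens the latch once, and counts how many woke up
    static int wakeAll(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    woken.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        // Wait until every waiter sits in the synchronization queue
        while (latch.sync.getQueueLength() < waiters) {
            Thread.yield();
        }
        latch.open(); // one release; the wake-up propagates from node to node
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println(wakeAll(3)); // prints 3
    }
}
```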

Copyright notice
This article was written by [xujingyiss]. Please include a link to the original when reposting.
https://yzsam.com/2022/03/202203020628160129.html