Chapter 8 - shared model JUC
2022-06-12 23:01:00 【Ape feather】
AQS principle
Overview
AQS stands for AbstractQueuedSynchronizer. It is a framework for building blocking locks and related synchronizer tools.
Characteristics:
- A state field represents the status of the resource, in either exclusive mode or shared mode. Subclasses define how this state is maintained, and thereby control how the lock is acquired and released.
  - getState - read the state
  - setState - set the state
  - compareAndSetState - set the state atomically with CAS
  - Exclusive mode allows only one thread to access the resource at a time; shared mode allows multiple threads to access it.
- A FIFO (first in, first out) wait queue is provided, similar to the EntryList of a Monitor.
- Condition variables implement the wait and wake-up mechanism. Multiple condition variables are supported, each similar to the WaitSet of a Monitor.
Subclasses mainly implement the following methods (by default they throw UnsupportedOperationException):
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
Pattern for acquiring the lock
// If acquiring the lock fails
if (!tryAcquire(arg)) {
// Enqueue the thread; optionally block it with park (it is resumed later with unpark)
}
Pattern for releasing the lock
// If the lock is released successfully
if (tryRelease(arg)) {
// Let a blocked thread resume running
}
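The method list above also covers shared mode, which the custom lock in this chapter does not use. As a rough sketch only (OneShotLatch and its method names are made up for illustration and are not part of the JDK), a one-shot latch can be built on tryAcquireShared and tryReleaseShared, with state 0 meaning closed and 1 meaning open:

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch: state 0 = closed, state 1 = open.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result means success; a negative result makes the caller queue and park.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);   // open the latch
            return true;   // true tells AQS to wake up queued waiters
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    // Opens the latch; all current and future waiters pass through.
    public void open() {
        sync.releaseShared(1);
    }

    // Blocks (uninterruptibly) until the latch is open.
    public void await() {
        sync.acquireShared(1);
    }

    public boolean isOpen() {
        return sync.isOpen();
    }
}
```

Because the acquire is shared, any number of threads blocked in await() are released by a single open() call, which an exclusive lock cannot do.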
Implementing a non-reentrant lock
The following implements a non-reentrant blocking lock: a custom synchronizer built on AbstractQueuedSynchronizer backs a custom lock.
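import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;

// Sleeper is a helper class from the author's project (not shown here)
/**
 * @author xiexu
 * @date 2022/2/14 10:52
 */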
@Slf4j(topic = "c.TestAQS")
@SuppressWarnings("all")
public class TestAqs {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
Sleeper.sleep(2);
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t2").start();
}
}
// Custom lock (non-reentrant)
class MyLock implements Lock {
// Synchronizer class for an exclusive lock
class MySync extends AbstractQueuedSynchronizer {
@Override // Try to acquire the lock
protected boolean tryAcquire(int arg) {
// CAS guarantees atomicity: if state is currently 0, set it to 1 to mark the lock as acquired
if (compareAndSetState(0, 1)) {
// Locked; record the current thread as the owner
setExclusiveOwnerThread(Thread.currentThread());
return true; // lock acquired
}
// Locking failed
return false;
}
@Override // Try to release the lock
protected boolean tryRelease(int arg) {
// No CAS is needed here, because only the lock holder releases the lock
// setExclusiveOwnerThread(null) is placed before setState(0) so that instruction reordering cannot cause problems
setExclusiveOwnerThread(null); // no thread owns the lock now
setState(0); // state is volatile, so writes made before setState(0) are also visible to other threads (write barrier; see the volatile principle)
return true;
}
@Override // Whether the lock is held exclusively
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Create a condition variable
public Condition newCondition() {
return new ConditionObject();
}
}
// Synchronizer class object
private MySync sync = new MySync();
@Override // Lock ( If it doesn't succeed, it will enter the waiting queue )
public void lock() {
sync.acquire(1);
}
@Override // Lock, interruptibly
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override // Try to lock ( Try it once )
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override // Try to lock ( With timeout )
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override // Unlock
public void unlock() {
sync.release(1);
}
@Override // Create a condition variable
public Condition newCondition() {
return sync.newCondition();
}
}
Non-reentrancy test
- If main is changed to the following code, the thread blocks itself on the second lock() call ("locking..." is printed only once).
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
log.debug("locking...");
// Non-reentrant lock: the same thread can lock only once before it releases the lock
lock.lock();
log.debug("locking...");
try {
log.debug("locking...");
Sleeper.sleep(2);
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t1").start();
}
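The same behaviour can be observed without blocking a thread. The sketch below is an illustration under assumptions: NonReentrantDemo and its nested Sync are hypothetical names, and Sync simply repeats the tryAcquire and tryRelease logic of MySync above so that the block is self-contained.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class NonReentrantDemo {
    // Minimal exclusive synchronizer, same logic as MySync in the text
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    // Returns the results of three non-blocking attempts made by one thread.
    static boolean[] attempt() {
        Sync sync = new Sync();
        boolean first = sync.tryAcquire(1);   // lock is free, CAS 0 -> 1 succeeds
        boolean second = sync.tryAcquire(1);  // same thread, but state is already 1
        sync.release(1);                      // state back to 0
        boolean third = sync.tryAcquire(1);   // free again
        return new boolean[] { first, second, third };
    }
}
```

The second attempt fails even though the caller is the owner, because tryAcquire only looks at state and never compares the owner thread. With lock() instead of a single try, that failure turns into the self-blocking shown above.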
ReentrantLock principle
- ReentrantLock provides two synchronizers, implementing a fair lock and a non-fair lock. The default is the non-fair lock.
Implementation of the non-fair lock
Locking and unlocking process
Start with the constructor. By default it creates the non-fair implementation:
public ReentrantLock() {
sync = new NonfairSync();
}
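The default can be checked from the public API. This is a small sketch (FairnessDemo is a made-up class name); it relies only on the ReentrantLock(boolean fair) constructor and isFair():

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    // The no-arg constructor gives a non-fair lock
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // Passing true selects the fair synchronizer
    static boolean explicitFairIsFair() {
        return new ReentrantLock(true).isFair();
    }
}
```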
NonfairSync extends Sync, which in turn extends AQS.
- When there is no contention, Thread-0 becomes the holder of the lock.
- When contention first appears, look at the lock method in the NonfairSync source code.
NonfairSync source code
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
acquire() source code
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Thread-1 then does the following:
- The CAS in lock tries to change state from 0 to 1 and fails, because Thread-0 has already set state to 1.
- lock calls acquire, which enters the tryAcquire logic. state is still 1, so this also fails.
- Next, acquire enters the addWaiter logic and builds the Node queue (a doubly linked list).
  - In the figures, the yellow triangle shows the waitStatus of a Node; 0 is the default, normal state.
  - Nodes are created lazily.
  - The first Node is called the dummy (virtual head node) or sentinel. It only holds the head position and is not associated with any thread.
The current thread then enters the acquireQueued logic of acquire:
- acquireQueued keeps trying to acquire the lock in an infinite loop, and parks (blocks) after a failure.
- If the node is right behind head (that is, second in the queue, since the first is the dummy head node), it calls tryAcquire again. Here we assume state is still 1, so the attempt fails.
- It enters the shouldParkAfterFailedAcquire logic, which changes the waitStatus of the predecessor node, head (the dummy), to -1 and returns false this time.
acquireQueued() source code
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- After shouldParkAfterFailedAcquire returns, control goes back to acquireQueued, which calls tryAcquire again. state is still 1, so it fails again.
- On the second entry into shouldParkAfterFailedAcquire, the waitStatus of the predecessor node (the dummy) is already -1, so this time it returns true.
- The thread enters parkAndCheckInterrupt and Thread-1 parks (shown in gray in the figures, meaning blocked).
- If several more threads go through the same process and fail to acquire the lock, they end up queued and parked in the same way.
- Thread-0 calls unlock (in ReentrantLock), which calls release to release the lock and enters the tryRelease flow. If it succeeds, exclusiveOwnerThread is set to null and state = 0.
unlock and release source code
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease( ) Source code
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// Clear the lock owner
setExclusiveOwnerThread(null);
}
// In this walkthrough state becomes 0 here
setState(c);
return free;
}
- Inside release (called from unlock), if the queue is not null and the waitStatus of head is -1, the thread enters the unparkSuccessor flow. unparkSuccessor finds the node closest to head that has not been cancelled (here Thread-1) and unparks it, so Thread-1 resumes from the point where it was blocked and continues with the acquireQueued flow.
- If Thread-1 acquires the lock (no contention), the following is set within the acquireQueued flow:
  - exclusiveOwnerThread becomes Thread-1 and state = 1.
  - head points to the Node that belonged to Thread-1, and that Node's thread field is cleared.
  - The original head is unlinked from the list and can be garbage collected.
- If other threads are competing at this moment (this is where the unfairness shows), for example Thread-4 arrives:
  - If Thread-4 happens to get there first, Thread-4 becomes exclusiveOwnerThread and state = 1.
  - Thread-1 goes through the acquireQueued flow again, fails to acquire the lock, and parks again.
Lock source code
// Sync extends AQS
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// Lock implementation
final void lock() {
// First try once, with CAS, to change state from 0 to 1; success means the exclusive lock has been acquired
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// If the attempt fails, go to ㈠
acquire(1);
}
// ㈠ Method inherited from AQS, placed here for readability
public final void acquire(int arg) {
// ㈡ tryAcquire
if (
!tryAcquire(arg) &&
// When tryAcquire returns false, first call addWaiter ㈣, then acquireQueued ㈤
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// ㈡ Delegates to ㈢
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// ㈢ Method inherited from Sync, placed here for readability
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// If no thread holds the lock yet
if (c == 0) {
// Try to acquire it with CAS. This is where the unfairness shows: the AQS queue is not checked
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// If the lock is already held and the holder is the current thread, this is a lock reentry
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// Acquisition failed; return to the caller
return false;
}
// ㈣ Method inherited from AQS, placed here for readability
private Node addWaiter(Node mode) {
// Associate the current thread with a Node in exclusive mode. waitStatus is a member variable, so the new Node's waitStatus defaults to 0
Node node = new Node(Thread.currentThread(), mode);
// If tail is not null, try with CAS to append the Node to the tail of the AQS queue
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// Doubly linked list
pred.next = node;
return node;
}
}
// If tail is null (or the CAS above failed), enqueue the Node via ㈥
enq(node);
return node;
}
// ㈥ Method inherited from AQS, placed here for readability
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// Queue not initialized yet: set head to a sentinel node (no associated thread, status 0)
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
// Try with CAS to append the Node to the tail of the AQS queue
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// ㈤ Method inherited from AQS, placed here for readability
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// The predecessor is head, so it is this node's turn (the node of the current thread); try to acquire
if (p == head && tryAcquire(arg)) {
// Acquired: make this node (the node of the current thread) the new head
setHead(node);
// Unlink the previous head to help GC
p.next = null;
failed = false;
// Return the interrupt flag (false if the thread was never interrupted while waiting)
return interrupted;
}
if (
// Decide whether the thread should park, see ㈦
shouldParkAfterFailedAcquire(p, node) &&
// Park and wait; by now the predecessor's status has been set to Node.SIGNAL ㈧
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// ㈦ Method inherited from AQS, placed here for readability
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// Get the status of the predecessor node
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) {
// The predecessor is already set to signal this node, so this thread can safely park
return true;
}
// > 0 means the cancelled status
if (ws > 0) {
// The predecessor was cancelled: skip over all cancelled predecessors, then return to the outer loop and retry
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// Do not block this time,
// but if the next retry fails the thread must block, so set the predecessor's status to Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// ㈧ Block the current thread
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
}
Unlock source code
// Sync extends AQS
static final class NonfairSync extends Sync {
// Unlock implementation
public void unlock() {
sync.release(1);
}
// Method inherited from AQS, placed here for readability
public final boolean release(int arg) {
// Try to release the lock, see ㈠
if (tryRelease(arg)) {
// Unpark the successor of the queue's head node
Node h = head;
if (
// The queue is not null
h != null &&
// A non-zero waitStatus (Node.SIGNAL) means a successor needs to be unparked
h.waitStatus != 0
) {
// Unpark a thread waiting in the AQS queue, see ㈡
unparkSuccessor(h);
}
return true;
}
return false;
}
// ㈠ Method inherited from Sync, placed here for readability
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// Lock reentry is supported: the release succeeds only when state drops to 0
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// ㈡ Method inherited from AQS, placed here for readability
private void unparkSuccessor(Node node) {
// If the status is Node.SIGNAL, try to reset it to 0; once the woken thread acquires the lock, this head node is discarded
// It does not matter if the reset fails
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// Find the node to unpark. Removing that node from the AQS queue is done later by the woken node itself
Node s = node.next;
// Skip cancelled nodes: scan the AQS queue from tail to head to find the frontmost node that needs to be unparked
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}
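The owner check in tryRelease is visible from the outside: calling unlock on a ReentrantLock that the current thread does not hold throws IllegalMonitorStateException. A minimal sketch (UnlockWithoutLockDemo is a hypothetical class name used only for this illustration):

```java
import java.util.concurrent.locks.ReentrantLock;

class UnlockWithoutLockDemo {
    // Returns true if unlock() by a thread that does not hold the lock throws
    static boolean unlockByNonOwnerThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock(); // the current thread is not the exclusive owner
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }
}
```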
Reentry principle
When the same thread reenters the lock, state is incremented. When the lock is released, state is decremented. Only when state drops to 0 is the lock actually released, after which other waiting threads are woken up to compete for it.
static final class NonfairSync extends Sync {
// ...
// Method inherited from Sync, placed here for readability
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// If the lock is already held and the holder is the current thread, this is a lock reentry
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// Method inherited from Sync, placed here for readability
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// Lock reentry is supported: the release succeeds only when state drops to 0
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
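The counting described above can be watched through ReentrantLock.getHoldCount(), which reports how many holds the current thread has on the lock. The following is a small sketch; HoldCountDemo is a made-up name:

```java
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    // Records the current thread's hold count after each lock/unlock step
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquisition
        lock.lock();
        counts[1] = lock.getHoldCount(); // reentry: count goes up
        lock.unlock();
        counts[2] = lock.getHoldCount(); // one release: still held
        lock.unlock();
        counts[3] = lock.getHoldCount(); // fully released
        return counts;
    }
}
```

The recorded sequence is 1, 2, 1, 0: the lock only becomes free after the second unlock().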
Interruptibility principle
Non-interruptible mode
In this mode, a thread that is interrupted still stays in the AQS queue; it only finds out that it was interrupted after it has acquired the lock.
// Sync extends AQS
static final class NonfairSync extends Sync {
// ...
private final boolean parkAndCheckInterrupt() {
// If the interrupt flag is already true, park does not block
// A thread blocked in park can be released from it when another thread calls interrupt on it
LockSupport.park(this);
// interrupted() clears the interrupt flag, so the next park can block again
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
// The interrupt status is returned only after the lock has been acquired
return interrupted;
}
if (
shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()
) {
// If the thread was woken up by an interrupt, record the interrupt status as true
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
// If the interrupt status is true
selfInterrupt();
}
}
static void selfInterrupt() {
// Set the interrupt flag again. If the thread is running normally (not in sleep or a similar state), interrupt() does not throw
Thread.currentThread().interrupt();
}
}
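The comments in parkAndCheckInterrupt describe how park interacts with the interrupt flag. A minimal single-threaded sketch of that interaction (ParkInterruptDemo is a hypothetical name) could look like this:

```java
import java.util.concurrent.locks.LockSupport;

class ParkInterruptDemo {
    // Returns { flag seen by Thread.interrupted(), flag afterwards }
    static boolean[] parkWithInterruptFlag() {
        Thread.currentThread().interrupt();   // set the interrupt flag first
        LockSupport.park();                   // does not block, because the flag is already set
        boolean wasInterrupted = Thread.interrupted();                      // reads and clears the flag
        boolean stillInterrupted = Thread.currentThread().isInterrupted();  // flag is now cleared
        return new boolean[] { wasInterrupted, stillInterrupted };
    }
}
```

Clearing the flag with Thread.interrupted() is what allows the next park in the acquireQueued loop to block again, and it is also why acquire has to call selfInterrupt afterwards to restore the flag for the caller.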
Interruptible mode
static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// If the lock is not acquired, go to ㈠
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// ㈠ Interruptible lock acquisition process
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// Reached if the thread is interrupted while parked
// Throw an exception here instead of looping again in for (;;)
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
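From the caller's side, interruptible mode is what ReentrantLock.lockInterruptibly() exposes. The sketch below is illustrative only (InterruptibleLockDemo and the thread name are made up): the calling thread keeps the lock, a second thread waits for it interruptibly, and an interrupt makes that wait end with InterruptedException.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleLockDemo {
    // Returns true if the waiting thread left lockInterruptibly() via InterruptedException
    static boolean waiterWasInterrupted() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] interrupted = { false };
        lock.lock(); // the calling thread holds the lock for the whole demo
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly(); // cannot succeed while the caller holds the lock
                    lock.unlock();            // not reached in this demo
                } catch (InterruptedException e) {
                    interrupted[0] = true;    // the wait was abandoned
                }
            }, "waiter");
            waiter.start();
            waiter.interrupt(); // takes effect whether the waiter is already parked or not
            try {
                waiter.join();  // join makes the write to interrupted[0] visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return interrupted[0];
        } finally {
            lock.unlock();
        }
    }
}
```

With lock() in place of lockInterruptibly(), the waiter would stay queued until the lock is released and would only see its interrupt flag afterwards, as described for non-interruptible mode.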
Fair lock implementation principle
Check whether the current thread has a predecessor node in the AQS queue (a node that belongs to a thread, not the placeholder sentinel node). If it does, do not compete for the lock; if it does not, go on to the CAS operation.
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
// Method inherited from AQS, placed here for readability
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// The main difference from the non-fair lock is the implementation of tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// First check whether the AQS queue has a predecessor node; compete only if there is none
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// ㈠ Method inherited from AQS, placed here for readability
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t means there are Nodes in the queue
return h != t &&
(
// (s = h.next) == null means the queue has no second node yet
(s = h.next) == null || // or the second node in the queue does not belong to this thread
s.thread != Thread.currentThread()
);
}
}
How condition variables are implemented
Each condition variable corresponds to its own wait queue; the implementing class is ConditionObject.
await flow
- At the start Thread-0 holds the lock and calls await on the ConditionObject, entering the addConditionWaiter flow of ConditionObject.
- A new Node with status -2 (Node.CONDITION) is created, associated with Thread-0, and appended to the tail of the condition's wait queue.
- Next comes the fullyRelease flow of AQS, which releases every hold on the synchronizer (the lock is reentrant, so it may have been acquired several times).
- unparkSuccessor(h) unparks the next node in the AQS queue so that it can compete for the lock. Assuming no other thread competes, Thread-1 acquires it.
- LockSupport.park(this) then parks Thread-0.
signal flow
- Suppose Thread-1 wants to wake up Thread-0.
// If the caller does not hold the lock, an exception is thrown: Thread-1 must hold the lock to wake up a thread waiting on the condition variable
if (!isHeldExclusively())
- Thread-1 enters the doSignal flow of ConditionObject and takes the first Node in the wait queue, that is, the Node of Thread-0.
private void doSignal(Node first) {
do {
// Detach the first waiting node from the condition queue
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// Transfer it to the AQS queue, where it waits to compete for the lock
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
- The transferForSignal flow appends the Node to the tail of the AQS queue, changes the waitStatus of Thread-0's Node to 0, and changes the waitStatus of Thread-3's Node (its predecessor) to -1. A node whose status is -1 is responsible for waking up its successor.
final boolean transferForSignal(Node node) {
/* * If cannot change waitStatus, the node has been cancelled. */
// First change the status from the condition-wait status -2 to the queued status 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
// Append to the tail of the AQS queue; enq returns the predecessor node
Node p = enq(node);
// Read the status of the predecessor node
int ws = p.waitStatus;
// Try to set the predecessor's status to -1, because the predecessor is the one that will wake this node up later
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- Thread-1 releases the lock and enters the unlock flow (omitted here).
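The await and signal flows above correspond to the usual way a Condition is used from application code. The sketch below is an illustration under assumptions: ConditionHandoffDemo, its field names and the run() helper are hypothetical, and only the public ReentrantLock and Condition API is used.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionHandoffDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag = false; // guarded by lock

    // Waiting side: await releases the lock fully and parks until signalled
    void awaitFlag() throws InterruptedException {
        lock.lock();
        try {
            while (!flag) {      // loop guards against spurious wakeups
                ready.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Signalling side: the lock must be held, otherwise signal() throws
    void setFlag() {
        lock.lock();
        try {
            flag = true;
            ready.signal();      // moves the first waiter to the AQS queue
        } finally {
            lock.unlock();       // the waiter can only reacquire after this
        }
    }

    // Runs one waiter and one signaller; returns true if the waiter got through
    static boolean run() {
        ConditionHandoffDemo demo = new ConditionHandoffDemo();
        boolean[] done = { false };
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitFlag();
                done[0] = true;
            } catch (InterruptedException ignored) {
                // leave done[0] as false
            }
        }, "waiter");
        waiter.start();
        demo.setFlag();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done[0];
    }
}
```

Checking the flag in a while loop means the outcome does not depend on whether the waiter or the signaller gets the lock first.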
Source code analysis
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// The first waiting node
private transient Node firstWaiter;
// The last waiting node
private transient Node lastWaiter;
public ConditionObject() {
}
// ㈠ Add a Node to the wait queue
private Node addConditionWaiter() {
Node t = lastWaiter;
// Remove all cancelled Nodes from the wait queue's linked list, see ㈡
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// Create a new Node associated with the current thread and append it to the tail of the queue
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// signal - transfer the first non-cancelled node to the AQS queue
private void doSignal(Node first) {
do {
// first was already the tail node
if ( (firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
first.nextWaiter = null;
} while (
// Transfer the Node from the wait queue to the AQS queue; if that fails and nodes remain, keep looping ㈢
!transferForSignal(first) &&
// The queue still has nodes
(first = firstWaiter) != null
);
}
// Method of the outer class, placed here for readability
// ㈢ Returns false if the node was cancelled (transfer failed); otherwise the transfer succeeds
final boolean transferForSignal(Node node) {
// Set the node's status to 0 (it is about to join the tail of the AQS queue); if the status is no longer Node.CONDITION, the node was cancelled
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// Append to the tail of the AQS queue
Node p = enq(node);
int ws = p.waitStatus;
if (
// The predecessor of the inserted node was cancelled
ws > 0 ||
// The predecessor's status could not be set to Node.SIGNAL
!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
) {
// Unpark the thread so that it can resynchronize the state
LockSupport.unpark(node.thread);
}
return true;
}
// signalAll - transfer every node in the wait queue to the AQS queue
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// ㈡
private void unlinkCancelledWaiters() {
// ...
}
// signal - the caller must hold the lock, so doSignal does not need its own locking
public final void signal() {
// If the caller does not hold the lock, throw an exception
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// signalAll - the caller must hold the lock, so doSignalAll does not need its own locking
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// Uninterruptible wait - until signalled
public final void awaitUninterruptibly() {
// Add a Node to the wait queue, see ㈠
Node node = addConditionWaiter();
// Release the lock held by the current thread, see ㈣
int savedState = fullyRelease(node);
boolean interrupted = false;
// While the node has not been transferred to the AQS queue, block
while (!isOnSyncQueue(node)) {
// Block with park
LockSupport.park(this);
// If interrupted, only record the interrupt status
if (Thread.interrupted())
interrupted = true;
}
// After waking up, try to reacquire the lock; on failure, wait in the AQS queue
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
// Method of the outer class, placed here for readability
// ㈣ The thread may have reentered the lock, so the whole state must be released: read state and release all of it at once
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// Release the lock, waking up the next node in the AQS queue
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// Interrupt mode - re-assert the interrupt flag when exiting the wait
private static final int REINTERRUPT = 1;
// Interrupt mode - throw an exception when exiting the wait
private static final int THROW_IE = -1;
// Determine the interrupt mode
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// ㈤ Apply the interrupt mode
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
// await - until signalled or interrupted
public final void await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// Add a Node to the wait queue, see ㈠
Node node = addConditionWaiter();
// Release the lock held by the current thread
int savedState = fullyRelease(node);
int interruptMode = 0;
// While the node has not been transferred to the AQS queue, block
while (!isOnSyncQueue(node)) {
// Block with park
LockSupport.park(this);
// If interrupted, leave the wait loop
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// After leaving the wait queue, the lock must be reacquired through the AQS queue
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// Remove all cancelled Nodes from the wait queue's linked list, see ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// Apply the interrupt mode, see ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// Timed wait - until signalled, interrupted, or timed out
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// Add a Node to the wait queue, see ㈠
Node node = addConditionWaiter();
// Release the lock held by the current thread
int savedState = fullyRelease(node);
// Compute the deadline
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// While the node has not been transferred to the AQS queue, block
while (!isOnSyncQueue(node)) {
// Timed out: leave the wait queue
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// Park for a bounded time; spinForTimeoutThreshold is 1000 ns
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// If interrupted, leave the wait loop
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// After leaving the wait queue, the lock must be reacquired through the AQS queue
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// Remove all cancelled Nodes from the wait queue's linked list, see ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// Apply the interrupt mode, see ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// Timed wait - until signalled, interrupted, or timed out; the logic is similar to awaitNanos
public final boolean awaitUntil(Date deadline) throws InterruptedException {
// ...
}
// Timed wait - until signalled, interrupted, or timed out; the logic is similar to awaitNanos
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
// ...
}
// Utility methods omitted ...
}
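awaitNanos returns an estimate of the remaining wait time, which is what makes the standard timed-wait loop possible. The following is a sketch under assumptions (TimedWaitDemo and its members are hypothetical names); it only shows how the return value is typically fed back into the next call:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedWaitDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag = false; // guarded by lock

    void setFlag() {
        lock.lock();
        try {
            flag = true;
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if the flag was set before the timeout elapsed
    boolean awaitFlag(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false;                 // timed out
                }
                nanos = ready.awaitNanos(nanos);  // remaining time, <= 0 after a timeout
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore the flag for the caller
            return false;
        } finally {
            lock.unlock(); // awaitNanos reacquires the lock before returning or throwing
        }
    }
}
```

Carrying the remaining time across iterations keeps the total wait bounded even if the thread wakes up spuriously several times.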
Read-write lock
ReentrantReadWriteLock
- When reads far outnumber writes, a read-write lock lets read-read operations run concurrently, which improves performance. This is similar to select ... from ... lock in share mode in a database.
- The data container class below uses the read lock to protect its read() method and the write lock to protect its write() method.
@Slf4j(topic = "c.DataContainer")
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
// Read lock
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
// Write lock
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() {
log.debug("acquiring read lock...");
r.lock();
try {
log.debug("reading");
sleep(1);
return data;
} finally {
log.debug("releasing read lock...");
r.unlock();
}
}
public void write() {
log.debug("acquiring write lock...");
w.lock();
try {
log.debug("writing");
sleep(1);
} finally {
log.debug("releasing write lock...");
w.unlock();
}
}
}
Test: read lock - read lock can run concurrently
@Slf4j(topic = "c.TestReadWriteLock")
public class TestReadWriteLock {
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
}
}
Test: read lock - write lock block each other
@Slf4j(topic = "c.TestReadWriteLock")
public class TestReadWriteLock {
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.write();
}, "t2").start();
}
}
Test: write lock - write lock also block each other
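The write - write case can be checked the same way as the two tests above. The sketch below is self-contained rather than reusing DataContainer: it counts how many threads are inside the write-locked region at once. The class WriteWriteDemo and the method maxConcurrentWriters are hypothetical names, and the 50 ms sleep only simulates a slow write.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, not part of the original article
class WriteWriteDemo {
    // Runs two writer threads and returns the largest number of threads
    // that were ever inside the write-locked region at the same time.
    static int maxConcurrentWriters() throws InterruptedException {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Runnable writer = () -> {
            rw.writeLock().lock();
            try {
                int now = inside.incrementAndGet();
                max.accumulateAndGet(now, Math::max);
                Thread.sleep(50); // simulate a slow write
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inside.decrementAndGet();
                rw.writeLock().unlock();
            }
        };
        Thread t1 = new Thread(writer, "t1");
        Thread t2 = new Thread(writer, "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return max.get();
    }
}
```

If the write lock is truly exclusive, the returned maximum is 1: the second writer only enters after the first has unlocked.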
Notes
- The read lock does not support condition variables; the write lock does
- Upgrading on reentry is not supported: acquiring the write lock while still holding the read lock makes the write lock acquisition wait forever
r.lock();
try {
// ...
w.lock();
try {
// ...
} finally{
w.unlock();
}
} finally{
r.unlock();
}
- Downgrading on reentry is supported: a thread that holds the write lock can acquire the read lock
class CachedData {
Object data;
// Whether the cache is valid; if not, data must be recomputed
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
// Check whether the data is valid
if (!cacheValid) {
// The read lock must be released before acquiring the write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Check again whether another thread has already taken the write lock and updated the cache, to avoid a repeated update (double check)
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Acquire the read lock before releasing the write lock:
// downgrade to the read lock, then release the write lock so other threads can read the cache
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock();
}
}
// The read lock is held here: use the data, then release the read lock (this prevents others from writing while we read)
// ① The data was still valid: the read lock taken at the top is held and the data can be read directly
// ② The data was invalid: it was recomputed, and the read lock was acquired before the write lock was released, so reading continues
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
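Both notes can be observed without risking a deadlock by using tryLock instead of lock. This is a minimal sketch; the class UpgradeDowngradeDemo and its two methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, not part of the original article
class UpgradeDowngradeDemo {
    // Holding the read lock, try to take the write lock without blocking.
    static boolean tryUpgrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            boolean upgraded = rw.writeLock().tryLock(); // fails: a read lock is held
            if (upgraded) {
                rw.writeLock().unlock();
            }
            return upgraded;
        } finally {
            rw.readLock().unlock();
        }
    }

    // Holding the write lock, try to take the read lock without blocking.
    static boolean tryDowngrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();
        try {
            boolean downgraded = rw.readLock().tryLock(); // succeeds: the writer is this thread
            if (downgraded) {
                rw.readLock().unlock();
            }
            return downgraded;
        } finally {
            rw.writeLock().unlock();
        }
    }
}
```

With a blocking lock() call in place of tryLock(), the upgrade attempt would park forever, which is the situation the note above warns about.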
StampedLock
Overview
This class was added in JDK 8 to further optimize read performance. Its distinguishing feature is that acquiring and releasing the read lock or the write lock must be paired with a stamp
Acquire / release the read lock
long stamp = lock.readLock();
lock.unlockRead(stamp);
Acquire / release the write lock
long stamp = lock.writeLock();
lock.unlockWrite(stamp);
Optimistic read: StampedLock supports the tryOptimisticRead() method. After reading, the stamp must be validated once. If validation passes, no write happened in the meantime and the data can be used safely. If validation fails, a read lock must be acquired to guarantee data safety.
long stamp = lock.tryOptimisticRead();
// Validate the stamp
if(!lock.validate(stamp)){
// Upgrade to a read lock
}
Example
Provide a data container class that internally uses the read lock to protect the data's read() method and the write lock to protect the data's write() method
@Slf4j(topic = "c.DataContainerStamped")
class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();
public DataContainerStamped(int data) {
this.data = data;
}
public int read(int readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("optimistic read locking...{}", stamp);
sleep(readTime);
// Copy the field before validating, so the returned value is the one the stamp covers
int current = data;
if (lock.validate(stamp)) {
log.debug("read finish...{}, data:{}", stamp, current);
return current;
}
// Upgrade: optimistic read -> read lock
log.debug("updating to read lock... {}", stamp);
// Acquire outside try, so finally never unlocks with the stale optimistic stamp
stamp = lock.readLock();
try {
log.debug("read lock {}", stamp);
sleep(readTime);
log.debug("read finish...{}, data:{}", stamp, data);
return data;
} finally {
log.debug("read unlock {}", stamp);
lock.unlockRead(stamp);
}
}
public void write(int newData) {
long stamp = lock.writeLock();
log.debug("write lock {}", stamp);
try {
sleep(2);
this.data = newData;
} finally {
log.debug("write unlock {}", stamp);
lock.unlockWrite(stamp);
}
}
}
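The effect of a write on an optimistic stamp can be shown in a single thread, because an optimistic read holds no lock. The sketch below is a stripped-down variant of the container above without logging or sleeping; OptimisticReadDemo and stampSurvivesWrite are hypothetical names.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical demo class, not part of the original article
class OptimisticReadDemo {
    private final StampedLock lock = new StampedLock();
    private int data;

    void write(int newData) {
        long stamp = lock.writeLock();
        try {
            data = newData;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Optimistic read with a fallback to a real read lock.
    int read() {
        long stamp = lock.tryOptimisticRead();
        int snapshot = data; // copy first, validate afterwards
        if (lock.validate(stamp)) {
            return snapshot; // no write happened since the stamp was issued
        }
        stamp = lock.readLock(); // fall back to a pessimistic read
        try {
            return data;
        } finally {
            lock.unlockRead(stamp);
        }
    }

    // Takes an optimistic stamp, performs a write, then validates the old stamp.
    boolean stampSurvivesWrite() {
        long stamp = lock.tryOptimisticRead();
        write(data + 1);
        return lock.validate(stamp); // false: a write intervened
    }
}
```

Note that StampedLock is not reentrant, so this single-thread trick only works because tryOptimisticRead() does not actually acquire anything.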
Application: caching
Cache update strategy
On update, should the cache be cleared first or the database updated first?
- Clear the cache first
- Update the database first (recommended)
- One more case: suppose query thread A finds that the cached data has just expired at query time, or that this is the first query
- The chance of this happening is very small
- Therefore, to keep the data consistent here, updating the database and clearing the cache must happen atomically.
Read-write lock for a consistent cache
Use a read-write lock to implement a simple on-demand cache
public class TestGenericDao {
public static void main(String[] args) {
GenericDao dao = new GenericDaoCached();
System.out.println("============> query");
String sql = "select * from emp where empno = ?";
int empno = 7369;
Emp emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
System.out.println("============> update");
dao.update("update emp set sal = ? where empno = ?", 800, empno);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
}
}
class GenericDaoCached extends GenericDao {
private GenericDao dao = new GenericDao();
private Map<SqlPair, Object> map = new HashMap<>();
// Read-write lock
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
@Override
public <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) {
return dao.queryList(beanClass, sql, args);
}
@Override
public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
// Look in the cache first; return directly on a hit
SqlPair key = new SqlPair(sql, args);
rw.readLock().lock();
try {
T value = (T) map.get(key);
if(value != null) {
return value;
}
} finally {
rw.readLock().unlock();
}
rw.writeLock().lock();
try {
// Double check: several threads may get here, and another one may already have filled the cache
T value = (T) map.get(key);
if(value == null) {
// Not in the cache: query the database
value = dao.queryOne(beanClass, sql, args);
map.put(key, value);
}
return value;
} finally {
rw.writeLock().unlock();
}
}
@Override
public int update(String sql, Object... args) {
rw.writeLock().lock();
try {
// Update the database first
int update = dao.update(sql, args);
// Then clear the cache
map.clear();
return update;
} finally {
rw.writeLock().unlock();
}
}
class SqlPair {
private String sql;
private Object[] args;
public SqlPair(String sql, Object[] args) {
this.sql = sql;
this.args = args;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqlPair sqlPair = (SqlPair) o;
return Objects.equals(sql, sqlPair.sql) &&
Arrays.equals(args, sqlPair.args);
}
@Override
public int hashCode() {
int result = Objects.hash(sql);
result = 31 * result + Arrays.hashCode(args);
return result;
}
}
}
Note
The implementation above demonstrates the use of a read-write lock and keeps the cache consistent with the database, but the following issues are not considered
- It suits read-heavy, write-light workloads; if writes are frequent, this implementation performs poorly
- Cache capacity is not considered: entries are stored and never evicted
- Cache expiry is not considered: data unused for a long time is never cleaned up
- It only suits a single machine: this is implemented inside one Java process and is not suitable for distributed use
- Concurrency is still low: only one lock is used at present.
- If multiple tables are queried, the reads and writes of table 1 and table 2 are unrelated, yet they share one lock, so while table 1 is being operated on, table 2 is blocked.
- The update method is too crude: it clears every key (consider partitioning by type or redesigning the key)
- Clearing the keys of table 1 should not affect table 2
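One way to address the last two points is to give each table its own lock and its own map, so that an update on one table neither blocks nor clears another. The following is only a sketch under that assumption, not the article's implementation: PerTableCacheSketch, its table and key string parameters, and the loader and dbUpdate callbacks are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical sketch: one read-write lock and one map per table
class PerTableCacheSketch {
    private static final class Partition {
        final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        final Map<String, Object> entries = new HashMap<>();
    }

    private final ConcurrentHashMap<String, Partition> partitions = new ConcurrentHashMap<>();

    private Partition partition(String table) {
        return partitions.computeIfAbsent(table, t -> new Partition());
    }

    // Returns the cached value, loading it under the table's write lock on a miss.
    Object get(String table, String key, Supplier<Object> loader) {
        Partition p = partition(table);
        p.rw.readLock().lock();
        try {
            Object value = p.entries.get(key);
            if (value != null) {
                return value;
            }
        } finally {
            p.rw.readLock().unlock();
        }
        p.rw.writeLock().lock();
        try {
            Object value = p.entries.get(key); // double check
            if (value == null) {
                value = loader.get();
                p.entries.put(key, value);
            }
            return value;
        } finally {
            p.rw.writeLock().unlock();
        }
    }

    // Runs the database update and clears only this table's entries, atomically.
    void update(String table, Runnable dbUpdate) {
        Partition p = partition(table);
        p.rw.writeLock().lock();
        try {
            dbUpdate.run();
            p.entries.clear();
        } finally {
            p.rw.writeLock().unlock();
        }
    }

    int size(String table) {
        Partition p = partition(table);
        p.rw.readLock().lock();
        try {
            return p.entries.size();
        } finally {
            p.rw.readLock().unlock();
        }
    }
}
```

Capacity limits, expiry, and distribution are still not handled here; the sketch only narrows the lock and the invalidation scope from the whole cache to one table.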
Read-write lock principle
Walkthrough
The read lock and the write lock share the same Sync synchronizer, so the wait queue, state, and so on are shared as well
t1 w.lock, t2 r.lock
- t1 locks successfully. The flow is no different from ReentrantLock locking, except that the write lock count occupies the low 16 bits of state, while the read lock uses the high 16 bits of state
- t2 executes r.lock and enters the read lock's sync.acquireShared(1) flow, which first calls tryAcquireShared. If the write lock is held by another thread, tryAcquireShared returns -1 to indicate failure
Meaning of the tryAcquireShared return value
- -1 means failure
- 0 means success, but successor nodes will not be woken up
- A positive number means success, and the value is how many successor nodes still need to be woken up; the read-write lock returns 1
- Next comes the sync.doAcquireShared(1) flow. It first calls addWaiter to add a node; the difference is that the node is set to Node.SHARED mode rather than Node.EXCLUSIVE mode. Note that t2 is still active at this point
- t2 checks whether its node is second in the queue. If it is, it calls tryAcquireShared(1) again to try to acquire the lock
- If that fails, one pass of the for (;;) loop in doAcquireShared changes the predecessor node's waitStatus to -1, and the next pass retries tryAcquireShared(1). If it still fails, t2 parks at parkAndCheckInterrupt()
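The split of state into a high and a low half comes down to a shift and a mask. The sketch below reuses the names that appear in the source listing later in this section (SHARED_UNIT, MAX_COUNT, sharedCount, exclusiveCount); the wrapper class StateBitsSketch is hypothetical and only illustrates the arithmetic.

```java
// Hypothetical wrapper class that reproduces the bit arithmetic on state
class StateBitsSketch {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;          // adds 1 to the read count
    static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;      // 65535
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // low 16 bits

    // Read lock count: the high 16 bits
    static int sharedCount(int c) {
        return c >>> SHARED_SHIFT;
    }

    // Write lock count: the low 16 bits
    static int exclusiveCount(int c) {
        return c & EXCLUSIVE_MASK;
    }

    public static void main(String[] args) {
        int state = 0;
        state += SHARED_UNIT; // one read hold
        state += SHARED_UNIT; // a second read hold
        state += 1;           // one write hold (as in a downgrade by the writer)
        System.out.println("read count  = " + sharedCount(state));
        System.out.println("write count = " + exclusiveCount(state));
    }
}
```

Because both counts live in one int, a single CAS on state can check and update the read and write holds together.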
t3 r.lock, t4 w.lock
In this state, suppose t3 requests the read lock and t4 requests the write lock while t1 still holds the lock. Both are queued behind t2:
- Read lock nodes (t2, t3) are in Shared mode
- Write lock nodes (t4) are in Ex (exclusive) mode
t1 w.unlock
- This enters the write lock's sync.release(1) flow, and the call to sync.tryRelease(1) succeeds
- Next the wake-up flow sync.unparkSuccessor runs, which lets the second node resume. t2 then resumes at parkAndCheckInterrupt() inside doAcquireShared
- The for (;;) loop runs tryAcquireShared once more; if it succeeds, the read lock count is incremented by one
- t2 has now resumed from its node. Next t2 calls setHeadAndPropagate(node, 1), and its node becomes the head node
- It is not over yet: setHeadAndPropagate also checks whether the next node is shared. If so, it calls doReleaseShared(), which changes the state of head from -1 to 0 and wakes up the second node. t3 then resumes at parkAndCheckInterrupt() inside doAcquireShared
- The for (;;) loop runs tryAcquireShared once more; if it succeeds, the read lock count is incremented by one
- t3 has now resumed. Next t3 calls setHeadAndPropagate(node, 1), and its node becomes the head node
- The next node is no longer shared, so t4's node is not woken up
- As you can see, as long as the next node is of shared type, the release keeps propagating until an Ex (exclusive) node is met. This is why read - read can be concurrent: each queued shared thread is simply allowed to run, and the read count held in state is incremented to keep track.
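The propagation described above can be observed from outside with the monitoring methods getQueueLength() and getReadLockCount() of ReentrantReadWriteLock. In this sketch the calling thread plays t1 and two reader threads play t2 and t3; ReadPropagationDemo and readersAfterWriterUnlock are hypothetical names, and the polling loop is only a simple way to wait until both readers are queued.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, not part of the original article
class ReadPropagationDemo {
    // Returns the read lock count observed after the writer unlocks
    // while two readers were queued behind it.
    static int readersAfterWriterUnlock() throws InterruptedException {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        CountDownLatch bothInside = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);

        rw.writeLock().lock(); // the calling thread plays t1
        Runnable reader = () -> {
            rw.readLock().lock(); // parks in the queue while t1 holds the write lock
            try {
                bothInside.countDown();
                release.await(); // stay inside until the count has been sampled
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                rw.readLock().unlock();
            }
        };
        Thread t2 = new Thread(reader, "t2");
        Thread t3 = new Thread(reader, "t3");
        t2.setDaemon(true);
        t3.setDaemon(true);
        t2.start();
        t3.start();

        // Wait until both readers are queued behind the writer
        while (rw.getQueueLength() < 2) {
            Thread.sleep(1);
        }
        rw.writeLock().unlock(); // wakes t2, which propagates the wake-up to t3
        if (!bothInside.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("readers did not both acquire the read lock");
        }
        int readers = rw.getReadLockCount();
        release.countDown();
        t2.join();
        t3.join();
        return readers;
    }
}
```

A single unlock by the writer is enough for both readers to get in, which is the visible result of setHeadAndPropagate calling doReleaseShared.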
t2 r.unlock, t3 r.unlock
- t2 enters sync.releaseShared(1) and calls tryReleaseShared(1) to decrement the count, but the count is not yet zero, so nothing more happens
- t3 enters sync.releaseShared(1) and calls tryReleaseShared(1) to decrement the count. This time the count reaches zero, so it enters doReleaseShared(), changes the head node from -1 to 0, and wakes up the second node (t4)
- t4 then resumes at parkAndCheckInterrupt in acquireQueued and goes around the for (;;) loop again. This time it is second in the queue and there is no competition, so tryAcquire(1) succeeds, the head node is updated, and the flow ends
Source code analysis
Write lock acquisition flow
static final class NonfairSync extends Sync {
// ... irrelevant code omitted
// Method of the outer class WriteLock, placed here for readability
public void lock() {
sync.acquire(1);
}
// Method inherited from AQS, placed here for readability
public final void acquire(int arg) {
if (
// The attempt to acquire the write lock failed, and
!tryAcquire(arg) &&
// the current thread is wrapped in a Node in exclusive mode
// and blocks in the AQS queue
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// Method inherited from Sync, placed here for readability
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// The low 16 bits of state hold the write lock count
int w = exclusiveCount(c);
if (c != 0) {
if (
// c != 0 and w == 0 means a read lock is held, or
w == 0 ||
// the exclusive owner thread is not the current thread
current != getExclusiveOwnerThread()
) {
// Failed to acquire the write lock
return false;
}
// The write lock count would exceed the low 16 bits: report an error
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Write lock reentry: locking succeeds
// Adding acquires to the whole state only changes the low 16 bits (the write count)
setState(c + acquires);
return true;
}
if (
// The write lock should block, or
writerShouldBlock() ||
// the CAS on the count failed
!compareAndSetState(c, c + acquires)
) {
// Failed to acquire the write lock
return false;
}
// The write lock was acquired successfully
setExclusiveOwnerThread(current);
return true;
}
// Nonfair lock: writerShouldBlock always returns false, so the writer never blocks here
final boolean writerShouldBlock() {
return false;
}
}
Write lock release flow
static final class NonfairSync extends Sync {
// ... irrelevant code omitted
// WriteLock method, placed here for readability
public void unlock() {
sync.release(1);
}
// Method inherited from AQS, placed here for readability
public final boolean release(int arg) {
// The attempt to release the write lock succeeded
if (tryRelease(arg)) {
// unpark a thread waiting in the AQS queue
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// Method inherited from Sync, placed here for readability
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// Because of reentrancy, the release only succeeds once the write lock count reaches 0
boolean free = exclusiveCount(nextc) == 0;
if (free) {
// Clear the current lock owner
setExclusiveOwnerThread(null);
}
setState(nextc);
return free;
}
}
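The reentrancy handling in tryAcquire and tryRelease is visible through getWriteHoldCount(): each lock() adds one hold and each unlock() removes one, and the lock is only free once the count is back to zero. A minimal sketch follows; WriteReentryDemo and holdCounts are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, not part of the original article
class WriteReentryDemo {
    // Returns the write hold counts seen after lock+lock, after one unlock,
    // and after the second unlock.
    static int[] holdCounts() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        int[] seen = new int[3];
        rw.writeLock().lock();
        rw.writeLock().lock();            // reentry: tryAcquire takes the c != 0 branch
        seen[0] = rw.getWriteHoldCount();
        rw.writeLock().unlock();          // tryRelease returns false, the lock stays held
        seen[1] = rw.getWriteHoldCount();
        rw.writeLock().unlock();          // count reaches 0, the owner is cleared
        seen[2] = rw.getWriteHoldCount();
        return seen;
    }
}
```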
Read lock acquisition flow
static final class NonfairSync extends Sync {
// ReadLock method, placed here for readability
public void lock() {
sync.acquireShared(1);
}
// Method inherited from AQS, placed here for readability
public final void acquireShared(int arg) {
// A negative return from tryAcquireShared means acquiring the read lock failed
if (tryAcquireShared(arg) < 0) {
doAcquireShared(arg);
}
}
// Method inherited from Sync, placed here for readability
protected final int tryAcquireShared(int unused) {
// Get the current thread
Thread current = Thread.currentThread();
int c = getState();
// If another thread holds the write lock, acquiring the read lock fails
if (
// The write lock is held, and
exclusiveCount(c) != 0 &&
// its holder is not the current thread
getExclusiveOwnerThread() != current
) {
return -1;
}
int r = sharedCount(c);
if (
// The read lock should not block (it should if the second node in the queue is a write lock), and
!readerShouldBlock() &&
// the read count is below the maximum, and
r < MAX_COUNT &&
// the CAS that adds 1 to the high 16 bits of state (the read count) succeeded
compareAndSetState(c, c + SHARED_UNIT)
) {
// ... unimportant code omitted
return 1;
}
return fullTryAcquireShared(current);
}
// Nonfair lock: readerShouldBlock checks whether the first waiting node in the AQS queue is a write lock
// true means the reader should block, false means it should not
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
// Sync method, placed here for readability
// Similar to tryAcquireShared, but keeps retrying in for (;;) to acquire the read lock; it does not block while running
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// ... Omit unimportant code
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// ... Omit unimportant code
return 1;
}
}
}
// Method inherited from AQS, placed here for readability
private void doAcquireShared(int arg) {
// Wrap the current thread in a Node in shared mode
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// Check whether this node is second in the queue
final Node p = node.predecessor();
if (p == head) {
// Try again to acquire the read lock
int r = tryAcquireShared(arg);
// Success
if (r >= 0) {
// ㈠
// r is the number of available resources; here it is always 1, which allows propagation
// (waking up the next Share node in the AQS queue)
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (
// Whether to block after failing to acquire the read lock (predecessor waitStatus == Node.SIGNAL)
shouldParkAfterFailedAcquire(p, node) &&
// park the current thread
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// ㈠ Method inherited from AQS, placed here for readability
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// Make this node the head
setHead(node);
// propagate > 0 means shared resources remain (for example a shared read lock or semaphore permits)
// the old head's waitStatus == Node.SIGNAL or Node.PROPAGATE
// the new head's waitStatus == Node.SIGNAL or Node.PROPAGATE
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// If this is the last node, or the next node is waiting for the shared read lock
if (s == null || s.isShared()) {
// Enter ㈡
doReleaseShared();
}
}
}
// ㈡ Method inherited from AQS, placed here for readability
private void doReleaseShared() {
// If head.waitStatus goes Node.SIGNAL ==> 0 successfully, unpark the next node
// If head.waitStatus == 0, change it to Node.PROPAGATE; this fixes a bug, see the later analysis
for (;;) {
Node h = head;
// The queue still has waiting nodes
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// unpark the next node; if it acquires the read lock
// and its successor is also shared, doReleaseShared runs again
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}
Read lock release process
static final class NonfairSync extends Sync {
// ReadLock method, placed here for readability
public void unlock() {
sync.releaseShared(1);
}
// Method inherited from AQS, placed here for readability
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Method inherited from Sync, placed here for readability
protected final boolean tryReleaseShared(int unused) {
// ... unimportant code omitted
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) {
// The read count does not affect other threads acquiring the read lock, but it does affect threads acquiring the write lock
// Only a count of 0 is a real release
return nextc == 0;
}
}
}
// Method inherited from AQS, placed here for readability
private void doReleaseShared() {
// If head.waitStatus goes Node.SIGNAL ==> 0 successfully, unpark the next node
// If head.waitStatus == 0, change it to Node.PROPAGATE
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// Another thread may be releasing the read lock too, so waitStatus is first changed to 0 by CAS
// to prevent unparkSuccessor from being executed more than once
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// If it is already 0, change it to -3 (Node.PROPAGATE) to fix the propagation problem; see the later semaphore bug analysis
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}
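That tryReleaseShared only reports a real release when the count reaches zero can be seen from a writer's point of view. In this sketch one thread holds the read lock twice and a separate thread probes with tryLock() after each unlock; ReadReleaseDemo, writerCanLock, and probe are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, not part of the original article
class ReadReleaseDemo {
    // Asks a separate thread whether it can take the write lock right now.
    static boolean writerCanLock(ReentrantReadWriteLock rw) throws InterruptedException {
        boolean[] result = new boolean[1];
        Thread writer = new Thread(() -> {
            if (rw.writeLock().tryLock()) {
                result[0] = true;
                rw.writeLock().unlock();
            }
        }, "writer");
        writer.start();
        writer.join(); // join also makes result[0] visible to this thread
        return result[0];
    }

    // Returns what the writer saw at read counts 2, 1, and 0.
    static boolean[] probe() throws InterruptedException {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        rw.readLock().lock();                  // reentrant: read count is 2
        boolean atTwo = writerCanLock(rw);
        rw.readLock().unlock();                // read count is 1
        boolean atOne = writerCanLock(rw);
        rw.readLock().unlock();                // read count is 0: the real release
        boolean atZero = writerCanLock(rw);
        return new boolean[] { atTwo, atOne, atZero };
    }
}
```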