
Chapter 8 - shared model JUC

2022-06-12 23:01:00 Ape feather


How AQS works

Overview

AQS stands for AbstractQueuedSynchronizer, a framework for building blocking locks and related synchronizer tools.

Characteristics:

  • A state field represents the state of the resource, in either exclusive mode or shared mode. Subclasses define how this state is maintained and how it controls acquiring and releasing the lock
    • getState - read the state
    • setState - write the state
    • compareAndSetState - update the state atomically with CAS
    • In exclusive mode only one thread can access the resource; shared mode lets multiple threads access it
  • It provides a FIFO (first in, first out) wait queue, similar to a Monitor's EntryList
  • Condition variables implement the wait and wake-up mechanism. Multiple condition variables are supported, each similar to a Monitor's WaitSet

Subclasses mainly implement the following methods (by default each throws UnsupportedOperationException):

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively
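
The rest of this chapter only exercises the exclusive-mode methods, so here is a minimal sketch of the shared-mode pair (tryAcquireShared / tryReleaseShared). The class names OneShotLatch and OneShotLatchDemo are hypothetical and exist only for this illustration; the AQS calls themselves (acquireShared, releaseShared, getState, setState) are standard.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class for illustration: a gate that starts closed and opens once.
class OneShotLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Negative: fail and enqueue the caller.
            // Positive: succeed, and later shared acquirers may succeed too.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // open the gate
            return true; // true tells AQS to wake the queued waiters
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void await()     { sync.acquireShared(1); } // blocks while the gate is closed
    void open()      { sync.releaseShared(1); }
    boolean isOpen() { return sync.isOpen(); }
}

class OneShotLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        OneShotLatch latch = new OneShotLatch();
        Runnable waiter = () -> {
            latch.await();
            System.out.println(Thread.currentThread().getName() + " passed the gate");
        };
        Thread t1 = new Thread(waiter, "t1");
        Thread t2 = new Thread(waiter, "t2");
        t1.start();
        t2.start();
        latch.open(); // both waiters get through: shared mode admits many threads
        t1.join();
        t2.join();
    }
}
```

Unlike an exclusive lock, opening the gate lets every waiting thread proceed, which is the behaviour the shared mode is meant for.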

Lock acquisition idiom

// If acquiring the lock fails
if (!tryAcquire(arg)) {
  // Enqueue the thread; optionally block it with park (a later unpark resumes it)
}

Lock release idiom

// If releasing the lock succeeds
if (tryRelease(arg)) {
  // Let a blocked thread run again
}

Implementing a non-reentrant lock

The following code implements a non-reentrant blocking lock: it uses AbstractQueuedSynchronizer to define a custom synchronizer, and builds a custom lock on top of it.

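import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import lombok.extern.slf4j.Slf4j;

// Sleeper is a helper class from the author's project (not shown here)

/**
 * @author xiexu
 * @date 2022/2/14 10:52
 */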
@Slf4j(topic = "c.TestAQS")
@SuppressWarnings("all")
public class TestAqs {
    

    public static void main(String[] args) {
    
        MyLock lock = new MyLock();
        new Thread(() -> {
    
            lock.lock();
            try {
    
                log.debug("locking...");
                Sleeper.sleep(2);
            } finally {
    
                log.debug("unlocking...");
                lock.unlock();
            }
        }, "t1").start();

        new Thread(() -> {
    
            lock.lock();
            try {
    
                log.debug("locking...");
            } finally {
    
                log.debug("unlocking...");
                lock.unlock();
            }
        }, "t2").start();
    }

}

// Custom lock (non-reentrant)
class MyLock implements Lock {

    // Synchronizer class for an exclusive lock
    class MySync extends AbstractQueuedSynchronizer {

        @Override // Try to acquire the lock
        protected boolean tryAcquire(int arg) {
            // CAS guarantees atomicity: if state is currently 0, set it to 1 to take the lock
            if (compareAndSetState(0, 1)) {
                // Locked: record the current thread as the owner
                setExclusiveOwnerThread(Thread.currentThread());
                return true; // lock acquired
            }
            // false means the lock was not acquired
            return false;
        }

        @Override // Try to release the lock
        protected boolean tryRelease(int arg) {
            // No CAS is needed here, because only the lock holder releases
            // Call setExclusiveOwnerThread(null) before setState(0) so that instruction reordering cannot cause problems
            setExclusiveOwnerThread(null); // no thread owns the lock any more
            // state is volatile, so writes made before setState(0) are also visible to other threads (write barrier, see the volatile principle)
            setState(0);
            return true;
        }

        @Override // Whether the lock is held exclusively
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Create a condition variable
        public Condition newCondition() {
            return new ConditionObject();
        }

    }

    // The synchronizer instance
    private MySync sync = new MySync();

    @Override // Lock (enters the wait queue if the attempt fails)
    public void lock() {
        sync.acquire(1);
    }

    @Override // Lock, interruptibly
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override // Try to lock (a single attempt)
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override // Try to lock (with a timeout)
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override // Unlock
    public void unlock() {
        sync.release(1);
    }

    @Override // Create a condition variable
    public Condition newCondition() {
        return sync.newCondition();
    }

}

Non-reentrancy test

  • If main is changed to the following code, the thread blocks itself on the second lock() call ("locking..." is printed only once)

public static void main(String[] args) {
    

        MyLock lock = new MyLock();
        new Thread(() -> {
    
            lock.lock();
            log.debug("locking...");
            // Non-reentrant: until the lock is released, the same thread can acquire it only once
            lock.lock();
            log.debug("locking...");
            try {
    
                log.debug("locking...");
                Sleeper.sleep(2);
            } finally {
    
                log.debug("unlocking...");
                lock.unlock();
            }
        }, "t1").start();

    }
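
The test above demonstrates non-reentrancy by blocking forever. A variant that does not hang is sketched below: it reduces the lock to tryLock/unlock so that the second attempt by the owner simply returns false. TinyNonReentrantLock is a hypothetical, cut-down class written for this illustration and is not the MyLock shown earlier.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical minimal lock, reduced to the state transition that matters here.
class TinyNonReentrantLock {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // There is no owner check, so a second attempt by the owner
            // fails exactly like an attempt by any other thread.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock()     { sync.release(1); }
}

class TinyNonReentrantLockDemo {
    public static void main(String[] args) {
        TinyNonReentrantLock lock = new TinyNonReentrantLock();
        System.out.println("first tryLock:  " + lock.tryLock());
        System.out.println("second tryLock: " + lock.tryLock()); // same thread, still refused
        lock.unlock();
        System.out.println("after unlock:   " + lock.tryLock());
    }
}
```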

How ReentrantLock works

  • ReentrantLock provides two synchronizers that implement a fair lock and a nonfair lock. The default is the nonfair lock!
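
The default can be checked through the public API. This small sketch relies only on the ReentrantLock constructors and isFair(); the class name FairnessFlagDemo and its describe helper are made up for the example.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessFlagDemo {

    // Hypothetical helper: turns the fairness flag into a label.
    static String describe(ReentrantLock lock) {
        return lock.isFair() ? "fair" : "nonfair";
    }

    public static void main(String[] args) {
        System.out.println(describe(new ReentrantLock()));      // no-arg constructor
        System.out.println(describe(new ReentrantLock(true)));  // explicitly fair
        System.out.println(describe(new ReentrantLock(false))); // explicitly nonfair
    }
}
```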

How the nonfair lock is implemented

Locking and unlocking process

Start with the constructor. The default is the nonfair implementation

public ReentrantLock() {
    
  sync = new NonfairSync();
}

NonfairSync inherits from AQS

  • When there is no contention
    • Thread-0 becomes the lock holder

  • When contention first appears, look at the lock method in the NonfairSync source

NonfairSync source code

static final class NonfairSync extends Sync {
    
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock. Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
    
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
    
        return nonfairTryAcquire(acquires);
    }
}

acquire() source code

public final void acquire(int arg) {
    
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Thread-1 performs the following steps:

  • In lock, CAS tries to change state from 0 to 1 and fails, because state is already 1
  • lock then calls acquire, which enters tryAcquire. state is still 1, so this fails too
  • Next comes the addWaiter logic of acquire, which builds the Node queue (a doubly linked list)
    • In the original diagrams, a yellow triangle marks each Node's waitStatus; 0 is the default, normal state
    • Nodes are created lazily
    • The first Node is called the dummy (virtual head node) or sentinel. It only occupies the head position and is not associated with any thread

The current thread then enters the acquireQueued logic of acquire

  1. acquireQueued keeps trying to acquire the lock in an endless loop, and parks the thread after a failure
  2. If the thread's node is right behind head (second in line, since the first is the dummy head), it calls tryAcquire again. Here we assume state is still 1, so it fails
  3. It enters shouldParkAfterFailedAcquire, changes the waitStatus of the predecessor node, namely head (the dummy), to -1, and returns false this time

acquireQueued() source code

final boolean acquireQueued(final Node node, int arg) {
    
    boolean failed = true;
    try {
    
        boolean interrupted = false;
        for (;;) {
    
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
    
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
    
        if (failed)
            cancelAcquire(node);
    }
}

  1. After shouldParkAfterFailedAcquire returns, control goes back to acquireQueued, which calls tryAcquire again. state is still 1 at this point, so it fails
  2. On entering shouldParkAfterFailedAcquire again, the waitStatus of the predecessor node (the dummy) is already -1, so this time it returns true
  3. The thread enters parkAndCheckInterrupt and Thread-1 parks (shown in gray in the original diagram to indicate that it is blocked)
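
The park step relies on two LockSupport behaviours: a permit granted by an earlier unpark makes park return at once, and so does a pending interrupt. The sketch below mirrors the shape of parkAndCheckInterrupt outside of AQS; ParkPermitDemo and its method names are hypothetical, while LockSupport.park, LockSupport.unpark and Thread.interrupted are the real JDK calls.

```java
import java.util.concurrent.locks.LockSupport;

class ParkPermitDemo {

    // Same shape as the AQS method: park, then report and clear the interrupt flag.
    static boolean parkAndCheckInterrupt() {
        LockSupport.park();
        return Thread.interrupted();
    }

    // A permit granted in advance is consumed by park, which returns immediately.
    static boolean permitMakesParkReturn() {
        LockSupport.unpark(Thread.currentThread());
        boolean interrupted = parkAndCheckInterrupt();
        return !interrupted;
    }

    // A pending interrupt also makes park return immediately;
    // Thread.interrupted() then clears the flag so that a later park can block again.
    static boolean interruptMakesParkReturn() {
        Thread.currentThread().interrupt();
        boolean interrupted = parkAndCheckInterrupt();
        return interrupted && !Thread.currentThread().isInterrupted();
    }

    public static void main(String[] args) {
        System.out.println(permitMakesParkReturn());
        System.out.println(interruptMakesParkReturn());
    }
}
```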

  1. When more threads go through the same process and fail to get the lock, the queue keeps growing behind Thread-1
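
The queue build-up can be observed from outside with ReentrantLock's monitoring methods. The following is a rough sketch, assuming that a thread parked inside lock() reports the state WAITING; QueueObservationDemo and its helper names are invented for the example.

```java
import java.util.concurrent.locks.ReentrantLock;

class QueueObservationDemo {

    // Holds the lock, starts `waiters` threads that call lock(), and returns the
    // queue length seen once every waiter is parked.
    static int queueLengthWhileHeld(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Thread[] threads = new Thread[waiters];
        int observed;
        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                threads[i] = new Thread(() -> {
                    lock.lock();   // fails while the caller holds the lock, so the thread is enqueued
                    lock.unlock();
                }, "Thread-" + (i + 1));
                threads[i].start();
            }
            for (Thread t : threads) {
                // Spin until the waiter is parked
                while (t.getState() != Thread.State.WAITING) {
                    Thread.yield();
                }
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock(); // wakes the first waiter; the others follow one by one
        }
        for (Thread t : threads) {
            join(t);
        }
        return observed;
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("queued threads: " + queueLengthWhileHeld(3));
    }
}
```

Note that getQueueLength does not count the dummy head node, only nodes that carry a thread.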

  • Thread-0 calls unlock (in ReentrantLock), which calls release to release the lock and enters the tryRelease flow. On success, exclusiveOwnerThread is set to null and state = 0

unlock and release source code

public void unlock() {
    
    sync.release(1);
}

public final boolean release(int arg) {
    
        if (tryRelease(arg)) {
    
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease() source code

protected final boolean tryRelease(int releases) {
    
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
    
        free = true;
        // Clear the lock owner
        setExclusiveOwnerThread(null);
    }
    // Write the state back; once the lock is fully released this is 0
    setState(c);
    return free;
}

  • In release (called from unlock), if the queue is not null and head's waitStatus = -1, the unparkSuccessor flow starts. unparkSuccessor finds the Node closest to head that has not been cancelled (here Thread-1) and unparks it. Thread-1 resumes at the point where it was blocked and continues with the acquireQueued flow

  • If locking succeeds (no contention), acquireQueued sets
    • exclusiveOwnerThread to Thread-1, state = 1
    • head to the Node that held Thread-1, and clears that Node's thread
    • the original head is unlinked from the list, so it can be garbage collected
  • If other threads are competing at this moment (this is where the unfairness shows), for example Thread-4 arrives

  • If Thread-4 happens to win
    • Thread-4 becomes exclusiveOwnerThread, state = 1
    • Thread-1 goes through acquireQueued again, fails to acquire the lock, and parks again

Locking source code

// Sync inherits from AQS
static final class NonfairSync extends Sync {

    private static final long serialVersionUID = 7316153563782823691L;

    // Locking implementation
    final void lock() {
        // First use CAS (one attempt only) to change state from 0 to 1; success means the exclusive lock is held
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // If the attempt fails, go to ㈠
            acquire(1);
    }

    // ㈠ Method inherited from AQS, placed here for easier reading
    public final void acquire(int arg) {
        // ㈡ tryAcquire
        if (
                !tryAcquire(arg) &&
                // When tryAcquire returns false, call addWaiter ㈣ first, then acquireQueued ㈤
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }

    // ㈡ Delegates to ㈢
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

    // ㈢ Method inherited from Sync, placed here for easier reading
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // Nobody holds the lock yet
        if (c == 0) {
            // Try to take it with CAS. This is where the unfairness shows: the AQS queue is not checked
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // The lock is already held and the owner is the current thread: this is a reentrant acquire
        else if (current == getExclusiveOwnerThread()) {
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Acquisition failed; return to the caller
        return false;
    }

    // ㈣ Method inherited from AQS, placed here for easier reading
    private Node addWaiter(Node mode) {
        // Associate the current thread with a Node in exclusive mode. The new Node's waitStatus
        // is 0, because waitStatus is a field and int fields default to 0
        Node node = new Node(Thread.currentThread(), mode);
        // If tail is not null, use CAS to append the Node to the tail of the AQS queue
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                // Complete the doubly linked list
                pred.next = node;
                return node;
            }
        }
        // If tail is null (or the CAS failed), enqueue the Node through ㈥
        enq(node);
        return node;
    }

    // ㈥ Method inherited from AQS, placed here for easier reading
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                // The queue is empty: set head to a sentinel node (no thread, waitStatus 0)
                if (compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                // Use CAS to append the Node to the tail of the AQS queue
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    // Returns the node's predecessor (the old tail)
                    return t;
                }
            }
        }
    }

    // ㈤ Method inherited from AQS, placed here for easier reading
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // The predecessor is head, so it is this node's turn (the node of the current thread): try to acquire
                if (p == head && tryAcquire(arg)) {
                    // Acquired: make this node (the node of the current thread) the new head
                    setHead(node);
                    // Unlink the previous head to help GC
                    p.next = null;
                    failed = false;
                    // Return the interrupt flag (false if never interrupted)
                    return interrupted;
                }
                if (
                    // Decide whether to park, see ㈦
                    shouldParkAfterFailedAcquire(p, node) &&
                    // Park and wait ㈧; by now the predecessor's status has been set to Node.SIGNAL
                    parkAndCheckInterrupt()
                ) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // ㈦ Method inherited from AQS, placed here for easier reading
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Read the status of the predecessor node
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) {
            // The predecessor is already set to signal this node, so it is safe to park
            return true;
        }
        // > 0 means the cancelled status
        if (ws > 0) {
            // The predecessor was cancelled: skip over all cancelled nodes before this one,
            // then return to the outer loop and retry
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // Do not park this time, but if the next retry fails the thread must park,
            // so set the predecessor's status to Node.SIGNAL now
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    // ㈧  Block the current thread 
    private final boolean parkAndCheckInterrupt() {
    
        LockSupport.park(this);
        return Thread.interrupted();
    }
}

Unlocking source code

// Sync inherits from AQS
static final class NonfairSync extends Sync {

    // Unlocking implementation
    public void unlock() {
        sync.release(1);
    }

    // Method inherited from AQS, placed here for easier reading
    public final boolean release(int arg) {
        // Try to release the lock, see ㈠
        if (tryRelease(arg)) {
            // unpark the successor of the queue's head node
            Node h = head;
            if (
                // The queue is not null
                h != null &&
                // A non-zero waitStatus (Node.SIGNAL) means a successor needs unpark
                h.waitStatus != 0
            ) {
                // unpark a thread waiting in AQS, see ㈡
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }

    // ㈠ Method inherited from Sync, placed here for easier reading
    protected final boolean tryRelease(int releases) {
        // state--
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // Reentrancy support: the release succeeds only when state drops to 0
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    // ㈡ Method inherited from AQS, placed here for easier reading
    private void unparkSuccessor(Node node) {
        // If the status is Node.SIGNAL, try to reset it to 0. If the woken thread gets the lock,
        // this head node is discarded afterwards, so it does not matter if the CAS fails
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        // Find the node to unpark. It is not removed from the AQS queue here;
        // the woken thread does that itself
        Node s = node.next;
        // Skip cancelled nodes: scan the AQS queue from tail towards head for the front-most node that needs unpark
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
}

How reentrancy works

When the same thread re-enters the lock, state is incremented. Each release decrements state. Only when state drops to 0 is the lock actually released, and other threads are then woken up to compete for it
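
The counting can be watched through ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. A minimal sketch follows; the class HoldCountDemo and its trace method are hypothetical names for this example.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {

    // Records the current thread's hold count after each lock/unlock step.
    static int[] trace() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquire
        lock.lock();
        counts[1] = lock.getHoldCount(); // reentrant acquire
        lock.unlock();
        counts[2] = lock.getHoldCount(); // one release: the lock is still held
        lock.unlock();
        counts[3] = lock.getHoldCount(); // last release: the lock is free
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(trace()));
    }
}
```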

static final class NonfairSync extends Sync {
    
    // ...

    // Method inherited from Sync, placed here for easier reading
    final boolean nonfairTryAcquire(int acquires) {
    
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
    
            if (compareAndSetState(0, acquires)) {
    
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // The lock is already held and the owner is the current thread: this is a reentrant acquire
        else if (current == getExclusiveOwnerThread()) {
    
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Method inherited from Sync, placed here for easier reading
    protected final boolean tryRelease(int releases) {
    
        // state--
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // Reentrancy support: the release succeeds only when state drops to 0
        if (c == 0) {
    
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}

How interruption works

Non-interruptible mode

In this mode, a thread that is interrupted still stays in the AQS queue. It only learns that it was interrupted after it has acquired the lock
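
This behaviour can be reproduced with a plain ReentrantLock.lock() call. In the sketch below (UninterruptibleLockDemo is a made-up name) a waiter is interrupted while queued, yet lock() still returns normally once the lock becomes free, and only then does the waiter see its interrupt flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class UninterruptibleLockDemo {

    // Returns true if the waiter acquired the lock normally and then found
    // its interrupt flag set.
    static boolean interruptSeenAfterAcquire() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagSeen = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            lock.lock(); // not interruptible: keeps waiting in the queue
            try {
                flagSeen.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        }, "waiter");

        lock.lock();
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield(); // wait until the waiter is in the AQS queue
            }
            waiter.interrupt(); // the waiter stays queued despite the interrupt
        } finally {
            lock.unlock();      // only now can the waiter acquire the lock
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flagSeen.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptSeenAfterAcquire());
    }
}
```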

// Sync inherits from AQS
static final class NonfairSync extends Sync {
    
    // ...

    private final boolean parkAndCheckInterrupt() {
    
        // If the interrupt flag is already true, park returns immediately
        // A thread blocked in park can be released by another thread calling its interrupt method
        LockSupport.park(this);
        // interrupted() clears the interrupt flag, so the next park can block again
        return Thread.interrupted();
    }

    final boolean acquireQueued(final Node node, int arg) {
    
        boolean failed = true;
        try {
    
            boolean interrupted = false;
            for (;;) {
    
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
    
                    setHead(node);
                    p.next = null;
                    failed = false;
                    // The interrupt status is returned only after the lock has been acquired
                    return interrupted;
                }
                if (
                        shouldParkAfterFailedAcquire(p, node) &&
                                parkAndCheckInterrupt()
                ) {
    
                    // If the thread was woken by interrupt, record the interrupt status as true
                    interrupted = true;
                }
            }
        } finally {
    
            if (failed)
                cancelAcquire(node);
        }
    }

    public final void acquire(int arg) {
    
        if (
                !tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
    
            // If the interrupt status is true
            selfInterrupt();
        }
    }

    static void selfInterrupt() {
        // Re-assert the interrupt. If the thread is running normally at this point (not in sleep or a similar blocking state), interrupt() does not raise an exception
        Thread.currentThread().interrupt();
    }
}

Interruptible mode

static final class NonfairSync extends Sync {
    
    public final void acquireInterruptibly(int arg) throws InterruptedException {
    
        if (Thread.interrupted())
            throw new InterruptedException();
        // If the lock was not acquired, go to ㈠
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    // ㈠  Interruptible lock acquisition process 
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
    
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
    
            for (;;) {
    
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
    
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt()) {
    
                    // Reached when the thread is interrupted while parked:
                    // throw an exception instead of going around for (;;) again
                    throw new InterruptedException();
                }
            }
        } finally {
    
            if (failed)
                cancelAcquire(node);
        }
    }
}
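
From the caller's side, this mode corresponds to ReentrantLock.lockInterruptibly(). The following sketch (InterruptibleLockDemo is a hypothetical name) interrupts a queued waiter and checks that it gives up with InterruptedException while the lock is still held by another thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleLockDemo {

    // Returns true if the waiter left the queue with InterruptedException
    // without ever acquiring the lock.
    static boolean waiterAbortsOnInterrupt() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean aborted = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                lock.unlock(); // not reached in this scenario
            } catch (InterruptedException e) {
                aborted.set(true);
            }
        }, "waiter");

        lock.lock();
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield(); // wait until the waiter is in the AQS queue
            }
            waiter.interrupt();
            waiter.join();      // finishes while this thread still holds the lock
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return aborted.get();
    }

    public static void main(String[] args) {
        System.out.println(waiterAbortsOnInterrupt());
    }
}
```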

How the fair lock is implemented

Before competing, a thread checks whether it has a predecessor in the AQS queue (a node that holds a thread, not the placeholder sentinel node). If it does, it does not compete for the lock; if not, it attempts the CAS

if (!hasQueuedPredecessors() &&
        compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
}

static final class FairSync extends Sync {
    
    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {
    
        acquire(1);
    }

    // Method inherited from AQS, placed here for easier reading
    public final void acquire(int arg) {
    
        if (
                !tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
    
            selfInterrupt();
        }
    }
    // The main difference from the nonfair lock is the tryAcquire implementation
    protected final boolean tryAcquire(int acquires) {
    
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
    
            // First check whether the AQS queue has a predecessor node; compete only if there is none
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
    
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
    
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // ㈠ Method inherited from AQS, placed here for easier reading
    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        // h != t means there are Nodes in the queue
        return h != t &&
                (
                        // (s = h.next) == null means no second node is linked behind head yet
                        (s = h.next) == null || // or the thread of the second node is not the current thread
                                s.thread != Thread.currentThread()
                );
    }
}

How condition variables are implemented

Each condition variable corresponds to its own wait queue; the implementing class is ConditionObject

await flow

  • Initially Thread-0 holds the lock and calls await on the ConditionObject, entering its addConditionWaiter flow
  • A new Node with status -2 (Node.CONDITION) is created, associated with Thread-0, and appended to the tail of the wait queue

  • Next comes the fullyRelease flow of AQS, which releases every hold on the synchronizer (the lock is reentrant, so it may be held several levels deep)

  • unparkSuccessor(h) unparks the next node in the AQS queue so that it can compete for the lock. Assuming no other thread competes, Thread-1 wins

  • LockSupport.park(this) parks Thread-0
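
The effect of fullyRelease is visible from outside: a thread that holds the lock twice and then awaits lets another thread acquire the lock, and gets both holds back when it wakes up. The sketch below assumes nothing beyond the public ReentrantLock and Condition API; FullReleaseDemo is a made-up name, and a real program would normally re-check a predicate in a loop around await.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullReleaseDemo {

    // Returns {hold count seen by the signalling thread, hold count of the waiter after await}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        AtomicInteger holdsAfterAwait = new AtomicInteger(-1);
        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // hold count is now 2
            try {
                cond.awaitUninterruptibly(); // releases both holds, then parks
                holdsAfterAwait.set(lock.getHoldCount());
            } finally {
                lock.unlock();
                lock.unlock();
            }
        }, "waiter");
        waiter.start();

        int holdsSeenBySignaller = -1;
        boolean signalled = false;
        while (!signalled) {
            lock.lock(); // succeeds only while the waiter does not hold the lock
            try {
                if (lock.hasWaiters(cond)) {
                    holdsSeenBySignaller = lock.getHoldCount();
                    cond.signal();
                    signalled = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { holdsSeenBySignaller, holdsAfterAwait.get() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("signaller holds: " + r[0] + ", waiter holds after await: " + r[1]);
    }
}
```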

signal flow

  • Suppose Thread-1 wants to wake up Thread-0

// If the caller does not hold the lock, an exception is thrown,
// i.e. Thread-1 must hold the lock to wake threads waiting on the condition variable
if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
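
The isHeldExclusively check is easy to trigger. In this short sketch (SignalWithoutLockDemo is a hypothetical name) signal is called once without the lock and once with it.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalWithoutLockDemo {

    // Returns true if signal() without holding the lock is rejected.
    static boolean signalWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.signal(); // the caller does not hold the lock
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns true if signal() while holding the lock completes normally.
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        try {
            cond.signal(); // nobody is waiting, so this is simply a no-op
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockThrows());
        System.out.println(signalWithLockSucceeds());
    }
}
```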

  • Enter the doSignal flow of ConditionObject, which takes the first Node in the wait queue, i.e. the Node that holds Thread-0

private void doSignal(Node first) {
    
    do {
    
        // Advance firstWaiter, removing the first waiting node from the condition variable
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // Transfer it to the AQS queue, where it waits to compete for the lock
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

  • Run the transferForSignal flow: append the Node to the tail of the AQS queue, change Thread-0's waitStatus to 0 and Thread-3's waitStatus to -1. A node with status -1 is responsible for waking up its successor

final boolean transferForSignal(Node node) {
    
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    // First change the status from waiting on the condition (-2) to queued (0)
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    // Append to the tail of the queue; enq returns the predecessor node
    Node p = enq(node);
    // Read the predecessor's status
    int ws = p.waitStatus;
    // Try to set the predecessor's status to -1, because it will have to wake up this node later
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

  • Thread-1 releases the lock and enters the unlock flow, which is omitted here
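
The transfer from the condition queue to the AQS queue can be observed with hasWaiters and hasQueuedThread. A rough sketch under the same assumptions as before (public ReentrantLock API only; SignalTransferDemo is an invented name):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalTransferDemo {

    // Returns {in condition queue before signal, in condition queue after signal,
    //          in AQS queue after signal}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                cond.awaitUninterruptibly();
            } finally {
                lock.unlock();
            }
        }, "Thread-0");
        waiter.start();

        boolean[] result = new boolean[3];
        boolean signalled = false;
        while (!signalled) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    result[0] = true;
                    cond.signal();
                    result[1] = lock.hasWaiters(cond);        // node has left the condition queue
                    result[2] = lock.hasQueuedThread(waiter); // and now waits in the AQS queue
                    signalled = true;
                }
            } finally {
                lock.unlock(); // the transferred thread can only acquire the lock after this
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```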

Source code analysis

public class ConditionObject implements Condition, java.io.Serializable {
    
    private static final long serialVersionUID = 1173984872572414699L;

    // First node of the condition queue
    private transient Node firstWaiter;

    // Last node of the condition queue
    private transient Node lastWaiter;

    public ConditionObject() { }

    // ㈠ Add a Node to the condition queue
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // Unlink every cancelled Node from the condition queue, see ㈡
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // Create a new Node associated with the current thread and append it to the tail of the queue
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    // Signal - transfer the first non-cancelled node to the AQS queue
    private void doSignal(Node first) {
        do {
            // first was the last node, so the condition queue is now empty
            if ( (firstWaiter = first.nextWaiter) == null) {
                lastWaiter = null;
            }
            first.nextWaiter = null;
        } while (
            // Transfer the Node from the condition queue to the AQS queue; if that fails and nodes remain, keep looping, see ㈢
                !transferForSignal(first) &&
                        // The condition queue still has nodes
                        (first = firstWaiter) != null
        );
    }

    // Method of the outer class, placed here for readability
    // ㈢ Returns false (transfer failed) if the node has been cancelled; otherwise the transfer succeeds
    final boolean transferForSignal(Node node) {
        // Set the node's status to 0 (it is about to join the tail of the AQS queue); if the status is no longer Node.CONDITION, the node was cancelled
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // Append to the tail of the AQS queue
        Node p = enq(node);
        int ws = p.waitStatus;
        if (
            // The predecessor of the inserted node is cancelled, or
                ws > 0 ||
                        // the predecessor's status cannot be set to Node.SIGNAL
                        !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
        ) {
            // unpark the thread so that it can resynchronize its state
            LockSupport.unpark(node.thread);
        }
        return true;
    }
    // Signal all - transfer every node in the condition queue to the AQS queue
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }

    // ㈡
    private void unlinkCancelledWaiters() {
    
        // ...
    }
    // Signal - the caller must hold the lock, so doSignal does not need to worry about locking
    public final void signal() {
        // Throw an exception if the current thread does not hold the lock
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    // Signal all - the caller must hold the lock, so doSignalAll does not need to worry about locking
    public final void signalAll() {
    
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    // Uninterruptible wait - wait until signalled
    public final void awaitUninterruptibly() {
        // Add a Node to the condition queue, see ㈠
        Node node = addConditionWaiter();
        // Release the lock held by the current thread, see ㈣
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        // Park while the node has not yet been transferred to the AQS queue
        while (!isOnSyncQueue(node)) {
            // park blocks the thread
            LockSupport.park(this);
            // If interrupted, only record the interrupt and keep waiting
            if (Thread.interrupted())
                interrupted = true;
        }
        // After being woken, compete for the lock; on failure, wait in the AQS queue
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }
    // Method of the outer class, placed here for readability
    // ㈣ The thread may have re-entered the lock, so the whole state must be released: read state, then release all of it at once
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // Release the lock completely, which also wakes the next node in the AQS queue
            if (release(savedState)) {
    
                failed = false;
                return savedState;
            } else {
    
                throw new IllegalMonitorStateException();
            }
        } finally {
    
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    // Interrupt mode - re-assert the interrupt status when exiting the wait
    private static final int REINTERRUPT = 1;
    // Interrupt mode - throw an exception when exiting the wait
    private static final int THROW_IE = -1;
    // Determine the interrupt mode
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
    }
    // ㈤ Apply the interrupt mode
    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
    
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
    // Wait - until signalled or interrupted
    public final void await() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // Add a Node to the condition queue, see ㈠
        Node node = addConditionWaiter();
        // Release the lock held by the current thread, see ㈣
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // Park while the node has not yet been transferred to the AQS queue
        while (!isOnSyncQueue(node)) {
            // park blocks the thread
            LockSupport.park(this);
            // If interrupted, stop waiting on the condition
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // After leaving the condition queue, the thread still has to reacquire the lock through the AQS queue
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // Unlink all cancelled Nodes from the condition queue, see ㈡
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // Apply the interrupt mode, see ㈤
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    // Wait - until signalled, interrupted, or timed out
    public final long awaitNanos(long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // Add a Node to the condition queue, see ㈠
        Node node = addConditionWaiter();
        // Release the lock held by the current thread, see ㈣
        int savedState = fullyRelease(node);
        // Compute the deadline
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        // Park while the node has not yet been transferred to the AQS queue
        while (!isOnSyncQueue(node)) {
            // Timed out: stop waiting on the condition
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            // park for the remaining time; spinForTimeoutThreshold is 1000 ns, below which the loop spins instead of parking
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // If interrupted, stop waiting on the condition
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        // After leaving the condition queue, the thread still has to reacquire the lock through the AQS queue
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // Unlink all cancelled Nodes from the condition queue, see ㈡
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // Apply the interrupt mode, see ㈤
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }
    // Wait - until signalled, interrupted, or timed out; the logic is similar to awaitNanos
    public final boolean awaitUntil(Date deadline) throws InterruptedException {
        // ...
    }
    // Wait - until signalled, interrupted, or timed out; the logic is similar to awaitNanos
    public final boolean await(long time, TimeUnit unit) throws InterruptedException {
        // ...
    }
    // Utility methods omitted ...
}
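
To tie the source above back to everyday usage, here is a minimal sketch of await/signal on a ReentrantLock condition. The class name ConditionDemo, the flag field, and the runDemo helper are illustrative assumptions, not part of the original notes. await() corresponds to the addConditionWaiter, fullyRelease, and park steps, and signal() corresponds to doSignal and transferForSignal.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag = false; // guarded by lock

    // Blocks until markReady() has been called, then returns the flag.
    public boolean awaitReady() throws InterruptedException {
        lock.lock();
        try {
            // Re-check in a loop to guard against spurious wakeups
            while (!flag) {
                // Releases the lock and waits on the condition queue
                ready.await();
            }
            return flag;
        } finally {
            lock.unlock();
        }
    }

    public void markReady() {
        lock.lock();
        try {
            flag = true;
            // Must hold the lock, otherwise IllegalMonitorStateException is thrown
            ready.signal();
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical helper: starts a waiter, signals it, and reports what the waiter saw.
    public static boolean runDemo() throws InterruptedException {
        ConditionDemo demo = new ConditionDemo();
        boolean[] result = new boolean[1];
        Thread waiter = new Thread(() -> {
            try {
                result[0] = demo.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "waiter");
        waiter.start();
        demo.markReady();
        waiter.join(); // join makes the write to result[0] visible here
        return result[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("waiter observed flag = " + runDemo());
    }
}
```

The waiter works whether it reaches await() before or after markReady() runs, because the flag is checked under the lock before waiting.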

Read-write lock

ReentrantReadWriteLock

  • When reads far outnumber writes, a read-write lock lets read-read operations run concurrently and improves performance. This is similar to select ... from ... lock in share mode in a database
  • Provide a data container class that uses the read lock to protect its read() method and the write lock to protect its write() method
@Slf4j(topic = "c.DataContainer")
class DataContainer {
    

    private Object data;
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    // Read lock
    private ReentrantReadWriteLock.ReadLock r = rw.readLock();
    // Write lock
    private ReentrantReadWriteLock.WriteLock w = rw.writeLock();

    public Object read() {
    
        log.debug(" Get read lock ...");
        r.lock();
        try {
    
            log.debug(" Read ");
            sleep(1);
            return data;
        } finally {
    
            log.debug(" Release read lock ...");
            r.unlock();
        }
    }

    public void write() {
    
        log.debug(" Get write lock ...");
        w.lock();
        try {
    
            log.debug(" write in ");
            sleep(1);
        } finally {
    
            log.debug(" Release the write lock ...");
            w.unlock();
        }
    }

}

Test: read lock - read lock can run concurrently

@Slf4j(topic = "c.TestReadWriteLock")
public class TestReadWriteLock {
    

    public static void main(String[] args) throws InterruptedException {
    
        DataContainer dataContainer = new DataContainer();
        new Thread(() -> {
    
            dataContainer.read();
        }, "t1").start();

        new Thread(() -> {
    
            dataContainer.read();
        }, "t2").start();
    }

}

Test: read lock - write lock block each other

@Slf4j(topic = "c.TestReadWriteLock")
public class TestReadWriteLock {
    

    public static void main(String[] args) throws InterruptedException {
    
        DataContainer dataContainer = new DataContainer();
        new Thread(() -> {
    
            dataContainer.read();
        }, "t1").start();

        new Thread(() -> {
    
            dataContainer.write();
        }, "t2").start();
    }

}

Test: write lock - write lock block each other
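
The write-write case has no code in the text, so here is a small sketch of it. Instead of the DataContainer class and log output used above, it relies only on the JDK and on tryLock(), so the result does not depend on timing. The class name WriteWriteDemo and its helper methods are assumptions made for illustration.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WriteWriteDemo {

    // Returns {acquired while another thread holds the write lock, acquired after release}
    public static boolean[] run() throws InterruptedException {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];

        rw.writeLock().lock(); // the calling thread plays the role of t1
        try {
            Thread t2 = new Thread(() -> {
                result[0] = tryWrite(rw);
            }, "t2");
            t2.start();
            t2.join();
        } finally {
            rw.writeLock().unlock();
        }

        Thread t3 = new Thread(() -> {
            result[1] = tryWrite(rw);
        }, "t3");
        t3.start();
        t3.join();
        return result;
    }

    // Non-blocking attempt to take the write lock; releases it again on success.
    private static boolean tryWrite(ReentrantReadWriteLock rw) {
        boolean acquired = rw.writeLock().tryLock();
        if (acquired) {
            rw.writeLock().unlock();
        }
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println("while held: " + r[0] + ", after release: " + r[1]);
    }
}
```

With a blocking w.lock() in t2 instead of tryLock(), t2 would simply wait until the first writer unlocks, which is the mutual blocking this test is about.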

Notes

  • The read lock does not support condition variables; the write lock does
  • Upgrading on reentry is not supported: acquiring the write lock while holding the read lock makes the write lock acquisition wait forever
r.lock();
try {
    
  // ...
  w.lock();
  try {
    
    // ...
  } finally{
     
    w.unlock();  
  }
} finally{
     
  r.unlock();
}
  • Downgrading on reentry is supported: a thread that holds the write lock can also acquire the read lock
class CachedData {
    Object data;
    // Whether the cache is valid; if not, data must be recomputed
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    void processCachedData() {
        rwl.readLock().lock();
        // Check whether the data is valid
        if (!cacheValid) {
            // The read lock must be released before acquiring the write lock
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                // Check again: another thread may already have taken the write lock and updated the cache (double check)
                if (!cacheValid) {
                    data = ...
                    cacheValid = true;
                }
                // Downgrade: acquire the read lock before releasing the write lock,
                // so that other threads can read the cache once the write lock is released
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock();
            }
        }

        // The read lock is held here; use the data, then release the read lock (this keeps others from writing while we read)
        // ① The data was still valid: the read lock taken at the top is held and the data is read directly
        // ② The data was invalid: data is recomputed, the read lock is taken before the write lock is released, and reading continues
        try {
            use(data);
        } finally {
            rwl.readLock().unlock();
        }
    }
}
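
The two reentry rules can be checked without risking a deadlock by using tryLock(), which returns immediately instead of waiting. The following is a sketch under that assumption; the class UpgradeDowngradeDemo and its method names are illustrative only.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class UpgradeDowngradeDemo {

    // Holding the read lock, try to take the write lock without blocking.
    public static boolean tryUpgrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            // Fails because a read lock is held; a blocking lock() here would wait forever
            boolean upgraded = rw.writeLock().tryLock();
            if (upgraded) {
                rw.writeLock().unlock();
            }
            return upgraded;
        } finally {
            rw.readLock().unlock();
        }
    }

    // Holding the write lock, take the read lock, then release the write lock.
    public static boolean tryDowngrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean gotRead;
        rw.writeLock().lock();
        try {
            // Succeeds because the write lock owner is the current thread
            gotRead = rw.readLock().tryLock();
        } finally {
            rw.writeLock().unlock();
        }
        // Only the read lock should remain at this point
        boolean onlyReading = gotRead && rw.getReadHoldCount() == 1 && !rw.isWriteLocked();
        if (gotRead) {
            rw.readLock().unlock();
        }
        return onlyReading;
    }

    public static void main(String[] args) {
        System.out.println("upgrade succeeded: " + tryUpgrade());
        System.out.println("downgrade succeeded: " + tryDowngrade());
    }
}
```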

StampedLock

Concept

This class was added in JDK 8 to further improve read performance. Its distinguishing feature is that acquiring and releasing the read or write lock must always be paired with a stamp.

Acquire / release read lock

long stamp = lock.readLock();
lock.unlockRead(stamp);

Add / remove write lock

long stamp = lock.writeLock();
lock.unlockWrite(stamp);

Optimistic read: StampedLock supports the tryOptimisticRead() method. After reading, the stamp must be validated once. If validation passes, no write happened in the meantime and the data can be used safely; if it fails, a read lock must be acquired and the data read again to keep it safe.

long stamp = lock.tryOptimisticRead();
// Validate the stamp
if(!lock.validate(stamp)){
  // Upgrade to a read lock
}

Example

Provide a data container class that uses the read lock to protect its read() method and the write lock to protect its write() method

@Slf4j(topic = "c.DataContainerStamped")
class DataContainerStamped {
    
    private int data;
    private final StampedLock lock = new StampedLock();

    public DataContainerStamped(int data) {
    
        this.data = data;
    }

    public int read(int readTime) {
        long stamp = lock.tryOptimisticRead();
        log.debug("optimistic read locking...{}", stamp);
        sleep(readTime);
        // Copy the field before validating, so the value returned is the one that was validated
        int value = data;
        if (lock.validate(stamp)) {
            log.debug("read finish...{}, data:{}", stamp, value);
            return value;
        }
        // Validation failed: upgrade from the optimistic read to a real read lock
        log.debug("updating to read lock... {}", stamp);
        // Acquire the lock before the try block, so finally only unlocks a stamp that was really acquired
        stamp = lock.readLock();
        try {
            log.debug("read lock {}", stamp);
            sleep(readTime);
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        } finally {
            log.debug("read unlock {}", stamp);
            lock.unlockRead(stamp);
        }
    }

    public void write(int newData) {
    
        long stamp = lock.writeLock();
        log.debug("write lock {}", stamp);
        try {
    
            sleep(2);
            this.data = newData;
        } finally {
    
            log.debug("write unlock {}", stamp);
            lock.unlockWrite(stamp);
        }
    }
}
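
The effect of a write on an optimistic stamp can be seen in a single thread, which keeps the example deterministic. This is a minimal sketch: the class OptimisticReadDemo and the stampValidity helper are names chosen here for illustration, and no logging or sleeping is involved.

```java
import java.util.concurrent.locks.StampedLock;

public class OptimisticReadDemo {
    private final StampedLock lock = new StampedLock();
    private int data;

    public int read() {
        long stamp = lock.tryOptimisticRead();
        int value = data; // read first, validate afterwards
        if (!lock.validate(stamp)) {
            // A write happened in between: fall back to a real read lock
            stamp = lock.readLock();
            try {
                value = data;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return value;
    }

    public void write(int newData) {
        long stamp = lock.writeLock();
        try {
            data = newData;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Returns {stamp valid with no write in between, stamp valid after a write}
    public static boolean[] stampValidity() {
        StampedLock sl = new StampedLock();
        long optimistic = sl.tryOptimisticRead();
        boolean beforeWrite = sl.validate(optimistic);
        long w = sl.writeLock();
        sl.unlockWrite(w);
        boolean afterWrite = sl.validate(optimistic);
        return new boolean[] { beforeWrite, afterWrite };
    }

    public static void main(String[] args) {
        boolean[] v = stampValidity();
        System.out.println("valid before write: " + v[0] + ", valid after write: " + v[1]);
        OptimisticReadDemo demo = new OptimisticReadDemo();
        demo.write(42);
        System.out.println("read: " + demo.read());
    }
}
```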

Application cache

Cache update strategy

When updating, should the cache be cleared first or the database updated first?

  • Clear the cache first

  • Update the database first (recommended)

  • One more case: suppose query thread A runs its query just as the cached data has expired, or it is the first query

  • The chance of this happening is very small
  • As this shows, keeping the data consistent here requires that updating the database and clearing the cache be atomic.
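
Because the diagrams for the two orderings are not reproduced in the text, the following sketch replays one plausible interleaving of each as plain sequential steps. It assumes a query thread A and an update thread B, a single key "x" whose value changes from 1 to 2, and two HashMaps standing in for the database and the cache; all of these names are illustrative assumptions rather than the original figures.

```java
import java.util.HashMap;
import java.util.Map;

public class CacheOrderingDemo {

    // B clears the cache first; A queries between B's two steps.
    // Returns the value left in the cache at the end.
    public static int clearCacheFirst() {
        Map<String, Integer> db = new HashMap<>();
        Map<String, Integer> cache = new HashMap<>();
        db.put("x", 1);
        cache.put("x", 1);

        cache.remove("x");          // B: clear the cache
        int seenByA = db.get("x");  // A: cache miss, reads the old value from the database
        cache.put("x", seenByA);    // A: puts the old value back into the cache
        db.put("x", 2);             // B: updates the database
        return cache.get("x");      // the cache stays stale until the next update
    }

    // B updates the database first; A queries between B's two steps.
    // Returns {value A saw in between, value in the cache after A's next query}
    public static int[] updateDbFirst() {
        Map<String, Integer> db = new HashMap<>();
        Map<String, Integer> cache = new HashMap<>();
        db.put("x", 1);
        cache.put("x", 1);

        db.put("x", 2);                  // B: updates the database
        int seenByA = cache.get("x");    // A: cache hit, still the old value (short-lived)
        cache.remove("x");               // B: clears the cache
        Integer cached = cache.get("x"); // A: the next query misses the cache
        if (cached == null) {
            cached = db.get("x");        // A: reloads the new value from the database
            cache.put("x", cached);
        }
        return new int[] { seenByA, cache.get("x") };
    }

    public static void main(String[] args) {
        System.out.println("clear cache first, cached value: " + clearCacheFirst());
        int[] r = updateDbFirst();
        System.out.println("update db first, A saw " + r[0] + ", cached value: " + r[1]);
    }
}
```

In this replay, clearing the cache first leaves a stale entry behind, while updating the database first only exposes the old value for a short window.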

Read write locks implement consistent caching

Use read-write locks to implement a simple on-demand cache

public class TestGenericDao {
    
    public static void main(String[] args) {
    
        GenericDao dao = new GenericDaoCached();
        System.out.println("============>  Inquire about ");
        String sql = "select * from emp where empno = ?";
        int empno = 7369;
        Emp emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);

        System.out.println("============>  to update ");
        dao.update("update emp set sal = ? where empno = ?", 800, empno);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
    }
}

class GenericDaoCached extends GenericDao {
    
    private GenericDao dao = new GenericDao();
    private Map<SqlPair, Object> map = new HashMap<>();
		//  Read-write lock 
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

    @Override
    public <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) {
    
        return dao.queryList(beanClass, sql, args);
    }

    @Override
    public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
    
        // Look in the cache first; return directly on a hit
        SqlPair key = new SqlPair(sql, args);
        rw.readLock().lock();
        try {
            T value = (T) map.get(key);
            if(value != null) {
                return value;
            }
        } finally {
            rw.readLock().unlock();
        }
        rw.writeLock().lock();
        try {
            // Several threads may reach this point, so check the cache again (double check)
            T value = (T) map.get(key);
            if(value == null) {
    
                //  Not in cache , Query the database 
                value = dao.queryOne(beanClass, sql, args);
                map.put(key, value);
            }
            return value;
        } finally {
    
            rw.writeLock().unlock();
        }
    }

    @Override
    public int update(String sql, Object... args) {
    
        rw.writeLock().lock();
        try {
    
            // Update the database first
            int update = dao.update(sql, args);
            // Then clear the cache
            map.clear();
            return update;
        } finally {
    
            rw.writeLock().unlock();
        }
    }

    class SqlPair {
    
        private String sql;
        private Object[] args;

        public SqlPair(String sql, Object[] args) {
    
            this.sql = sql;
            this.args = args;
        }

        @Override
        public boolean equals(Object o) {
    
            if (this == o) {
    
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
    
                return false;
            }
            SqlPair sqlPair = (SqlPair) o;
            return Objects.equals(sql, sqlPair.sql) &&
                    Arrays.equals(args, sqlPair.args);
        }

        @Override
        public int hashCode() {
    
            int result = Objects.hash(sql);
            result = 31 * result + Arrays.hashCode(args);
            return result;
        }
    }

}

Note

The implementation above demonstrates the use of a read-write lock and keeps the cache consistent with the database, but it leaves the following issues unaddressed

  • It suits read-heavy, write-light workloads; if writes are frequent, its performance is poor
  • Cache capacity is not considered: entries are only added, never evicted
  • Cache expiration is not considered: data that has not been used for a long time is never cleaned up
  • It only suits a single machine: it is implemented inside one Java process and is not suitable for distributed use
  • Concurrency is still low, because only one lock is used.
    • If several tables are queried, reads and writes on table 1 and table 2 are unrelated, yet they share the same lock, so table 2 cannot proceed while table 1 is being operated on.
  • The update method is too crude: it clears every key (consider partitioning by type or redesigning the key)
    • Clearing the keys of table 1 should not affect table 2
Read write lock principle

Walk-through with diagrams

The read lock and the write lock share the same Sync synchronizer, so the wait queue, state, and so on are also shared

t1 w.lock,t2 r.lock

  • t1 locks successfully. The flow is no different from ReentrantLock locking, except that the write lock status uses the low 16 bits of state while the read lock uses the high 16 bits

  • t2 executes r.lock and enters the read lock's sync.acquireShared(1) flow, which first runs tryAcquireShared. If the write lock is held by another thread, tryAcquireShared returns -1 to indicate failure

Meaning of the tryAcquireShared return value

  • -1 means failure
  • 0 means success, but successor nodes will not be woken
  • A positive number means success, and the value is how many successor nodes still need to be woken; the read-write lock returns 1

  • Next comes the sync.doAcquireShared(1) flow. It first calls addWaiter to add a node; the difference is that the node is in Node.SHARED mode rather than Node.EXCLUSIVE mode. Note that t2 is still running at this point

  • t2 checks whether its node is the second in the queue; if so, it calls tryAcquireShared(1) again to try to acquire the lock
  • If that fails, it goes around the for ( ; ; ) loop in doAcquireShared once, changing the predecessor's waitStatus to -1, then loops again and retries tryAcquireShared(1); if that fails too, it parks at parkAndCheckInterrupt( )

t3 r.lock,t4 w.lock

In this state, suppose t3 then requests the read lock and t4 requests the write lock while t1 still holds the lock. The queue then looks as follows

  • Read lock nodes are in Share (shared) mode
  • Write lock nodes are in Ex (exclusive) mode

t1 w.unlock

  • This enters the write lock's sync.release(1) flow; the call to sync.tryRelease(1) succeeds

  • Next the wake-up flow sync.unparkSuccessor runs, which lets the second node resume. t2 resumes at parkAndCheckInterrupt() in doAcquireShared
  • It goes around for (; ; ) again and runs tryAcquireShared; on success the read lock count is incremented by one (the original figure marks this code location)

  • t2 has now resumed from its node. Next t2 calls setHeadAndPropagate(node, 1), and its node becomes the head node

  • It is not over yet: setHeadAndPropagate also checks whether the next node is shared. If so, it calls doReleaseShared( ) to change the head's status from -1 to 0 and wake the second node. t3 then resumes at parkAndCheckInterrupt( ) in doAcquireShared

  • It goes around for ( ; ; ) again and runs tryAcquireShared; on success the read lock count is incremented by one

  • t3 has now resumed. Next t3 calls setHeadAndPropagate(node, 1), and its node becomes the head node

  • The next node is not shared, so t4's node is not woken

  • As you can see, as long as the next node is of the shared type, the release keeps propagating until an exclusive (Ex) node is reached. This is why read-read can run concurrently: whenever the read lock meets a shared node it lets that thread run, and simply increments the read count kept in state.
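
The read count mentioned above can be observed from outside through ReentrantReadWriteLock.getReadLockCount(). The sketch below starts several reader threads, waits until each has taken the read lock, and then samples the count. The class ConcurrentReadersDemo, its method name, and the use of CountDownLatch for coordination are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ConcurrentReadersDemo {

    // Starts n readers and returns the read lock count while all of them hold the lock.
    public static int readersHoldingLock(int n) throws InterruptedException {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        CountDownLatch acquired = new CountDownLatch(n);
        CountDownLatch release = new CountDownLatch(1);
        Thread[] readers = new Thread[n];

        for (int i = 0; i < n; i++) {
            readers[i] = new Thread(() -> {
                rw.readLock().lock();
                try {
                    acquired.countDown(); // report that the read lock is held
                    release.await();      // keep holding it until told to release
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    rw.readLock().unlock();
                }
            }, "reader-" + i);
            readers[i].start();
        }

        acquired.await();                 // every reader now holds the read lock
        int count = rw.getReadLockCount();
        release.countDown();
        for (Thread t : readers) {
            t.join();
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("readers holding the lock at once: " + readersHoldingLock(3));
    }
}
```

If the readers blocked one another, the first latch would never reach zero and the method would hang, so returning at all already shows that read-read access is concurrent.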

t2 r.unlock,t3 r.unlock

  • t2 enters sync.releaseShared(1) and calls tryReleaseShared(1) to decrement the count; because the count is not yet zero, tryReleaseShared returns false and no node is woken

  • t3 enters sync.releaseShared(1) and calls tryReleaseShared(1) to decrement the count. This time the count reaches zero, so it enters doReleaseShared( ), changes the head node's status from -1 to 0, and wakes the second node (t4)

  • t4 then resumes at parkAndCheckInterrupt in acquireQueued and goes around for ( ; ; ) again. This time it is the second node and there is no competition, so tryAcquire(1) succeeds, the head node is updated, and the flow ends

Source code analysis

Write lock acquisition flow

static final class NonfairSync extends Sync {
    // ... irrelevant code omitted
    // Method of the outer class WriteLock, placed here for readability
    public void lock() {
        sync.acquire(1);
    }

    // Method inherited from AQS, placed here for readability
    public final void acquire(int arg) {
        if (
                // The attempt to acquire the write lock failed, and
                !tryAcquire(arg) &&
                        // the current thread is associated with a Node in exclusive mode
                        // and blocks in the AQS queue
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }

    // Method inherited from Sync, placed here for readability
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        // The low 16 bits of state hold the write lock count
        int w = exclusiveCount(c);
        if (c != 0) {
            if (
                    // c != 0 and w == 0 means a read lock is held, or
                    w == 0 ||
                            // the exclusive owner thread is not the current thread
                            current != getExclusiveOwnerThread()
            ) {
                // Failed to acquire the write lock
                return false;
            }
            // The write lock count would exceed the low 16 bits: raise an error
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Write lock reentry: acquisition succeeds
            // The write count occupies the low bits, so adding acquires directly to state increments it
            setState(c + acquires);
            return true;
        }
        if (
                // The write lock should block, or
                writerShouldBlock() ||
                        // the attempt to change the count failed
                        !compareAndSetState(c, c + acquires)
        ) {
            // Failed to acquire the write lock
            return false;
        }
        // The write lock was acquired successfully
        setExclusiveOwnerThread(current);
        return true;
    }

    // For the non-fair lock, writerShouldBlock always returns false, so the writer never has to block
    final boolean writerShouldBlock() {
        return false;
    }
}
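
The exclusiveCount and sharedCount helpers used in the source split the single int state into two 16-bit counters. The sketch below reproduces that arithmetic in a standalone class so it can be run directly; the class name RwStateBits is an assumption, while the constants and the two helper bodies follow the definitions in ReentrantReadWriteLock.Sync.

```java
public class RwStateBits {
    public static final int SHARED_SHIFT = 16;
    // Adding this to state increments the read count held in the high 16 bits
    public static final int SHARED_UNIT = 1 << SHARED_SHIFT;
    public static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
    public static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // Number of read holds: the high 16 bits
    public static int sharedCount(int c) {
        return c >>> SHARED_SHIFT;
    }

    // Number of write holds: the low 16 bits
    public static int exclusiveCount(int c) {
        return c & EXCLUSIVE_MASK;
    }

    public static void main(String[] args) {
        int state = 0;
        state += 1;            // write lock acquired once
        state += 1;            // write lock re-entered
        state += SHARED_UNIT;  // the writer also takes the read lock (downgrade)
        System.out.println("write holds = " + exclusiveCount(state));
        System.out.println("read holds  = " + sharedCount(state));
    }
}
```

Keeping both counters in one int is what lets a single compareAndSetState update either count atomically.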

Write lock release flow

static final class NonfairSync extends Sync {
    // ... irrelevant code omitted
    // Method of WriteLock, placed here for readability
    public void unlock() {
        sync.release(1);
    }
    // Method inherited from AQS, placed here for readability
    public final boolean release(int arg) {
        // The attempt to release the write lock succeeded
        if (tryRelease(arg)) {
            // unpark the thread waiting in the AQS queue
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    // Method inherited from Sync, placed here for readability
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        // Because of reentrancy, the release only succeeds once the write lock count reaches 0
        boolean free = exclusiveCount(nextc) == 0;
        if (free) {
            // Set the current lock owner to null
            setExclusiveOwnerThread(null);
        }
        setState(nextc);
        return free;
    }
}
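
The reentrancy rule in tryRelease, where the lock is only free once the write count drops to 0, can be observed through the public API. This is a small single-threaded sketch; the class name WriteReentryDemo and the shape of its return value are assumptions for illustration.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WriteReentryDemo {

    // Returns {hold count after two lock() calls,
    //          1 if still write-locked after one unlock(), else 0,
    //          1 if still write-locked after the second unlock(), else 0}
    public static int[] run() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.WriteLock w = rw.writeLock();

        w.lock();
        w.lock(); // reentry: the write count in state goes from 1 to 2
        int holds = rw.getWriteHoldCount();

        w.unlock(); // count is 1, so tryRelease reports that the lock is not free yet
        int lockedAfterFirst = rw.isWriteLocked() ? 1 : 0;

        w.unlock(); // count is 0, the owner is cleared and the lock is free
        int lockedAfterSecond = rw.isWriteLocked() ? 1 : 0;

        return new int[] { holds, lockedAfterFirst, lockedAfterSecond };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("holds=" + r[0]
                + ", locked after first unlock=" + (r[1] == 1)
                + ", locked after second unlock=" + (r[2] == 1));
    }
}
```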

Read lock acquisition flow

static final class NonfairSync extends Sync {
    // Method of ReadLock, placed here for readability
    public void lock() {
        sync.acquireShared(1);
    }
    // Method inherited from AQS, placed here for readability
    public final void acquireShared(int arg) {
        // A negative return value from tryAcquireShared means acquiring the read lock failed
        if (tryAcquireShared(arg) < 0) {
            doAcquireShared(arg);
        }
    }
    // Method inherited from Sync, placed here for readability
    protected final int tryAcquireShared(int unused) {
        // Get the current thread
        Thread current = Thread.currentThread();
        int c = getState();
        // If another thread holds the write lock, acquiring the read lock fails
        if (
                exclusiveCount(c) != 0 &&
                        // and the write lock holder is not the current thread
                        getExclusiveOwnerThread() != current
        ) {
            return -1;
        }
        int r = sharedCount(c);
        if (
                // The read lock should not block (it should block if the second node in the queue is a writer), and
                !readerShouldBlock() &&
                        // the read lock count is below the maximum, and
                        r < MAX_COUNT &&
                        // the attempt to increase the count succeeded,
                        // which adds 1 only to the high 16 bits of state (the read lock count)
                        compareAndSetState(c, c + SHARED_UNIT)
        ) {
            // ... unimportant code omitted
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    // For the non-fair lock, readerShouldBlock checks whether the first waiting node in the AQS queue is a writer
    // true means the reader should block, false means it should not
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
    // Method of Sync, placed here for readability
    // Similar to tryAcquireShared, but keeps retrying in for (;;) to get the read lock; it does not block while doing so
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            } else if (readerShouldBlock()) {
                // ... unimportant code omitted
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                // ... unimportant code omitted
                return 1;
            }
        }
    }
    // AQS  The inherited method ,  Easy to read ,  Put it here 
    private void doAcquireShared(int arg) {
    
				//  Associate the current thread to a  Node  On the object ,  The mode is shared 
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
    
            boolean interrupted = false;
            for (;;) {
    
								//  See if you are the second node 
                final Node p = node.predecessor();
                if (p == head) {
    
                    //  Try again to get the read lock 
                    int r = tryAcquireShared(arg);
										//  success 
                    if (r >= 0) {
    
												// ㈠
												// r  Indicates the number of available resources ,  It's always here  1  Allow the spread of 
												//( Wake up the  AQS  Next in line  Share  node )
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (
                        // Should the thread park after failing to acquire the read lock? (true once the predecessor's waitStatus == Node.SIGNAL)
                        shouldParkAfterFailedAcquire(p, node) &&
                                // park the current thread
                                parkAndCheckInterrupt()
                ) {
    
                    interrupted = true;
                }
            }
        } finally {
    
            if (failed)
                cancelAcquire(node);
        }
    }
    // ㈠ Method inherited from AQS, placed here for readability
    private void setHeadAndPropagate(Node node, int propagate) {
    
        Node h = head; // Record old head for check below
        // Make this node the new head
        setHead(node);
        // propagate > 0 means shared resources remain (for example a shared read lock or semaphore permits)
        // or the old head's waitStatus is Node.SIGNAL or Node.PROPAGATE
        // or the new head's waitStatus is Node.SIGNAL or Node.PROPAGATE
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
    
            Node s = node.next;
            // If there is no known successor, or the successor is waiting for the shared read lock
            if (s == null || s.isShared()) {
    
                // Go to ㈡
                doReleaseShared();
            }
        }
    }
    // ㈡ Method inherited from AQS, placed here for readability
    private void doReleaseShared() {
    
        // If the CAS head.waitStatus == Node.SIGNAL ==> 0 succeeds, unpark the next node
        // If head.waitStatus == 0, change it to Node.PROPAGATE; this fixes a bug, see the analysis later
        for (;;) {
    
            Node h = head;
            // The queue contains waiting nodes
            if (h != null && h != tail) {
    
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
    
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    // unpark the next node; if it acquires the read lock
                    // and its own successor is also shared, it continues with doReleaseShared
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }
}
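
The propagation performed by setHeadAndPropagate and doReleaseShared can be observed from the outside. The following is a minimal sketch, not part of the JDK source above: the class name ReadLockPropagationDemo and the helper readersShareLock are hypothetical names chosen for illustration. It queues several readers behind a writer and then checks that, once the writer unlocks, all of them hold the read lock at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadLockPropagationDemo {

    // Hypothetical helper: returns true if every queued reader was inside
    // its read-locked section at the same time as all the others.
    static boolean readersShareLock(int readers) {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); // nonfair by default
        CountDownLatch allInside = new CountDownLatch(readers);
        CountDownLatch done = new CountDownLatch(readers);
        boolean[] sawEveryone = new boolean[readers];

        rw.writeLock().lock(); // readers started below must queue behind the writer
        try {
            for (int i = 0; i < readers; i++) {
                final int id = i;
                new Thread(() -> {
                    rw.readLock().lock();
                    try {
                        allInside.countDown();
                        // Succeeds only if all other readers also hold the read lock right now
                        sawEveryone[id] = allInside.await(5, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        rw.readLock().unlock();
                        done.countDown();
                    }
                }, "reader-" + i).start();
            }
            // Wait until the readers are parked in the AQS queue
            while (rw.getQueueLength() < readers) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            rw.writeLock().unlock(); // wakes the first reader, which wakes the next one, and so on
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        for (boolean ok : sawEveryone) {
            if (!ok) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("readers shared the lock: " + readersShareLock(3));
    }
}
```

The writer's unlock only unparks the first queued node. The remaining readers are woken one by one as each newly promoted head calls doReleaseShared, which is why no reader has to wait for the previous one to release.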

Read lock release process

static final class NonfairSync extends Sync {
    
    // ReadLock method, placed here for readability
    public void unlock() {
    
        sync.releaseShared(1);
    }
    // Method inherited from AQS, placed here for readability
    public final boolean releaseShared(int arg) {
    
        if (tryReleaseShared(arg)) {
    
            doReleaseShared();
            return true;
        }
        return false;
    }
    // Method inherited from Sync, placed here for readability
    protected final boolean tryReleaseShared(int unused) {
    
        // ... less important code omitted
        for (;;) {
    
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc)) {
    
                // The read count does not affect other threads acquiring the read lock, but it does affect threads acquiring the write lock
                // The lock is only truly released when the count reaches 0
                return nextc == 0;
            }
        }
    }
    // Method inherited from AQS, placed here for readability
    private void doReleaseShared() {
    
        // If the CAS head.waitStatus == Node.SIGNAL ==> 0 succeeds, unpark the next node
        // If head.waitStatus == 0, change it to Node.PROPAGATE
        for (;;) {
    
            Node h = head;
            if (h != null && h != tail) {
    
                int ws = h.waitStatus;
                // Another thread may also be releasing the read lock, so first CAS waitStatus to 0
                // to prevent unparkSuccessor from being executed more than once
                if (ws == Node.SIGNAL) {
    
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    unparkSuccessor(h);
                }
                // If it is already 0, change it to -3 (PROPAGATE) to fix the propagation problem; see the semaphore bug analysis later
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }
}
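
The arithmetic behind `c + SHARED_UNIT` on acquire and `c - SHARED_UNIT` with `nextc == 0` on release is easier to follow with the state layout written out. The sketch below is for illustration only: the class name ReadWriteStateDemo and the helper releaseOneRead are hypothetical, and the constants are copied here to mirror those in ReentrantReadWriteLock.Sync, where the high 16 bits of state count read holds and the low 16 bits count write holds.

```java
class ReadWriteStateDemo {
    // Copied to mirror ReentrantReadWriteLock.Sync: high 16 bits = read holds, low 16 bits = write holds
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;
    static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // Number of read holds encoded in state c
    static int sharedCount(int c) {
        return c >>> SHARED_SHIFT;
    }

    // Number of (reentrant) write holds encoded in state c
    static int exclusiveCount(int c) {
        return c & EXCLUSIVE_MASK;
    }

    // Hypothetical helper: the state after one read hold is released
    static int releaseOneRead(int c) {
        return c - SHARED_UNIT;
    }

    public static void main(String[] args) {
        int c = 0;
        c += SHARED_UNIT; // first reader acquires
        c += SHARED_UNIT; // second reader acquires
        System.out.println("readers=" + sharedCount(c) + ", writers=" + exclusiveCount(c));

        c = releaseOneRead(c);
        System.out.println("fully released after first unlock: " + (c == 0));
        c = releaseOneRead(c);
        System.out.println("fully released after second unlock: " + (c == 0));
    }
}
```

Because tryReleaseShared returns `nextc == 0`, only the unlock that brings the whole state back to zero makes releaseShared call doReleaseShared and wake a queued writer.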
Copyright notice
This article was written by [Ape feather]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/02/202202281120588763.html