当前位置:网站首页>Handwriting a simulated reentrantlock

Handwriting a simulated reentrantlock

2022-07-08 00:03:00 daheww

package cn.daheww.demo.juc.reentrylock;

import sun.misc.Unsafe;

import java.lang.reflect.Field;
import java.util.concurrent.locks.LockSupport;

/**
 * @author daheww
 * @date 2022/7/7
 */
public class MiniReentryLock implements Lock {

    /**
     *  What's the lock  -->  resources  --> state
     * 0 -->  Unlocked 
     * >0 ->  Lock 
     */
    private volatile int state;

    /**
     *  Exclusive mode 
     *  Only one thread can hold a lock at a time , Other threads will be blocked when they do not acquire the lock 
     *
     *  Thread that currently owns the lock ( Thread occupying lock )
     */
    private Thread exclusiveOwnerThread;

    /**
     *  Two nodes are needed to maintain the blocking queue 
     * Head  Point to the head node of the queue 
     * Tail  Point to the end of the queue 
     *
     *  A special :Head The thread corresponding to the node is the thread currently occupying the lock 
     */
    private Node head;
    private Node tail;

    /**
     *  Get the lock 
     *  Suppose the current lock is occupied , Will block the caller thread , Until it grabs the lock 
     *
     *  Simulate fair lock 
     * -->  first come , first served 
     *
     * lock The process of 
     *  scene 1. After the thread came in, it found , At present state == 0 -->  Go ahead and grab the lock 
     *  scene 2. After the thread came in, it found , At present state > 0 -->  Queue the current thread 
     */
    @Override
    public void lock() {
        //  The first time a lock is obtained , take state Set to 1
        //  The first n Second reentry , take state Set to n
        acquire(1);
    }

    @Override
    public void unlock() {
        release(1);
    }

    private void release(int arg) {
        //  Conditions established : This indicates that the thread has completely released the lock 
        if (tryRelease(arg)) {
            //  Blocking the queue , And the thread of sleeping , You should wake up a thread 
            //  First of all, we need to know whether there is any waiting node --> head.next == null
            Node head = this.head;
            if (head.nx != null) {
                //  Fair lock , Wake up the head.nx node 
                unparkSuccessor(head);
            }
        }
    }

    private void unparkSuccessor(Node node) {
        Node s = node.nx;

        if (s != null && s.thread != null) {
            LockSupport.unpark(s.thread);
        }
    }

    /**
     *  If the lock is released successfully, it returns true
     */
    private boolean tryRelease(int arg) {
        int c = getState() - arg;

        if (getExclusiveOwnerThread() != Thread.currentThread()) {
            throw new RuntimeException("must get lock first");
        }

        //  So if I go here , There is no concurrency , Only one thread will come here 
        //  Conditions established , It indicates that the current thread holds lock The lock has been completely released 
        if (c == 0) {
            this.exclusiveOwnerThread = null;
            this.state = c;
            return true;
        } else {
            this.state = c;
            return false;
        }
    }

    /**
     *  Compete for resources 
     * 1. Attempt to acquire lock . Success will occupy the lock , And return to 
     * 2. Failed to seize the lock , Block the current thread 
     * @param arg
     */
    private void acquire(int arg) {
        if (!tryAcquire(arg)) {
            //  Lock snatch failed 

            // step1. Encapsulate the current thread as node, Join the blocking queue 
            Node node = addWaiter();
            // step2. Current thread park, Put the thread in a suspended state 
            acquireQueued(node, arg);
        }

        //  Successful lock snatching 
        // 1. Got the lock 
        // 2. Re entered the lock 
    }

    /**
     *  The attempt to grab the lock failed , What needs to be done :
     * 1. The current thread needs to be encapsulated as node, Join the blocking queue 
     * 2. The current thread needs to be park, Put the thread in a suspended state 
     *
     *  Wake up process :
     * 1. Check current node Is it head.next node 
     *      head.next It is a thread with preemptive permission , Other node There is no preemptive permission 
     * 2. preemption :
     *       success :
     *          1. Will the current node Set to node, Will the old head Out of the team , Go back to the business level 
     *          2. continue park Waiting to be awakened 
     *
     * ----------------------------------------------
     * 1. Logic added to the blocking queue  addWaiter()
     * 2. The logic of competing for resources       acquireQueued()
     */
    private void acquireQueued(Node node, int arg) {
        //  The current thread has been put into queue It's in 

        //  Only current node The spin will not jump out until the lock is successfully obtained 
        for (; ; ) {
            //  Under what circumstances , At present node After being awakened, you can try to get the lock ?
            //  There is only one case , At present node yes head Successor node , That's the right 
            //  It's not just first come, first served 

            Node pvNode = node.pv;
            //  Conditions 1:pvNode == head
            //      true -->  Show the current node Have preemptive permission 
            //               queue The first node in represents the thread currently executing the lock  --> head Thread to point to 
            //               head The following thread represents the thread that is queuing  -->  So only head.nx The node has the right to rob the lock 
            //  Conditions 2:tryAcquire(arg)
            //      true -->  The current thread has acquired the lock 
            //
            if (pvNode == head && tryAcquire(arg)) {
                //  Entering this indicates that the current thread has successfully competed for the lock 
                //  What to do :
                // 1. Set up current head For the current thread node
                // 2. Assist the original object to leave the team 
                setHead(node);
                pvNode.nx = null;
                //  Because I got the lock , So I return 了 
                return;
            }

            //  Not at present head.nx node , Or the attempt to acquire the lock fails , At this time, you need to put the current thread park fall 
            System.out.println(" Threads :" + Thread.currentThread().getName() + "  Hang up ");
            LockSupport.park();
            //  Until a thread does the current thread unPark operation , This thread will continue to execute 
            /*
                 So to summarize ,lock The logic of :
                   1. Without a lock , If a thread calls lock Method , It will change lock Medium state value . here state The value will not be 0 了 .
                    Then other threads call lock When the method is used , You'll see this state Not for 0.
                   2. Then the thread will be encapsulated into a node node 
                   3. Then I will try to compete for the lock , Do the final rescue work , If it can't be saved , Just park 了 
                     -->  The thread is right here lock Of lock() The method is blocked . The lock effect is achieved 
                     -->  All calls to this lock object lock Only one thread can continue to execute the method of , Then other threads will be blocked , Until this thread has done unlock operation 
             */
            System.out.println(" Threads :" + Thread.currentThread().getName() + "  Wake up the ");

            //  When to wake up by park The thread of ?--> unlock()
        }
    }

    /**
     *  Queue the current thread 
     *  Returns the... Corresponding to the current thread node node 
     *
     * addWaiter After execution , Ensure that the current thread has been queued successfully 
     */
    private Node addWaiter() {
        Node newNode = new Node(Thread.currentThread());

        //  How to join the team ?
        // Case1. At present node Not the first one to join the team node, The queue already has waiting node 了 
        //     1. find newNode Of pv node 
        //     2. to update newNode.pvNode = pv node 
        //     3.CAS to update tail by newNode
        //     4. to update pv node 
        Node pvNode = tail;
        if (pvNode != null) {
            newNode.pv = pvNode;
            //  Conditions established , Indicates that the current thread has been successfully queued 
            if (compareAndSetTail(pvNode, newNode)) {
                pvNode.nx = newNode;
                return newNode;
            }
        }

        //  Several cases of implementation here 
        // 1.tail == null The queue is empty 
        // 2.cas Set up current newNode by tail It failed  -->  Cycle into the team  -->  The spin 
        enq(newNode);

        return newNode;
    }

    /**
     *  Spin into the team , Return only after success 
     * 1.tail == null  The queue is empty 
     * 2.cas Set up current newNode by tail It failed 
     */
    private void enq(Node node) {
        for (; ; ) {
            //  Case one : The queue is empty 
            // -->  The current thread is the first thread to preempt the lock ...
            //  The thread currently holding the lock , Nothing has been set node, So as the first driver node of this thread 
            //  Need to wipe his ass 
            //  Add a to the thread that currently holds the lock node As head node 
            // head The node represents the thread currently occupying the lock at any time 
            if (tail == null) {
                //  Conditions established : It indicates that the current thread supplements the thread that currently holds the lock head Operation succeeded 
                if (compareAndSetHead(new Node())) {
                    tail = head;
                    //  Be careful , There was no direct return , But will continue to spin 
                }
            } else {
                //  There are already node 了 , It shows that this is an addition node The process of 

                //  How to join the team ?
                //     1. find newNode Of pv node  -->  Abreast of the times tail node 
                //     2. to update newNode.pvNode = pv node 
                //     3.CAS to update tail by newNode
                //     4. to update pv node 
                Node pvNode = tail;
                node.pv = pvNode;
                //  Conditions established , Indicates that the current thread has been successfully queued 
                if (compareAndSetTail(pvNode, node)) {
                    pvNode.nx = node;
                    return;
                }
            }
        }
    }

    /**
     *  Attempt to acquire lock , It doesn't block threads 
     * true -->  Seize success 
     * false -->  Preemption failed 
     */
    private boolean tryAcquire(int arg) {
        if (state == 0) {
            //  At present state by 0
            //  You can't grab the lock directly  -->  Fair lock  -->  first come , first served 
            //  Conditions for a :!hasQueuedPredecessors() --->  After taking the reverse, it is true, Indicates that there is no waiting thread in front of the current thread 
            //  Condition 2 :compareAndSetState(0, arg) ->  Use cas Why :lock Methods may be called by multiple threads 
            //      true -->  The current thread successfully grabs the lock 
            //          (1) volatile --> state By volatile Modify the , So other threads can know at the first time that this value is not 0 了  -->  The cache can be consistent 
            //          (2) cas -------> state from 0 Turn into arg The operation of cas Realization , Used to ensure that only one thread can change state Value (0->arg) -->  Only one thread can perform the next operation  -->  lock 
            //                1. If cas Variables of do not volatile Modification is meaningless :
            //                    because A The thread has changed state Value , however B Threads don't know 
            //                  ( visibility ,volatile Will make B The copy in the thread will expire immediately , Then get the latest state Value , here B In the thread workspace state The value is not 0 了 )
            //                2. If volatile Variables of do not cas To change its value , It doesn't make sense :
            //·                  step1.A Threads ,B Threads are all got state Copy information for , here state The value is 0
            //                   step2.A The thread has changed state Value .B The thread is still writing , because state The value of , therefore B In the thread workspace state Value change , then B Keep writing .
            //                    So all judgments state The value is 0 All threads can write successfully , And can perform subsequent operations after successful writing 
            //                 So want to use cas+volatile To ensure that only one thread can write this value successfully 
            //                Ps. You can see , If the values that these threads want to write are the same , Write a few more times , But the result is consistent with writing only once 
            //                   cas+volatile The main thing is to control that the operation after a successful write will only be performed once , It's like a lock 
            if (!hasQueuedPredecessors() && compareAndSetState(0, arg)) {
                //  Successful lock snatching 
                // 1. take exclusiveOwnerThread Set as current thread 
                this.exclusiveOwnerThread = Thread.currentThread();
                return true;

                //  Will not join any node, Return to true
                //  Next, the first thread that fails to compete will help create a node, Then perform subsequent operations 
            }
            //  There is a thread waiting in front of the current thread  ||  Multiple threads are trying to acquire this lock together with the current thread , Then the current thread fails  --> return false;
        } else if (Thread.currentThread() == this.exclusiveOwnerThread) {
            //  Timing of execution :
            // 1. The current lock is occupied 
            // 2. The current thread is the lock thread 

            //  There is no concurrency . Only the currently locked thread has permission to modify state
            //    Even if the same thread enters here many times , Set up state Value , Then they all use the same workspace 
            //    There are no different workspaces , This value is different ( Because there is no cache )

            //  Lock reentry process 

            int c = getState();
            c += arg;
            // TODO  Cross border judgment 
            this.state = c;
            return true;
        }

        //  When will it return false?
        // 1.cas Locking failed 
        // 2.state Greater than 0, And the current thread is not a lock thread 
        return false;
    }

    /**
     *  Whether there is a waiting thread in front of the current thread 
     * true -->  There is a waiting thread in front of the current thread 
     * false ->  There are no other waiting threads in front of the current thread 
     *
     *  Call chain 
     * lock --> acquire -> tryAcquire -> hasQueuedPredecessors(state The value is 0 when , At present lock It is in an ownerless state )
     *
     *  When to return false?
     * 1. The current queue is empty 
     * 2. The current thread is head.next node  --> head.next Have the right to fight for at any time lock
     */
    private boolean hasQueuedPredecessors() {
        Node h = head;
        Node t = tail;
        Node s;

        //  Conditions for a :h != t
        //     true -->  The current queue already has node 了 
        //     false -> h == t
        //          case1. h == t == null -->  Not initialized yet queue
        //          case2. h == t == head
        //               The first thread that fails to acquire a lock will create a new one for the thread that currently holds the lock head node
        //  Condition 2 :
        //      precondition : Condition one holds 
        //      Exclude several situations :
        //        Conditions 2.1: In extreme cases  -->  The first thread that failed to acquire the lock , It will be added to the lock thread head node , Then join the team in spin 
        //                           step1.cas Set up tail succeed 
        //                           step2.head.next = node
        //                            In the middle of these two steps , There are threads to check whether there are waiting threads in front 
        //                This situation should return true: There has been a head.next The node , When other threads come here, they need to return true
        //        Conditions 2.2:
        //                precondition :h.next No null
        //               true -->  If the condition is true, it means that the current thread is the thread holding the lock 
        //               false ->  This indicates that the current thread is h.next The thread corresponding to the node , Need to return false. Then the thread will compete for the lock 

        return h != t && ((s = h.nx) == null || s.thread != Thread.currentThread());
    }

    private static final Unsafe UNSAFE;
    private static final long STATE_OFFSET;
    private static final long HEAD_OFFSET;
    private static final long TAIL_OFFSET;

    static {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);

            UNSAFE = (Unsafe) f.get(null);
            STATE_OFFSET = UNSAFE.objectFieldOffset(MiniReentryLock.class.getDeclaredField("state"));
            HEAD_OFFSET = UNSAFE.objectFieldOffset(MiniReentryLock.class.getDeclaredField("head"));
            TAIL_OFFSET = UNSAFE.objectFieldOffset(MiniReentryLock.class.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    private boolean compareAndSetHead(Node update) {
        return UNSAFE.compareAndSwapObject(this, HEAD_OFFSET, null, update);
    }

    private boolean compareAndSetTail(Node expect, Node update) {
        return UNSAFE.compareAndSwapObject(this, TAIL_OFFSET, expect, update);
    }

    private boolean compareAndSetState(int expect, int update) {
        return UNSAFE.compareAndSwapInt(this, STATE_OFFSET, expect, update);
    }

    /**
     *  Blocked threads are encapsulated as node node , And then put in FIFO queue 
     */
    static final class Node {
        /**
         *  Encapsulated thread itself 
         */
        Thread thread;
        /**
         *  Front node reference 
         */
        Node pv;
        /**
         *  Post node reference 
         */
        Node nx;

        public Node(Thread thread) {
            this.thread = thread;
        }

        public Node() {
        }
    }

    public int getState() {
        return state;
    }

    private void setHead(Node node) {
        this.head = node;
        //  The current thread is already the thread that has obtained the lock 
        node.thread = null;
        node.pv = null;
    }

    public void setState(int state) {
        this.state = state;
    }

    public Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }

    public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
        this.exclusiveOwnerThread = exclusiveOwnerThread;
    }

    public Node getHead() {
        return head;
    }

    public Node getTail() {
        return tail;
    }

    public void setTail(Node tail) {
        this.tail = tail;
    }
    
}

原网站

版权声明
本文为[daheww]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/188/202207072154462535.html