当前位置:网站首页>Source code analysis of cyclicbarrier in AQS

Source code analysis of CyclicBarrier (built on AQS)

2022-06-09 05:15:00 smartjiang-java

1: CyclicBarrier basics

1: The name literally means a reusable (cyclic) barrier. Threads that reach the barrier first are blocked; once every thread in the group has reached the barrier, it opens and all the blocked threads continue running.
2: Use case: several threads each process part of the data, and the partial results are merged at the end.
3: CountDownLatch can only be used once, whereas the counter of a CyclicBarrier can be reset with the reset() method.
4: It is generally used with a fixed-size thread pool; the pool size should ideally equal the number of parties (with fewer pool threads than parties, the barrier can never open).

2: Constructors

Single-argument constructor: takes the total number of threads (parties) that the barrier holds back.

    /**
     * Creates a barrier for the given number of parties with no barrier action.
     *
     * @param parties the number of threads that must arrive before the barrier opens
     */
    public CyclicBarrier(int parties) {
    
        // Delegate to the two-argument constructor, passing null as the barrier action.
        this(parties, null);
    }

Step into the overloaded constructor:

    /**
     * Creates a barrier for the given number of parties and an optional barrier action.
     *
     * @param parties the number of threads that must arrive before the barrier opens
     * @param barrierAction the task to run when the last party arrives, or null for none
     * @throws IllegalArgumentException if {@code parties} is less than or equal to zero
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
    
        // The number of parties must be positive; otherwise an exception is thrown.
        if (parties <= 0) throw new IllegalArgumentException();
        // Remember the total number of parties; the countdown starts from that total.
        this.parties = parties;
        this.count = parties;
        // The optional task that dowait() runs first when the last party reaches the barrier.
        this.barrierCommand = barrierAction;
    }

3: The blocking method await()

Each thread that needs to wait at the barrier calls the await() method.

    /**
     * Waits at the barrier without a timeout.
     *
     * @return the value returned by {@code dowait(false, 0L)}; 0 for the last party to arrive
     * @throws InterruptedException if thrown by {@code dowait}
     * @throws BrokenBarrierException if thrown by {@code dowait}
     */
    public int await() throws InterruptedException, BrokenBarrierException {
    
        try {
    
            // Untimed wait: timed = false, so the nanos argument is not used.
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
    
            // dowait only throws TimeoutException when timed is true, so this is unreachable.
            throw new Error(toe); // cannot happen
        }
    }

Step into the dowait(false, 0L) method:

    /**
     * Core wait logic behind {@code await()}.
     *
     * <p>Under the barrier's lock the caller decrements {@code count}. The party that
     * brings it to zero runs the optional barrier command, starts the next generation
     * and returns 0. Every other party waits on the {@code trip} condition until the
     * generation changes, the generation is marked broken, the thread is interrupted,
     * or (when {@code timed}) the remaining time runs out.
     *
     * @param timed whether to use a timed wait
     * @param nanos the time to wait in nanoseconds; only consulted when {@code timed} is true
     * @return the value of {@code count} right after this party's decrement (0 for the last to arrive)
     * @throws InterruptedException if the thread is interrupted on entry, or while waiting
     *         on a generation that is still current and not broken
     * @throws BrokenBarrierException if the generation is found broken on entry or after waking
     * @throws TimeoutException if {@code timed} is true and the remaining time is not positive
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
    
        // The barrier is guarded by a ReentrantLock.
        final ReentrantLock lock = this.lock;
        // All of the logic below runs while holding the lock; it is released in the finally block.
        lock.lock();
        try {
    
            // Snapshot the generation this party is arriving in.
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            // Thread.interrupted() also clears the interrupt flag; the interruption is
            // reported by breaking the barrier and throwing.
            if (Thread.interrupted()) {
    
                breakBarrier();
                throw new InterruptedException();
            }
            // Decrement the number of parties still awaited and check whether it reached 0.
            int index = --count;
            if (index == 0) {
      
                // This is the last party to arrive.
                boolean ranAction = false;
                try {
    
                    // If a barrier command was passed to the constructor, run it first.
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // Wake the waiting parties and reset the barrier for the next use.
                    nextGeneration();
                    return 0;
                } finally {
    
                    // Only reached with ranAction == false when command.run() threw.
                    if (!ranAction)
                        breakBarrier();
                }
            }
            
            // Not the last party: loop until tripped, broken, interrupted, or timed out.
            for (;;) {
    
                try {
    
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        // awaitNanos returns the remaining time, which is checked below.
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
    
                    if (g == generation && ! g.broken) {
    
                        // Still in our generation and nobody broke it yet: break it and propagate.
                        breakBarrier();
                        throw ie;
                    } else {
    
                        // The generation already changed or was broken; just restore the
                        // interrupt flag and fall through to the checks below.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                // A new generation was installed, i.e. the barrier tripped: return normally.
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
    
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
    
            lock.unlock();
        }
    }

Step into the nextGeneration() method:

    /**
     * Starts a new generation: signals every thread waiting on {@code trip},
     * resets {@code count} to {@code parties}, and installs a fresh {@code Generation}.
     */
    private void nextGeneration() {
    
        // Step into Condition.signalAll() to see how the waiters are released.
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }

Step into the Condition.signalAll() method:

        /**
         * Signals every node currently in this condition's wait queue.
         *
         * @throws IllegalMonitorStateException if {@code isHeldExclusively()} returns false
         */
        public final void signalAll() {
    
            // isHeldExclusively() is not shown here; presumably it reports whether the
            // current thread holds the associated lock. If it returns false, throw.
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // Take the first node of the condition queue; if there is one, hand it to doSignalAll(first).
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

Step into the doSignalAll(first) method:

        /**
         * Empties the condition queue and passes each node, in order, to
         * {@code transferForSignal}.
         *
         * @param first the first node of the condition queue
         */
        private void doSignalAll(Node first) {
    
            // Clear the condition queue's head and tail, then walk the old chain.
            // transferForSignal is not shown here; presumably it moves the node onto the
            // AQS sync queue, where it lines up behind that queue's head node -- confirm
            // against the AQS source.
            lastWaiter = firstWaiter = null;
            do {
    
                Node next = first.nextWaiter;
                // Unlink the node from the condition queue before transferring it.
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

Back in the nextGeneration() method:

    /**
     * Starts a new generation: signals every thread waiting on {@code trip},
     * resets {@code count} to {@code parties}, and installs a fresh {@code Generation}.
     */
    private void nextGeneration() {
    
        // Condition.signalAll(): empties the condition queue and hands each waiting node
        // to transferForSignal (presumably moving it to the AQS sync queue).
        trip.signalAll();
        // Reset the countdown so the barrier can be used again.
        count = parties;
        // Every new use of the barrier gets its own Generation instance.
        generation = new Generation();
    }

Back in the dowait(false, 0L) method:

    /**
     * Core wait logic behind {@code await()}.
     *
     * <p>Under the barrier's lock the caller decrements {@code count}. The party that
     * brings it to zero runs the optional barrier command, starts the next generation
     * and returns 0. Every other party waits on the {@code trip} condition until the
     * generation changes, the generation is marked broken, the thread is interrupted,
     * or (when {@code timed}) the remaining time runs out.
     *
     * @param timed whether to use a timed wait
     * @param nanos the time to wait in nanoseconds; only consulted when {@code timed} is true
     * @return the value of {@code count} right after this party's decrement (0 for the last to arrive)
     * @throws InterruptedException if the thread is interrupted on entry, or while waiting
     *         on a generation that is still current and not broken
     * @throws BrokenBarrierException if the generation is found broken on entry or after waking
     * @throws TimeoutException if {@code timed} is true and the remaining time is not positive
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
    
        // The barrier is guarded by a ReentrantLock.
        final ReentrantLock lock = this.lock;
        // Acquire the lock before running any of the logic below.
        lock.lock();
        try {
    
            // Snapshot the generation this party is arriving in.
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
    
                breakBarrier();
                throw new InterruptedException();
            }
            // Decrement the number of parties still awaited and check whether it reached 0.
            int index = --count;
            if (index == 0) {
      
                boolean ranAction = false;
                try {
    
                    // If a barrier command was passed to the constructor, run it first.
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // Wake the waiting parties and reset the barrier (see nextGeneration()).
                    nextGeneration();
                    return 0;
                } finally {
    
                // On the normal path ranAction is true, so this branch is skipped;
                // it only runs when command.run() threw.
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // The count after the decrement is still greater than 0: enter the wait loop.
            for (;;) {
    
                try {
    
                   // Without a timeout, wait via Condition.await(); otherwise use the timed
                   // Condition.awaitNanos().
                  // Per the article (await() itself is not shown here): a new node is appended
                  // to the tail of the Condition's wait queue (linked through nextWaiter), the
                  // lock held by the thread is released, and the thread blocks.
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
    
                    if (g == generation && ! g.broken) {
    
                        // Still in our generation and nobody broke it yet: break it and propagate.
                        breakBarrier();
                        throw ie;
                    } else {
    
                        // The generation already changed or was broken; just restore the
                        // interrupt flag and fall through to the checks below.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                // A new generation was installed, i.e. the barrier tripped: return normally.
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
    
                    breakBarrier();
                    throw new TimeoutException();
                }
            }   
        } finally {
    
            // Finally, release the ReentrantLock.
            lock.unlock();
        }
    }
原网站

版权声明
本文为[smartjiang-java]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/160/202206090511357736.html