
8. The 18 "arhats": atomic operation classes and their enhancements

2022-06-11 12:17:00 Shixiaozan

1、 What are they?

AtomicBoolean

AtomicInteger

AtomicIntegerArray

AtomicIntegerFieldUpdater

AtomicLong

AtomicLongArray

AtomicLongFieldUpdater

AtomicMarkableReference

AtomicReference

AtomicReferenceArray

AtomicReferenceFieldUpdater

AtomicStampedReference

DoubleAccumulator

DoubleAdder

LongAccumulator

LongAdder

2、 Classification

2.1、 Basic-type atomic classes

AtomicInteger

AtomicBoolean

AtomicLong

Brief introduction to the commonly used APIs (shown for AtomicInteger):

public final int get() // Get the current value

public final int getAndSet(int newValue) // Get the current value, then set the new value

public final int getAndIncrement() // Get the current value, then increment it by 1

public final int getAndDecrement() // Get the current value, then decrement it by 1

public final int getAndAdd(int delta) // Get the current value, then add delta to it

public final boolean compareAndSet(int expect, int update) // If the current value equals expect, atomically set it to update; returns true on success
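The difference between the value a method returns and the value it stores is easy to mix up, so here is a minimal single-threaded sketch. The class name AtomicIntegerApiSketch and the helper demo() are illustrative names, not part of the original article.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicIntegerApiSketch {
    // Hypothetical helper: collects what each call returns, plus the final stored value.
    static int[] demo() {
        AtomicInteger ai = new AtomicInteger(5);
        int a = ai.getAndSet(10);                  // returns the old value 5, stores 10
        int b = ai.getAndIncrement();              // returns 10, stores 11
        int c = ai.getAndDecrement();              // returns 11, stores 10
        int d = ai.getAndAdd(5);                   // returns 10, stores 15
        boolean ok = ai.compareAndSet(15, 100);    // current value is 15, so it becomes 100
        boolean fail = ai.compareAndSet(15, 200);  // current value is 100, so nothing changes
        return new int[]{a, b, c, d, ok ? 1 : 0, fail ? 1 : 0, ai.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [5, 10, 11, 10, 1, 0, 100]
    }
}
```

All the getAndXxx methods return the value from before the update; incrementAndGet(), used in the example that follows, returns the value after it.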

Example

package com.atguigu.juc.atomics;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class MyNumber
{
    AtomicInteger atomicInteger = new AtomicInteger();

    public void addPlusPlus()
    {
        atomicInteger.incrementAndGet();
    }
}

/**
 * @author zzyy
 * @create 2021-03-17 16:26
 */
public class AtomicIntegerDemo
{
    public static final int SIZE = 50;

    public static void main(String[] args) throws InterruptedException
    {
        MyNumber myNumber = new MyNumber();
        // The latch lets main wait until all SIZE worker threads have finished
        CountDownLatch countDownLatch = new CountDownLatch(SIZE);

        for (int i = 1; i <= SIZE; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <= 1000; j++) {
                        myNumber.addPlusPlus();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }

        // Sleeping for a fixed time would only guess at when the workers are done:
        //try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        countDownLatch.await();

        // 50 threads x 1000 increments each, so the result is 50000
        System.out.println(Thread.currentThread().getName()+"\t"+"---result : "+myNumber.atomicInteger.get());
    }
}

2.2、 Array-type atomic classes

AtomicIntegerArray

AtomicLongArray

AtomicReferenceArray

package com.atguigu.juc.atomics;

import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * @author zzyy
 * @create 2021-03-18 16:42
 */
public class AtomicIntegerArrayDemo
{
    public static void main(String[] args)
    {
        // Three equivalent ways to create the array; the first two start with five zeros
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});

        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            System.out.println(atomicIntegerArray.get(i));
        }
        System.out.println();
        System.out.println();
        System.out.println();

        int tmpInt = 0;

        // getAndSet returns the old element (0) and stores 1122 at index 0
        tmpInt = atomicIntegerArray.getAndSet(0,1122);
        System.out.println(tmpInt+"\t"+atomicIntegerArray.get(0));

        // Three increments of index 1: the last call returns 2 and leaves 3 in the array
        atomicIntegerArray.getAndIncrement(1);
        atomicIntegerArray.getAndIncrement(1);
        tmpInt = atomicIntegerArray.getAndIncrement(1);
        System.out.println(tmpInt+"\t"+atomicIntegerArray.get(1));
    }
}

2.3、 Reference-type atomic classes

2.3.1、AtomicReference

See Chapter 7 (CAS), section 4.1: AtomicInteger is an atomic integer; can there be other atomic types?
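Since this section only points to an earlier chapter, here is a minimal sketch of AtomicReference for reference. The User class and the names z3 and li4 are hypothetical and serve only as illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

class AtomicReferenceSketch {
    // Hypothetical value class used only for this illustration.
    static final class User {
        final String name;
        final int age;
        User(String name, int age) { this.name = name; this.age = age; }
    }

    static String demo() {
        User z3 = new User("z3", 22);
        User li4 = new User("li4", 28);
        AtomicReference<User> ref = new AtomicReference<>(z3);

        // compareAndSet compares references (==), not equals()
        boolean first = ref.compareAndSet(z3, li4);   // current reference is z3, so it succeeds
        boolean second = ref.compareAndSet(z3, li4);  // current reference is now li4, so it fails
        return first + " " + second + " " + ref.get().name;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true false li4
    }
}
```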

2.3.2、AtomicStampedReference

A reference-type atomic class that carries a version number (stamp); it can solve the ABA problem

It answers the question of how many times the value has been modified

An atomic reference with a state stamp

For ABADemo, see Chapter 7 (CAS), section 5.3.2, which introduces the ABA problem
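The ABADemo itself is in another chapter, so the following is only a rough sketch of the idea, not the author's demo. It replays the A -> B -> A interleaving in a single thread so that the outcome is deterministic; the class name AbaSketch and the values 100, 101 and 2022 are arbitrary choices.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

class AbaSketch {
    static boolean[] demo() {
        // Plain AtomicReference: the A -> B -> A change goes unnoticed.
        // (100 and 101 are cached Integer objects, so reference comparison works.)
        AtomicReference<Integer> plain = new AtomicReference<>(100);
        Integer seen = plain.get();                 // "thread 1" reads A
        plain.compareAndSet(100, 101);              // "thread 2": A -> B
        plain.compareAndSet(101, 100);              // "thread 2": B -> A
        boolean plainCas = plain.compareAndSet(seen, 2022); // succeeds although the value was touched

        // AtomicStampedReference: every change also bumps the stamp.
        AtomicStampedReference<Integer> stamped = new AtomicStampedReference<>(100, 1);
        int stampSeen = stamped.getStamp();         // "thread 1" reads stamp 1
        stamped.compareAndSet(100, 101, stamped.getStamp(), stamped.getStamp() + 1); // stamp 2
        stamped.compareAndSet(101, 100, stamped.getStamp(), stamped.getStamp() + 1); // stamp 3
        boolean stampedCas = stamped.compareAndSet(100, 2022, stampSeen, stampSeen + 1); // fails: stamp is 3

        return new boolean[]{plainCas, stampedCas};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // true false
    }
}
```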

2.3.3、AtomicMarkableReference

Atomically updates a reference-type object together with a mark bit

It answers the question of whether the value has been modified at all

By definition it simplifies the state stamp to true|false

Similar to disposable chopsticks: used once or not at all

An atomic reference whose state stamp is true/false

package com.atguigu.juc.atomics;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicMarkableReference;

/**
 * @author zzyy
 * @create 2021-03-22 14:14
 */
public class AtomicMarkableReferenceDemo
{
    // compareAndSet compares references with ==; 100 and 101 work here because
    // autoboxed Integer values from -128 to 127 are cached.
    static AtomicMarkableReference<Integer> atomicMarkableReference = new AtomicMarkableReference<>(100,false);

    public static void main(String[] args)
    {
        // t1 wakes up first, so its CAS (100 -> 101, mark false -> true) succeeds
        new Thread(() -> {
            boolean marked = atomicMarkableReference.isMarked();
            System.out.println(Thread.currentThread().getName()+"\t"+"--- default mark: "+marked);
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            boolean b = atomicMarkableReference.compareAndSet(100,101,marked,!marked);
            System.out.println(Thread.currentThread().getName()+"\t"+"--- operation succeeded: "+b);
            System.out.println(Thread.currentThread().getName()+"\t"+atomicMarkableReference.getReference());
            System.out.println(Thread.currentThread().getName()+"\t"+atomicMarkableReference.isMarked());
        },"t1").start();

        // t2 wakes up later; the reference is 101 and the mark is true by then, so its CAS fails
        new Thread(() -> {
            boolean marked = atomicMarkableReference.isMarked();
            System.out.println(Thread.currentThread().getName()+"\t"+"--- default mark: "+marked);
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            boolean b = atomicMarkableReference.compareAndSet(100, 20210308, marked, !marked);
            System.out.println(Thread.currentThread().getName()+"\t"+"--- operation succeeded: "+b);
            System.out.println(Thread.currentThread().getName()+"\t"+atomicMarkableReference.getReference());
            System.out.println(Thread.currentThread().getName()+"\t"+atomicMarkableReference.isMarked());
        },"t2").start();
    }
}

2.4、 Atomic classes that update an object's fields

2.4.1、AtomicIntegerFieldUpdater

Atomically updates the value of an int field of an object

2.4.2、AtomicLongFieldUpdater

Atomically updates the value of a long field of an object

2.4.3、AtomicReferenceFieldUpdater

Atomically updates the value of a reference-type field

Purpose: manipulate certain fields of a non-thread-safe object in a thread-safe manner

Requirements: the field to be updated must be declared volatile and must be accessible to the code that creates the updater (the examples use public volatile). Because the field-updater atomic classes are abstract, an updater has to be created with the static method newUpdater(), passing the class and the name of the field to update.

package com.atguigu.juc.atomics;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class BankAccount
{
    String bankName = "ccb";

    // Manipulate certain fields of a non-thread-safe object in a thread-safe manner

    //1 The field to update must be volatile (public here, so that it is accessible to the updater).
    public volatile int money = 0;

    //2 The field-updater atomic classes are abstract, so the updater is created with the static
    //  method newUpdater(), passing the class and the name of the field to update.
    //  One shared updater is enough for all BankAccount instances.
    static final AtomicIntegerFieldUpdater<BankAccount> FIELD_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money");

    public void transfer(BankAccount bankAccount)
    {
        FIELD_UPDATER.incrementAndGet(bankAccount);
    }
}

/**
 * @author zzyy
 * @create 2021-03-18 17:20
 */
public class AtomicIntegerFieldUpdaterDemo
{
    public static void main(String[] args) throws InterruptedException
    {
        BankAccount bankAccount = new BankAccount();
        CountDownLatch countDownLatch = new CountDownLatch(1000);

        for (int i = 1; i <= 1000; i++) {
            new Thread(() -> {
                try {
                    bankAccount.transfer(bankAccount);
                } finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }

        // Wait until all 1000 threads have finished (more reliable than sleeping for a fixed time)
        countDownLatch.await();

        System.out.println(Thread.currentThread().getName()+"\t"+"---bankAccount: "+bankAccount.money);
    }
}

Running results

main    ---bankAccount: 1000

package com.atguigu.juc.atomics;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class MyVar
{
    public volatile Boolean isInit = Boolean.FALSE;

    static final AtomicReferenceFieldUpdater<MyVar,Boolean> FIELD_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(MyVar.class,Boolean.class,"isInit");

    public void init(MyVar myVar)
    {
        // Only the thread whose CAS flips FALSE to TRUE performs the initialization
        if(FIELD_UPDATER.compareAndSet(myVar,Boolean.FALSE,Boolean.TRUE))
        {
            System.out.println(Thread.currentThread().getName()+"\t"+"---start init");
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName()+"\t"+"---end init");
        }else{
            System.out.println(Thread.currentThread().getName()+"\t"+"--- lost the race, another thread is already initializing");
        }
    }
}

/**
 * @author zzyy
 * @create 2021-03-22 15:20
 *  Requirement: multiple threads call a class's initialization method concurrently. If the object
 *  has not been initialized yet, initialization is performed, and it must be performed only once.
 */
public class AtomicReferenceFieldUpdaterDemo
{
    public static void main(String[] args)
    {
        MyVar myVar = new MyVar();

        for (int i = 1; i <= 5; i++) {
            new Thread(() -> {
                myVar.init(myVar);
            },String.valueOf(i)).start();
        }
    }
}

Running results (which thread wins and the order of the losing threads may vary)

1    ---start init
4    --- lost the race, another thread is already initializing
3    --- lost the race, another thread is already initializing
2    --- lost the race, another thread is already initializing
5    --- lost the race, another thread is already initializing
1    ---end init

2.4.4、 In-depth analysis of how the enhanced atomic operation classes work

DoubleAccumulator

DoubleAdder

LongAccumulator

LongAdder

Interview questions:

 1      A like counter for hot products: count the number of likes, with no requirement for real-time accuracy

 2      A very large List full of int values: how would you add them all up? Describe your approach
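One possible answer to the second question, sketched under assumptions: the class name ListSumSketch and the method sum() are made up for illustration, and a parallel stream is only one of several ways to spread the work over threads. For a plain sum, mapToLong(...).sum() would be simpler; the point here is that many threads can add into one LongAdder without losing updates.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ListSumSketch {
    // Sums the list by letting the parallel stream's worker threads add into one LongAdder.
    static long sum(List<Integer> data) {
        LongAdder adder = new LongAdder();
        data.parallelStream().forEach(v -> adder.add(v));
        // Exact at this point: forEach has returned, so no thread is still adding.
        return adder.sum();
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.rangeClosed(1, 1_000_000)
                .boxed()
                .collect(Collectors.toList());
        System.out.println(sum(data)); // 500000500000
    }
}
```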

2.4.4.1、 A like counter: a look at performance

Commonly used APIs

Introduction:

LongAdder can only compute a running sum (add, increment, decrement), and it always starts from zero

package com.atguigu.juc.atomics;

import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongBinaryOperator;

/**
 * @author zzyy
 * @create 2021-03-19 15:59
 */
public class LongAdderAPIDemo
{
    public static void main(String[] args)
    {
        LongAdder longAdder = new LongAdder();// sum only, starting from 0

        longAdder.increment();
        longAdder.increment();
        longAdder.increment();

        System.out.println(longAdder.longValue());

        // Custom operation: current value minus the new argument, starting from 100
        LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator()
        {
            @Override
            public long applyAsLong(long left, long right)
            {
                return left - right;
            }
        }, 100);

        longAccumulator.accumulate(1);// 100 - 1 = 99
        longAccumulator.accumulate(2);// 99 - 2 = 97
        longAccumulator.accumulate(3);// 97 - 3 = 94

        System.out.println(longAccumulator.longValue());
    }
}

Running results

3

94

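LongAccumulator supports custom functions: it is an accumulator of type long that takes a binary operation on long values, so it can compute various aggregations, including addition and multiplication. Its Javadoc notes that the order of accumulation across threads is not guaranteed, so the function should not depend on that order; the subtraction used in these demos gives a predictable result only because they run in a single thread.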
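As a small illustration of other aggregations, here is a sketch that tracks a running maximum and a running product. Both functions are order-independent. The class name LongAccumulatorSketch and the input values 3, 9 and 4 are arbitrary.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.LongAccumulator;

class LongAccumulatorSketch {
    static long[] demo() {
        // The second constructor argument is the identity (starting value) for the function.
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        LongAccumulator product = new LongAccumulator((x, y) -> x * y, 1);

        for (long v : new long[]{3, 9, 4}) {
            max.accumulate(v);      // keeps the largest value seen so far
            product.accumulate(v);  // multiplies the running product by v
        }
        return new long[]{max.get(), product.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [9, 108]
    }
}
```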

package com.atguigu.juc.atomics;

import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongBinaryOperator;

/**
 * @author zzyy
 * @create 2021-03-19 15:59
 */
public class LongAccumulatorDemo {
    LongAdder longAdder = new LongAdder();

    public void add_LongAdder() {
        longAdder.increment();
    }

    // The addition variant would be:
    //LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y,0);
    // Here the function subtracts instead, starting from 777
    LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
        @Override
        public long applyAsLong(long left, long right) {
            return left - right;
        }
    }, 777);

    public void add_LongAccumulator() {
        longAccumulator.accumulate(1);
    }

    public static void main(String[] args) {
        LongAccumulatorDemo demo = new LongAccumulatorDemo();
        demo.add_LongAccumulator();// 777 - 1 = 776
        demo.add_LongAccumulator();// 776 - 1 = 775
        System.out.println(demo.longAccumulator.longValue());
    }
}

Running results

775

LongAdder performance comparison: code demonstration

package com.atguigu.juc.atomics;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongSupplier;

class ClickNumber
{
    int number = 0;
    public synchronized void add_Synchronized()
    {
        number++;
    }

    AtomicInteger atomicInteger = new AtomicInteger();
    public void add_AtomicInteger()
    {
        atomicInteger.incrementAndGet();
    }

    AtomicLong atomicLong = new AtomicLong();
    public void add_AtomicLong()
    {
        atomicLong.incrementAndGet();
    }

    LongAdder longAdder = new LongAdder();
    public void add_LongAdder()
    {
        longAdder.increment();
    }

    LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x+y,0);
    public void add_LongAccumulator()
    {
        longAccumulator.accumulate(1);
    }
}

/**
 * @author zzyy
 * @create 2021-03-19 16:08
 *
 *  50 threads, each clicking "like" 1,000,000 (100W) times; compare the total time of each approach
 */
public class LongAdderCalcDemo
{
    public static final int SIZE_THREAD = 50;
    public static final int _1W = 10000;

    // Runs action 100 * _1W times on each of SIZE_THREAD threads, waits for all of them with a
    // CountDownLatch, then prints the elapsed time and the final count.
    // This is rough wall-clock timing only (no warm-up), so treat the numbers as indicative.
    private static void measure(String name, Runnable action, LongSupplier result) throws InterruptedException
    {
        CountDownLatch countDownLatch = new CountDownLatch(SIZE_THREAD);
        long startTime = System.currentTimeMillis();
        for (int i = 1; i <= SIZE_THREAD; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        action.run();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch.await();
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +"  millisecond "+"\t "+name+"\t"+result.getAsLong());
    }

    public static void main(String[] args) throws InterruptedException
    {
        ClickNumber clickNumber = new ClickNumber();

        measure("add_Synchronized", clickNumber::add_Synchronized, () -> clickNumber.number);
        measure("add_AtomicInteger", clickNumber::add_AtomicInteger, () -> clickNumber.atomicInteger.get());
        measure("add_AtomicLong", clickNumber::add_AtomicLong, () -> clickNumber.atomicLong.get());
        measure("add_LongAdder", clickNumber::add_LongAdder, () -> clickNumber.longAdder.longValue());
        measure("add_LongAccumulator", clickNumber::add_LongAccumulator, () -> clickNumber.longAccumulator.longValue());
    }
}


Running results:

----costTime: 6297  millisecond    add_Synchronized    50000000

----costTime: 797  millisecond    add_AtomicInteger    50000000

----costTime: 787  millisecond    add_AtomicLong    50000000

----costTime: 113  millisecond    add_LongAdder    50000000

----costTime: 144  millisecond    add_LongAccumulator    50000000

2.4.4.2、 Source code and principle analysis

Architecture

LongAdder is a subclass of Striped64

Principle (why is LongAdder so fast?)

The official description and the Alibaba guideline

Striped64

Striped64 has several important member variables

// Number of CPUS, to place bound on table size
// The number of CPUs, which bounds the maximum length of the cells array
static final int NCPU = Runtime.getRuntime().availableProcessors();

// Table of cells. When non-null, size is a power of 2.
// The cells array; its length is a power of 2 (2, 4, 8, 16, ...), which makes the later bit operations convenient
transient volatile Cell[] cells;

// Base value, used mainly when there is no contention, but also as
// a fallback during table initialization races. Updated via CAS.
// The base value: when concurrency is low, only this value is accumulated. Updated via CAS.
transient volatile long base;

// Spinlock (locked via CAS) used when resizing and/or creating Cells.
// The spin-lock variable held while the cells array is being created or resized (expanded)
transient volatile int cellsBusy;

Definitions of some variables and methods in Striped64

base: similar to the global value in AtomicLong. When there is no contention, data is accumulated directly onto base; it is also the fallback target while another thread is initializing cells

collide: indicates the intention to expand; false means the array will definitely not be expanded, true means it may be expanded

cellsBusy: the lock that must be acquired to initialize or expand cells; 0 means unlocked, 1 means another thread already holds the lock

casCellsBusy(): changes the value of cellsBusy via CAS; a successful CAS means the lock has been acquired, and the method returns true

NCPU: the number of CPUs on the current machine; it is used when the Cell array is expanded

getProbe(): gets the current thread's hash value

advanceProbe(): resets (rehashes) the current thread's hash value

Cell

An inner class of Striped64, in the java.util.concurrent.atomic package

Why is LongAdder so fast?

The basic idea of LongAdder is to disperse the hot spot: the value is spread over a Cell array, different threads hit different slots of the array, and each thread performs CAS only on the value in its own slot. The hot spot is thereby dispersed and the probability of conflict becomes much smaller. To obtain the actual long value, add up the values in all slots and return the total.

sum() adds up the values of all cells in the Cell array together with base and returns the total. The core idea is to spread the update pressure that AtomicLong puts on a single value over multiple values, thereby relieving the update hot spot.

In mathematical terms, there is one base variable and one Cell[] array:

base variable: when there is no contention, additions go directly to this variable

Cell[] array: under contention, each thread accumulates into its own slot Cell[i]

value = base + Cell[0] + Cell[1] + ... + Cell[n-1]
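To make the base-plus-cells idea concrete, here is a deliberately simplified model. It is not the JDK implementation: the class StripedCounterSketch is hypothetical, the array has a fixed size of 8, and it has no lazy initialization, resizing, rehashing or cache-line padding. The slot is chosen from the thread id, which is an assumption made for brevity.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounterSketch {
    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells = new AtomicLongArray(8); // length is a power of two

    public void add(long x) {
        long b = base.get();
        // Fast path: a single CAS on base. It only fails when another thread got in between.
        if (!base.compareAndSet(b, b + x)) {
            // Contended path: add to the slot selected by hash & (length - 1).
            int idx = ((int) Thread.currentThread().getId()) & (cells.length() - 1);
            cells.addAndGet(idx, x);
        }
    }

    public long sum() {
        long s = base.get();                 // value = base + Cell[0] + ... + Cell[n-1]
        for (int i = 0; i < cells.length(); i++) {
            s += cells.get(i);
        }
        return s;
    }

    // Runs `threads` threads that each add 1 `perThread` times, then returns the sum.
    static long demo(int threads, int perThread) {
        StripedCounterSketch counter = new StripedCounterSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) counter.add(1);
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(demo(8, 100_000)); // 800000
    }
}
```

Every add lands exactly once, either on base or in one cell, so the sum taken after all threads have joined is exact.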

 

Source code interpretation and in-depth analysis

A small summary

When there is no contention, LongAdder behaves like AtomicLong and operates on the same base. When contention appears, it breaks the whole into parts, trading space for time: it uses an array cells and splits the single value across it. When multiple threads need to update the value at the same time, each thread's hash value is mapped to an index of cells, and the value at that index is incremented. When all threads are done, all values in cells plus the uncontended value base are added up as the final result.

longAdder.increment() source code interpretation: see the large diagram for details

① add(1L)

as: the reference to cells

b: the base value that was read

v: the expected value

m: the length of the cells array minus 1, used as the index mask

a: the Cell that the current thread hashes to

② longAccumulate

// Expansion intention: false means the array will definitely not be expanded, true means it may be expanded
boolean collide = false;                // True if last slot nonempty
for (;;) {
    Cell[] as; Cell a; int n; long v;
    // CASE1: the Cell[] array has already been initialized
    if ((as = cells) != null && (n = as.length) > 0) {
        //.....
    }
    // CASE2: the Cell[] array has not been initialized (first creation); cells is not locked,
    // so try to take the lock and initialize the cells array
    else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
        // ......
    }
    // CASE3: the cells array is being initialized by another thread, so try to accumulate directly on base
    else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
        break;                          // Fall back on using base
}

// Condition from add(): the CAS on the hit cell; if it fails, uncontended becomes false and longAccumulate is called
!(uncontended = a.cas(v = a.value, v + x))

A high-resolution diagram of the code logic: Youdao Cloud Notes

③ sum

sum() adds up the values of all cells in the Cell array together with base and returns the total.

Why is the value of sum not exact under concurrency?

While sum() executes, updates to base and cells are not blocked (this is the crucial point). LongAdder is therefore not strongly consistent; it is eventually consistent.

First, the local variable sum that is finally returned is initially assigned the value of base. By the time it is returned, base may already have been updated, but the local variable is not refreshed, which causes the inconsistency.

Second, the reads of the cells are not guaranteed to see the most recently written values. sum() therefore returns an exact result only when there are no concurrent updates.
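The following sketch illustrates that behaviour under assumptions: the class name SumSnapshotSketch and the thread counts are arbitrary. A sum() taken while writers are still running is only a snapshot, and its exact value is not predictable; a sum() taken after all writers have finished is exact.

```java
import java.util.concurrent.atomic.LongAdder;

class SumSnapshotSketch {
    // Returns {sum read while writers may still be running, sum read after all writers finished}.
    static long[] demo(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) adder.increment();
            });
            ts[i].start();
        }

        // Snapshot: anywhere between 0 and threads * perThread, depending on timing.
        long during = adder.sum();

        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        // No concurrent updates any more, so this value is exact.
        long after = adder.sum();
        return new long[]{during, after};
    }

    public static void main(String[] args) {
        long[] r = demo(8, 1_000_000);
        System.out.println("during=" + r[0] + " after=" + r[1]);
    }
}
```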

Usage summary

AtomicLong:

Thread-safe; use it when some performance loss is acceptable and exact values are required

Guarantees accuracy at the cost of performance

With AtomicLong, multiple threads perform atomic operations on a single hot value

LongAdder:

Use it when good performance is needed under high concurrency and the accuracy requirement for the value is not strict

Guarantees performance at the cost of accuracy

With LongAdder, each thread has its own slot and generally performs CAS only on the value in its own slot

2.4.4.3、 A small summary

AtomicLong:

Principle: CAS + spinning, e.g. incrementAndGet

Scenario: global counting under low concurrency. AtomicLong keeps the count accurate under concurrency; it solves the thread-safety problem through CAS.

Drawback: performance drops sharply under high concurrency. Why? The spinning of AtomicLong becomes the bottleneck.

When N threads use CAS to modify the same value, only one succeeds at a time and the other N-1 fail. The failed threads keep spinning until they succeed, and with that many failed spins CPU usage shoots up.
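The retry loop described above can be modelled by hand, as in this sketch. The names CasSpinSketch and incrementCountingRetries are hypothetical, and the loop is a model of the CAS-plus-spin idea rather than what a JVM necessarily executes: on some platforms incrementAndGet may be compiled to a single atomic add instruction.

```java
import java.util.concurrent.atomic.AtomicLong;

class CasSpinSketch {
    // Increments `value` with an explicit CAS loop and returns how many attempts failed.
    static int incrementCountingRetries(AtomicLong value) {
        int failures = 0;
        for (;;) {
            long current = value.get();
            if (value.compareAndSet(current, current + 1)) {
                return failures;      // this thread's CAS won
            }
            failures++;               // another thread changed the value first: spin and retry
        }
    }

    // Returns {final counter value, total number of failed CAS attempts}.
    static long[] demo(int threads, int perThread) {
        AtomicLong value = new AtomicLong();
        AtomicLong failed = new AtomicLong();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                long mine = 0;
                for (int j = 0; j < perThread; j++) mine += incrementCountingRetries(value);
                failed.addAndGet(mine);
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new long[]{value.get(), failed.get()};
    }

    public static void main(String[] args) {
        long[] r = demo(8, 1_000_000);
        System.out.println("value=" + r[0] + " failedCasAttempts=" + r[1]);
    }
}
```

The final value is always exact; the number of failed attempts varies from run to run and tends to grow with the number of contending threads, which is the wasted work the text refers to.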

LongAdder vs AtomicLong performance

Java 8 Performance Improvements: LongAdder vs AtomicLong | Palomino Labs Blog

LongAdder

Principle: CAS + base + a striped Cell array; trades space for time and disperses the hot data

Scenario: global counting under high concurrency

Drawback: if other threads modify the value after sum() has read it, the final result is not exact

Original site

Copyright notice
This article was written by [Shixiaozan]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/162/202206111207398979.html