当前位置:网站首页>8、原子操作类之18罗汉增强
8、原子操作类之18罗汉增强
2022-06-11 12:07:00 【施小赞】
1、是什么
2、再分类
2.1、基本类型原子类
public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
package com.atguigu.juc.atomics;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
class MyNumber
{
AtomicInteger atomicInteger = new AtomicInteger();
public void addPlusPlus()
{
atomicInteger.incrementAndGet();
}
}
/**
* @auther zzyy
* @create 2021-03-17 16:26
*/
public class AtomicIntegerDemo
{
public static final int SIEZ_ = 50;
public static void main(String[] args) throws InterruptedException
{
MyNumber myNumber = new MyNumber();
CountDownLatch countDownLatch = new CountDownLatch(SIEZ_);
for (int i = 1; i <=SIEZ_; i++) {
new Thread(() -> {
try
{
for (int j = 1 ;j <=1000; j++) {
myNumber.addPlusPlus();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
},String.valueOf(i)).start();
}
//try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t"+"---result : "+myNumber.atomicInteger.get());
}
}2.2、数组类型原子类
package com.atguigu.juc.atomics;
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
* @auther zzyy
* @create 2021-03-18 16:42
*/
public class AtomicIntegerArrayDemo
{
public static void main(String[] args)
{
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
//AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
//AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});
for (int i = 0; i <atomicIntegerArray.length(); i++) {
System.out.println(atomicIntegerArray.get(i));
}
System.out.println();
System.out.println();
System.out.println();
int tmpInt = 0;
tmpInt = atomicIntegerArray.getAndSet(0,1122);
System.out.println(tmpInt+"\t"+atomicIntegerArray.get(0));
atomicIntegerArray.getAndIncrement(1);
atomicIntegerArray.getAndIncrement(1);
tmpInt = atomicIntegerArray.getAndIncrement(1);
System.out.println(tmpInt+"\t"+atomicIntegerArray.get(1));
}
}2.3、引用类型原子类
2.3.1、AtomicReference
见第7章 CAS ,4.1、AtomicInteger原子整型,可否有其它原子类型?
2.3.2、AtomicStampedReference
ABADemo见第7章CAS,5.3.2、引出来ABA问题???
2.3.3、AtomicMarkableReference
package com.atguigu.juc.atomics;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicMarkableReference;
/**
* @auther zzyy
* @create 2021-03-22 14:14
*/
public class AtomicMarkableReferenceDemo
{
static AtomicMarkableReference atomicMarkableReference = new AtomicMarkableReference(100,false);
public static void main(String[] args)
{
new Thread(() -> {
boolean marked = atomicMarkableReference.isMarked();
System.out.println(Thread.currentThread().getName()+"\t"+"---默认修改标识:"+marked);
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
boolean b = atomicMarkableReference.compareAndSet(100,101,marked,!marked);
System.out.println(Thread.currentThread().getName()+"\t"+"---操作是否成功:"+b);
System.out.println(Thread.currentThread().getName()+"\t"+atomicMarkableReference.getReference());
System.out.println(Thread.currentThread().getName()+"\t"+atomicMarkableReference.isMarked());
},"t1").start();
new Thread(() -> {
boolean marked = atomicMarkableReference.isMarked();
System.out.println(Thread.currentThread().getName()+"\t"+"---默认修改标识:"+marked);
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
boolean b = atomicMarkableReference.compareAndSet(100, 20210308, marked, !marked);
System.out.println(Thread.currentThread().getName()+"\t"+"---操作是否成功:"+b);
System.out.println(Thread.currentThread().getName()+"\t"+atomicMarkableReference.getReference());
System.out.println(Thread.currentThread().getName()+"\t"+atomicMarkableReference.isMarked());
},"t2").start();
}
}2.4、对象的属性修改原子类
2.4.1、AtomicIntegerFieldUpdater
2.4.2、AtomicLongFieldUpdater
2.4.3、AtomicReferenceFieldUpdater
使用目的:以一种线程安全的方式操作非线程安全对象内的某些字段
使用要求:更新的对象属性必须使用 public volatile 修饰符。因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
package com.atguigu.juc.atomics;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
class BankAccount
{
String bankName = "ccb";
//以一种线程安全的方式操作非线程安全对象内的某些字段
//1 更新的对象属性必须使用 public volatile 修饰符。
public volatile int money = 0;
//2 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须
// 使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
AtomicIntegerFieldUpdater FieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money");
public void transfer(BankAccount bankAccount)
{
FieldUpdater.incrementAndGet(bankAccount);
}
}
/**
* @auther zzyy
* @create 2021-03-18 17:20
*/
public class AtomicIntegerFieldUpdaterDemo
{
public static void main(String[] args) throws InterruptedException
{
BankAccount bankAccount = new BankAccount();
for (int i = 1; i <=1000; i++) {
new Thread(() -> {
bankAccount.transfer(bankAccount);
},String.valueOf(i)).start();
}
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+"---bankAccount: "+bankAccount.money);
}
}
运行结果
main---bankAccount: 1000
Process finished with exit code 0package com.atguigu.juc.atomics;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
class MyVar
{
public volatile Boolean isInit = Boolean.FALSE;
AtomicReferenceFieldUpdater<MyVar,Boolean> FieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class,Boolean.class,"isInit");
public void init(MyVar myVar)
{
if(FieldUpdater.compareAndSet(myVar,Boolean.FALSE,Boolean.TRUE))
{
System.out.println(Thread.currentThread().getName()+"\t"+"---start init");
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+"---end init");
}else{
System.out.println(Thread.currentThread().getName()+"\t"+"---抢夺失败,已经有线程在修改中");
}
}
}
/**
* @auther zzyy
* @create 2021-03-22 15:20
* 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次
*/
public class AtomicReferenceFieldUpdaterDemo
{
public static void main(String[] args)
{
MyVar myVar = new MyVar();
for (int i = 1; i <=5; i++) {
new Thread(() -> {
myVar.init(myVar);
},String.valueOf(i)).start();
}
}
}
运行结果
1---start init
4---抢夺失败,已经有线程在修改中
3---抢夺失败,已经有线程在修改中
2---抢夺失败,已经有线程在修改中
5---抢夺失败,已经有线程在修改中
1---end init2.4.4、原子操作增强类原理深度解析
2 一个很大的 List ,里面都是 int 类型,如何实现加加,说说思路
package com.atguigu.juc.atomics;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongBinaryOperator;
/**
* @auther zzyy
* @create 2021-03-19 15:59
*/
public class LongAdderAPIDemo
{
public static void main(String[] args)
{
LongAdder longAdder = new LongAdder();//只能做加法
longAdder.increment();
longAdder.increment();
longAdder.increment();
System.out.println(longAdder.longValue());
LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator()
{
@Override
public long applyAsLong(long left, long right)
{
return left - right;
}
}, 100);
longAccumulator.accumulate(1);//1
longAccumulator.accumulate(2);//3
longAccumulator.accumulate(3);//6
System.out.println(longAccumulator.longValue());
}
}
运行结果
3
94LongAccumulator提供了自定义的函数操作:long类型的聚合器,需要传入一个long类型的二元操作,可以用来计算各种聚合操作,包括加乘等
package com.atguigu.juc.atomics;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongBinaryOperator;
/**
* @auther zzyy
* @create 2021-03-19 15:59
*/
public class LongAccumulatorDemo {
LongAdder longAdder = new LongAdder();
public void add_LongAdder() {
longAdder.increment();
}
//LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y,0);
LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
@Override
public long applyAsLong(long left, long right) {
return left - right;
}
}, 777);
public void add_LongAccumulator() {
longAccumulator.accumulate(1);
}
public static void main(String[] args) {
LongAccumulatorDemo demo = new LongAccumulatorDemo();
demo.add_LongAccumulator();
demo.add_LongAccumulator();
System.out.println(demo.longAccumulator.longValue());
}
}
运行结果
775package com.atguigu.juc.atomics;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
class ClickNumber
{
int number = 0;
public synchronized void add_Synchronized()
{
number++;
}
AtomicInteger atomicInteger = new AtomicInteger();
public void add_AtomicInteger()
{
atomicInteger.incrementAndGet();
}
AtomicLong atomicLong = new AtomicLong();
public void add_AtomicLong()
{
atomicLong.incrementAndGet();
}
LongAdder longAdder = new LongAdder();
public void add_LongAdder()
{
longAdder.increment();
//longAdder.sum();
}
LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x+y,0);
public void add_LongAccumulator()
{
longAccumulator.accumulate(1);
}
}
/**
* @auther zzyy
* @create 2021-03-19 16:08
*
* 50个线程,每个线程100W次,总点赞数出来
*/
public class LongAdderCalcDemo
{
public static final int SIZE_THREAD = 50;
public static final int _1W = 10000;
public static void main(String[] args) throws InterruptedException
{
ClickNumber clickNumber = new ClickNumber();
long startTime;
long endTime;
CountDownLatch countDownLatch1 = new CountDownLatch(SIZE_THREAD);
CountDownLatch countDownLatch2 = new CountDownLatch(SIZE_THREAD);
CountDownLatch countDownLatch3 = new CountDownLatch(SIZE_THREAD);
CountDownLatch countDownLatch4 = new CountDownLatch(SIZE_THREAD);
CountDownLatch countDownLatch5 = new CountDownLatch(SIZE_THREAD);
//========================
startTime = System.currentTimeMillis();
for (int i = 1; i <=SIZE_THREAD; i++) {
new Thread(() -> {
try
{
for (int j = 1; j <=100 * _1W; j++) {
clickNumber.add_Synchronized();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch1.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch1.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_Synchronized"+"\t"+clickNumber.number);
startTime = System.currentTimeMillis();
for (int i = 1; i <=SIZE_THREAD; i++) {
new Thread(() -> {
try
{
for (int j = 1; j <=100 * _1W; j++) {
clickNumber.add_AtomicInteger();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch2.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch2.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_AtomicInteger"+"\t"+clickNumber.atomicInteger.get());
startTime = System.currentTimeMillis();
for (int i = 1; i <=SIZE_THREAD; i++) {
new Thread(() -> {
try
{
for (int j = 1; j <=100 * _1W; j++) {
clickNumber.add_AtomicLong();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch3.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch3.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_AtomicLong"+"\t"+clickNumber.atomicLong.get());
startTime = System.currentTimeMillis();
for (int i = 1; i <=SIZE_THREAD; i++) {
new Thread(() -> {
try
{
for (int j = 1; j <=100 * _1W; j++) {
clickNumber.add_LongAdder();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch4.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch4.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_LongAdder"+"\t"+clickNumber.longAdder.longValue());
startTime = System.currentTimeMillis();
for (int i = 1; i <=SIZE_THREAD; i++) {
new Thread(() -> {
try
{
for (int j = 1; j <=100 * _1W; j++) {
clickNumber.add_LongAccumulator();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch5.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch5.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_LongAccumulator"+"\t"+clickNumber.longAccumulator.longValue());
}
}
运行结果:
----costTime: 6297 毫秒 add_Synchronized50000000
----costTime: 797 毫秒 add_AtomicInteger50000000
----costTime: 787 毫秒 add_AtomicLong50000000
----costTime: 113 毫秒 add_LongAdder50000000
----costTime: 144 毫秒 add_LongAccumulator50000000Striped64有几个比较重要的成员函数
// Number of CPUS, to place bound on table size
//CPU数量,即cells数组的最大长度
static final int NCPU = Runtime.getRuntime().availableProcessors();
//Table of cells. When non-null, size is a power of 2.
//cells数组,为2的幂,2,4,8,16.....,方便以后位运算
transient volatile Cell[] cells;
//Base value, used mainly when there is no contention, but also as
//a fallback during table initialization races. Updated via CAS.
//基础value值,当并发较低时,只累加该值主要用于没有竞争的情况,通过CAS更新。
transient volatile long base;
//Spinlock (locked via CAS) used when resizing and/or creating Cells.
//创建或者扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁。
transient volatile int cellsBusy;base:类似于AtomicLong中全局的value值。在没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上
collide:表示扩容意向,false一定不会扩容,true可能会扩容。
cellsBusy:初始化cells或者扩容cells需要获取锁,0:表示无锁状态1:表示其他线程已经持有了锁
casCellsBusy():通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true
是 java.util.concurrent.atomic 下 Striped64 的一个内部类
LongAdder 的基本思路就是 分散热点 ,将 value 值分散到一个 Cell 数组 中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行 CAS 操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的 long 值,只要将各个槽中的变量值累加返回。
sum() 会将所有 Cell 数组中的 value 和 base 累加作为返回值,核心的思想就是将之前 AtomicLong 一个 value 的更新压力分散到多个 value 中去, 从而降级更新热点 。
一个Cell[]数组。base变量:非竞态条件下,直接累加到该变量上
Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中
LongAdder在无竞争的情况,跟AtomicLong一样,对 同一个base进 行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。
longAdder.increment() 源码解读:具体可看大图
b 表示获取的base值
v 表示 期望值
m 表示 cells 数组的长度
a 表示当前线程命中的cell单元格
//扩容意向,false一定不会扩容,true可能会扩容
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//CASE1: Cell[]数组已经初始化
if ((as = cells) != null && (n = as.length) > 0) {
//.....
}
//CASE2: Cell[]数组未初始化 (首次新建) ,cells没有加锁且没有初始化,则尝试对它加锁,并初始化cells数组
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// ......
}
//CASE3: cells数组正在初始化,则尝试直接在基数base上进行累加操作
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
//..... // Fall back on using base
}
!(uncontended = a.cas(v = a.value, v + x))代码逻辑高清大图:有道云笔记


sum()会将所有Cell数组中的value和base累加作为返回值。
核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去, 从而降级更新热点 。
sum执行时,并没有限制对base和cells的更新(一句要命的话)。所以LongAdder不是强一致性的,它是最终一致性的。
首先,最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致。
其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果。
AtomicLong是多个线程针对单个热点值value进行原子操作
当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用
LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作
场景:低并发下的全局计算、AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。
缺陷:高并发后性能急剧下降,why?AtomicLong的自旋会成为瓶颈
N个线程CAS操作修改线程的值,每次只有一个成功过,其它N-1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。
LongAdder vs AtomicLong Performance
Java 8 Performance Improvements: LongAdder vs AtomicLong | Palomino Labs Blog
原理:CAS+Base+Cell数组分散,空间换时间并分散了热点数据
边栏推荐
- Splunk最佳实践之workload managment
- Golang uses XOR ^ to exchange two variables and encrypt / decrypt them
- UnicodeDecodeError: ‘utf-8‘ codec can‘t decode byte 0xc5 in position 13: invalid continuation byte
- Zhejiang University and Microsoft Asia Research Institute released a new method of video recognition, which can recognize video frame by frame without data marking, or can be used for sign language tr
- 14、课程总结与回顾
- Thread five states (thread life cycle)
- Elk - hearthbeat implements service monitoring
- Software project management 7.1 Basic concept of project schedule
- Take you to know about direct insertion sorting (C language)
- 大数相加(C语言)
猜你喜欢

flink 多流转换(侧输出流分流、Union、Connect) 实时对账app 的支付操作和第三方的支付操作的双流 Join

一些比较的常用网站

Flink time semantics, watermark, generated watermark, and transmission of watermark

SQLServer连接数据库(中文表)部分数据乱码问题解决

flink 时间语义、水位线(Watermark)、生成水位线、水位线的传递

When the security engineer finds a major vulnerability in the PS host, the CD can execute arbitrary code in the system

深度学习与CV教程(14) | 图像分割 (FCN,SegNet,U-Net,PSPNet,DeepLab,RefineNet)

PS does not display text cursor, text box, and does not highlight after selection

Generate statement is not synthesized

Is the SSL certificate reliable in ensuring the information security of the website?
随机推荐
读取geo表达矩阵
JMeter 学习心得
When the security engineer finds a major vulnerability in the PS host, the CD can execute arbitrary code in the system
Using fast and slow pointer method to solve the problem of array (C language)
Flink data flow graph, parallelism, operator chain, jobgraph and executiongraph, task and task slot
ObjectInputStream读取文件对象ObjectOutputStream写入文件对象
Objectinputstream read file object objectoutputstream write file object
C # apply many different fonts in PDF documents
gocron 定时任务管理平台
Acwing50+acwing51 weeks +acwing3493 Maximum sum (open)
軟件項目管理 7.1.項目進度基本概念
14、课程总结与回顾
Apple mobileone: the mobile terminal only needs 1ms of high-performance backbone
中间人攻击之ettercap嗅探
(推荐)splunk 多少数量search head 才合适
flink 多流转换(侧输出流分流、Union、Connect) 实时对账app 的支付操作和第三方的支付操作的双流 Join
(解决)Splunk 之 kv-store down 问题
13、ReentrantLock、ReentrantReadWriteLock、StampedLock讲解
软件项目管理 7.1.项目进度基本概念
Sulley fuzzer learning














