当前位置:网站首页>多线程与高并发(8)—— 从CountDownLatch总结AQS共享锁(三周年打卡)
多线程与高并发(8)—— 从CountDownLatch总结AQS共享锁(三周年打卡)
2022-07-06 00:10:00 【李王家的翠花】
不知不觉已经写文章3年了,3年只写了六十多篇文章,略有惭愧。希望后续一段时间能迎来一波创作的爆发,也能迎来自己技术的爆发。加油吧,翠花!
一 、概述
上篇文章:多线程与高并发(7)——从ReentrantLock到AQS源码详细讲解了AQS的原理,我们知道使用 AQS 能简单且高效地构造出同步器(重写tryAcquire、tryRelease就能简单实现)。这里呢,JDK给我们提供了很多高效同步器,如CountDownLatch 、CyclicBarrier、Phaser、Semaphore、Exchanger、ReentrantReadWriteLock,我们后面文章逐一讲解分析。
这里,我们会通过CountDownLatch 总结AQS共享锁的原理,同时对比下CyclicBarrier和Phaser。
二、CountDownLatch
CountDownLatch字面意思是倒数门栓,也就是倒数计数多少个线程执行完毕了。 其允许 int个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
1、常用方法
await(): 调用该方法的线程处于等待状态,直到latch的值被减到0或者当前线程被中断。一般都是主线程调用。——开门
await(long timeout, TimeUnit unit):带超时时间的await。
countDown():使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程。——倒数
getCount():获得latch值。
2、两种典型用法
1、执行完所有的业务后才执行主线程
将 CountDownLatch 的计数器初始化为 n (new CountDownLatch(n)),每当一个任务线程执行完毕,就将计数器减 1 (countdownlatch.countDown()),当计数器的值变为 0 时,在 CountDownLatch 上 await() 的线程就会被唤醒。比如百度的文本转语音的代码,对于长文本来说,需要分段来转,需每段都转换完成才能整合整个文本。
我们再举一个例子,比如一场考试,有30个考生(线程),一个监考老师(主线程),每个考生完成试卷之后都可以提前交卷,且交卷后不需要管其他考试如何,考试结束后(所有学生交卷,latch为0),监考老师才能离开(主线程结束)。
30个学生输出结果太长,我们换成5个学生,代码如下:
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(5);
//指定线程数
ExecutorService threadPool = Executors.newFixedThreadPool(30);
for (int i = 0; i < 5; i++) {
int finalI = i;
threadPool.execute(() -> {
System.out.println("学生"+ finalI +"交卷");
countDownLatch.countDown();
});
}
try {
//开门
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPool.shutdown();
System.out.println("考试结束,监考老师离场");
}
运行结果如下,可以看出学生交卷是无序的,但是老师必须等所有学生交完卷才能离场:
学生0交卷
学生2交卷
学生3交卷
学生1交卷
学生4交卷
考试结束,监考老师离场
2、某一时刻,所有线程一起执行。
此作用是实现多个线程开始执行任务的最大并行性。
所谓并行,强调的是多个线程在某一时刻同时开始执行。而并发在围观上仍然是顺序执行。
类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。
代码如下:
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(1);
CountDownLatch countDownLatch2 = new CountDownLatch(5);
//指定线程数
ExecutorService threadPool = Executors.newFixedThreadPool(30);
for (int i = 0; i < 5; i++) {
int finalI = i;
threadPool.execute(() -> {
try {
System.out.println("运动员"+ finalI +"正在准备");
//线程阻塞,等着被释放
countDownLatch.await();
Thread.sleep(100);
System.out.println("运动员"+ finalI +"冲刺");
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch2.countDown();
System.out.println("运动员"+ finalI +"完成比赛");
});
}
try {
Thread.sleep(1000);
System.out.println("所有运动员准备完毕,开跑");
//解锁释放上面的所有线程
countDownLatch.countDown();
//所有线程完成比赛,往下走
countDownLatch2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPool.shutdown();
}
执行结果如下:
运动员1正在准备
运动员4正在准备
运动员3正在准备
运动员0正在准备
运动员2正在准备
所有运动员准备完毕,开跑
运动员0冲刺
运动员1冲刺
运动员4冲刺
运动员3冲刺
运动员2冲刺
运动员3完成比赛
运动员4完成比赛
运动员1完成比赛
运动员0完成比赛
运动员2完成比赛
3、源码解析
有了上一篇文章的基础,我们这里讲共享锁的流程会更加的得心应手。
CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown()方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说await() 方法之后的语句不会被执行。然后,CountDownLatch 会自旋 CAS 判断 state == 0,如果 state == 0 的话,就会释放所有等待的线程,await() 方法之后的语句得到执行。
首先,我们看一下它的构造方法:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
...
Sync(int count) {
setState(count);
}
可以看出来,在进行new对象的时候,count就是state的初始值。
然后,我们再看一下await()方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
其调用了sync的acquireSharedInterruptibly方法,也是AQS的方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果被中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取同步状态
if (tryAcquireShared(arg) < 0)
// 获取同步状态失败,自旋
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared()方法为CountDownLatch 重写的方法,如下:
protected int tryAcquireShared(int acquires) {
//当前状态是否为0可释放锁
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly代码如下:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将当前线程加入同步队列的尾部,addWaiter可以看上一篇文章
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//自旋
for (;;) {
//当前节点的前驱节点
final Node p = node.predecessor();
//如果前驱节点是头结点,则尝试获取同步状态
if (p == head) {
//当前节点尝试获取同步状态
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果获取成功,则设置当前节点为头结点并唤醒下一个线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//如果当前节点的前驱不是头结点,尝试挂起当前线程,和独享锁相同
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
//取消获取锁,和独享锁相同
cancelAcquire(node);
}
}
下面,我们看一下countDown() 方法的源码:
public void countDown() {
sync.releaseShared(1);
}
其同样调用了AQS的releaseShared()方法:
public final boolean releaseShared(int arg) {
//获取释放同步状态
if (tryReleaseShared(arg)) {
// 如果成功,进入自旋,尝试唤醒同步队列中头结点的后继节点
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared()代码如下:
protected boolean tryReleaseShared(int releases) {
// 就一个意思,自旋,直到state状态-1为0
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
doReleaseShared()方法如下:
private void doReleaseShared() {
//上来先自旋
for (;;) {
//获取头节点
Node h = head;
if (h != null && h != tail) {
//头结点状态
int ws = h.waitStatus;
//如果是SIGNAL,尝试唤醒后继节点
if (ws == Node.SIGNAL) {
//只要head成功的从SIGNAL修改为0,那么head的后继节点对应的线程将会被唤醒。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//把下一个不为空的节点unpark
unparkSuccessor(h);
}
//其他时候不唤醒
else if (ws == 0 &&
//前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。不停地for
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果head没有改变,则调用break退出循环
if (h == head) // loop if head changed
break;
}
}
当头结点的后继节点被唤醒后,线程将从挂起的地方醒来,继续执行。当前状态若还>0,则设置当前节点为头结点。setHeadAndPropagate()代码如下:
private void setHeadAndPropagate(Node node, int propagate) {
//当前头节点
Node h = head; // Record old head for check below
//设置当前节点为头节点
setHead(node);
//如果执行这个函数,那么propagate一定等于1
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//获取当前节点的下一个节点
Node s = node.next;
//唤醒后续节点
if (s == null || s.isShared())
doReleaseShared();
}
}
嗯,写着写着有点乱了,画个图冷静一下:
蓝色为CountDownLatch,黄色为AQS源码。
三、CyclicBarrier
CyclicBarrier 的字面意思是可循环使用(Cyclic)的栅栏(Barrier)。一组线程到达一个栅栏(也可以叫同步点)时被阻塞,直到最后一个线程到达栅栏时,栅栏才会开门,所有被拦截的线程才会继续干活。
它和CountDownLatch的区别是,比如考试,CountDownLatch中前一个考生完全不用管后一个考生如何。但是CyclicBarrier中,必须所有人到达,比如小学生出去旅游,回家时,必须每个都上车能回家。
核心方法:
await() :在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
CyclicBarrier(int parties, Runnable barrierAction): 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。
reset():将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。
使用代码如下:
public static void main(String[] args) {
//每5个人发车
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
//指定线程数
ExecutorService threadPool = Executors.newFixedThreadPool(30);
for (int i = 0; i < 5; i++) {
int finalI = i;
threadPool.execute(() -> {
System.out.println("学生"+ finalI +"上车");
try {
//等待以保证子线程执行结束
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println("学生"+ finalI +"出发");
});
}
threadPool.shutdown();
}
执行结果如下:
学生1上车
学生3上车
学生4上车
学生2上车
学生0上车
学生0出发
学生3出发
学生1出发
学生2出发
学生4出发
可以看出,所有学生到齐了才出发。
CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
扩展:
Phaser类又是什么呢?
Phaser 是是一个可重用的同步栅栏,它的功能与 CountDownLatch、CyclicBarrier 相似,但是可以用来解决控制多个线程分阶段共同完成任务的情景问题。——分段栅栏
也有点类似于CyclicBarrier的barrierAction,这里就不过多总结了。
边栏推荐
- openssl-1.0.2k版本升级openssl-1.1.1p
- [day39 literature extensive reading] a Bayesian perspective on magnetic estimation
- NSSA area where OSPF is configured for Huawei equipment
- 激光slam学习记录
- Detailed explanation of APP functions of door-to-door appointment service
- 7.5 simulation summary
- mysql-全局锁和表锁
- 行列式学习笔记(一)
- 14 MySQL view
- OS i/o devices and device controllers
猜你喜欢
Initialize your vector & initializer with a list_ List introduction
激光slam学习记录
跟着CTF-wiki学pwn——ret2libc1
上门预约服务类的App功能详解
About the slmgr command
软件测试工程师必会的银行存款业务,你了解多少?
XML configuration file (DTD detailed explanation)
Choose to pay tribute to the spirit behind continuous struggle -- Dialogue will values [Issue 4]
妙才周刊 - 8
Transport layer protocol ----- UDP protocol
随机推荐
Single merchant v4.4 has the same original intention and strength!
Hudi of data Lake (1): introduction to Hudi
XML配置文件(DTD详细讲解)
Configuring OSPF GR features for Huawei devices
What is information security? What is included? What is the difference with network security?
Senparc.Weixin.Sample.MP源码剖析
Shardingsphere source code analysis
What if the C disk is not enough? Let's see how I can clean up 25g of temp disk space after I haven't redone the system for 4 years?
MySQL之函数
什么叫做信息安全?包含哪些内容?与网络安全有什么区别?
18. (ArcGIS API for JS) ArcGIS API for JS point collection (sketchviewmodel)
Effet Doppler (déplacement de fréquence Doppler)
MySQL functions
PV静态创建和动态创建
多普勒效应(多普勒频移)
FFMPEG关键结构体——AVCodecContext
[Luogu cf487e] tours (square tree) (tree chain dissection) (line segment tree)
Tools to improve work efficiency: the idea of SQL batch generation tools
上门预约服务类的App功能详解
Miaochai Weekly - 8