当前位置:网站首页>CountDownLatch使用及原理
CountDownLatch使用及原理
2022-08-04 21:40:00 【m0_67595943】
一、概念
CountDownLatch可以使一个或多个线程等待其他线程各自执行完毕后再执行。
CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。
二、常用方法说明
CountDownLatch(int count):构造方法,创建一个值为count 的计数器。
await():阻塞当前线程,将当前线程加入阻塞队列。
await(long timeout, TimeUnit unit):在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行,
countDown():对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。
三、使用场景
3.1 多线程优化报表统计
3.1.1 功能现状
运营系统有统计报表、业务为统计每日的用户新增数量、订单数量、商品的总销量、总销售额......等多项指标统一展示出来,因为数据量比较大,统计指标涉及到的业务范围也比较多,所以这个统计报表的页面一直加载很慢,所以需要对统计报表这块性能需进行优化。
3.1.2 问题分析
统计报表页面涉及到的统计指标数据比较多,每个指标需要单独的去查询统计数据库数据,单个指标只要几秒钟,但是页面的指标有10多个,所以整体下来页面渲染需要将近一分钟。
3.1.3 解决方案
任务时间长是因为统计指标多,而且指标是串行的方式去进行统计的,我们只需要考虑把这些指标从串行化的执行方式改成并行的执行方式,那么整个页面的时间的渲染时间就会大大的缩短, 如何让多个线程同步的执行任务,我们这里考虑使用多线程,每个查询任务单独创建一个线程去执行,这样每个统计指标就可以并行的处理了。
3.1.4 要求
因为主线程需要每个线程的统计结果进行聚合,然后返回给前端渲染,所以这里需要提供一种机制让主线程等所有的子线程都执行完之后再对每个线程统计的指标进行聚合。 这里我们使用CountDownLatch 来完成此功能。
3.1.5 模拟代码
1、分别统计4个指标用户新增数量、订单数量、商品的总销量、总销售额;
2、假设每个指标执行时间为3秒。如果是串行化的统计方式那么总执行时间会为12秒。
3、我们这里使用多线程并行,开启4个子线程分别进行统计
4、主线程等待4个子线程都执行完毕之后,返回结果给前端。
【代码】
public class CountDownLatchTest {
//用于聚合所有的统计指标
private static Map map = new HashMap();
//创建计数器,这里需要统计4个指标
private static CountDownLatch countDownLatch = new CountDownLatch(4);
public static void main(String[] args) {
//记录开始时间
long startTime = System.currentTimeMillis();
Thread countUserThread = new Thread(() -> {
try {
System.out.println("正在统计新增用户数量");
//任务执行需要3秒
Thread.sleep(3000);
//保存结果值
map.put("userNumber", 1);
//标记已经完成一个任务
countDownLatch.countDown();
System.out.println("统计新增用户数量完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread countOrderThread = new Thread(() -> {
try {
System.out.println("正在统计订单数量");
//任务执行需要3秒
Thread.sleep(3000);
//保存结果值
map.put("countOrder", 2);
//标记已经完成一个任务
countDownLatch.countDown();
System.out.println("统计订单数量完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread countGoodsThread = new Thread(() -> {
try {
System.out.println("正在商品销量");
//任务执行需要3秒
Thread.sleep(3000);
//保存结果值
map.put("countGoods", 3);
//标记已经完成一个任务
countDownLatch.countDown();
System.out.println("统计商品销量完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread countmoneyThread = new Thread(() -> {
try {
System.out.println("正在总销售额");
//任务执行需要3秒
Thread.sleep(3000);
//保存结果值
map.put("countmoney", 4);
//标记已经完成一个任务
countDownLatch.countDown();
System.out.println("统计销售额完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//启动子线程执行任务
countUserThread.start();
countGoodsThread.start();
countOrderThread.start();
countmoneyThread.start();
try {
//主线程等待所有统计指标执行完毕
countDownLatch.await();
//记录结束时间
long endTime = System.currentTimeMillis();
System.out.println("------统计指标全部完成--------");
System.out.println("统计结果为:" + map.toString());
System.out.println("任务总执行时间为" + (endTime - startTime) / 1000 + "秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
【结果】
正在统计新增用户数量
正在统计订单数量
正在总销售额
正在商品销量
统计新增用户数量完毕
统计订单数量完毕
统计商品销量完毕
------统计指标全部完成--------
统计销售额完毕
统计结果为:{countmoney=4, userNumber=1, countGoods=3}
任务总执行时间为3秒
Process finished with exit code 0
3.2 模拟高并发场景
3.2.1 场景说明
创建开启n个线程,等线程都创建准备好后,再同时执行业务逻辑,达到模拟并发场景的目标。
3.2.2 模拟代码
【代码】
public class CountDownLatchTest2 {
public static void main(String[] args) {
//模拟线程数
int times = 1000;
//用于模拟线程安全
final AtomicInteger atomicInteger = new AtomicInteger(0);
//用于模拟线程非安全
final Member member = new Member();
//相当于计数器,当所有线程都准备好了,再一起执行,模仿多并发,保证并发量
final CountDownLatch countDownLatch = new CountDownLatch(times);
//保证所有线程执行完了再打印atomicInteger、用户年龄的值
final CountDownLatch countDownLatch2 = new CountDownLatch(times);
ExecutorService executorService = Executors.newFixedThreadPool(10);
try {
for (int i = 0; i < times; i++) {
executorService.submit(() -> {
try {
//一直阻塞当前线程,直到计时器的值为0,保证同时并发
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前线程执行时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
//数量自增1:线程安全(执行n次,每次都是1000)
atomicInteger.incrementAndGet();
//用户年龄自增1:线程不安全(执行n次,每次结果可能不一样)
member.setAge(member.getAge() + 1);
//当前线程执行完,计数器减一
countDownLatch2.countDown();
});
countDownLatch.countDown();
}
//保证所有线程执行完
countDownLatch2.await();
//打印结果
System.out.println("atomicInteger = " + atomicInteger);
System.out.println("age = " + member.getAge());
executorService.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class Member {
private int age;
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
【结果】
--备注:前面的时间省略
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:010
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:010
当前线程执行时间:2022-03-18 13:56:29:010
当前线程执行时间:2022-03-18 13:56:29:010
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:011
当前线程执行时间:2022-03-18 13:56:29:011
atomicInteger = 1000
age = 995
Process finished with exit code 0
四、实现原理
4.1 创建计数器
当我们调用CountDownLatch countDownLatch=new CountDownLatch(4) 时候,此时会创建一个AQS的同步队列,并把创建CountDownLatch 传进来的计数器赋值给AQS队列的 state,所以state的值也代表CountDownLatch所剩余的计数次数。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);//创建同步队列,并设置初始计数器值
}
4.2 阻塞线程
当我们调用countDownLatch.wait()的时候,会创建一个节点,加入到AQS阻塞队列,并同时把当前线程挂起。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
判断计数器是技术完毕,未完毕则把当前线程加入阻塞队列
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//锁重入次数大于0 则新建节点加入阻塞队列,挂起当前线程
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
构建阻塞队列的双向链表,挂起当前线程
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//新建节点加入阻塞队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获得当前节点pre节点
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);//返回锁的state
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//重组双向链表,清空无效节点,挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
4.3 计数器递减
当我们调用countDownLatch.down()方法的时候,会对计数器进行减1操作,AQS内部是通过释放锁的方式,对state进行减1操作,当state=0的时候证明计数器已经递减完毕,此时会将AQS阻塞队列里的节点线程全部唤醒。
public void countDown() {
//递减锁重入次数,当state=0时唤醒所有阻塞线程
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//递减锁的重入次数
if (tryReleaseShared(arg)) {
doReleaseShared();//唤醒队列所有阻塞的节点
return true;
}
return false;
}
private void doReleaseShared() {
//唤醒所有阻塞队列里面的线程
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//节点是否在等待唤醒状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始
continue;
unparkSuccessor(h);//成功则唤醒线程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
边栏推荐
- What does Xinchuang mean?Which industries are involved?Why develop Xinchuang?
- PCBA scheme design - kitchen voice scale chip scheme
- unity2D横版游戏教程8-音效
- 驱动点云格式修改带来的效率提升
- Red team kill-free development practice of simulated confrontation
- 七夕特制:《牛郎会织女》
- 【分布式】分布式ID生成策略
- deepstream多相机显示布局
- 搬走地下空间开发利用“绊脚石” 中地数码取得地下空间透明化技术突破
- LeetCode: 406. 根据身高重建队列
猜你喜欢
模拟对抗之红队免杀开发实践
[2022 Nioke Duo School 5 A Question Don't Starve] DP
MySQL查询为啥慢了?
EasyGBS接入最新版海康摄像头后无法传递告警信息该如何解决?
1319_STM32F103串口BootLoader移植
立即升级!WPS Office 出现 0day 高危安全漏洞:可完全接管系统,官方推出紧急更新
Red team kill-free development practice of simulated confrontation
[QT] Implementation of callback function
In which industries is the PMP certificate useful?
buu web
随机推荐
SPSS-System Clustering Hand Calculation Practice
【CC3200AI 实验教程 1】疯壳·AI语音人脸识别(会议记录仪/人脸打卡机)-开发环境搭建
Altium Designer 19.1.18 - 保护锁定的对象
DSPE-PEG-Aldehyde,DSPE-PEG-CHO,磷脂-聚乙二醇-醛基一种疏水18碳磷脂
用Tesseract开发一个你自己的文字识别应用
数电快速入门(二)(复合逻辑运算和逻辑代数的基本定律的介绍)
UnicodeDecodeError: ‘utf-8‘ codec can‘t decode byte 0xd6 in position 120: invalid continuation byte
LocalDate时间日期包的用法
Milvus configuration related
js data type, throttling/anti-shake, click event delegation optimization, transition animation
【PCBA program design】Grip dynamometer program
Cocoa Application-test
docker 搭建mysql 主从复制
国际项目管理师PMP证书,值得考嘛?
热力学相关的两个定律
七夕特制:《牛郎会织女》
中大型商业银行堡垒机升级改造方案!必看!
看看XDOC如何做Word文档预览
强网杯2022——WEB
JdbcTemplate概述和测试