当前位置:网站首页>并发编程之并发工具集
并发编程之并发工具集
2022-07-25 13:10:00 【紫乾2014】
一、CountDownLatch
计数器工具.
await
countDown
1.1 CountDownLatch简单应用
CountDownExample.java
import java.util.concurrent.CountDownLatch;
public class CountDownExample {
static CountDownLatch countDownLatch=new CountDownLatch(1);
static class Thread1 extends Thread{
@Override
public void run(){
//TODO
try {
Thread.sleep(500);
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行结束");
//表示我已经干完了
}
}
static class Thread2 extends Thread{
@Override
public void run(){
try {
Thread.sleep(500);
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行结束");
}
}
static class Thread3 extends Thread{
@Override
public void run(){
try {
Thread.sleep(500);
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行结束");
}
}
public static void main(String[] args) throws InterruptedException {
Thread1 t1=new Thread1();
t1.start();
Thread2 t2=new Thread2();
t2.start();
Thread3 t3=new Thread3();
t3.start();
System.out.println("主线程执行结束");
countDownLatch.countDown();
}
}
1.2 CountDownLatch的业务应用
在启动应用的时候,去对第三方的应用做健康检测
BaseHealthChecker.java
public abstract class BaseHealthChecker implements Runnable{
private String serviceName; //服务名称
private boolean serviceUp;
public BaseHealthChecker(String serviceName) {
this.serviceName = serviceName;
}
@Override
public void run() {
try {
verifyService();
serviceUp=true;
}catch (Exception e){
serviceUp=false;
}finally {
}
}
/**
* 检查服务的健康情况
*/
public abstract void verifyService() throws Exception;
public String getServiceName() {
return serviceName;
}
public boolean isServiceUp() {
return serviceUp;
}
}
CacheHealthChecker.java
import java.util.concurrent.CountDownLatch;
public class CacheHealthChecker extends BaseHealthChecker{
private CountDownLatch countDownLatch;
public CacheHealthChecker(CountDownLatch countDownLatch) {
super("CacheHealthChecker");
this.countDownLatch=countDownLatch;
}
@Override
public void verifyService() throws Exception {
System.out.println("Checking:"+this.getServiceName());
try {
Thread.sleep(1000);
// 如果检查失败,throw RuntimeException()
} catch (Exception e) {
throw e;
}
countDownLatch.countDown();
System.out.println(this.getServiceName()+" 健康状态正常");
}
}
DatabaseHealthChecker.java
import java.util.concurrent.CountDownLatch;
public class DatabaseHealthChecker extends BaseHealthChecker{
private CountDownLatch countDownLatch;
public DatabaseHealthChecker(CountDownLatch countDownLatch) {
super("DatabaseHealthChecker");
this.countDownLatch=countDownLatch;
}
@Override
public void verifyService() throws Exception {
System.out.println("Checking:"+this.getServiceName());
try {
Thread.sleep(1000);
} catch (Exception e) {
throw e;
}
countDownLatch.countDown();
System.out.println(this.getServiceName()+" 健康状态正常");
}
}
ApplicationStartup.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ApplicationStartup {
private static List<BaseHealthChecker> services;
private static CountDownLatch countDownLatch=new CountDownLatch(2);
static{
services=new ArrayList<>();
services.add(new CacheHealthChecker(countDownLatch));
services.add(new DatabaseHealthChecker(countDownLatch));
}
private final static ApplicationStartup INSTANCE=new ApplicationStartup();
private ApplicationStartup(){
}
public static ApplicationStartup getInstance(){
return INSTANCE;
}
public static boolean checkExternalServices() throws InterruptedException {
for(BaseHealthChecker bh:services){
new Thread(bh).start(); //针对每个服务采用线程来执行
}
countDownLatch.await();
return true;
}
}
StartupMain.java
public class StartupMain {
public static void main(String[] args) {
try {
ApplicationStartup.checkExternalServices();
} catch (InterruptedException e) {
//有问题了.
}
System.out.println("服务启动成功");
}
}
1.3 CountDownLatch的实现原理
它可以让一个线程阻塞
也可以让多个线程阻塞
共享锁的实现。可以允许多个线程同时抢占到锁,然后等到计数器归零的时候,同时唤醒.
state记录计数器.
countDown的时候,实际上就是 state–
二、Semaphore
2.1 semaphore介绍
信号灯.
限流器,限制资源的访问.
本质上:抢占一个令牌,如果抢占到令牌,就通行, 否则,就阻塞!
acquire() 抢占一个令牌
release() 释放一个令牌.
Semaphore semaphore=new Semaphore(10);
acquire = 10 - 1
为0的时候,阻塞,有可能同时阻塞N个线程
release = 令牌+1
有令牌了,唤醒。从阻塞的线程中去唤醒。
为什么要用共享锁?
因为同时可以释放多个令牌,那么意味着可以同时有多个线程抢占到锁。
2.2 semaphore应用
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreExample {
public static void main(String[] args) {
//限制资源的并发数量.
Semaphore semaphore=new Semaphore(10);
for (int i = 0; i < 20; i++) {
new Car(i,semaphore).start();
}
}
static class Car extends Thread{
private int num;
private Semaphore semaphore;
public Car(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run(){
try {
semaphore.acquire(); //获得一个令牌
System.out.println("第 "+num+"俩车抢到一个车位");
TimeUnit.SECONDS.sleep(2);
System.out.println("第 "+num+"走喽~");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release(); //释放一个令牌
}
}
}
}
三、CyclicBarrier
可重复的栅栏。实现相当于多个线程通过CountDownLatch的await,然后另外一个线程使用countDown方法来唤醒。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int n = 4;
CyclicBarrier barrier = new CyclicBarrier(4, () -> {
System.out.println("所有线程都写入完成,继续处理其他任务");
}); // 4
for (int i = 0; i < n; i++) {
new Writer(barrier).start();
}
}
static class Writer extends Thread {
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier barrier) {
this.cyclicBarrier = barrier;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "写入数据完毕,等待其他线程");
cyclicBarrier.await(); //-1的动作
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
边栏推荐
- JS sorts according to the attributes of the elements in the array
- 牛客论坛项目部署总结
- OAuth, JWT, oidc, you mess me up
- Excel import and export source code analysis
- 录制和剪辑视频,如何解决占用空间过大的问题?
- Brpc source code analysis (III) -- the mechanism of requesting other servers and writing data to sockets
- Use vsftpd service to transfer files (anonymous user authentication, local user authentication, virtual user authentication)
- 【CSDN 年终总结】结束与开始,一直在路上—— “1+1=王”的2021总结
- Emqx cloud update: more parameters are added to log analysis, which makes monitoring, operation and maintenance easier
- 0713RHCSA
猜你喜欢

Django 2 ----- database and admin

G027-OP-INS-RHEL-04 RedHat OpenStack 创建自定义的QCOW2格式镜像

网络空间安全 渗透攻防9(PKI)

卷积神经网络模型之——GoogLeNet网络结构与代码实现

Numpy简介和特点(一)

OAuth, JWT, oidc, you mess me up

【OpenCV 例程 300篇】239. Harris 角点检测之精确定位(cornerSubPix)

好友让我看这段代码

基于百问网IMX6ULL_PRO开发板驱动AP3216实验

Shell common script: get the IP address of the network card
随机推荐
Detailed explanation of the training and prediction process of deep learning [taking lenet model and cifar10 data set as examples]
Django 2 ----- database and admin
Zero basic learning canoe panel (13) -- trackbar
Shell common script: get the IP address of the network card
面试官问我:Mysql的存储引擎你了解多少?
MLX90640 红外热成像仪测温传感器模块开发笔记(五)
如何理解Keras中的指标Metrics
Docekr学习 - MySQL8主从复制搭建部署
Convolutional neural network model -- alexnet network structure and code implementation
R language GLM generalized linear model: logistic regression, Poisson regression fitting mouse clinical trial data (dose and response) examples and self-test questions
ThreadLocal&Fork/Join
Atcoder beginer contest 261 f / / tree array
The programmer's father made his own AI breast feeding detector to predict that the baby is hungry and not let the crying affect his wife's sleep
基于百问网IMX6ULL_PRO开发板驱动AP3216实验
MySQL remote connection permission error 1045 problem
0716RHCSA
备战2022 CSP-J1 2022 CSP-S1 初赛 视频集
【重温SSM框架系列】15 - SSM系列博文总结【SSM杀青篇】
Eccv2022 | transclassp class level grab posture migration
程序员成长第二十七篇:如何评估需求优先级?