当前位置:网站首页>瞅一瞅JUC提供的限流工具Semaphore
瞅一瞅JUC提供的限流工具Semaphore
2022-07-05 18:16:00 【笔下天地宽】
微服务架构发展到今天,各个服务之前的调用、接口请求越来越频繁,服务器承受的压力自然也越来越大。如果放任所有请求请求到服务器,不管是服务器也好还是数据库也好,都可能被因为无法承受大批量的请求而阻塞、宕机甚至是GG,尤其是一些查询接口请求特别频繁的情况下,是需要对请求进行一定限制的。
譬如:钉钉打卡数据,动辄几十上百万数据,这种数据一秒钟来个几百次请求,钉钉估计也够呛,调用频率肯定有限制。一些支付流水,对账单也是这个原理,在请求方面都有一些限制。
还有,服务接口限制,频繁点击提示亦或是Tomcat对超出限制的请求进行丢弃等,都是为了保护服务而实行的一些保护措施。
之前已经说了漏桶算法、令牌桶算法、滑动时间窗格,这些都是限流实现的一种方式,今天,我们来看一看JUC给我们提供的一个限流工具类——Semaphore!
先写一个简单的Semaphore使用,大家先对Semaphore有个印象,当然比较熟悉的直接略过哈~
public static void main(String[] args) {
//创建Semaphore,需要指定permits,也就是许可证,或者说信号量
Semaphore semaphore = new Semaphore(3);
// 线程一获取两个许可证(信号量)
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"开始执行了");
boolean flag = semaphore.tryAcquire(2);
if (flag){
System.out.println(Thread.currentThread().getName() + "成功获取到两个信号量,休息10s");
try {
TimeUnit.SECONDS.sleep(20);
semaphore.release(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 线程二获取两个许可证(信号量)
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + "开始执行了");
int i = semaphore.availablePermits();
System.out.println("剩余信号量 = " + i);
// 许可证不够,会进行等待尝试,直到25S之后。刚开始有3个许可证,上面线程获取到了2个,除非
// sleep之后释放,否则这里无法获取到足够的许可证。有些类似令牌桶
boolean flag = semaphore.tryAcquire(2,25, TimeUnit.SECONDS);
if (flag) {
System.out.println(Thread.currentThread().getName() + "成功获取到两个信号量,休息10s");
TimeUnit.SECONDS.sleep(10);
}
} catch (Exception e){
e.printStackTrace();
}
}).start();
}
Semaphore的核心就是permits!也就是许可证,或者说信号量!创建Semaphore的时候,必须指定许可证的数量,有点类似令牌桶,就是你先放好令牌,任务来了,先去取令牌,拿到令牌才能执行任务,没令牌就等着或者直接尥蹶子,返回失败~
下面就来看一看Semaphore的源码,看一看大佬Doug Lea是怎么实现限流滴。
//构造方法必须传许可证数量,默认非公平
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
// 默认非公平实现
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
Sync(int permits) {
setState(permits);
}
protected final void setState(int newState) {
state = newState;
}
我们可以看到,新建Semaphore中传递的permits最终也就是设置state的数值!也就是说这个permits就是AbstractQueuedSynchronizer(AQS)中共享锁的个数!
接着就是尝试获取信号量了。
// 获取信号量
public boolean tryAcquire(int permits) {
// 获取小于0的信号量,完全没意义,异常
if (permits < 0) throw new IllegalArgumentException();
// 是否获取成功
return sync.nonfairTryAcquireShared(permits) >= 0;
}
// 非公平尝试获取共享锁
final int nonfairTryAcquireShared(int acquires) {
// 无限循环
for (;;) {
// 获取当前剩余的信号量
int available = getState();
int remaining = available - acquires;
// 如果剩余信号量小于零,说明不够,直接返回,||后面部分不执行,上文会
// 根据>=0为成功,小于零自然是失败
if (remaining < 0 ||
// CAS 设置剩余信号量,设置成功返回否则重新循环
compareAndSetState(available, remaining))
return remaining;
}
}
信号量其实也就是共享锁个数或者说state的数值,尝试去获取就先看下state是否大于需要的permits,小于的话自然是失败。如果大于或者等于,需要使用CAS进行替换,因为这里的remaining缓存到本线程了,会有并发问题,接着,CAS成功就返回,失败就重新执行这个过程。
当然,这种只是最简单的获取方式,一般是会在某个时间段内获取,超时之后才算失败。也就是semaphore.tryAcquire(2,25, TimeUnit.SECONDS),一起来瞅瞅~
// 尝试获取信号量,过期时间以及单位
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
//信号量小于0,无意义,异常
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
// 固定时间内尝试获取信号量
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
//上文已讲过,不多说
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// 固定时间内获取arg个信号量
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
// 时间过期,结束返回
if (nanosTimeout <= 0L)
return false;
//当前系统时间加过期时间,获取结束时间点
final long deadline = System.nanoTime() + nanosTimeout;
//新建共享节点,关联当前线程,并添加到同步队列尾部,
//如果队列为空,执行(enq),初始化同步队列,并将节点添加到尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 循环
for (;;) {
// 当前节点的前一个节点
final Node p = node.predecessor();
//如果节点是头结点,获取信号量并返回
if (p == head) {
int r = tryAcquireShared(arg);
// 获取成功
if (r >= 0) {
// 设置头结点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
// 剩余时间
nanosTimeout = deadline - System.nanoTime();
// 过期,直接返回失败
if (nanosTimeout <= 0L)
return false;
// 尝试获取信号量失败,清理同步队列中无用或者已经线程结束的节点
// 并且剩余时间要大于1000ns,否则不用中断,继续循环即可,1000ns太短
// 程序执行需要时间,可以直接进行循环处理
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//中断异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个获取只要分两步,如果前置节点就是头节点,也就是说就只有当前线程获取信号量,直接去获取即可。第二步,如果不是头结点,自然要排队等待,节点状态设置为Node.SIGNAL,同时清理同步队列中已经状态waitStatus > 0的节点,然后线程中断,等待唤醒。
其中还有一个setHeadAndPropagate方法,主要的意思就是,上一个线程释放的信号量,可能当前线程获取之后还有盈余,于是,考虑唤醒下一个等待的线程继续获取信号量。这个方法个人感觉理解也是有点绕,推荐一个博客,想深究的可以去瞅下
//设置队列头,并检查后续队列是否正在等待
private void setHeadAndPropagate(Node node, int propagate) {
// 当前节点获取到共享锁,成为头节点
Node h = head;
setHead(node);
//propagate > 0 说明有剩余的信号量,后续节点可以尝试获取信号量,故要doReleaseShared
//propagate == 0 且h.waitStatus < 0当时tryAcquireShared后没有共享锁剩余,但之后的时刻很可能又有共享锁释放出来了。
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
接着就是信号量释放了,也就是相当于令牌桶,走完通道,令牌肯定要放回桶里呗
//信号量释放
public void release(int permits) {
// 释放小于0,完全没意义,异常
if (permits < 0) throw new IllegalArgumentException();
// 释放信号量
sync.releaseShared(permits);
}
// 释放信号量(共享锁)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Semaphore实现
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
// 要么,release是负数,要么就是超出位数限制,导致比当前还小,
// 异常:超过最大允许计数
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS替换
if (compareAndSetState(current, next))
return true;
}
}
释放信号量的过程也比较简单,就是state重新加上要释放的信号量,然后进行CAS替换即可,当然,还要走一个必须的过程!doReleaseShared!
//释放共享锁
private void doReleaseShared() {
//循环
for (;;) {
Node h = head;
//至少有两个node,就一个节点,也不用释放了
if (h != null && h != tail) {
//获取头结点状态
int ws = h.waitStatus;
// 头结点是SIGNAL,CAS唤醒
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果状态为0,说明h的后继所代表的线程已经被唤醒或即将被唤醒,并且这个中间状态即将消失,要么由于acquire thread获取锁失败再次设置head为SIGNAL并再次阻塞,要么由于acquire thread获取锁成功而将自己(head后继)设置为新head并且只要head后继不是队尾,那么新head肯定为SIGNAL。所以设置这种中间状态的head的status为PROPAGATE,让其status又变成负数,这样可能被被唤醒线程检测到
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// head不变退出循环,不然会执行多次循环。
if (h == head) // loop if head changed
break;
}
}
Semaphore的主要逻辑就是这样,如果只是简单的尝试获取信号量的话,就是直接修改state!不过一般这种肯定是会有一个尝试的过期时间(timeout)的,也就是semaphore.tryAcquire(2,25, TimeUnit.SECONDS);
这个时候,就需要用到CLH队列了,也就是阻塞等待,由于每次尝试获取的信号量不一致,可能一个线程释放6个信号量,然后后面三个线程,每个都获取两个,线程就会在获取的时候一次被唤醒,这里的逻辑看起来就比较绕,最好是本地造一些数据,debug看下,这样应该能更好的 理解Semaphore中信号量的获取与释放。
好了,到这里就game over了,感觉还行的话,点个赞呗~
no sacrifice,no victory~
边栏推荐
- 【pm2详解】
- LeetCode 6109. 知道秘密的人数
- [use electron to develop desktop on youqilin]
- 如何获取飞机穿过雷达两端的坐标
- 【在優麒麟上使用Electron開發桌面應】
- Is it safe to open an account and register stocks for stock speculation? Is there any risk? Is it reliable?
- Sophon autocv: help AI industrial production and realize visual intelligent perception
- 从XML架构生成类
- 个人对卷积神经网络的理解
- Electron installation problems
猜你喜欢
图片数据不够?我做了一个免费的图像增强软件
Fix vulnerability - mysql, ES
LeetCode 6111. 螺旋矩阵 IV
Vulnhub's darkhole_ two
Record eval() and no in pytoch_ grad()
第十一届中国云计算标准和应用大会 | 华云数据成为全国信标委云计算标准工作组云迁移专题组副组长单位副组长单位
瀚升优品app翰林优商系统开发功能介绍
Copy the linked list with random pointer in the "Li Kou brush question plan"
星环科技重磅推出数据要素流通平台Transwarp Navier,助力企业实现隐私保护下的数据安全流通与协作
吳恩達團隊2022機器學習課程,來啦
随机推荐
热通孔的有效放置如何改善PCB设计中的热管理?
小白入门NAS—快速搭建私有云教程系列(一)[通俗易懂]
node_ Exporter memory usage is not displayed
MATLAB中print函数使用
华夏基金:基金行业数字化转型实践成果分享
Introduction to Resampling
The 10th global Cloud Computing Conference | Huayun data won the "special contribution award for the 10th anniversary of 2013-2022"
Clickhouse (03) how to install and deploy Clickhouse
About Estimation with Cross-Validation
Nacos distributed transactions Seata * * install JDK on Linux, mysql5.7 start Nacos configure ideal call interface coordination (nanny level detail tutorial)
Electron安装问题
What is the reason why the video cannot be played normally after the easycvr access device turns on the audio?
buuctf-pwn write-ups (9)
Thoroughly understand why network i/o is blocked?
Isprs2022 / Cloud Detection: Cloud Detection with Boundary nets Boundary Networks Based Cloud Detection
从XML架构生成类
ConvMAE(2022-05)
兄弟组件进行传值(显示有先后顺序)
matlab内建函数怎么不同颜色,matlab分段函数不同颜色绘图
jdbc读大量数据导致内存溢出