A look at Semaphore, the rate-limiting tool provided by JUC
2022-07-05 18:28:00 【The world is wide under the pen】
With microservice architectures as widespread as they are today, calls between services and interface requests have become more and more frequent, and the pressure on servers grows accordingly. If every request is allowed through to the server, both the server and the database can block, go down, or simply die because they cannot handle the volume. When certain query interfaces are requested especially frequently, limiting those requests becomes necessary.
For example, take DingTalk clock-in data, which easily runs to tens of millions of records. If that data were requested hundreds of times a second, even DingTalk would probably struggle, so the call frequency has to be limited. The same applies to payment transaction records and statements: requests for them are restricted in some way.
Likewise, interface-level limits on a service, "you are clicking too frequently" prompts, and Tomcat discarding requests beyond its limit are all protective measures put in place to protect the service.
I have previously covered the leaky bucket algorithm, the token bucket algorithm, and the sliding time window, all of which are ways to implement rate limiting. Today, let's take a look at a rate-limiting utility class that JUC provides: Semaphore!
Let's write a simple Semaphore example first to get a feel for it. If you already know it well, feel free to skip ahead~
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {
        // Create a Semaphore; the number of permits must be specified
        Semaphore semaphore = new Semaphore(3);
        // Thread 1 takes two permits
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " is about to start work");
            boolean acquired = semaphore.tryAcquire(2);
            if (acquired) {
                System.out.println(Thread.currentThread().getName() + " acquired two permits, resting for 20s");
                try {
                    TimeUnit.SECONDS.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // Release in finally so the permits come back even if the sleep is interrupted
                    semaphore.release(2);
                }
            }
        }).start();
        // Thread 2 also tries to take two permits
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " is about to start work");
                // Usually 1; it is 3 if this line runs before thread 1 has acquired
                int available = semaphore.availablePermits();
                System.out.println("Remaining permits = " + available);
                // Not enough permits, so wait for up to 25s. There were 3 permits at first and
                // thread 1 took 2, so this can only succeed once thread 1 releases after its sleep.
                // Somewhat similar to a token bucket
                boolean acquired = semaphore.tryAcquire(2, 25, TimeUnit.SECONDS);
                if (acquired) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " acquired two permits, resting for 10s");
                        TimeUnit.SECONDS.sleep(10);
                    } finally {
                        semaphore.release(2);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}
The core of Semaphore is permits! When creating a Semaphore, the number of permits must be specified. It is a bit like a token bucket: tokens are put in first, and when a task arrives it has to get a token before it can run. If no token is available, the task either waits or is rejected straight away with a failure~
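The "get a token, run the task, otherwise reject" idea can be sketched as a small wrapper. This is only an illustration: the class PermitLimiter and its method callOrReject are made-up names, not part of JUC, and the sketch covers only the reject-immediately case.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper (not a JDK class): caps concurrent calls with a Semaphore.
class PermitLimiter {
    private final Semaphore permits;

    PermitLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Runs the task if a permit is free right now; otherwise returns the fallback value.
    <T> T callOrReject(Supplier<T> task, T rejected) {
        if (!permits.tryAcquire()) {
            return rejected;          // no permit available: reject immediately
        }
        try {
            return task.get();
        } finally {
            permits.release();        // always hand the permit back
        }
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        PermitLimiter limiter = new PermitLimiter(1);
        // The outer call holds the only permit, so the nested call is rejected.
        Supplier<String> inner = () -> limiter.callOrReject(() -> "inner ran", "inner rejected");
        System.out.println(limiter.callOrReject(inner, "outer rejected")); // inner rejected
        System.out.println(limiter.available());                            // 1
    }
}
```

Releasing in finally matters here: a task that throws would otherwise leak its permit and slowly shrink the limit.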
Let's look at the Semaphore source code and see how Doug Lea implements rate limiting.
// The constructor requires the number of permits; the default is the nonfair mode
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
    // Nonfair implementation, used by default
    NonfairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
// Sync constructor: stores permits as the AQS state
Sync(int permits) {
    setState(permits);
}
// AbstractQueuedSynchronizer
protected final void setState(int newState) {
    state = newState;
}
As we can see, the permits passed in when creating a Semaphore ends up as the value of state! In other words, permits is the shared-lock count held in AbstractQueuedSynchronizer (AQS)!
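Since permits is just the AQS state, the count can be watched from outside through availablePermits(). Here is a minimal sketch; the class name PermitStateDemo is made up for illustration, and acquireUninterruptibly is used only to keep the example free of checked exceptions.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class: observes the permit count (the AQS state) after each step.
class PermitStateDemo {
    static int[] trace() {
        Semaphore s = new Semaphore(3);
        int initial = s.availablePermits();      // state set by the constructor
        s.acquireUninterruptibly(2);             // state goes down by 2
        int afterAcquire = s.availablePermits();
        s.release(2);                            // state goes back up by 2
        int afterRelease = s.availablePermits();
        return new int[] {initial, afterAcquire, afterRelease};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(trace()));        // [3, 1, 3]
        System.out.println(new Semaphore(3).isFair());       // false: nonfair by default
        System.out.println(new Semaphore(3, true).isFair()); // true: fair mode requested explicitly
    }
}
```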
Next, try to acquire permits.
// Try to acquire permits without waiting
public boolean tryAcquire(int permits) {
    // Asking for fewer than 0 permits makes no sense: throw
    if (permits < 0) throw new IllegalArgumentException();
    // A non-negative result means success
    return sync.nonfairTryAcquireShared(permits) >= 0;
}
// Nonfair attempt to acquire the shared lock
final int nonfairTryAcquireShared(int acquires) {
    // Retry loop
    for (;;) {
        // Read the permits currently available
        int available = getState();
        int remaining = available - acquires;
        // If remaining is negative there are not enough permits: return at once
        // (the right-hand side of || is not evaluated) and the caller's >= 0 check reports failure.
        // Otherwise CAS state to the new value; return on success, loop again on failure
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
Permits are really just the shared-lock count, i.e. the value of state. An acquire first checks whether state is at least the requested permits; if it is smaller, the attempt fails. If it is greater than or equal, the new value has to be written with CAS, because remaining was computed from a value read into this thread and another thread may have changed state in the meantime. If the CAS succeeds the method returns; if it fails the loop runs again.
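The same read, compute, compare-and-set loop can be reproduced outside AQS with an AtomicInteger. This is a stripped-down sketch of the idea, not the JDK code: CasPermitCounter is a made-up class and it has no wait queue at all.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the AQS state: a permit counter updated with a CAS loop.
class CasPermitCounter {
    private final AtomicInteger state;

    CasPermitCounter(int permits) {
        this.state = new AtomicInteger(permits);
    }

    // Returns the remaining count on success, or a negative number if permits were insufficient.
    int tryAcquire(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // Not enough permits: report failure without touching the counter.
            // Otherwise publish the new value only if nobody changed it since we read it.
            if (remaining < 0 || state.compareAndSet(available, remaining)) {
                return remaining;
            }
            // CAS lost a race with another thread: re-read and try again.
        }
    }

    int available() {
        return state.get();
    }

    public static void main(String[] args) {
        CasPermitCounter counter = new CasPermitCounter(3);
        System.out.println(counter.tryAcquire(2)); // 1  (success, one permit left)
        System.out.println(counter.tryAcquire(2)); // -1 (failure, counter untouched)
        System.out.println(counter.available());   // 1
    }
}
```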
Of course, this is only the simplest form of acquisition. More commonly the acquire waits for a bounded time and only fails once the timeout expires, as in semaphore.tryAcquire(2, 25, TimeUnit.SECONDS). Let's have a look~
// Try to acquire permits, with a timeout and its unit
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
    // Fewer than 0 permits is meaningless: throw
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
// Try to acquire within a bounded time
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}
// Same as described above
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
// Acquire arg permits within a bounded time
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    // Timeout already used up: return failure
    if (nanosTimeout <= 0L)
        return false;
    // Current time plus the timeout gives the deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    // Create a shared node for the current thread and append it to the tail of the sync queue;
    // if the queue is empty, enq initializes it first and then appends the node
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // Retry loop
        for (;;) {
            // Predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is the head, try to acquire
            if (p == head) {
                int r = tryAcquireShared(arg);
                // Acquired
                if (r >= 0) {
                    // Make this node the new head and propagate if needed
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            // Time remaining
            nanosTimeout = deadline - System.nanoTime();
            // Timed out: return failure
            if (nanosTimeout <= 0L)
                return false;
            // The attempt failed: shouldParkAfterFailedAcquire marks the predecessor SIGNAL
            // and unlinks cancelled nodes from the sync queue.
            // Park only if more than spinForTimeoutThreshold (1000ns) remains; anything shorter
            // is not worth parking for, so the loop simply spins
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // Interrupted while waiting: throw
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
This acquisition comes down to two steps. First, if the predecessor is the head node, the current thread is at the front of the queue, so it tries to acquire directly. Second, if the predecessor is not the head (or the attempt fails), the thread has to wait in line: the predecessor's state is set to Node.SIGNAL, cancelled nodes (waitStatus > 0) are removed from the synchronization queue along the way, and then the thread is parked until it is woken up or the timeout expires.
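Both outcomes of the timed acquire can be seen with a small experiment. The sketch below assumes nothing beyond the public Semaphore API; the class TimedAcquireDemo and the specific delays (100ms, 5s) are arbitrary choices for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows a timed tryAcquire failing on timeout and succeeding after a release.
class TimedAcquireDemo {
    // No permit is ever released, so the wait runs out and false is returned.
    static boolean timesOut() {
        Semaphore s = new Semaphore(0);
        try {
            return s.tryAcquire(1, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // A helper thread releases one permit after a short delay, well inside the timeout,
    // so the parked caller is woken up and the acquire succeeds.
    static boolean succeedsAfterRelease() {
        Semaphore s = new Semaphore(0);
        Thread releaser = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            s.release();
        });
        releaser.start();
        try {
            return s.tryAcquire(1, 5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut());             // expected: false
        System.out.println(succeedsAfterRelease()); // expected: true
    }
}
```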
Then there is the setHeadAndPropagate method. Its main point is that the permits released by a previous thread may not be used up after the current thread takes its share, so the next waiting thread should be woken to try acquiring as well. Personally I find this method a little convoluted; for those who want to dig deeper, a dedicated blog post on it is worth reading.
// Set the queue head and check whether successors are waiting
private void setHeadAndPropagate(Node node, int propagate) {
    // The current node has acquired the shared lock and becomes the head; h keeps the old head
    Node h = head;
    setHead(node);
    // propagate > 0 means permits are left over, so successors can try to acquire: call doReleaseShared.
    // propagate == 0 with h.waitStatus < 0: no permits were left right after tryAcquireShared,
    // but another release may well have happened since then
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
Next comes releasing permits. In the token-bucket picture: once the task is done, the token must be put back in the bucket.
// Release permits
public void release(int permits) {
    // Releasing fewer than 0 makes no sense: throw
    if (permits < 0) throw new IllegalArgumentException();
    // Release the permits
    sync.releaseShared(permits);
}
// Release in shared mode (AbstractQueuedSynchronizer)
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
// Semaphore's implementation
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        // next < current means the addition overflowed int (or releases was negative):
        // the maximum permit count has been exceeded
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // CAS in the new value
        if (compareAndSetState(current, next))
            return true;
    }
}
Releasing permits is also fairly simple: add the number being released to state and write it back with CAS. There is still one more step that must run, though: doReleaseShared!
// Wake waiting successors in shared mode
private void doReleaseShared() {
    // Retry loop
    for (;;) {
        Node h = head;
        // The queue needs at least two nodes; with only the head there is nobody to wake
        if (h != null && h != tail) {
            // Read the head's status
            int ws = h.waitStatus;
            // Head is SIGNAL: CAS it to 0, then wake the successor
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            // A status of 0 means the head's successor has been woken or is about to be, and this
            // intermediate state is short-lived: either the acquiring thread fails, sets head back
            // to SIGNAL and blocks again, or it succeeds and becomes the new head (which will be
            // SIGNAL whenever its successor is not the tail). So mark this intermediate head
            // PROPAGATE, making its status negative again so the woken thread can detect it
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        // Exit if head did not change; otherwise go around again
        if (h == head) // loop if head changed
            break;
    }
}
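Two consequences of the release path above are easy to check from outside: release simply adds to state, so it can push the count past the initial value, and the overflow branch throws an Error. A small sketch, with ReleaseDemo as a made-up class name:

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class: pokes at the two branches of tryReleaseShared.
class ReleaseDemo {
    // release() only adds to state; it does not check that the caller acquired anything first.
    static int releaseWithoutAcquire() {
        Semaphore s = new Semaphore(1);
        s.release(2);
        return s.availablePermits();
    }

    // Releasing on top of Integer.MAX_VALUE overflows int and hits the "next < current" branch.
    static String overflowMessage() {
        Semaphore s = new Semaphore(Integer.MAX_VALUE);
        try {
            s.release(1);
            return "no error";
        } catch (Error e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseWithoutAcquire()); // 3
        System.out.println(overflowMessage());       // Maximum permit count exceeded
    }
}
```

The first result is why a limiter should pair every release with a successful acquire: an extra release quietly raises the limit.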
That is the main logic of Semaphore. A plain, non-waiting attempt to acquire permits just modifies state directly! In practice, though, an attempt usually carries a timeout, as in semaphore.tryAcquire(2, 25, TimeUnit.SECONDS);
That is when the CLH queue comes in, i.e. blocking and waiting. Because each acquire may ask for a different number of permits, one thread might release 6 permits and the next three threads might each take 2, with each thread being woken in turn as it acquires. The logic here is fairly convoluted, so it is best to set up some data locally and step through it in a debugger; that should give a better understanding of how Semaphore acquires and releases permits.
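The "one thread releases 6, three waiters take 2 each" scenario can be set up locally as a starting point for debugging. This is a rough sketch: PropagationDemo is a made-up class, and the 200ms pause is only a best-effort way of letting the waiters queue up before the release.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: three waiters each want 2 permits, one release hands out 6.
class PropagationDemo {
    // Returns how many of the three waiters obtained their 2 permits.
    static int waitersServed() {
        Semaphore s = new Semaphore(0);
        AtomicInteger served = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    if (s.tryAcquire(2, 5, TimeUnit.SECONDS)) {
                        served.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        try {
            TimeUnit.MILLISECONDS.sleep(200); // best effort: let the waiters park in the queue
            s.release(6);                     // a single release covers all three waiters
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return served.get();
    }

    public static void main(String[] args) {
        System.out.println(waitersServed()); // expected: 3
    }
}
```

Setting breakpoints in setHeadAndPropagate and doReleaseShared while this runs shows each woken waiter passing the wake-up on to the next one.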
Okay, that's game over for this one. If you found it useful, give it a like~
no sacrifice, no victory~