A look at Semaphore, the rate-limiting tool provided by JUC
2022-07-05 18:28:00 【The world is wide under the pen】
Microservice architecture has matured to the point where calls between services and interface requests are more and more frequent, and the pressure on servers keeps growing. If every request is allowed through, the server or the database can block, go down, or simply die (GG) under a load it cannot handle. This is especially true when certain query interfaces are hit very frequently, so limiting requests becomes necessary.
For example, take DingTalk attendance (clock-in) data: tens of millions of records at a time. If that data were requested hundreds of times a second, even DingTalk would probably struggle, so the call frequency has to be capped. The same goes for payment flows and statements: requests to them are restricted as well.
Likewise, limits on service interfaces, "you are clicking too frequently" prompts, and Tomcat discarding requests beyond its limit are all protective measures for the service.
I have previously covered the leaky bucket algorithm, the token bucket algorithm, and the sliding time window, which are all ways to implement rate limiting. Today, let's look at a tool class that JUC gives us for the job: Semaphore!
Let's start with a simple Semaphore example to get a first impression. If you already know it, feel free to skip ahead~
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {
        // Create a Semaphore; the number of permits must be specified
        Semaphore semaphore = new Semaphore(3);
        // Thread one takes two permits
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " is about to start working");
            boolean flag = semaphore.tryAcquire(2);
            if (flag) {
                System.out.println(Thread.currentThread().getName() + " acquired two permits, resting 20s");
                try {
                    TimeUnit.SECONDS.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // Always return the permits, even if the sleep is interrupted
                    semaphore.release(2);
                }
            }
        }).start();
        // Thread two also wants two permits
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " is about to start working");
                int i = semaphore.availablePermits();
                System.out.println("Remaining permits = " + i);
                // If there are not enough permits, wait for up to 25s. There were 3 permits at first;
                // if the thread above already took 2, this call cannot succeed until that thread
                // releases them after its sleep. This is somewhat similar to a token bucket.
                boolean flag = semaphore.tryAcquire(2, 25, TimeUnit.SECONDS);
                if (flag) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " acquired two permits, resting 10s");
                        TimeUnit.SECONDS.sleep(10);
                    } finally {
                        semaphore.release(2);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}
The core of Semaphore is permits! When you create a Semaphore you must specify the number of permits. It is a bit like a token bucket: the tokens are put in up front, and when a task arrives it first has to take a token. Only with a token can the task run; without one it either waits or is rejected outright with a failure result~
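To make the "take a token or get rejected" idea concrete, here is a minimal sketch. PermitGuard and callOrReject are hypothetical names invented for this illustration, not part of JUC; the only real API used is Semaphore's tryAcquire(), release() and availablePermits().

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public final class PermitGuard {
    // Hypothetical helper (not a JUC class): at most maxConcurrent callers run at once.
    private final Semaphore permits;

    public PermitGuard(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Runs the task if a permit is free; otherwise returns the fallback immediately.
    public <T> T callOrReject(Supplier<T> task, T fallback) {
        if (!permits.tryAcquire()) {
            return fallback; // no permit available: reject instead of waiting
        }
        try {
            return task.get();
        } finally {
            permits.release(); // always hand the permit back
        }
    }

    public int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        PermitGuard guard = new PermitGuard(1);
        // The nested call runs while the only permit is held, so it is rejected.
        String result = guard.callOrReject(
                () -> guard.callOrReject(() -> "inner ran", "inner rejected"),
                "outer rejected");
        System.out.println(result);            // inner rejected
        System.out.println(guard.available()); // 1
    }
}
```

Note that this caps how many callers run at the same time; it does not cap requests per second the way a token bucket with a refill rate would.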
Now let's look at the Semaphore source code and see how the master, Doug Lea, implements this limiting.
// The constructor requires the number of permits; the default is the nonfair implementation
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
    // Nonfair implementation, used by default
    NonfairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
// Sync constructor: stores the permits in the AQS state
Sync(int permits) {
    setState(permits);
}
// In AbstractQueuedSynchronizer
protected final void setState(int newState) {
    state = newState;
}
As we can see, the permits passed in when creating a Semaphore ends up as the value of state! In other words, permits is the state of AbstractQueuedSynchronizer (AQS): the number of shared locks available.
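The effect of that constructor can be observed from outside through availablePermits(), which reads the same state. The small sketch below is only an illustration (PermitStateDemo and remainingAfter are made-up names); it also shows the two-argument constructor Semaphore(int, boolean), which selects the fair implementation instead of the nonfair default.

```java
import java.util.concurrent.Semaphore;

public final class PermitStateDemo {
    // Permits left after trying to take `taken` out of `initial`
    // (unchanged if the attempt fails because there are not enough).
    static int remainingAfter(int initial, int taken) {
        Semaphore semaphore = new Semaphore(initial);
        semaphore.tryAcquire(taken);
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(remainingAfter(3, 2));            // 1
        System.out.println(remainingAfter(3, 4));            // 3, the attempt failed
        System.out.println(new Semaphore(3).isFair());       // false, nonfair by default
        System.out.println(new Semaphore(3, true).isFair()); // true
    }
}
```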
Next comes the attempt to acquire permits.
// Acquire permits
public boolean tryAcquire(int permits) {
    // Asking for fewer than 0 permits makes no sense: throw
    if (permits < 0) throw new IllegalArgumentException();
    // >= 0 means success, < 0 means failure
    return sync.nonfairTryAcquireShared(permits) >= 0;
}
// Nonfair attempt to acquire the shared lock
final int nonfairTryAcquireShared(int acquires) {
    // Loop until we either fail outright or win the CAS
    for (;;) {
        // Permits currently available
        int available = getState();
        int remaining = available - acquires;
        // If remaining < 0 there are not enough permits: return at once (the part after ||
        // is not evaluated), and the caller's >= 0 check reports failure.
        // Otherwise CAS the new count in; return on success, loop again on failure.
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
A permit count is really the number of shared locks, i.e. the value of state. The code first checks whether state is at least the requested permits; if it is smaller, the attempt fails. If it is greater than or equal, the new value must be written with CAS, because available and remaining are local copies held by this thread and another thread may have changed state in the meantime. If the CAS succeeds the method returns; if it fails, the loop runs again.
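The same read, compute, CAS, retry pattern can be reproduced outside AQS. The sketch below is a simplified stand-in, not the JDK implementation: CasPermitCounter is a hypothetical class that uses an AtomicInteger in place of the AQS state, and it has no wait queue at all.

```java
import java.util.concurrent.atomic.AtomicInteger;

public final class CasPermitCounter {
    // Stand-in for the AQS state field (an assumption made for this sketch).
    private final AtomicInteger state;

    public CasPermitCounter(int permits) {
        this.state = new AtomicInteger(permits);
    }

    // Returns the remaining permits (>= 0) on success, a negative number on failure.
    public int tryAcquire(int acquires) {
        for (;;) {
            int available = state.get();          // local snapshot, may go stale
            int remaining = available - acquires;
            if (remaining < 0 || state.compareAndSet(available, remaining)) {
                return remaining;                 // failed outright, or CAS won
            }
            // CAS lost: another thread changed state, so retry with a fresh snapshot
        }
    }

    // Starts `threads` threads that each try to take one permit; returns how many succeeded.
    static int successesUnderContention(int permits, int threads) {
        CasPermitCounter counter = new CasPermitCounter(permits);
        AtomicInteger successes = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (counter.tryAcquire(1) >= 0) {
                    successes.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        // 8 threads compete for 3 permits: exactly 3 succeed
        System.out.println(successesUnderContention(3, 8)); // 3
    }
}
```

Because the count only changes through a successful CAS, no permit can be handed out twice, however the threads interleave.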
Of course, this is only the simplest form of acquisition. More commonly you keep trying for a bounded period and fail only after the timeout, i.e. semaphore.tryAcquire(2,25, TimeUnit.SECONDS). Let's have a look~
// Try to acquire permits, with a timeout and its unit
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
    // A negative permit count is meaningless: throw
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
// Try to acquire within a bounded time
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}
// Covered above
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
// Acquire arg permits within a bounded time
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    // Timeout already used up: fail
    if (nanosTimeout <= 0L)
        return false;
    // Current time plus the timeout gives the deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    // Create a shared-mode node for the current thread and append it to the tail of the sync queue;
    // if the queue is empty, enq initializes it first and then appends the node
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // Predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is the head, this node is first in line: try to acquire
            if (p == head) {
                int r = tryAcquireShared(arg);
                // Acquired
                if (r >= 0) {
                    // Make this node the head and propagate the wake-up if needed
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            // Time left
            nanosTimeout = deadline - System.nanoTime();
            // Timed out: fail
            if (nanosTimeout <= 0L)
                return false;
            // The acquire failed. shouldParkAfterFailedAcquire marks the predecessor as SIGNAL and
            // skips cancelled nodes in the sync queue. Park only if more than 1000ns
            // (spinForTimeoutThreshold) remains; for anything shorter, parking is not worth it
            // because it takes time itself, so just go around the loop again
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // Interrupted while waiting: throw
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
This acquisition takes just two steps. First, if the predecessor is the head node, meaning the current thread is first in line, it tries to acquire directly. Second, if the predecessor is not the head (or the attempt fails), the thread has to wait in line: its predecessor's state is set to Node.SIGNAL, cancelled nodes (waitStatus > 0) are removed from the sync queue along the way, and the thread is parked until it is woken up or the timeout expires.
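From the caller's point of view, the timed path boils down to two outcomes: false once the timeout expires, or true as soon as another thread releases enough permits. The sketch below exercises both. TimedAcquireDemo and acquireWithin are hypothetical names for this illustration, and the millisecond values are arbitrary.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public final class TimedAcquireDemo {
    // Waits up to timeoutMillis for one permit on an empty semaphore.
    // A negative releaseAfterMillis means that nobody ever releases.
    static boolean acquireWithin(long timeoutMillis, long releaseAfterMillis) {
        Semaphore semaphore = new Semaphore(0);
        if (releaseAfterMillis >= 0) {
            Thread releaser = new Thread(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(releaseAfterMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                semaphore.release(); // wakes the parked waiter
            });
            releaser.setDaemon(true);
            releaser.start();
        }
        try {
            return semaphore.tryAcquire(1, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(acquireWithin(200, -1));  // false: timed out
        System.out.println(acquireWithin(5000, 50)); // true: released in time
    }
}
```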
As for the setHeadAndPropagate method, the main idea is this: after the current thread takes what it needs from the permits released by an earlier thread, some may be left over, so it considers waking the next waiting thread to continue acquiring. Personally I find this method a little convoluted; I recommend reading a dedicated blog post on it if you want to dig deeper.
// Set the queue head and check whether successors are waiting
private void setHeadAndPropagate(Node node, int propagate) {
    // The current node acquired the shared lock and becomes the head
    Node h = head; // old head, kept for the checks below
    setHead(node);
    // propagate > 0 means permits are left over, so successors can try to acquire: call doReleaseShared.
    // propagate == 0 with h.waitStatus < 0 means nothing was left right after tryAcquireShared,
    // but a release may well have happened since then, so wake the successor anyway.
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
Then comes releasing the permits. It is just like the token bucket: once you have passed through, the token must be put back in the bucket.
// Release permits
public void release(int permits) {
    // Releasing fewer than 0 permits makes no sense: throw
    if (permits < 0) throw new IllegalArgumentException();
    // Release the permits
    sync.releaseShared(permits);
}
// Release the shared lock (in AQS)
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
// Semaphore's implementation (in Sync)
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        // If releases were negative, or the sum overflowed int and wrapped around,
        // the result is smaller than current: throw "Maximum permit count exceeded"
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // CAS the new value in; retry on failure
        if (compareAndSetState(current, next))
            return true;
    }
}
The release process is also fairly simple: add the released permits to state and write the result back with CAS. Of course, there is one more necessary step: doReleaseShared!
// Release the shared lock
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // There must be at least two nodes; with only one node there is nobody to wake
        if (h != null && h != tail) {
            // Status of the head node
            int ws = h.waitStatus;
            // The head is SIGNAL: CAS it to 0 and wake its successor
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            // A status of 0 means the thread of h's successor has been woken or is about to be.
            // This intermediate state will soon disappear: either the acquiring thread fails to
            // get the lock, sets head back to SIGNAL and blocks again, or it succeeds and makes
            // itself (head's successor) the new head, and as long as the new head's successor is
            // not the tail, the new head must be SIGNAL. So set this intermediate head's status
            // to PROPAGATE, making it negative again so that the woken thread can detect it.
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        // If head did not change, exit the loop; otherwise go around again
        if (h == head) // loop if head changed
            break;
    }
}
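Seen from the caller's side, the release path traced above has two consequences worth noting: release simply adds to state, so the count can rise above the number passed to the constructor, and the overflow check throws an Error rather than an exception. The sketch below shows both; ReleaseDemo and its method names are made up for this illustration.

```java
import java.util.concurrent.Semaphore;

public final class ReleaseDemo {
    // release() only adds to the state; it does not check who acquired what.
    static int permitsAfterRelease(int initial, int released) {
        Semaphore semaphore = new Semaphore(initial);
        semaphore.release(released);
        return semaphore.availablePermits();
    }

    // Releasing on top of Integer.MAX_VALUE permits trips the overflow check.
    static String overflowMessage() {
        Semaphore full = new Semaphore(Integer.MAX_VALUE);
        try {
            full.release(1);
            return "no error";
        } catch (Error e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterRelease(1, 2)); // 3
        System.out.println(overflowMessage());         // Maximum permit count exceeded
    }
}
```

This is why each acquire should be paired with exactly one matching release, typically in a finally block: an extra release quietly raises the limit.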
That is the main logic of Semaphore. A plain attempt to acquire permits just modifies state directly! In practice, though, an attempt usually carries a timeout, i.e. semaphore.tryAcquire(2,25, TimeUnit.SECONDS);
That is where the CLH queue comes in, for blocking and waiting. Because each acquisition may ask for a different number of permits, one thread might release 6 permits and the next three threads then take two each, with each waiting thread being woken in turn as it acquires. The logic here is rather convoluted, so it is best to set up some data locally and step through it in a debugger; that should give a better understanding of how Semaphore acquires and releases permits.
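The "one thread releases 6, three threads take two each" scenario is easy to set up locally as a starting point for that debugging session. In this sketch PropagationDemo and allWaitersProceed are hypothetical names, and the short wait for the queue to fill is only a best effort, since getQueueLength() returns an estimate.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public final class PropagationDemo {
    // Three waiters each need 2 permits; a single release(6) should let all of them through.
    static boolean allWaitersProceed() {
        Semaphore semaphore = new Semaphore(0);
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            Thread waiter = new Thread(() -> {
                try {
                    semaphore.acquire(2); // blocks in the sync queue until permits arrive
                    done.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiter.setDaemon(true);
            waiter.start();
        }
        // Give the waiters a moment to queue up before releasing
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        while (semaphore.getQueueLength() < 3 && System.nanoTime() < deadline) {
            Thread.yield();
        }
        semaphore.release(6); // one release; the wake-up propagates down the queue
        try {
            return done.await(5, TimeUnit.SECONDS) && semaphore.availablePermits() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(allWaitersProceed()); // true
    }
}
```

Setting breakpoints in setHeadAndPropagate and doReleaseShared while this runs shows each woken waiter passing the wake-up on to the next one.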
Okay, that's it, game over. If you found it useful, give it a like~
no sacrifice,no victory~