Preface
Besides CountDownLatch and CyclicBarrier, the JUC thread synchronizers include one more: Semaphore, which is likewise built on AQS. Let's look at how semaphores work internally.
Official account: 『Liu Zhihang』, recording skills picked up at work and in study, plus development and source-code notes; life experiences are shared from time to time. Feedback is welcome!
Introduction
A counting semaphore. Conceptually, a semaphore maintains a set of permits. If no permit is available, a call to acquire blocks until one is. A call to release adds a permit, potentially releasing a blocked thread.
- Specify the initial number of permits when constructing the semaphore.
- Call acquire(int permits) to request the target number of permits.
- Call release(int permits) to release the specified number of permits.
When the number of available permits is below the requested number, the thread calling acquire is blocked.
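As a small illustration of the points above, the following sketch walks the permit count through one acquire and one release. The class name PermitCountDemo and its run method are made up for this example, and acquireUninterruptibly is used instead of acquire only to avoid the checked InterruptedException.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the JDK or of the original article
class PermitCountDemo {
    // Returns {initial permits, permits after acquiring 2,
    //          1 if a second request for 2 succeeded else 0,
    //          permits after releasing 2}
    static int[] run() {
        Semaphore semaphore = new Semaphore(3);       // initial number of permits
        int initial = semaphore.availablePermits();

        semaphore.acquireUninterruptibly(2);          // takes 2 of the 3 permits
        int afterAcquire = semaphore.availablePermits();

        // Only 1 permit is left, so a non-blocking request for 2 fails
        boolean secondRequest = semaphore.tryAcquire(2);

        semaphore.release(2);                         // hands 2 permits back
        int afterRelease = semaphore.availablePermits();

        return new int[]{initial, afterAcquire, secondRequest ? 1 : 0, afterRelease};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

A blocking acquire(2) at the point where tryAcquire(2) is called would park the thread until some other thread released at least one more permit.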
Basic use
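import com.google.common.util.concurrent.ThreadFactoryBuilder; // from Guava
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest1 {
    // Start with zero permits, so the main thread blocks until the workers release them
    private static final Semaphore SEMAPHORE = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 5; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(1000 + new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it
                    Thread.currentThread().interrupt();
                }
                System.out.println(" Current thread : " + Thread.currentThread().getName() + " Issue a license ");
                SEMAPHORE.release(1);
            });
        }
        System.out.println("-----> This is the main thread ");
        // Blocks until all five workers have released one permit each
        SEMAPHORE.acquire(5);
        System.out.println("-----> Main thread execution completed ");
        pool.shutdown();
    }
}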
-----> This is the main thread
Current thread : Thread-pool-2 Issue a license
Current thread : Thread-pool-4 Issue a license
Current thread : Thread-pool-1 Issue a license
Current thread : Thread-pool-0 Issue a license
Current thread : Thread-pool-3 Issue a license
-----> Main thread execution completed
The usage above resembles CountDownLatch: the main thread continues only after the child threads have finished. The biggest difference between Semaphore and CountDownLatch is:
Semaphore counts up from the specified initial value; once the permit count reaches the requested number, the blocked thread proceeds.
CountDownLatch counts down from the specified initial count; once it reaches 0, the blocked threads proceed.
Of course, this is only the simplest usage. Besides making the main thread wait, you can also make other threads wait and let them start later.
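Making other threads wait can be sketched as a start gate: the workers block on acquire until the main thread releases enough permits. This is only one possible arrangement; the class StartGateDemo, the run method and the second semaphore named finished are assumptions introduced for the example.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: workers wait on a semaphore until the main thread opens the gate
class StartGateDemo {
    // Returns {workers that ran before the gate opened, workers that ran in total}
    static int[] run(int workers) {
        Semaphore startGate = new Semaphore(0);   // no permits: every worker blocks
        Semaphore finished = new Semaphore(0);    // workers report completion here
        AtomicInteger started = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                startGate.acquireUninterruptibly();   // wait for the gate
                started.incrementAndGet();
                finished.release();                   // signal completion
            }).start();
        }

        // No permit has been released yet, so no worker can have passed the gate
        int beforeOpen = started.get();

        startGate.release(workers);                   // open the gate for everyone
        finished.acquireUninterruptibly(workers);     // wait until all workers are done
        return new int[]{beforeOpen, started.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(4)));
    }
}
```

The second semaphore plays the same role as SEMAPHORE in SemaphoreTest1: the main thread collects one permit per finished worker.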
Questions
- How is Semaphore related to AQS?
- What is the difference between Semaphore and CountDownLatch?
Source code analysis
Basic structure
As the class diagram shows, Semaphore contains a static inner class Sync that extends AQS. To distinguish the fair and nonfair cases, Sync has two subclasses: NonfairSync and FairSync.
Next, following the example, we start from the constructor, acquire() and release() to understand the internal implementation.
Initialization
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
By default a nonfair synchronizer is created, and the number of permits must be passed in. Following the code, you can see that it ends up calling AQS's setState(permits) method. The code is as follows:
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);
    }
}
The setState method simply assigns a value to the state field of AQS.
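The one-argument constructor shown above always picks NonfairSync. The JDK's Semaphore also has a two-argument constructor taking a fairness flag, and the initial state can be read back through availablePermits. A minimal sketch, in which the class name FairnessDemo and the run method are invented for illustration:

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class showing the two constructors of java.util.concurrent.Semaphore
class FairnessDemo {
    // Returns {isFair of the default semaphore, isFair of the fair one,
    //          whether the initial permit count is readable as 2}
    static boolean[] run() {
        Semaphore nonfair = new Semaphore(2);      // backed by the nonfair Sync
        Semaphore fair = new Semaphore(2, true);   // backed by the fair Sync
        return new boolean[]{
                nonfair.isFair(),
                fair.isFair(),
                nonfair.availablePermits() == 2    // the value handed to setState
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```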
Supplement
- In ReentrantLock, state represents the lock state: 0 means no thread holds the lock, a value greater than or equal to 1 means a thread has acquired it, and a value greater than 1 means the holding thread has re-entered multiple times.
- In ReentrantReadWriteLock, state also represents the lock state. A state of 0 means no thread holds the lock; the high 16 bits represent the read-lock state and the low 16 bits the write-lock state. The actual read and write values are obtained with bit operations.
- In CountDownLatch, state represents the latch's count.
If you have forgotten what state is, see the earlier articles on AQS and CAS. Here, state is the number of permits held by the semaphore.
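The bit operations mentioned for ReentrantReadWriteLock can be sketched as follows. This is an illustration of splitting one int into a high and a low 16-bit half, not a copy of the JDK source; the class ReadWriteStateDemo, its constants and its method names are chosen for the example.

```java
// Hypothetical helper illustrating how one int can carry two 16-bit counters
class ReadWriteStateDemo {
    static final int SHARED_SHIFT = 16;                          // read count lives in the high 16 bits
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;   // 0xFFFF selects the low 16 bits

    // Number of read holds encoded in state
    static int readCount(int state) {
        return state >>> SHARED_SHIFT;
    }

    // Number of write holds encoded in state
    static int writeCount(int state) {
        return state & EXCLUSIVE_MASK;
    }

    public static void main(String[] args) {
        int state = (3 << SHARED_SHIFT) | 2;   // 3 read holds packed with 2 write holds
        System.out.println(readCount(state) + " read, " + writeCount(state) + " write");
    }
}
```

Semaphore needs no such split: the whole int is the permit count.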
acquire()
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
Both acquire() and acquire(int permits) call sync.acquireSharedInterruptibly(permits); the only difference is the argument, which defaults to 1 in the no-argument version.
The acquireSharedInterruptibly method is the one Sync inherits from AQS.
The AQS article covers it in detail. Here is a brief look at doAcquireSharedInterruptibly, which acquireSharedInterruptibly calls when its first tryAcquireShared attempt fails:
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
- After the first attempt fails, doAcquireSharedInterruptibly(arg) keeps trying to acquire the resource;
- final Node node = addWaiter(Node.SHARED); creates a node in shared mode and puts it into the queue;
- The loop keeps checking the predecessor node; if it is head, the thread tries to acquire the shared resource;
- In shared mode, setHeadAndPropagate(node, r) sets the head node and also wakes up the subsequent nodes.
tryAcquireShared must be implemented by subclasses, that is, by the subclasses of Semaphore.Sync. Here FairSync is used to explain:
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // If another thread is queued ahead of this one, return -1 to signal failure
            if (hasQueuedPredecessors())
                return -1;
            // Read the currently available permits
            int available = getState();
            // Compute what would remain after this acquisition
            int remaining = available - acquires;
            // If remaining is negative, or the CAS update succeeds, return remaining
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
This code means:
- If another thread is queued ahead, the attempt fails and the thread blocks;
- If the remaining permits would be less than 0, a negative value is returned without changing state, and the thread blocks;
- If the remaining permits would be greater than or equal to 0, state is updated with CAS and a non-negative value is returned.
The meaning of the return value is defined by AQS:
- Less than 0: the acquisition failed;
- Equal to 0: the shared-mode acquisition succeeded, but no subsequent shared-mode acquisition can succeed;
- Greater than 0: the shared-mode acquisition succeeded and subsequent shared-mode acquisitions might also succeed, in which case a subsequent waiting thread must check availability.
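The three return cases can be observed with a stripped-down model of the CAS loop. The sketch below swaps the AQS state for a plain AtomicInteger and leaves out the queue check, so it mirrors only the arithmetic of tryAcquireShared; the class CasPermitCounter is an assumption made for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the permit arithmetic; it has no wait queue and never blocks
class CasPermitCounter {
    private final AtomicInteger state;

    CasPermitCounter(int permits) {
        state = new AtomicInteger(permits);
    }

    // Same loop shape as tryAcquireShared, without hasQueuedPredecessors()
    int tryAcquireShared(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // A negative result is returned without touching state;
            // otherwise retry until the CAS from available to remaining succeeds
            if (remaining < 0 || state.compareAndSet(available, remaining))
                return remaining;
        }
    }

    int available() {
        return state.get();
    }

    public static void main(String[] args) {
        CasPermitCounter counter = new CasPermitCounter(2);
        System.out.println(counter.tryAcquireShared(1)); // greater than 0: more may succeed
        System.out.println(counter.tryAcquireShared(1)); // equal to 0: succeeded, nothing left
        System.out.println(counter.tryAcquireShared(1)); // less than 0: failed
    }
}
```

In the real synchronizer a negative result sends the thread into doAcquireSharedInterruptibly instead of simply returning to the caller.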
release()
public void release() {
    sync.releaseShared(1);
}

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
release returns the given number of permits, increasing the number of available permits by that amount. Internally it calls Sync's releaseShared, which is the corresponding AQS method:
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
If the implementation's tryReleaseShared returns true, the resource is released in shared mode. tryReleaseShared itself is implemented by Semaphore.Sync, with the following logic:
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // Read the current state
        int current = getState();
        // Add the released permits to state
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // Publish the new value with CAS; retry on contention
        if (compareAndSetState(current, next))
            return true;
    }
}
As the code above shows, Semaphore's release mainly adds to state. Once the addition succeeds, AQS's doReleaseShared is called to wake up the successor of the head node.
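Putting acquire and release together, a bare-bones semaphore on top of AQS might look like the sketch below. It follows the nonfair path only and omits argument checks; SimpleSemaphore and its demo method are names invented here, not JDK classes.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical minimal semaphore built on AQS, for illustration only
class SimpleSemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);               // state holds the permit count
        }

        int permits() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;        // negative means the caller gets queued
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)          // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;             // true lets AQS wake queued threads
            }
        }
    }

    private final Sync sync;

    SimpleSemaphore(int permits) {
        sync = new Sync(permits);
    }

    void acquire(int permits) throws InterruptedException {
        sync.acquireSharedInterruptibly(permits);
    }

    void release(int permits) {
        sync.releaseShared(permits);
    }

    int availablePermits() {
        return sync.permits();
    }

    // Blocks until another thread releases 2 permits, then reports what is left (-1 if interrupted)
    static int demo() {
        SimpleSemaphore semaphore = new SimpleSemaphore(0);
        new Thread(() -> semaphore.release(2)).start();
        try {
            semaphore.acquire(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

All queueing, parking and waking is inherited from AQS; the subclass only decides how state changes.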
Summary
Q&A
Q: Since Semaphore is based on AQS, what does state mean in Semaphore?
A: In Semaphore, state represents the number of permits. acquire blocks the calling thread when fewer permits are available than requested; release increases the number of permits and, once the increase succeeds, wakes up blocked nodes.
Q: How does Semaphore work on top of AQS?
A:
- The constructor sets the initial value of state, which is the initial number of permits.
- acquire specifies the target number. When the target exceeds the number currently available, the thread is blocked and placed in the wait queue. This relies on AQS.
- release adds to state and, on success, calls AQS's doReleaseShared to wake up the successor of the head node. This also relies on AQS.
Q: What is the difference between Semaphore and CountDownLatch?
A: In the usage shown here, the Semaphore counter increases, while the CountDownLatch counter decreases. What they share is that neither offers a reset method, although a Semaphore stays reusable because permits can be acquired and released repeatedly.
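The two counting directions can be put side by side in a short sketch. The class CounterDirectionDemo and its run method are assumptions made for the example; the calls themselves (release, availablePermits, countDown, getCount) are the standard JDK methods.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

// Hypothetical demo class comparing the counters of Semaphore and CountDownLatch
class CounterDirectionDemo {
    // Returns {permits after 3 releases, latch count after 3 countDowns,
    //          permits after one more release, latch count after one more countDown}
    static long[] run() {
        Semaphore semaphore = new Semaphore(0);
        CountDownLatch latch = new CountDownLatch(3);

        for (int i = 0; i < 3; i++) {
            semaphore.release();   // counts up: 1, 2, 3
            latch.countDown();     // counts down: 2, 1, 0
        }
        long permitsAfter = semaphore.availablePermits();
        long countAfter = latch.getCount();

        semaphore.release();       // keeps growing past the earlier value
        latch.countDown();         // has no effect once the count is 0

        return new long[]{permitsAfter, countAfter,
                semaphore.availablePermits(), latch.getCount()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```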
Conclusion
Reading the Semaphore source shows that its main functionality is built on AQS, so it is worth going back to the AQS notes. Semaphore also supports fair and nonfair modes; the remaining details are left for you to read on your own.