Advanced concurrent programming: parallel streams, tasks and executors, and the CompletableFuture class
2022-06-21 09:36:00 【tragically unhappy】
Concurrent programming
This post is a summary of my study of concurrent programming.
It is a basic introduction to the concepts of concurrency; the goal is to provide enough background to grasp the complexity and dangers of the subject, so you can avoid common mistakes.
It introduces some advanced concurrency constructs, including the newer parallel Streams and CompletableFutures.
1、A new definition of concurrency
Concurrency usually means that more than one task is in progress, while parallelism almost always means that more than one task is executing at the same moment. One of the main reasons the two concepts get blended together is that Java uses threads to implement both.
Some related terms:
- Pure concurrency: tasks run on a single CPU. A purely concurrent system can produce results faster than a sequential one, but it will not run faster as processors are added.
- Concurrent-parallel: written with concurrency techniques, and the program produces results faster when more processors are available.
- Parallel-concurrent: written with parallel programming techniques, but the program still runs when there is only one processor. (Java 8 stream programming is of this kind.)
- Pure parallelism: cannot run at all unless there are multiple processors.
With the above concepts briefly in mind, a simpler definition of concurrency becomes easier to understand:
Concurrency is a collection of performance techniques focused on reducing waiting.
2、Java concurrency
Java takes a fairly traditional approach to concurrency: support for threads is layered on top of a sequential language. That is, the threading mechanism interleaves tasks within the single process the program runs in, rather than forking external processes under a multitasking operating system.
Concurrency often improves the performance of programs even on a single processor.
Concurrency adds overhead compared with sequential execution, including complexity costs, but this is weighed against gains in program design, resource balancing, and user convenience. In general, concurrency lets you create more loosely coupled designs; at the same time, any code that uses concurrent operations must be written with great care.
The most direct way to implement concurrency is at the operating-system level, using processes. A process is a self-contained program running in its own address space. A multitasking operating system runs multiple processes (programs) apparently at once by periodically switching the CPU from one process to another. By contrast, a concurrent system like Java's shares resources such as memory and I/O within one process. The fundamental difficulty of writing multithreaded programs is therefore coordinating the use of these resources between the different thread-driven tasks, so that no resource is accessed by multiple tasks at the same time.
Four concurrency maxims:
- Don't do it.
- Nothing is true, and everything can go wrong.
- The fact that it works doesn't mean it isn't broken.
- You must still understand it.
In short: concurrency is unsafe, but you still have to learn it.
Below are the newer high-level Java concurrency constructs this post introduces:
- Parallel Streams: an improvement built into Java 8 Streams; you can parallelize a stream simply by adding parallel() to the expression.
- Creating and running tasks: a task is a piece of code that can run independently.
- Terminating long-running tasks: tasks don't always run to completion on their own, so a mechanism is needed to shut them down. The typical approach uses a flag, which introduces the shared-memory problem; to sidestep further trouble, we use Java's Atomic library.
- CompletableFutures: when you take your clothes to the dry cleaner, they hand you a receipt. With the receipt you can go off and do other things, then pick the clothes up when they're ready. The receipt is your connection to the task the dry cleaner performs in the background; this is the Future introduced in Java 5. A Future is more convenient than what came before, but you still have to show up with the receipt and wait if the task isn't done, and for a chain of dependent operations Futures don't help much. (In many scenarios you want the result a thread computes via an ExecutorService. Submitting with execute() gives no way to get a result; submitting with submit() returns a Future, and future.get() retrieves the result. But get() blocks: when the main thread reaches get(), it waits for the asynchronous task to finish, so the asynchronous benefit evaporates the moment you ask for the result.) CompletableFuture is Java 8's better solution: it lets you chain operations together, so you don't have to write blocking glue between steps. (When task two depends on the result of task one in an otherwise asynchronous computation, Future alone can't express this cleanly, while CompletableFuture can.)
- Deadlock: some tasks must wait on the results of other tasks. A blocked task can wait on another blocked task, which waits on yet another, and so on; if the chain loops back to the first task, no one can make progress and the program deadlocks.
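The deadlock described in the last bullet is easiest to see with two locks. Below is a minimal sketch of my own (the class and method names are hypothetical, not from the original post): both threads acquire lock A before lock B, so the program always completes; as the comment notes, if one thread took the locks in the opposite order, each could end up holding one lock while waiting forever for the other.

```java
public class LockOrdering {
    static final Object A = new Object();
    static final Object B = new Object();

    // Both methods take the locks in the same order: A, then B.
    static void transfer1() {
        synchronized (A) {
            synchronized (B) { /* work on both resources */ }
        }
    }

    // If this method instead did synchronized (B) { synchronized (A) { ... } },
    // the two threads could each hold one lock and wait forever on the other:
    // a deadlock.
    static void transfer2() {
        synchronized (A) {
            synchronized (B) { /* work on both resources */ }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(LockOrdering::transfer1);
        Thread t2 = new Thread(LockOrdering::transfer2);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("no deadlock: both threads took A before B");
    }
}
```

Consistent lock ordering is the standard defense: a cycle of threads waiting on each other cannot form if every thread acquires locks in the same global order.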
3、Parallel streams
A notable advantage of Java 8 streams is that, in some cases, they can be parallelized very easily. This comes from careful library design, in particular the way streams use internal iteration: they control their own iterators. The iterator in question is a special one, the Spliterator, which is built to split itself automatically. Just adding .parallel() makes the stream split its work and execute it in parallel. So if your code is already written with Streams, parallelizing it to improve speed is usually trivial.
For example, consider Prime.java from the Streams material. Finding prime numbers can be time-consuming, so we can time it:
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.stream.Collectors;
import static java.util.stream.LongStream.*;
/**
 * Streams use internal iteration: they control their own iterators.
 * @Date 2021/8/14 17:47
 * @Created by gt136
 */
public class ParallelPrime {
static final int COUNT = 100_000;
public static boolean isPrime(long n) {
        // rangeClosed() counts up by 1 and includes the end value; range() excludes it.
        // If the second argument is less than the first, it returns an empty stream.
        // Math.sqrt() returns the square root of n: sqrt(4) = 2.0, sqrt(6) = 2.449...;
        // the cast to long simply drops the decimal places.
        // So for n < 4 the range is empty (and the result is true); for n from 4 through 8 the upper bound is 2.
        return rangeClosed(2, (long) Math.sqrt(n))
                .noneMatch(i -> n % i == 0); // true only if no value divides n evenly; an empty stream also yields true
}
public static void main(String[] args) throws IOException {
Timer timer = new Timer();
List<String> primes =
iterate(2,i->i+1)
.parallel() //[1]
.filter(ParallelPrime::isPrime)
.limit(COUNT)
.mapToObj(Long::toString)
.collect(Collectors.toList());
System.out.println(timer.duration());
//System.out.println((long) Math.sqrt(8));
Files.write(Paths.get("primes.txt"), primes, StandardOpenOption.CREATE);
}
}
/* output: 1110 */
We write the primes to disk to keep the compiler from optimizing the computation away; the results end up in primes.txt.
With line [1] in place, my runs were roughly 3 times faster than without parallel().
3.1 Restrictions on using parallel()
Consider summing a sequence of numbers. There are many ways to implement this, and we compare them with a timer; all arithmetic is done strictly in long.
We start with a timing method that takes a LongSupplier, measures the duration of its getAsLong() call, compares the result with checkValue, and displays the outcome.
import com.gui.demo.thingInJava.concurrency.share.Timer;
import java.util.function.LongSupplier;
import java.util.stream.LongStream;
/**
 * @Date 2021/8/16 10:34
 * @Created by gt136
 */
public class Summing {
static void timeTest(String id, long checkValue, LongSupplier operation) {
System.out.print(id + ": ");
Timer timer = new Timer();
long result = operation.getAsLong();
if (result == checkValue) {
System.out.println(timer.duration() + "ms");
}else {
            System.out.format("result: %d%ncheckValue: %d%n", result, checkValue);
}
}
public static final int SZ = 100_000_000;
    // Sum of the numbers 0..SZ, computed with Gauss's formula
    public static final long CHECK = (long) SZ * ((long) SZ + 1) / 2;
public static void main(String[] args) {
System.out.println(CHECK);
        timeTest("Sum Stream", CHECK, () -> LongStream.rangeClosed(0, SZ).sum()); // plain stream summation
        timeTest("Sum Stream Parallel", CHECK, () -> LongStream.rangeClosed(0, SZ).parallel().sum()); // parallelized summation
        timeTest("Sum Iterate: ", CHECK, () -> LongStream.iterate(0, i -> i + 1).limit(SZ + 1).sum()); // sequence generated by iteration
        timeTest("Sum Iterated Parallel", CHECK, () -> LongStream.iterate(0, i -> i + 1).parallel().limit(SZ + 1).sum()); // parallelized iteration
}
}
/* output:
5000000050000000
Sum Stream: 52ms
Sum Stream Parallel: 21ms
Sum Iterate: : 97ms
Sum Iterated Parallel: 2759ms
*/
The first version in main() generates a Stream directly and calls sum(). One advantage of streams here is that even with SZ at 100 million, the program handles the computation without overflow. The basic range operation with parallel() is noticeably faster.
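The long casts in CHECK matter. A quick check of my own (not from the original post) shows what happens to the check-value formula if the arithmetic stays in int:

```java
public class OverflowCheck {
    public static void main(String[] args) {
        int sz = 100_000_000;
        // int multiplication wraps around long before the true product is reached
        System.out.println(sz * (sz + 1) / 2);
        // casting to long first keeps the full value
        System.out.println((long) sz * ((long) sz + 1) / 2); // 5000000050000000
    }
}
```

The first line prints a meaningless wrapped-around value; only the second matches the expected sum.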
Generating the sequence with iterate() is much slower, probably because the lambda must be called every time a number is generated. When we try to parallelize that version, the result is not just slower than the non-parallel one: when SZ exceeds about one million it can even exhaust machine memory (on some machines). Applying parallel() was a reasonable attempt, but it produces these surprising results. We can make some preliminary observations about parallel stream algorithms:
- Stream parallelism splits the incoming data into pieces, so the algorithm can be applied to each piece separately.
- Arrays are cheap to split, split evenly, and the size of every piece is known exactly.
- Linked lists have none of these properties: "splitting" a linked list only yields "the first element" and "the rest", which is useless for parallelism.
- Stateless generators behave like arrays; the range() used above is stateless.
- Iterative generators behave like linked lists; iterate() is an iterative generator.
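These differences are visible in the spliterator characteristics (a small probe of my own, not from the original post): range() reports SIZED, so parallel() can split it evenly, while iterate() does not know its size up front.

```java
import java.util.Spliterator;
import java.util.stream.LongStream;

public class SpliteratorCheck {
    public static void main(String[] args) {
        // range() knows its exact size, so parallel() can split it evenly
        Spliterator.OfLong ranged =
            LongStream.rangeClosed(0, 100).spliterator();
        // iterate() must produce each value from the previous one: size unknown
        Spliterator.OfLong iterated =
            LongStream.iterate(0, i -> i + 1).spliterator();
        System.out.println("range   SIZED: "
            + ranged.hasCharacteristics(Spliterator.SIZED));   // true
        System.out.println("iterate SIZED: "
            + iterated.hasCharacteristics(Spliterator.SIZED)); // false
    }
}
```

A SIZED spliterator can hand half its data to another thread in one step; an unsized one can only batch elements off the front, which is the linked-list behavior described above.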
Now let's try solving the problem by filling an array with values and summing it. Because the array is allocated only once, we are unlikely to hit garbage-collection timing problems.
import java.util.Arrays;
/**
 * Fill an array with values and sum the array.
 * @Date 2021/8/16 14:36
 * @Created by gt136
 */
public class Summing2 {
    /**
     * Sum an array.
     * @param ia the array to sum
     * @return the sum of its elements
     */
static long basicSum(long[] ia) {
long sum = 0;
int size = ia.length;
for (int i = 0; i < size; i++) {
sum += ia[i];
}
return sum;
}
public static final int SZ = 20_000_000;
    // Expected sum, computed with Gauss's formula
public static final long CHECK = (long)SZ * ((long) SZ + 1)/2;
public static void main(String[] args) {
System.out.println(CHECK);
long[] la = new long[SZ + 1];
        // Fills la in parallel, setting each element to the value the generator produces for its index; throws if the generator is null
Arrays.parallelSetAll(la, i -> i);
Summing.timeTest("Array Stream Sum", CHECK, () -> Arrays.stream(la).sum());
Summing.timeTest("Parallel", CHECK, () -> Arrays.stream(la).parallel().sum());
Summing.timeTest("Basic Sum", CHECK, () -> basicSum(la));
Summing.timeTest("ParallelPrefix",CHECK,()->{
            // Replaces each element with the running total up to that position, so la[la.length - 1] ends up holding the overall sum
Arrays.parallelPrefix(la, Long::sum);
return la[la.length - 1];
});
}
}
/* output:
200000010000000
Array Stream Sum: 25ms
Parallel: 16ms
Basic Sum: 19ms
ParallelPrefix: 98ms
*/
The first limitation here is memory size: because the array is preallocated, we cannot make it anywhere near as large as in the previous version. Parallelizing does speed things up, even beating the plain Basic Sum loop. Interestingly, ParallelPrefix is the slowest, because it does extra work: it computes the running total at every position.
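To see what parallelPrefix() actually computes, here is a tiny demonstration of my own: each element is replaced by the running total, which is why la[la.length - 1] holds the overall sum in the code above.

```java
import java.util.Arrays;

public class PrefixDemo {
    public static void main(String[] args) {
        long[] a = {1, 2, 3, 4, 5};
        // Each element becomes the cumulative sum of everything up to it
        Arrays.parallelPrefix(a, Long::sum);
        System.out.println(Arrays.toString(a)); // [1, 3, 6, 10, 15]
    }
}
```

So asking parallelPrefix() for just the total is overkill: it computes every intermediate prefix along the way, which explains the poor timing.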
Because we are dealing with threads, any trace information must be captured into a concurrent data structure.
The following code demonstrates a group of threads taking values from a generator, with limit() then selecting a bounded result set:
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ParallelStreamPuzzle {
    // A thread-safe queue for trace output
    public static final Deque<String> TRACE = new ConcurrentLinkedDeque<>();

    /**
     * An Integer generator implementing Supplier.
     */
    static class IntGenerator implements Supplier<Integer> {
        // An Atomic class protects the counter from races
        private AtomicInteger currentValue = new AtomicInteger();

        @Override
        public Integer get() {
            // Many threads call add() here, but ConcurrentLinkedDeque is
            // thread-safe, so the entries are recorded without corruption
            TRACE.add(currentValue.get() + ": " + Thread.currentThread().getName());
            // Atomically return the current value and increment it
            return currentValue.getAndIncrement();
        }
    }
public static void main(String[] args) throws IOException {
List<Integer> x = Stream.generate(new IntGenerator())
.limit(10)
.parallel()
.collect(Collectors.toList());
System.out.println(x);
Files.write(Paths.get("PSP.txt"), TRACE);
}
}
/* output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] */
The result shown above is only what I got on the first run; every subsequent run produces different output. Execution proceeds like this: the stream abstracts an infinite sequence, generated on demand. When you ask for a parallel stream, all those threads call get() as eagerly as they can, and limit() keeps only the values needed. Looking at the written PSP.txt, though, you can observe that even though only 10 values were requested, 1024 elements were generated.
currentValue is defined as a thread-safe AtomicInteger, which prevents the value from being corrupted by races.
If you want to generate a stream of int values, IntStream.range() is a better fit:
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelStreamPuzzle2 {
public static void main(String[] args) {
List<Integer> x = IntStream.range(0,30)
.peek(e->System.out.println(e+": "+ Thread.currentThread().getName()))
.limit(10)
.parallel()
.boxed()
.collect(Collectors.toList());
System.out.println(x);
}
}
/* output:
8: main
2: ForkJoinPool.commonPool-worker-6
1: ForkJoinPool.commonPool-worker-3
7: ForkJoinPool.commonPool-worker-6
6: ForkJoinPool.commonPool-worker-5
4: ForkJoinPool.commonPool-worker-1
3: ForkJoinPool.commonPool-worker-7
0: ForkJoinPool.commonPool-worker-4
5: main
9: ForkJoinPool.commonPool-worker-2
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
*/
The added peek() verifies that parallel() is working, and boxed() converts the int stream into a stream of Integer.
Now we get values produced by multiple threads, but only the 10 requested values are generated, instead of producing 1024 and picking from them.
As the printed names show, ForkJoinPool.commonPool is the default thread pool used to run multithreaded tasks.
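You can inspect that default pool directly (a small probe of my own, not from the original post). Its parallelism is typically one less than the number of cores, since the thread that invokes the parallel stream also participates in the work:

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {
    public static void main(String[] args) {
        // All parallel streams in the JVM share this one pool by default
        System.out.println("parallelism: "
            + ForkJoinPool.commonPool().getParallelism());
        System.out.println("cores:       "
            + Runtime.getRuntime().availableProcessors());
    }
}
```

Because the pool is JVM-wide, a long-running parallel stream anywhere in the program can starve parallel streams everywhere else, which motivates the cautions in the summary below.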
That concludes the introduction to parallel(). To summarize, here are guidelines I found in a write-up by firefly_ (link in the original post):
parallelStream suits CPU-intensive work, and only when there is CPU to spare; if the machine's CPUs are already heavily loaded, sprinkling parallel streams everywhere won't help.
1. I/O-intensive work: disk I/O and network I/O consume little CPU. Parallel streams are generally not suitable for I/O-intensive operations such as bulk message pushing, which involves heavy I/O; using parallel streams there can be much slower.
2. CPU-intensive work: pure computation is CPU-bound, and this is where parallel streams can improve throughput.
3. Don't use parallelStream inside already-multithreaded code, for similar reasons: everyone scrambles for the CPU, there is no improvement, and thread-switching overhead increases. It also introduces nondeterminism, so make sure each step is stateless and independent.
4. Consider the NQ model: N is the amount of data, Q the amount of computation per element. The larger the product N*Q, the more likely parallelism will pay off. Roughly, N*Q > 10,000 (collection sizes above about 1,000) tends to show a real improvement.
5. parallelStream creates a parallel Stream whose operations do not propagate thread context, so you cannot read ThreadLocal values set on the creating thread. Parallel streams also don't preserve processing order: even with a synchronized collection you can only guarantee the elements are correct, not their order.
6. Lambdas don't execute instantaneously, so any parallel stream can become a source of blocking: while it executes, other parts of the program cannot use those workers. Anything else in your program that depends on the common ForkJoinPool becomes unpredictable and potentially dangerous.
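Point 5 is easy to demonstrate. Below is a sketch of my own (the names USER and seenByThread are hypothetical): a ThreadLocal set on the main thread is not visible from the common-pool workers that execute chunks of a parallel stream.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class ThreadLocalParallel {
    static final ThreadLocal<String> USER =
        ThreadLocal.withInitial(() -> "none");

    public static void main(String[] args) {
        USER.set("alice"); // set on the main thread only
        Map<String, String> seenByThread = new ConcurrentHashMap<>();
        IntStream.range(0, 1_000).parallel().forEach(i ->
            seenByThread.put(Thread.currentThread().getName(), USER.get()));
        // Chunks that happened to run on main see "alice"; ForkJoinPool
        // workers see the initial value, because ThreadLocal state does
        // not propagate into the pool.
        seenByThread.forEach((t, v) -> System.out.println(t + " -> " + v));
    }
}
```

If a computation inside a parallel stream needs per-request context, pass it explicitly as a parameter rather than through ThreadLocal.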
4、Creating and running tasks
As the examples above show, parallel streams cannot always provide the concurrency you need; sometimes you must create and run your own tasks. We will get to Java 8's CompletableFuture later, but first some more basic concepts.
4.1 Tasks and Executors
In earlier versions of Java, you used threads by creating Thread objects directly (see the low-level concurrency material), even subclassing them to create your own specific "task thread" objects; you called the constructor and started the thread by hand.
Creating threads is expensive, so doing it manually is discouraged. Java 5 added thread pools: you hand the work to an ExecutorService to run as tasks, instead of creating a new Thread subtype for each kind of task. The ExecutorService manages threads for you, and recycles a thread after it finishes a task rather than discarding it.
public class NapTask implements Runnable{
final int id;
public NapTask(int id) {
this.id = id;
}
@Override
public void run() {
new Nap(0.1);
System.out.println(this + " " + Thread.currentThread().getName());
}
@Override
public String toString() {
return "NapTask{" +
"id=" + id +
'}';
}
}
/***********************************************************/
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
IntStream.range(0, 10)
.mapToObj(NapTask::new)
.forEach(exec::execute);
System.out.println("All tasks submitted");
exec.shutdown();
while (!exec.isTerminated()) {
System.out.println(Thread.currentThread().getName() + " awaiting termination");
            new Nap(0.1); // sleep briefly
}
}
}
/* output:
All tasks submitted
main awaiting termination
main awaiting termination
NapTask{id=0} pool-1-thread-1
NapTask{id=1} pool-1-thread-1
main awaiting termination
main awaiting termination
NapTask{id=2} pool-1-thread-1
main awaiting termination
NapTask{id=3} pool-1-thread-1
main awaiting termination
NapTask{id=4} pool-1-thread-1
main awaiting termination
NapTask{id=5} pool-1-thread-1
NapTask{id=6} pool-1-thread-1
main awaiting termination
main awaiting termination
NapTask{id=7} pool-1-thread-1
main awaiting termination
NapTask{id=8} pool-1-thread-1
NapTask{id=9} pool-1-thread-1
main awaiting termination
*/
Executors.newSingleThreadExecutor() is a factory method on Executors that creates a specific kind of ExecutorService.
We create ten NapTasks and submit them to the ExecutorService, which means they start running on their own while main() goes on with its own work. Calling exec.shutdown() tells the ExecutorService to finish the tasks already submitted but to accept no new ones. The submitted tasks keep running, and once they have all completed, exec.isTerminated() becomes true.
Notice that the thread in main() is named main, and there is only one other thread: pool-1-thread-1. The interleaved output shows that the two threads really do run at the same time.
In fact, just calling exec.shutdown() is enough for the program to complete all tasks; the polling loop afterwards is not required.
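If you do need to wait for completion, ExecutorService also offers awaitTermination(), which blocks until all submitted tasks finish or a timeout expires, instead of polling isTerminated() in a loop. A minimal sketch of my own:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitTermination {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        exec.execute(() -> System.out.println(
            "task on " + Thread.currentThread().getName()));
        exec.shutdown(); // no new tasks; already-submitted tasks still run
        // Block until everything finishes (or the timeout passes)
        if (exec.awaitTermination(5, TimeUnit.SECONDS))
            System.out.println("all tasks finished");
    }
}
```

The boolean return value tells you whether the pool actually terminated before the timeout, so you can decide whether to escalate to shutdownNow().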
4.2 Using more threads
The point of using threads is to finish tasks faster, so why would we ever restrict ourselves to a SingleThreadExecutor? Looking at the JavaDoc for Executors, you'll find more options.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec
=Executors.newCachedThreadPool();
IntStream.range(0, 10)
.mapToObj(NapTask::new)
.forEach(exec::execute);
exec.shutdown();
}
}
/* output:
NapTask[7] pool-1-thread-8
NapTask[4] pool-1-thread-5
NapTask[1] pool-1-thread-2
NapTask[3] pool-1-thread-4
NapTask[0] pool-1-thread-1
NapTask[8] pool-1-thread-9
NapTask[2] pool-1-thread-3
NapTask[9] pool-1-thread-10
NapTask[6] pool-1-thread-7
NapTask[5] pool-1-thread-6
*/
When you run this program, you'll find it finishes sooner. That's because each task gets its own thread and they all run in parallel, instead of a single thread running the tasks one after another. So why ever use SingleThreadExecutor?
Consider a task that does something more involved:
public class InterferingTask implements Runnable{
final int id;
private static Integer val = 0;
public InterferingTask(int id) {
this.id = id;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
val++;
}
System.out.println(id + " " + Thread.currentThread().getName()+" "+ val);
}
}
/*=======================================*/
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
IntStream.range(0, 10)
.mapToObj(InterferingTask::new)
.forEach(exec::execute);
exec.shutdown();
}
}
/* output:
1 pool-1-thread-2 100
2 pool-1-thread-3 300
3 pool-1-thread-4 200
0 pool-1-thread-1 100
6 pool-1-thread-7 400
4 pool-1-thread-5 500
7 pool-1-thread-8 600
9 pool-1-thread-10 700
5 pool-1-thread-6 800
8 pool-1-thread-9 900
*/
//===========================================
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class SingleThreadExecutor3 {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
IntStream.range(0, 10)
.mapToObj(InterferingTask::new)
.forEach(exec::execute);
exec.shutdown();
}
}
/* output:
0 pool-1-thread-1 100
1 pool-1-thread-1 200
2 pool-1-thread-1 300
3 pool-1-thread-1 400
4 pool-1-thread-1 500
5 pool-1-thread-1 600
6 pool-1-thread-1 700
7 pool-1-thread-1 800
8 pool-1-thread-1 900
9 pool-1-thread-1 1000
*/
InterferingTask's job is to increment val one hundred times. With CachedThreadPool the output is not what we expect, and it differs from run to run. The problem is that all the tasks run at the same time, each trying to write the single shared instance of val, with no protection mechanism; we call such a class thread-unsafe.
With SingleThreadExecutor we get the output we want, consistently across runs, even though InterferingTask lacks thread safety. Because only one task runs at a time, the tasks cannot interfere with each other, which enforces thread safety. This is called thread confinement: running tasks on a single thread limits speedup, but it saves a great deal of debugging and rewriting.
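If we do want CachedThreadPool's parallelism with a shared counter, the counter itself must be made thread-safe. Here is a sketch of my own variation on InterferingTask (not from the original post) using AtomicInteger:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicFix {
    static final AtomicInteger val = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int t = 0; t < 10; t++)
            exec.execute(() -> {
                for (int i = 0; i < 100; i++)
                    val.incrementAndGet(); // atomic read-modify-write: no lost updates
            });
        exec.shutdown();
        exec.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(val.get()); // always 1000
    }
}
```

The unsynchronized `val++` in InterferingTask is three separate steps (read, add, write), which is where concurrent increments get lost; incrementAndGet() performs them as one indivisible operation.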
4.3 Producing results
InterferingTask implements Runnable, so it has no return value, and by manipulating a shared variable it lets multiple tasks race to modify the same state.
To produce a result, we implement Callable instead of Runnable:
import java.util.concurrent.Callable;

public class CountingTask implements Callable<Integer> {
final int id;
public CountingTask(int id) {
this.id = id;
}
@Override
public Integer call() throws Exception {
Integer val = 0;
for (int i = 0; i < 100; i++) {
val++;
}
System.out.println(id + " " + Thread.currentThread().getName()+" "+ val);
return val;
}
}
//===================================================
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CachedThreadPool3 {
    public static Integer extractResult(Future<Integer> future) {
        try {
            // get() returns the computed result; here it runs after all tasks
            // are done, and also surfaces any exception a task may have thrown
            return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
List<CountingTask> tasks =
IntStream.range(0, 10)
.mapToObj(CountingTask::new)
.collect(Collectors.toList());
        // Run all the tasks, returning only when every one has completed
List<Future<Integer>> futures = exec.invokeAll(tasks);
        // reduce(): folds the stream's elements together using the identity
        // value and an associative accumulator, returning the reduced value
Integer sum = futures.stream()
.map(CachedThreadPool3::extractResult)
                .reduce(0, Integer::sum); // add up all the task results
System.out.println("sum = " + sum);
exec.shutdown();
}
}
/* output:
0 pool-1-thread-1 100
3 pool-1-thread-4 100
1 pool-1-thread-2 100
6 pool-1-thread-7 100
2 pool-1-thread-3 100
7 pool-1-thread-8 100
5 pool-1-thread-6 100
4 pool-1-thread-5 100
8 pool-1-thread-9 100
9 pool-1-thread-10 100
sum = 1000
*/
Each call() produces its result completely independently of all the other CountingTasks, which means there is no shared mutable state (each task constructs its own val).
ExecutorService lets you start every Callable in a collection with exec.invokeAll().
Only after all the tasks have completed does exec.invokeAll() return a List of Futures, one Future per task. (This is one of the weaknesses of Future that CompletableFuture later addresses.) Future, introduced in Java 5, lets you submit a task without waiting for it to complete.
Calling future.get() on an unfinished task blocks until the result is available (you have to wait). Here, the Futures look redundant, because invokeAll() doesn't return until every task is done; but these Futures aren't being used to wait for delayed results, they are used to catch any exception the tasks may have thrown.
Because get() blocks, a Future can only expose problems once the task finishes and you ask for the result. In the end, Futures came to be seen as an unsatisfying solution, and Java 8's CompletableFuture is now preferred.
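If you must call get() on a Future that may never finish, the timed overload at least bounds the wait. A small sketch of my own (the 5-second sleep simulates a slow task):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeout {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<Integer> f = exec.submit(() -> {
            Thread.sleep(5_000); // simulate a slow task
            return 1;
        });
        try {
            f.get(100, TimeUnit.MILLISECONDS); // bounded wait, not forever
        } catch (TimeoutException e) {
            System.out.println("still running; gave up waiting");
            f.cancel(true); // request that the task be interrupted
        }
        exec.shutdownNow();
    }
}
```

This doesn't remove the fundamental limitation, though: you are still blocking and polling from outside, rather than chaining work onto the result.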
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Futures {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Integer> f = exec.submit(new CountingTask(99));
System.out.println(f.get());
exec.shutdown();
}
}
/* output:
99 pool-1-thread-1 100
100
*/
This one uses ExecutorService.submit(), which returns a Future for a single task.
Alternatively, a parallel Stream solves the problem in a simpler and more elegant way:
import java.util.stream.IntStream;

public class CountingStream {
public static void main(String[] args) {
System.out.println(
IntStream.range(0,10)
.parallel()
.mapToObj(CountingTask::new)
.map(countingTask -> {
try {
return countingTask.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.reduce(0,Integer::sum)
);
}
}
/* output:
1 pool-1-thread-2 100
0 pool-1-thread-1 100
3 pool-1-thread-4 100
2 pool-1-thread-3 100
5 pool-1-thread-6 100
4 pool-1-thread-5 100
6 pool-1-thread-7 100
7 pool-1-thread-8 100
8 pool-1-thread-9 100
9 pool-1-thread-10 100
1000
*/
4.4 Lambdas and method references
With Java 8's lambdas and method references, you are not limited to classes that implement Runnable or Callable. Because lambdas and method references are matched by method signature, we can pass things to the ExecutorService that are neither Runnable nor Callable:
class NotRunnable {
public void go() {
System.out.println("NotRunnable");
}
}
class NotCallable {
public Integer get() {
System.out.println("NotCallable");
return 1;
}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LambdasAndMethodReferences {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
exec.submit(() -> System.out.println("lambda1"));
exec.submit(new NotRunnable()::go);
exec.submit(() -> {
System.out.println("lambda2");
return 1;
});
exec.submit(new NotCallable()::get);
exec.shutdown();
}
}
/* output:
lambda1
NotRunnable
lambda2
NotCallable
*/
5、Terminating long-running tasks
Concurrent programs often use long-running tasks. A Callable returns a value on completion, so although its lifetime is bounded, it can still take a long time; and some tasks are background tasks designed to run forever. You need a way to stop Runnable and Callable tasks before their natural end, for example when you shut the program down.
The original Java design provided a mechanism for interrupting a running task. But interruption interacts with blocking, and interrupting tasks is messy and complex, because you must understand every possible state in which the interrupt might arrive, along with any potential data loss. Using interrupts is now considered an anti-pattern.
The best way to stop a task is to give it a flag that it checks periodically; the task can then shut itself down through its own orderly process. Instead of killing the thread mid-task, you ask the task to terminate itself when it reaches a good stopping point. This produces more robust results than interruption, and code that is easier to understand and reason about.
You could implement the flag as a boolean visible to the task, checked periodically for a graceful exit; but that reintroduces shared mutable state.
Where older code reaches for the volatile keyword, here we use a simpler technique that avoids all of that.
Java 5 introduced the Atomic classes, which provide types you can use without worrying about concurrent access.
import java.util.concurrent.atomic.AtomicBoolean;

public class QuittableTask implements Runnable {
final int id;
public QuittableTask(int id) {
this.id = id;
}
private AtomicBoolean running = new AtomicBoolean(true);
public void quit() {
running.set(false);
}
@Override
public void run() {
while (running.get()) {
//[1]
new Nap(0.1);
}
System.out.println(id + " ");
}
}
/*================================================*/
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class QuittingTasks {
public static final int COUNT = 150;
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
List<QuittableTask> tasks = IntStream.range(0, COUNT)
.mapToObj(QuittableTask::new)
.peek(qt -> exec.execute(qt))
.collect(Collectors.toList());
new Nap(1);
tasks.forEach(QuittableTask::quit);
exec.shutdown();
}
}
/* output: 71 42 39 35 21 24 30 18 38 50 33 46 41 36 40 52 53 65 76 68 16 77 84 58 63 43 12 47 114 99 98 79 91 100 87 101 95 113 90 104 83 74 82 93 78 96 75 66 54 55 88 92 59 89 67 85 115 62 70 105 80 72 73 81 69 60 61 56 57 49 44 64 45 48 37 25 109 28 111 17 108 29 32 107 110 103 15 102 106 112 7 22 97 0 86 2 5 9 94 6 8 3 10 4 11 23 14 27 1 20 34 19 26 13 31 51 139 117 134 127 125 129 131 124 135 120 132 146 136 138 133 137 143 140 142 144 141 145 147 148 149 118 121 126 116 130 128 119 123 122 */
Although multiple tasks can successfully call quit() on the same instance, AtomicBoolean prevents multiple tasks from actually modifying running at the same time, which makes quit() thread-safe.
The second program starts many QuittableTasks and then shuts them down. We use peek() to hand each QuittableTask to the ExecutorService, then collect the tasks into a List. As long as any task is still running, main() will not exit. Even though quit() is called sequentially on each task, the tasks do not shut down in the order they were created: independent tasks do not necessarily respond deterministically to signals.
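As a side note on AtomicBoolean itself: besides get() and set(), it provides atomic read-modify-write operations such as compareAndSet(). A minimal sketch (the class name is invented for illustration) of how that guarantees a shutdown action runs exactly once, even under a race:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch: compareAndSet() flips the flag atomically, so even if
// many threads call shutdown() at once, only one caller "wins".
public class OnceFlag {
    private final AtomicBoolean running = new AtomicBoolean(true);

    public boolean shutdown() {
        // Returns true only for the single caller that wins the race:
        return running.compareAndSet(true, false);
    }

    public boolean isRunning() {
        return running.get();
    }

    public static void main(String[] args) {
        OnceFlag flag = new OnceFlag();
        System.out.println(flag.shutdown());  // true: first call wins
        System.out.println(flag.shutdown());  // false: already stopped
        System.out.println(flag.isRunning()); // false
    }
}
```

Because the check and the update happen as one atomic step, no volatile bookkeeping or locking is needed.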
6、CompletableFuture class
As an introduction, here is QuittingTasks reimplemented using CompletableFuture:
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class QuittingCompletable {
    public static void main(String[] args) {
        List<QuittableTask> tasks = IntStream.range(0, QuittingTasks.COUNT)
            .mapToObj(QuittableTask::new)
            .collect(Collectors.toList());
        /*
         * Where CompletableFuture differs from ExecutorService is that it
         * fits streaming code more naturally: runAsync() executes each task
         * asynchronously, so one CompletableFuture corresponds to one task.
         */
        List<CompletableFuture<Void>> cfutures = tasks.stream()
            .map(CompletableFuture::runAsync)
            .collect(Collectors.toList());
        new Nap(1);
        tasks.forEach(QuittableTask::quit);
        cfutures.forEach(CompletableFuture::join);
    }
}
/* output: 4 2 7 0 1 5 3 6 13 12 10 11 9 8 19 18 17 16 15 14 25 24 23 22 21 31 32 20 33 34 30 29 28 27 26 40 39 38 44 37 36 35 47 46 45 43 42 41 53 52 51 50 49 48 59 58 57 56 55 54 65 64 63 62 61 60 71 70 69 68 67 66 77 76 75 74 73 72 83 82 81 80 79 78 89 88 87 86 93 94 85 84 98 99 100 101 102 97 96 95 92 91 90 108 107 106 105 104 103 114 113 112 111 110 109 120 121 122 119 118 117 116 115 128 127 126 125 124 123 134 133 132 131 130 140 129 141 139 138 137 136 135 147 146 145 144 143 142 149 148 */
The tasks are a List<QuittableTask>, just as in QuittingTasks.java, but in this version there is no peek() submitting each QuittableTask to an ExecutorService. Instead, while creating cfutures, each task is handed to CompletableFuture::runAsync, which executes run() and returns a CompletableFuture. Because the underlying run() returns nothing, we use each CompletableFuture only to call join() and wait for it to finish.
The important point in this example: running the tasks required no ExecutorService — that is managed directly by CompletableFuture, and you never need shutdown(). In fact, unless you explicitly call join(), the program exits as soon as possible rather than waiting for the tasks to complete.
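To see why join() matters, here is a small self-contained sketch (class and method names are my own, not from the original): runAsync() and supplyAsync() run tasks on ForkJoinPool.commonPool() by default, whose worker threads are daemon threads, so the JVM can exit before an un-joined task ever runs:

```java
import java.util.concurrent.CompletableFuture;

public class JoinDemo {
    static String runTask() {
        // supplyAsync() runs the Supplier on the common pool's
        // daemon threads:
        CompletableFuture<String> cf =
            CompletableFuture.supplyAsync(() -> "task ran");
        // join() blocks until the task completes; without it, main()
        // could finish first and the daemon pool would die with the JVM:
        return cf.join();
    }
    public static void main(String[] args) {
        System.out.println(runTask());
    }
}
```

This is the same reason QuittingCompletable ends with cfutures.forEach(CompletableFuture::join).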
6.1 Basic usage
Let's dig deeper. The following class has a static work() method that performs work on an object of the class:
// An ordinary object: it is neither Runnable nor Callable
public class Machina {
    public enum State {
        START, ONE, TWO, THREE, END; // START is the initial state
        State step() {
            if (equals(END)) {
                return END;
            }
            return values()[ordinal() + 1];
        }
    }
    private State state = State.START;
    private final int id;
    public Machina(int id) {
        this.id = id;
    }
    // Moves the machine one state forward; each step takes
    // 0.1 seconds of "work":
    public static Machina work(Machina machina) {
        if (!machina.state.equals(State.END)) {
            new Nap(0.1);
            machina.state = machina.state.step();
        }
        System.out.println(machina);
        return machina;
    }
    @Override
    public String toString() {
        return "Machina id=" + id + ": " +
            (state.equals(State.END) ? "Complete" : state);
    }
}
/***************************************************************/
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletedMachina {
    public static void main(String[] args) {
        // completedFuture() returns a CompletableFuture that is
        // already completed with the given value:
        CompletableFuture<Machina> cf =
            CompletableFuture.completedFuture(new Machina(0));
        try {
            Machina m = cf.get(); // Doesn't block: the result is ready
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
The work() method moves the machine from one state to the next, taking 0.1 seconds per step.
completedFuture() wraps the object you hand it, creating (and returning) a CompletableFuture that is "already completed."
Normally, get() blocks the calling thread while waiting for a result. Here the call cannot block: because the CompletableFuture has already completed, the result is immediately available.
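A small sketch of that behavior (names invented for illustration): a completedFuture() reports isDone() before get() or join() is ever called, so retrieving the value never blocks:

```java
import java.util.concurrent.CompletableFuture;

public class AlreadyDone {
    static boolean doneBeforeGet() {
        // Done immediately: no task ever needs to run.
        CompletableFuture<Integer> cf = CompletableFuture.completedFuture(42);
        return cf.isDone();
    }
    static int value() {
        // join() returns at once because the result already exists:
        return CompletableFuture.completedFuture(42).join();
    }
    public static void main(String[] args) {
        System.out.println(doneBeforeGet()); // true
        System.out.println(value());         // 42
    }
}
```

join() is used here instead of get() only because it throws no checked exceptions; the non-blocking behavior is the same.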
import com.gui.demo.thingInJava.concurrency.share.Timer;
import java.util.concurrent.CompletableFuture;

// Chained use of thenApply()
public class CompletableApplyChained {
    public static void main(String[] args) {
        Timer timer = new Timer();
        /*
         * thenApply() takes a function that accepts an input and produces an
         * output. Here work() produces the same type it receives (Machina),
         * so every operation added to the CompletableFuture also returns
         * Machina. But (as with map() on a stream) the function could return
         * a different type, which would be reflected in the return type.
         *
         * Note the important detail visible here: CompletableFutures
         * automatically unwrap and rewrap the object they carry as you apply
         * operations, which makes the code easier to write and understand.
         */
        CompletableFuture<Machina> cf =
            CompletableFuture.completedFuture(new Machina(0))
                .thenApply(Machina::work)
                .thenApply(Machina::work)
                .thenApply(Machina::work)
                .thenApply(Machina::work);
        System.out.println(timer.duration());
    }
}
/* output:
Machina id=0: ONE
Machina id=0: TWO
Machina id=0: THREE
Machina id=0: Complete
535
*/
Here we add a Timer. Each step obviously adds 100 ms of waiting, and the total also reflects the extra overhead of thenApply() inside CompletableFuture. An important benefit of CompletableFutures is that they encourage the share-nothing principle. By default, a function you hand to thenApply() does not communicate with anything else: it takes a single argument and returns a single result. This is a foundation of functional programming and is especially effective for concurrency. Parallel streams and CompletableFutures are designed to support these principles; as long as you don't decide to share data, you can write relatively safe concurrent programs.
Each thenApply() starts its operation only once the previous one finishes, so the construction of the CompletableFuture doesn't complete until all the work is done. While that is sometimes useful, it is usually more valuable to start all the tasks up front so you can do other things while they run. We achieve this with thenApplyAsync():
import java.util.concurrent.CompletableFuture;

public class CompletableApplyAsync {
    public static void main(String[] args) {
        Timer timer = new Timer();
        CompletableFuture<Machina> cf =
            CompletableFuture.completedFuture(new Machina(0))
                .thenApplyAsync(Machina::work)
                .thenApplyAsync(Machina::work)
                .thenApplyAsync(Machina::work)
                .thenApplyAsync(Machina::work);
        System.out.println(timer.duration());
        System.out.println(cf.join());
        System.out.println(timer.duration());
    }
}
/* output:
50
Machina id=0: ONE
Machina id=0: TWO
Machina id=0: THREE
Machina id=0: Complete
Machina id=0: Complete
484
*/
A synchronous call (the ordinary kind) means "don't return until the work is finished," while an asynchronous call means "return immediately and carry on with the work in the background." As you can see, creating cf is now very fast: each thenApplyAsync() returns immediately so the next call can be made, and the whole chain of calls completes quickly.
In fact, without the call to cf.join(), the program would exit before finishing its work. cf.join() prevents main() from ending until cf completes, and you can also see that most of this example's time is spent in cf.join().
This "return immediately" asynchrony depends on some behind-the-scenes work in the CompletableFuture library. In particular, it stores the chain of operations you build as a set of callbacks. When the first operation (running in the background) completes and returns, the second must take the resulting Machina and start working, and so on. But this asynchronous chain has no sequence controlled through the program's call stack — the call order would be lost — so the library stores the callbacks in a function table instead.
That is the full extent of what you need to know about callbacks: by going asynchronous, CompletableFuture manages them all for you.
6.1.1 Other operations
The following example shows all the "basic" operations — those that neither combine two CompletableFutures nor involve exceptions (covered later). First, for brevity and convenience, two utilities we'll reuse:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableUtilities {
    // Get and display the value stored in the cf:
    public static void showr(CompletableFuture<?> cf) {
        try {
            System.out.println(cf.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
    // For cfs that carry no value:
    public static void voidr(CompletableFuture<Void> cf) {
        try {
            cf.get(); // Returns void
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
showr() calls get() on the CompletableFuture and displays the result. voidr() is the showr() version for CompletableFuture<Void> — that is, it only waits for the task to complete or fail.
import static com.gui.demo.thingInJava.concurrency.completablefutures.CompletableUtilities.*;

import java.util.concurrent.CompletableFuture;

public class CompletableOperations {
    // cfi() is a convenience method that wraps an int in a
    // CompletableFuture<Integer>:
    static CompletableFuture<Integer> cfi(int i) {
        return CompletableFuture.completedFuture(Integer.valueOf(i));
    }
    public static void main(String[] args) {
        // Test that showr() works:
        showr(cfi(1));
        // runAsync() takes a Runnable, which produces no return value, so we
        // use voidr(), which takes a CompletableFuture<Void>:
        voidr(cfi(2).runAsync(() -> System.out.println("runAsync")));
        // thenRunAsync() appears to do the same thing:
        voidr(cfi(3).thenRunAsync(() -> System.out.println("thenRunAsync")));
        // But runAsync() is a static method, so calling it through the class
        // (as here) is the recommended form:
        voidr(CompletableFuture.runAsync(() -> System.out.println("runAsync is static")));
        // supplyAsync() is also static, but takes a Supplier instead of a
        // Runnable and produces a CompletableFuture<Integer> rather than a
        // CompletableFuture<Void>:
        showr(CompletableFuture.supplyAsync(() -> 99));
        // Unlike thenRunAsync(), the "then" methods used with cfi(4), cfi(5)
        // and cfi(6) receive the unwrapped Integer.
        // thenAcceptAsync() takes a Consumer, so it produces no result and
        // returns a CompletionStage<Void>:
        voidr(cfi(4).thenAcceptAsync(i -> System.out.println("thenAcceptAsync" + i)));
        // thenApplyAsync() takes a Function and produces a CompletionStage
        // (the result type can differ from the input type):
        showr(cfi(5).thenApplyAsync(i -> i + 42));
        // thenComposeAsync() is nearly identical, except that its Function
        // must produce a result already wrapped in a CompletableFuture:
        showr(cfi(6).thenComposeAsync(i -> cfi(i + 99)));
        CompletableFuture<Integer> c = cfi(7);
        c.obtrudeValue(111); // Forcibly resets the value
        showr(c);
        // toCompletableFuture() produces a CompletableFuture from a
        // CompletionStage:
        showr(cfi(8).toCompletableFuture());
        c = new CompletableFuture<>();
        // complete() finishes the Future by handing it a result; unlike
        // obtrudeValue(), it cannot force a value on top of an existing one:
        c.complete(9);
        showr(c);
        // If the cf has already completed, cancel() has no effect; otherwise
        // it completes the cf with a CancellationException:
        c = new CompletableFuture<>(); // Not yet completed, so cancel() succeeds
        c.cancel(true);
        System.out.println("canceled: " + c.isCancelled());
        System.out.println("completed exceptionally: " + c.isCompletedExceptionally());
        System.out.println("done: " + c.isDone());
        System.out.println(c);
        c = new CompletableFuture<>();
        // getNow() returns the cf's completion value if it is done; otherwise
        // it returns getNow()'s fallback argument:
        System.out.println(c.getNow(777));
        c = new CompletableFuture<>();
        c.thenApplyAsync(i -> i + 42)
            .thenApplyAsync(i -> i * 12);
        // getNumberOfDependents() counts the cfs that depend directly on c:
        System.out.println("dependents: " + c.getNumberOfDependents());
        c.thenApplyAsync(i -> i / 2);
        System.out.println("dependents: " + c.getNumberOfDependents());
    }
}
/* output:
1
runAsync
thenRunAsync
runAsync is static
99
thenAcceptAsync4
47
105
111
8
9
canceled: true
completed exceptionally: true
done: true
java.util.concurrent.CompletableFuture@...[Completed exceptionally]
777
dependents: 1
dependents: 2
*/
main() contains a series of tests, referenced by the int values passed to cfi(). Some points to notice:
- The thenRunAsync() test for cfi(3) appears to behave just like the runAsync() test for cfi(2). The difference: runAsync() is a static method, so you wouldn't normally call it through an instance as the cfi(2) test does — you'd call it through the class, as the next test shows.
- supplyAsync() is also static; the difference is that it takes a Supplier instead of a Runnable and produces a CompletableFuture<Integer> instead of a CompletableFuture<Void>.
- Finally, dependents: chaining two thenApplyAsync() calls onto the same CompletableFuture does not increase the dependent count, which stays at 1. But attaching another thenApplyAsync() directly to c produces two dependents: the two chained together, plus the separately attached one. This shows that a CompletionStage can, once complete, feed its result to several new tasks.
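To make the thenApply()/thenCompose() distinction above concrete, here is a small sketch (class name invented): like map() versus flatMap() on streams, thenCompose() flattens a function that itself returns a CompletableFuture:

```java
import java.util.concurrent.CompletableFuture;

public class ComposeVsApply {
    static CompletableFuture<Integer> cfi(int i) {
        return CompletableFuture.completedFuture(i);
    }
    // thenApply() with a CompletableFuture-producing function
    // nests the type:
    static CompletableFuture<CompletableFuture<Integer>> applied() {
        return cfi(1).thenApply(i -> cfi(i + 1));
    }
    // thenCompose() flattens it back to a single CompletableFuture,
    // just as flatMap() does for streams:
    static CompletableFuture<Integer> composed() {
        return cfi(1).thenCompose(i -> cfi(i + 1));
    }
    public static void main(String[] args) {
        System.out.println(applied().join().join()); // 2
        System.out.println(composed().join());       // 2
    }
}
```

Use thenCompose() whenever the next step is itself an asynchronous operation that already produces a CompletableFuture.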
6.2 Combining CompletableFutures
The second flavor of CompletableFuture methods combines two CompletableFutures in various ways. Think of it as a two-contestant race: one CompletableFuture always reaches the finish line before the other, and these methods let you handle the results in different ways.
To test them, we'll create a task (Workable) that takes its completion time as a parameter, so we can control which CompletableFuture finishes first:
import java.util.concurrent.CompletableFuture;

public class Workable {
    String id;
    final double duration;
    public Workable(String id, double duration) {
        this.id = id;
        this.duration = duration;
    }
    @Override
    public String toString() {
        return "Workable{" + id + '}';
    }
    public static Workable work(Workable tt) {
        new Nap(tt.duration); // Sleep for this task's specified duration
        tt.id = tt.id + "W";  // Append W to show the work is done
        System.out.println(tt);
        return tt;
    }
    public static CompletableFuture<Workable> make(String id, double duration) {
        // completedFuture() + thenApplyAsync(), as in the earlier examples:
        return CompletableFuture.completedFuture(new Workable(id, duration))
            .thenApplyAsync(Workable::work);
    }
}
In make(), the work() method is applied to the CompletableFuture. work() takes some time to complete, then appends the letter W to id to show that the work has been done.
Now we can create multiple competing CompletableFutures and connect them with the "dual" methods:
import static com.thingInJava.concurrency.completablefutures.CompletableUtilities.*;

import java.util.concurrent.CompletableFuture;

public class DualCompletableOperations {
static CompletableFuture<Workable> cfA, cfB;
static void init() {
cfA = Workable.make("A", 0.15);
cfB = Workable.make("B", 0.10); // B always wins the race
}
static void join() {
cfA.join();
cfB.join();
System.out.println("**********************");
}
public static void main(String[] args) {
init();
voidr(cfA.runAfterEitherAsync(cfB,()->
System.out.println("runAfterEitherAsync")));
join();
init();
voidr(cfA.runAfterBothAsync(cfB,()->{
System.out.println("runAfterBothAsync");
}));
join();
init();
showr(cfA.applyToEitherAsync(cfB,w->{
System.out.println("applyToEitherAsync:" + w);
return w;
}));
join();
init();
voidr(cfA.acceptEitherAsync(cfB,w->{
System.out.println("acceptEitherAsync: " + w);
}));
join();
init();
voidr(cfA.thenAcceptBothAsync(cfB,(w1,w2)->{
System.out.println("thenAcceptBothAsync: " + w1 + ", " + w2);
}));
join();
init();
showr(cfA.thenCombineAsync(cfB,(w1,w2)->{
System.out.println("thenCombineAsync: " + w1 + ", " + w2);
return w1;
}));
join();
init();
CompletableFuture<Workable>
cfC = Workable.make("C", 0.08),
cfD = Workable.make("D", 0.09);
CompletableFuture.anyOf(cfA, cfB, cfC, cfD)
.thenRunAsync(() -> System.out.println("anyOf"));
join();
init();
cfC = Workable.make("C", 0.08);
cfD = Workable.make("D", 0.09);
CompletableFuture.allOf(cfA, cfB, cfC, cfD)
.thenRunAsync(() -> System.out.println("allOf"));
join();
}
}
/* output:
Workable{BW}
runAfterEitherAsync
Workable{AW}
**********************
Workable{BW}
Workable{AW}
runAfterBothAsync
**********************
Workable{BW}
applyToEitherAsync:Workable{BW}
Workable{BW}
Workable{AW}
**********************
Workable{BW}
acceptEitherAsync: Workable{BW}
Workable{AW}
**********************
Workable{BW}
Workable{AW}
thenAcceptBothAsync: Workable{AW}, Workable{BW}
**********************
Workable{BW}
Workable{AW}
thenCombineAsync: Workable{AW}, Workable{BW}
Workable{AW}
**********************
Workable{CW}
Workable{DW}
anyOf
Workable{BW}
Workable{AW}
**********************
Workable{CW}
Workable{DW}
Workable{BW}
Workable{AW}
**********************
allOf
*/
- For easy access, cfA and cfB are defined as static fields.
- init() initializes both variables; because B always gets a shorter delay than A, it always wins the race. join() is another convenience method that calls join() on both and prints a border of asterisks.
- All of these "dual" methods use one CompletableFuture as the object the method is called on, take the second CompletableFuture as the first argument, and then take the operation to perform.
- As the use of showr() and voidr() shows, the "run" and "accept" variants are terminal operations, while "apply" and "combine" produce a new payload-bearing CompletableFuture.
- The behavior of each method is self-evident, and you can verify it against the output. A particularly interesting method is thenCombineAsync(), which waits for both CompletableFutures to complete, then hands both results to a BiFunction whose result becomes the payload of the final CompletableFuture.
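As a minimal, self-contained sketch of the combining idea (class name and values invented for illustration), thenCombine() waits for both futures and merges their payloads with a BiFunction:

```java
import java.util.concurrent.CompletableFuture;

public class CombineSketch {
    static int combineSum() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        // thenCombine() waits for both a and b, then merges the two
        // results with the BiFunction (here, integer addition):
        return a.thenCombine(b, Integer::sum).join();
    }
    public static void main(String[] args) {
        System.out.println(combineSum()); // 42
    }
}
```

Note that, unlike the "either" methods, the result is available only after the slower of the two futures completes.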
6.3 Exceptions
7、 Deadlock
Because tasks can block, one task can get stuck waiting for a second task, which in turn waits for another, and so on, until the chain loops back to a task waiting on a resource held by the first. This locks everyone up: no thread can continue, and the condition is called deadlock.
The real problem is that a program can appear to work fine while harboring a latent deadlock. A deadlock can then occur with no warning and be very hard to reproduce. So careful programming to prevent deadlock is a crucial part of writing concurrent programs.
The "dining philosophers" problem is the classic deadlock example. The basic version specifies five philosophers with limited tableware: one chopstick each (the number of chopsticks equals the number of philosophers). To eat, a philosopher must hold both the left and the right chopstick. If the philosopher on either side is using a chopstick, this philosopher must wait until the necessary chopstick becomes available.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class StickHolder {
private static class Chopstick {
}
private Chopstick stick = new Chopstick();
private BlockingQueue<Chopstick> holders = new ArrayBlockingQueue<>(1);
public StickHolder() {
putDown();
}
public void pickUp() {
try {
holders.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void putDown() {
try {
holders.put(stick);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
The StickHolder class manages a single chopstick by keeping it in a BlockingQueue of size 1. A BlockingQueue is a collection designed to be used safely in concurrent programs: if you call take() while the queue is empty, it blocks; once a new element is put into the queue, the block releases and the value is returned.
For simplicity, a Chopstick is never actually produced by the StickHolder; it is kept private inside the class (as a static nested class).
If you call pickUp() when the stick is unavailable, pickUp() blocks until another philosopher calls putDown() to return the stick.
Note that all thread safety in this class is achieved through the BlockingQueue.
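The same trick can be shown in isolation (a sketch with invented names, not part of the original program): a one-slot ArrayBlockingQueue behaves like a binary semaphore, where put() and take() are the release and acquire operations:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BinaryGate {
    private final Object token = new Object();
    private final BlockingQueue<Object> slot = new ArrayBlockingQueue<>(1);

    public BinaryGate() {
        release(); // Start in the "available" state, like StickHolder
    }
    public void acquire() { // Blocks if another thread holds the token
        try {
            slot.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    public void release() { // Return the token to the slot
        try {
            slot.put(token);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    public boolean available() {
        return !slot.isEmpty();
    }
    public static void main(String[] args) {
        BinaryGate gate = new BinaryGate();
        System.out.println(gate.available()); // true
        gate.acquire();
        System.out.println(gate.available()); // false
        gate.release();
        System.out.println(gate.available()); // true
    }
}
```

All mutual exclusion comes from the queue itself; the class contains no synchronized blocks or volatile fields.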
public class Philosopher implements Runnable{
private final int seat;
private final StickHolder left,right;
public Philosopher(int seat, StickHolder left, StickHolder right) {
this.seat = seat;
this.left = left;
this.right = right;
}
@Override
public String toString() {
return "P" + seat;
}
@Override
public void run() {
while (true) {
System.out.println("Thinking");
right.pickUp();
left.pickUp();
System.out.println(this + " eating");
right.putDown();
left.putDown();
}
}
}
Each Philosopher is a task. It tries to pickUp() the chopsticks for its right and left hands so it can eat, then returns them with putDown().
No two philosophers can successfully take() the same chopstick at the same time, and if a philosopher has already picked up a chopstick, the next philosopher who tries to pick up that same chopstick blocks until it is released.
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

public class DiningPhilosophers {
    private StickHolder[] sticks;
    private Philosopher[] philosophers;
    public DiningPhilosophers(int n) {
        sticks = new StickHolder[n];
        Arrays.setAll(sticks, i -> new StickHolder());
        philosophers = new Philosopher[n];
        Arrays.setAll(philosophers, i ->
            new Philosopher(i, sticks[i], sticks[(i + 1) % n])); // [1]
        // Uncommenting the next line breaks the circular wait
        // and prevents the deadlock:
        // philosophers[1] = new Philosopher(0, sticks[0], sticks[1]); // [2]
        Arrays.stream(philosophers)
            .forEach(CompletableFuture::runAsync); // [3]
    }
    public static void main(String[] args) {
        new DiningPhilosophers(5); // [4]
        // Keep main() alive for 3 seconds, then force the
        // (deadlocked) program to exit:
        new Nap(3, "Shutdown");
    }
}
The result: a seemingly innocent program deadlocks. (Arrays are used here instead of collections simply because the syntax is more concise.) Some notes:
- A computer with more than one core readily produces the deadlock.
- In the DiningPhilosophers constructor, each philosopher gets a reference to a left and a right chopstick. Every philosopher except the last is initialized between the next pair of chopsticks; the last philosopher gets chopstick 0 as the right-hand stick, because the last philosopher sits next to the first and the two share that chopstick. The modulus n shown at [1] selects the right chopstick and wraps the last philosopher around to the first.
- Now all the philosophers can try to eat, each waiting for a neighbor to put down a chopstick. To start each philosopher running at [3] we call runAsync(), which means the DiningPhilosophers constructor returns immediately at [4]. Without anything to stop main() from completing, the program would simply exit without doing much; here the Nap object keeps main() from exiting and then, after 3 seconds, forces the deadlocked program to quit. In the given configuration, the philosophers spend almost no time thinking, so they all contend for the chopsticks right away and deadlock quickly. You can change this: ① add more philosophers by increasing the argument at [4], or ② uncomment line [2] in DiningPhilosophers.
7.1 The conditions for deadlock
To fix the deadlock problem, you must understand that deadlock occurs when all four of the following conditions hold at the same time:
- Mutual exclusion: at least one resource used by the tasks is not shareable. Here, a chopstick can be used by only one philosopher at a time.
- At least one task must hold a resource while waiting to acquire another resource held by a different task. That is, for deadlock to occur, a philosopher must hold one chopstick while waiting for another.
- Resources cannot be preempted from a task; tasks release resources only as a normal event. Our philosophers are polite and never snatch chopsticks from each other.
- A circular wait must exist: each task waits on a resource held by the next, until the chain loops back to a resource held by the first.
Because all four conditions must hold for deadlock to occur, you only need to break one of them to prevent it.
In this program, a simple way to prevent deadlock is to break the fourth condition. The deadlock arises because each philosopher tries to pick up the chopsticks in a particular order: first right, then left. It is therefore possible for every philosopher to be holding a right chopstick while waiting for the left, producing the circular wait. If, however, one of the philosophers tries to pick up the left chopstick first, that philosopher never prevents the philosopher on the right from picking up a chopstick, which eliminates the circular wait.
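The same idea can be expressed outside the philosophers example as a global lock ordering (a sketch with invented names): if every task acquires its locks in one fixed, agreed-upon order, no circular wait can form:

```java
import java.util.concurrent.locks.ReentrantLock;

public class OrderedLocks {
    static final ReentrantLock[] STICKS = { new ReentrantLock(), new ReentrantLock() };

    // Every "philosopher" acquires sticks in ascending index order,
    // regardless of seating, so a circular wait is impossible:
    static void eat(int a, int b, Runnable action) {
        int lo = Math.min(a, b), hi = Math.max(a, b);
        STICKS[lo].lock();
        try {
            STICKS[hi].lock();
            try {
                action.run();
            } finally {
                STICKS[hi].unlock();
            }
        } finally {
            STICKS[lo].unlock();
        }
    }
    public static void main(String[] args) {
        eat(0, 1, () -> System.out.println("P0 eating"));
        eat(1, 0, () -> System.out.println("P1 eating")); // Same internal order
        System.out.println(STICKS[0].isLocked()); // false: all released
    }
}
```

The sort in eat() plays the same role as swapping one philosopher's pickup order: it removes the possibility of a cycle while changing nothing else about the program.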
Of course, the simplest and best way to avoid concurrency problems is never to share resources — unfortunately, that isn't always possible.