An introduction to J.U.C's FutureTask, the Fork/Join framework, and BlockingQueue

2022-06-10 18:07:00 InfoQ

This article is shared from the Huawei Cloud community: 《[High Concurrency] J.U.C Component Extensions》, author: Binghe.

FutureTask


FutureTask lives in J.U.C (java.util.concurrent), but it is not a subclass of AQS (AbstractQueuedSynchronizer). The way it handles the results of threads is worth studying and reusing in projects.

Thread and Runnable give no way to obtain a result after a task has run. Java 1.5 introduced Callable and Future, which let you obtain the result of a task once it has completed.

Comparing the Callable and Runnable interfaces


Callable: a generic interface that provides a call() method, which may throw checked exceptions and returns a value after execution.

Runnable: an interface that provides a run() method, which cannot throw checked exceptions and returns no value after execution.
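The difference can be seen in a minimal sketch. The class name CallableVsRunnable and the helper parseTask are made up for this illustration; only Callable and Runnable come from the JDK.

```java
import java.util.concurrent.Callable;

class CallableVsRunnable {
    // Hypothetical helper: builds a Callable that returns a value
    // and is allowed to throw a checked exception.
    static Callable<Integer> parseTask(String text) {
        return () -> {
            if (text == null) {
                throw new Exception("nothing to parse"); // checked exception is permitted in call()
            }
            return Integer.parseInt(text);
        };
    }

    public static void main(String[] args) throws Exception {
        // run() returns void and cannot declare a checked exception.
        Runnable runnable = () -> System.out.println("run() produces no result");
        runnable.run();

        // call() hands a value back to the caller.
        Callable<Integer> callable = parseTask("42");
        System.out.println("call() returned " + callable.call());
    }
}
```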

The Future interface


For a given Callable or Runnable task, Future lets you cancel the task, query whether it has been cancelled, query whether it has completed, and obtain its result.

A Future lets you observe how the call() method is progressing on the target thread. Calling the Future's get() method returns the result. If the thread executing the task has not finished yet, the calling thread blocks until call() returns its result, and only then continues. In short, a Future lets you obtain the return value of a task method run by another thread.
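The operations listed above (get, isDone, cancel, isCancelled) can be tried out roughly as follows. This is only a sketch: the class name FutureApiSketch, the method describe, and the chosen sleep and timeout values are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureApiSketch {
    // Hypothetical helper that exercises get(), isDone(), cancel() and isCancelled().
    static String describe() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> quick = pool.submit(() -> 1 + 1);
            Future<String> slow = pool.submit(() -> {
                Thread.sleep(10_000); // stands in for long-running work
                return "never returned";
            });

            int value = quick.get();       // blocks until the task has finished
            boolean done = quick.isDone(); // true once get() has returned

            try {
                slow.get(100, TimeUnit.MILLISECONDS); // wait at most 100 ms
            } catch (TimeoutException e) {
                slow.cancel(true); // true: interrupt the thread running the task
            }
            return "quick=" + value + " done=" + done + " cancelled=" + slow.isCancelled();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe());
    }
}
```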

The FutureTask class


FutureTask implements the RunnableFuture interface, and RunnableFuture extends both Runnable and Future. In the end, FutureTask also executes a task of type Callable: if the constructor is given a Runnable, it is converted into a Callable.
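As a small illustration of the Runnable case, the two-argument constructor FutureTask(Runnable, V) takes a Runnable together with the value that get() should return on completion. The class name RunnableFutureTaskSketch and the method runWithFixedResult are invented for this sketch.

```java
import java.util.concurrent.FutureTask;

class RunnableFutureTaskSketch {
    // Hypothetical helper: wraps a Runnable in a FutureTask with a preset result.
    static String runWithFixedResult() throws Exception {
        StringBuilder trace = new StringBuilder();
        Runnable work = () -> trace.append("ran");

        // The Runnable is adapted so that get() returns the supplied value.
        FutureTask<String> task = new FutureTask<>(work, "fixed result");
        new Thread(task).start();

        String result = task.get(); // blocks until run() has completed
        return trace + ":" + result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWithFixedResult());
    }
}
```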

Because FutureTask implements both Runnable and Future, it can be executed by a thread as a Runnable and can also be used as a Future to obtain the return value of a Callable. The benefit of this design is as follows:

Suppose some time-consuming logic has to compute and return a value, and the value is not needed immediately. You can combine Runnable and Future: have another thread compute the value, let the current thread do other work in the meantime, and fetch the value through the Future only when it is actually needed.

The Future sample code is as follows:


package io.binghe.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Slf4j
public class FutureExample {

    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Submit the task; it starts running on a pool thread
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do something in main");
        Thread.sleep(1000);
        // Blocks until call() has returned
        String result = future.get();
        log.info("result: {}", result);
        executorService.shutdown();
    }
}

The FutureTask sample code is as follows:


package io.binghe.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

@Slf4j
public class FutureTaskExample {

    public static void main(String[] args) throws Exception {
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do something in callable");
                Thread.sleep(5000);
                return "Done";
            }
        });

        // FutureTask is a Runnable, so a plain thread can execute it
        new Thread(futureTask).start();
        log.info("do something in main");
        Thread.sleep(1000);
        // FutureTask is also a Future; get() blocks until call() has returned
        String result = futureTask.get();
        log.info("result: {}", result);
    }
}

The Fork/Join framework


Located in J.U.C (java.util.concurrent), Fork/Join is a framework introduced in Java 7 for executing tasks in parallel. It splits a large task into several small tasks and then combines the results of the small tasks into the final result. The basic idea is similar to Hadoop's MapReduce.
It mainly relies on the work-stealing algorithm (a thread steals tasks from other threads' queues to execute), a strategy for parallel divide-and-conquer computation.

Why use a work-stealing algorithm?


Suppose we have a fairly large task to do. We can split it into several independent subtasks and, to reduce contention between threads, put the subtasks into different queues, creating a separate thread for each queue to execute the tasks in it. Threads and queues correspond one to one; for example, thread A is responsible for the tasks in queue A. Some threads, however, finish the tasks in their own queues first while tasks are still waiting in the queues of other threads. Rather than sit idle, a thread that has finished its work goes to another thread's queue and steals a task to execute. At that point both threads access the same queue, so to reduce contention between the stealing thread and the thread being stolen from, a double-ended queue (deque) is normally used: the thread being stolen from always takes tasks from the head of the deque, and the stealing thread always takes tasks from the tail.
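The two-ended access pattern described above can be pictured with an ordinary deque. The sketch below is only an illustration of that description, not how ForkJoinPool is built (it uses its own internal queues); the class name WorkStealingDequeSketch, the method demo, and the task names t1 to t4 are made up.

```java
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

class WorkStealingDequeSketch {
    // Hypothetical single-threaded walk-through of who takes from which end.
    static String demo() {
        Deque<String> queueOfA = new ConcurrentLinkedDeque<>();
        for (String task : new String[] {"t1", "t2", "t3", "t4"}) {
            queueOfA.addLast(task);
        }

        // Thread A, the owner of the queue, takes work from the head.
        String takenByOwner = queueOfA.pollFirst();

        // An idle thread B steals from the tail, so the two threads
        // only collide when a single task is left.
        String stolenByThief = queueOfA.pollLast();

        return takenByOwner + "," + stolenByThief + "," + queueOfA;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints t1,t4,[t2, t3]
    }
}
```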

Advantages of the work-stealing algorithm:


It makes full use of threads for parallel computation and reduces contention between threads.

Disadvantages of the work-stealing algorithm:


Contention still occurs in some cases, for example when the deque holds only one task. The algorithm also consumes more system resources, for example by creating multiple threads and multiple deques.

In the Fork/Join framework, when a task is waiting for the subtasks it created to finish via a join operation, the worker thread executing that task looks for other tasks that have not yet been executed and starts running them. In this way, threads make full use of their running time, which improves application performance. To achieve this, the Fork/Join framework places some restrictions on the tasks it executes.

Limitations of the Fork/Join framework:


(1) Tasks should use only the fork and join operations as their synchronization mechanism. If a task uses another synchronization mechanism, the worker thread cannot execute other tasks while it is blocked in that operation. For example, if a task in the Fork/Join framework goes to sleep, the worker thread executing it will not execute any other task during the sleep.

(2) Tasks split off in the Fork/Join framework should not perform I/O operations, such as reading or writing data files.

(3) Tasks cannot throw checked exceptions; they must contain the code needed to handle them.
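For limitation (3), one common way to cope is to catch the checked exception inside compute() and rethrow it wrapped in an unchecked one. The sketch below assumes that approach; the class CheckedExceptionTaskSketch and the helpers parseOrFail, run, and hasCause are hypothetical names used only here.

```java
import java.io.IOException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class CheckedExceptionTaskSketch extends RecursiveTask<Integer> {
    private final String input;

    CheckedExceptionTaskSketch(String input) {
        this.input = input;
    }

    // Hypothetical step that declares a checked exception.
    static int parseOrFail(String text) throws IOException {
        try {
            return Integer.parseInt(text);
        } catch (NumberFormatException e) {
            throw new IOException("bad input: " + text, e);
        }
    }

    @Override
    protected Integer compute() {
        try {
            return parseOrFail(input);
        } catch (IOException e) {
            // compute() declares no checked exceptions, so wrap it in an unchecked one.
            throw new IllegalStateException(e);
        }
    }

    // Walks the cause chain, because the pool may rethrow a copy of the exception.
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (type.isInstance(c)) {
                return true;
            }
        }
        return false;
    }

    static String run(String input) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return "ok:" + pool.invoke(new CheckedExceptionTaskSketch(input));
        } catch (RuntimeException e) {
            return "failed:" + hasCause(e, IOException.class);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("7"));
        System.out.println(run("x"));
    }
}
```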

Core classes of the Fork/Join framework


The core of the Fork/Join framework consists of two classes: ForkJoinPool and ForkJoinTask. ForkJoinPool implements the work-stealing algorithm, manages the worker threads, and provides information about the state and execution of tasks. ForkJoinTask mainly provides the mechanism for performing fork and join operations within a task.
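To show the two classes working together on a task with no return value, here is a rough sketch built on RecursiveAction (a ForkJoinTask subclass) that also reads two of the pool's status getters. The class name DoubleArrayAction, the method doubled, the threshold of 4, and the pool size of 4 are all assumptions for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleArrayAction extends RecursiveAction {
    private static final int THRESHOLD = 4; // illustrative split threshold

    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleArrayAction(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                data[i] *= 2; // small enough: do the work directly
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // invokeAll forks the subtasks and waits until both have completed.
        invokeAll(new DoubleArrayAction(data, from, mid),
                  new DoubleArrayAction(data, mid, to));
    }

    // Hypothetical helper: doubles every element of a copy of the input.
    static int[] doubled(int[] input) {
        int[] copy = input.clone();
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            pool.invoke(new DoubleArrayAction(copy, 0, copy.length));
            // The pool exposes status information such as these two values.
            System.out.println("parallelism=" + pool.getParallelism()
                    + ", steals=" + pool.getStealCount());
        } finally {
            pool.shutdown();
        }
        return copy;
    }

    public static void main(String[] args) {
        int[] result = doubled(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
        System.out.println(java.util.Arrays.toString(result));
    }
}
```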

The sample code is as follows:


package io.binghe.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

    public static final int threshold = 2;

    private int start;
    private int end;

    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // If the task is small enough, compute it directly
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // If the task is larger than the threshold, split it into two subtasks
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // Execute the subtasks
            leftTask.fork();
            rightTask.fork();

            // Wait for the subtasks to finish and get their results
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // Merge the results of the subtasks
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        // Create a computation task that calculates 1+2+...+100
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        // Execute the task
        Future<Integer> result = forkjoinPool.submit(task);
        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

BlockingQueue


A blocking queue is thread-safe.

An operation blocks in the following cases:


(1) An enqueue operation when the queue is full

(2) A dequeue operation when the queue is empty

Usage scenarios:


Mainly producer and consumer scenarios
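A minimal producer and consumer sketch, assuming a bounded ArrayBlockingQueue of capacity 2 and an end-of-stream marker; the class name ProducerConsumerSketch, the method transfer, and the POISON value are invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerSketch {
    private static final int POISON = -1; // hypothetical marker meaning "no more items"

    static List<Integer> transfer(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty
                for (int value = queue.take(); value != POISON; value = queue.take()) {
                    consumed.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join, the consumer's writes are visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(5)); // prints [0, 1, 2, 3, 4]
    }
}
```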

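BlockingQueue methods


BlockingQueue has 4 groups of methods for inserting, removing, and examining elements in the queue. Each group behaves differently when the requested operation cannot be performed immediately. The methods are as follows:

  • Insert: add(e) throws an exception, offer(e) returns a special value, put(e) blocks, offer(e, time, unit) times out
  • Remove: remove() throws an exception, poll() returns a special value, take() blocks, poll(time, unit) times out
  • Examine: element() throws an exception, peek() returns a special value (there are no blocking or timed variants)

The four groups of behavior are explained below: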

  • Throws an exception

If the attempted operation cannot be performed immediately, an exception is thrown.

  • Special value

If the attempted operation cannot be performed immediately, a special value is returned (true / false for insertion, null for removal and examination).

  • Blocks

If the attempted operation cannot be performed immediately, the method call blocks until it can be performed.

  • Times out

If the attempted operation cannot be performed immediately, the method call blocks until it can be performed, but waits no longer than the given time. It returns a special value indicating whether the operation succeeded (true / false for insertion, null for a removal that timed out).
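The four behaviors can be observed side by side on a queue of capacity 1. This is a sketch only; the class name QueueBehaviorsSketch, the method demo, and the 50 ms timeout are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueBehaviorsSketch {
    static String demo() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("a"); // succeeds at once; the queue is now full

        // Throws an exception: add() on a full queue
        String addOutcome;
        try {
            queue.add("b");
            addOutcome = "added";
        } catch (IllegalStateException e) {
            addOutcome = "exception";
        }

        // Special value: offer() returns false instead of throwing
        boolean offered = queue.offer("b");

        // Times out: waits up to 50 ms for space, then gives up with false
        boolean offeredInTime = queue.offer("b", 50, TimeUnit.MILLISECONDS);

        // Blocks: take() would wait on an empty queue; here an element is available
        String taken = queue.take();

        // Special value: poll() on the now-empty queue returns null
        String polled = queue.poll();

        return addOutcome + "," + offered + "," + offeredInTime + "," + taken + "," + polled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints exception,false,false,a,null
    }
}
```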

The implementation classes of BlockingQueue include:


  • ArrayBlockingQueue: a bounded blocking queue (its capacity is limited, must be specified at construction, and cannot be changed afterwards). It is backed by an array and stores data in FIFO order: new elements are inserted at the tail and elements are removed from the head.
  • DelayQueue: here it is the elements themselves that cause the blocking, because an element can be taken only after its delay has expired. Elements of a DelayQueue must implement the Delayed interface (also located in J.U.C). Delayed extends Comparable because the elements need to be ordered; in general they are ordered by expiration time, with the element that expires first at the head. Typical uses include closing connections after a set time, cached objects, and timeout handling. The internal implementation uses PriorityQueue and ReentrantLock.
  • LinkedBlockingQueue: the capacity is optional. If a size is specified at construction, the queue is bounded; if not, it is effectively unbounded (the default capacity is Integer.MAX_VALUE). It is backed by a linked list and stores data in FIFO order: new elements are inserted at the tail and elements are removed from the head.
  • PriorityBlockingQueue: a blocking queue with priorities. It is unbounded but ordered, and it does not allow null elements. Inserted objects must implement the Comparable interface, which determines the priority order of the queue, unless a Comparator is supplied when the queue is constructed. You can obtain an Iterator from a PriorityBlockingQueue, but the iterator does not guarantee traversal in priority order.
  • SynchronousQueue: the queue hands over one element at a time. A thread that inserts an element blocks until another thread takes that element, which is why SynchronousQueue is also called a synchronous queue. It is a queue without any buffer: strictly speaking, it does not store elements at all, and the next element can be put only after the previous one has been taken away.
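As one example from the list, the DelayQueue contract (elements implement Delayed and become available in order of expiry) might be exercised like this. The class DelayQueueSketch, the element type Expiring, and the delays of 50 ms and 150 ms are made up for the sketch.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueSketch {
    // Hypothetical element type: becomes available once its delay has elapsed.
    static final class Expiring implements Delayed {
        final String name;
        private final long expiresAtNanos;

        Expiring(String name, long delayMillis) {
            this.name = name;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the element has expired.
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Order by remaining delay so the element expiring first sits at the head.
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    static String drainInExpiryOrder() throws InterruptedException {
        DelayQueue<Expiring> queue = new DelayQueue<>();
        queue.put(new Expiring("slow", 150));
        queue.put(new Expiring("fast", 50));

        // take() blocks until the head element's delay has expired.
        String first = queue.take().name;
        String second = queue.take().name;
        return first + "," + second;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drainInExpiryOrder()); // prints fast,slow
    }
}
```

The insertion order is deliberately the reverse of the expiry order, to show that retrieval follows expiry rather than arrival.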


Copyright notice
This article was created by [InfoQ]. Please include the original link when reposting. Thank you.
https://yzsam.com/2022/161/202206101723483890.html