
User guide for JUC concurrency Toolkit

2022-06-12 21:34:00 Besieged city_ city with high walls


Preface

This article gives a brief introduction to the development tools in the java.util.concurrent package so that you can try them in your own projects. It does not explain the core problems of Java concurrency or the principles behind them. If you are interested in those topics, please refer to the "Java Concurrency Guide".

One、Locks and atomic types

java.util.concurrent.locks.Lock is a thread synchronization mechanism similar to a synchronized block. However, a Lock is more flexible and more fine-grained than a synchronized block.

1、Reentrant lock: Lock

The java.util.concurrent.locks package provides the following implementation class of the Lock interface: ReentrantLock

1.2、Lock compared with synchronized

The main differences between a Lock object and a synchronized block are:

  • A synchronized block makes no guarantee about the order in which waiting threads are granted access.
  • You cannot pass any parameters to the entry of a synchronized block, so it is impossible to set a timeout on waiting to enter one.
  • A synchronized block must be fully contained within a single method, whereas the lock() and unlock() calls of a Lock object can be placed in different methods.

1.3、Lock methods

The Lock interface has the following main methods:

  • lock()
    lock() locks the Lock instance. If the Lock instance is already locked, the thread calling lock() blocks until the Lock instance is unlocked.
  • lockInterruptibly()
    lockInterruptibly() locks the Lock instance unless the calling thread has been interrupted. In addition, if a thread is blocked waiting to acquire the lock through this method and is then interrupted, it exits the method call with an InterruptedException.
  • tryLock()
    tryLock() attempts to lock the Lock instance immediately. It returns true if locking succeeds and false if the Lock instance is already locked. This method never blocks.
  • tryLock(long timeout, TimeUnit timeUnit)
    tryLock(long timeout, TimeUnit timeUnit) works like tryLock(), except that it waits up to the given timeout before giving up on acquiring the lock.
  • unlock()
    unlock() unlocks the Lock instance. A Lock implementation typically only allows the thread that holds the lock to call this method. A call to unlock() from any other thread throws an unchecked exception (RuntimeException).
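
The difference between a blocking lock() and a timed tryLock() is easiest to see with two threads. The following is a minimal sketch, not code from the original article: the class TryLockSketch, its counter field, and the 50 ms timeout are all assumptions made for illustration. One thread holds the lock while a second thread tries to acquire it with a timeout and gives up; once the lock is released, the same call succeeds.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not part of the original article.
class TryLockSketch {

    private final Lock lock = new ReentrantLock();
    private int counter = 0; // only modified while holding the lock

    /** Increments the counter only if the lock is obtained within the timeout. */
    boolean incrementIfAvailable(long timeout, TimeUnit unit) {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(timeout, unit);
            if (acquired) {
                counter++;
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            if (acquired) {
                lock.unlock(); // unlock only what this thread actually locked
            }
        }
    }

    /** Returns {result while another thread holds the lock, result after release}. */
    static boolean[] demo() {
        TryLockSketch sketch = new TryLockSketch();
        boolean[] results = new boolean[2];
        sketch.lock.lock(); // the calling thread holds the lock
        try {
            Thread other = new Thread(
                    () -> results[0] = sketch.incrementIfAvailable(50, TimeUnit.MILLISECONDS));
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            sketch.lock.unlock();
        }
        results[1] = sketch.incrementIfAvailable(50, TimeUnit.MILLISECONDS);
        return results;
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println("while held: " + results[0] + ", after release: " + results[1]);
    }
}
```

The acquired flag ensures that unlock() is only called by a thread that really obtained the lock, which avoids the unchecked exception described for unlock() above.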

1.4、 Examples of locks

package com.juc.demo.lock;


import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ReentrantLockDemo implements Runnable {

    /** Reentrant lock */
    private Lock lock = new ReentrantLock();

    @Override
    public void run() {
        test();
    }

    private void test() {
        lock.lock();
        try {
            // Code that must run under the lock
        } finally {
            lock.unlock();
        }
    }
}

1.5、Precise notification with Condition

Condition defines wait and notify methods. Before the current thread calls any of them, it must hold the lock associated with the Condition object. A Condition object is created by a Lock object (by calling its newCondition() method). In other words, a Condition depends on a Lock.

Condition is simple to use. Just make sure the lock is acquired before calling its methods, as the following code shows:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public void conditionWait() throws InterruptedException {
    lock.lock();
    try {
        condition.await();
    } finally {
        lock.unlock();
    }
}

public void conditionSignal() throws InterruptedException {
    lock.lock();
    try {
        condition.signal();
    } finally {
        lock.unlock();
    }
}

Here is an example of precise notification. The first thread runs first, then notifies exactly the second blocked thread, which in turn notifies exactly the third. The code is as follows:

package com.test;

import com.utils.ThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLock {
    
    private static Lock lock = new ReentrantLock();

    /** State flag that decides which thread may run next (0, 1, or 2) */
    private static Integer number = 0;

    private static Condition condition1 = lock.newCondition();
    private static Condition condition2 = lock.newCondition();
    private static Condition condition3 = lock.newCondition();

    public static void main(String[] args) {
    
        // Use a custom thread pool (the author's own ThreadPool class, described in Chapter 3, item 3.2)
        ThreadPool instance = ThreadPool.getInstance();
        ExecutorService executorService = instance.getExecutorService();
        executorService.execute(() -> {
    
            lock.lock();
            try {
    
                while (number != 0) {
    
                    condition1.await();
                }
                for (int i = 0; i < 5; i++) {
    
                    System.out.println(Thread.currentThread().getName() + " Print 1");
                }
                number = 1;
                condition2.signal();
            } catch (Exception e) {
    
                e.printStackTrace();
            } finally {
    
                lock.unlock();
            }
        });
        executorService.execute(() -> {
    
            lock.lock();
            try {
    
                while (number != 1) {
    
                    condition2.await();
                }
                for (int i = 0; i < 10; i++) {
    
                    System.out.println(Thread.currentThread().getName() + " Print 2");
                }
                number = 2;
                condition3.signal();
            } catch (Exception e) {
    
                e.printStackTrace();
            } finally {
    
                lock.unlock();
            }
        });
        executorService.execute(() -> {
    
            lock.lock();
            try {
    
                while (number != 2) {
    
                    condition3.await();
                }
                for (int i = 0; i < 15; i++) {
    
                    System.out.println(Thread.currentThread().getName() + " Print 3");
                }
                number = 0;
                condition1.signal();
            } catch (Exception e) {
    
                e.printStackTrace();
            } finally {
    
                lock.unlock();
            }
        });
    }
}

2、Read-write lock: ReadWriteLock

java.util.concurrent.locks.ReadWriteLock is an advanced thread locking mechanism. It allows multiple threads to read a particular resource at the same time, but only one thread to write to it at a time. The idea is that multiple threads can read a shared resource without causing concurrency problems. Concurrency problems arise when reads and writes of a shared resource happen at the same time, or when multiple writes happen at the same time.

2.1、ReadWriteLock locking rules

The rules by which a thread locks a ReadWriteLock before reading from or writing to the protected resource are as follows:

  • Read lock: granted if no writing thread has locked the ReadWriteLock and no writing thread has requested the write lock (without having obtained it yet). Multiple reading threads can therefore hold the lock at the same time.
  • Write lock: granted if no thread is reading or writing. Only one thread at a time can therefore hold the lock for writing.

In short:

  • Reads can coexist with reads
  • Reads cannot coexist with writes
  • Writes cannot coexist with writes
2.2、ReadWriteLock implementation

ReadWriteLock is an interface. To use it, you need one of its implementation classes. The java.util.concurrent.locks package provides the following implementation class of the ReadWriteLock interface: ReentrantReadWriteLock

2.3、ReadWriteLock Code example

package com.juc.demo.lock;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Several threads can read a resource at the same time without any problem, so to allow concurrency, reads of a shared resource should be able to proceed simultaneously. But while one thread is writing to the shared resource, no other thread should be able to read or write it. */
public class ReadWriteLockDemo {
    
    public static void main(String[] args) {
    
        MyCache myCache = new MyCache();

        for (int i = 0; i < 5; i++) {
    
            final int tmp = i;
            new Thread(()->{
    
                myCache.put(tmp+"",tmp+"");
            },"Thread"+i).start();
        }
        for (int i = 0; i < 5; i++) {
    
            final int tmp = i;
            new Thread(()->{
    
                myCache.get(tmp+"");
            },"Thread"+i).start();
        }

    }
}


class MyCache {
    
    private volatile Map<String,Object> map = new HashMap<>();
    private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public void put(String key,Object value){
    
        readWriteLock.writeLock().lock();
        try{
    
            System.out.println(Thread.currentThread().getName() + "\t writing: " + key);
            TimeUnit.MILLISECONDS.sleep(300);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "\t write complete");
        }catch (Exception e){
    
            e.printStackTrace();
        }finally {
    
            readWriteLock.writeLock().unlock();
        }
    }

    public void get(String key){
    
        readWriteLock.readLock().lock();
        try{
    
            System.out.println(Thread.currentThread().getName() + "\t reading: " + key);
            TimeUnit.MILLISECONDS.sleep(3000);
            Object o = map.get(key);
            System.out.println(Thread.currentThread().getName() + "\t read complete: " + o);
        }catch (Exception e){
    
            e.printStackTrace();
        }finally {
    
            readWriteLock.readLock().unlock();
        }
    }

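    public void clear(){
        // Clearing modifies the map, so it takes the write lock just like put()
        readWriteLock.writeLock().lock();
        try{
            map.clear();
        }finally {
            readWriteLock.writeLock().unlock();
        }
    }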

}

Note how the ReadWriteLock holds two lock instances: one guards read access and the other guards write access.

3、Atomic types

3.1、AtomicBoolean

The AtomicBoolean class provides a boolean value that can be read and written atomically, along with some advanced atomic operations such as compareAndSet(). AtomicBoolean is located in the java.util.concurrent.atomic package, so its full class name is java.util.concurrent.atomic.AtomicBoolean. The AtomicBoolean described here is the Java 8 version, not the original version introduced in Java 5.

(1) Creating an AtomicBoolean

AtomicBoolean atomicBoolean = new AtomicBoolean();

The example above creates a new AtomicBoolean with the default value false.

To give an AtomicBoolean instance an explicit initial value, pass that value to the AtomicBoolean constructor:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);

(2) Getting the value of an AtomicBoolean

Use the get() method to read the value of an AtomicBoolean. For example:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean value = atomicBoolean.get();

After this code runs, the value variable is true.

(3) Setting the value of an AtomicBoolean

Use the set() method to set the value of an AtomicBoolean. For example:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
atomicBoolean.set(false);

After this code runs, the value of the AtomicBoolean is false.

(4) Swapping the value of an AtomicBoolean

Use the getAndSet() method to swap the value of an AtomicBoolean instance. getAndSet() returns the current value of the AtomicBoolean and sets a new value for it. For example:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean oldValue = atomicBoolean.getAndSet(false);

After this code runs, oldValue is true and the atomicBoolean instance holds false. The code has swapped the AtomicBoolean's current value true for false.

(5) Comparing and setting the value of an AtomicBoolean

compareAndSet() compares the current value of the AtomicBoolean with an expected value and, if they are equal, sets a new value on the AtomicBoolean. compareAndSet() is atomic, so only one thread can execute it at a time. The method can therefore be used to implement simple lock-like synchronization.
Here is a compareAndSet() example:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean expectedValue = true;
boolean newValue      = false;
boolean wasNewValueSet = atomicBoolean.compareAndSet(expectedValue, newValue);

This example compares the current value of the AtomicBoolean with true and, if they are equal, updates the value to false.
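
One common use of this lock-like behavior is a "run once" guard. The following is a minimal sketch, not code from the original article: the class RunOnceSketch and the initCount field (which stands in for real one-time initialization work) are assumptions made for illustration. Eight threads race to flip the flag from false to true, and compareAndSet() lets exactly one of them win.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; not part of the original article.
class RunOnceSketch {

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger initCount = new AtomicInteger();

    /** Returns true only for the single caller that flips the flag from false to true. */
    boolean startOnce() {
        if (started.compareAndSet(false, true)) {
            initCount.incrementAndGet(); // stands in for one-time initialization
            return true;
        }
        return false;
    }

    /** Lets several threads race on startOnce() and reports how often the init ran. */
    static int demo() {
        RunOnceSketch sketch = new RunOnceSketch();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> sketch.startOnce());
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sketch.initCount.get();
    }

    public static void main(String[] args) {
        System.out.println("initialization ran " + demo() + " time(s)");
    }
}
```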

3.2、Atomic integer: AtomicInteger

The AtomicInteger class provides an int variable that can be read and written atomically, along with a set of advanced atomic operations such as compareAndSet(). AtomicInteger is located in the java.util.concurrent.atomic package, so its full class name is java.util.concurrent.atomic.AtomicInteger. The AtomicInteger described here is the Java 8 version, not the original version introduced in Java 5.

(1) Creating an AtomicInteger

An AtomicInteger is created as follows:

AtomicInteger atomicInteger = new AtomicInteger();

This example creates an AtomicInteger with an initial value of 0.
To create an AtomicInteger with a given initial value, do this:

AtomicInteger atomicInteger = new AtomicInteger(123);

This example passes 123 to the AtomicInteger constructor, which sets the initial value of the instance to 123.

(2) Getting the value of an AtomicInteger

Use the get() method to read the value of an AtomicInteger instance. For example:

AtomicInteger atomicInteger = new AtomicInteger(123);  
int theValue = atomicInteger.get(); 

(3) Setting the value of an AtomicInteger

Use the set() method to reset the value of an AtomicInteger. Here is an AtomicInteger.set() example:

AtomicInteger atomicInteger = new AtomicInteger(123);
atomicInteger.set(234);

This example creates an AtomicInteger with an initial value of 123, then updates its value to 234 on the second line.

(4) Comparing and setting the value of an AtomicInteger

The AtomicInteger class also has an atomic compareAndSet() method. It compares the current value of the AtomicInteger instance with an expected value and, if the two are equal, sets a new value on the instance.

AtomicInteger.compareAndSet() code example:

AtomicInteger atomicInteger = new AtomicInteger(123);

int expectedValue = 123;
int newValue      = 234;
atomicInteger.compareAndSet(expectedValue, newValue);

This example first creates an AtomicInteger instance with an initial value of 123. It then compares the AtomicInteger with the expected value 123 and, if they are equal, updates the value to 234.

(5) Increasing the value of an AtomicInteger

The AtomicInteger class has several methods that increase its value and return a value. They are as follows:

  • addAndGet()

addAndGet() adds a number to the AtomicInteger and returns the value after the addition. Here is an example:

AtomicInteger atomicInteger = new AtomicInteger();
System.out.println(atomicInteger.addAndGet(10));

This example prints 10. The second line adds 10 to the AtomicInteger and returns the value after the addition, which is now 10.

  • getAndAdd()

getAndAdd() also adds a number to the AtomicInteger, but returns the value the AtomicInteger had before the addition. Which of the two to use depends on your use case. Here is an example:

AtomicInteger atomicInteger = new AtomicInteger();
System.out.println(atomicInteger.getAndAdd(10));

This example prints 0, the value before the addition. Afterwards the AtomicInteger holds 10.

You can of course also use these two methods to add a negative number to the AtomicInteger, which in effect is a subtraction.

  • getAndIncrement()
  • incrementAndGet()
    getAndIncrement() and incrementAndGet() work like getAndAdd() and addAndGet(), except that they only add 1 to the value of the AtomicInteger.

(6) Decreasing the value of an AtomicInteger

The AtomicInteger class also provides atomic methods for decreasing its value. These methods are:

  • decrementAndGet()
  • getAndDecrement()

decrementAndGet() subtracts 1 from the AtomicInteger and returns the value after the subtraction. getAndDecrement() also subtracts 1 from the AtomicInteger, but returns the value before the subtraction.
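
The point of these atomic methods is that several threads can update the same counter without a lock and without losing updates. The following is a minimal sketch, not code from the original article: the class AtomicCounterSketch and its method name are assumptions made for illustration. Each worker thread calls incrementAndGet() a fixed number of times, and the final value equals the total number of increments.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; not part of the original article.
class AtomicCounterSketch {

    /** Increments one shared AtomicInteger from several threads and returns the final value. */
    static int countWithThreads(int threadCount, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 1000)); // prints 4000
    }
}
```

With a plain int field and counter++ instead of incrementAndGet(), the result could be lower than 4000 because concurrent increments can overwrite each other.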

3.3、Atomic long: AtomicLong

The AtomicLong class provides a long variable that can be read and written atomically, along with a set of advanced atomic operations such as compareAndSet(). AtomicLong is located in the java.util.concurrent.atomic package, so its full class name is java.util.concurrent.atomic.AtomicLong. The AtomicLong described here is the Java 8 version, not the original version introduced in Java 5. Because its usage and API are essentially the same as those of AtomicInteger, they are not repeated here.

3.4、Atomic reference type: AtomicReference

AtomicReference provides an object reference variable that can be read and written atomically. Atomic here means that multiple threads trying to change the same AtomicReference will not leave it in an inconsistent state. AtomicReference also has a compareAndSet() method, which compares the current reference with an expected reference and, if they are equal, sets a new reference inside the AtomicReference object.

(1) Creating an AtomicReference

An AtomicReference is created as follows:

AtomicReference atomicReference = new AtomicReference();

To create an AtomicReference with an initial reference, do this:

String initialReference = "the initially referenced string";
AtomicReference atomicReference = new AtomicReference(initialReference);

(2) Creating a typed AtomicReference

You can use Java generics to create a typed AtomicReference. For example:

AtomicReference<String> atomicStringReference =  new AtomicReference<String>();

You can also give a typed AtomicReference an initial value. For example:

String initialReference = "the initially referenced string";
AtomicReference<String> atomicStringReference =  new AtomicReference<String>(initialReference);

(3) Getting the AtomicReference reference

Use the get() method of AtomicReference to obtain the reference stored in it. If the AtomicReference is untyped, get() returns a reference of type Object. If it is typed, get() returns the type declared when the AtomicReference was created.
First, a get() example with an untyped AtomicReference:

AtomicReference atomicReference = new AtomicReference("first value referenced");
String reference = (String) atomicReference.get();

Note how the reference returned by get() has to be cast to String.
A typed AtomicReference example:

AtomicReference<String> atomicReference =   new AtomicReference<String>("first value referenced");
String reference = atomicReference.get();

The compiler knows the type of the reference, so the value returned by get() needs no cast.

(4) Setting the AtomicReference reference

Use the set() method to set the reference stored in an AtomicReference. On an untyped AtomicReference, set() takes an Object reference as its parameter. On a typed AtomicReference, set() only accepts the declared type.
AtomicReference set() example:

AtomicReference atomicReference = new AtomicReference();
atomicReference.set("New object referenced");

In this call there is no visible difference between the untyped and typed versions. The real difference is that the compiler restricts the type of the argument you can set on a typed AtomicReference.

(5) Comparing and setting the AtomicReference reference

The AtomicReference class has a very useful method: compareAndSet(). compareAndSet() compares the reference stored in the AtomicReference with an expected reference and, if the two references are the same (not equal in the equals() sense, but the same in the == sense), sets a new reference on the AtomicReference instance.
If compareAndSet() sets a new reference on the AtomicReference, it returns true. Otherwise it returns false.
AtomicReference compareAndSet() example:

String initialReference = "initial value referenced";
AtomicReference<String> atomicStringReference =  new AtomicReference<String>(initialReference);
String newReference = "new value referenced";
boolean exchanged = atomicStringReference.compareAndSet(initialReference, newReference);
System.out.println("exchanged: " + exchanged);
exchanged = atomicStringReference.compareAndSet(initialReference, newReference);
System.out.println("exchanged: " + exchanged);

This example creates a typed AtomicReference with an initial reference. It then calls compareAndSet() twice to compare the stored reference with the expected reference and, if they are the same, set a new reference on the AtomicReference. On the first comparison, the stored reference (initialReference) matches the expected reference (initialReference), so the new reference (newReference) is set on the AtomicReference and compareAndSet() returns true. On the second comparison, the stored reference (newReference) differs from the expected reference (initialReference), so the new reference is not set and compareAndSet() returns false.
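
compareAndSet() is usually called in a retry loop: read the current reference, build a new object from it, and try to swap it in, starting over if another thread got there first. The following is a minimal sketch, not code from the original article: the class CasLoopSketch and its methods are assumptions made for illustration. It keeps an immutable list snapshot in an AtomicReference, and it also shows that the comparison is by identity (==) rather than by equals().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class for illustration; not part of the original article.
class CasLoopSketch {

    private final AtomicReference<List<String>> ref =
            new AtomicReference<>(Collections.<String>emptyList());

    /** Adds an item by replacing the whole list, retrying if another thread won the race. */
    void add(String item) {
        while (true) {
            List<String> current = ref.get();
            List<String> updated = new ArrayList<>(current);
            updated.add(item);
            if (ref.compareAndSet(current, Collections.unmodifiableList(updated))) {
                return; // the swap succeeded, nobody changed ref in between
            }
            // otherwise loop: re-read the latest list and try again
        }
    }

    /** Four threads add 100 items each; returns the size of the final snapshot. */
    static int demo() {
        CasLoopSketch sketch = new CasLoopSketch();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    sketch.add("t" + id + "-" + j);
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sketch.ref.get().size();
    }

    /** The stored String equals "a" but is a different object, so the swap is refused. */
    static boolean swapWithEqualButDistinctObject() {
        AtomicReference<String> reference = new AtomicReference<>(new String("a"));
        return reference.compareAndSet("a", "b");
    }

    public static void main(String[] args) {
        System.out.println(demo());                           // prints 400
        System.out.println(swapWithEqualButDistinctObject()); // prints false
    }
}
```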

Two、Blocking queue: BlockingQueue

The BlockingQueue interface in the java.util.concurrent package represents a queue that threads can safely put instances into and take instances out of. This section demonstrates how to use BlockingQueue.

1、BlockingQueue basics

A BlockingQueue is typically used with one thread producing objects and another thread consuming them. The principle is illustrated below:

[Figure: one thread puts objects into a BlockingQueue while another thread takes them out.]

The producing thread keeps producing new objects and inserting them into the queue until the queue reaches the upper bound of what it can hold. If the blocking queue reaches that bound, the producing thread blocks when it tries to insert a new object, and it stays blocked until a consuming thread takes an object out of the queue.

The consuming thread keeps taking objects out of the blocking queue. If it tries to take an object from an empty queue, it is blocked until a producing thread puts an object into the queue.
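
The producer and consumer roles described above can be sketched as follows. This is a minimal illustration, not code from the original article: the class ProducerConsumerSketch, the DONE end marker, and the queue capacity of 2 are all assumptions. The small capacity forces the producer to block in put() whenever the consumer falls behind, and the consumer blocks in take() whenever the queue is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical class for illustration; not part of the original article.
class ProducerConsumerSketch {

    // Assumed end-of-stream marker that tells the consumer to stop
    private static final String DONE = "__done__";

    /** Produces itemCount items on one thread and consumes them on another. */
    static List<String> run(int itemCount) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put("item-" + i); // blocks while the queue is full
                }
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty
                for (String s = queue.take(); !DONE.equals(s); s = queue.take()) {
                    consumed.add(s);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [item-0, item-1, item-2]
    }
}
```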

2、BlockingQueue methods

BlockingQueue has 4 sets of methods for inserting, removing, and examining the elements in the queue. Each set behaves differently when the requested operation cannot be carried out immediately. The methods are as follows:

Operation | Throws exception | Special value | Blocks        | Times out
Insert    | add(o)           | offer(o)      | put(o)        | offer(o, timeout, timeunit)
Remove    | remove()         | poll()        | take()        | poll(timeout, timeunit)
Examine   | element()        | peek()        | not available | not available

The four sets of behavior mean the following:

  • Throws exception: if the attempted operation cannot be carried out immediately, an exception is thrown.
  • Special value: if the attempted operation cannot be carried out immediately, a special value is returned (often true / false).
  • Blocks: if the attempted operation cannot be carried out immediately, the method call blocks until it can.
  • Times out: if the attempted operation cannot be carried out immediately, the method call blocks until it can, but waits no longer than the given timeout. It returns a special value that tells whether the operation succeeded (typically true / false).

You cannot insert null into a BlockingQueue. If you try, the BlockingQueue throws a NullPointerException.
It is also possible to access all the elements in a BlockingQueue, not just those at the head and tail. For example, suppose an object has been queued for processing but the application wants to cancel it. You can call remove(o) to remove that specific object from the queue. This is not efficient, however (translator's note: with a queue-based data structure, accessing objects other than those at the head or tail is not very efficient), so avoid these methods unless you really need them.
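
The non-blocking and timed variants can be compared side by side on a queue with room for a single element. The following is a minimal sketch, not code from the original article: the class QueueBehaviourSketch, its method names, and the 20 ms timeout are assumptions made for illustration. The blocking put() and take() are left out because they would simply wait.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration; not part of the original article.
class QueueBehaviourSketch {

    /** Tries to insert into a queue that is already full. */
    static String onFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // succeeds, the queue is now full

        String add;
        try {
            queue.add("second");
            add = "ok";
        } catch (IllegalStateException e) {
            add = "IllegalStateException"; // "throws exception" column
        }

        boolean offer = queue.offer("second"); // "special value" column

        boolean timedOffer = false;
        try {
            timedOffer = queue.offer("second", 20, TimeUnit.MILLISECONDS); // "times out" column
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "add=" + add + ", offer=" + offer + ", timedOffer=" + timedOffer;
    }

    /** Tries to remove from and examine a queue that is empty. */
    static String onEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String remove;
        try {
            queue.remove();
            remove = "ok";
        } catch (NoSuchElementException e) {
            remove = "NoSuchElementException";
        }
        return "remove=" + remove + ", poll=" + queue.poll() + ", peek=" + queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(onFullQueue());
        System.out.println(onEmptyQueue());
    }
}
```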

3、BlockingQueue implementations

BlockingQueue is an interface, so to use it you need one of its implementations. The java.util.concurrent package has the following implementations of the BlockingQueue interface (as of Java 6):

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

4、Array blocking queue: ArrayBlockingQueue

The ArrayBlockingQueue class implements the BlockingQueue interface.

ArrayBlockingQueue is a bounded blocking queue that stores its elements internally in an array. Bounded means that it cannot store an unlimited number of elements: there is an upper limit on how many it can hold at one time. You set this limit when the queue is created and cannot change it afterwards (translator's note: because it is array-based, it shares this property of arrays: once initialized, the size cannot be changed).

ArrayBlockingQueue stores its elements internally in FIFO (first in, first out) order. The head of the queue is the element that has been in the queue the longest, and the tail is the element that has been in the queue the shortest time.

Here is an example of how to create and use an ArrayBlockingQueue:

BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");
Object object = queue.take();

Here is a BlockingQueue example that uses Java generics. Note how String elements are put in and taken out:

BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
queue.put("1");
String string = queue.take();

5、Delay queue: DelayQueue

DelayQueue implements the BlockingQueue interface.

DelayQueue holds each element until a specific delay has expired. Elements put into it must implement the java.util.concurrent.Delayed interface, which is defined as follows:

public interface Delayed extends Comparable<Delayed> {
      
	public long getDelay(TimeUnit timeUnit);  
}  

DelayQueue releases each element once the delay returned by that element's getDelay() method has elapsed. If getDelay() returns 0 or a negative value, the delay is considered expired and the element is released on the next take() call on the DelayQueue.

The TimeUnit instance passed to getDelay() is an enum that indicates the time unit in which the delay should be returned. The TimeUnit enum has the following values:

  • DAYS
  • HOURS
  • MINUTES
  • SECONDS
  • MILLISECONDS
  • MICROSECONDS
  • NANOSECONDS

As you can see, the Delayed interface also extends java.lang.Comparable, which means that Delayed objects can be compared with one another. DelayQueue relies on this to order the elements in the queue, so that they are released in order of their expiration time.

Here is an example of using DelayQueue:

public class DelayQueueExample {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Delayed> queue = new DelayQueue<>();
        Delayed element1 = new DelayedElement();
        queue.put(element1);
        // take() blocks until the delay of the head element has expired
        Delayed element2 = queue.take();
    }
}

DelayedElement is an implementation of the Delayed interface that I created myself; it is not part of the java.util.concurrent package. You need to write your own implementation of the Delayed interface in order to use the DelayQueue class.
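
Since the article does not show its DelayedElement class, here is one possible way to write such an implementation. This is a minimal sketch under stated assumptions, not the author's class: the name SketchDelayedElement, its constructor parameters, and the use of System.nanoTime() to track the expiry time are all choices made for illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical Delayed implementation for illustration; not the article's DelayedElement.
class SketchDelayedElement implements Delayed {

    private final String name;
    private final long expiresAtNanos;

    SketchDelayedElement(String name, long delay, TimeUnit unit) {
        this.name = name;
        this.expiresAtNanos = System.nanoTime() + unit.toNanos(delay);
    }

    /** Remaining delay in the requested unit; zero or negative means expired. */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /** Orders elements so that the one expiring first is at the head of the queue. */
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    String name() {
        return name;
    }
}

class DelayQueueSketch {

    /** Inserts the slower element first and returns the order in which take() releases them. */
    static String releaseOrder() {
        DelayQueue<SketchDelayedElement> queue = new DelayQueue<>();
        queue.put(new SketchDelayedElement("slow", 200, TimeUnit.MILLISECONDS));
        queue.put(new SketchDelayedElement("fast", 50, TimeUnit.MILLISECONDS));
        try {
            String first = queue.take().name();  // waits for the earliest expiry
            String second = queue.take().name();
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseOrder()); // prints fast,slow
    }
}
```

Insertion order does not matter here: the element with the shorter remaining delay is released first because compareTo() orders elements by remaining delay.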

6、Linked blocking queue: LinkedBlockingQueue

The LinkedBlockingQueue class implements the BlockingQueue interface.

LinkedBlockingQueue stores its elements internally in a linked structure (linked nodes). This linked structure can optionally have an upper bound. If no upper bound is specified, Integer.MAX_VALUE is used as the bound.

LinkedBlockingQueue stores its elements internally in FIFO (first in, first out) order. The head of the queue is the element that has been in the queue the longest, and the tail is the element that has been in the queue the shortest time.

Here is sample code showing how to create and use a LinkedBlockingQueue:

BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();  
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);  
bounded.put("Value");  
String value = bounded.take(); 

7、Blocking queue with priority: PriorityBlockingQueue

The PriorityBlockingQueue class implements the BlockingQueue interface.

PriorityBlockingQueue is an unbounded concurrent queue. It uses the same ordering rules as the java.util.PriorityQueue class. You cannot insert null into this queue.
All elements inserted into a PriorityBlockingQueue must implement the java.lang.Comparable interface (unless a Comparator is passed to the queue's constructor). The elements are therefore ordered according to their own Comparable implementation.

Note that PriorityBlockingQueue does not enforce any particular behavior for elements with equal priority (compare() == 0).

Also note that if you obtain an Iterator from a PriorityBlockingQueue, the Iterator does not guarantee to traverse the elements in priority order.

Here is an example of using PriorityBlockingQueue:

BlockingQueue<String> queue = new PriorityBlockingQueue<String>();
// String implements java.lang.Comparable
queue.put("Value");
String value = queue.take();
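
With a single element the ordering is not visible, so here is a slightly larger illustration. It is a minimal sketch, not code from the original article: the class PriorityOrderSketch and its method names are assumptions. Elements are inserted out of order and come back in natural order, or in reverse order when a Comparator is passed to the constructor.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical class for illustration; not part of the original article.
class PriorityOrderSketch {

    /** Empties the queue with poll() and joins the elements in the order they come out. */
    private static String drain(PriorityBlockingQueue<Integer> queue) {
        StringBuilder out = new StringBuilder();
        Integer next;
        while ((next = queue.poll()) != null) { // poll() returns null once the queue is empty
            out.append(next).append(' ');
        }
        return out.toString().trim();
    }

    /** Natural ordering: Integer implements Comparable. */
    static String naturalOrder() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.put(30);
        queue.put(10);
        queue.put(20);
        return drain(queue);
    }

    /** Custom ordering: a Comparator supplied to the constructor (initial capacity 11). */
    static String reverseOrder() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        queue.put(30);
        queue.put(10);
        queue.put(20);
        return drain(queue);
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder()); // prints 10 20 30
        System.out.println(reverseOrder()); // prints 30 20 10
    }
}
```

The elements are removed with poll() rather than walked with an Iterator, because only removal from the head is guaranteed to follow priority order.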

8、Synchronous queue: SynchronousQueue

The SynchronousQueue class implements the BlockingQueue interface.

SynchronousQueue is a special queue that hands over one element at a time and has no internal capacity to store elements. A thread that tries to insert an element blocks until another thread takes that element from the queue. Likewise, a thread that tries to take an element blocks until another thread inserts one.

For this reason, calling this class a queue is something of an overstatement. It is more like a rendezvous point.
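
The rendezvous behavior can be observed directly. The following is a minimal sketch, not code from the original article: the class HandoffSketch and its method names are assumptions made for illustration. A put() only completes once another thread is there to take(), and a non-blocking offer() fails when no thread is waiting to receive.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical class for illustration; not part of the original article.
class HandoffSketch {

    /** Hands a message from the calling thread to a consumer thread and returns what arrived. */
    static String handOff(String message) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // waits for a producer to arrive
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            queue.put(message); // returns only once the consumer has taken the message
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    /** With no thread waiting in take(), a non-blocking offer() is refused. */
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("nobody is waiting");
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello"));       // prints hello
        System.out.println(offerWithoutConsumer()); // prints false
    }
}
```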

9、Blocking double-ended queue: BlockingDeque

The BlockingDeque interface in the java.util.concurrent package represents a double-ended queue that threads can safely put instances into and take instances out of. This section demonstrates how to use BlockingDeque.

A BlockingDeque is a double-ended queue that blocks a thread trying to insert an element when no element can be inserted, and blocks a thread trying to take an element when no element is available.

deque is short for "Double Ended Queue". A deque is therefore a queue in which elements can be inserted or taken at either end.

9.1、BlockingDeque Use

A BlockingDeque is useful when a thread acts as both producer and consumer of the same queue. It is also useful when producer threads need to insert at both ends of the queue and consumer threads need to remove from both ends.

[Figure: a BlockingDeque. Threads can insert and extract elements at both ends of the double-ended queue.]

A thread produces elements and inserts them at either end of the queue. If the deque is full, the inserting thread blocks until a removing thread takes an element out of the queue. If the deque is empty, the removing thread blocks until an inserting thread puts a new element into the queue.

9.2、BlockingDeque methods

BlockingDeque has four groups of methods for inserting, removing, and examining elements in the deque. The groups differ in how they behave when the requested operation cannot be performed immediately:

Operation   Throws exception   Special value   Blocks          Times out
Insert      addFirst(o)        offerFirst(o)   putFirst(o)     offerFirst(o, timeout, timeunit)
Remove      removeFirst()      pollFirst()     takeFirst()     pollFirst(timeout, timeunit)
Examine     getFirst()         peekFirst()     not available   not available

Methods ending in First operate on the head of the deque. Replace First with Last to get the corresponding method that operates on the tail.

The four groups behave as follows:

  • Throws exception: if the attempted operation cannot be performed immediately, an exception is thrown.
  • Special value: if the attempted operation cannot be performed immediately, a special value is returned (false for inserts, null for removals and examinations).
  • Blocks: if the attempted operation cannot be performed immediately, the method call blocks until it can.
  • Times out: if the attempted operation cannot be performed immediately, the method call blocks until it can, but waits no longer than the given timeout. It returns a special value indicating whether the operation succeeded (false for inserts, null for removals).
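
The three non-blocking insert styles can be compared on a full deque. The sketch below is illustrative only (the class name and the 50 ms timeout are assumptions); it uses a LinkedBlockingDeque with capacity 1 so that every further insert fails.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

final class DequeBehaviourDemo {

    // Reports what each insert style does when the deque is already full.
    static String insertIntoFullDeque() {
        BlockingDeque<String> deque = new LinkedBlockingDeque<>(1); // capacity 1
        deque.addFirst("only");

        String thrown;
        try {
            deque.addFirst("extra");           // "throws exception" group
            thrown = "none";
        } catch (IllegalStateException e) {
            thrown = "IllegalStateException";
        }

        boolean offered = deque.offerFirst("extra"); // "special value" group

        boolean offeredWithTimeout;                  // "times out" group
        try {
            offeredWithTimeout = deque.offerFirst("extra", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            offeredWithTimeout = false;
        }
        return thrown + "," + offered + "," + offeredWithTimeout;
    }

    public static void main(String[] args) {
        System.out.println(insertIntoFullDeque()); // IllegalStateException,false,false
    }
}
```

putFirst() is left out on purpose: on a full deque it would block until another thread removed an element.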

9.3、BlockingDeque extends BlockingQueue

The BlockingDeque interface extends the BlockingQueue interface, so a BlockingDeque can be used wherever a BlockingQueue is expected. In that case the insertion methods add new elements to the tail of the deque and the removal methods take elements from its head, exactly as the insertion and removal methods of BlockingQueue do.

The BlockingQueue methods map to the following BlockingDeque methods:

BlockingQueue           BlockingDeque
add()                   addLast()
offer()                 offerLast()
put()                   putLast()
offer(e, time, unit)    offerLast(e, time, unit)
remove()                removeFirst()
poll()                  pollFirst()
take()                  takeFirst()
poll(time, unit)        pollFirst(time, unit)
element()               getFirst()
peek()                  peekFirst()
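
As a quick check of this mapping, the following sketch (class name assumed for illustration) mixes the queue-style methods with one deque-specific call and shows that offer() appends at the tail while poll() removes from the head.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

final class DequeAsQueueDemo {

    // Uses BlockingQueue-style calls on a deque and returns the removal order.
    static String removalOrder() {
        BlockingDeque<String> deque = new LinkedBlockingDeque<>();
        deque.offer("1");      // same as offerLast("1")
        deque.offer("2");      // same as offerLast("2")
        deque.offerFirst("0"); // deque-specific: jumps to the head

        // poll() is the same as pollFirst(), so elements leave head first.
        return deque.poll() + deque.poll() + deque.poll();
    }

    public static void main(String[] args) {
        System.out.println(removalOrder()); // 012
    }
}
```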

9.4、BlockingDeque implementations

Since BlockingDeque is an interface, you need one of its implementations to use it. The java.util.concurrent package provides the following implementation of the BlockingDeque interface: LinkedBlockingDeque

9.5、BlockingDeque code example

Here is a short example of using the BlockingDeque methods:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();  

deque.addFirst("1");  
deque.addLast("2");  

String two = deque.takeLast();  
String one = deque.takeFirst();  

10、 Linked blocking double-ended queue LinkedBlockingDeque

The LinkedBlockingDeque class implements the BlockingDeque interface.

Deque is short for "double-ended queue": a queue in which elements can be inserted or extracted at either end.

LinkedBlockingDeque is a deque. When it is empty, a thread that tries to take data from it blocks, no matter which end it tries to take from.

Here is an example of instantiating and using a LinkedBlockingDeque:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();  

deque.addFirst("1");  
deque.addLast("2");  

String two = deque.takeLast();  
String one = deque.takeFirst(); 

Three 、 Threads and thread pools

The java.util.concurrent.ExecutorService interface represents an asynchronous execution mechanism that lets us run tasks in the background. An ExecutorService is therefore very similar to a thread pool. In fact, the ExecutorService implementations in the java.util.concurrent package are thread pool implementations.

1、 Create an ExecutorService

How an ExecutorService is created depends on the implementation used. You can also use the Executors factory class to create ExecutorService instances. Here are a few examples:

// Thread pool with a single thread
ExecutorService executorService1 = Executors.newSingleThreadExecutor();
// Thread pool with a fixed number of threads
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
// Thread pool that supports delayed and periodic tasks
ExecutorService executorService3 = Executors.newScheduledThreadPool(10);
// Cached thread pool: creates new threads as needed and reuses idle ones
ExecutorService executorService4 = Executors.newCachedThreadPool();
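
Note that the delayed and periodic methods of the scheduled pool are only visible through the ScheduledExecutorService type. The sketch below is a hedged illustration (class name and delay value are assumptions) that schedules one Callable after a short delay.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ScheduledDemo {

    // Runs a task once after the given delay and returns its result.
    static String runAfterDelay(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<String> future =
                    scheduler.schedule(() -> "ran", delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAfterDelay(50)); // ran
    }
}
```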

2、 Using an ExecutorService

There are several ways to delegate tasks to an ExecutorService for execution:

2.1、execute(Runnable)

The execute(Runnable) method takes a java.lang.Runnable object and executes it asynchronously. Here is an example of executing a Runnable with an ExecutorService:

ExecutorService executorService = Executors.newSingleThreadExecutor();  
executorService.execute(new Runnable() {
      
    public void run() {
      
        System.out.println("Asynchronous task");  
    }  
});  
executorService.shutdown();

There is no way to obtain the result of a Runnable executed this way. If you need a result, use a Callable (introduced below).

2.2、submit(Runnable)

The submit(Runnable) method also takes a Runnable implementation, but it returns a Future object. This Future can be used to check whether the Runnable has finished executing.

Here is an ExecutorService submit() example:

Future<?> future = executorService.submit(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

future.get();  // returns null once the task has finished successfully

2.3、submit(Callable)

The submit(Callable) method is similar to submit(Runnable) except for the type of parameter it takes. A Callable is very much like a Runnable, except that its call() method can return a result, whereas Runnable.run() cannot.
The Callable's result can be obtained through the Future object returned by submit(Callable). Here is an ExecutorService Callable example:

Future<String> future = executorService.submit(new Callable<String>() {
    public String call() throws Exception {
        System.out.println("Asynchronous Callable");
        return "Callable Result";
    }
});

System.out.println("future.get() = " + future.get());

The code above prints:

Asynchronous Callable
future.get() = Callable Result

2.4、invokeAny(…)

The invokeAny() method takes a collection of Callable objects (or of objects implementing a sub-interface of Callable). It does not return a Future; it returns the result of one of the Callable objects. There is no guarantee which Callable's result you get, only that it comes from one that completed successfully. Once one task has completed successfully (or invokeAny() itself exits with an exception), the Callables that have not finished are cancelled. Here is the sample code:

ExecutorService executorService = Executors.newSingleThreadExecutor();  

Set<Callable<String>> callables = new HashSet<Callable<String>>();  

callables.add(new Callable<String>() {
      
    public String call() throws Exception {
      
        return "Task 1";  
    }  
});  
callables.add(new Callable<String>() {
      
    public String call() throws Exception {
      
        return "Task 2";  
    }  
});  
callables.add(new Callable<String>() {
      
    public String call() throws Exception {
      
        return "Task 3";  
    }  
});  

String result = executorService.invokeAny(callables);  

System.out.println("result = " + result);  

executorService.shutdown();

The code above prints the result of one of the given Callables. I ran it a few times myself and the result varied: sometimes "Task 1", sometimes "Task 2", and so on.
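
One consequence worth illustrating: a task that throws cannot supply the result of invokeAny(). In the sketch below (names are illustrative assumptions), one task always fails, so the result has to come from the other.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class InvokeAnyDemo {

    // One task always fails, so invokeAny must return the other task's result.
    static String firstSuccessfulResult() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> { throw new IllegalStateException("always fails"); });
            tasks.add(() -> "fallback");
            return executor.invokeAny(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Thrown only if no task completes successfully.
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSuccessfulResult()); // fallback
    }
}
```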

2.5、invokeAll(…)

The invokeAll() method invokes all the Callable objects in the collection you pass to the ExecutorService. invokeAll() returns a list of Future objects through which you can obtain the result of each Callable. Keep in mind that a task may end with an exception, so it may not have "succeeded". isDone() does not distinguish between the two outcomes; for a failed task, get() throws an ExecutionException. Here is a code example:

ExecutorService executorService = Executors.newSingleThreadExecutor();  

Set<Callable<String>> callables = new HashSet<Callable<String>>();  

callables.add(new Callable<String>() {
      
    public String call() throws Exception {
      
        return "Task 1";  
    }  
});  
callables.add(new Callable<String>() {
      
    public String call() throws Exception {
      
        return "Task 2";  
    }  
});  
callables.add(new Callable<String>() {
      
    public String call() throws Exception {
      
        return "Task 3";  
    }  
});  

List<Future<String>> futures = executorService.invokeAll(callables);  

for(Future<String> future : futures){
      
    System.out.println("future.get = " + future.get());  
}  
// Shut down the ExecutorService
executorService.shutdown(); 
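
To tell a successful task from a failed one after invokeAll(), call get() on each Future and catch ExecutionException. A minimal sketch under assumed names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InvokeAllOutcomeDemo {

    // Runs one succeeding and one failing task and describes each outcome.
    static List<String> outcomes() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> "Task 1");
            tasks.add(() -> { throw new IllegalStateException("boom"); });

            List<String> outcomes = new ArrayList<>();
            // For a List argument, the futures come back in the same order.
            for (Future<String> future : executor.invokeAll(tasks)) {
                try {
                    outcomes.add("ok: " + future.get());
                } catch (ExecutionException e) {
                    outcomes.add("failed: " + e.getCause().getMessage());
                }
            }
            return outcomes;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcomes()); // [ok: Task 1, failed: boom]
    }
}
```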

2.6、 Shutting down an ExecutorService

When you are done with an ExecutorService you should shut it down so that its threads do not keep running.
For example, if the application is started through a main() method and the main thread exits, the application keeps running as long as it has an active ExecutorService. The active threads inside the ExecutorService prevent the JVM from shutting down.
To terminate the threads inside an ExecutorService, call its shutdown() method. The ExecutorService does not shut down immediately, but it no longer accepts new tasks, and once all threads have finished their tasks it shuts down. All tasks submitted to the ExecutorService before shutdown() was called are executed.
To shut down immediately, call shutdownNow(). It attempts to stop all executing tasks right away and skips the tasks that were submitted but have not started, returning them as a list. Nothing is guaranteed about the executing tasks: they may be stopped, or they may run to completion.
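
A common way to combine the two methods is to call shutdown(), wait a bounded time with awaitTermination(), and fall back to shutdownNow(). The sketch below is one possible arrangement; the class name, pool size, and five-second limit are assumptions.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownDemo {

    // Returns how many of the submitted tasks ran before the pool terminated.
    static int runAndShutDown(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(completed::incrementAndGet);
        }

        executor.shutdown(); // stop accepting new tasks, keep running queued ones
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                // Still busy after the grace period: interrupt and drop the rest.
                List<Runnable> neverStarted = executor.shutdownNow();
                System.out.println("Dropped tasks: " + neverStarted.size());
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndShutDown(10)); // 10
    }
}
```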

3、 Custom thread pool

3.1、 Why customize the thread pool

Before discussing custom thread pools: since the JDK already provides the powerful Executors utility class for creating four kinds of thread pool, why do we need a custom one?

Here we can refer to the Alibaba Java Development Manual. Point 4 in its first chapter, the programming specification, gives a very clear explanation. The rule reads:

  1. 【Mandatory】 Thread pools must not be created with Executors; create them through ThreadPoolExecutor instead. This makes the pool's operating rules explicit to whoever writes the code and avoids the risk of resource exhaustion.
    Note: the thread pool objects returned by Executors have the following drawbacks:
    1) FixedThreadPool and SingleThreadPool: the allowed request queue length is Integer.MAX_VALUE, so a large number of requests may pile up and cause an OOM.
    2) CachedThreadPool: the allowed number of threads is Integer.MAX_VALUE, so a large number of threads may be created and cause an OOM.

Below are the factory methods and constructors behind these four kinds of thread pool, with the points above marked in the comments. The meaning of the seven ThreadPoolExecutor parameters is not repeated here; if anything is unclear, read the source of the ThreadPoolExecutor constructor.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>()); // unbounded queue
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>())); // unbounded queue
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // effectively unlimited number of threads
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, // effectively unlimited maximum pool size
          0, NANOSECONDS,
          new DelayedWorkQueue());
}
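
By contrast, a pool built directly with ThreadPoolExecutor can bound both the thread count and the queue. The following sketch uses deliberately tiny, illustrative limits (2 threads, 2 queued tasks) and counts how many submissions AbortPolicy rejects once those limits are reached.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolDemo {

    // Submits blocked tasks until the pool is saturated and counts rejections.
    static int countRejections(int submissions) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2,                        // core and maximum threads
                30, TimeUnit.SECONDS,        // idle time for the extra thread
                new ArrayBlockingQueue<>(2), // at most 2 waiting tasks
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // pool and queue are both full
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 threads + 2 queue slots hold 4 tasks; the other 2 are rejected.
        System.out.println(countRejections(6)); // 2
    }
}
```

Rejecting work early is usually easier to handle than an OutOfMemoryError, because the caller can retry, shed load, or report the failure.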

3.2、 Custom thread pool policy

When customizing a thread pool, avoiding OOM errors is only part of the picture. We should pay even more attention to the stability of the system and make sure it can sustain the thread load efficiently. A few rules help when using a thread pool:

  • Setting the number of threads

First, we have to work out whether our workload is compute-intensive or IO-intensive. Only then can we choose a sensible limit for the pool size.

  1. Compute-intensive:
    As the name implies, the application needs a lot of CPU time. On a multi-core CPU we want every core to take part in the computation so that the CPU is fully used; running a single-threaded program on a powerful server would waste it. Because a compute-intensive application relies entirely on the CPU, the goal is to keep every core busy while avoiding excessive thread context switching. The ideal setting is: number of threads = number of CPU cores + 1. It can also be set to number of CPU cores * 2, depending on the JDK version and the CPU configuration (server CPUs often have hyper-threading). CPU cores * 2 is generally enough.
  2. IO-intensive:
    Most of what we develop today is web applications, which involve a lot of network traffic. Beyond that, interaction with databases and caches also involves IO. Once IO occurs, the thread waits; when the IO finishes and the data is ready, the thread continues. For IO-intensive applications we can therefore configure more threads in the pool, so that other threads can do useful work while some wait for IO, which improves concurrent throughput. Can the number of threads then be set arbitrarily high? Certainly not: thread context switching has a cost. A commonly used formula for IO-intensive applications is: number of threads = number of CPU cores / (1 - blocking coefficient). The blocking coefficient usually lies between 0.8 and 0.9, and either value can be used. Applying the formula to a dual-core CPU with a coefficient of 0.9 gives an ideal thread count of 20. None of this is absolute; adjust it to the actual situation and business: final int poolSize = (int)(cpuCore/(1-0.9))

Regarding the blocking coefficient, the book 《Programming Concurrency on the JVM: Mastering Synchronization, STM, and Actors》 says: we can first try to guess the blocking coefficient, or use profiling tools or the java.lang.management API to determine the ratio of the time a thread spends on system/IO operations to the time it spends on CPU-intensive work.

  • Thread pool parameter configuration
    Never choose configuration options that have no upper limit.
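
The two sizing rules can be written down as small helper methods. This is a sketch of the formulas above, not a tuning recommendation; the class and method names are assumptions, and Math.round is used instead of a plain cast so that floating-point error in 1 - coefficient cannot truncate the result downward.

```java
final class PoolSizing {

    // Compute-intensive work: one thread per core plus one spare.
    static int cpuBound(int cores) {
        return cores + 1;
    }

    // IO-intensive work: cores / (1 - blockingCoefficient).
    static int ioBound(int cores, double blockingCoefficient) {
        if (blockingCoefficient < 0.0 || blockingCoefficient >= 1.0) {
            throw new IllegalArgumentException("coefficient must be in [0, 1)");
        }
        return (int) Math.round(cores / (1.0 - blockingCoefficient));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size: " + cpuBound(cores));
        System.out.println("IO-bound pool size:  " + ioBound(cores, 0.9));
        System.out.println("Dual-core, 0.9:      " + ioBound(2, 0.9)); // 20
    }
}
```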

3.3、 Custom thread pool

package com.utils;
/** Created by xsl on 2020/7/31 10:57 */
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Thread pool: singleton and proxy pattern.
 *
 * @author xsl
 * @since 2020/7/31 10:57
 */
public class ThreadPool {

    /** Core pool size: the number of available processors. */
    private static final int processors = Runtime.getRuntime().availableProcessors();

    /** Maximum pool size. */
    private static int maxPoolSize = processors * 2;

    /** Keep-alive time, in seconds, for idle threads above the core size. */
    private static int keepAliveTime = 2;

    /** Capacity of the waiting queue. */
    private static int waitQueueSize = 1000;

    /** Rejection policy. */
    private static RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();

    /** Lazily created singleton; volatile so that double-checked locking is safe. */
    private static volatile ThreadPool instance = null;

    /** Reentrant lock guarding initialization. */
    private static ReentrantLock lock = new ReentrantLock();

    /** The underlying thread pool. */
    private static ExecutorService fixedThreadPool = null;

    private ThreadPool() {
        // Private constructor
    }

    public ExecutorService getExecutorService() {
        return fixedThreadPool;
    }

    public static ThreadPool getInstance() {
        // Double-checked locking
        if (instance == null) {
            lock.lock();
            try {
                if (instance == null) {
                    // Use ThreadPoolExecutor directly instead of Executors
                    fixedThreadPool = new ThreadPoolExecutor(
                            processors,
                            maxPoolSize,
                            keepAliveTime,
                            TimeUnit.SECONDS,
                            new LinkedBlockingQueue<Runnable>(waitQueueSize),
                            Executors.defaultThreadFactory(),
                            handler
                    );
                    // Publish the instance only after the pool exists
                    instance = new ThreadPool();
                }
            } finally {
                // Always release the lock, even if construction fails
                lock.unlock();
            }
        }
        return instance;
    }
}

4、 Threads that return a value

The Java Callable interface, java.util.concurrent.Callable, represents an asynchronous task that can be executed by a separate thread. For example, a Callable object can be submitted to a Java ExecutorService, which then executes it asynchronously.

4.1、Callable interface definition

The Java Callable interface is very simple. It contains a single method named call(). Here is the Callable interface:

public interface Callable<V> {
    
    V call() throws Exception;
}

The call() method is invoked to execute the asynchronous task, and it can return a result. If the task is executed asynchronously, the result is usually propagated to the creator of the task through a Java Future. When a Callable is submitted to an ExecutorService for concurrent execution, you can use the Future object to receive the returned result.
If the task fails during execution, call() can also throw an Exception.

4.2、 Interface implementation

Here is a simple example of implementing the Java Callable interface:

public class MyCallable implements Callable<String> {
    
    @Override
    public String call() throws Exception {
    
        return String.valueOf(System.currentTimeMillis());
    }
}

This implementation is very simple: call() returns the current time as a String. In a real application the task would probably be a more complex or larger set of operations.
Typically, IO operations (such as reading from or writing to disk or the network) are good candidates for concurrent execution. IO operations often involve long waits between reading and writing blocks of data. By running such tasks in a separate thread, you avoid blocking the main thread unnecessarily.

4.3、Callable and Runnable

The Java Callable interface is similar to the Java Runnable interface in that both represent tasks that can be executed concurrently by a separate thread.

A Callable differs from a Runnable in that the run() method of Runnable does not return a value and cannot throw checked exceptions (only RuntimeExceptions).

In addition, Runnable was originally designed for long-running concurrent tasks, such as a network server running in the background or a watcher that checks a directory for new files. The Callable interface is better suited to one-off tasks that return a single result.
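
The checked-exception difference is easy to see in code. In this sketch (class name and message are illustrative assumptions) a Callable throws an IOException directly, and the caller finds it wrapped in the ExecutionException thrown by Future.get().

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CheckedExceptionDemo {

    // Submits a Callable that throws a checked exception and reports the cause.
    static String causeOfFailure() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> failing = () -> {
                throw new IOException("disk unavailable"); // allowed in call()
            };
            Future<String> future = executor.submit(failing);
            try {
                return future.get();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause(); // the original IOException
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeOfFailure()); // IOException: disk unavailable
    }
}
```

The same lambda body would not compile as a Runnable, because run() declares no checked exceptions.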

4、 Thread execution result :Java Future

A Java Future, java.util.concurrent.Future, represents the result of an asynchronous computation. When an asynchronous task is created, a Java Future object is returned. This Future object serves as a handle to the result of the asynchronous task. Once the task completes, the result can be accessed through the Future object.

Some of Java's built-in concurrency services (for example the Java ExecutorService) return Future objects from some of their methods. For instance, when you submit a Callable for execution, the ExecutorService returns a Future.

4.1、 Interface definition

To understand how the Java Future interface works, it helps to look at the interface definition:

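public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    V       get() throws InterruptedException, ExecutionException;
    V       get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException;
    boolean isCancelled();
    boolean isDone();
}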

4.2、 Getting the result from a Future

As mentioned earlier, a Java Future represents the result of an asynchronous task. To obtain the result, call one of the two get() methods on the Future object. With a raw Future, get() returns an Object; with a parameterized Future<V>, it returns a value of the specific type V. Here is an example of obtaining the result from a Future through its get() method:

Future future = ... // get Future by starting async task
// do something else, until ready to check result via Future
// get result from Future
try {
    
    Object result = future.get();
} catch (InterruptedException e) {
    
    e.printStackTrace();
} catch (ExecutionException e) {
    
    e.printStackTrace();
}

If get() is called before the asynchronous task has completed, the call blocks until the task finishes.

The second get() method gives up waiting after a timeout, which you specify through the method parameters. Here is an example of calling that version of get():

try {
    
    Object result =
        future.get(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
    

} catch (ExecutionException e) {
    

} catch (TimeoutException e) {
    
    // thrown if timeout time interval passes
    // before a result is available.
}

In the example above, the Future waits at most 1000 milliseconds. If no result is available within 1000 milliseconds, a TimeoutException is thrown.
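
A timed-out get() does not cancel the task; the Future can still be queried later. The sketch below (class name and the 100 ms timeout are assumptions) holds a task back with a latch, lets get() time out, then releases the task and reads its result.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureTimeoutDemo {

    // Waits briefly for a slow task, then lets it finish and reads the result.
    static String waitBriefly() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<String> future = executor.submit(() -> {
                release.await(); // simulate a slow task
                return "done";
            });
            try {
                return future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                release.countDown(); // the task is still alive: let it finish
                return "timed out, then " + future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(waitBriefly()); // timed out, then done
    }
}
```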

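4.3、 Cancelling a task through a Future

You can cancel the corresponding asynchronous task by calling the Future's cancel() method. Cancellation only takes effect if the task has not already completed or been cancelled; otherwise cancel() returns false and does nothing. The boolean argument specifies whether a task that is already running should be interrupted. Here is an example of cancelling a task through the Java Future cancel() method:

future.cancel(true);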

4.4、 Check whether the task is completed

You can call the Future's isDone() method to check whether the asynchronous task has completed (and a result is therefore available). Here is an example of calling isDone():

Future future = ... // Get Future from somewhere
if(future.isDone()) {
    
    Object result = future.get();
} else {
    
    // do something else
}

4.5、 Check if the task has been cancelled

You can also check whether the asynchronous task represented by a Java Future has been cancelled, by calling the Future's isCancelled() method. Here is an example of checking whether a task has been cancelled:

Future future = ... // get Future from somewhere

if(future.isCancelled()) {
    

} else {
    

}
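
The cancellation methods can be tried together in one runnable sketch. The class name and the 60-second sleep are illustrative assumptions; the task is cancelled with cancel(true) while it is running, and the Future's state is inspected afterwards.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CancelDemo {

    // Cancels a running task and reports cancel(), isCancelled() and isDone().
    static String cancelRunningTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch started = new CountDownLatch(1);
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // interrupted by cancel(true)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await(); // make sure the task is actually running
            boolean cancelled = future.cancel(true);
            return cancelled + "," + future.isCancelled() + "," + future.isDone();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelRunningTask()); // true,true,true
    }
}
```

Note that isDone() also returns true for a cancelled task, so isDone() alone does not mean a result is available.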

4.6、 Thread pool execution Callable example

package com;

import com.utils.ThreadPool;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class testCallableTest {
    
    public static void main(String[] args) {
        // Get the thread pool instance
        ThreadPool instance = ThreadPool.getInstance();
        ExecutorService executorService = instance.getExecutorService();
        // Submit a Callable for execution
        Future<String> submit = executorService.submit(() -> "1234");
        try {
            // Block until the result is available
            String s = submit.get();
            System.out.println(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Shut the pool down so its non-daemon threads do not keep the JVM alive
            executorService.shutdown();
        }
    }
}

5、 Splitting and merging with ForkJoinPool

ForkJoinPool was introduced in Java 7. It is very similar to ExecutorService, with one difference: ForkJoinPool makes it easy to split a task into smaller tasks, which are then submitted to the ForkJoinPool as well. Tasks can keep splitting into smaller subtasks for as long as they can be divided. This may sound abstract, so this section explains how ForkJoinPool works and how task splitting works.

5.1、 Fork and join explained

Before looking at ForkJoinPool, let us briefly explain the fork and join principle. It consists of two steps that are applied recursively: the fork step and the join (merge) step.

5.2、 Fork

A task that uses the fork and join principle can fork (split) itself into smaller subtasks that can be executed concurrently, as the following figure illustrates:

[Figure: a task forking into smaller subtasks]
By splitting itself into subtasks, a task lets each subtask be executed in parallel on a different CPU, or by different threads on the same CPU.

Splitting a task into subtasks only makes sense when the task is large enough. Splitting has a cost, so for small tasks that cost may outweigh the gain from executing the subtasks concurrently.

The size above which it makes sense to split a task into subtasks is called the threshold. Each task has to decide on a sensible threshold for itself; it depends largely on the kind of work being done.

5.3、 Join

When a task has split itself into subtasks, it waits until all the subtasks have finished. Once they have, the task can merge (join) all their results into a single result, as illustrated here:

[Figure: subtask results being joined into one result]
Of course, not every type of task returns a result. If the tasks do not return a result, the task simply waits for its subtasks to finish; there is no result to merge.

5.4、ForkJoinPool

ForkJoinPool is a special thread pool designed to work well with fork-and-join task splitting. ForkJoinPool lives in the java.util.concurrent package, so its fully qualified class name is java.util.concurrent.ForkJoinPool.

5.5、 Create a ForkJoinPool

You create a ForkJoinPool through its constructor. As a constructor parameter you pass the desired level of parallelism, which indicates how many threads or CPUs you want to work concurrently on the tasks passed to the ForkJoinPool. Here is a ForkJoinPool example:

ForkJoinPool forkJoinPool = new ForkJoinPool(4);  

This example creates a ForkJoinPool with a parallelism level of 4.

5.6、 Submit tasks to a ForkJoinPool

You submit tasks to a ForkJoinPool much as you submit tasks to an ExecutorService. You can submit two types of task: one that does not return a value (an "action") and one that does (a "task"). The two types are represented by the RecursiveAction and RecursiveTask classes. The following sections show how to use both types and how to submit them.

5.7、RecursiveAction

A RecursiveAction is a task that does not return a value. It just does some work, for example writing data to disk, and then exits.
A RecursiveAction may still split its work into smaller pieces that can be executed by separate threads or CPUs.
You implement a RecursiveAction by subclassing it. Here is an example:

import java.util.ArrayList;  
import java.util.List;  
import java.util.concurrent.RecursiveAction;  

public class MyRecursiveAction extends RecursiveAction {
      

    private long workLoad = 0;  

    public MyRecursiveAction(long workLoad) {
      
        this.workLoad = workLoad;  
    }  

    @Override  
    protected void compute() {
      

        //if work is above threshold, break tasks up into smaller tasks 
        if(this.workLoad > 16) {
      
            System.out.println("Splitting workLoad : " + this.workLoad);  

            List<MyRecursiveAction> subtasks =  
                new ArrayList<MyRecursiveAction>();  

            subtasks.addAll(createSubtasks());  

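            // Fork every subtask, then wait for all of them so that
            // compute() does not return before the work is done.
            for(RecursiveAction subtask : subtasks){
                subtask.fork();
            }
            for(RecursiveAction subtask : subtasks){
                subtask.join();
            }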

        } else {
      
            System.out.println("Doing workLoad myself: " + this.workLoad);  
        }  
    }  

    private List<MyRecursiveAction> createSubtasks() {
      
        List<MyRecursiveAction> subtasks =  
            new ArrayList<MyRecursiveAction>();  

        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);  
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);  

        subtasks.add(subtask1);  
        subtasks.add(subtask2);  

        return subtasks;  
    }  

} 

The example is very simple. MyRecursiveAction takes a fictional workLoad as a constructor parameter. If the workLoad is above a certain threshold, the work is split into subtasks, which may split further. If the workLoad is below the threshold, the work is carried out by the MyRecursiveAction itself.

You can schedule a MyRecursiveAction for execution like this:

MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);  
forkJoinPool.invoke(myRecursiveAction);  
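
For a RecursiveAction that does real work, the following sketch doubles every element of an array in place. It is an illustration under stated assumptions: the class name, the threshold of 4, and the pool size of 2 are arbitrary choices. It uses the inherited invokeAll() helper, which forks both halves and waits for them.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

final class DoubleInPlace extends RecursiveAction {

    private static final int THRESHOLD = 4; // illustrative value

    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleInPlace(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly.
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // Runs both halves and returns only when both have finished.
        invokeAll(new DoubleInPlace(data, from, mid),
                  new DoubleInPlace(data, mid, to));
    }

    // Returns a doubled copy of the input, computed with a small pool.
    static long[] doubled(long[] input) {
        long[] copy = input.clone();
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            pool.invoke(new DoubleInPlace(copy, 0, copy.length));
        } finally {
            pool.shutdown();
        }
        return copy;
    }

    public static void main(String[] args) {
        long[] result = doubled(new long[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
        System.out.println(Arrays.toString(result));
    }
}
```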

5.8、RecursiveTask

A RecursiveTask is a task that returns a result. It can split its work into smaller tasks and merge the results of those subtasks into a combined result. The splitting and merging may take place on several levels. Here is a RecursiveTask example:

import java.util.ArrayList;  
import java.util.List;  
import java.util.concurrent.RecursiveTask;  


public class MyRecursiveTask extends RecursiveTask<Long> {
      

    private long workLoad = 0;  

    public MyRecursiveTask(long workLoad) {
      
        this.workLoad = workLoad;  
    }  

    protected Long compute() {
      

        //if work is above threshold, break tasks up into smaller tasks 
        if(this.workLoad > 16) {
      
            System.out.println("Splitting workLoad : " + this.workLoad);  

            List<MyRecursiveTask> subtasks =  
                new ArrayList<MyRecursiveTask>();  
            subtasks.addAll(createSubtasks());  

            for(MyRecursiveTask subtask : subtasks){
      
                subtask.fork();  
            }  

            long result = 0;  
            for(MyRecursiveTask subtask : subtasks) {
      
                result += subtask.join();  
            }  
            return result;  

        } else {
      
            System.out.println("Doing workLoad myself: " + this.workLoad);  
            return workLoad * 3;  
        }  
    }  

    private List<MyRecursiveTask> createSubtasks() {
      
        List<MyRecursiveTask> subtasks =  
        new ArrayList<MyRecursiveTask>();  

        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);  
        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);  

        subtasks.add(subtask1);  
        subtasks.add(subtask2);  

        return subtasks;  
    }  
} 

Apart from returning a result, this example is very similar to the RecursiveAction example. The MyRecursiveTask class extends RecursiveTask<Long>, which means that the task returns a result of type Long.

The MyRecursiveTask example also splits the work into subtasks and schedules them for execution with the fork() method.

In addition, this example calls join() on each subtask to collect the result it returns. The subtask results are then merged into a larger result, which is finally returned. Because subtasks may themselves split, this merging can happen recursively across several levels.

You can schedule a RecursiveTask like this:

MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
long mergedResult = forkJoinPool.invoke(myRecursiveTask);
System.out.println("mergedResult = " + mergedResult);

Notice how the final result is obtained from the ForkJoinPool.invoke() call. With the threshold of 16 used in compute(), a workLoad of 128 ends up as eight leaf tasks with a workLoad of 16, each returning 16 * 3 = 48, so the program prints mergedResult = 384.
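To make the split-and-merge flow more concrete, here is a minimal sketch that sums an array with a RecursiveTask. The class name SumTask, the THRESHOLD value and the use of ForkJoinPool.commonPool() are assumptions made for this illustration; they are not part of the example above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: sums values[from, to) by recursive splitting.
class SumTask extends RecursiveTask<Long> {

    // Assumed cut-off below which a task does the work itself.
    private static final int THRESHOLD = 1_000;

    private final long[] values;
    private final int from;
    private final int to;

    SumTask(long[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(values, from, mid);
        SumTask right = new SumTask(values, mid, to);
        left.fork();                         // schedule the left half asynchronously
        long rightResult = right.compute();  // compute the right half in this thread
        return left.join() + rightResult;    // wait for the left half and merge
    }

    static long sum(long[] values) {
        return ForkJoinPool.commonPool().invoke(new SumTask(values, 0, values.length));
    }

    public static void main(String[] args) {
        long[] values = new long[10_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println("sum = " + sum(values)); // 1 + 2 + ... + 10000 = 50005000
    }
}
```

Forking one half and computing the other half directly is a common variation: it keeps the current worker thread busy instead of forking both halves and then only waiting.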

5.9、Criticism of ForkJoinPool

It seems that not everyone is satisfied with the ForkJoinPool in Java 7: see the critique "A Java Fork-Join Calamity".
It is worth reading that article before you plan to use ForkJoinPool.

Four 、 Concurrent containers

1、CopyOnWrite

CopyOnWrite (copy-on-write) containers are a common kind of concurrent container. They improve concurrency by separating reads from writes, similar to the approach of StampedLock described earlier: reads are allowed at any time, while writes require a lock. The difference is that in a CopyOnWrite container a write first takes the lock, then copies the underlying container, modifies the copy, and finally replaces the old container with the new one.

The benefit of this approach is clear: because modifications are made on a copy, reads need no locking and can run concurrently. A reader keeps using the old container, so changes made to the new container do not affect it. This also solves the concurrency problems caused by other threads modifying the container during iteration.

The JDK provides two such containers, CopyOnWriteArrayList and CopyOnWriteArraySet. The following excerpt from the CopyOnWriteArrayList source code illustrates the idea:

// Add an element
public boolean add(E e) {
    // Exclusive lock
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // Copy the elements into a new array newElements
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        // Point the array reference at the new array
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

public E get(int index) {
    // No lock, read directly
    return get(getArray(), index);
}

The code is simple. add() acquires a shared ReentrantLock, which prevents multiple threads from modifying the container at the same time. Once the lock is held, Arrays.copyOf creates a new array, the new element is written into it, and setArray finally points the array reference at the new array, so readers and iterators never observe a partially modified array. get() is a read operation and simply reads without locking. CopyOnWriteArraySet works in a similar way and is not discussed further here.

CopyOnWrite containers are safe to use from multiple threads, and compared with Vector they greatly improve read performance, but they have problems of their own.

The first is performance. As discussed in the article on ArrayList, ArrayList uses Arrays.copyOf when it grows, which requests more space and copies the existing elements into a new array, and this has some impact on performance. CopyOnWrite containers are no exception, and they pay this cost on every modification: each write allocates a new array and then replaces the old one. Under high concurrency with frequent modifications, the container keeps allocating new arrays and causes frequent GC, so a CopyOnWrite container is not a good choice in that situation.
The second is data consistency. Because a modification works on a copy that later replaces the original, a thread that is still using the old array cannot see the new data immediately, which leads to stale reads. If you need strong data consistency, a CopyOnWrite container is not suitable.
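The snapshot behaviour described above can be observed even in a single thread. The following is a small sketch (the class and method names are made up for this illustration): an iterator created before a write keeps reading the old array and does not throw ConcurrentModificationException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteSnapshotDemo {

    // Returns what an iterator created BEFORE a write still sees afterwards.
    static List<Integer> snapshotAfterWrite() {
        CopyOnWriteArrayList<Integer> list =
                new CopyOnWriteArrayList<>(Arrays.asList(1, 2, 3));
        Iterator<Integer> it = list.iterator(); // bound to the current backing array
        list.add(4);                            // copies the array; the iterator is unaffected

        List<Integer> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(snapshotAfterWrite()); // [1, 2, 3]
    }
}
```

The iterator does not see the element 4, which is exactly the stale-read trade-off mentioned above.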

2、ConcurrentMap

2.1 java.util.concurrent.ConcurrentMap

The java.util.concurrent.ConcurrentMap interface represents a java.util.Map that can handle concurrent access (insertions and retrievals).
In addition to the methods inherited from its parent interface java.util.Map, ConcurrentMap has several additional atomic methods.

2.2 ConcurrentMap implementations

Since ConcurrentMap is an interface, you need to use one of its implementation classes in order to use it. The java.util.concurrent package provides the following implementation of the ConcurrentMap interface: ConcurrentHashMap

2.3 ConcurrentHashMap

ConcurrentHashMap is very similar to the java.util.Hashtable class, but ConcurrentHashMap offers better concurrency than Hashtable. ConcurrentHashMap does not lock the whole Map when you read from it. Nor does it lock the whole Map when you write to it; internally it locks only the part of the Map that is being written.

Another difference is that ConcurrentHashMap does not throw ConcurrentModificationException if it is changed while being iterated. The Iterator itself, however, is not designed to be used by more than one thread.

For more details about ConcurrentMap and ConcurrentHashMap, please refer to the official documentation.

2.4 ConcurrentMap Example

Here is an example of how to use the ConcurrentMap interface. The example uses the ConcurrentHashMap implementation:

ConcurrentMap<String, String> concurrentMap = new ConcurrentHashMap<>();
concurrentMap.put("key", "value");
String value = concurrentMap.get("key");
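The additional atomic methods mentioned in 2.1 are where ConcurrentMap differs most from a plain Map. Below is a short sketch of a few of them (putIfAbsent, the two conditional forms replace and remove, and merge, which is available since Java 8); the class name and the keys are invented for the illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConcurrentMapAtomicDemo {

    static ConcurrentMap<String, Integer> run() {
        ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<>();

        hits.putIfAbsent("home", 1);          // inserted: the key was absent
        hits.putIfAbsent("home", 100);        // ignored: the key is already present
        hits.replace("home", 1, 2);           // succeeds: current value is 1
        hits.replace("home", 1, 3);           // fails: current value is 2, not 1
        hits.merge("home", 1, Integer::sum);  // atomic read-modify-write: 2 + 1 = 3
        hits.merge("about", 1, Integer::sum); // key absent: stores 1
        hits.remove("about", 99);             // fails: current value is 1, not 99
        return hits;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Integer> hits = run();
        System.out.println("home=" + hits.get("home") + ", about=" + hits.get("about"));
        // home=3, about=1
    }
}
```

Each of these calls is a single atomic step, so there is no window between a check (such as containsKey) and the following update in which another thread could interfere.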

3、ConcurrentNavigableMap

java.util.concurrent.ConcurrentNavigableMap is a java.util.NavigableMap that supports concurrent access, and its sub maps support concurrent access as well. The "sub maps" are the maps returned by methods such as headMap(), subMap() and tailMap().

The methods inherited from NavigableMap are not repeated here. This section looks at the methods that ConcurrentNavigableMap adds.

3.1、headMap()

headMap(K toKey) returns a sub map containing the keys that are less than the given toKey.
If you change elements in the original map, the changes are reflected in the sub map (translator's note: map collections hold only references to objects).
The following example shows how to use the headMap() method:

ConcurrentNavigableMap<String, String> map = new ConcurrentSkipListMap<>();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap<String, String> headMap = map.headMap("2");

headMap now refers to a ConcurrentNavigableMap that contains only the key "1", because only this key is less than "2". For details on how this method and its overloaded versions work, please refer to the Java documentation.

3.2、tailMap()

tailMap(K fromKey) returns a sub map containing the keys that are greater than or equal to the given fromKey.

If you change elements in the original map, the changes are reflected in the sub map (translator's note: map collections hold only references to objects).

The following example shows how to use the tailMap() method:

ConcurrentNavigableMap<String, String> map = new ConcurrentSkipListMap<>();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap<String, String> tailMap = map.tailMap("2");

tailMap contains the keys "2" and "3", because they are not less than the given key "2". For details on how this method and its overloaded versions work, please refer to the Java documentation.

3.3、subMap()

The subMap() method returns a sub map of the original map whose keys range from fromKey (inclusive) to toKey (exclusive). Here is an example:

ConcurrentNavigableMap<String, String> map = new ConcurrentSkipListMap<>();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap<String, String> subMap = map.subMap("2", "3");

The returned sub map contains only the key "2", because it is the only key that is not less than "2" and less than "3".
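The three methods can also be tried side by side. The sketch below (class and method names are chosen only for this illustration) additionally shows that the sub maps are live views: changing the original map changes what the sub maps contain.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class SubMapViewDemo {

    static ConcurrentNavigableMap<String, String> sampleMap() {
        ConcurrentNavigableMap<String, String> map = new ConcurrentSkipListMap<>();
        map.put("1", "one");
        map.put("2", "two");
        map.put("3", "three");
        return map;
    }

    public static void main(String[] args) {
        ConcurrentNavigableMap<String, String> map = sampleMap();
        ConcurrentNavigableMap<String, String> head = map.headMap("2");     // keys < "2"
        ConcurrentNavigableMap<String, String> tail = map.tailMap("2");     // keys >= "2"
        ConcurrentNavigableMap<String, String> sub = map.subMap("2", "3");  // "2" <= keys < "3"
        System.out.println(head + " " + tail + " " + sub);
        // {1=one} {2=two, 3=three} {2=two}

        // The sub maps are views backed by the original map.
        map.put("0", "zero");
        map.remove("3");
        System.out.println(head + " " + tail);
        // {0=zero, 1=one} {2=two}
    }
}
```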

3.4、More methods

The ConcurrentNavigableMap interface has other methods as well, for example:

  • descendingKeySet()
  • descendingMap()
  • navigableKeySet()

For more information on these methods, refer to the official Java documentation.
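As a quick orientation, the following sketch prints what the three methods listed above return for a small map. The class name and the sample data are assumptions for the illustration.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class NavigableViewsDemo {

    static String views() {
        ConcurrentNavigableMap<Integer, String> map = new ConcurrentSkipListMap<>();
        map.put(1, "one");
        map.put(2, "two");
        map.put(3, "three");
        return map.navigableKeySet()      // keys in ascending order
                + " " + map.descendingKeySet() // keys in descending order
                + " " + map.descendingMap();   // the whole map in descending key order
    }

    public static void main(String[] args) {
        System.out.println(views());
        // [1, 2, 3] [3, 2, 1] {3=three, 2=two, 1=one}
    }
}
```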

4、ConcurrentSkipListMap

Most containers in java.util have a concurrent counterpart in the java.util.concurrent package: List and Set have CopyOnWriteArrayList and CopyOnWriteArraySet, and HashMap has ConcurrentHashMap, but the sorted TreeMap has no ConcurrentTreeMap. Why is there no ConcurrentTreeMap? TreeMap is implemented internally with a red-black tree, a kind of self-balancing binary tree. When the tree is modified it has to be rebalanced, and rebalancing may touch a large part of the tree. Under heavy concurrency this would require taking mutexes on many tree nodes, which defeats the purpose of concurrency. The JDK therefore provides a different sorted map for concurrent use: ConcurrentSkipListMap.

ConcurrentSkipListMap is implemented internally with a skip list (SkipList). This data structure is much easier to understand than a red-black tree and relatively simple to implement, and its search, insert and delete operations have an expected time complexity of O(log n). For concurrency control, ConcurrentSkipListMap uses lock-free CAS plus spinning.

Simply put, a skip list is a multi-level linked list. The bottom level is an ordinary linked list, and each level above it contains fewer elements. A simple randomized algorithm is usually used so that each level holds about half as many elements as the level below it. A search starts at the top level, so it can be seen as another form of binary search.

A simple probabilistic algorithm for choosing the number of levels of a node looks like this:

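// Requires: import java.util.concurrent.ThreadLocalRandom;
static int randomLevel() {
    int k = 1;
    // each additional level is reached with probability 1/2
    while (ThreadLocalRandom.current().nextBoolean())
        k++;
    return k;
}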

Each step is a fair coin flip between 0 and 1, so a node gets 1 level with probability 50%, 2 levels with probability 25%, 3 levels with probability 12.5%, and so on, halving at each step.
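These probabilities are easy to check empirically. The sketch below draws many levels with a seeded java.util.Random and counts how often each level occurs; the class name, the seed and the sample size are arbitrary choices for the illustration and are unrelated to how ConcurrentSkipListMap generates its levels.

```java
import java.util.Random;

class SkipListLevelDemo {

    // Coin-flip level selection: keep adding levels while the coin shows heads.
    static int randomLevel(Random random) {
        int level = 1;
        while (random.nextBoolean()) {
            level++;
        }
        return level;
    }

    // counts[i] = number of samples with level i + 1 (the last bucket collects 8 and above).
    static int[] histogram(int samples, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[8];
        for (int i = 0; i < samples; i++) {
            int level = randomLevel(random);
            counts[Math.min(level, counts.length) - 1]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int samples = 100_000;
        int[] counts = histogram(samples, 42L);
        for (int i = 0; i < counts.length; i++) {
            System.out.printf("level %d: %.2f%%%n", i + 1, 100.0 * counts[i] / samples);
        }
    }
}
```

The printed shares should come out close to 50%, 25%, 12.5% and so on; the exact figures depend on the seed.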

The process of adding an element to a three-level skip list is as follows:
[Figure: the three-level skip list before the insertion]
Insert a node with the value 15:

[Figure: inserting the node with value 15]
After the insertion:
[Figure: the skip list after the insertion]
Wikipedia has an animation of adding a node, which is also included here for easier understanding:
[Figure: Wikipedia animation of a skip list insertion]
The put method of ConcurrentSkipListMap illustrates both the skip list and CAS spin concurrency control:

 private V doPut(K key, V value, boolean onlyIfAbsent) {
    
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
    
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            // b is the predecessor node found via the index levels, n is its successor
            if (n != null) {
                Object v; int c;
                Node<K,V> f = n.next; // successor of n
                if (n != b.next)  // inconsistent read: another thread changed b.next, so restart
                    break;
                    break;
                if ((v = n.value) == null) {
                    // n has been deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n) 
                    break;
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    // key is greater than n.key: move on to the next node
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) {
                    // key already exists: replace the value with CAS (unless onlyIfAbsent)
                    if (onlyIfAbsent || n.casValue(v, value)) {
    
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }

            z = new Node<K,V>(key, value, n); // new node goes between b and n (b.key < key < n.key)
            if (!b.casNext(n, z)) // CAS b.next from n to z
                break;         // restart if lost race to append to b
            break outer;
        }
    }

    int rnd = ThreadLocalRandom.nextSecondarySeed(); // get a random number
    if ((rnd & 0x80000001) == 0) {
        // test highest and lowest bits
        int level = 1, max;
        while (((rnd >>>= 1) & 1) != 0) // pick a random level for the index
            ++level;
        Index<K,V> idx = null;
        HeadIndex<K,V> h = head;
        if (level <= (max = h.level)) {
            // level does not exceed the current maximum: build a vertical chain of index nodes
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }
        else {
            // try to grow by one level; the new index nodes are held in an Index<K,V> array
            level = max + 1; // hold in array and later pick the one to use
            @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                (Index<K,V>[])new Index<?,?>[level+1];
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
    
                h = head;
                int oldLevel = h.level;
                if (level <= oldLevel) // lost race to add level
                    break;
                HeadIndex<K,V> newh = h;
                Node<K,V> oldbase = h.node;
                for (int j = oldLevel+1; j <= level; ++j) // create head index nodes for the newly added levels
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                if (casHead(h, newh)) {
    
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // insert the index nodes level by level
        splice: for (int insertionLevel = level;;) {
    
            int j = h.level;
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
    
                if (q == null || t == null)
                    break splice;
                if (r != null) {
    
                    Node<K,V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    int c = cpr(cmp, key, n.key);
                    if (n.value == null) {
    
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    if (c > 0) {
    
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                if (j == insertionLevel) {
    
                    if (!q.link(r, t))
                        break; // restart
                    if (t.node.value == null) {
    
                        findNode(key);
                        break splice;
                    }
                    if (--insertionLevel == 0)
                        break splice;
                }

                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}

The insertion method is quite complicated, but it can be understood in three major steps. The first step finds the predecessor node and inserts the new node with CAS. The second step picks a random level; if it is greater than the current maximum level, one more level is added. The third step splices the index nodes into the corresponding levels. Throughout the insertion, CAS with spinning (retrying on failure) keeps the data correct under concurrency.
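The "CAS plus spin" pattern itself is much smaller than doPut. As a hedged illustration, here is a lock-free stack (often called a Treiber stack) built on AtomicReference. It is not taken from the JDK source; the class CasStack and its helper methods are invented for this sketch, but the read, prepare, compareAndSet, retry loop is the same idea that doPut applies to skip list nodes.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical lock-free stack, used only to illustrate CAS + spin.
class CasStack<T> {

    private static final class Node<T> {
        final T value;
        final Node<T> next;

        Node(T value, Node<T> next) {
            this.value = value;
            this.next = next;
        }
    }

    private final AtomicReference<Node<T>> head = new AtomicReference<>();

    void push(T value) {
        for (;;) {                                      // spin until the CAS succeeds
            Node<T> current = head.get();               // 1. read the current state
            Node<T> node = new Node<>(value, current);  // 2. prepare the update
            if (head.compareAndSet(current, node)) {    // 3. publish only if nothing changed
                return;
            }
            // Another thread won the race: loop and retry with fresh state.
        }
    }

    T pop() {
        for (;;) {
            Node<T> current = head.get();
            if (current == null) {
                return null;                            // empty stack
            }
            if (head.compareAndSet(current, current.next)) {
                return current.value;
            }
        }
    }

    int size() {
        int n = 0;
        for (Node<T> node = head.get(); node != null; node = node.next) {
            n++;
        }
        return n;
    }

    // Pushes from several threads at once and returns the final size.
    static int pushConcurrently(int threads, int perThread) {
        CasStack<Integer> stack = new CasStack<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    stack.push(i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return stack.size();
    }

    public static void main(String[] args) {
        System.out.println(pushConcurrently(4, 1_000)); // 4000: no push is lost
    }
}
```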

Five 、 Thread execution control helper classes

1、CountDownLatch (latch)

java.util.concurrent.CountDownLatch is a concurrency construct that allows one or more threads to wait for a given set of operations to complete.

A CountDownLatch is initialized with a given count. Each call to countDown() decrements the count by one. By calling one of the await() methods, a thread can block until the count reaches zero.

Here is a simple example. After the Decrementer has called countDown() three times, the waiting Waiter is released from its await() call.

CountDownLatch latch = new CountDownLatch(3);  

Waiter waiter = new Waiter(latch);  
Decrementer decrementer = new Decrementer(latch);  

new Thread(waiter).start();  
new Thread(decrementer).start();  

Thread.sleep(4000);  

public class Waiter implements Runnable{
      

    CountDownLatch latch = null;  

    public Waiter(CountDownLatch latch) {
      
        this.latch = latch;  
    }  

    public void run() {
      
        try {
      
            latch.await();  
        } catch (InterruptedException e) {
      
            e.printStackTrace();  
        }  

        System.out.println("Waiter Released");  
    }  
}  

public class Decrementer implements Runnable {
      

    CountDownLatch latch = null;  

    public Decrementer(CountDownLatch latch) {
      
        this.latch = latch;  
    }  

    public void run() {
      

        try {
      
            Thread.sleep(1000);  
            this.latch.countDown();  

            Thread.sleep(1000);  
            this.latch.countDown();  

            Thread.sleep(1000);  
            this.latch.countDown();  
        } catch (InterruptedException e) {
      
            e.printStackTrace();  
        }  
    }  
}  
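A frequent use of CountDownLatch is waiting for a group of worker threads to finish. The sketch below is one way to write that; the class LatchDemo, the five-second timeout and the simulated work are assumptions for the illustration. It uses the timed await(long, TimeUnit) variant so that the waiting thread cannot hang forever, and counts down in a finally block so a failing worker still releases the latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {

    // Starts the given number of workers and waits until all of them have counted down.
    static int runWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    finished.incrementAndGet(); // stands in for real work
                } finally {
                    done.countDown();           // always count down, even on failure
                }
            }).start();
        }

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished workers: " + runWorkers(3)); // finished workers: 3
    }
}
```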

2、CyclicBarrier (barrier)

The java.util.concurrent.CyclicBarrier class is a synchronization mechanism that can synchronize threads progressing through some algorithm. In other words, it is a barrier at which all threads must wait until every thread has arrived, after which all of them can continue. This is illustrated below:
[Figure: two threads waiting for each other at the barrier.]
The threads wait for each other by calling await() on the CyclicBarrier object. Once N threads are waiting at the CyclicBarrier, all of them are released and continue running.

2.1、Creating a CyclicBarrier

When you create a CyclicBarrier you specify how many threads must be waiting at it before they are released. Here is how to create a CyclicBarrier:

CyclicBarrier barrier = new CyclicBarrier(2);  

2.2、Waiting at a CyclicBarrier

The following shows how a thread waits at a CyclicBarrier:

barrier.await();

You can also specify a timeout for the waiting thread. When the timeout elapses, the thread is released (with a TimeoutException) even if fewer than N threads are waiting at the CyclicBarrier. Here is an example with a timeout:

barrier.await(10, TimeUnit.SECONDS);

A thread waiting at a CyclicBarrier is released when any of the following happens:

  • The last thread arrives at the CyclicBarrier (calls await())
  • The thread is interrupted by another thread (another thread calls its interrupt() method)
  • Another thread waiting at the barrier is interrupted
  • Another thread waiting at the barrier times out
  • An external thread calls the barrier's CyclicBarrier.reset() method
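The timeout case is simple to reproduce. In the following sketch (the class name and the 100 ms timeout are arbitrary choices for the illustration) a single thread waits at a barrier that needs two parties. The wait ends with a TimeoutException, the barrier is left in the broken state, and reset() makes it usable again.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BarrierTimeoutDemo {

    static String awaitAlone() {
        CyclicBarrier barrier = new CyclicBarrier(2); // needs two threads, only one will come
        try {
            barrier.await(100, TimeUnit.MILLISECONDS);
            return "released";
        } catch (TimeoutException e) {
            boolean brokenAfterTimeout = barrier.isBroken(); // true: the timeout broke the barrier
            barrier.reset();                                 // back to the initial state
            return "timeout, broken=" + brokenAfterTimeout
                    + ", brokenAfterReset=" + barrier.isBroken();
        } catch (BrokenBarrierException e) {
            return "broken";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitAlone());
        // timeout, broken=true, brokenAfterReset=false
    }
}
```

Other threads that are waiting at the same barrier when it breaks are released with a BrokenBarrierException, which is why the example in 2.4 catches that exception as well.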

2.3、CyclicBarrier action

CyclicBarrier supports a barrier action, a Runnable that is executed once the last thread arrives at the barrier. You pass the Runnable to the CyclicBarrier constructor:

Runnable      barrierAction = ... ;  
CyclicBarrier barrier       = new CyclicBarrier(2, barrierAction);  

2.4 CyclicBarrier Example

The following code demonstrates how to use CyclicBarrier:

Runnable barrier1Action = new Runnable() {
      
    public void run() {
      
        System.out.println("BarrierAction 1 executed ");  
    }  
};  
Runnable barrier2Action = new Runnable() {
      
    public void run() {
      
        System.out.println("BarrierAction 2 executed ");  
    }  
};  

CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);  
CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);  

CyclicBarrierRunnable barrierRunnable1 =  new CyclicBarrierRunnable(barrier1, barrier2);  

CyclicBarrierRunnable barrierRunnable2 =  new CyclicBarrierRunnable(barrier1, barrier2);  

new Thread(barrierRunnable1).start();  
new Thread(barrierRunnable2).start(); 

CyclicBarrierRunnable class :

public class CyclicBarrierRunnable implements Runnable{
      

    CyclicBarrier barrier1 = null;  
    CyclicBarrier barrier2 = null;  

    public CyclicBarrierRunnable(  
            CyclicBarrier barrier1,  
            CyclicBarrier barrier2) {
      

        this.barrier1 = barrier1;  
        this.barrier2 = barrier2;  
    }  

    public void run() {
      
        try {
      
            Thread.sleep(1000);  
            System.out.println(Thread.currentThread().getName() +  
                                " waiting at barrier 1");  
            this.barrier1.await();  

            Thread.sleep(1000);  
            System.out.println(Thread.currentThread().getName() +  
                                " waiting at barrier 2");  
            this.barrier2.await();  

            System.out.println(Thread.currentThread().getName() +  
                                " done!");  

        } catch (InterruptedException e) {
      
            e.printStackTrace();  
        } catch (BrokenBarrierException e) {
      
            e.printStackTrace();  
        }  
    }  
}

The console output of the code above looks like this. Note that the order in which the threads write to the console may differ from run to run. Sometimes Thread-0 prints first, sometimes Thread-1.

Thread-0 waiting at barrier 1
Thread-1 waiting at barrier 1
BarrierAction 1 executed
Thread-1 waiting at barrier 2
Thread-0 waiting at barrier 2
BarrierAction 2 executed
Thread-0 done!
Thread-1 done!

3、Exchanger

The java.util.concurrent.Exchanger class represents a rendezvous point where two threads can exchange objects. The mechanism is illustrated below:

[Figure: two threads exchanging objects through an Exchanger.]

Objects are exchanged through one of the two exchange() methods of Exchanger. Here is an example:

Exchanger exchanger = new Exchanger();  
ExchangerRunnable exchangerRunnable1 =  new ExchangerRunnable(exchanger, "A");  
ExchangerRunnable exchangerRunnable2 =  new ExchangerRunnable(exchanger, "B");  
new Thread(exchangerRunnable1).start();  
new Thread(exchangerRunnable2).start();  

ExchangerRunnable Code :

public class ExchangerRunnable implements Runnable{
      

    Exchanger exchanger = null;  
    Object    object    = null;  

    public ExchangerRunnable(Exchanger exchanger, Object object) {
      
        this.exchanger = exchanger;  
        this.object = object;  
    }  

    public void run() {
      
        try {
      
            Object previous = this.object;  

            this.object = this.exchanger.exchange(this.object);  

            System.out.println(  
                    Thread.currentThread().getName() +  
                    " exchanged " + previous + " for " + this.object  
            );  
        } catch (InterruptedException e) {
      
            e.printStackTrace();  
        }  
    }  
}

The above program output :

Thread-0 exchanged A for B
Thread-1 exchanged B for A

4、Semaphore

The java.util.concurrent.Semaphore class is a counting semaphore. That means it has two main methods:

acquire()
release()

A counting semaphore is initialized with a given number of "permits". Each call to acquire() takes a permit from the semaphore, and each call to release() returns a permit to it. Thus, without any release() calls, at most N threads can pass the acquire() method, where N is the number of permits the semaphore was initialized with. The permits are just a simple counter. Nothing fancy here.

4.1、Semaphore usage

Semaphores have two main uses:

  • Guarding a critical section (of code) against entry by more than N threads at a time
  • Sending signals between two threads

4.2、Guarding a critical section

If you use a semaphore to guard a critical section, the thread trying to enter the critical section typically acquires a permit first, then enters the critical section (code block), and releases the permit afterwards. Like this:

Semaphore semaphore = new Semaphore(1);

semaphore.acquire();
try {
    // critical section
    ...
} finally {
    // release the permit even if the critical section throws
    semaphore.release();
}
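To see the limit in action, the following sketch starts more threads than there are permits and records the highest number of threads that were inside the guarded section at the same time. The class SemaphoreGuardDemo, the 10 ms sleep and the counters are assumptions made for this illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreGuardDemo {

    // Returns the highest number of threads observed inside the guarded section at once.
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                        // no permit acquired, nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);
                    Thread.sleep(10);              // stands in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent threads: " + maxConcurrent(2, 8)); // never more than 2
    }
}
```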

4.3、Signaling between threads

If you use a semaphore to send signals between threads, you would typically have one thread call the acquire() method and another thread call the release() method.
If no permit is available, the acquire() call blocks until a permit is released by another thread. Note that release() in java.util.concurrent.Semaphore never blocks: it simply adds a permit, and the permit count may even exceed the initial value.

This makes it possible to coordinate threads. For instance, if thread 1 calls acquire() after inserting an object into a shared list, and thread 2 calls release() just before taking an object from that list, you have essentially created a blocking queue. The number of permits in the semaphore corresponds to the number of elements the blocking queue can hold.
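A common way to build such a queue with java.util.concurrent.Semaphore uses two semaphores, one counting free slots and one counting filled slots, so that producers block when the buffer is full and consumers block when it is empty. The sketch below is a variation on the idea described above rather than the exact scheme from the text; the class SemaphoreBuffer and its methods are invented for the illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

// Hypothetical bounded buffer coordinated by two semaphores.
class SemaphoreBuffer<T> {

    private final Deque<T> items = new ArrayDeque<>();
    private final Semaphore freeSlots;                      // permits = remaining capacity
    private final Semaphore filledSlots = new Semaphore(0); // permits = stored elements

    SemaphoreBuffer(int capacity) {
        this.freeSlots = new Semaphore(capacity);
    }

    void put(T item) throws InterruptedException {
        freeSlots.acquire();            // blocks while the buffer is full
        synchronized (items) {
            items.addLast(item);
        }
        filledSlots.release();          // signal: one more element available
    }

    T take() throws InterruptedException {
        filledSlots.acquire();          // blocks while the buffer is empty
        T item;
        synchronized (items) {
            item = items.removeFirst();
        }
        freeSlots.release();            // signal: one more free slot
        return item;
    }

    // Sends 0..count-1 through a buffer of capacity 2 and returns the sum received.
    static int transfer(int count) {
        SemaphoreBuffer<Integer> buffer = new SemaphoreBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5)); // 0 + 1 + 2 + 3 + 4 = 10
    }
}
```

In production code the ready-made java.util.concurrent.ArrayBlockingQueue is usually the better choice; the sketch only shows how the signaling works.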

4.4、Fairness

There is no guarantee that threads acquire permits from a Semaphore fairly. That is, the first thread to call acquire() is not guaranteed to be the first thread to obtain a permit. If the first thread is blocked waiting for a permit, and a second thread arrives just as a permit is released, the second thread may obtain the permit ahead of the first.

If you want to enforce fairness, the Semaphore class has a constructor that takes a boolean parameter telling the semaphore whether to enforce fairness. Enforcing fairness comes at a performance cost, so don't enable it unless you need it.

Here is an example of creating a Semaphore in fair mode:

Semaphore semaphore = new Semaphore(1, true);

4.5、More methods

The java.util.concurrent.Semaphore class has many more methods, for example:

  • availablePermits()
  • acquireUninterruptibly()
  • drainPermits()
  • hasQueuedThreads()
  • getQueuedThreads()
  • tryAcquire()
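A few of these methods can be tried without starting any extra threads. The sketch below (class and variable names are chosen for the illustration) shows the non-blocking tryAcquire() together with availablePermits(), drainPermits() and hasQueuedThreads().

```java
import java.util.concurrent.Semaphore;

class SemaphoreMethodsDemo {

    static String run() {
        Semaphore semaphore = new Semaphore(3);

        boolean one = semaphore.tryAcquire();           // takes 1 permit without blocking: true
        boolean two = semaphore.tryAcquire(2);          // takes the remaining 2 permits: true
        boolean none = semaphore.tryAcquire();          // nothing left: false, returns immediately
        int left = semaphore.availablePermits();        // 0

        semaphore.release(3);                           // hand all three permits back
        int drained = semaphore.drainPermits();         // takes every available permit: 3
        boolean waiting = semaphore.hasQueuedThreads(); // false: no thread is blocked in acquire()

        return one + " " + two + " " + none + " " + left + " " + drained + " " + waiting;
    }

    public static void main(String[] args) {
        System.out.println(run()); // true true false 0 3 false
    }
}
```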
Copyright notice
This article was written by [Besieged city_ city with high walls]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/163/202206122125175730.html