Rational use of thread pool and thread variables
2022-06-11 03:58:00 【Chestnut Shao】
Background
As computing technology keeps advancing and 3 nm process chips enter trial production, Moore's law is running into hard physical limits under existing technology. Improving server performance through multi-core processors has become the main way to increase computing power.
In the server world, back ends built on Java dominate, so mastering Java concurrent programming and making full use of the CPU's parallel processing capacity is a basic skill for developers. Drawing on the thread pool source code and on practical experience, this article briefly introduces the use of thread pools and thread variables.
Thread pool overview
▐ What is a thread pool
A thread pool is a "pooled" way of using threads. It creates a certain number of threads and keeps them ready in order to improve system response time. When a thread finishes its work it is returned to the pool for reuse, which reduces the consumption of system resources.
▐ Why use thread pools
Overall, thread pools have the following advantages:
Lower resource consumption. Reusing already created threads reduces the cost of creating and destroying threads.
Faster response. When a task arrives, it can run immediately without waiting for a thread to be created.
Better manageability. Threads are a scarce resource. Creating them without limit not only consumes system resources but also reduces system stability. A thread pool allows unified allocation, tuning, and monitoring.
Using thread pools
▐ Thread pool creation & core parameter settings
In Java, the thread pool implementation class is ThreadPoolExecutor. Its constructor is as follows:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit timeUnit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
A thread pool can be created with new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler).
corePoolSize parameter
In the constructor, corePoolSize is the number of core threads in the pool. By default core threads stay alive even when idle, but once allowCoreThreadTimeOut(true) has been called, idle core threads are also reclaimed after the timeout.
maximumPoolSize parameter
In the constructor, maximumPoolSize is the maximum number of threads the pool can hold.
keepAliveTime parameter
In the constructor, keepAliveTime is the idle timeout of a thread. A non-core thread that stays idle longer than this is reclaimed. If allowCoreThreadTimeOut(true) has been called, core threads are reclaimed on timeout as well.
timeUnit parameter
In the constructor, timeUnit is the time unit of the idle timeout. Common values are TimeUnit.MILLISECONDS, TimeUnit.SECONDS, and TimeUnit.MINUTES.
workQueue parameter
In the constructor, workQueue (a BlockingQueue) is the task queue. Common implementations used as a thread pool task queue are:
ArrayBlockingQueue: a bounded blocking queue backed by an array. It orders elements FIFO and supports fair access to the queue.
LinkedBlockingQueue: an optionally bounded blocking queue backed by a linked list. If no capacity is specified, Integer.MAX_VALUE is used as the capacity. It orders elements FIFO.
PriorityBlockingQueue: an unbounded blocking queue that supports priority ordering. By default elements are sorted in natural order; a Comparator can also be supplied.
DelayQueue: an unbounded blocking queue that supports delayed retrieval of elements. Each element specifies how long it must wait before it can be taken from the queue. It is often used in cache design and scheduled task scheduling.
SynchronousQueue: a blocking queue that stores no elements. Each put must wait for a corresponding take, and vice versa.
LinkedTransferQueue: an unbounded blocking queue backed by a linked list. Compared with LinkedBlockingQueue it adds the transfer and tryTransfer methods, which hand an element directly to a consumer that is already waiting to receive it.
LinkedBlockingDeque: a double-ended blocking queue backed by a linked list. Elements can be inserted and removed at both ends.
threadFactory parameter
In the constructor, threadFactory is the thread factory. It specifies how the pool creates new threads and can set the thread name, thread group, priority, and other attributes. For example, Google's Guava toolkit can be used to set the names of the threads in the pool:
new ThreadFactoryBuilder().setNameFormat("general-detail-batch-%d").build()
RejectedExecutionHandler parameter
In the constructor, handler is the rejection policy. It is executed when the maximum number of threads has been reached and the task queue is full. Common rejection policies are:
ThreadPoolExecutor.AbortPolicy: the default policy. It throws RejectedExecutionException when the task cannot be accepted.
ThreadPoolExecutor.DiscardPolicy: silently discards the new task that cannot be accepted, without throwing any exception.
ThreadPoolExecutor.CallerRunsPolicy: runs the task directly on the caller's thread.
ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue (the oldest task), then tries to submit the current task again.
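As a minimal sketch of how the seven constructor parameters fit together, the example below builds a bounded pool using only the JDK. The class name PoolFactoryDemo, the method newBoundedPool, and all sizes (4, 8, 60 seconds, queue of 100) are illustrative assumptions, not recommendations from the article; the lambda thread factory is a JDK-only stand-in for Guava's ThreadFactoryBuilder.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolFactoryDemo {

    /** Builds a bounded pool; every size used here is illustrative only. */
    static ThreadPoolExecutor newBoundedPool(String namePrefix) {
        AtomicInteger seq = new AtomicInteger();
        // JDK-only stand-in for Guava's ThreadFactoryBuilder: gives each thread a name.
        ThreadFactory factory = r -> new Thread(r, namePrefix + "-" + seq.getAndIncrement());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4,                                          // corePoolSize
                8,                                          // maximumPoolSize
                60L, TimeUnit.SECONDS,                      // keepAliveTime and its unit
                new ArrayBlockingQueue<>(100),              // bounded work queue
                factory,                                    // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejection policy
        // Let idle core threads time out as well (requires keepAliveTime > 0).
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool("general-detail-batch");
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

A bounded queue combined with CallerRunsPolicy is one common way to apply back-pressure: when the pool is saturated, the submitting thread slows down because it runs the task itself.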
▐ Thread pool state transition diagram
A ThreadPoolExecutor has the following states:
RUNNING: accepts new tasks and keeps processing the tasks in the task queue;
SHUTDOWN: no longer accepts new tasks, but still processes the tasks already in the task queue;
STOP: no longer accepts new tasks, no longer processes the tasks in the task queue, and interrupts tasks that are in progress;
TIDYING: all tasks have terminated and the number of worker threads is zero. The pool enters TIDYING and then runs the terminated() hook method;
TERMINATED: the terminated() method has completed. The pool has fully stopped, all worker threads have been destroyed, and all tasks have been either executed or cleared;

▐ Thread pool task scheduling mechanism

When a task is submitted to the thread pool, scheduling proceeds in the following main steps:
If the number of live threads is less than corePoolSize, the pool creates a core thread to handle the submitted task;
If the core threads are full, that is, the number of threads equals corePoolSize, the pool tries to put the newly submitted task into the task queue workQueue to wait for execution;
If the number of threads equals corePoolSize and the task queue workQueue is full, the pool checks whether the current number of threads has reached maximumPoolSize. If not, it creates a non-core thread to run the submitted task;
If the current number of threads has reached maximumPoolSize and a new task is submitted, the rejection policy is executed.
The core code is as follows:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
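The scheduling order described above (core thread, then queue, then non-core thread, then rejection) can be observed with a small experiment. The sketch below is an illustration under assumed sizes (core 1, max 2, queue capacity 2); the class name SchedulingOrderDemo is hypothetical. Every task blocks on a latch, so no worker becomes free while tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SchedulingOrderDemo {

    /** Submits 5 blocking tasks to a core=1, max=2, queue=2 pool and logs each outcome. */
    static List<String> observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                    // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            try {
                pool.execute(blocker);
                log.add("task" + i + ": threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("task" + i + ": rejected"); // default AbortPolicy
            }
        }
        release.countDown();                        // let all accepted tasks finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

Task 1 starts the core thread, tasks 2 and 3 fill the queue, task 4 forces a non-core thread to be created, and task 5 is rejected by the default AbortPolicy.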
▐ Tomcat thread pool analysis
Tomcat request processing

Tomcat's overall architecture consists of two parts, connectors and containers. The connector handles external communication and the container handles internal logic. In the connector:
The ProtocolHandler interface encapsulates the differences between I/O models and application-layer protocols. The I/O model can be non-blocking I/O, asynchronous I/O, or APR, and the application-layer protocol can be HTTP, HTTPS, or AJP. ProtocolHandler combines an I/O model with an application-layer protocol, so that EndPoint is only responsible for sending and receiving byte streams and Processor is responsible for parsing the byte stream into Tomcat Request/Response objects. This gives the functional modules high cohesion and low coupling. The inheritance hierarchy of the ProtocolHandler interface is shown below.
The Adapter converts the Tomcat Request object into a standard ServletRequest object.

To respond to requests quickly, Tomcat uses a thread pool to increase its request processing capacity. The following briefly analyzes the Tomcat thread pool, taking HTTP with non-blocking I/O as an example.
Tomcat thread pool creation

In Tomcat, the AbstractEndpoint class provides the underlying network I/O handling. If the user has not configured a custom shared thread pool, AbstractEndpoint creates Tomcat's default thread pool through the createExecutor method.
The core part of the code is as follows:
public void createExecutor() {
    internalExecutor = true;
    TaskQueue taskqueue = new TaskQueue();
    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS, taskqueue, tf);
    taskqueue.setParent((ThreadPoolExecutor) executor);
}
Here, TaskQueue and ThreadPoolExecutor are Tomcat's own task queue and thread pool implementations, respectively.
Tomcat's custom ThreadPoolExecutor
Tomcat's custom thread pool extends java.util.concurrent.ThreadPoolExecutor and adds some member variables to count more efficiently the tasks that have been submitted but not yet completed (submittedCount). This includes tasks that are already in the queue and tasks that have been handed to a worker thread but have not started executing.
/**
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
 * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
 * If a RejectedExecutionHandler is not specified a default one will be configured
 * and that one will always throw a RejectedExecutionException
 */
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

    /**
     * The number of tasks submitted but not yet finished. This includes tasks
     * in the queue and tasks that have been handed to a worker thread but the
     * latter did not start executing the task yet.
     * This number is always greater or equal to {@link #getActiveCount()}.
     */
    // Newly added submittedCount member variable, used to count tasks submitted but not yet completed
    private final AtomicInteger submittedCount = new AtomicInteger(0);
    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);

    // Constructors
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        // Pre-start all core threads
        prestartAllCoreThreads();
    }
}
Tomcat's custom ThreadPoolExecutor overrides the execute() method and increments submittedCount for each task submitted for execution. When the underlying pool throws RejectedExecutionException, the custom ThreadPoolExecutor calls force() to try once more to add the task to the TaskQueue. If that also fails, submittedCount is decremented and RejectedExecutionException is thrown.
@Override
public void execute(Runnable command) {
    execute(command, 0, TimeUnit.MILLISECONDS);
}

public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        if (super.getQueue() instanceof TaskQueue) {
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.");
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }
    }
}
Tomcat's custom task queue
Tomcat defines its own blocking queue, TaskQueue, which extends LinkedBlockingQueue. In Tomcat the number of core threads defaults to 10 and the maximum number of threads defaults to 200. To prevent all subsequent tasks from being queued once the number of threads has reached the core size, Tomcat overrides the offer method in TaskQueue so that the pool keeps creating threads after the core size has been reached.
Specifically, as the thread pool scheduling mechanism shows, when offer returns false the pool tries to create a new thread, which lets tasks be picked up quickly. The core of TaskQueue is implemented as follows:
/**
 * As task queue specifically designed to run with a thread pool executor. The
 * task queue is optimised to properly utilize threads within a thread pool
 * executor. If you use a normal queue, the executor will spawn threads when
 * there are idle threads and you wont be able to force items onto the queue
 * itself.
 */
public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (parent == null || parent.isShutdown())
            throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        // forces the item onto the queue, to be used if the task is rejected
        return super.offer(o, timeout, unit);
    }

    @Override
    public boolean offer(Runnable o) {
        // 1. parent is the thread pool; in Tomcat it is the custom thread pool instance
        // we can't do any checks
        if (parent == null) return super.offer(o);
        // 2. The number of threads has reached the maximum: enqueue the newly submitted task
        // we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        // 3. The number of submitted tasks does not exceed the number of threads in the pool,
        //    that is, there are idle threads: just enqueue the task
        // we have idle threads, just add it to the queue
        if (parent.getSubmittedCount() <= (parent.getPoolSize())) return super.offer(o);
        // 4. [Key point] The number of threads is below the maximum: return false
        //    so that the thread pool creates a new thread
        // if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;
        // 5. Fallback: put the task in the queue
        // if we reached here, we need to add it to the queue
        return super.offer(o);
    }
}
Tomcat's custom task thread
Tomcat uses a custom task thread, TaskThread, to record the creation time of each thread. Its static nested class WrappingRunnable wraps the Runnable in order to handle exceptions of type StopPooledThreadException.
/**
 * A Thread implementation that records the time at which it was created.
 */
public class TaskThread extends Thread {

    private final long creationTime;

    public TaskThread(ThreadGroup group, Runnable target, String name) {
        super(group, new WrappingRunnable(target), name);
        this.creationTime = System.currentTimeMillis();
    }

    /**
     * Wraps a {@link Runnable} to swallow any {@link StopPooledThreadException}
     * instead of letting it go and potentially trigger a break in a debugger.
     */
    private static class WrappingRunnable implements Runnable {
        private Runnable wrappedRunnable;

        WrappingRunnable(Runnable wrappedRunnable) {
            this.wrappedRunnable = wrappedRunnable;
        }

        @Override
        public void run() {
            try {
                wrappedRunnable.run();
            } catch (StopPooledThreadException exc) {
                // expected : we just swallow the exception to avoid disturbing
                // debuggers like eclipse's
                log.debug("Thread exiting on purpose", exc);
            }
        }
    }
}
Reflection & summary
Why does Tomcat implement its own thread pool and task queue?
When a task is submitted to the native JUC thread pool and the number of worker threads has reached the core size, further tasks are first put into the blocking queue. New worker threads are created only when the queue is full and the number of running threads is still below the configured maximum. The native JUC thread pool therefore cannot meet Tomcat's need for fast response.
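To make the difference concrete, here is a simplified sketch of the "prefer a new thread over queueing" idea using only the JDK. It is not Tomcat's code: the names EagerQueueDemo, EagerQueue, and poolSizeAfterThreeTasks are hypothetical, and unlike Tomcat's TaskQueue this version does not track submittedCount, so it will create a thread even when an idle one exists.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class EagerQueueDemo {

    /** Unbounded queue that refuses offers while the pool can still grow. */
    static class EagerQueue extends LinkedBlockingQueue<Runnable> {
        private volatile ThreadPoolExecutor parent;

        void setParent(ThreadPoolExecutor parent) { this.parent = parent; }

        /** Bypasses the eager check; used when the pool rejected the task. */
        boolean force(Runnable task) { return super.offer(task); }

        @Override
        public boolean offer(Runnable task) {
            ThreadPoolExecutor p = parent;
            if (p != null && p.getPoolSize() < p.getMaximumPoolSize()) {
                return false;   // makes execute() try addWorker(command, false)
            }
            return super.offer(task);
        }
    }

    /** Submits three blocking tasks to a core=1, max=3 pool and returns the pool size. */
    static int poolSizeAfterThreeTasks(boolean eager) {
        BlockingQueue<Runnable> queue =
                eager ? new EagerQueue() : new LinkedBlockingQueue<Runnable>();
        // If thread creation fails after offer() returned false, fall back to queueing.
        RejectedExecutionHandler forceEnqueue = (task, executor) -> {
            BlockingQueue<Runnable> q = executor.getQueue();
            boolean queued = !executor.isShutdown()
                    && q instanceof EagerQueue
                    && ((EagerQueue) q).force(task);
            if (!queued) {
                throw new RejectedExecutionException("pool saturated or shut down");
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 60L, TimeUnit.SECONDS, queue, forceEnqueue);
        if (queue instanceof EagerQueue) {
            ((EagerQueue) queue).setParent(pool);
        }
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 3; i++) {
            pool.execute(blocker);
        }
        int size = pool.getPoolSize();
        release.countDown();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("plain queue: " + poolSizeAfterThreeTasks(false));
        System.out.println("eager queue: " + poolSizeAfterThreeTasks(true));
    }
}
```

With a plain unbounded LinkedBlockingQueue the pool stays at one thread and the other two tasks wait in the queue; with the eager queue the pool grows to three threads before anything is queued.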
Why does Tomcat use an unbounded queue?
In EndPoint, Tomcat uses the two parameters acceptCount and maxConnections to avoid an excessive backlog of requests. maxConnections is the maximum number of connections Tomcat accepts and processes at any time. When the number of accepted connections reaches maxConnections, the Acceptor stops taking connections from the accept queue, and connections in the accept queue stay blocked until the number of accepted connections drops below maxConnections (maxConnections defaults to 10000; if set to -1 the number of connections is unlimited). acceptCount is the length of the accept queue. When the number of connections in the accept queue reaches acceptCount, the queue is full and any further incoming requests are refused. The default is 100 (based on Tomcat 8.5.43). With acceptCount and maxConnections working together, Tomcat's default unbounded task queue therefore does not usually cause an OOM.
/**
 * Allows the server developer to specify the acceptCount (backlog) that
 * should be used for server sockets. By default, this value
 * is 100.
 */
private int acceptCount = 100;

private int maxConnections = 10000;
▐ Best practices
Avoid creating thread pools with Executors

Executors offers several commonly used methods:
newCachedThreadPool(): creates a cached thread pool. Calls to execute reuse previously constructed threads if any are available. If none is available, a new thread is created and added to the pool. Threads that have been unused for 60 seconds are terminated and removed from the cache. CachedThreadPool suits programs that run many short-lived tasks concurrently, or lightly loaded servers;
newFixedThreadPool(int nThreads): creates a thread pool with a fixed number of threads. While the number of threads is below nThreads, submitting a task creates a new thread. Once the number of threads equals nThreads, newly submitted tasks are added to the blocking queue, and a thread takes the next task from the queue when it finishes its current one. FixedThreadPool suits scenarios with somewhat heavy load but not too many tasks, where the number of threads must be limited to use resources sensibly;
newSingleThreadExecutor(): creates a single-threaded Executor. SingleThreadExecutor suits tasks that must run serially, one after another, with no concurrent execution;
newScheduledThreadPool(int corePoolSize): creates a thread pool that supports scheduled and periodic task execution. In most cases it can replace the Timer class. It returns a ScheduledThreadPoolExecutor instance, and ScheduledThreadPoolExecutor in fact extends ThreadPoolExecutor. The code shows that ScheduledThreadPool is built on ThreadPoolExecutor: corePoolSize is the value passed in, maximumPoolSize is Integer.MAX_VALUE, the timeout is 0, and workQueue is a DelayedWorkQueue. ScheduledThreadPool is in effect a scheduling pool. It implements the three methods schedule, scheduleAtFixedRate, and scheduleWithFixedDelay, which provide delayed and periodic execution;
newSingleThreadScheduledExecutor(): creates a ScheduledThreadPoolExecutor whose corePoolSize is 1;
newWorkStealingPool(int parallelism): returns a ForkJoinPool instance. ForkJoinPool is mainly used to implement divide-and-conquer algorithms and suits compute-intensive tasks.
The Executors class looks powerful and convenient, but it has the following drawbacks:
FixedThreadPool and SingleThreadExecutor use a task queue of length Integer.MAX_VALUE, so a large number of requests may pile up and cause an OOM;
CachedThreadPool and ScheduledThreadPool allow up to Integer.MAX_VALUE threads to be created, so a large number of threads may be created and cause an OOM;
When a thread pool is needed, construct a ThreadPoolExecutor directly and set corePoolSize, the blocking queue, the RejectedExecutionHandler, and the other parameters according to the actual business scenario.
Avoid local thread pools
With a thread pool created as a local variable, if shutdown() is not called after the tasks finish, or the pool is otherwise referenced improperly, system resources can easily be exhausted.
Set thread pool parameters sensibly
In engineering practice, the following formula is commonly used to calculate the number of core threads:
nThreads = (w + c) / c * n * u = (w / c + 1) * n * u
Here w is the wait time, c is the compute time, n is the number of CPU cores (usually obtained through Runtime.getRuntime().availableProcessors()), and u is the target CPU utilization (in the range [0, 1]). When maximizing CPU utilization (u = 1) for a compute-intensive task, the wait time w is 0, and the number of core threads equals the number of CPU cores.
The formula gives the recommended number of core threads in the ideal case. Different systems and applications behave differently under different workloads, so the optimal thread count still needs to be fine-tuned based on how the tasks actually run and on load-test results.
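The formula is easy to turn into a helper. The sketch below is one possible form; the class name ThreadCountEstimator, the method name estimate, and the rounding to at least one thread are assumptions for illustration, and the wait and compute times would have to come from profiling the real workload.

```java
class ThreadCountEstimator {

    /**
     * Applies nThreads = (w / c + 1) * n * u.
     *
     * @param waitMs            average time a task spends waiting (w)
     * @param computeMs         average time a task spends computing (c), must be > 0
     * @param cpuCores          number of CPU cores (n)
     * @param targetUtilization target CPU utilization (u), in (0, 1]
     */
    static int estimate(double waitMs, double computeMs, int cpuCores, double targetUtilization) {
        if (waitMs < 0 || computeMs <= 0 || cpuCores <= 0
                || targetUtilization <= 0 || targetUtilization > 1) {
            throw new IllegalArgumentException("invalid input");
        }
        double threads = (waitMs / computeMs + 1) * cpuCores * targetUtilization;
        return Math.max(1, (int) Math.round(threads));   // never suggest fewer than one thread
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Compute-bound: w = 0, so the estimate equals the core count at u = 1.
        System.out.println("compute-bound: " + estimate(0, 10, cores, 1.0));
        // I/O-heavy: tasks wait 9x longer than they compute.
        System.out.println("I/O-heavy:     " + estimate(90, 10, cores, 1.0));
    }
}
```

For example, with 8 cores, w = 90 ms, c = 10 ms, and u = 0.5, the formula gives (90 / 10 + 1) * 8 * 0.5 = 40 threads as a starting point for load testing.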
Add exception handling
To discover, analyze, and resolve problems more effectively, add exception handling when using multithreading. The usual options are:
Add try...catch exception handling in the task code
If Future is used, receive the thrown exception through the get method of the Future object
Set an UncaughtExceptionHandler on the worker threads with setUncaughtExceptionHandler and handle the exception in its uncaughtException method
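The second and third options behave differently depending on how the task is submitted, which the sketch below illustrates. The class and method names (PoolExceptionDemo, viaFuture, viaHandler) are hypothetical. With submit(), the exception is captured in the Future and surfaces as the cause of an ExecutionException; with execute(), it propagates out of the worker thread and reaches that thread's UncaughtExceptionHandler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class PoolExceptionDemo {

    static final Runnable FAILING = () -> { throw new IllegalStateException("boom"); };

    /** submit(): the exception is stored in the Future and rethrown by get(). */
    static String viaFuture() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        try {
            Future<?> future = pool.submit(FAILING);
            future.get();
            return "no error";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();   // the original exception is the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    /** execute(): the exception escapes the worker and reaches the handler. */
    static String viaHandler() {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> {
                seen.set(ex.getMessage());
                handled.countDown();
            });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);
        try {
            pool.execute(FAILING);
            handled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("via Future:  " + viaFuture());
        System.out.println("via handler: " + viaHandler());
    }
}
```

Note that a task submitted with submit() whose Future is never inspected fails silently, which is a common reason exceptions in pooled tasks go unnoticed.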
Graceful shutdown of the thread pool
public void destroy() {
    try {
        poolExecutor.shutdown();
        if (!poolExecutor.awaitTermination(AWAIT_TIMEOUT, TimeUnit.SECONDS)) {
            poolExecutor.shutdownNow();
        }
    } catch (InterruptedException e) {
        // If the current thread is interrupted, cancel all tasks again
        poolExecutor.shutdownNow();
        // Preserve the interrupt status
        Thread.currentThread().interrupt();
    }
}
To shut down gracefully, first call the shutdown method. After this call the pool accepts no new tasks, but tasks that have already been submitted continue to run. Then call the awaitTermination method, which sets the maximum time to wait for the pool to shut down. It returns true if the pool shuts down normally before the timeout expires and false if the timeout elapses first. A reasonable timeout should be estimated for the business scenario before calling it.
If awaitTermination returns false but other resources should still be reclaimed once the pool has closed as far as possible, consider calling shutdownNow. All tasks still waiting in the queue are then discarded, and the interrupt flag is set on every thread in the pool. shutdownNow does not guarantee that running threads stop working unless the tasks submitted to them respond to interrupts correctly.
Passing the EagleEye context
/**
 * In the main thread: turn on EagleEye asynchronous mode and pass ctx to the multithreaded task
 */
// The context must be passed along to avoid losing the EagleEye trace
RpcContext_inner ctx = EagleEye.getRpcContext();
// Turn on asynchronous mode
ctx.setAsyncMode(true);

/**
 * In the thread pool task thread: set the EagleEye RPC context
 */
private void runTask() {
    try {
        EagleEye.setRpcContext(ctx);
        // do something...
    } catch (Exception e) {
        log.error("requestError, params: {}", this.params, e);
    } finally {
        // Check whether the task is running on the main thread, which happens
        // when the rejection policy is CallerRunsPolicy
        if (mainThread != Thread.currentThread()) {
            EagleEye.clearRpcContext();
        }
    }
}
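EagleEye is an internal system, so the same pattern is sketched below with a plain ThreadLocal as a stand-in. Everything here is hypothetical (ContextPropagationDemo, TRACE_ID, withTrace): the wrapper captures the context on the submitting thread, installs it on the worker before the task runs, and restores the previous value afterwards instead of clearing blindly, so the caller's context survives when CallerRunsPolicy runs the task on the submitting thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ContextPropagationDemo {

    /** Hypothetical stand-in for a tracing context such as a trace id. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Captures the caller's trace id now and installs it around the task later. */
    static Runnable withTrace(Runnable task) {
        String captured = TRACE_ID.get();            // read on the submitting thread
        return () -> {
            String previous = TRACE_ID.get();        // whatever the running thread had
            TRACE_ID.set(captured);
            try {
                task.run();
            } finally {
                // Restore instead of clearing, in case this is the caller's own thread.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    /** Returns the trace id observed inside a pooled task. */
    static String traceSeenByWorker(String traceId) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        AtomicReference<String> seen = new AtomicReference<>();
        TRACE_ID.set(traceId);
        try {
            pool.submit(withTrace(() -> seen.set(TRACE_ID.get()))).get();
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(traceSeenByWorker("trace-42"));
    }
}
```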
ThreadLocal thread variables overview
▐ What is ThreadLocal
The ThreadLocal class provides thread-local variables. They differ from ordinary variables in that every thread that accesses one (through its get or set method) has its own independently initialized copy of the variable. A ThreadLocal therefore has no multithreaded contention, and no separate locking is needed.
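A minimal sketch of this isolation: two threads increment the same ThreadLocal counter without any locking, and each ends up with its own total. The names ThreadLocalIsolationDemo, COUNTER, and run are illustrative.

```java
class ThreadLocalIsolationDemo {

    /** One shared ThreadLocal object; each thread sees its own Integer copy. */
    static final ThreadLocal<Integer> COUNTER = ThreadLocal.withInitial(() -> 0);

    /** Increments the calling thread's copy and returns its final value. */
    private static int bump(int times) {
        for (int i = 0; i < times; i++) {
            COUNTER.set(COUNTER.get() + 1);   // no lock: only this thread touches its copy
        }
        return COUNTER.get();
    }

    /** Runs two threads with different increment counts and returns both results. */
    static int[] run(int timesA, int timesB) {
        int[] results = new int[2];
        Thread a = new Thread(() -> results[0] = bump(timesA));
        Thread b = new Thread(() -> results[1] = bump(timesB));
        a.start();
        b.start();
        try {
            a.join();                         // join() makes the results visible here
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        int[] r = run(3, 5);
        System.out.println("thread A: " + r[0] + ", thread B: " + r[1]);
    }
}
```

If COUNTER were an ordinary shared int, the two threads would race on the same value; here each thread starts from its own initial value of 0.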
▐ ThreadLocal use scenarios
Each thread needs its own instance data (thread isolation);
Passing data across layers of a framework;
Complex call chains that need global parameter passing;
Database connection management, ensuring transaction consistency across nested calls in AOP;
ThreadLocal in principle and practice
The commonly used ThreadLocal methods are get, set, and initialValue.
As is well known, SimpleDateFormat in Java is not thread-safe. To use SimpleDateFormat safely, besides 1) creating a SimpleDateFormat as a local variable and 2) adding a synchronization lock, we can also use 3) ThreadLocal:
/**
 * Use ThreadLocal to define a global SimpleDateFormat
 */
private static ThreadLocal<SimpleDateFormat> simpleDateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>() {
    @Override
    protected SimpleDateFormat initialValue() {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }
};

// usage
String dateString = simpleDateFormatThreadLocal.get().format(calendar.getTime());
▐ How ThreadLocal works
Each Thread maintains a ThreadLocal.ThreadLocalMap instance internally (threadLocals), and all ThreadLocal operations work on threadLocals.
threadLocal.get() method
/**
 * Returns the value in the current thread's copy of this
 * thread-local variable. If the variable has no value for the
 * current thread, it is first initialized to the value returned
 * by an invocation of the {@link #initialValue} method.
 *
 * @return the current thread's value of this thread-local
 */
public T get() {
    // 1. Get the current thread
    Thread t = Thread.currentThread();
    // 2. Get the current thread's internal ThreadLocalMap variable t.threadLocals
    ThreadLocalMap map = getMap(t);
    // 3. Check whether map is null
    if (map != null) {
        // 4. Look up the entry using the current threadLocal variable
        ThreadLocalMap.Entry e = map.getEntry(this);
        // 5. Check whether entry is null
        if (e != null) {
            // 6. Return Entry.value
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 7. If map or entry is null, set the initial value
    return setInitialValue();
}

/**
 * Variant of set() to establish initialValue. Used instead
 * of set() in case user has overridden the set() method.
 *
 * @return the initial value
 */
private T setInitialValue() {
    // 1. Initialize value: the overridden value if initialValue() is overridden, otherwise null
    T value = initialValue();
    // 2. Get the current thread
    Thread t = Thread.currentThread();
    // 3. Get the current thread's internal ThreadLocalMap variable
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 4. If not null, set directly. key: threadLocal, value: value
        map.set(this, value);
    else
        // 5. If map is null, create the ThreadLocalMap object
        createMap(t, value);
    return value;
}

/**
 * Create the map associated with a ThreadLocal. Overridden in
 * InheritableThreadLocal.
 *
 * @param t the current thread
 * @param firstValue value for the initial entry of the map
 */
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

/**
 * Construct a new map initially containing (firstKey, firstValue).
 * ThreadLocalMaps are constructed lazily, so we only create
 * one when we have at least one entry to put in it.
 */
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    // 1. Initialize the entry array, size: 16
    table = new Entry[INITIAL_CAPACITY];
    // 2. Compute the index for the value
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    // 3. Assign at the corresponding index
    table[i] = new Entry(firstKey, firstValue);
    // 4. entry size
    size = 1;
    // 5. Set threshold: threshold = len * 2 / 3
    setThreshold(INITIAL_CAPACITY);
}

/**
 * Set the resize threshold to maintain at worst a 2/3 load factor.
 */
private void setThreshold(int len) {
    threshold = len * 2 / 3;
}
threadLocal.set() method
/**
 * Sets the current thread's copy of this thread-local variable
 * to the specified value. Most subclasses will have no need to
 * override this method, relying solely on the {@link #initialValue}
 * method to set the values of thread-locals.
 *
 * @param value the value to be stored in the current thread's copy of
 *        this thread-local.
 */
public void set(T value) {
    // 1. Get the current thread
    Thread t = Thread.currentThread();
    // 2. Get the current thread's internal ThreadLocalMap variable
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 3. Set the value
        map.set(this, value);
    else
        // 4. If map is null, create the ThreadLocalMap
        createMap(t, value);
}
ThreadLocalMap
The JDK source shows that an Entry in ThreadLocalMap holds its ThreadLocal key through a weak reference. This means that if a ThreadLocal is referenced only by such an Entry and is not strongly referenced by any other object, it will be reclaimed at the next GC.
static class ThreadLocalMap {
    /**
     * The entries in this hash map extend WeakReference, using
     * its main ref field as the key (which is always a
     * ThreadLocal object). Note that null keys (i.e. entry.get()
     * == null) mean that the key is no longer referenced, so the
     * entry can be expunged from table. Such entries are referred to
     * as "stale entries" in the code that follows.
     */
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    // ...
}
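Only the key is weakly referenced. The value field is an ordinary strong reference, and because pooled threads are long-lived, a value set by one task stays in the thread's map and is visible to the next task that runs on the same thread unless remove() is called. The sketch below demonstrates this under illustrative names (ThreadLocalCleanupDemo, USER, valueSeenByNextTask), using a single-thread pool so that both tasks are guaranteed to share a thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadLocalCleanupDemo {

    static final ThreadLocal<String> USER = new ThreadLocal<>();

    /** Runs a task that sets USER, then returns what a second task on the same thread sees. */
    static String valueSeenByNextTask(boolean cleanUp) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        Runnable first = () -> {
            USER.set("alice");
            try {
                // ... business logic that reads USER ...
            } finally {
                if (cleanUp) {
                    USER.remove();            // drop the value before the thread is reused
                }
            }
        };
        Callable<String> second = USER::get;  // runs later on the same pooled thread
        try {
            pool.submit(first).get();
            return pool.submit(second).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without remove(): " + valueSeenByNextTask(false));
        System.out.println("with remove():    " + valueSeenByNextTask(true));
    }
}
```

Calling remove() in a finally block is the usual way to avoid both stale data leaking between requests and values being retained for the lifetime of the pooled thread.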
▐ ThreadLocal Example
Eagle eye link ThreadLocal Use
EagleEye( Eagle eye ) As a full link monitoring system, it is widely used within the group ,traceId、rpcId、 Information such as pressure gauge is stored in EagleEye Of ThreadLocal variable , And in HSF/Dubbo Transfer between service calls .EagleEye adopt Filter Initialize data to ThreadLocal in , Some relevant codes are as follows :
EagleEyeHttpRequest eagleEyeHttpRequest = this.convertHttpRequest(httpRequest);
// 1. Initialize: store traceId, rpcId, and related data in EagleEye's ThreadLocal variables
EagleEyeRequestTracer.startTrace(eagleEyeHttpRequest, false);
try {
    chain.doFilter(httpRequest, httpResponse);
} finally {
    // 2. Clear the ThreadLocal values
    EagleEyeRequestTracer.endTrace(this.convertHttpResponse(httpResponse));
}
In EagleEyeFilter, initialization goes through EagleEyeRequestTracer.startTrace: after the incoming parameters are converted, an overloaded startTrace method stores the EagleEye context parameters in a ThreadLocal.


In its finally block, EagleEyeFilter ends the call chain through EagleEyeRequestTracer.endTrace, which clears the data held in the ThreadLocal through a clear method.

Bad case: benefit claim failure in project XX
In the original claim flow, a claim could be initiated only after the app opened the first-level page. The request passed through the Taobao wireless gateway (Mtop) before reaching the server, and the server obtained the current session information through the mtop SDK.
In project XX, the claim flow was upgraded: the server initiates the claim request at the same time the first-level page is requested. Specifically, while handling the first-level page request, the server also calls an hsf/dubbo interface to claim the benefit. The RPC call therefore has to carry the user's current session information, and the service provider has to extract that information and inject it into the mtop context so that the session id and similar data can be read through the mtop SDK. In one developer's implementation, improper use of ThreadLocal caused the following problems:
Problem 1: the ThreadLocal was initialized at the wrong time, so the session information could not be obtained and the claim failed;
Problem 2: the ThreadLocal value was not cleaned up when the request completed, which produced dirty data.
【Problem 1: analysis of the claim failure】
The benefit-claim service is built on an efficient, thread-safe dependency injection framework. Business logic modules based on this framework are usually abstracted in the form xxxModule, and the modules form a dependency network. The framework calls each module's init method automatically in dependency order (the init method of a depended-on module runs first).
In the application, the main entry of the claim interface is the CommonXXApplyModule class, which depends on XXSessionModule. When a request arrives, the init methods are called in dependency order, so the init method of XXSessionModule runs first. The developer called recoverMtopContext() in the init method of CommonXXApplyModule, expecting it to restore the mtop context. Because recoverMtopContext() was called too late, XXSessionModule could not obtain the correct session id and related information, and the claim failed.

【Problem 2: analysis of the dirty data】
When the claim service handles a request on a thread that has previously processed a claim request, the ThreadLocal value left by that earlier request has not been cleaned up. XXSessionModule then reads the previous request's session information through the mtop SDK, which produces dirty data.
【Solution】
Inject the mtop context through recoverMtopContext at the entry of the dependency injection framework, AbstractGate#visit (or in XXSessionModule), and clean up the current request's ThreadLocal values in the finally block of the entry method.
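The dirty-data problem and its fix can be reproduced with a single pooled thread. The sketch below is an illustration under stated assumptions, not the project's code: SessionCleanupDemo, SESSION, and handle are hypothetical names, and a plain ThreadLocal<String> stands in for the mtop context.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical demo: a pooled thread leaks ThreadLocal state unless it is removed. */
final class SessionCleanupDemo {
    // Stand-in for the session context kept per thread
    private static final ThreadLocal<String> SESSION = new ThreadLocal<>();

    /**
     * Runs two requests on the same pooled thread. Only the first request
     * carries a session id. Returns the session seen by the second request.
     */
    static String secondRequestSees(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(() -> handle("session-of-user-1", cleanUp)).get();
            return pool.submit(() -> handle(null, cleanUp)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    /** Entry method: initialize the context first, clean it up in finally. */
    private static String handle(String incomingSession, boolean cleanUp) {
        if (incomingSession != null) {
            SESSION.set(incomingSession);
        }
        try {
            return SESSION.get(); // business logic would read the context here
        } finally {
            if (cleanUp) {
                SESSION.remove();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + secondRequestSees(false));
        System.out.println("with cleanup:    " + secondRequestSees(true));
    }
}
```

Without the cleanup, the second request reads the first request's session; with remove() in the finally block it reads null.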
▐ Reflection & summary
Why is Entry in ThreadLocalMap designed as a weak reference?
If a strong reference were used, the reference chain would be: Thread -> ThreadLocal.ThreadLocalMap -> Entry[] -> Entry -> key (the ThreadLocal object) and value. In that case, as long as the thread keeps running (as in a thread pool) and remove is never called, neither the ThreadLocal object nor any object strongly reachable from it can be reclaimed by the garbage collector.
What is the difference between declaring a ThreadLocal variable static and not static?
With the static modifier, each thread has exactly one copy of the thread variable. Without it, the semantics of the ThreadLocal become perThread-perInstance, which easily causes memory leaks, as in the following example:
import java.util.LinkedList;
import java.util.List;

public class ThreadLocalTest {

    public static class ThreadLocalDemo {
        // Non-static on purpose: every ThreadLocalDemo instance owns its own ThreadLocal
        private ThreadLocal<String> threadLocalHolder = new ThreadLocal<>();

        public void setValue(String value) {
            threadLocalHolder.set(value);
        }

        public String getValue() {
            return threadLocalHolder.get();
        }
    }

    public static void main(String[] args) {
        int count = 3;
        List<ThreadLocalDemo> list = new LinkedList<>();
        for (int i = 0; i < count; i++) {
            ThreadLocalDemo demo = new ThreadLocalDemo();
            demo.setValue("demo-" + i);
            list.add(demo);
        }
        System.out.println();
    }
}
Pausing in a debugger at the System.out.println() statement in the main method above shows that the thread's threadLocals map holds 3 ThreadLocal instances. In engineering practice, the usual expectation is that a thread has only one instance of a given ThreadLocal, so omitting static changes the expected semantics and also makes memory leaks likely.

▐ Best practices
Initialize and clean up ThreadLocal variables in pairs
If the cleanup is skipped, two problems may appear:
Memory leak: the key in ThreadLocalMap is a weak reference, while the value is a strong reference. When no external strong reference to the ThreadLocal remains, GC reclaims the weakly referenced key but not the value, leaving an Entry of the form <null, value>. If the thread that created the ThreadLocal keeps running, the value of that Entry may never be reclaimed, which can leak memory.
Dirty data: because threads are reused, business data saved in a ThreadLocal while serving user 1's request may, if not cleaned up, be read when user 2's request comes in.
Cleaning up in try...finally is recommended.
Declare ThreadLocal variables static
When we use ThreadLocal, the expected semantics are perThread. Without static, the semantics become perThread-perInstance. In a thread pool scenario, the number of thread-related instances created without static may reach M * N (where M is the number of threads and N is the number of instances of the owning class), which easily causes memory leaks (https://errorprone.info/bugpattern/ThreadLocalUsage).
Use ThreadLocal.withInitial with caution
In applications, be careful when creating ThreadLocal objects with the factory method ThreadLocal.withInitial(Supplier<? extends S> supplier). If the Supplier hands the same object to different threads, there is no isolation at all, for example:
// Counterexample: the shared object obj is actually used, so there is no isolation
private static ThreadLocal<Obj> threadLocal = ThreadLocal.withInitial(() -> obj);
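The difference between a Supplier that returns a shared object and one that builds a new object per thread can be observed directly. This is a minimal sketch with hypothetical names (WithInitialDemo, SHARED, ISOLATED); it uses a list as the stored object so that writes from another thread become visible in its size.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo comparing a shared and a per-thread initial value. */
final class WithInitialDemo {
    private static final List<String> SHARED_LIST = new ArrayList<>();

    // Counterexample: every thread receives the same list instance
    private static final ThreadLocal<List<String>> SHARED =
            ThreadLocal.withInitial(() -> SHARED_LIST);

    // Each thread's first get() builds a fresh list
    private static final ThreadLocal<List<String>> ISOLATED =
            ThreadLocal.withInitial(ArrayList::new);

    /**
     * Adds one element from the caller and one from a worker thread,
     * then returns the list size seen by the caller.
     */
    static int sizeSeenByCaller(ThreadLocal<List<String>> holder) {
        holder.get().add("caller");
        Thread worker = new Thread(() -> {
            holder.get().add("worker");
            holder.remove();
        });
        worker.start();
        try {
            worker.join(); // the worker has finished before the caller reads
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int size = holder.get().size();
        holder.remove();
        return size;
    }

    static int sharedSize() {
        SHARED_LIST.clear();
        return sizeSeenByCaller(SHARED);
    }

    static int isolatedSize() {
        return sizeSeenByCaller(ISOLATED);
    }

    public static void main(String[] args) {
        System.out.println("shared:   " + sharedSize());
        System.out.println("isolated: " + isolatedSize());
    }
}
```

With the shared Supplier the caller sees the worker's element as well (size 2), while the ArrayList::new Supplier keeps each thread's list separate (size 1).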
Summary
In Java engineering practice, thread pools and thread variables are widely used, and using them improperly often leads to production incidents. Using thread pools and thread variables correctly is therefore a basic skill every developer must master. Starting from how thread pools and thread variables are used, this article briefly introduces their principles and application practice. Developers can combine the best practices with their actual application scenarios to use thread pools and thread variables correctly and build stable, efficient Java application services.