On a whim, I built a dynamic thread pool and put the source code on GitHub
2020-11-06 01:33:00 【Yin Jihuan】
Background
Thread pools come up constantly in day-to-day work: whenever something needs to run asynchronously or a batch of tasks needs processing, we define a thread pool to handle it.
Using thread pools does raise some problems, though. Here are two that I ran into before.
Scenario 1: A batch data-processing feature started out with a small number of core threads in its pool. When I later wanted to adjust it, the only option was to change the value and restart the application.
Scenario 2: An application received MQ messages and handed each one to a thread pool for processing, and the pool's queue was allowed to buffer a certain number of tasks. When processing was slow, there was no convenient way to see how many tasks were still waiting. As a quick fix at the time, I simply started a thread that printed the size of the thread pool queue.
Some time ago I read an article about thread pool practice on the Meituan technical team's official account (https://mp.weixin.qq.com/s/tIWAocevZThfbrfWoJGa9w). I thought their ideas were very good, but the code was not open source, so I took the time to add a dynamic thread pool component to my own open source project, Kitty. It supports Cat monitoring, dynamic changes to core parameters, task backlog alarms, and more. Today I'd like to share how it is implemented.
Project source code: https://github.com/yinjihuan/kitty
Usage
Add the dependency
Add the thread pool component as a dependency. Kitty has not been published yet, so you need to download the source code and install it into your local or private repository yourself.
<dependency>
<groupId>com.cxytiandi</groupId>
<artifactId>kitty-spring-cloud-starter-dynamic-thread-pool</artifactId>
</dependency>
Add the configuration
Then configure the thread pool information in Nacos; this component integrates with Nacos. It is recommended to create a separate thread pool configuration file for each application. For example, ours uses the dataId kitty-cloud-thread-pool.properties and the group BIZ_GROUP.
The contents are as follows:
kitty.threadpools.nacosDataId=kitty-cloud-thread-pool.properties
kitty.threadpools.nacosGroup=BIZ_GROUP
kitty.threadpools.accessToken=ae6eb1e9e6964d686d2f2e8127d0ce5b31097ba23deee6e4f833bc0a77d5b71d
kitty.threadpools.secret=SEC6ec6e31d1aa1bdb2f7fd5eb5934504ce09b65f6bdc398d00ba73a9857372de00
kitty.threadpools.owner= Yin Jihuan
kitty.threadpools.executors[0].threadPoolName=TestThreadPoolExecutor
kitty.threadpools.executors[0].corePoolSize=4
kitty.threadpools.executors[0].maximumPoolSize=4
kitty.threadpools.executors[0].queueCapacity=5
kitty.threadpools.executors[0].queueCapacityThreshold=5
kitty.threadpools.executors[1].threadPoolName=TestThreadPoolExecutor2
kitty.threadpools.executors[1].corePoolSize=2
kitty.threadpools.executors[1].maximumPoolSize=4
nacosDataId, nacosGroup
To listen for configuration changes, the component needs to know which DataId and group to watch. Set these to the DataId and group of the current configuration file.
accessToken, secret
Credentials for the DingTalk robot, used to send alarms.
owner
The person responsible for this application. The name is shown in alarm messages.
threadPoolName
The name of the thread pool. Take note of it, because you use it to look up the pool in code.
The remaining settings are not covered one by one. They match the parameters of ThreadPoolExecutor itself, and the rest can be found in the source code.
Inject and use
@Autowired
private DynamicThreadPoolManager dynamicThreadPoolManager;

dynamicThreadPoolManager.getThreadPoolExecutor("TestThreadPoolExecutor").execute(() -> {
    log.info("Using the thread pool");
    try {
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    }
}, "getArticle");
Get the thread pool object through the getThreadPoolExecutor method of DynamicThreadPoolManager, then pass in a Runnable, Callable, and so on. The second parameter is the name of the task. The extra parameter exists because a task without an identifier cannot be told apart from other tasks.
The thread pool component is instrumented with Cat by default. Once the name is set, you can view the monitoring data for this task in Cat.
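To illustrate the idea of attaching a name to a task, here is a minimal sketch of one way an executor could accept a name next to the Runnable. It is not Kitty's implementation: NamedRunnable, NamedTaskExecutor, and runOnce are hypothetical names, and the sketch assumes tasks are submitted through execute, since submit would wrap them in a FutureTask first.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper that carries a task name next to the Runnable. */
final class NamedRunnable implements Runnable {
    private final Runnable delegate;
    private final String name;

    NamedRunnable(Runnable delegate, String name) {
        this.delegate = delegate;
        this.name = name;
    }

    String getName() {
        return name;
    }

    @Override
    public void run() {
        delegate.run();
    }
}

/** Hypothetical executor that adds an execute(Runnable, String) overload. */
final class NamedTaskExecutor extends ThreadPoolExecutor {
    // Name of the most recently started task, kept only so the sketch is observable
    private volatile String lastStartedTaskName;

    NamedTaskExecutor(int corePoolSize, int maximumPoolSize, int queueCapacity) {
        super(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
    }

    void execute(Runnable task, String taskName) {
        execute(new NamedRunnable(task, taskName));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // execute(Runnable) hands the same object to the worker, so the name is visible here
        if (r instanceof NamedRunnable) {
            lastStartedTaskName = ((NamedRunnable) r).getName();
        }
        super.beforeExecute(t, r);
    }

    String getLastStartedTaskName() {
        return lastStartedTaskName;
    }

    /** Runs one named no-op task and returns the name the worker thread saw. */
    static String runOnce(String taskName) {
        NamedTaskExecutor executor = new NamedTaskExecutor(1, 1, 10);
        executor.execute(() -> { }, taskName);
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executor.getLastStartedTaskName();
    }
}
```

A monitoring hook could then use the name as the transaction name, instead of only storing it in a field as this sketch does.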
Extended features
Task execution monitoring
In Cat's Transaction report, the thread pool name is shown as the type.
The task name is shown in the details.
Dynamic modification of core parameters
Currently only these core parameters can be modified: corePoolSize, maximumPoolSize, queueCapacity (only when the queue type is LinkedBlockingQueue), rejectedExecutionType, keepAliveTime, and unit.
In practice, corePoolSize, maximumPoolSize, and queueCapacity are the ones changed most often.
To change a value, edit the corresponding configuration directly in Nacos. The client listens for configuration changes and then updates the parameters of the corresponding thread pool.
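One detail is worth keeping in mind when applying a new corePoolSize and maximumPoolSize at runtime. ThreadPoolExecutor rejects a maximum that is smaller than the current core size, and newer JDK versions also reject a core size that is larger than the current maximum, so the order of the two setter calls matters. The following is a minimal sketch of one way to order them; PoolResizer and resize are hypothetical names and are not part of Kitty.

```java
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical helper that applies new core/maximum sizes in a safe order. */
final class PoolResizer {
    private PoolResizer() {
    }

    static void resize(ThreadPoolExecutor executor, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newMax < newCore) {
            throw new IllegalArgumentException(
                    "invalid sizes: core=" + newCore + ", max=" + newMax);
        }
        if (newCore > executor.getMaximumPoolSize()) {
            // Growing past the current maximum: raise the ceiling first
            executor.setMaximumPoolSize(newMax);
            executor.setCorePoolSize(newCore);
        } else {
            // Otherwise set the core size first, so the new maximum is never below it
            executor.setCorePoolSize(newCore);
            executor.setMaximumPoolSize(newMax);
        }
    }
}
```

For example, going from core 2 / max 4 to core 8 / max 8 takes the first branch, while shrinking back to core 1 / max 2 takes the second.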
Queue capacity alarm
queueCapacityThreshold is the alarm threshold for queue capacity. If the number of tasks in the queue exceeds queueCapacityThreshold, an alarm is raised.
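The threshold check itself is small. Below is a minimal sketch of it, assuming the check runs periodically against the executor's queue; how and when Kitty actually triggers the check is not described here, and QueueThresholdCheck is a hypothetical name.

```java
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical helper for the queue capacity alarm check. */
final class QueueThresholdCheck {
    private QueueThresholdCheck() {
    }

    /** Returns true when the number of waiting tasks exceeds the configured threshold. */
    static boolean exceedsThreshold(ThreadPoolExecutor executor, int queueCapacityThreshold) {
        // getQueue().size() counts tasks that are queued but not yet picked up by a worker
        return executor.getQueue().size() > queueCapacityThreshold;
    }
}
```

A scheduled task could call exceedsThreshold for each pool and send an alarm when it returns true, ideally with some rate limiting so that a long backlog does not flood the alarm channel.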
Rejection count alarm
When the queue is full, a new task is handled according to the rejection policy the user has configured. If the AbortPolicy policy is used, an alarm is also raised, because a rejection means the consumers are already overloaded.
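Counting rejections can be done by wrapping whatever policy is configured. This is a minimal sketch under that assumption, not Kitty's actual code; CountingRejectedExecutionHandler is a hypothetical name, and sending the alarm is left out.

```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical decorator that counts rejections before delegating to the real policy. */
final class CountingRejectedExecutionHandler implements RejectedExecutionHandler {
    private final RejectedExecutionHandler delegate;
    private final AtomicLong rejectCount = new AtomicLong();

    CountingRejectedExecutionHandler(RejectedExecutionHandler delegate) {
        this.delegate = delegate;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Count first: a delegate such as AbortPolicy throws RejectedExecutionException
        rejectCount.incrementAndGet();
        delegate.rejectedExecution(r, executor);
    }

    long getRejectCount() {
        return rejectCount.get();
    }
}
```

The counter is what an endpoint or heartbeat report could expose as rejectCount, and the same place is a natural hook for raising the alarm.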
Thread pool status
The component reports to Cat underneath, so the thread pool's runtime data is sent to Cat and can be viewed there.
If you want to display it on your own platform, I have exposed the /actuator/thread-pool endpoint so you can pull the data yourself.
{
  "threadPools": [{
    "threadPoolName": "TestThreadPoolExecutor",
    "activeCount": 0,
    "keepAliveTime": 0,
    "largestPoolSize": 4,
    "fair": false,
    "queueCapacity": 5,
    "queueCapacityThreshold": 2,
    "rejectCount": 0,
    "waitTaskCount": 0,
    "taskCount": 5,
    "unit": "MILLISECONDS",
    "rejectedExecutionType": "AbortPolicy",
    "corePoolSize": 4,
    "queueType": "LinkedBlockingQueue",
    "completedTaskCount": 5,
    "maximumPoolSize": 4
  }, {
    "threadPoolName": "TestThreadPoolExecutor2",
    "activeCount": 0,
    "keepAliveTime": 0,
    "largestPoolSize": 0,
    "fair": false,
    "queueCapacity": 2147483647,
    "queueCapacityThreshold": 2147483647,
    "rejectCount": 0,
    "waitTaskCount": 0,
    "taskCount": 0,
    "unit": "MILLISECONDS",
    "rejectedExecutionType": "AbortPolicy",
    "corePoolSize": 2,
    "queueType": "LinkedBlockingQueue",
    "completedTaskCount": 0,
    "maximumPoolSize": 4
  }]
}
Custom rejection policy
When we create a thread pool in code, we can customize the rejection policy by passing it in when the thread pool object is constructed. Here thread pool creation is encapsulated, so the policy can only be selected by configuring its name in Nacos. By default, the four policies that ship with the JDK can be configured: CallerRunsPolicy, AbortPolicy, DiscardPolicy, and DiscardOldestPolicy.
A custom policy is also supported. It is defined the same way as before:
@Slf4j
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        log.info("Came in .........");
    }
}
To make this policy take effect, register it through SPI: under resources, create a META-INF folder, then a services folder inside it, and in that folder create a file named java.util.concurrent.RejectedExecutionHandler whose content is the fully qualified name of the class you defined.
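For readers unfamiliar with SPI, the lookup can be sketched with the JDK's ServiceLoader. This is an assumption about how such a lookup might work, not a copy of Kitty's code: RejectedHandlerResolver is a hypothetical name, and matching custom handlers by simple class name is a choice made for this sketch. The services file would contain a single line such as com.example.MyRejectedExecutionHandler, where the package is made up for illustration.

```java
import java.util.ServiceLoader;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical resolver: JDK policies by name first, then SPI-registered handlers. */
final class RejectedHandlerResolver {
    private RejectedHandlerResolver() {
    }

    static RejectedExecutionHandler resolve(String name) {
        switch (name) {
            case "CallerRunsPolicy":
                return new ThreadPoolExecutor.CallerRunsPolicy();
            case "AbortPolicy":
                return new ThreadPoolExecutor.AbortPolicy();
            case "DiscardPolicy":
                return new ThreadPoolExecutor.DiscardPolicy();
            case "DiscardOldestPolicy":
                return new ThreadPoolExecutor.DiscardOldestPolicy();
            default:
                break;
        }
        // Handlers listed in META-INF/services/java.util.concurrent.RejectedExecutionHandler
        for (RejectedExecutionHandler handler : ServiceLoader.load(RejectedExecutionHandler.class)) {
            if (handler.getClass().getSimpleName().equals(name)) {
                return handler;
            }
        }
        throw new IllegalArgumentException("Unknown rejected execution handler: " + name);
    }
}
```

ServiceLoader needs each registered class to be public with a public no-argument constructor, which MyRejectedExecutionHandler above satisfies.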
Custom alarm channel
By default, alarms are sent through the integrated DingTalk robot. You can turn this off if you do not want to use it, or send the alarm information to your own monitoring platform instead.
If you have no alarm platform, you can implement a new alarm channel in your project, such as SMS.
Just implement the ThreadPoolAlarmNotify interface.
/**
 * Custom SMS alarm notification
 *
 * Personal WeChat: jihuan900
 * WeChat official account: Ape world
 * GitHub: https://github.com/yinjihuan
 * About the author: http://cxytiandi.com/about
 * Time: 2020-05-27 22:26
 *
 * @author Yin Jihuan
 */
@Slf4j
@Component
public class ThreadPoolSmsAlarmNotify implements ThreadPoolAlarmNotify {
    @Override
    public void alarmNotify(AlarmMessage alarmMessage) {
        log.info(alarmMessage.toString());
    }
}
Code implementation
I won't go through everything in detail. The source code is at https://github.com/yinjihuan/kitty/tree/master/kitty-dynamic-thread-pool; have a look for yourself, it is not complicated.
Creating the thread pool
Thread pools are created from the configuration. The ThreadPoolExecutor is a custom subclass, because it needs Cat instrumentation.
/**
 * Create the thread pools
 * @param threadPoolProperties thread pool configuration
 */
private void createThreadPoolExecutor(DynamicThreadPoolProperties threadPoolProperties) {
    threadPoolProperties.getExecutors().forEach(executor -> {
        KittyThreadPoolExecutor threadPoolExecutor = new KittyThreadPoolExecutor(
                executor.getCorePoolSize(),
                executor.getMaximumPoolSize(),
                executor.getKeepAliveTime(),
                executor.getUnit(),
                getBlockingQueue(executor.getQueueType(), executor.getQueueCapacity(), executor.isFair()),
                new KittyThreadFactory(executor.getThreadPoolName()),
                getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()),
                executor.getThreadPoolName());
        threadPoolExecutorMap.put(executor.getThreadPoolName(), threadPoolExecutor);
    });
}
Refreshing the thread pool
First, listen for configuration changes in Nacos.
/**
 * Listen for configuration changes. spring-cloud-alibaba 2.1.0 does not support listening via @NacosConfigListener.
 */
public void initConfigUpdateListener(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
    ConfigService configService = nacosConfigProperties.configServiceInstance();
    try {
        configService.addListener(dynamicThreadPoolProperties.getNacosDataId(), dynamicThreadPoolProperties.getNacosGroup(), new AbstractListener() {
            @Override
            public void receiveConfigInfo(String configInfo) {
                // The refresh runs on its own thread, so it has only been started at this point
                new Thread(() -> refreshThreadPoolExecutor()).start();
                log.info("Thread pool configuration changed, refresh started");
            }
        });
    } catch (NacosException e) {
        log.error("Failed to register the Nacos configuration listener", e);
    }
}
Then refresh the thread pool's parameters. When the listener event fires, the configuration class has not been refreshed yet, so I wait 1 second to let the refresh finish and then read the values directly from the configuration class.
It is a bit crude, but it works. A better approach would be to parse the configInfo argument of receiveConfigInfo, which holds the entire configuration content after the change. Parsing it back into the properties object is not very convenient, so I have not done that yet; I will change it later.
/**
 * Refresh the thread pools
 */
private void refreshThreadPoolExecutor() {
    try {
        // Wait for the configuration refresh to complete
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    }
    dynamicThreadPoolProperties.getExecutors().forEach(executor -> {
        ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(executor.getThreadPoolName());
        if (threadPoolExecutor == null) {
            // A pool that appears only in the new configuration has not been created; skip it
            return;
        }
        threadPoolExecutor.setCorePoolSize(executor.getCorePoolSize());
        threadPoolExecutor.setMaximumPoolSize(executor.getMaximumPoolSize());
        threadPoolExecutor.setKeepAliveTime(executor.getKeepAliveTime(), executor.getUnit());
        threadPoolExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()));
        BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
        if (queue instanceof ResizableCapacityLinkedBlockIngQueue) {
            ((ResizableCapacityLinkedBlockIngQueue<Runnable>) queue).setCapacity(executor.getQueueCapacity());
        }
    });
}
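As for the alternative of parsing configInfo directly rather than waiting, a minimal sketch could load the text with java.util.Properties. It assumes the Nacos configuration is in .properties format, as in the example configuration earlier; ConfigInfoParser and intValue are hypothetical names, and binding the result back onto DynamicThreadPoolProperties is not shown.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper that parses the changed configuration text. */
final class ConfigInfoParser {
    private ConfigInfoParser() {
    }

    /** Parses the full configuration content passed to receiveConfigInfo. */
    static Properties parse(String configInfo) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(configInfo));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail, so rethrow unchecked
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    /** Reads an int value, falling back to a default when the key is absent. */
    static int intValue(Properties properties, String key, int defaultValue) {
        String raw = properties.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }
}
```

With this, the listener could read kitty.threadpools.executors[0].corePoolSize straight from the new content, without the one-second wait.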
The other parameters are refreshed through setters that the thread pool itself provides. The queue size is the exception: currently only LinkedBlockingQueue is supported, and because the capacity of LinkedBlockingQueue cannot be modified, I followed the idea in Meituan's article and wrote a custom queue whose capacity can be changed. It is simply a copy of the LinkedBlockingQueue code, altered to allow that.
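For comparison, here is a lighter technique than copying the LinkedBlockingQueue source. It is explicitly not what Kitty does: it subclasses an unbounded LinkedBlockingQueue and enforces an adjustable limit in offer, which is the method ThreadPoolExecutor uses to enqueue tasks. SoftBoundedQueue is a hypothetical name, and the limit applies only to offer(E) and add(E); put and the timed offer bypass it.

```java
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical queue with an adjustable capacity enforced in offer(E). */
final class SoftBoundedQueue<E> extends LinkedBlockingQueue<E> {
    private static final long serialVersionUID = 1L;

    private volatile int capacity;

    SoftBoundedQueue(int capacity) {
        super(); // the underlying queue itself stays unbounded
        setCapacity(capacity);
    }

    void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        // Shrinking below the current size drops nothing; new offers fail until the queue drains
        this.capacity = capacity;
    }

    int getCapacity() {
        return capacity;
    }

    @Override
    public synchronized boolean offer(E e) {
        // synchronized keeps concurrent producers from passing the size check together
        if (size() >= capacity) {
            return false;
        }
        return super.offer(e);
    }

    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - size());
    }
}
```

The trade-off is an extra lock on the producer side, in exchange for not maintaining a fork of JDK code.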
Reporting runtime information to Cat
The code that uploads data to Cat's Heartbeat report is as follows. It mainly relies on the extension capability that Cat itself provides. Just call the following method and the data is reported periodically.
public void registerStatusExtension(ThreadPoolProperties prop, KittyThreadPoolExecutor executor) {
    StatusExtensionRegister.getInstance().register(new StatusExtension() {
        @Override
        public String getId() {
            return "thread.pool.info." + prop.getThreadPoolName();
        }

        @Override
        public String getDescription() {
            return "Thread pool monitoring";
        }

        @Override
        public Map<String, String> getProperties() {
            AtomicLong rejectCount = getRejectCount(prop.getThreadPoolName());
            Map<String, String> pool = new HashMap<>();
            pool.put("activeCount", String.valueOf(executor.getActiveCount()));
            pool.put("completedTaskCount", String.valueOf(executor.getCompletedTaskCount()));
            pool.put("largestPoolSize", String.valueOf(executor.getLargestPoolSize()));
            pool.put("taskCount", String.valueOf(executor.getTaskCount()));
            pool.put("rejectCount", String.valueOf(rejectCount == null ? 0 : rejectCount.get()));
            pool.put("waitTaskCount", String.valueOf(executor.getQueue().size()));
            return pool;
        }
    });
}
Defining the thread pool endpoint
The thread pool's configuration and runtime status can be exposed through a custom endpoint, so that an external monitoring system can pull the data and process it.
@Endpoint(id = "thread-pool")
public class ThreadPoolEndpoint {
    @Autowired
    private DynamicThreadPoolManager dynamicThreadPoolManager;

    @Autowired
    private DynamicThreadPoolProperties dynamicThreadPoolProperties;

    @ReadOperation
    public Map<String, Object> threadPools() {
        Map<String, Object> data = new HashMap<>();
        List<Map> threadPools = new ArrayList<>();
        dynamicThreadPoolProperties.getExecutors().forEach(prop -> {
            KittyThreadPoolExecutor executor = dynamicThreadPoolManager.getThreadPoolExecutor(prop.getThreadPoolName());
            AtomicLong rejectCount = dynamicThreadPoolManager.getRejectCount(prop.getThreadPoolName());
            Map<String, Object> pool = new HashMap<>();
            // Start from the configured values, then add the runtime metrics
            Map config = JSONObject.parseObject(JSONObject.toJSONString(prop), Map.class);
            pool.putAll(config);
            pool.put("activeCount", executor.getActiveCount());
            pool.put("completedTaskCount", executor.getCompletedTaskCount());
            pool.put("largestPoolSize", executor.getLargestPoolSize());
            pool.put("taskCount", executor.getTaskCount());
            pool.put("rejectCount", rejectCount == null ? 0 : rejectCount.get());
            pool.put("waitTaskCount", executor.getQueue().size());
            threadPools.add(pool);
        });
        data.put("threadPools", threadPools);
        return data;
    }
}
Monitoring the execution time of thread pool tasks with Cat
Originally the monitoring was placed in the execute and submit methods of KittyThreadPoolExecutor. Testing showed a problem: the data did appear in Cat, but the execution time was always 1 millisecond, so it was useless.
The reason is probably obvious: the task runs later on a separate thread, so instrumenting the point where the task is added is meaningless.
I did find a way to implement the instrumentation: use the beforeExecute and afterExecute methods that the thread pool provides, which are called before and after each task runs.
@Override
protected void beforeExecute(Thread t, Runnable r) {
    String threadName = Thread.currentThread().getName();
    Transaction transaction = Cat.newTransaction(threadPoolName, runnableNameMap.get(r.getClass().getSimpleName()));
    transactionMap.put(threadName, transaction);
    super.beforeExecute(t, r);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    String threadName = Thread.currentThread().getName();
    Transaction transaction = transactionMap.get(threadName);
    transaction.setStatus(Message.SUCCESS);
    if (t != null) {
        Cat.logError(t);
        transaction.setStatus(t);
    }
    transaction.complete();
    transactionMap.remove(threadName);
}
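The same two hooks can be tried without Cat. Below is a minimal, self-contained sketch that measures task execution time with a ThreadLocal instead of a map keyed by thread name; it works because both hooks run on the worker thread that executes the task. TimingThreadPoolExecutor and runAndMeasure are hypothetical names, and plain counters stand in for the Cat Transaction.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical executor that times each task between beforeExecute and afterExecute. */
final class TimingThreadPoolExecutor extends ThreadPoolExecutor {
    // Per-worker start time; both hooks run on the thread that executes the task
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicLong finishedTasks = new AtomicLong();

    TimingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, int queueCapacity) {
        super(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            Long start = startNanos.get();
            if (start != null) {
                totalNanos.addAndGet(System.nanoTime() - start);
                finishedTasks.incrementAndGet();
            }
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }

    long getFinishedTasks() {
        return finishedTasks.get();
    }

    long getTotalMillis() {
        return TimeUnit.NANOSECONDS.toMillis(totalNanos.get());
    }

    /** Runs one sleeping task and returns {finished task count, measured milliseconds}. */
    static long[] runAndMeasure(long sleepMillis) {
        TimingThreadPoolExecutor executor = new TimingThreadPoolExecutor(1, 1, 10);
        executor.execute(() -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {executor.getFinishedTasks(), executor.getTotalMillis()};
    }
}
```

Measured this way, a task that sleeps for 50 ms reports roughly 50 ms, rather than the near-zero time seen when only the submit call is timed.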
That's it for the code walkthrough; have a look at the rest yourself. This is the end of the article. If you found it useful, please share it!
Thank you very much.
Finally, I would like to thank the Meituan technical team for their article. Although they did not share the source code, the ideas and application scenarios were explained very clearly.
If you are interested, give it a Star: https://github.com/yinjihuan/kitty
About the author: Yin Jihuan, a plain technology enthusiast, author of 《Spring Cloud Microservices - Full stack technology and case analysis》 and 《Spring Cloud Microservices introduction Actual combat and advanced》, and founder of the official account Ape world. Personal WeChat: jihuan900. Feel free to get in touch.
I have compiled a complete set of learning materials. If you are interested, search WeChat for 「Ape world」 and reply with the keyword 「Learning materials」 to get my collected material on Spring Cloud, Spring Cloud Alibaba, Sharding-JDBC database and table sharding, the XXL-JOB task scheduling framework, MongoDB, web crawlers, and more.
Copyright notice
This article was written by [Yin Jihuan]. Please include a link to the original when reposting. Thank you.