当前位置:网站首页>一时技痒,撸了个动态线程池,源码放Github了
一时技痒,撸了个动态线程池,源码放Github了
2020-11-06 01:33:00 【尹吉欢】
阐述背景
线程池在日常工作中用的还挺多,当需要异步,批量处理一些任务的时候我们会定义一个线程池来处理。
在使用线程池的过程中有一些问题,下面简单介绍下之前遇到的一些问题。
场景一:实现一些批量处理数据的功能,刚开始线程池的核心线程数设的比较小,然后想调整下,只能改完后重启应用。
场景二:有一个任务处理的应用,会接收 MQ 的消息进行任务的处理,线程池的队列也允许缓存一定数量的任务。当任务处理的很慢的时候,想看看到底有多少没有处理完不是很方便。当时为了快速方便,就直接启动了一个线程去循环打印线程池队列的大小。
正好之前在我公众号有转发过美团的一篇线程池应用的文章(https://mp.weixin.qq.com/s/tIWAocevZThfbrfWoJGa9w),觉得他们的思路非常好,就是没有开放源码,所以自己就抽时间在我的开源项目 Kitty 中增加了一个动态线程池的组件,支持了 Cat 监控,动态变更核心参数,任务堆积告警等。今天就给大家分享一下实现的方式。
项目源代码地址:https://github.com/yinjihuan/kitty
使用方式
添加依赖
依赖线程池的组件,目前 Kitty 未发布,需要自己下载源码 install 本地或者私有仓库。
<dependency>
<groupId>com.cxytiandi</groupId>
<artifactId>kitty-spring-cloud-starter-dynamic-thread-pool</artifactId>
</dependency>
添加配置
然后在 Nacos 配置线程池的信息,我的这个整合了 Nacos。推荐一个应用创建一个单独的线程池配置文件,比如我们这个叫 dataId 为 kitty-cloud-thread-pool.properties,group 为 BIZ_GROUP。
内容如下:
kitty.threadpools.nacosDataId=kitty-cloud-thread-pool.properties
kitty.threadpools.nacosGroup=BIZ_GROUP
kitty.threadpools.accessToken=ae6eb1e9e6964d686d2f2e8127d0ce5b31097ba23deee6e4f833bc0a77d5b71d
kitty.threadpools.secret=SEC6ec6e31d1aa1bdb2f7fd5eb5934504ce09b65f6bdc398d00ba73a9857372de00
kitty.threadpools.owner=尹吉欢
kitty.threadpools.executors[0].threadPoolName=TestThreadPoolExecutor
kitty.threadpools.executors[0].corePoolSize=4
kitty.threadpools.executors[0].maximumPoolSize=4
kitty.threadpools.executors[0].queueCapacity=5
kitty.threadpools.executors[0].queueCapacityThreshold=5
kitty.threadpools.executors[1].threadPoolName=TestThreadPoolExecutor2
kitty.threadpools.executors[1].corePoolSize=2
kitty.threadpools.executors[1].maximumPoolSize=4
nacosDataId,nacosGroup
监听配置修改的时候需要知道监听哪个 DataId,值就是当前配置的 DataId。
accessToken,secret
钉钉机器人的验证信息,用于告警。
owner
这个应用的负责人,告警的消息中会显示。
threadPoolName
线程池的名称,使用的时候需要关注。
剩下的配置就不一一介绍了,跟线程池内部的参数一致,还有一些可以查看源码得知。
注入使用
@Autowired
private DynamicThreadPoolManager dynamicThreadPoolManager;
dynamicThreadPoolManager.getThreadPoolExecutor("TestThreadPoolExecutor").execute(() -> {
log.info("线程池的使用");
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "getArticle");
通过 DynamicThreadPoolManager 的 getThreadPoolExecutor 方法获取线程池对象,然后传入 Runnable,Callable 等。第二个参数是这个任务的名称,之所以要扩展一个参数是因为如果任务没有标识,那么无法区分任务。
这个线程池组件默认集成了 Cat 打点,设置了名称可以在 Cat 上查看这个任务相关的监控数据。
扩展功能
任务执行情况监控
在 Cat 的 Transaction 报表中会以线程池的名称为类型显示。
详情中会以任务的名称显示。
核心参数动态修改
核心参数目前只支持 corePoolSize,maximumPoolSize,queueCapacity(队列类型为 LinkedBlockingDeque 才可以修改),rejectedExecutionType,keepAliveTime,unit 这些参数的修改。
一般 corePoolSize,maximumPoolSize,queueCapacity 是最常要动态改变的。
需要改动的话直接在 Nacos 中将对应的配置值修改即可,客户端会监听配置的修改,然后同步修改先线程池的参数。
队列容量告警
queueCapacityThreshold 是队列容量告警的阀值,如果队列中的任务数量超过了 queueCapacityThreshold 就会告警。
拒绝次数告警
当队列容量满了后,新进来的任务会根据用户设置的拒绝策略去选择对应的处理方式。如果是采用 AbortPolicy 策略,也会进行告警。相当于消费者已经超负荷了。
线程池运行情况
底层对接了 Cat,所以将线程的运行数据上报给了 Cat。我们可以在 Cat 中查看这些信息。
如果你想在自己的平台去展示,我这边暴露了/actuator/thread-pool 端点,你可以自行拉取数据。
{
threadPools: [{
threadPoolName: "TestThreadPoolExecutor",
activeCount: 0,
keepAliveTime: 0,
largestPoolSize: 4,
fair: false,
queueCapacity: 5,
queueCapacityThreshold: 2,
rejectCount: 0,
waitTaskCount: 0,
taskCount: 5,
unit: "MILLISECONDS",
rejectedExecutionType: "AbortPolicy",
corePoolSize: 4,
queueType: "LinkedBlockingQueue",
completedTaskCount: 5,
maximumPoolSize: 4
}, {
threadPoolName: "TestThreadPoolExecutor2",
activeCount: 0,
keepAliveTime: 0,
largestPoolSize: 0,
fair: false,
queueCapacity: 2147483647,
queueCapacityThreshold: 2147483647,
rejectCount: 0,
waitTaskCount: 0,
taskCount: 0,
unit: "MILLISECONDS",
rejectedExecutionType: "AbortPolicy",
corePoolSize: 2,
queueType: "LinkedBlockingQueue",
completedTaskCount: 0,
maximumPoolSize: 4
}]
}
自定义拒绝策略
平时我们使用代码创建线程池可以自定义拒绝策略,在构造线程池对象的时候传入即可。这里由于创建线程池都被封装好了,我们只能在 Nacos 配置拒绝策略的名称来使用对应的策略。默认是可以配置 JDK 自带的 CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy 这四种。
如果你想自定义的话也是支持的,定义方式跟以前一样,如下:
@Slf4j
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("进来了。。。。。。。。。");
}
}
要让这个策略生效的话使用的是 SPI 的方式,需要在 resources 下面创建一个 META-INF 的文件夹,然后创建一个 services 的文件夹,再创建一个 java.util.concurrent.RejectedExecutionHandler 的文件,内容为你定义的类全路径。
自定义告警方式
默认是内部集成了钉钉机器人的告警方式,如果你不想用也可以将其关闭。或者将告警信息对接到你的监控平台去。
如果没有告警平台也可以在项目中实现新的告警方式,比如短信等。
只需要实现 ThreadPoolAlarmNotify 这个类即可。
/**
* 自定义短信告警通知
*
* @作者 尹吉欢
* @个人微信 jihuan900
* @微信公众号 猿天地
* @GitHub https://github.com/yinjihuan
* @作者介绍 http://cxytiandi.com/about
* @时间 2020-05-27 22:26
*/
@Slf4j
@Component
public class ThreadPoolSmsAlarmNotify implements ThreadPoolAlarmNotify {
@Override
public void alarmNotify(AlarmMessage alarmMessage) {
log.info(alarmMessage.toString());
}
}
代码实现
具体的就不讲的很细了,源码在https://github.com/yinjihuan/kitty/tree/master/kitty-dynamic-thread-pool,大家自己去看,并不复杂。
创建线程池
根据配置创建线程池,ThreadPoolExecutor 是自定义的,因为需要做 Cat 埋点。
/**
* 创建线程池
* @param threadPoolProperties
*/
private void createThreadPoolExecutor(DynamicThreadPoolProperties threadPoolProperties) {
threadPoolProperties.getExecutors().forEach(executor -> {
KittyThreadPoolExecutor threadPoolExecutor = new KittyThreadPoolExecutor(
executor.getCorePoolSize(),
executor.getMaximumPoolSize(),
executor.getKeepAliveTime(),
executor.getUnit(),
getBlockingQueue(executor.getQueueType(), executor.getQueueCapacity(), executor.isFair()),
new KittyThreadFactory(executor.getThreadPoolName()),
getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()), executor.getThreadPoolName());
threadPoolExecutorMap.put(executor.getThreadPoolName(), threadPoolExecutor);
});
}
刷新线程池
首先需要监听 Nacos 的修改。
/**
* 监听配置修改,spring-cloud-alibaba 2.1.0版本不支持@NacosConfigListener的监听
*/
public void initConfigUpdateListener(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
ConfigService configService = nacosConfigProperties.configServiceInstance();
try {
configService.addListener(dynamicThreadPoolProperties.getNacosDataId(), dynamicThreadPoolProperties.getNacosGroup(), new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
new Thread(() -> refreshThreadPoolExecutor()).start();
log.info("线程池配置有变化,刷新完成");
}
});
} catch (NacosException e) {
log.error("Nacos配置监听异常", e);
}
}
然后再刷新线程池的参数信息,由于监听事件触发的时候,这个时候配置其实还没刷新,所以我就等待了 1 秒钟,让配置完成刷新然后直接从配置类取值。
虽然有点挫还是可以用,其实更好的方式是解析 receiveConfigInfo 那个 configInfo,configInfo 就是改变之后的整个配置内容。因为不太好解析成属性文件,就没做,后面再改吧。
/**
* 刷新线程池
*/
private void refreshThreadPoolExecutor() {
try {
// 等待配置刷新完成
Thread.sleep(1000);
} catch (InterruptedException e) {
}
dynamicThreadPoolProperties.getExecutors().forEach(executor -> {
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(executor.getThreadPoolName());
threadPoolExecutor.setCorePoolSize(executor.getCorePoolSize());
threadPoolExecutor.setMaximumPoolSize(executor.getMaximumPoolSize());
threadPoolExecutor.setKeepAliveTime(executor.getKeepAliveTime(), executor.getUnit());
threadPoolExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()));
BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
if (queue instanceof ResizableCapacityLinkedBlockIngQueue) {
((ResizableCapacityLinkedBlockIngQueue<Runnable>) queue).setCapacity(executor.getQueueCapacity());
}
});
}
其他的刷新都是线程池自带的,需要注意的是线程池队列大小的刷新,目前只支持 LinkedBlockingQueue 队列,由于 LinkedBlockingQueue 的大小是不允许修改的,所以按照美团那篇文章提供的思路,自定义了一个可以修改的队列,其实就是把 LinkedBlockingQueue 的代码复制了一份,改一下就可以。
往 Cat 上报运行信息
往 Cat 的 Heartbeat 报表上传数据的代码如下,主要还是 Cat 本身提供了扩展的能力。只需要定时去调用下面的方式上报数据即可。
public void registerStatusExtension(ThreadPoolProperties prop, KittyThreadPoolExecutor executor) {
StatusExtensionRegister.getInstance().register(new StatusExtension() {
@Override
public String getId() {
return "thread.pool.info." + prop.getThreadPoolName();
}
@Override
public String getDescription() {
return "线程池监控";
}
@Override
public Map<String, String> getProperties() {
AtomicLong rejectCount = getRejectCount(prop.getThreadPoolName());
Map<String, String> pool = new HashMap<>();
pool.put("activeCount", String.valueOf(executor.getActiveCount()));
pool.put("completedTaskCount", String.valueOf(executor.getCompletedTaskCount()));
pool.put("largestPoolSize", String.valueOf(executor.getLargestPoolSize()));
pool.put("taskCount", String.valueOf(executor.getTaskCount()));
pool.put("rejectCount", String.valueOf(rejectCount == null ? 0 : rejectCount.get()));
pool.put("waitTaskCount", String.valueOf(executor.getQueue().size()));
return pool;
}
});
}
定义线程池端点
通过自定义端点来暴露线程池的配置和运行的情况,可以让外部的监控系统拉取数据做对应的处理。
@Endpoint(id = "thread-pool")
public class ThreadPoolEndpoint {
@Autowired
private DynamicThreadPoolManager dynamicThreadPoolManager;
@Autowired
private DynamicThreadPoolProperties dynamicThreadPoolProperties;
@ReadOperation
public Map<String, Object> threadPools() {
Map<String, Object> data = new HashMap<>();
List<Map> threadPools = new ArrayList<>();
dynamicThreadPoolProperties.getExecutors().forEach(prop -> {
KittyThreadPoolExecutor executor = dynamicThreadPoolManager.getThreadPoolExecutor(prop.getThreadPoolName());
AtomicLong rejectCount = dynamicThreadPoolManager.getRejectCount(prop.getThreadPoolName());
Map<String, Object> pool = new HashMap<>();
Map config = JSONObject.parseObject(JSONObject.toJSONString(prop), Map.class);
pool.putAll(config);
pool.put("activeCount", executor.getActiveCount());
pool.put("completedTaskCount", executor.getCompletedTaskCount());
pool.put("largestPoolSize", executor.getLargestPoolSize());
pool.put("taskCount", executor.getTaskCount());
pool.put("rejectCount", rejectCount == null ? 0 : rejectCount.get());
pool.put("waitTaskCount", executor.getQueue().size());
threadPools.add(pool);
});
data.put("threadPools", threadPools);
return data;
}
}
Cat 监控线程池中线程的执行时间
本来是将监控放在 KittyThreadPoolExecutor 的 execute,submit 方法里的。后面测试下来发现有问题,数据在 Cat 上确实有了,但是执行时间都是 1 毫秒,也就是没生效。
不说想必大家也知道,因为线程是后面单独去执行的,所以再添加任务的地方埋点没任务意义。
后面还是想到了一个办法来实现埋点的功能,就是利用线程池提供的 beforeExecute 和 afterExecute 两个方法,在线程执行之前和执行之后都会触发这两个方法。
@Override
protected void beforeExecute(Thread t, Runnable r) {
String threadName = Thread.currentThread().getName();
Transaction transaction = Cat.newTransaction(threadPoolName, runnableNameMap.get(r.getClass().getSimpleName()));
transactionMap.put(threadName, transaction);
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
String threadName = Thread.currentThread().getName();
Transaction transaction = transactionMap.get(threadName);
transaction.setStatus(Message.SUCCESS);
if (t != null) {
Cat.logError(t);
transaction.setStatus(t);
}
transaction.complete();
transactionMap.remove(threadName);
}
后面的代码大家自己去看就行了,本文到这里就结束了。如果感觉本文还不错的记得转发下哦!
多谢多谢。
最后感谢美团技术团队的那篇文章,虽然没有分享源码,但是思路什么的和应用场景都讲的很明白。
感兴趣的 Star 下呗:https://github.com/yinjihuan/kitty
关于作者:尹吉欢,简单的技术爱好者,《Spring Cloud 微服务-全栈技术与案例解析》, 《Spring Cloud 微服务 入门 实战与进阶》作者, 公众号 猿天地 发起人。个人微信 jihuan900,欢迎勾搭。
我整理了一份很全的学习资料,感兴趣的可以微信搜索 「猿天地」,回复关键字 「学习资料」获取我整理好了的Spring Cloud,Spring Cloud Alibaba,Sharding-JDBC分库分表,任务调度框架XXL-JOB,MongoDB,爬虫等相关资料。
版权声明
本文为[尹吉欢]所创,转载请带上原文链接,感谢
http://cxytiandi.com/blog/detail/36488
边栏推荐
猜你喜欢
随机推荐
7.2.2 compressing static resources through gzipresourceresolver
NodeJs爬虫抓取古代典籍,共计16000个页面心得体会总结及项目分享
keras model.compile损失函数与优化器
如何选择分类模型的评价指标
适合时间序列数据的计算脚本
【Flutter 實戰】pubspec.yaml 配置檔案詳解
使用Consul实现服务发现:instance-id自定义
什么是无副作用的函数方法?如何取名? - Mario
十二因子原则和云原生微服务 - DZone
mongodb(从0到1),11天mongodb初级到中级进阶秘籍
【數量技術宅|金融資料系列分享】套利策略的價差序列計算,恐怕沒有你想的那麼簡單
6.9.2 session flashmapmanager redirection management
【C/C++ 1】Clion配置与运行C语言
为了省钱,我用1天时间把PHP学了!
阻塞队列之LinkedBlockingQueue分析
Python3網路學習案例四:編寫Web Proxy
8.1.1 handling global exceptions through handlerexceptionresolver
刚刚,给学妹普及了登录的两大绝学
链表的常见算法总结
计组-字长