当前位置:网站首页>线程池的使用
线程池的使用
2022-06-09 07:06:00 【内卷第一人】
简单描述两个使用线程池的场景:
场景一:现在我们有一个需求,我们是一个商家来对接外渠,比如美团,我们可能在这个商家上有500家店,每家店有5000个商品(商品总量为5000,每家店可能有不同),现在要拉取每家店的商品。
首先拿到上述需求的第一反应就是使用线程池去解决,因为商品的量级太大,下面简述下用Java实现的过程:
第一步是配置线程池(可以参考阿里巴巴规范,不做赘述)
@Configuration
public class ThreadPoolServiceConfig {
@Bean
public ThreadPoolExecutor assignThreadPool(){
//corePoolSize:核心线程数
//maximumPoolSize: 最大线程数量
//keepAliveTime:存活时间
//TimeUnit:存活单位
//workQueue:存放需要执行的任务
//threadFactory;线程工厂
return new ThreadPoolExecutor(5, 10, 100L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(50),
new ThreadFactoryName("线程工厂名字"));
}
}PS:这里强调一下,核心线程数的确定需要根据当前服务器的核数来确定(这里又细分为CPU密集型和IO密集型两种)
//获取当前机器的核数(通常核心线程数用下面的cpuNum 来替换)
publicstaticfinalintcpuNum = Runtime.getRuntime().availableProcessors();
而最大线程数是核心线程数的两倍(上面没有具体的服务,就用数字代替了)
根据阿里巴巴规范要求我们的线程的名字一定要见名知意,所以还需要配置下线程名字,具体如下:
public class ThreadFactoryName implements ThreadFactory {
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
ThreadFactoryName(String name){
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
if (StringUtils.isBlank(name)){
name = "pool";
}
namePrefix = name +"-"+
POOL_NUMBER.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}具体的使用如下:
//CountDownLatch 是一个计数器,用来阻塞线程,只有当所有
//的线程执行完毕,主线程才继续向下执行
//根据上面的场景假设有500家店,我们分为5组,一组100家店
CountDownLatch latch = new CountDownLatch(5);
//遍历,一次跑100家店
for (List<ThirdChannelWarehouseResVo> pageItem : thirdChannelWarehouseList) {
threadPoolExecutor.execute(() -> {
try {
//具体的业务逻辑
} catch (Exception exception){
//如果异常则将该集合全部存储,拉取失败需要有补偿机制
} finally {
//执行完成计数减一
latch.countDown();
}
})
}
//阻塞等待计数器清0,则继续向下执行
latch.await();
场景二:
在我们的分布式项目中,我们肯定会遇到多项目调用的情况,如下图:

我们肯定会优先选择下面的粗粒度调用方式(因为明显上面的调用方式服务挂掉的可能性要大于后者)
而采用下面的方式去调用,我们为了提高系统整个的性能,我们完全可以使用线程池的方式,将整个的调用任务抽出来,为此编写了下述的工具类:
package com.qianfeng.gp09.util;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 同步调用的工具类
* 内部封装线程池与CountDownLatch
*
*/
public class SyncBatchUtil {
/**
* 线程池
*/
private static ExecutorService executorService;
static{
//核心线程数
//计算密集 还是 IO密集
//前者是 n 后者 是2n
int corePoolSize=16;
//最大线程数
int maxPoolSize=32;
//时间
long keepAliveTime=10;
//时间单位
TimeUnit unit=TimeUnit.SECONDS;
//队列
BlockingQueue<Runnable> workQueue=new LinkedBlockingDeque<>();
//工厂
ThreadFactory threadFactory=new UserThreadFactory("GP09");
//拒绝策略
RejectedExecutionHandler handler=new MyRejectPolicy();
//实例化线程池
executorService=new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,
unit,workQueue,threadFactory,handler);
}
/**
* 线程工厂
*/
static class UserThreadFactory implements ThreadFactory{
private final String namePrefix;
private final AtomicInteger nextId=new AtomicInteger(1);
public UserThreadFactory(String groupName) {
this.namePrefix = "From "+SyncBatchUtil.class.getName()+
"-"+groupName+"-";
}
@Override
public Thread newThread(Runnable r) {
String name=namePrefix+nextId.getAndIncrement();
Thread thread=new Thread(null,r,name);
return thread;
}
}
/**
* 拒绝策略
*/
static class MyRejectPolicy implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(SyncBatchUtil.class.getName()+"的线程拒绝了");
}
}
/**
* 自定义任务
*/
public static abstract class Task implements Runnable{
/**
* 任务调度的方法
*/
public abstract void exe();
/**
* 内部封装的CountDownLatch
*/
private CountDownLatch countDownLatch;
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try{
exe();
}catch (Exception e){
e.printStackTrace();
}finally {
//再让计数器-1
countDownLatch.countDown();
}
}
}
/**
* 创建一组任务
*/
private static boolean createTasks(Task [] tasks){
//给这组任务分配一个CountDownLatch
//注意:是一组任务对应一个CountDownLatch
//用户每一次请求我们的服务器,都会导致我们有一组任务
//因此其实是 每一次请求我们服务器,我们就要有一个CountDownLatch对象.
//有多少个任务,计数器就是多少,所以构造方法的参数为task.length
CountDownLatch countDownLatch =new CountDownLatch(tasks.length);
//给每一个任务设定相同的countDownLatch
for (int i = 0; i < tasks.length; i++) {
tasks[i].setCountDownLatch(countDownLatch);
}
//每一个task类型其实都是Runnable类型,它应该用线程池来执行
for (int i = 0; i < tasks.length; i++) {
//启动每一个任务
executorService.execute(tasks[i]);
}
//卡住当前线程
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 给外界提供可调用的方法
* @param tasks 指定一组任务
* @return 这组任务是否创建成功
*/
public static boolean batch(Task ... tasks){
return createTasks(tasks);
}
}
下面是测试:
package com.qianfeng.gp09.util;
public class SyncBatchUtilTest {
public static void main(String[] args) {
System.out.println("当前线程开始执行");
SyncBatchUtil.batch(new SyncBatchUtil.Task() {
@Override
public void exe() {
System.out.println("第一个线程开始执行");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一个线程执行完毕");
}
}, new SyncBatchUtil.Task() {
@Override
public void exe() {
System.out.println("第二个线程开始执行");
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第二个线程执行完毕");
}
}, new SyncBatchUtil.Task() {
@Override
public void exe() {
System.out.println("第三个线程开始执行");
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第三个线程执行完毕");
}
});
System.out.println("当前线程执行完毕");
}
}
边栏推荐
猜你喜欢

High precision personnel positioning system, power plant indoor positioning application solution

【系统分析师之路】第十五章 复盘数据库系统(关系数据库应用)

Related methods of thread class

Summarize the key knowledge of multithreading.

WPF 数据绑定(二)

UML series (24) advanced behavior - time and space

The original, skillful and vulgar skills of cloud

Fourier transform signal processing

蘑菇街发布2022财年财报:下半年亏损同比收窄50%

DeFi 去風險:分析去中心化系統中的系統性風險
随机推荐
The sandbox and ikonia have reached a cooperation to restore the murals of the football legend Pirlo in the metauniverse
Mushroom Street released the financial report of fiscal year 2022: the loss in the second half of the year narrowed by 50% year-on-year
defineProperty
InfoWorld文章丨将数据编排技术用于AI模型训练
[C language] summary of document knowledge points
QT console project display label small window
Error parsing command line: unrecognized option '--fork' in mongodb
多线程重点知识归纳总结。
UML series article (22) advanced behavior -- state machine
Hummingbird e203 image recognition -- to be continued
Basic knowledge summary of database
不懂数学可以使用机器学习编程吗?
100 code structure and how to avoid them (4)
数据库操作语句
外设驱动库开发笔记42:DAC8552 DAC驱动
市场变化,欢聚集团如何穿越不确定性风暴?
Poor math can you learn machine learning?
Wechat applet mind map
Performance test process, index, implementation method and summary
当内卷风波及代码领域,看Alluxio将会采取怎样的块分配策略