当前位置:网站首页>使用CompletableFuture进行异步处理业务
使用CompletableFuture进行异步处理业务
2022-07-31 13:25:00 【ZeKi_豪】
准备
pom.xml
- 为了使用
Lists.partition(list集合,分割后每一份list的长度)
- 将一个list集合分割成多个list集合
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0-jre</version>
</dependency>
自定义线程池(仅参考)
- CompletableFuture使用业务的时候要指定放在哪个线程池中使用
- 把线程池定义在配置类中交给bean管理
@Configuration
public class ConcurrentExecuteSqlThreadPoolConfig {
private final static Logger logger = LoggerFactory.getLogger(ConcurrentExecuteSqlThreadPoolConfig.class);
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:"+ PoolDefaultConfig.poolCoreSize_default +"}")
private int poolCoreSize;
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:"+ PoolDefaultConfig.poolMaxSize_default +"}")
private int poolMaxSize;
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:"+ PoolDefaultConfig.queueMaxSize_default +"}")
private int taskQueueMaxSize;
/**
* 并发执行的sql的线程池
*/
@Bean(name = "concurrentExecuteSqlThreadPool")
public ThreadPoolExecutor concurrentExecuteSqlThreadPool(){
logger.info("加载国内UpdateSql执行线程池配置 poolCoreSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:{}", PoolDefaultConfig.poolCoreSize_default, poolCoreSize);
logger.info("加载国内UpdateSql执行线程池配置 poolMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:{}", PoolDefaultConfig.poolMaxSize_default, poolMaxSize);
logger.info("加载国内UpdateSql执行线程池配置 queueMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:{}", PoolDefaultConfig.queueMaxSize_default, taskQueueMaxSize);
final ConcurrentExecuteSqlThreadFactory threadFactory = new ConcurrentExecuteSqlThreadFactory();
final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(taskQueueMaxSize);
ThreadPoolExecutor pushTaskPool = new ThreadPoolExecutor(poolCoreSize, poolMaxSize,60, TimeUnit.SECONDS,
queue,threadFactory,new ThreadPoolExecutor.CallerRunsPolicy()
);
return pushTaskPool;
}
class ConcurrentExecuteSqlThreadFactory implements java.util.concurrent.ThreadFactory{
private final ThreadGroup group;
//编号
private final AtomicInteger threadNumber = new AtomicInteger(1);
//池子名称
private final String namePrefix = "ConcurrentExecuteSqlThreadPool-thread-";
public ConcurrentExecuteSqlThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
/**
* 默认配置
* ps:如果不配置 yml
*/
class PoolDefaultConfig {
private static final String poolCoreSize_default = "5";
private static final String poolMaxSize_default = "15";
private static final String queueMaxSize_default = "200";
}
}
业务代码演示
- 业务描述: 导入的文件里面有很多号码,要使用这些号码逐个去请求第三方接口检测结果再入库
@Autowired
@Qualifier("concurrentExecuteSqlThreadPool")
private ThreadPoolExecutor concurrentExecuteSqlThreadPool;
@Override
public Result checkTransferUpload(MultipartFile file) throws Exception {
// 检验该文件是否3天内重复提交
Boolean existsKey = redisOperator.existsKey("upload_" + file.getOriginalFilename()+file.getSize());
if (existsKey){
return Result.returnFail("请勿重复提交该文件");
}
redisOperator.set("upload_"+file.getOriginalFilename()+file.getSize(),"1");
redisOperator.expire("upload_"+file.getOriginalFilename()+file.getSize(),10);
// 校验文件
if (Objects.isNull(file)) {
return Result.returnFail("文件不能为空");
}
// 校验文件数据,超过1W行则报错
Result resultCheck = FileReadLimitUtils.readFileAndUpload(file);
if (Objects.equals(resultCheck.getStatus(), ResultCode.CODE_1)){
logger.error("【checkStateNumberUpload】"+resultCheck.getMessage());
return resultCheck;
}
List<String> data = (List<String>) resultCheck.getData();
if (data == null || data.size() == 0) {
logger.error("【checkStateNumberUpload】"+"内容不能为空");
return Result.returnFail("内容不能为空");
}
//合法号码集合
List<String> result = new ArrayList<>();
boolean b;
for (String s : data) {
b = p.matcher(s).matches();
//b为true则表示合法,为false则表示号码非法
if (b) {
if (StringUtils.isNotEmpty(s)) {
result.add(s);
}
}
}
if (result.size() == 0) {
logger.error("【checkStateNumberUpload】"+"没有有效号码");
return Result.returnFail("没有有效号码");
}
// 根据权重 获取唯一 携号转网接口配置API
List<CfgInterfaceConfig> cfgInterfaceConfigAll = findCfgInterfaceConfigAll();
if (CollectionUtils.isEmpty(cfgInterfaceConfigAll)) {
logger.error("【checkStateNumberUpload】"+"未发现携号转网API配置");
return Result.returnFail("未发现携号转网API配置");
}
// 获取相应的apikey和baseUrl
String apiKey = "";
String baseUrl = "";
for (CfgInterfaceConfig cfgInterfaceConfig : cfgInterfaceConfigAll) {
if (cfgInterfaceConfig.getCfgKey().equalsIgnoreCase("apikey")) {
apiKey = cfgInterfaceConfig.getCfgValue();
} else if (cfgInterfaceConfig.getCfgKey().equals("baseUrl")) {
baseUrl = cfgInterfaceConfig.getCfgValue();
}
}
if (StringUtils.isBlank(apiKey) || StringUtils.isBlank(baseUrl)) {
logger.error("【checkStateNumberUpload】"+"接口配置的apikey或baseUrl为空");
return Result.returnFail("接口配置的apikey或baseUrl为空");
}
// 去重
List<String> resultPhone = result.stream().distinct().collect(Collectors.toList());
// 根据量的情况 请求第三方接口使用
int taskTotal = (resultPhone.size() / 20) > 5 ? 5 : (resultPhone.size() / 20);
if (taskTotal == 0) {
//数据量小 在当前线程处理
checkPhoneTransfer(resultPhone, apiKey, baseUrl);
return Result.returnSuccessWithMsg("导入成功,检测完毕");
} else {
//数据量大 在线程池中处理
List<List<String>> tmp = Lists.partition(resultPhone, resultPhone.size() / taskTotal);
CompletableFuture[] allFuture = new CompletableFuture[tmp.size()];
for (int i = 0; i < tmp.size(); i++) {
List<String> tempIds = tmp.get(i);
if (tempIds.isEmpty()) {
continue;
}
String finalApiKey = apiKey;
String finalBaseUrl = baseUrl;
allFuture[i] = CompletableFuture.runAsync(() -> {
try {
checkPhoneTransfer(tempIds, finalApiKey, finalBaseUrl);
} catch (Exception e) {
logger.error("【checkTransferNumberUpload】异常:" + e.getMessage());
}
}, concurrentExecuteSqlThreadPool);
}
// try {
// //等待结果
// CompletableFuture.allOf(allFuture).join();
// } catch (Exception e) {
// logger.error("【checkTransferNumberUpload】异常", e);
// }
return Result.returnSuccessWithMsg("导入成功,正在检测中");
}
}
核心部分(重点)
- 校验过滤完数据后, 判断该数据量是否大
- 数据量小的话直接当前线程执行完业务返回给前端数据结果
检测完毕
- 数据量大的话则使用
CompletableFuture
进行业务异步执行,直接返回给前端数据结果正在检测中
边栏推荐
- PHP序列化:eval
- 技能大赛训练题:ftp 服务攻防与加固
- Reasons and solutions for Invalid bound statement (not found)
- The importance of strategic offensive capability is much higher than strategic defensive capability
- Ali on three sides: MQ message loss, repetition, backlog problem, how to solve?
- 百度网盘安装在c盘显示系统权限限制的解决方法
- 报错IDEA Terminated with exit code 1
- 365-day challenge LeetCode1000 questions - Day 044 Maximum element in the layer and level traversal
- 计算机复试面试问题(计算机面试常见问题)
- The cluster of safe mode
猜你喜欢
Even if the image is missing in a large area, it can also be repaired realistically. The new model CM-GAN takes into account the global structure and texture details
Istio微服务治理网格的全方面可视化监控(微服务架构展示、资源监控、流量监控、链路监控)
C#控件CheckBox的使用
ADS与C#通信
IDEA如何运行web程序
IDEA的database使用教程(使用mysql数据库)
go中select语句
STM32的CAN过滤器
Centos7 install mysql5.7 steps (graphical version)
拥塞控制,CDN,端到端
随机推荐
IDEA如何运行web程序
[CPU Design Practice] Simple Pipeline CPU Design
ICML2022 | 面向自监督图表示学习的全粒度自语义传播
Reasons and solutions for Invalid bound statement (not found)
Six Stones Programming: No matter which function you think is useless, people who can use it will not be able to leave, so at least 99%
查看Oracle数据库的用户名和密码
C# List Usage List Introduction
0X7FFFFFFF,0X80000000「建议收藏」
The cluster of safe mode
SAP e-commerce cloud Spartacus SSR Optimization Engine execution sequence of several timeouts
The importance of strategic offensive capability is much higher than strategic defensive capability
滑窗法切分数据
Centos7 install mysql5.7 steps (graphical version)
八大排序汇总及其稳定性
Spark Learning: Add Custom Optimization Rules for Spark Sql
Adding data nodes and decommissioning data nodes in the cluster
PartImageNet物体部件分割(Semantic Part Segmentation)数据集介绍
最新完整代码:使用word2vec预训练模型进行增量训练(两种保存方式对应的两种加载方式)适用gensim各种版本
技能大赛dhcp服务训练题
Edge Cloud Explained in Simple Depth | 4. Lifecycle Management