Asynchronous business processing with CompletableFuture
2022-07-31 14:08:00 【ZeKi_hao】
Preparation
pom.xml
- Guava is added so that Lists.partition(list, size) can be used
- It splits one list into multiple sublists, each of at most the given length
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0-jre</version>
</dependency>
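To make the behavior of Lists.partition concrete without pulling in Guava, here is a minimal JDK-only sketch of the same splitting rule. The class name PartitionSketch and its partition helper are hypothetical stand-ins, not Guava code; one known difference is that Guava returns views of the original list, while this sketch copies each chunk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PartitionSketch {

    // Hypothetical JDK-only stand-in for Guava's Lists.partition(list, size):
    // consecutive sublists of `size` elements, the last one possibly shorter.
    static <T> List<List<T>> partition(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < list.size(); start += size) {
            int end = Math.min(start + size, list.size());
            // Copy the subList view so each chunk is independent of the source list
            chunks.add(new ArrayList<>(list.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(partition(numbers, 3)); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Note that the second argument is the length of each sublist, not the number of sublists, which matters for how the business code below computes it.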
Custom thread pool (for reference only)
- When CompletableFuture is used for business work, specify explicitly which thread pool it runs on
- Define the thread pool in a configuration class and let Spring manage it as a bean
@Configuration
public class ConcurrentExecuteSqlThreadPoolConfig {
private final static Logger logger = LoggerFactory.getLogger(ConcurrentExecuteSqlThreadPoolConfig.class);
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:"+ PoolDefaultConfig.poolCoreSize_default +"}")
private int poolCoreSize;
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:"+ PoolDefaultConfig.poolMaxSize_default +"}")
private int poolMaxSize;
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:"+ PoolDefaultConfig.queueMaxSize_default +"}")
private int taskQueueMaxSize;
/**
* Thread pool for concurrently executing SQL
*/
@Bean(name = "concurrentExecuteSqlThreadPool")
public ThreadPoolExecutor concurrentExecuteSqlThreadPool(){
logger.info("Load domesticUpdateSqlExecute thread pool configuration poolCoreSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:{}", PoolDefaultConfig.poolCoreSize_default, poolCoreSize);
logger.info("Load domesticUpdateSqlExecute thread pool configuration poolMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:{}", PoolDefaultConfig.poolMaxSize_default, poolMaxSize);
logger.info("Load domesticUpdateSqlExecute thread pool configuration queueMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:{}", PoolDefaultConfig.queueMaxSize_default, taskQueueMaxSize);
final ConcurrentExecuteSqlThreadFactory threadFactory = new ConcurrentExecuteSqlThreadFactory();
final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(taskQueueMaxSize);
ThreadPoolExecutor pushTaskPool = new ThreadPoolExecutor(poolCoreSize, poolMaxSize,60, TimeUnit.SECONDS,
queue,threadFactory,new ThreadPoolExecutor.CallerRunsPolicy()
);
return pushTaskPool;
}
class ConcurrentExecuteSqlThreadFactory implements java.util.concurrent.ThreadFactory{
private final ThreadGroup group;
// Sequence number of the next thread
private final AtomicInteger threadNumber = new AtomicInteger(1);
// Name prefix for threads in this pool
private final String namePrefix = "ConcurrentExecuteSqlThreadPool-thread-";
public ConcurrentExecuteSqlThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
/**
* Default configuration
* Note: used when the values are not set in the yml file
*/
class PoolDefaultConfig {
private static final String poolCoreSize_default = "5";
private static final String poolMaxSize_default = "15";
private static final String queueMaxSize_default = "200";
}
}
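The pool above combines a bounded queue with CallerRunsPolicy, which means a task submitted while all threads are busy and the queue is full runs on the submitting thread instead of being dropped. The following sketch shows that effect with a deliberately tiny pool (1 thread, queue capacity 1); the class and method names are illustrative assumptions and the sizes are not the article's defaults.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsSketch {

    // Returns the name of the thread that ran the task submitted after the pool was saturated.
    static String threadOfOverflowTask() {
        // Deliberately tiny pool: 1 worker thread and room for 1 queued task
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker until it is released
            pool.execute(() -> awaitQuietly(release));
            // Task 2 fills the queue
            pool.execute(() -> { });
            // Task 3 is rejected, so CallerRunsPolicy runs it on the submitting thread
            pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
        } finally {
            release.countDown(); // let task 1 finish so the pool can drain
            pool.shutdown();
        }
        return overflowThread.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("submitting thread: " + Thread.currentThread().getName());
        System.out.println("overflow task ran on: " + threadOfOverflowTask());
    }
}
```

For a request handler this policy acts as back-pressure: when the pool is saturated, the request thread itself does the work and therefore responds more slowly, rather than failing with a rejection.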
Business code demo
- Business description: the imported file contains many numbers; each number is sent to a third-party interface for checking, and the results are stored in the database one by one
@Autowired
@Qualifier("concurrentExecuteSqlThreadPool")
private ThreadPoolExecutor concurrentExecuteSqlThreadPool;
@Override
public Result checkTransferUpload(MultipartFile file) throws Exception {
// Validate the file first, before any of its properties are read
if (Objects.isNull(file)) {
return Result.returnFail("File must not be empty");
}
// Reject a file that was already submitted recently (the original comment says within 3 days, while the expiry below is set to 10)
String uploadKey = "upload_" + file.getOriginalFilename() + file.getSize();
Boolean existsKey = redisOperator.existsKey(uploadKey);
if (existsKey){
return Result.returnFail("Do not submit this document repeatedly");
}
redisOperator.set(uploadKey, "1");
redisOperator.expire(uploadKey, 10);
// Validate the file data; more than 10,000 (1W) entries is an error
Result resultCheck = FileReadLimitUtils.readFileAndUpload(file);
if (Objects.equals(resultCheck.getStatus(), ResultCode.CODE_1)){
logger.error("【checkStateNumberUpload】"+resultCheck.getMessage());
return resultCheck;
}
List<String> data = (List<String>) resultCheck.getData();
if (data == null || data.size() == 0) {
logger.error("【checkStateNumberUpload】"+"Content must not be empty");
return Result.returnFail("Content must not be empty");
}
// Collect the valid numbers; p is presumably a compiled regex Pattern defined elsewhere in the class (not shown in the article)
List<String> result = new ArrayList<>();
for (String s : data) {
// matches() returns true for a valid number and false for an invalid one
if (StringUtils.isNotEmpty(s) && p.matcher(s).matches()) {
result.add(s);
}
}
if (result.size() == 0) {
logger.error("【checkStateNumberUpload】"+"No valid number");
return Result.returnFail("No valid number");
}
// Get the single number-porting API configuration, selected by weight
List<CfgInterfaceConfig> cfgInterfaceConfigAll = findCfgInterfaceConfigAll();
if (CollectionUtils.isEmpty(cfgInterfaceConfigAll)) {
logger.error("【checkStateNumberUpload】"+"No number-porting API configuration found");
return Result.returnFail("No number-porting API configuration found");
}
// Extract the apikey and baseUrl from the configuration entries
String apiKey = "";
String baseUrl = "";
for (CfgInterfaceConfig cfgInterfaceConfig : cfgInterfaceConfigAll) {
if (cfgInterfaceConfig.getCfgKey().equalsIgnoreCase("apikey")) {
apiKey = cfgInterfaceConfig.getCfgValue();
} else if (cfgInterfaceConfig.getCfgKey().equals("baseUrl")) {
baseUrl = cfgInterfaceConfig.getCfgValue();
}
}
if (StringUtils.isBlank(apiKey) || StringUtils.isBlank(baseUrl)) {
logger.error("【checkStateNumberUpload】"+"apikey or baseUrl is empty in the interface configuration");
return Result.returnFail("apikey or baseUrl is empty in the interface configuration");
}
// Deduplicate
List<String> resultPhone = result.stream().distinct().collect(Collectors.toList());
// Choose the number of tasks for calling the third-party interface: one per 20 numbers, at most 5
int taskTotal = Math.min(resultPhone.size() / 20, 5);
if (taskTotal == 0) {
// Small data set: process in the current thread
checkPhoneTransfer(resultPhone, apiKey, baseUrl);
return Result.returnSuccessWithMsg("Import succeeded, detection complete");
} else {
// Large data set: process in the thread pool
List<List<String>> tmp = Lists.partition(resultPhone, resultPhone.size() / taskTotal);
CompletableFuture[] allFuture = new CompletableFuture[tmp.size()];
for (int i = 0; i < tmp.size(); i++) {
List<String> tempIds = tmp.get(i);
if (tempIds.isEmpty()) {
continue;
}
String finalApiKey = apiKey;
String finalBaseUrl = baseUrl;
allFuture[i] = CompletableFuture.runAsync(() -> {
try {
checkPhoneTransfer(tempIds, finalApiKey, finalBaseUrl);
} catch (Exception e) {
// Pass the exception itself so the stack trace is logged, not just the message
logger.error("【checkTransferNumberUpload】exception", e);
}
}, concurrentExecuteSqlThreadPool);
}
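// Left commented out so the request returns without waiting for the tasks:
// try {
// // Wait for all tasks to finish
// CompletableFuture.allOf(allFuture).join();
// } catch (Exception e) {
// logger.error("【checkTransferNumberUpload】exception", e);
// }
return Result.returnSuccessWithMsg("Import succeeded, detection in progress");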
}
}
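The task-count rule in the method above can be checked with a small calculation. The sketch below reproduces the arithmetic only (the class and method names are hypothetical) and shows that, because the sublist length is size / taskTotal rounded down, the number of sublists can exceed taskTotal by one.

```java
class TaskSplitSketch {

    // Same rule as the method above: one task per 20 numbers, capped at 5
    static int taskTotal(int size) {
        return Math.min(size / 20, 5);
    }

    // Number of sublists that partitioning with length size / taskTotal produces
    static int chunkCount(int size) {
        int tasks = taskTotal(size);
        if (tasks == 0) {
            return 0; // small input: handled synchronously, no partitioning
        }
        int chunkSize = size / tasks;              // integer division rounds down
        return (size + chunkSize - 1) / chunkSize; // ceiling division
    }

    public static void main(String[] args) {
        for (int size : new int[] {19, 20, 101, 1000}) {
            System.out.println(size + " numbers -> taskTotal=" + taskTotal(size)
                    + ", chunks=" + chunkCount(size));
        }
    }
}
```

With 101 numbers, for example, taskTotal is 5 and the sublist length is 20, so six sublists (and six asynchronous tasks) are created. Sizing the futures array from the partition result, as the method does, keeps this case correct.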
Core part (key points)
- After validating and filtering the data, check whether the data volume is large
- If the data volume is small, the current thread completes the work directly and returns the result "detection complete" (检测完毕) to the front end
- If the data volume is large, CompletableFuture runs the work asynchronously and the result "detection in progress" (正在检测中) is returned to the front end immediately
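The asynchronous branch can be reduced to the following self-contained sketch, assuming a plain fixed-size pool and a placeholder in place of the real checkPhoneTransfer call; AsyncBatchSketch and processAll are hypothetical names. Unlike the business code, this version waits with allOf(...).join() so that the result can be observed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncBatchSketch {

    // Processes each chunk on the given pool and returns how many items were handled.
    static int processAll(List<List<String>> chunks, ExecutorService pool) {
        AtomicInteger processed = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<String> chunk : chunks) {
            futures.add(CompletableFuture.runAsync(() -> {
                // Placeholder for the real work (checkPhoneTransfer in the article)
                processed.addAndGet(chunk.size());
            }, pool));
        }
        // Block until every chunk is done; the business code skips this and returns at once
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return processed.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<List<String>> chunks = Arrays.asList(
                    Arrays.asList("a", "b"), Arrays.asList("c", "d"), Arrays.asList("e"));
            System.out.println("processed " + processAll(chunks, pool) + " items");
        } finally {
            pool.shutdown();
        }
    }
}
```

Dropping the join() call gives the fire-and-forget behavior described above: the caller returns immediately, so failures inside the tasks can only be seen through logging or some other side channel.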