Asynchronous processing business using CompletableFuture
2022-07-31 14:08:00 【ZeKi_hao】
Preparation
pom.xml
- Guava is added in order to use Lists.partition(list, size), which splits one list into multiple sublists of at most `size` elements each
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0-jre</version>
</dependency>
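To make the splitting behaviour concrete without pulling in the dependency, here is a minimal plain-Java sketch of the same idea. The `PartitionSketch` class and its `partition` method are hypothetical stand-ins written for illustration, not Guava's implementation; Guava's `Lists.partition` returns views of the source list, whereas this sketch copies each chunk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PartitionSketch {

    // Hypothetical stand-in for Lists.partition: consecutive chunks of at most `size` elements.
    static <T> List<List<T>> partition(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < list.size(); start += size) {
            int end = Math.min(start + size, list.size());
            // Copy the range so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(list.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> numbers = Arrays.asList("n1", "n2", "n3", "n4", "n5");
        // Five elements in chunks of two: the last chunk holds the remainder.
        System.out.println(partition(numbers, 2)); // prints [[n1, n2], [n3, n4], [n5]]
    }
}
```

Note that the last chunk may be shorter than the others, which matters later when the chunk size is derived by integer division.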
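Custom thread pool (for reference only)
- When running business logic with CompletableFuture, specify the thread pool explicitly; otherwise runAsync falls back to ForkJoinPool.commonPool()
- Define the thread pool in a configuration class so that Spring manages it as a bean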
@Configuration
public class ConcurrentExecuteSqlThreadPoolConfig {
private final static Logger logger = LoggerFactory.getLogger(ConcurrentExecuteSqlThreadPoolConfig.class);
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:"+ PoolDefaultConfig.poolCoreSize_default +"}")
private int poolCoreSize;
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:"+ PoolDefaultConfig.poolMaxSize_default +"}")
private int poolMaxSize;
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:"+ PoolDefaultConfig.queueMaxSize_default +"}")
private int taskQueueMaxSize;
/**
* Thread pool for concurrently executed SQL
*/
@Bean(name = "concurrentExecuteSqlThreadPool")
public ThreadPoolExecutor concurrentExecuteSqlThreadPool(){
logger.info("Load domesticUpdateSqlExecute thread pool configuration poolCoreSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:{}", PoolDefaultConfig.poolCoreSize_default, poolCoreSize);
logger.info("Load domesticUpdateSqlExecute thread pool configuration poolMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:{}", PoolDefaultConfig.poolMaxSize_default, poolMaxSize);
logger.info("Load domesticUpdateSqlExecute thread pool configuration queueMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:{}", PoolDefaultConfig.queueMaxSize_default, taskQueueMaxSize);
final ConcurrentExecuteSqlThreadFactory threadFactory = new ConcurrentExecuteSqlThreadFactory();
final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(taskQueueMaxSize);
ThreadPoolExecutor pushTaskPool = new ThreadPoolExecutor(poolCoreSize, poolMaxSize,60, TimeUnit.SECONDS,
queue,threadFactory,new ThreadPoolExecutor.CallerRunsPolicy()
);
return pushTaskPool;
}
class ConcurrentExecuteSqlThreadFactory implements java.util.concurrent.ThreadFactory{
private final ThreadGroup group;
// Thread sequence number
private final AtomicInteger threadNumber = new AtomicInteger(1);
// Pool name prefix
private final String namePrefix = "ConcurrentExecuteSqlThreadPool-thread-";
public ConcurrentExecuteSqlThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
/**
* Default configuration
* Note: used when the values are not configured in the yml
*/
class PoolDefaultConfig {
private static final String poolCoreSize_default = "5";
private static final String poolMaxSize_default = "15";
private static final String queueMaxSize_default = "200";
}
}
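The configuration above uses CallerRunsPolicy as its rejection handler. As a minimal sketch of what that means in practice, the following deliberately tiny pool (one worker, queue capacity one; these sizes and the `CallerRunsSketch` name are assumptions chosen for the demonstration, not the article's values) shows that a task rejected by a saturated pool runs on the submitting thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsSketch {

    // Returns the name of the thread that ran the task rejected by the saturated pool.
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // occupies the single worker thread
            pool.execute(blocker); // fills the queue (capacity 1)
            // Worker busy and queue full: the policy runs this task on the caller.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
            return ranOn.get();
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfRejectedTask()); // prints main
    }
}
```

The practical consequence is back-pressure: when the pool and its queue are full, the request thread that submits work slows down by doing the work itself instead of receiving a RejectedExecutionException.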
Business code demo
- Business description: the imported file contains many phone numbers; each number is checked against a third-party interface, and the check results are saved to the database one by one
@Autowired
@Qualifier("concurrentExecuteSqlThreadPool")
private ThreadPoolExecutor concurrentExecuteSqlThreadPool;
@Override
public Result checkTransferUpload(MultipartFile file) throws Exception {
// Validate the file first; the calls below would throw a NullPointerException on a null file
if (Objects.isNull(file)) {
return Result.returnFail("File must not be empty");
}
// Reject a repeated submission of the same file (same name and size).
// The original note says "within 3 days"; the expiry below is set to 10, in whatever unit redisOperator.expire uses
String uploadKey = "upload_" + file.getOriginalFilename() + file.getSize();
Boolean existsKey = redisOperator.existsKey(uploadKey);
if (existsKey){
return Result.returnFail("Do not submit this document repeatedly");
}
redisOperator.set(uploadKey,"1");
redisOperator.expire(uploadKey,10);
// Validate the file contents; more than 10,000 entries is reported as an error
Result resultCheck = FileReadLimitUtils.readFileAndUpload(file);
if (Objects.equals(resultCheck.getStatus(), ResultCode.CODE_1)){
logger.error("【checkStateNumberUpload】"+resultCheck.getMessage());
return resultCheck;
}
List<String> data = (List<String>) resultCheck.getData();
if (data == null || data.size() == 0) {
logger.error("【checkStateNumberUpload】"+"Content must not be empty");
return Result.returnFail("Content must not be empty");
}
// Collection of valid numbers
List<String> result = new ArrayList<>();
boolean b;
for (String s : data) {
b = p.matcher(s).matches();
// b is true if the number is valid and false if it is invalid
if (b) {
if (StringUtils.isNotEmpty(s)) {
result.add(s);
}
}
}
if (result.size() == 0) {
logger.error("【checkStateNumberUpload】"+"No valid number");
return Result.returnFail("No valid number");
}
// Get the unique number porting interface API configuration by weight
List<CfgInterfaceConfig> cfgInterfaceConfigAll = findCfgInterfaceConfigAll();
if (CollectionUtils.isEmpty(cfgInterfaceConfigAll)) {
logger.error("【checkStateNumberUpload】"+"No number porting API configuration was found");
return Result.returnFail("No number porting API configuration was found");
}
// Get the corresponding apikey and baseUrl
String apiKey = "";
String baseUrl = "";
for (CfgInterfaceConfig cfgInterfaceConfig : cfgInterfaceConfigAll) {
if (cfgInterfaceConfig.getCfgKey().equalsIgnoreCase("apikey")) {
apiKey = cfgInterfaceConfig.getCfgValue();
} else if (cfgInterfaceConfig.getCfgKey().equals("baseUrl")) {
baseUrl = cfgInterfaceConfig.getCfgValue();
}
}
if (StringUtils.isBlank(apiKey) || StringUtils.isBlank(baseUrl)) {
logger.error("【checkStateNumberUpload】"+"The apikey or baseUrl in the interface configuration is empty");
return Result.returnFail("The apikey or baseUrl in the interface configuration is empty");
}
// Deduplicate
List<String> resultPhone = result.stream().distinct().collect(Collectors.toList());
// Choose how to call the third-party interface depending on the amount of data
int taskTotal = (resultPhone.size() / 20) > 5 ? 5 : (resultPhone.size() / 20);
if (taskTotal == 0) {
// Small data set: process in the current thread
checkPhoneTransfer(resultPhone, apiKey, baseUrl);
return Result.returnSuccessWithMsg("Import succeeded, check complete");
} else {
// Large data set: process in the thread pool
List<List<String>> tmp = Lists.partition(resultPhone, resultPhone.size() / taskTotal);
CompletableFuture[] allFuture = new CompletableFuture[tmp.size()];
for (int i = 0; i < tmp.size(); i++) {
List<String> tempIds = tmp.get(i);
if (tempIds.isEmpty()) {
continue;
}
String finalApiKey = apiKey;
String finalBaseUrl = baseUrl;
allFuture[i] = CompletableFuture.runAsync(() -> {
try {
checkPhoneTransfer(tempIds, finalApiKey, finalBaseUrl);
} catch (Exception e) {
// Pass the exception itself so the stack trace is logged, not just the message
logger.error("【checkTransferNumberUpload】exception", e);
}
}, concurrentExecuteSqlThreadPool);
}
// try {
// // Wait for all tasks to finish
// CompletableFuture.allOf(allFuture).join();
// } catch (Exception e) {
// logger.error("【checkTransferNumberUpload】exception", e);
// }
return Result.returnSuccessWithMsg("Import succeeded, check in progress");
}
}
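The split arithmetic in the method above is easy to misread, so here is a small sketch that reproduces it in isolation. The class and method names (`TaskSplitSketch`, `taskTotal`, `chunkCount`) are hypothetical; the formulas are taken from the code above, with the number of chunks computed as what a partition by `size / taskTotal` would yield.

```java
class TaskSplitSketch {

    // Same result as (size / 20) > 5 ? 5 : (size / 20) in the method above.
    static int taskTotal(int size) {
        return Math.min(size / 20, 5);
    }

    // Number of chunks produced when partitioning `size` elements by size / taskTotal.
    static int chunkCount(int size) {
        int tasks = taskTotal(size);
        if (tasks == 0) {
            return 0; // fewer than 20 numbers: handled in the calling thread, no chunks
        }
        int chunkSize = size / tasks;
        return (size + chunkSize - 1) / chunkSize; // ceiling division
    }

    public static void main(String[] args) {
        for (int size : new int[] {19, 45, 119, 1000}) {
            System.out.println(size + " -> taskTotal=" + taskTotal(size)
                    + ", chunks=" + chunkCount(size));
        }
        // prints:
        // 19 -> taskTotal=0, chunks=0
        // 45 -> taskTotal=2, chunks=3
        // 119 -> taskTotal=5, chunks=6
        // 1000 -> taskTotal=5, chunks=5
    }
}
```

Because the chunk size is rounded down, the number of submitted tasks can be one more than taskTotal (for example 6 tasks for 119 numbers). Sizing the future array from the partition result, as the method above does, keeps this safe.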
Core part (key points)
- After validating and filtering the data, determine whether the amount of data is large
- If the amount of data is small, the current thread completes the work directly and returns the "check complete" (检测完毕) result to the front end
- If the amount of data is large, use CompletableFuture to run the work asynchronously and return the "check in progress" (正在检测中) result to the front end immediately
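The asynchronous branch described above can be sketched in isolation as follows. This is a minimal illustration under stated assumptions: `checkBatch` is a hypothetical stand-in for the real per-batch work (it only counts elements), and a fixed pool from `Executors` replaces the Spring-managed pool. The sketch returns the combined future so the caller can choose between returning immediately and waiting with `join()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncBatchSketch {

    // Hypothetical stand-in for the per-batch business call: counts processed numbers.
    static void checkBatch(List<String> batch, AtomicInteger processed) {
        processed.addAndGet(batch.size());
    }

    // Submits one task per batch to the given pool and returns a future for all of them.
    static CompletableFuture<Void> submitAll(List<List<String>> batches,
                                             ExecutorService pool,
                                             AtomicInteger processed) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<String> batch : batches) {
            futures.add(CompletableFuture.runAsync(() -> checkBatch(batch, processed), pool));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            AtomicInteger processed = new AtomicInteger();
            List<List<String>> batches = Arrays.asList(
                    Arrays.asList("a", "b"), Arrays.asList("c", "d"), Arrays.asList("e"));
            CompletableFuture<Void> all = submitAll(batches, pool, processed);
            // A fire-and-forget caller would return here; join() waits for every batch.
            all.join();
            System.out.println(processed.get()); // prints 5
        } finally {
            pool.shutdown();
        }
    }
}
```

Skipping the `join()` gives the fast response described above, at the cost that the caller never learns whether a batch failed; in that mode, logging inside each task is the only record of errors.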