Asynchronous business processing with CompletableFuture
2022-07-31 14:08:00 【ZeKi_hao】
Preparation
pom.xml
- Guava is added so that Lists.partition(list, size) can be used
- It splits one list into multiple sublists, each of at most the given size
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0-jre</version>
</dependency>
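To make the behaviour of Lists.partition concrete, here is a minimal plain-Java sketch of the same idea built on List.subList. The names PartitionSketch and partition are made up for illustration, and this is not Guava's implementation (Guava returns views of the original list, while this sketch copies each chunk); in the article's code you would simply call Lists.partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that mimics the result of Lists.partition(list, size).
class PartitionSketch {

    // Splits list into consecutive chunks of at most `size` elements.
    static <T> List<List<T>> partition(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < list.size(); start += size) {
            int end = Math.min(start + size, list.size());
            // subList returns a view; copy it so each chunk is independent
            chunks.add(new ArrayList<>(list.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // Seven elements in chunks of three: the last chunk is shorter
        System.out.println(partition(numbers, 3)); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Note that the last chunk may be shorter than the others, which matters later when the chunk size is derived from the number of tasks.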
Custom thread pool (for reference only)
- When running business logic with CompletableFuture, specify which thread pool it should use
- Define the thread pool in a configuration class and let Spring manage it as a bean
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConcurrentExecuteSqlThreadPoolConfig {
    private final static Logger logger = LoggerFactory.getLogger(ConcurrentExecuteSqlThreadPoolConfig.class);

    @Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:" + PoolDefaultConfig.poolCoreSize_default + "}")
    private int poolCoreSize;
    @Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:" + PoolDefaultConfig.poolMaxSize_default + "}")
    private int poolMaxSize;
    @Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:" + PoolDefaultConfig.queueMaxSize_default + "}")
    private int taskQueueMaxSize;

    /**
     * Thread pool for concurrently executed SQL
     */
    @Bean(name = "concurrentExecuteSqlThreadPool")
    public ThreadPoolExecutor concurrentExecuteSqlThreadPool() {
        logger.info("Load domesticUpdateSqlExecute thread pool configuration poolCoreSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:{}", PoolDefaultConfig.poolCoreSize_default, poolCoreSize);
        logger.info("Load domesticUpdateSqlExecute thread pool configuration poolMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:{}", PoolDefaultConfig.poolMaxSize_default, poolMaxSize);
        logger.info("Load domesticUpdateSqlExecute thread pool configuration queueMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:{}", PoolDefaultConfig.queueMaxSize_default, taskQueueMaxSize);
        final ConcurrentExecuteSqlThreadFactory threadFactory = new ConcurrentExecuteSqlThreadFactory();
        // Bounded queue: once it is full and all threads are busy, CallerRunsPolicy makes the submitting thread run the task
        final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(taskQueueMaxSize);
        return new ThreadPoolExecutor(poolCoreSize, poolMaxSize, 60, TimeUnit.SECONDS,
                queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
    }

    static class ConcurrentExecuteSqlThreadFactory implements ThreadFactory {
        private final ThreadGroup group;
        // Sequence number of the next thread
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        // Thread name prefix, which identifies the pool in logs and thread dumps
        private final String namePrefix = "ConcurrentExecuteSqlThreadPool-thread-";

        public ConcurrentExecuteSqlThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    /**
     * Default configuration,
     * used when the values are not set in the yml file
     */
    static class PoolDefaultConfig {
        private static final String poolCoreSize_default = "5";
        private static final String poolMaxSize_default = "15";
        private static final String queueMaxSize_default = "200";
    }
}
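The pool above is built with a bounded queue and CallerRunsPolicy, which means that once every thread is busy and the queue is full, the submitting thread runs the task itself instead of the task being dropped. The sketch below shows that behaviour with a deliberately tiny pool (one thread, one queue slot). The class and method names are illustrative and not part of the article's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo of CallerRunsPolicy on a saturated pool.
class CallerRunsSketch {

    // Returns the name of the thread that ended up running the rejected task.
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker thread until the latch is released
            pool.execute(() -> awaitQuietly(release));
            // Task 2 fills the single queue slot
            pool.execute(() -> { });
            // Task 3 cannot be queued or given a new thread, so the caller runs it
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return ranOn.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println(threadOfRejectedTask().equals(caller)); // prints true
    }
}
```

For a web request this has a practical consequence: when the pool is saturated, the request thread itself may end up doing a chunk of the work, so the response is delayed rather than the work being lost.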
Business code demo
- Business description: the imported file contains many numbers; each number is checked against a third-party interface and the results are stored in the database one by one
@Autowired
@Qualifier("concurrentExecuteSqlThreadPool")
private ThreadPoolExecutor concurrentExecuteSqlThreadPool;

@Override
public Result checkTransferUpload(MultipartFile file) throws Exception {
    // Validate the file first, before any of its methods are called
    if (Objects.isNull(file)) {
        return Result.returnFail("File must not be empty");
    }
    // Reject a repeated submission of the same file, identified by name and size.
    // The intended window is 3 days; the expiry of 10 is in whatever unit redisOperator.expire expects
    String uploadKey = "upload_" + file.getOriginalFilename() + file.getSize();
    Boolean existsKey = redisOperator.existsKey(uploadKey);
    if (existsKey) {
        return Result.returnFail("Do not submit this document repeatedly");
    }
    redisOperator.set(uploadKey, "1");
    redisOperator.expire(uploadKey, 10);
    // Read and validate the file contents; more than 10,000 (1W) entries is an error
    Result resultCheck = FileReadLimitUtils.readFileAndUpload(file);
    if (Objects.equals(resultCheck.getStatus(), ResultCode.CODE_1)) {
        logger.error("【checkStateNumberUpload】" + resultCheck.getMessage());
        return resultCheck;
    }
    List<String> data = (List<String>) resultCheck.getData();
    if (data == null || data.isEmpty()) {
        logger.error("【checkStateNumberUpload】" + "Content must not be empty");
        return Result.returnFail("Content must not be empty");
    }
    // Collect the valid numbers (p is not shown in this excerpt; presumably a compiled Pattern field)
    List<String> result = new ArrayList<>();
    for (String s : data) {
        // Check for empty first so that p.matcher never receives null
        if (StringUtils.isNotEmpty(s) && p.matcher(s).matches()) {
            result.add(s);
        }
    }
    if (result.isEmpty()) {
        logger.error("【checkStateNumberUpload】" + "No valid number");
        return Result.returnFail("No valid number");
    }
    // Fetch the number porting API configuration (selected by weight)
    List<CfgInterfaceConfig> cfgInterfaceConfigAll = findCfgInterfaceConfigAll();
    if (CollectionUtils.isEmpty(cfgInterfaceConfigAll)) {
        logger.error("【checkStateNumberUpload】" + "No number porting API configuration was found");
        return Result.returnFail("No number porting API configuration was found");
    }
    // Extract the apikey and baseUrl from the configuration
    String apiKey = "";
    String baseUrl = "";
    for (CfgInterfaceConfig cfgInterfaceConfig : cfgInterfaceConfigAll) {
        if (cfgInterfaceConfig.getCfgKey().equalsIgnoreCase("apikey")) {
            apiKey = cfgInterfaceConfig.getCfgValue();
        } else if (cfgInterfaceConfig.getCfgKey().equals("baseUrl")) {
            baseUrl = cfgInterfaceConfig.getCfgValue();
        }
    }
    if (StringUtils.isBlank(apiKey) || StringUtils.isBlank(baseUrl)) {
        logger.error("【checkStateNumberUpload】" + "The configured apikey or baseUrl is empty");
        return Result.returnFail("The configured apikey or baseUrl is empty");
    }
    // Deduplicate
    List<String> resultPhone = result.stream().distinct().collect(Collectors.toList());
    // Derive the number of tasks from the data volume: one task per 20 numbers, capped at 5
    int taskTotal = Math.min(resultPhone.size() / 20, 5);
    if (taskTotal == 0) {
        // Small data set: process it on the current thread
        checkPhoneTransfer(resultPhone, apiKey, baseUrl);
        return Result.returnSuccessWithMsg("Import succeeded, detection complete");
    } else {
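        // Large data set: process it in the thread pool.
        // When the size is not evenly divisible, Lists.partition yields one more chunk than taskTotal
        List<List<String>> tmp = Lists.partition(resultPhone, resultPhone.size() / taskTotal);
        // Collect the futures in a list so that skipped chunks leave no null slots
        List<CompletableFuture<Void>> allFuture = new ArrayList<>(tmp.size());
        // Lambdas may only capture effectively final variables
        String finalApiKey = apiKey;
        String finalBaseUrl = baseUrl;
        for (List<String> tempIds : tmp) {
            if (tempIds.isEmpty()) {
                continue;
            }
            allFuture.add(CompletableFuture.runAsync(() -> {
                try {
                    checkPhoneTransfer(tempIds, finalApiKey, finalBaseUrl);
                } catch (Exception e) {
                    // Pass the exception itself so that the stack trace is logged
                    logger.error("【checkTransferNumberUpload】exception", e);
                }
            }, concurrentExecuteSqlThreadPool));
        }
        // Uncomment to block until every chunk has finished before responding
        // try {
        //     CompletableFuture.allOf(allFuture.toArray(new CompletableFuture[0])).join();
        // } catch (Exception e) {
        //     logger.error("【checkTransferNumberUpload】exception", e);
        // }
        return Result.returnSuccessWithMsg("Import succeeded, detection in progress");
    }
}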
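The task-count rule in the method above (one task per 20 numbers, capped at 5) and the chunk size derived from it can be checked in isolation. The sketch below reproduces only that arithmetic in plain Java; ChunkPlanSketch, taskTotal and chunkCount are names invented for this illustration. It assumes, as Guava's Lists.partition does, that the number of chunks is the list size divided by the chunk size, rounded up.

```java
// Hypothetical helper that reproduces the chunking arithmetic of the method above.
class ChunkPlanSketch {

    // Same rule as the article: one task per 20 numbers, at most 5 tasks
    static int taskTotal(int size) {
        return Math.min(size / 20, 5);
    }

    // Number of chunks produced by partitioning `size` items into chunks of size / taskTotal
    static int chunkCount(int size) {
        int tasks = taskTotal(size);
        if (tasks == 0) {
            return 1; // small input: handled as a single batch on the calling thread
        }
        int chunkSize = size / tasks;
        return (size + chunkSize - 1) / chunkSize; // ceiling division
    }

    public static void main(String[] args) {
        for (int size : new int[] {19, 45, 100, 101, 1000}) {
            System.out.println(size + " numbers -> taskTotal=" + taskTotal(size)
                    + ", chunks=" + chunkCount(size));
        }
    }
}
```

For 101 numbers, taskTotal is 5 and the chunk size is 20, so the list is split into 6 chunks, the last holding a single number. This is why the code sizes its collection of futures from the partition result rather than from taskTotal.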
Core part (key points)
- After validating and filtering the data, check whether the data volume is large
- If the data volume is small, the current thread completes the business logic directly and returns the result "detection complete" (检测完毕) to the front end
- If the data volume is large, CompletableFuture runs the business logic asynchronously and the request immediately returns "detection in progress" (正在检测中) to the front end
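The asynchronous branch can be sketched on its own as follows: one CompletableFuture.runAsync call per chunk on an explicit executor, followed by CompletableFuture.allOf(...).join() for the case where the caller does want to wait. This is a simplified illustration, not the article's service code: AsyncChunksSketch and processAll are hypothetical names, and a counter stands in for the call to checkPhoneTransfer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of fanning chunks out to a pool and waiting for all of them.
class AsyncChunksSketch {

    // Runs one async task per chunk on the given pool and returns the number of items processed.
    static int processAll(List<List<String>> chunks, ExecutorService pool) {
        AtomicInteger processed = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<String> chunk : chunks) {
            futures.add(CompletableFuture.runAsync(() -> {
                // Stand-in for checkPhoneTransfer(chunk, apiKey, baseUrl)
                processed.addAndGet(chunk.size());
            }, pool));
        }
        // allOf completes once every future has completed; join blocks until then
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return processed.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<List<String>> chunks = Arrays.asList(
                    Arrays.asList("a", "b"),
                    Arrays.asList("c"),
                    Arrays.asList("d", "e", "f"));
            System.out.println(processAll(chunks, pool)); // 6
        } finally {
            pool.shutdown();
        }
    }
}
```

In the article's method the allOf(...).join() step is commented out, so the request returns as soon as the tasks are submitted and the checks continue in the background. Keeping the join would instead make the request wait for every chunk.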