Asynchronous business processing with CompletableFuture
2022-07-31 14:08:00 【ZeKi_hao】
Preparation
pom.xml
- Guava is added for Lists.partition(list, size), which splits one list into consecutive sublists of at most size elements
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0-jre</version>
</dependency>
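Custom thread pool (for reference only)
- When running business logic through CompletableFuture, specify which thread pool to use; otherwise the async methods fall back to the shared ForkJoinPool.commonPool()
- Define the thread pool in a configuration class and let Spring manage it as a bean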
@Configuration
public class ConcurrentExecuteSqlThreadPoolConfig {
private final static Logger logger = LoggerFactory.getLogger(ConcurrentExecuteSqlThreadPoolConfig.class);
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:"+ PoolDefaultConfig.poolCoreSize_default +"}")
private int poolCoreSize;
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:"+ PoolDefaultConfig.poolMaxSize_default +"}")
private int poolMaxSize;
@Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:"+ PoolDefaultConfig.queueMaxSize_default +"}")
private int taskQueueMaxSize;
/**
* Thread pool for concurrently executed SQL
*/
@Bean(name = "concurrentExecuteSqlThreadPool")
public ThreadPoolExecutor concurrentExecuteSqlThreadPool(){
logger.info("Load domesticUpdateSqlExecute thread pool configuration poolCoreSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:{}", PoolDefaultConfig.poolCoreSize_default, poolCoreSize);
logger.info("Load domesticUpdateSqlExecute thread pool configuration poolMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:{}", PoolDefaultConfig.poolMaxSize_default, poolMaxSize);
logger.info("Load domesticUpdateSqlExecute thread pool configuration queueMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:{}", PoolDefaultConfig.queueMaxSize_default, taskQueueMaxSize);
final ConcurrentExecuteSqlThreadFactory threadFactory = new ConcurrentExecuteSqlThreadFactory();
final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(taskQueueMaxSize);
ThreadPoolExecutor pushTaskPool = new ThreadPoolExecutor(poolCoreSize, poolMaxSize,60, TimeUnit.SECONDS,
queue,threadFactory,new ThreadPoolExecutor.CallerRunsPolicy()
);
return pushTaskPool;
}
class ConcurrentExecuteSqlThreadFactory implements java.util.concurrent.ThreadFactory{
private final ThreadGroup group;
// Thread sequence number
private final AtomicInteger threadNumber = new AtomicInteger(1);
// Thread name prefix for this pool
private final String namePrefix = "ConcurrentExecuteSqlThreadPool-thread-";
public ConcurrentExecuteSqlThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
/**
* Default configuration
* Note: used when the values are not set in the yml
*/
class PoolDefaultConfig {
private static final String poolCoreSize_default = "5";
private static final String poolMaxSize_default = "15";
private static final String queueMaxSize_default = "200";
}
}
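The pool above combines a bounded queue with CallerRunsPolicy, so it helps to see the order in which a ThreadPoolExecutor uses its resources: core threads first, then the queue, then extra threads up to the maximum, and only then the rejection handler. The following is a minimal sketch using deliberately tiny sizes (1 core thread, 2 max threads, queue of 1) chosen for illustration; they are not the article's defaults, and the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {

    /** Submits four tasks and records whether each ran on a pool thread or on the caller. */
    static String[] run() {
        // Illustrative sizes only: 1 core thread, 2 max threads, queue capacity 1.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[4];

        for (int i = 0; i < 4; i++) {
            final int id = i;
            pool.execute(() -> {
                Thread current = Thread.currentThread();
                ranOn[id] = (current == caller) ? "caller" : "pool";
                if (current != caller) {
                    // Keep pool threads busy until every task has been submitted.
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        current.interrupt();
                    }
                }
            });
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

Task 0 takes the core thread, task 1 waits in the queue, task 2 gets the second thread, and task 3 is rejected and therefore executed by the submitting thread. With CallerRunsPolicy a saturated pool slows the caller down instead of dropping work, which in a web request means the HTTP thread itself may end up doing part of the batch.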
Business code demo
- Business description: the imported file contains many phone numbers; each number is checked against a third-party interface, and the results are written to the database one by one
@Autowired
@Qualifier("concurrentExecuteSqlThreadPool")
private ThreadPoolExecutor concurrentExecuteSqlThreadPool;
@Override
public Result checkTransferUpload(MultipartFile file) throws Exception {
// Validate the file before reading its name or size
if (Objects.isNull(file)) {
return Result.returnFail("File must not be empty");
}
// Reject repeated submissions of the same file (same name and size) while the Redis key is alive
// (the comment in the source says 3 days; the expiry passed below is 10)
String uploadKey = "upload_" + file.getOriginalFilename() + file.getSize();
Boolean existsKey = redisOperator.existsKey(uploadKey);
if (Boolean.TRUE.equals(existsKey)){
return Result.returnFail("Do not submit this document repeatedly");
}
redisOperator.set(uploadKey,"1");
redisOperator.expire(uploadKey,10);
// Validate the file contents; more than 10,000 rows is an error
Result resultCheck = FileReadLimitUtils.readFileAndUpload(file);
if (Objects.equals(resultCheck.getStatus(), ResultCode.CODE_1)){
logger.error("【checkStateNumberUpload】"+resultCheck.getMessage());
return resultCheck;
}
List<String> data = (List<String>) resultCheck.getData();
if (data == null || data.size() == 0) {
logger.error("【checkStateNumberUpload】"+"Content must not be empty");
return Result.returnFail("Content must not be empty");
}
// Collect the numbers that have a valid format
List<String> result = new ArrayList<>();
for (String s : data) {
// p is not shown in the article; presumably a precompiled java.util.regex.Pattern field.
// Skip empty entries first, then keep only the numbers that match.
if (StringUtils.isNotEmpty(s) && p.matcher(s).matches()) {
result.add(s);
}
}
if (result.size() == 0) {
logger.error("【checkStateNumberUpload】"+"No valid number");
return Result.returnFail("No valid number");
}
// Fetch the number-porting API configuration, selected by weight
List<CfgInterfaceConfig> cfgInterfaceConfigAll = findCfgInterfaceConfigAll();
if (CollectionUtils.isEmpty(cfgInterfaceConfigAll)) {
logger.error("【checkStateNumberUpload】"+"No number-porting API configuration found");
return Result.returnFail("No number-porting API configuration found");
}
// Read the apikey and baseUrl from the configuration
String apiKey = "";
String baseUrl = "";
for (CfgInterfaceConfig cfgInterfaceConfig : cfgInterfaceConfigAll) {
if (cfgInterfaceConfig.getCfgKey().equalsIgnoreCase("apikey")) {
apiKey = cfgInterfaceConfig.getCfgValue();
} else if (cfgInterfaceConfig.getCfgKey().equals("baseUrl")) {
baseUrl = cfgInterfaceConfig.getCfgValue();
}
}
if (StringUtils.isBlank(apiKey) || StringUtils.isBlank(baseUrl)) {
logger.error("【checkStateNumberUpload】"+"The apikey or baseUrl in the interface configuration is empty");
return Result.returnFail("The apikey or baseUrl in the interface configuration is empty");
}
// Deduplicate
List<String> resultPhone = result.stream().distinct().collect(Collectors.toList());
// Choose the number of tasks from the data volume: one per 20 numbers, at most 5
int taskTotal = Math.min(resultPhone.size() / 20, 5);
if (taskTotal == 0) {
// Small data set: process in the current thread
checkPhoneTransfer(resultPhone, apiKey, baseUrl);
return Result.returnSuccessWithMsg("Import succeeded, check complete");
} else {
// Large data set: process in the thread pool
List<List<String>> tmp = Lists.partition(resultPhone, resultPhone.size() / taskTotal);
CompletableFuture[] allFuture = new CompletableFuture[tmp.size()];
for (int i = 0; i < tmp.size(); i++) {
List<String> tempIds = tmp.get(i);
if (tempIds.isEmpty()) {
continue;
}
String finalApiKey = apiKey;
String finalBaseUrl = baseUrl;
allFuture[i] = CompletableFuture.runAsync(() -> {
try {
checkPhoneTransfer(tempIds, finalApiKey, finalBaseUrl);
} catch (Exception e) {
logger.error("【checkTransferNumberUpload】exception", e);
}
}, concurrentExecuteSqlThreadPool);
}
// try {
// // Wait for all tasks to finish
// CompletableFuture.allOf(allFuture).join();
// } catch (Exception e) {
// logger.error("【checkTransferNumberUpload】exception", e);
// }
return Result.returnSuccessWithMsg("Import succeeded, check in progress");
}
}
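The chunking arithmetic in the method above is easy to misread: taskTotal is capped at 5, but the chunk size is resultPhone.size() / taskTotal rounded down, so a remainder produces one extra, smaller chunk. The sketch below reproduces that arithmetic with a JDK-only helper that stands in for Guava's Lists.partition; the class and method names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ChunkPlan {

    /** Same rule as the method above: one task per 20 numbers, capped at 5. */
    static int taskTotal(int size) {
        return Math.min(size / 20, 5);
    }

    /** JDK-only stand-in for Lists.partition: consecutive sublists of at most chunkSize elements. */
    static <T> List<List<T>> partition(List<T> list, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < list.size(); from += chunkSize) {
            chunks.add(list.subList(from, Math.min(from + chunkSize, list.size())));
        }
        return chunks;
    }

    /** Returns the size of every chunk that would be produced for an input of the given size. */
    static List<Integer> chunkSizes(int size) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            numbers.add(i);
        }
        List<Integer> sizes = new ArrayList<>();
        int tasks = taskTotal(size);
        if (tasks == 0) {
            // Fewer than 20 numbers: everything is handled in the calling thread.
            sizes.add(size);
            return sizes;
        }
        for (List<Integer> chunk : partition(numbers, size / tasks)) {
            sizes.add(chunk.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(119)); // prints [23, 23, 23, 23, 23, 4]
    }
}
```

For 119 numbers the cap gives 5 tasks and a chunk size of 23, which leaves 4 numbers over and therefore 6 chunks. Sizing the futures array from tmp.size(), as the method above does, accounts for this extra chunk.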
Core part (key points)
- After validating and filtering the data, check how much data there is
- If the data set is small, the current thread completes the work directly and returns "check complete" to the front end
- If the data set is large, CompletableFuture runs the work asynchronously and the request returns "check in progress" to the front end right away
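The fan-out pattern described above can be sketched in isolation as follows. This is a simplified illustration rather than the article's exact code: checkChunk is a hypothetical stand-in for the third-party call, the futures are kept in a typed list instead of a raw array, and the sketch waits on allOf(...).join() so the outcome can be inspected, whereas the article leaves that wait commented out and returns immediately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class FanOutSketch {

    /** Hypothetical per-chunk work: fails if the chunk contains an empty number. */
    static int checkChunk(List<String> chunk) {
        if (chunk.contains("")) {
            throw new IllegalArgumentException("empty number in chunk");
        }
        return chunk.size();
    }

    /** Runs every chunk on the given executor; returns {numbers processed, chunks failed}. */
    static int[] processAll(List<List<String>> chunks, Executor executor) {
        AtomicInteger processed = new AtomicInteger();
        AtomicInteger failedChunks = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<String> chunk : chunks) {
            futures.add(CompletableFuture
                    .runAsync(() -> processed.addAndGet(checkChunk(chunk)), executor)
                    // A failing chunk is counted here and does not affect the other chunks.
                    .exceptionally(ex -> {
                        failedChunks.incrementAndGet();
                        return null;
                    }));
        }
        // Block until every chunk has finished (successfully or not).
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return new int[] {processed.get(), failedChunks.get()};
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            int[] summary = processAll(Arrays.asList(
                    Arrays.asList("1", "2"),
                    Arrays.asList("3", ""),
                    Arrays.asList("4")), pool);
            System.out.println("processed=" + summary[0] + ", failedChunks=" + summary[1]);
        } finally {
            pool.shutdown();
        }
    }
}
```

Handling the failure on each future (here with exceptionally, in the article with a try/catch inside the lambda) matters most in the fire-and-forget variant, because nobody calls join() and an uncaught exception would otherwise go unnoticed.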