Asynchronous business processing with CompletableFuture
2022-07-31 14:08:00 【ZeKi_hao】
Preparation
pom.xml
- Guava is added in order to use Lists.partition(list, size), which splits one list into multiple consecutive sublists of at most size elements each
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>27.0-jre</version>
</dependency>
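To make the splitting behavior concrete, here is a minimal plain-JDK sketch that mimics what Lists.partition does. The class name PartitionSketch and its partition method are hypothetical stand-ins written for illustration, not Guava's implementation; one known difference is that Guava returns sublist views of the original list, while this sketch copies each chunk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Guava's Lists.partition, using only the JDK.
class PartitionSketch {

    // Splits the list into consecutive chunks of at most `size` elements.
    // The last chunk may be shorter than `size`.
    static <T> List<List<T>> partition(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < list.size(); from += size) {
            int to = Math.min(from + size, list.size());
            // Copy the range; Guava would return a view instead.
            chunks.add(new ArrayList<>(list.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(partition(numbers, 2)); // prints [[1, 2], [3, 4], [5]]
    }
}
```

Note that the second argument is the size of each chunk, not the number of chunks.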
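Custom thread pool (for reference only)
- When using CompletableFuture for business work, specify which thread pool it should run on; without an explicit executor, runAsync falls back to ForkJoinPool.commonPool()
- Define the thread pool in a configuration class so that it is managed as a Spring bean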
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConcurrentExecuteSqlThreadPoolConfig {

    private final static Logger logger = LoggerFactory.getLogger(ConcurrentExecuteSqlThreadPoolConfig.class);

    @Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:" + PoolDefaultConfig.poolCoreSize_default + "}")
    private int poolCoreSize;

    @Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:" + PoolDefaultConfig.poolMaxSize_default + "}")
    private int poolMaxSize;

    @Value("${ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:" + PoolDefaultConfig.queueMaxSize_default + "}")
    private int taskQueueMaxSize;

    /**
     * Thread pool for SQL that is executed concurrently.
     */
    @Bean(name = "concurrentExecuteSqlThreadPool")
    public ThreadPoolExecutor concurrentExecuteSqlThreadPool() {
        logger.info("Load domesticUpdateSqlExecute thread pool configuration poolCoreSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_CORE_SIZE:{}", PoolDefaultConfig.poolCoreSize_default, poolCoreSize);
        logger.info("Load domesticUpdateSqlExecute thread pool configuration poolMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_MAX_SIZE:{}", PoolDefaultConfig.poolMaxSize_default, poolMaxSize);
        logger.info("Load domesticUpdateSqlExecute thread pool configuration queueMaxSize_default:{} --> ConcurrentExecuteSqlThreadPoolConfig.POOL_QUEUE_MAX_SIZE:{}", PoolDefaultConfig.queueMaxSize_default, taskQueueMaxSize);
        final ConcurrentExecuteSqlThreadFactory threadFactory = new ConcurrentExecuteSqlThreadFactory();
        // Bounded queue: once it is full and poolMaxSize threads are busy, the rejection policy applies
        final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(taskQueueMaxSize);
        // CallerRunsPolicy: a rejected task runs on the submitting thread instead of being dropped
        return new ThreadPoolExecutor(poolCoreSize, poolMaxSize, 60, TimeUnit.SECONDS,
                queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
    }

    static class ConcurrentExecuteSqlThreadFactory implements ThreadFactory {
        private final ThreadGroup group;
        // Sequence number of the next thread
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        // Name prefix for threads in this pool
        private final String namePrefix = "ConcurrentExecuteSqlThreadPool-thread-";

        public ConcurrentExecuteSqlThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

    /**
     * Default configuration.
     * Note: used when the values are not configured in the yml file.
     */
    static class PoolDefaultConfig {
        private static final String poolCoreSize_default = "5";
        private static final String poolMaxSize_default = "15";
        private static final String queueMaxSize_default = "200";
    }
}
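The configuration above combines a bounded queue with CallerRunsPolicy, so it may help to see what happens when the pool is saturated. The following is a minimal sketch under simplifying assumptions: the class name CallerRunsSketch and the tiny pool (one thread, queue capacity one) are hypothetical and chosen only to force a rejection quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of CallerRunsPolicy with a deliberately tiny pool.
class CallerRunsSketch {

    // Returns the name of the thread that ran the task submitted while the pool was full.
    static String threadOfOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the single worker thread
            pool.execute(blocker); // fills the single queue slot
            // Worker busy and queue full: the task is rejected and runs on the caller
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadOfOverflowTask());
    }
}
```

One consequence for the upload endpoint: if the pool and its queue are both full, a rejected batch runs on the request thread, so the response is delayed rather than the work being lost.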
Business code demo
- Business description: the imported file contains many phone numbers; each number is sent to a third-party interface for checking, and the results are stored in the database
@Autowired
@Qualifier("concurrentExecuteSqlThreadPool")
private ThreadPoolExecutor concurrentExecuteSqlThreadPool;

@Override
public Result checkTransferUpload(MultipartFile file) throws Exception {
    // Validate the file first; the original checked for null only after dereferencing it
    if (Objects.isNull(file)) {
        return Result.returnFail("The file must not be empty");
    }
    // Reject a repeat submission of the same file while the Redis key is alive.
    // The original note mentions 3 days; the TTL below is 10, in whatever unit redisOperator.expire uses.
    String uploadKey = "upload_" + file.getOriginalFilename() + file.getSize();
    Boolean existsKey = redisOperator.existsKey(uploadKey);
    if (Boolean.TRUE.equals(existsKey)) {
        return Result.returnFail("Do not submit this document repeatedly");
    }
    redisOperator.set(uploadKey, "1");
    redisOperator.expire(uploadKey, 10);
    // Validate the file contents; more than 10,000 rows is an error
    Result resultCheck = FileReadLimitUtils.readFileAndUpload(file);
    if (Objects.equals(resultCheck.getStatus(), ResultCode.CODE_1)) {
        logger.error("【checkStateNumberUpload】" + resultCheck.getMessage());
        return resultCheck;
    }
    @SuppressWarnings("unchecked")
    List<String> data = (List<String>) resultCheck.getData();
    if (data == null || data.size() == 0) {
        logger.error("【checkStateNumberUpload】" + "The content must not be empty");
        return Result.returnFail("The content must not be empty");
    }
    // Collect the valid numbers.
    // p is a java.util.regex.Pattern field defined elsewhere in the class (not shown in the original).
    List<String> result = new ArrayList<>();
    boolean b;
    for (String s : data) {
        b = p.matcher(s).matches();
        // b is true if the number is valid, false if it is not
        if (b) {
            if (StringUtils.isNotEmpty(s)) {
                result.add(s);
            }
        }
    }
    if (result.size() == 0) {
        logger.error("【checkStateNumberUpload】" + "No valid number");
        return Result.returnFail("No valid number");
    }
    // Fetch the single number-porting API configuration, selected by weight
    List<CfgInterfaceConfig> cfgInterfaceConfigAll = findCfgInterfaceConfigAll();
    if (CollectionUtils.isEmpty(cfgInterfaceConfigAll)) {
        logger.error("【checkStateNumberUpload】" + "No number porting API configuration found");
        return Result.returnFail("No number porting API configuration found");
    }
    // Read the corresponding apikey and baseUrl
    String apiKey = "";
    String baseUrl = "";
    for (CfgInterfaceConfig cfgInterfaceConfig : cfgInterfaceConfigAll) {
        if (cfgInterfaceConfig.getCfgKey().equalsIgnoreCase("apikey")) {
            apiKey = cfgInterfaceConfig.getCfgValue();
        } else if (cfgInterfaceConfig.getCfgKey().equals("baseUrl")) {
            baseUrl = cfgInterfaceConfig.getCfgValue();
        }
    }
    if (StringUtils.isBlank(apiKey) || StringUtils.isBlank(baseUrl)) {
        logger.error("【checkStateNumberUpload】" + "The apikey or baseUrl in the interface configuration is empty");
        return Result.returnFail("The apikey or baseUrl in the interface configuration is empty");
    }
    // Deduplicate
    List<String> resultPhone = result.stream().distinct().collect(Collectors.toList());
    // Choose how to call the third-party interface based on the amount of data:
    // one task per 20 numbers, capped at 5 tasks
    int taskTotal = Math.min(resultPhone.size() / 20, 5);
    if (taskTotal == 0) {
        // Small data set: process it on the current thread
        checkPhoneTransfer(resultPhone, apiKey, baseUrl);
        return Result.returnSuccessWithMsg("Import succeeded, check complete");
    } else {
        // Large data set: process it in the thread pool
        List<List<String>> tmp = Lists.partition(resultPhone, resultPhone.size() / taskTotal);
        // A list avoids null slots, which CompletableFuture.allOf would reject
        List<CompletableFuture<Void>> allFuture = new ArrayList<>(tmp.size());
        // Effectively final copies for use inside the lambda
        String finalApiKey = apiKey;
        String finalBaseUrl = baseUrl;
        for (List<String> tempIds : tmp) {
            if (tempIds.isEmpty()) {
                continue;
            }
            allFuture.add(CompletableFuture.runAsync(() -> {
                try {
                    checkPhoneTransfer(tempIds, finalApiKey, finalBaseUrl);
                } catch (Exception e) {
                    // Pass the exception itself so the stack trace is logged
                    logger.error("【checkTransferNumberUpload】exception", e);
                }
            }, concurrentExecuteSqlThreadPool));
        }
        // try {
        //     // Wait for the results
        //     CompletableFuture.allOf(allFuture.toArray(new CompletableFuture[0])).join();
        // } catch (Exception e) {
        //     logger.error("【checkTransferNumberUpload】exception", e);
        // }
        return Result.returnSuccessWithMsg("Import succeeded, check in progress");
    }
}
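The chunking arithmetic in the code above is easy to misread, so here is a small worked sketch of it. The class ChunkPlanSketch and its method names are hypothetical; the formulas are the ones used in the method (one task per 20 numbers, capped at 5, then a chunk size of size / taskTotal). Because of integer division, the number of sublists that a size-based partition produces can be one more than taskTotal.

```java
// Hypothetical helper that reproduces the task and chunk arithmetic from the upload method.
class ChunkPlanSketch {

    // One task per 20 numbers, capped at 5; 0 means "process inline".
    static int taskTotal(int numberCount) {
        return Math.min(numberCount / 20, 5);
    }

    // Number of sublists produced when partitioning by size numberCount / taskTotal.
    static int chunkCount(int numberCount) {
        int tasks = taskTotal(numberCount);
        if (tasks == 0) {
            return 1; // small input: a single batch on the current thread
        }
        int chunkSize = numberCount / tasks;                // integer division rounds down
        return (numberCount + chunkSize - 1) / chunkSize;   // ceiling of numberCount / chunkSize
    }

    public static void main(String[] args) {
        System.out.println(chunkCount(19));  // prints 1 (inline)
        System.out.println(chunkCount(45));  // prints 3 (taskTotal is 2, chunk size 22)
        System.out.println(chunkCount(100)); // prints 5 (taskTotal is 5, chunk size 20)
        System.out.println(chunkCount(104)); // prints 6 (taskTotal is 5, chunk size 20)
    }
}
```

If exactly taskTotal batches are wanted, one option would be to round the chunk size up instead of down; the original code does not do this.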
Core part (key points)
- After validating and filtering the data, determine whether the amount of data is large
- If the amount of data is small, the current thread completes the work directly and returns a "check complete" result to the front end
- If the amount of data is large, use CompletableFuture to run the work asynchronously and immediately return a "check in progress" result to the front end
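The asynchronous branch described in the points above can be sketched in isolation as follows. This is a simplified illustration, not the article's service code: AsyncBatchSketch, submitChunks and processAll are hypothetical names, the worker simply collects the items it receives, and a small fixed pool stands in for the configured bean.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical, self-contained version of the "one runAsync per chunk" pattern.
class AsyncBatchSketch {

    // Submits one task per chunk on the given executor and returns a future
    // that completes when all of them have finished.
    static CompletableFuture<Void> submitChunks(List<List<String>> chunks,
                                                Consumer<List<String>> worker,
                                                Executor executor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<String> chunk : chunks) {
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    worker.accept(chunk);
                } catch (RuntimeException e) {
                    // Handle failures per chunk so one bad batch does not fail the rest
                    System.err.println("chunk failed: " + e);
                }
            }, executor));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    // Processes all chunks and waits for them; returns how many items were handled.
    static int processAll(List<List<String>> chunks) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        ConcurrentLinkedQueue<String> checked = new ConcurrentLinkedQueue<>();
        try {
            // join() blocks until every chunk is done; omit it for fire-and-forget
            submitChunks(chunks, checked::addAll, pool).join();
        } finally {
            pool.shutdown();
        }
        return checked.size();
    }

    public static void main(String[] args) {
        List<List<String>> chunks = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(processAll(chunks)); // prints 3
    }
}
```

Calling join() on the combined future makes the caller wait for every batch; leaving it out, as the article's method does, lets the request return right away while the pool keeps working.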