当前位置:网站首页>线程池批量处理数据
线程池批量处理数据
2022-07-02 05:11:00 【@淡 定】
配置参数:
book:
core:
poolsize: 100
max:
poolsize: 200
queue:
capacity: 200
keepAlive:
seconds: 30
thread:
name:
prefix: abc
配置类:
@Configuration
@EnableAsync
public class AsyncConfig {
//接收报文核心线程数
@Value("${book.core.poolsize}")
private int bookCorePoolSize;
//接收报文最大线程数
@Value("${book.max.poolsize}")
private int bookMaxPoolSize;
//接收报文队列容量
@Value("${book.queue.capacity}")
private int bookQueueCapacity;
//接收报文线程活跃时间(秒)
@Value("${book.keepAlive.seconds}")
private int bookKeepAliveSeconds;
//接收报文默认线程名称
@Value("${book.thread.name.prefix}")
private String bookThreadNamePrefix;
/** * bookTaskExecutor:(接口的线程池). <br/> * * @return TaskExecutor taskExecutor接口 * @since JDK 1.8 */
@Bean(name = "BookTask")
public ThreadPoolTaskExecutor bookTaskExecutor() {
//newFixedThreadPool
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(bookCorePoolSize);
// 设置最大线程数
executor.setMaxPoolSize(bookMaxPoolSize);
// 设置队列容量
executor.setQueueCapacity(bookQueueCapacity);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(bookKeepAliveSeconds);
// 设置默认线程名称
executor.setThreadNamePrefix(bookThreadNamePrefix);
// 设置拒绝策略
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
异步处理方法(方法一):
@Component
public class SyncBookHandler {
@Autowired
private UserDao userDao;
private static final Logger LOG = LoggerFactory.getLogger(SyncBookHandler.class);
@Async(value = "BookTask")
public Future <String> syncMargePsr(List <UserDomain> bookList, int pageIndex) {
System.out.println("thread name " + Thread.currentThread().getName());
LOG.info(String.format("此批数据的段数为:%s 此段数据的数据条数为:%s", pageIndex, bookList.size()));
//声明future对象
Future <String> result = new AsyncResult <String>("");
//循环遍历该段旅客集合
if (null != bookList && bookList.size() > 0) {
for (UserDomain book : bookList) {
try {
//数据入库操作
userDao.insert(book);
} catch (Exception e) {
//记录出现异常的时间,线程name
result = new AsyncResult <String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);
continue;
}
}
}
return result;
}
}
方法二:
/** * 推送数据线程类 */
class PushDataTask implements Callable<Integer> {
List <UserDomain> list;
int threadNo; //线程编号
PushDataTask( List <UserDomain> list, int threadNo) {
this.list= list;
this.threadNo = threadNo;
}
@Override
public Integer call() throws Exception {
Future <String> result = new AsyncResult <String>("");
//循环遍历该段旅客集合
if (null != bookList && bookList.size() > 0) {
for (UserDomain book : bookList) {
try {
//数据入库操作
userDao.insert(book);
} catch (Exception e) {
//记录出现异常的时间,线程name
result = new AsyncResult <String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);
continue;
}
}
}
return result;
}
}
}
主方法:
public void receiveBookJobRun() {
//测试数据:
List <UserDomain> bookList = null;
for (int i = 0; i < 400000; i++) {
UserDomain book = new UserDomain();
book.setUserName("zcl" + i);
bookList .add(book);
}
//入库开始时间
Long inserOrUpdateBegin = System.currentTimeMillis();
log.info("数据更新开始时间:" + inserOrUpdateBegin);
//接收集合各段的 执行的返回结果
List <Future <String>> futureList = new ArrayList <Future <String>>();
//集合总条数
if (bookList != null) {
int listSize = bookList.size();
int listStart, listEnd;
//当总条数不足threadSum条时 用总条数 当做线程切分值
if (threadSum > listSize) {
threadSum = listSize;
}
//将list 切分多份 多线程执行
for (int i = 0; i < threadSum; i++) {
//计算切割 开始和结束
listStart = listSize / threadSum * i;
listEnd = listSize / threadSum * (i + 1);
//最后一段线程会 出现与其他线程不等的情况
if (i == threadSum - 1) {
listEnd = listSize;
}
//数据切断
List <UserDomain> sunList = bookList.subList(listStart, listEnd);
//每段数据集合并行入库(方法一)
futureList.add(syncBookHandler.syncMargePsr(sunList, i));
//每段数据集合并行入库(方法二 ,线程池)
futureList.add()bookTaskExecutor.executor.submit(new PushDataTask(sunList,i)))
}
//对各个线程段结果进行解析
for (Future <String> future : futureList) {
String str;
if (null != future) {
try {
str = future.get().toString();
log.info("current thread id =" + Thread.currentThread().getName() + ",result=" + str);
} catch (InterruptedException | ExecutionException e) {
log.info("线程运行异常!");
}
} else {
log.info("线程运行异常!");
}
}
}
Long inserOrUpdateEnd = System.currentTimeMillis();
log.info("数据更新结束时间:" + inserOrUpdateEnd + "。此次更新数据花费时间为:" + (inserOrUpdateEnd - inserOrUpdateBegin));
}
参考:https://blog.csdn.net/weixin_43889571/article/details/106604183?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165597556616782389414029%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=165597556616782389414029&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-2-106604183-null-null.142^v21^control,157^v15^new_3&utm_term=%E5%A4%9A%E7%BA%BF%E7%A8%8B%E6%89%B9%E9%87%8F%E5%A4%84%E7%90%86%E6%95%B0%E6%8D%AE&spm=1018.2226.3001.4187
边栏推荐
- Video cover image setting, put cover images into multiple videos in the simplest way
- [Yu Yue education] autumn 2021 reference materials of Tongji University
- [common error] the DDR type of FPGA device is selected incorrectly
- LeetCode 241. 为运算表达式设计优先级(分治/记忆化递归/动态规划)
- Typescript function details
- No logic is executed after the El form is validated successfully
- Global and Chinese market of insulin pens 2022-2028: Research Report on technology, participants, trends, market size and share
- Collectors. Groupingby sort
- Fasttext text text classification
- Simple and practical accounting software, so that accounts can be checked
猜你喜欢
LeetCode 1175. Prime number arrangement (prime number judgment + Combinatorial Mathematics)
奠定少儿编程成为基础学科的原理
Preparation for writing SAP ui5 applications using typescript
Pytest learning ----- pytest Interface Association framework encapsulation of interface automation testing
Rhcsa --- work on the fourth day
Mathematical knowledge (Euler function)
C# 基于MQTTNet的服务端与客户端通信案例
List of common bugs in software testing
Here comes the chicken soup! Keep this quick guide for data analysts
el form 表单validate成功后没有执行逻辑
随机推荐
Pyechats 1.19 generate a web version of Baidu map
The reason why sizeof (ARR) / sizeof (arr[0]) is used in the function to calculate the length of the array is incorrect
Mapping settings in elk (8) es
国产全中文-自动化测试软件Apifox
06 装饰(Decorator)模式
Line by line explanation of yolox source code of anchor free series network (7) -- obj in head_ loss、Cls_ Loss and reg_ Calculation and reverse transmission of loss I
Mouse events in JS
Express logistics quick query method, set the unsigned doc No. to refresh and query automatically
C # picture display occupancy problem
Fabric.js IText 上标和下标
Domestic all Chinese automatic test software apifox
Leetcode basic programming: array
How do I interview for a successful software testing position? If you want to get a high salary, you must see the offer
Feign realizes file uploading and downloading
No logic is executed after the El form is validated successfully
画波形图_数字IC
leetcode两数相加go实现
Dark horse notes -- Set Series Collection
Acelems Expressway microgrid energy efficiency management platform and intelligent lighting solution intelligent lighting tunnel
Simple and practical accounting software, so that accounts can be checked