Thread pool batch processing data
2022-07-02 05:16:00 【@Calm down】
Configuration parameters:
book:
  core:
    poolsize: 100
  max:
    poolsize: 200
  queue:
    capacity: 200
  keepAlive:
    seconds: 30
  thread:
    name:
      prefix: abc
Configuration class:
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig {
    // Number of core threads
    @Value("${book.core.poolsize}")
    private int bookCorePoolSize;
    // Maximum number of threads
    @Value("${book.max.poolsize}")
    private int bookMaxPoolSize;
    // Task queue capacity
    @Value("${book.queue.capacity}")
    private int bookQueueCapacity;
    // How long an idle non-core thread is kept alive (seconds)
    @Value("${book.keepAlive.seconds}")
    private int bookKeepAliveSeconds;
    // Thread name prefix
    @Value("${book.thread.name.prefix}")
    private String bookThreadNamePrefix;

    /**
     * Thread pool used by the batch processing tasks.
     *
     * @return the configured ThreadPoolTaskExecutor
     * @since JDK 1.8
     */
    @Bean(name = "BookTask")
    public ThreadPoolTaskExecutor bookTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Set the number of core threads
        executor.setCorePoolSize(bookCorePoolSize);
        // Set the maximum number of threads
        executor.setMaxPoolSize(bookMaxPoolSize);
        // Set the queue capacity
        executor.setQueueCapacity(bookQueueCapacity);
        // Set the keep-alive time of idle threads (seconds)
        executor.setKeepAliveSeconds(bookKeepAliveSeconds);
        // Set the thread name prefix
        executor.setThreadNamePrefix(bookThreadNamePrefix);
        // Rejection policy: decides what happens to a new task when the queue is full
        // and the pool has already reached its maximum size.
        // CallerRunsPolicy: the task is not run on a pool thread; the submitting thread runs it itself
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // On shutdown, wait for all submitted tasks to finish before closing the pool
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}
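To make the rejection policy concrete, here is a minimal sketch that uses the plain JDK `ThreadPoolExecutor` rather than Spring's `ThreadPoolTaskExecutor`. The class name `CallerRunsDemo` and the tiny pool sizes are assumptions chosen for illustration only. It shows the order in which a pool absorbs work: core threads first, then the queue, then extra threads up to the maximum, and only after that the `CallerRunsPolicy` hands the task back to the submitting thread.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    /**
     * Submits `tasks` short sleeping tasks to a small pool and returns
     * the names of all threads that ended up running at least one of them.
     */
    public static Set<String> run(int core, int max, int queueCapacity, int tasks)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(200); // keep the worker busy so the pool fills up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return threadNames;
    }

    public static void main(String[] args) throws InterruptedException {
        // 1 core thread, 1 queue slot, 2 threads at most:
        // the 4th task finds everything full and runs on the calling thread
        System.out.println(run(1, 2, 1, 4));
    }
}
```

Applied to the values configured above (100 core threads, a queue of 200, 200 threads at most), the same rule means that submissions only fall back to the calling thread once 200 threads are busy and 200 tasks are waiting. The caller is then slowed down instead of tasks being dropped.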
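Asynchronous processing method (Method 1):
import java.util.List;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

@Component
public class SyncBookHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SyncBookHandler.class);

    @Autowired
    private UserDao userDao;

    // Runs on the "BookTask" pool; each call handles one segment of the full list
    @Async(value = "BookTask")
    public Future<String> syncMargePsr(List<UserDomain> bookList, int pageIndex) {
        // Result object; an empty string means that no insert failed
        Future<String> result = new AsyncResult<String>("");
        // Check for null before touching the list, otherwise the log call below would throw
        if (null == bookList || bookList.isEmpty()) {
            return result;
        }
        LOG.info("thread name {}", Thread.currentThread().getName());
        LOG.info("Segment number of this batch: {}, number of records in this segment: {}", pageIndex, bookList.size());
        // Loop through the records of this segment
        for (UserDomain book : bookList) {
            try {
                // Insert the record into the database
                userDao.insert(book);
            } catch (Exception e) {
                // Record the time of the failure and the thread name, then move on to the next record.
                // Only the most recent failure is kept in the result
                result = new AsyncResult<String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);
            }
        }
        return result;
    }
}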
Method 2:
/**
 * Task that inserts one segment of the data.
 * The original does not show where userDao comes from; this version assumes the class
 * is declared as an inner class of a bean that has a userDao field.
 */
class PushDataTask implements Callable<String> {
    List<UserDomain> list;
    int threadNo; // The segment (thread) number

    PushDataTask(List<UserDomain> list, int threadNo) {
        this.list = list;
        this.threadNo = threadNo;
    }

    @Override
    public String call() throws Exception {
        // An empty string means that no insert failed
        String result = "";
        // Loop through the records of this segment
        if (null != list && list.size() > 0) {
            for (UserDomain book : list) {
                try {
                    // Insert the record into the database
                    userDao.insert(book);
                } catch (Exception e) {
                    // Record the time of the failure and the thread name, then move on to the next record
                    result = "fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + threadNo;
                }
            }
        }
        return result;
    }
}
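The per-segment task can also be written without any dependency on the DAO, which makes it easy to try out in isolation. The following is only a sketch under assumptions: `SegmentTask` and its `sink` parameter are hypothetical names, and the `Consumer` stands in for a call such as `userDao.insert`. Unlike the versions above, it counts the failures instead of keeping only the last one.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

public class SegmentTask<T> implements Callable<String> {
    private final List<T> segment;
    private final int segmentNo;
    private final Consumer<T> sink; // hypothetical stand-in for a DAO insert call

    public SegmentTask(List<T> segment, int segmentNo, Consumer<T> sink) {
        this.segment = segment;
        this.segmentNo = segmentNo;
        this.sink = sink;
    }

    @Override
    public String call() {
        int failed = 0;
        for (T item : segment) {
            try {
                sink.accept(item);
            } catch (RuntimeException e) {
                // A failing item does not stop the segment; it is only counted
                failed++;
            }
        }
        return failed == 0
                ? "ok,segment=" + segmentNo
                : "fail,segment=" + segmentNo + ",failed=" + failed;
    }
}
```

Because the class implements `Callable<String>`, submitting it to an `ExecutorService` yields a `Future<String>`, which fits the `List<Future<String>>` used to collect the results of the segments.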
Main method:
public void receiveBookJobRun() {
    // Test data
    List<UserDomain> bookList = new ArrayList<UserDomain>();
    for (int i = 0; i < 400000; i++) {
        UserDomain book = new UserDomain();
        book.setUserName("zcl" + i);
        bookList.add(book);
    }
    // Insert start time
    long insertOrUpdateBegin = System.currentTimeMillis();
    log.info("Data update start time: " + insertOrUpdateBegin);
    // Collects the execution result of each segment
    List<Future<String>> futureList = new ArrayList<Future<String>>();
    // Total number of records
    int listSize = bookList.size();
    int listStart, listEnd;
    // threadSum (the desired number of segments) is a field that the original does not show.
    // When there are fewer records than threadSum, use the record count as the number of segments
    int segments = Math.min(threadSum, listSize);
    // Cut the list into segments and process them on multiple threads
    for (int i = 0; i < segments; i++) {
        // Calculate the start and end index of this segment
        listStart = listSize / segments * i;
        listEnd = listSize / segments * (i + 1);
        // The last segment also takes the remainder
        if (i == segments - 1) {
            listEnd = listSize;
        }
        // Cut out the segment
        List<UserDomain> segment = bookList.subList(listStart, listEnd);
        // Method 1: each segment is inserted in parallel through the @Async method
        futureList.add(syncBookHandler.syncMargePsr(segment, i));
        // Method 2: submit a Callable to the thread pool directly.
        // Use it instead of Method 1, not together with it, otherwise every record is inserted twice
        // futureList.add(bookTaskExecutor.submit(new PushDataTask(segment, i)));
    }
    // Wait for each segment and read its result
    for (Future<String> future : futureList) {
        if (null != future) {
            try {
                String str = future.get();
                log.info("current thread id =" + Thread.currentThread().getName() + ",result=" + str);
            } catch (InterruptedException | ExecutionException e) {
                log.error("Thread execution failed!", e);
            }
        } else {
            log.info("No result was returned for a segment!");
        }
    }
    long insertOrUpdateEnd = System.currentTimeMillis();
    log.info("Data update end time: " + insertOrUpdateEnd + ". Time taken to update the data (ms): " + (insertOrUpdateEnd - insertOrUpdateBegin));
}
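The index arithmetic used to cut the list can be checked on its own. Below is a small sketch of the same calculation; `ListPartitioner` and `partition` are hypothetical names introduced for illustration and are not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;

public class ListPartitioner {

    /**
     * Splits `source` into at most `segments` consecutive views.
     * Every segment gets size / n items; the last one also takes the remainder.
     */
    public static <T> List<List<T>> partition(List<T> source, int segments) {
        List<List<T>> result = new ArrayList<>();
        int size = source.size();
        // Never create more segments than there are items
        int n = Math.min(segments, size);
        for (int i = 0; i < n; i++) {
            int start = size / n * i;
            int end = (i == n - 1) ? size : size / n * (i + 1);
            // subList returns a view of the source list, not a copy
            result.add(source.subList(start, end));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            data.add(i);
        }
        // 10 items in 3 segments: sizes 3, 3 and 4
        for (List<Integer> part : partition(data, 3)) {
            System.out.println(part);
        }
    }
}
```

With this scheme the last segment can be larger than the others by up to `n - 1` items. Since `subList` only creates views, the source list should not be structurally modified while the segments are still being processed.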
Reference: https://blog.csdn.net/weixin_43889571/article/details/106604183