当前位置:网站首页>Thread pool batch processing data
Thread pool batch processing data
2022-07-02 05:16:00 【@Calm down】
Configuration parameters :
book:
core:
poolsize: 100
max:
poolsize: 200
queue:
capacity: 200
keepAlive:
seconds: 30
thread:
name:
prefix: abc
Configuration class :
@Configuration
@EnableAsync
public class AsyncConfig {
// Number of core threads receiving message
@Value("${book.core.poolsize}")
private int bookCorePoolSize;
// Maximum number of threads receiving message
@Value("${book.max.poolsize}")
private int bookMaxPoolSize;
// Receiving message queue capacity
@Value("${book.queue.capacity}")
private int bookQueueCapacity;
// Receive message thread active time ( second )
@Value("${book.keepAlive.seconds}")
private int bookKeepAliveSeconds;
// Receive message default thread name
@Value("${book.thread.name.prefix}")
private String bookThreadNamePrefix;
/** * bookTaskExecutor:( Thread pool for the interface ). <br/> * * @return TaskExecutor taskExecutor Interface * @since JDK 1.8 */
@Bean(name = "BookTask")
public ThreadPoolTaskExecutor bookTaskExecutor() {
//newFixedThreadPool
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Set the number of core threads
executor.setCorePoolSize(bookCorePoolSize);
// Set the maximum number of threads
executor.setMaxPoolSize(bookMaxPoolSize);
// Set the queue capacity
executor.setQueueCapacity(bookQueueCapacity);
// Set thread active time ( second )
executor.setKeepAliveSeconds(bookKeepAliveSeconds);
// Set the default thread name
executor.setThreadNamePrefix(bookThreadNamePrefix);
// Set rejection policy
// rejection-policy: When pool Already achieved max size When , How to deal with new tasks
// CALLER_RUNS: Do not execute tasks in New Threads , Instead, it is executed by the thread of the caller
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Wait for all tasks to finish before closing the thread pool
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
Asynchronous processing method ( Method 1 ):
@Component
public class SyncBookHandler {
@Autowired
private UserDao userDao;
private static final Logger LOG = LoggerFactory.getLogger(SyncBookHandler.class);
@Async(value = "BookTask")
public Future <String> syncMargePsr(List <UserDomain> bookList, int pageIndex) {
System.out.println("thread name " + Thread.currentThread().getName());
LOG.info(String.format(" The number of segments of this batch of data is :%s The number of data pieces in this segment is :%s", pageIndex, bookList.size()));
// Statement future object
Future <String> result = new AsyncResult <String>("");
// Loop through the passenger collection of this section
if (null != bookList && bookList.size() > 0) {
for (UserDomain book : bookList) {
try {
// Data warehousing operation
userDao.insert(book);
} catch (Exception e) {
// Record the time when the exception occurred , Threads name
result = new AsyncResult <String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);
continue;
}
}
}
return result;
}
}
Method 2 :
/** * Push data thread class */
class PushDataTask implements Callable<Integer> {
List <UserDomain> list;
int threadNo; // The thread number
PushDataTask( List <UserDomain> list, int threadNo) {
this.list= list;
this.threadNo = threadNo;
}
@Override
public Integer call() throws Exception {
Future <String> result = new AsyncResult <String>("");
// Loop through the passenger collection of this section
if (null != bookList && bookList.size() > 0) {
for (UserDomain book : bookList) {
try {
// Data warehousing operation
userDao.insert(book);
} catch (Exception e) {
// Record the time when the exception occurred , Threads name
result = new AsyncResult <String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);
continue;
}
}
}
return result;
}
}
}
Main method :
public void receiveBookJobRun() {
// Test data :
List <UserDomain> bookList = null;
for (int i = 0; i < 400000; i++) {
UserDomain book = new UserDomain();
book.setUserName("zcl" + i);
bookList .add(book);
}
// Warehousing start time
Long inserOrUpdateBegin = System.currentTimeMillis();
log.info(" Data update start time :" + inserOrUpdateBegin);
// Receive the of each segment of the set The return result of execution
List <Future <String>> futureList = new ArrayList <Future <String>>();
// Total number of sets
if (bookList != null) {
int listSize = bookList.size();
int listStart, listEnd;
// When the total number of pieces is less than threadSum Stripe time Total number of strips used As thread segmentation value
if (threadSum > listSize) {
threadSum = listSize;
}
// take list Cut into several pieces Multithreaded execution
for (int i = 0; i < threadSum; i++) {
// Calculate cutting Start and end
listStart = listSize / threadSum * i;
listEnd = listSize / threadSum * (i + 1);
// The last thread will Different from other threads
if (i == threadSum - 1) {
listEnd = listSize;
}
// Data cut-off
List <UserDomain> sunList = bookList.subList(listStart, listEnd);
// Each data set is stored in parallel ( Method 1 )
futureList.add(syncBookHandler.syncMargePsr(sunList, i));
// Each data set is stored in parallel ( Method 2 , Thread pool )
futureList.add()bookTaskExecutor.executor.submit(new PushDataTask(sunList,i)))
}
// Analyze the results of each thread segment
for (Future <String> future : futureList) {
String str;
if (null != future) {
try {
str = future.get().toString();
log.info("current thread id =" + Thread.currentThread().getName() + ",result=" + str);
} catch (InterruptedException | ExecutionException e) {
log.info(" Thread running exception !");
}
} else {
log.info(" Thread running exception !");
}
}
}
Long inserOrUpdateEnd = System.currentTimeMillis();
log.info(" Data update end time :" + inserOrUpdateEnd + ". The time taken to update the data is :" + (inserOrUpdateEnd - inserOrUpdateBegin));
}
Reference resources :https://blog.csdn.net/weixin_43889571/article/details/106604183?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165597556616782389414029%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=165597556616782389414029&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-2-106604183-null-null.142^v21^control,157^v15^new_3&utm_term=%E5%A4%9A%E7%BA%BF%E7%A8%8B%E6%89%B9%E9%87%8F%E5%A4%84%E7%90%86%E6%95%B0%E6%8D%AE&spm=1018.2226.3001.4187
边栏推荐
- Knowledge arrangement about steam Education
- Gee: explore the characteristics of precipitation change in the Yellow River Basin in the past 10 years [pixel by pixel analysis]
- LeetCode 241. Design priorities for operational expressions (divide and conquer / mnemonic recursion / dynamic programming)
- Application of intelligent robot in agricultural ecology
- Mysql基础---查询(1天学会mysql基础)
- No logic is executed after the El form is validated successfully
- 线程池批量处理数据
- C# 图片显示占用问题
- 案例分享|智慧化的西部机场
- Implementation of go language for deleting duplicate items in sorting array
猜你喜欢
Solution: the agent throws an exception error
Collectors. Groupingby sort
黑马笔记---Set系列集合
Dark horse notes -- map set system
数据库问题汇总
6.网络-基础
Fabric.js 精简JSON
Johnson–Lindenstrauss Lemma(2)
[common error] the DDR type of FPGA device is selected incorrectly
Video multiple effects production, fade in effect and border background are added at the same time
随机推荐
Preparation for writing SAP ui5 applications using typescript
Collectors. Groupingby sort
Fabric.js 圆形笔刷
Using Kube bench and Kube hunter to evaluate the risk of kubernetes cluster
7.TCP的十一种状态集
Global and Chinese markets for marine selective catalytic reduction systems 2022-2028: Research Report on technology, participants, trends, market size and share
Fabric.js 居中元素
2022阿里巴巴全球数学竞赛 第4题 虎虎生威(盲盒问题、集卡问题)解决思路
paddle: ValueError:quality setting only supported for ‘jpeg‘ compression
js面试收藏试题1
Cubemx DMA notes
Mysql重点难题(2)汇总
js中的Map(含leetcode例题)
创新永不止步——nVisual网络可视化平台针对Excel导入的创新历程
Using QA band and bit mask in Google Earth engine
Analyzing the hands-on building tutorial in children's programming
视差特效的原理和实现方法
数学知识(欧拉函数)
Pyechart1.19 national air quality exhibition
Gee dataset: chirps pentad high resolution global grid rainfall dataset