当前位置:网站首页>Thread pool batch processing data
Thread pool batch processing data
2022-07-02 05:16:00 【@Calm down】
Configuration parameters :
book:
core:
poolsize: 100
max:
poolsize: 200
queue:
capacity: 200
keepAlive:
seconds: 30
thread:
name:
prefix: abc
Configuration class :
@Configuration
@EnableAsync
public class AsyncConfig {
// Number of core threads receiving message
@Value("${book.core.poolsize}")
private int bookCorePoolSize;
// Maximum number of threads receiving message
@Value("${book.max.poolsize}")
private int bookMaxPoolSize;
// Receiving message queue capacity
@Value("${book.queue.capacity}")
private int bookQueueCapacity;
// Receive message thread active time ( second )
@Value("${book.keepAlive.seconds}")
private int bookKeepAliveSeconds;
// Receive message default thread name
@Value("${book.thread.name.prefix}")
private String bookThreadNamePrefix;
/** * bookTaskExecutor:( Thread pool for the interface ). <br/> * * @return TaskExecutor taskExecutor Interface * @since JDK 1.8 */
@Bean(name = "BookTask")
public ThreadPoolTaskExecutor bookTaskExecutor() {
//newFixedThreadPool
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Set the number of core threads
executor.setCorePoolSize(bookCorePoolSize);
// Set the maximum number of threads
executor.setMaxPoolSize(bookMaxPoolSize);
// Set the queue capacity
executor.setQueueCapacity(bookQueueCapacity);
// Set thread active time ( second )
executor.setKeepAliveSeconds(bookKeepAliveSeconds);
// Set the default thread name
executor.setThreadNamePrefix(bookThreadNamePrefix);
// Set rejection policy
// rejection-policy: When pool Already achieved max size When , How to deal with new tasks
// CALLER_RUNS: Do not execute tasks in New Threads , Instead, it is executed by the thread of the caller
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Wait for all tasks to finish before closing the thread pool
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
Asynchronous processing method ( Method 1 ):
@Component
public class SyncBookHandler {
@Autowired
private UserDao userDao;
private static final Logger LOG = LoggerFactory.getLogger(SyncBookHandler.class);
@Async(value = "BookTask")
public Future <String> syncMargePsr(List <UserDomain> bookList, int pageIndex) {
System.out.println("thread name " + Thread.currentThread().getName());
LOG.info(String.format(" The number of segments of this batch of data is :%s The number of data pieces in this segment is :%s", pageIndex, bookList.size()));
// Statement future object
Future <String> result = new AsyncResult <String>("");
// Loop through the passenger collection of this section
if (null != bookList && bookList.size() > 0) {
for (UserDomain book : bookList) {
try {
// Data warehousing operation
userDao.insert(book);
} catch (Exception e) {
// Record the time when the exception occurred , Threads name
result = new AsyncResult <String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);
continue;
}
}
}
return result;
}
}
Method 2 :
/** * Push data thread class */
class PushDataTask implements Callable<Integer> {
List <UserDomain> list;
int threadNo; // The thread number
PushDataTask( List <UserDomain> list, int threadNo) {
this.list= list;
this.threadNo = threadNo;
}
@Override
public Integer call() throws Exception {
Future <String> result = new AsyncResult <String>("");
// Loop through the passenger collection of this section
if (null != bookList && bookList.size() > 0) {
for (UserDomain book : bookList) {
try {
// Data warehousing operation
userDao.insert(book);
} catch (Exception e) {
// Record the time when the exception occurred , Threads name
result = new AsyncResult <String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);
continue;
}
}
}
return result;
}
}
}
Main method :
public void receiveBookJobRun() {
// Test data :
List <UserDomain> bookList = null;
for (int i = 0; i < 400000; i++) {
UserDomain book = new UserDomain();
book.setUserName("zcl" + i);
bookList .add(book);
}
// Warehousing start time
Long inserOrUpdateBegin = System.currentTimeMillis();
log.info(" Data update start time :" + inserOrUpdateBegin);
// Receive the of each segment of the set The return result of execution
List <Future <String>> futureList = new ArrayList <Future <String>>();
// Total number of sets
if (bookList != null) {
int listSize = bookList.size();
int listStart, listEnd;
// When the total number of pieces is less than threadSum Stripe time Total number of strips used As thread segmentation value
if (threadSum > listSize) {
threadSum = listSize;
}
// take list Cut into several pieces Multithreaded execution
for (int i = 0; i < threadSum; i++) {
// Calculate cutting Start and end
listStart = listSize / threadSum * i;
listEnd = listSize / threadSum * (i + 1);
// The last thread will Different from other threads
if (i == threadSum - 1) {
listEnd = listSize;
}
// Data cut-off
List <UserDomain> sunList = bookList.subList(listStart, listEnd);
// Each data set is stored in parallel ( Method 1 )
futureList.add(syncBookHandler.syncMargePsr(sunList, i));
// Each data set is stored in parallel ( Method 2 , Thread pool )
futureList.add()bookTaskExecutor.executor.submit(new PushDataTask(sunList,i)))
}
// Analyze the results of each thread segment
for (Future <String> future : futureList) {
String str;
if (null != future) {
try {
str = future.get().toString();
log.info("current thread id =" + Thread.currentThread().getName() + ",result=" + str);
} catch (InterruptedException | ExecutionException e) {
log.info(" Thread running exception !");
}
} else {
log.info(" Thread running exception !");
}
}
}
Long inserOrUpdateEnd = System.currentTimeMillis();
log.info(" Data update end time :" + inserOrUpdateEnd + ". The time taken to update the data is :" + (inserOrUpdateEnd - inserOrUpdateBegin));
}
Reference resources :https://blog.csdn.net/weixin_43889571/article/details/106604183?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165597556616782389414029%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=165597556616782389414029&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-2-106604183-null-null.142^v21^control,157^v15^new_3&utm_term=%E5%A4%9A%E7%BA%BF%E7%A8%8B%E6%89%B9%E9%87%8F%E5%A4%84%E7%90%86%E6%95%B0%E6%8D%AE&spm=1018.2226.3001.4187
边栏推荐
- National all Chinese Automatic Test Software apifox
- Solution: the agent throws an exception error
- Set the default style of scroll bar Google browser
- Gee series: unit 6 building various remote sensing indexes in Google Earth engine
- Global and Chinese market of impact roll 2022-2028: Research Report on technology, participants, trends, market size and share
- Global and Chinese market of cell culture freezers 2022-2028: Research Report on technology, participants, trends, market size and share
- Using QA band and bit mask in Google Earth engine
- Go GC garbage collection notes (three color mark)
- Find the subscript with and as the target from the array
- Summary of database problems
猜你喜欢

Gee: create a new feature and set corresponding attributes

Gee data set: export the distribution and installed capacity of hydropower stations in the country to CSV table

Gee: remote sensing image composite and mosaic

The underlying principle of go map (storage and capacity expansion)

数学问题(数论)试除法做质数的判断、分解质因数,筛质数

Fabric.js IText设置指定文字的颜色和背景色

Gee dataset: chirps pentad high resolution global grid rainfall dataset

Fabric.js 居中元素

Paddlepaddle project source code

Cubemx DMA notes
随机推荐
在{{}}中拼接字符
Gee: use of common mask functions in remote sensing image processing [updatemask]
线程池批量处理数据
Super detailed pycharm tutorial
6.30 year end summary, end of student age
Pycharm breakpoint management: temporarily cancel some breakpoints + run directly to a line
el form 表单validate成功后没有执行逻辑
Preparation for writing SAP ui5 applications using typescript
ubuntu20.04安装mysql8
Gee: analyze the change of spatial centroid of remote sensing image [centroid acquisition analysis]
Gee series: Unit 2 explore datasets
No logic is executed after the El form is validated successfully
Global and Chinese market of impact roll 2022-2028: Research Report on technology, participants, trends, market size and share
LeetCode 1175. 质数排列(质数判断+组合数学)
Knowledge arrangement about steam Education
The El cascader echo only selects the questions that are not displayed
Map in JS (including leetcode examples)
fastText文本分类
Basic differences between Oracle and MySQL (entry level)
Pyechart1.19 national air quality exhibition