当前位置:网站首页>Thread pool batch processing data
Thread pool batch processing data
2022-07-02 05:16:00 【@Calm down】
Configuration parameters :
book:
core:
poolsize: 100
max:
poolsize: 200
queue:
capacity: 200
keepAlive:
seconds: 30
thread:
name:
prefix: abc
Configuration class :
@Configuration
@EnableAsync
public class AsyncConfig {
// Number of core threads receiving message
@Value("${book.core.poolsize}")
private int bookCorePoolSize;
// Maximum number of threads receiving message
@Value("${book.max.poolsize}")
private int bookMaxPoolSize;
// Receiving message queue capacity
@Value("${book.queue.capacity}")
private int bookQueueCapacity;
// Receive message thread active time ( second )
@Value("${book.keepAlive.seconds}")
private int bookKeepAliveSeconds;
// Receive message default thread name
@Value("${book.thread.name.prefix}")
private String bookThreadNamePrefix;
/** * bookTaskExecutor:( Thread pool for the interface ). <br/> * * @return TaskExecutor taskExecutor Interface * @since JDK 1.8 */
@Bean(name = "BookTask")
public ThreadPoolTaskExecutor bookTaskExecutor() {
//newFixedThreadPool
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Set the number of core threads
executor.setCorePoolSize(bookCorePoolSize);
// Set the maximum number of threads
executor.setMaxPoolSize(bookMaxPoolSize);
// Set the queue capacity
executor.setQueueCapacity(bookQueueCapacity);
// Set thread active time ( second )
executor.setKeepAliveSeconds(bookKeepAliveSeconds);
// Set the default thread name
executor.setThreadNamePrefix(bookThreadNamePrefix);
// Set rejection policy
// rejection-policy: When pool Already achieved max size When , How to deal with new tasks
// CALLER_RUNS: Do not execute tasks in New Threads , Instead, it is executed by the thread of the caller
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Wait for all tasks to finish before closing the thread pool
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
Asynchronous processing method ( Method 1 ):
@Component
public class SyncBookHandler {
@Autowired
private UserDao userDao;
private static final Logger LOG = LoggerFactory.getLogger(SyncBookHandler.class);
@Async(value = "BookTask")
public Future <String> syncMargePsr(List <UserDomain> bookList, int pageIndex) {
System.out.println("thread name " + Thread.currentThread().getName());
LOG.info(String.format(" The number of segments of this batch of data is :%s The number of data pieces in this segment is :%s", pageIndex, bookList.size()));
// Statement future object
Future <String> result = new AsyncResult <String>("");
// Loop through the passenger collection of this section
if (null != bookList && bookList.size() > 0) {
for (UserDomain book : bookList) {
try {
// Data warehousing operation
userDao.insert(book);
} catch (Exception e) {
// Record the time when the exception occurred , Threads name
result = new AsyncResult <String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);
continue;
}
}
}
return result;
}
}
Method 2 :
/** * Push data thread class */
class PushDataTask implements Callable<Integer> {
List <UserDomain> list;
int threadNo; // The thread number
PushDataTask( List <UserDomain> list, int threadNo) {
this.list= list;
this.threadNo = threadNo;
}
@Override
public Integer call() throws Exception {
Future <String> result = new AsyncResult <String>("");
// Loop through the passenger collection of this section
if (null != bookList && bookList.size() > 0) {
for (UserDomain book : bookList) {
try {
// Data warehousing operation
userDao.insert(book);
} catch (Exception e) {
// Record the time when the exception occurred , Threads name
result = new AsyncResult <String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);
continue;
}
}
}
return result;
}
}
}
Main method :
public void receiveBookJobRun() {
// Test data :
List <UserDomain> bookList = null;
for (int i = 0; i < 400000; i++) {
UserDomain book = new UserDomain();
book.setUserName("zcl" + i);
bookList .add(book);
}
// Warehousing start time
Long inserOrUpdateBegin = System.currentTimeMillis();
log.info(" Data update start time :" + inserOrUpdateBegin);
// Receive the of each segment of the set The return result of execution
List <Future <String>> futureList = new ArrayList <Future <String>>();
// Total number of sets
if (bookList != null) {
int listSize = bookList.size();
int listStart, listEnd;
// When the total number of pieces is less than threadSum Stripe time Total number of strips used As thread segmentation value
if (threadSum > listSize) {
threadSum = listSize;
}
// take list Cut into several pieces Multithreaded execution
for (int i = 0; i < threadSum; i++) {
// Calculate cutting Start and end
listStart = listSize / threadSum * i;
listEnd = listSize / threadSum * (i + 1);
// The last thread will Different from other threads
if (i == threadSum - 1) {
listEnd = listSize;
}
// Data cut-off
List <UserDomain> sunList = bookList.subList(listStart, listEnd);
// Each data set is stored in parallel ( Method 1 )
futureList.add(syncBookHandler.syncMargePsr(sunList, i));
// Each data set is stored in parallel ( Method 2 , Thread pool )
futureList.add()bookTaskExecutor.executor.submit(new PushDataTask(sunList,i)))
}
// Analyze the results of each thread segment
for (Future <String> future : futureList) {
String str;
if (null != future) {
try {
str = future.get().toString();
log.info("current thread id =" + Thread.currentThread().getName() + ",result=" + str);
} catch (InterruptedException | ExecutionException e) {
log.info(" Thread running exception !");
}
} else {
log.info(" Thread running exception !");
}
}
}
Long inserOrUpdateEnd = System.currentTimeMillis();
log.info(" Data update end time :" + inserOrUpdateEnd + ". The time taken to update the data is :" + (inserOrUpdateEnd - inserOrUpdateBegin));
}
Reference resources :https://blog.csdn.net/weixin_43889571/article/details/106604183?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165597556616782389414029%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=165597556616782389414029&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-2-106604183-null-null.142^v21^control,157^v15^new_3&utm_term=%E5%A4%9A%E7%BA%BF%E7%A8%8B%E6%89%B9%E9%87%8F%E5%A4%84%E7%90%86%E6%95%B0%E6%8D%AE&spm=1018.2226.3001.4187
边栏推荐
- Briefly introduce chown command
- Global and Chinese market of impact roll 2022-2028: Research Report on technology, participants, trends, market size and share
- leetcode两数相加go实现
- Application of intelligent robot in agricultural ecology
- Fabric.js 右键菜单
- LeetCode 1175. 质数排列(质数判断+组合数学)
- Pyflink writes MySQL examples with JDBC
- 農業生態領域智能機器人的應用
- Gee series: Unit 2 explore datasets
- Splice characters in {{}}
猜你喜欢

Solution: the agent throws an exception error
![Gee: use of common mask functions in remote sensing image processing [updatemask]](/img/55/bf4ef5fc923242e72caab71f1a4e4b.jpg)
Gee: use of common mask functions in remote sensing image processing [updatemask]

Fabric.js 右键菜单

CubeMx DMA笔记

C case of communication between server and client based on mqttnet

Gee series: Unit 1 Introduction to Google Earth engine

Cubemx DMA notes

Fabric.js IText 上标和下标

Johnson–Lindenstrauss Lemma(2)

Fabric.js 将本地图像上传到画布背景
随机推荐
Cultivate primary and secondary school students' love for educational robots
7.1 simulation summary
leetcode两数相加go实现
Collectors.groupingBy 排序
[quick view opencv] familiar with CV matrix operation with image splicing examples (3)
C # picture display occupancy problem
Differential identities (help find mean, variance, and other moments)
C# 基于MQTTNet的服务端与客户端通信案例
Here comes the chicken soup! Keep this quick guide for data analysts
Feign realizes file uploading and downloading
Fabric.js IText设置指定文字的颜色和背景色
Black Horse Notes - - set Series Collection
Paddlepaddle project source code
Fabric.js 3个api设置画布宽高
Global and Chinese market of cell culture freezers 2022-2028: Research Report on technology, participants, trends, market size and share
Application d'un robot intelligent dans le domaine de l'agroécologie
Analyzing the hands-on building tutorial in children's programming
【pyinstaller】_get_sysconfigdata_name() missing 1 required positional argument: ‘check_exists‘
Gee series: Unit 2 explore datasets
Pyflink writes MySQL examples with JDBC