Thread pool batch processing data
2022-07-02 05:16:00 【@Calm down】
Configuration parameters:
book:
  core:
    poolsize: 100
  max:
    poolsize: 200
  queue:
    capacity: 200
  keepAlive:
    seconds: 30
  thread:
    name:
      prefix: abc
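How these three sizes interact is easy to get wrong: a pool only grows past its core size once the queue is full. The sketch below uses the plain JDK `ThreadPoolExecutor` with deliberately small numbers (2 core, 4 max, queue of 2) rather than the values above; the class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original project.
final class PoolGrowthDemo {

    /** Submits `tasks` blocking jobs and returns {poolSize, queuedTasks}. */
    static int[] observe(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // Every job blocks until released, so no thread becomes free.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let the jobs finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Jobs 1-2 start core threads, jobs 3-4 wait in the queue,
        // jobs 5-6 find the queue full and start extra threads.
        int[] state = observe(2, 4, 2, 6);
        System.out.println("threads=" + state[0] + ", queued=" + state[1]);
    }
}
```

With the values above, this means the pool stays at 100 threads until 200 tasks are waiting, and only then grows toward 200.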
Configuration class:
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig {
    // Number of core threads
    @Value("${book.core.poolsize}")
    private int bookCorePoolSize;
    // Maximum number of threads
    @Value("${book.max.poolsize}")
    private int bookMaxPoolSize;
    // Task queue capacity
    @Value("${book.queue.capacity}")
    private int bookQueueCapacity;
    // How long an idle thread is kept alive (seconds)
    @Value("${book.keepAlive.seconds}")
    private int bookKeepAliveSeconds;
    // Prefix for the thread names
    @Value("${book.thread.name.prefix}")
    private String bookThreadNamePrefix;

    /**
     * bookTaskExecutor: thread pool used for batch processing.
     *
     * @return the configured task executor
     * @since JDK 1.8
     */
    @Bean(name = "BookTask")
    public ThreadPoolTaskExecutor bookTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Set the number of core threads
        executor.setCorePoolSize(bookCorePoolSize);
        // Set the maximum number of threads
        executor.setMaxPoolSize(bookMaxPoolSize);
        // Set the queue capacity
        executor.setQueueCapacity(bookQueueCapacity);
        // Set the idle thread keep-alive time (seconds)
        executor.setKeepAliveSeconds(bookKeepAliveSeconds);
        // Set the thread name prefix
        executor.setThreadNamePrefix(bookThreadNamePrefix);
        // Rejection policy: how to handle a new task when the pool has reached
        // its max size and the queue is full.
        // CallerRunsPolicy: do not run the task on a pool thread; run it on the
        // thread of the caller instead.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Wait for all tasks to finish before closing the thread pool
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}
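The rejection policy chosen above can be observed in isolation. The following is a minimal sketch using the plain JDK `ThreadPoolExecutor` with one thread and one queue slot, so the third task is rejected; `CallerRunsDemo` and its `run` method are hypothetical names, and the sizes are chosen only to force a rejection.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original project.
final class CallerRunsDemo {

    /** Runs three tasks on a saturated pool; returns task number -> name of the thread that ran it. */
    static Map<Integer, String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Map<Integer, String> ranOn = new ConcurrentHashMap<>();
        CountDownLatch release = new CountDownLatch(1);

        // Task 0 occupies the only pool thread until released.
        pool.execute(() -> {
            ranOn.put(0, Thread.currentThread().getName());
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Task 1 fills the single queue slot.
        pool.execute(() -> ranOn.put(1, Thread.currentThread().getName()));
        // Task 2 is rejected, so CallerRunsPolicy runs it here, on the submitting thread.
        pool.execute(() -> ranOn.put(2, Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println("caller=" + Thread.currentThread().getName() + ", ranOn=" + run());
    }
}
```

The practical consequence is back-pressure: while the caller is busy running a rejected task, it cannot submit more work, which slows the producer down instead of dropping tasks.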
Asynchronous processing method (Method 1):
import java.util.List;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

// UserDao and UserDomain are the project's own classes.
@Component
public class SyncBookHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SyncBookHandler.class);

    @Autowired
    private UserDao userDao;

    @Async(value = "BookTask")
    public Future<String> syncMargePsr(List<UserDomain> bookList, int pageIndex) {
        // Read the size null-safely, since the list is only null-checked below
        int size = (bookList == null) ? 0 : bookList.size();
        LOG.info("thread name {}: segment number {}, records in this segment {}",
                Thread.currentThread().getName(), pageIndex, size);
        // Declare the future object; an empty string means no failure
        Future<String> result = new AsyncResult<String>("");
        // Loop through the records of this segment
        if (bookList != null && !bookList.isEmpty()) {
            for (UserDomain book : bookList) {
                try {
                    // Insert the record into the database
                    userDao.insert(book);
                } catch (Exception e) {
                    // Record the time of the exception and the thread name, then move on to the next record
                    result = new AsyncResult<String>("fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + pageIndex);
                }
            }
        }
        return result;
    }
}
Method 2:
/**
 * Task class that pushes one segment of data.
 * The original snippet referenced undefined names (bookList, pageIndex) and declared
 * Callable<Integer> while returning a Future<String>; it now uses its own fields and
 * returns a String. It is assumed here to be an inner class of the class that contains
 * the main method, and that class is assumed to hold an injected userDao field.
 */
class PushDataTask implements Callable<String> {
    private final List<UserDomain> list;
    private final int threadNo; // The thread (segment) number

    PushDataTask(List<UserDomain> list, int threadNo) {
        this.list = list;
        this.threadNo = threadNo;
    }

    @Override
    public String call() {
        // An empty string means no failure
        String result = "";
        // Loop through the records of this segment
        if (list != null && !list.isEmpty()) {
            for (UserDomain book : list) {
                try {
                    // Insert the record into the database
                    userDao.insert(book);
                } catch (Exception e) {
                    // Record the time of the exception and the thread name, then move on to the next record
                    result = "fail,time=" + System.currentTimeMillis() + ",thread id=" + Thread.currentThread().getName() + ",pageIndex=" + threadNo;
                }
            }
        }
        return result;
    }
}
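The Callable approach can be tried without Spring or a database. Below is a self-contained sketch under stated assumptions: a `Consumer<T>` stands in for `userDao.insert`, a JDK fixed thread pool stands in for the `BookTask` executor, and the task returns a failure count instead of a message. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical demo class, not part of the original project.
final class SegmentTaskDemo {

    /** Processes one segment and reports how many records failed. */
    static final class SegmentTask<T> implements Callable<Integer> {
        private final List<T> segment;
        private final Consumer<T> sink; // stand-in for userDao.insert

        SegmentTask(List<T> segment, Consumer<T> sink) {
            this.segment = segment;
            this.sink = sink;
        }

        @Override
        public Integer call() {
            int failures = 0;
            for (T item : segment) {
                try {
                    sink.accept(item);
                } catch (RuntimeException e) {
                    failures++; // count the failure and continue with the next record
                }
            }
            return failures;
        }
    }

    /** Submits one task per segment and sums the failure counts. */
    static <T> int runAll(List<List<T>> segments, Consumer<T> sink, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (List<T> segment : segments) {
                futures.add(pool.submit(new SegmentTask<>(segment, sink)));
            }
            int total = 0;
            for (Future<Integer> future : futures) {
                total += future.get(); // blocks until that segment is done
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<String>> segments = Arrays.asList(
                Arrays.asList("a", "bad"), Arrays.asList("c", "d"));
        int failed = runAll(segments, s -> {
            if ("bad".equals(s)) {
                throw new IllegalArgumentException("cannot insert " + s);
            }
        }, 2);
        System.out.println("failed records: " + failed);
    }
}
```

Returning a count rather than overwriting a single message keeps every failure visible; the original snippets keep only the last failure per segment.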
Main method:
public void receiveBookJobRun() {
    // Test data
    List<UserDomain> bookList = new ArrayList<>();
    for (int i = 0; i < 400000; i++) {
        UserDomain book = new UserDomain();
        book.setUserName("zcl" + i);
        bookList.add(book);
    }
    // Insert start time
    long insertOrUpdateBegin = System.currentTimeMillis();
    log.info("Data update start time: " + insertOrUpdateBegin);
    // Holds the execution result of each segment
    List<Future<String>> futureList = new ArrayList<>();
    // Total number of records
    int listSize = bookList.size();
    // threadSum (the desired number of segments) is a field of this class that the original does not show.
    // When there are fewer records than threadSum, use one segment per record.
    int segments = Math.min(threadSum, listSize);
    // Cut the list into segments for multithreaded execution
    for (int i = 0; i < segments; i++) {
        // Calculate where this segment starts and ends
        int listStart = listSize / segments * i;
        int listEnd = listSize / segments * (i + 1);
        // The last segment differs from the others: it also takes the remainder
        if (i == segments - 1) {
            listEnd = listSize;
        }
        // Cut out the segment
        List<UserDomain> subList = bookList.subList(listStart, listEnd);
        // Store each segment in parallel (Method 1)
        futureList.add(syncBookHandler.syncMargePsr(subList, i));
        // Store each segment in parallel (Method 2, thread pool).
        // Use one method or the other; enabling both inserts every record twice.
        // futureList.add(bookTaskExecutor.submit(new PushDataTask(subList, i)));
    }
    // Check the result of each segment
    for (Future<String> future : futureList) {
        try {
            String str = future.get();
            log.info("current thread id =" + Thread.currentThread().getName() + ",result=" + str);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see it
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for a segment to finish", e);
        } catch (ExecutionException e) {
            log.error("Thread running exception!", e);
        }
    }
    long insertOrUpdateEnd = System.currentTimeMillis();
    log.info("Data update end time: " + insertOrUpdateEnd + ". Time taken to update the data (ms): " + (insertOrUpdateEnd - insertOrUpdateBegin));
}
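The segment arithmetic in the main method is worth checking on small numbers. The sketch below reproduces the same scheme (equal chunks, last segment takes the remainder) as a pure function. It also adds a balanced variant that is not in the original, shown only as an alternative. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of the original project.
final class SegmentRanges {

    /** Same scheme as the main method: equal chunks, the last one absorbs the remainder. */
    static List<int[]> lastTakesRemainder(int size, int parts) {
        List<int[]> ranges = new ArrayList<>();
        if (size <= 0 || parts <= 0) {
            return ranges;
        }
        int n = Math.min(parts, size); // never more segments than records
        int chunk = size / n;
        for (int i = 0; i < n; i++) {
            int start = chunk * i;
            int end = (i == n - 1) ? size : chunk * (i + 1);
            ranges.add(new int[] {start, end}); // half-open range [start, end)
        }
        return ranges;
    }

    /** Alternative (an assumption, not from the original): segment sizes differ by at most one. */
    static List<int[]> balanced(int size, int parts) {
        List<int[]> ranges = new ArrayList<>();
        if (size <= 0 || parts <= 0) {
            return ranges;
        }
        int n = Math.min(parts, size);
        int base = size / n;
        int extra = size % n; // the first `extra` segments get one more record
        int start = 0;
        for (int i = 0; i < n; i++) {
            int end = start + base + (i < extra ? 1 : 0);
            ranges.add(new int[] {start, end});
            start = end;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10 records over 4 segments: sizes 2,2,2,4 versus 3,3,2,2
        for (int[] r : lastTakesRemainder(10, 4)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
        for (int[] r : balanced(10, 4)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

For 400000 records the remainder is small either way, but when the record count is only slightly larger than the segment count, the last segment in the original scheme can be nearly twice the size of the others.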
Reference: https://blog.csdn.net/weixin_43889571/article/details/106604183?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165597556616782389414029%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=165597556616782389414029&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-2-106604183-null-null.142^v21^control,157^v15^new_3&utm_term=%E5%A4%9A%E7%BA%BF%E7%A8%8B%E6%89%B9%E9%87%8F%E5%A4%84%E7%90%86%E6%95%B0%E6%8D%AE&spm=1018.2226.3001.4187