当前位置:网站首页>Beauty of Refactoring: when multithreaded batch processing task lifts the beam - Universal scaffold
Beauty of Refactoring: when multithreaded batch processing task lifts the beam - Universal scaffold
2022-06-30 05:59:00 【Floating ~ sinking】
List of articles
origin
Recently, I was responsible for data archiving , You have to learn from multithreading ; As a concurrent counter artifact - The thread pool should be well entertained ,I can I up , The following is a general development scaffold designed by the author for a batch task, including : Flow chart Introduction 、 Code demonstration 、 Some black Technology
The scaffold has the following advantages :
- General template , Any batch task scenario can inherit
- Focus on business logic development of batch tasks , There is no need to consider batch granularity control , Thread control
- Support multiple granularity : Time segmentation , Integer shard 【 For example, export by table 】
- Provide convenience for extension , such as
execAfterBusiness monitoring or indicator collection can be carried out
Emerge
Flow chart Introduction

code
@Getter
public enum TaskTimeUnit {
/** * minute */
MINUTES("minutes"),
HOURS("hours"),
DAYS("days"),
MONTHS("months"),
YEARS("years");
private final String unit;
private final static Map<String, TaskTimeUnit> ALL = new HashMap<>();
static {
TaskTimeUnit[] values = TaskTimeUnit.values();
for (TaskTimeUnit val : values) {
ALL.put(val.getUnit(), val);
}
}
TaskTimeUnit(String unit) {
this.unit = unit;
}
public static TaskTimeUnit instanceOf(String unit) {
return ALL.get(unit);
}
}
public abstract class EsTaskCtlService<T> {
protected T param;
/** * apollo Business key */
@Resource
protected ApolloKey apolloKey;
/** * Parameter checking * * @param t */
protected void init(T t) {
Objects.requireNonNull(t, " invalid parameter ");
this.param = t;
}
/** * The class content actually executed */
protected abstract void exec();
public void run(T t) {
init(t);
exec();
}
/** * Handle a single integer task */
protected void dealWithTask(Integer num) {
}
/** * Handle tasks for a certain period of time * * @param start * @param end */
protected void dealWithTask(LocalDateTime start, LocalDateTime end) throws IOException {
}
/** * Execute tasks concurrently between integer ranges with actual business * Average resource consumption :50% Can be modified by <code>getTableConcurrentNum</code> Make custom adaptations * * @param start * @param end * @param threadPoolExecutor */
protected void assignAndExecuteTask(int start, int end, ExecutorService threadPoolExecutor) {
int tableConcurrentNum = getConcurrentNum();
log.info(" Number of tasks assigned to the current machine | Number of threads : {}", tableConcurrentNum);
try {
int tempEnd = 0;
while (tempEnd < end) {
tempEnd = start + tableConcurrentNum;
if (tempEnd > end) {
tempEnd = end;
}
List<Integer> of = OrderTableId.of(start, tempEnd);
CountDownLatch countDownLatch = new CountDownLatch(of.size());
for (Integer finalI : of) {
threadPoolExecutor.execute(() -> {
dealWithTask(finalI);
countDownLatch.countDown();
});
}
countDownLatch.await();
start = tempEnd + 1;
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/** * Cycle end condition * * @return */
protected boolean isRun() {
return false;
}
protected void spiltTaskWithUnit(LocalDateTime startTime, LocalDateTime endTime, TaskTimeUnit jumpUnit, Integer jumpNum) throws IOException {
LocalDateTime tempEnd = null;
while (isRun()) {
tempEnd = next(startTime, jumpUnit, jumpNum);
if (startTime.plusNanos(1).isAfter(endTime)) {
break;
}
dealWithTask(startTime, tempEnd);
startTime = next(startTime, jumpUnit, jumpNum);
}
execAfter(startTime, endTime);
}
/** * hook after exec task, such as deal with release the system memory or resources or log .etc * * @param startTime * @param endTime */
protected void execAfter(LocalDateTime startTime, LocalDateTime endTime) {
}
protected LocalDateTime next(LocalDateTime startTime, TaskTimeUnit jumpUnit, Integer jumpNum) {
switch (jumpUnit) {
case MINUTES:
return startTime.plusMinutes(jumpNum);
case HOURS:
return startTime.plusHours(jumpNum);
case DAYS:
return startTime.plusDays(jumpNum);
case MONTHS:
return startTime.plusMonths(jumpNum);
case YEARS:
return startTime.plusYears(jumpNum);
default:
return startTime;
}
}
/** * Get the concurrency of each task * * @return */
protected static int getConcurrentNum() {
int stand = 10;
int count = Runtime.getRuntime().availableProcessors();
int half = count >> 1;
// Not enough 1 And a half
if (count < stand) {
return half;
}
// More 1 Hemitropic 10 integer
return stand - (half % stand) + half;
}
How to use ? As a batch task , It is natural to control parameters , Task granularity segmentation ; Inherit show my code:
Fall scattered
Repeated wheel building process , There are always some whimsical reconstruction ideas , More or less , There is a design , No, wait for inspiration , In the mighty coding In the context of , Leave yourself a stage where you can stand alone .
The beauty of the refactoring , Always creating ...
边栏推荐
- Lantern Festival | maoqiu technology and everyone "guess riddles and have a good night"
- How to print pthread_ t - How to print pthread_ t
- Here comes the nearest chance to Ali
- Navigate back to fragmentpageradapter - & gt; Fragment is empty - navigating back to fragmentpageradapter - & gt; fragments are empty
- MySQL index
- token 过期后,如何自动续期?
- OSPF - authentication and load balancing summary (including configuration commands)
- STM32F103 series controlled OLED IIC 4-pin
- Create priority queue
- SparseArray
猜你喜欢

How to create a CSR (certificate signing request) file?

STM32F103 series controlled OLED IIC 4-pin

Voting vault: a new primitive for defi and Governance

OpenCL线程代数库ViennaCL的使用

云服务器部署 Web 项目

How does WPS cancel automatic numbering? Four options

MySQL storage system

Shenzhou ares tx6 boot logo modification tutorial
![[OSPF] comparison between rip and OSPF](/img/72/00e3a05bc5de0e5a66b4675d030911.jpg)
[OSPF] comparison between rip and OSPF

ECS deployment web project
随机推荐
Inno setup the simplest user-defined interface effect
动态规划--怪盗基德的滑翔翼
Idea of capturing mobile terminal variant combination
Mysql database learning notes - foreign keys, table connections, subqueries, and indexes for MySQL multi table queries
Solidy - fallback function - 2 trigger execution modes
[OSPF] comparison between rip and OSPF
Title: enter two positive integers m and N to find their maximum common divisor and minimum common multiple
Leetcode search insert location
拼多多店铺搜索相关问题,为什么新品上架搜索不到
MySQL transaction
Solidity - Security - reentrancy attack
Attempt to redefine 'timeout' at line 2 solution
Sound network, standing in the "soil" of the Internet of things
Balanced binary tree judgment of Li Kou 110 -- classic problems
InputStream to inputstreamsource
Intelligent deodorizer embedded development
[chestnut sugar GIS] global mapper - how to assign the elevation value of the grid to the point
Dao -- a beautiful new world?
How to write a thesis
STM32F103系列控制的OLED IIC 4针