当前位置:网站首页>重构之美:当多线程批处理任务挑起大梁 - 万能脚手架
重构之美:当多线程批处理任务挑起大梁 - 万能脚手架
2022-06-30 05:53:00 【浮~沉】
起源
最近负责数据归档工作,少不了要和多线程切磋;作为并发应对神器-线程池自然要好好招待下,I can I up ,下面是笔者整理一份设计了一份批处理任务的通用开发脚手架包括:流程图介绍、代码演示、 一些黑科技
改脚手架有如下优点:
- 通用模版,任何批量任务场景都可以继承
- 专注于批量任务的业务逻辑开发,无需在考虑批量粒度控制,线程控制
- 支持多个粒度:时间切分,整数切分【比如分表导出】
- 为扩展提供方便,比如
execAfter可以进行业务监控或指标收集
浮现
流程图介绍

code
@Getter
public enum TaskTimeUnit {
/** * 分钟 */
MINUTES("minutes"),
HOURS("hours"),
DAYS("days"),
MONTHS("months"),
YEARS("years");
private final String unit;
private final static Map<String, TaskTimeUnit> ALL = new HashMap<>();
static {
TaskTimeUnit[] values = TaskTimeUnit.values();
for (TaskTimeUnit val : values) {
ALL.put(val.getUnit(), val);
}
}
TaskTimeUnit(String unit) {
this.unit = unit;
}
public static TaskTimeUnit instanceOf(String unit) {
return ALL.get(unit);
}
}
public abstract class EsTaskCtlService<T> {
protected T param;
/** * apollo 业务key */
@Resource
protected ApolloKey apolloKey;
/** * 参数校验 * * @param t */
protected void init(T t) {
Objects.requireNonNull(t, "参数非法");
this.param = t;
}
/** * 实际执行的类容 */
protected abstract void exec();
public void run(T t) {
init(t);
exec();
}
/** * 处理单个整数任务 */
protected void dealWithTask(Integer num) {
}
/** * 处理某个时间段的任务 * * @param start * @param end */
protected void dealWithTask(LocalDateTime start, LocalDateTime end) throws IOException {
}
/** * 基于有实际业务整数范围之间并发执行任务 * 资源消耗平均:50% 可通过修改<code>getTableConcurrentNum</code>进行自定义适配 * * @param start * @param end * @param threadPoolExecutor */
protected void assignAndExecuteTask(int start, int end, ExecutorService threadPoolExecutor) {
int tableConcurrentNum = getConcurrentNum();
log.info("当前机器分配任务数|线程数: {}", tableConcurrentNum);
try {
int tempEnd = 0;
while (tempEnd < end) {
tempEnd = start + tableConcurrentNum;
if (tempEnd > end) {
tempEnd = end;
}
List<Integer> of = OrderTableId.of(start, tempEnd);
CountDownLatch countDownLatch = new CountDownLatch(of.size());
for (Integer finalI : of) {
threadPoolExecutor.execute(() -> {
dealWithTask(finalI);
countDownLatch.countDown();
});
}
countDownLatch.await();
start = tempEnd + 1;
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/** * 循环结束条件 * * @return */
protected boolean isRun() {
return false;
}
protected void spiltTaskWithUnit(LocalDateTime startTime, LocalDateTime endTime, TaskTimeUnit jumpUnit, Integer jumpNum) throws IOException {
LocalDateTime tempEnd = null;
while (isRun()) {
tempEnd = next(startTime, jumpUnit, jumpNum);
if (startTime.plusNanos(1).isAfter(endTime)) {
break;
}
dealWithTask(startTime, tempEnd);
startTime = next(startTime, jumpUnit, jumpNum);
}
execAfter(startTime, endTime);
}
/** * hook after exec task, such as deal with release the system memory or resources or log .etc * * @param startTime * @param endTime */
protected void execAfter(LocalDateTime startTime, LocalDateTime endTime) {
}
protected LocalDateTime next(LocalDateTime startTime, TaskTimeUnit jumpUnit, Integer jumpNum) {
switch (jumpUnit) {
case MINUTES:
return startTime.plusMinutes(jumpNum);
case HOURS:
return startTime.plusHours(jumpNum);
case DAYS:
return startTime.plusDays(jumpNum);
case MONTHS:
return startTime.plusMonths(jumpNum);
case YEARS:
return startTime.plusYears(jumpNum);
default:
return startTime;
}
}
/** * 获取每次任务并发数 * * @return */
protected static int getConcurrentNum() {
int stand = 10;
int count = Runtime.getRuntime().availableProcessors();
int half = count >> 1;
// 不够就1半
if (count < stand) {
return half;
}
// 多了1半向10取整
return stand - (half % stand) + half;
}
如何用?作为一个批量任务,自然少不了的进行参数控制,任务粒度切分;继承即可 show my code:
散落
重复的造轮子过程中,总有些突发奇想的重构思想,或多或少,有则设计,无则等待灵感的到来,于浩浩荡荡的coding上下文中,给自己留下一份可以独秀的舞台。
重构之美,一直创造中。。。
边栏推荐
- 炒股用指南针开户交易安全吗?
- Answer sheet for online assignment of "motor and drive" of Xijiao 21 autumn (IV) [standard answer]
- El table lazy load refresh
- Summary of redis learning notes (I)
- 1380. lucky numbers in matrices
- Database SQL language 05 SQL exercise
- InputStream转InputStreamSource
- Intelligent deodorizer embedded development
- Lantern Festival | maoqiu technology and everyone "guess riddles and have a good night"
- [GPU] basic operation of GPU (I)
猜你喜欢

Shenzhou ares tx6 boot logo modification tutorial

OpenCL线程代数库ViennaCL的使用
![[road of system analyst] collection of wrong topics in Project Management Chapter](/img/8b/2908cd282f5e505efe5223b4c5ddbf.jpg)
[road of system analyst] collection of wrong topics in Project Management Chapter
![[ansible series] fundamentals 02 module debug](/img/99/c53be8e2a42c7cb5b4a9a7ef4ad98c.jpg)
[ansible series] fundamentals 02 module debug

剑指 Offer 18. 删除链表的节点

Transfer the token on the matic-erc20 network to the matic polygon

MySQL日志管理、数据备份、恢复

MySQL 索引

UE4_ Editor development: highlight the UI making method according to the assets dragged by the mouse (1)
![[MD editing required] welcome to the CSDN markdown editor](/img/6c/df2ebb3e39d1e47b8dd74cfdddbb06.gif)
[MD editing required] welcome to the CSDN markdown editor
随机推荐
Luogup2756 pilot pairing scheme problem (maximum flow)
UE4_ Editor UMG close window cannot destroy UMG immediately
Simple use of qlistview of QT (including source code + comments)
English grammar_ Adjective / adverb Level 3 - superlative
Did you know that WPS can turn on eye protection mode?
Answer sheet for online assignment of "motor and drive" of Xijiao 21 autumn (IV) [standard answer]
Cisco vxlan configuration
ECS deployment web project
MySQL高级SQL语句
声网,站在物联网的“土壤”里
OSPF - authentication and load balancing summary (including configuration commands)
Online assignment of C language program design in the 22nd spring of Western Polytechnic University
The average salary of software testing in 2022 has been released. Have you been averaged?
Detailed explanation of issues related to SSL certificate renewal
Today, Ali came out with 35K. It's really sandpaper that wiped my ass. it showed me my hand
Switch to software testing and report to the training class for 3 months. It's a high paying job. Is it reliable?
[ansible series] fundamentals 02 module debug
You don't know how to deduce the location where HashSet stores elements?
Set of XXL job principles
Who is promoting the new inflection point of audio and video industry in 2022?