当前位置:网站首页>重构之美:当多线程批处理任务挑起大梁 - 万能脚手架

重构之美:当多线程批处理任务挑起大梁 - 万能脚手架

2022-06-30 05:53:00 浮~沉

起源

最近负责数据归档工作,少不了要和多线程切磋;作为并发应对神器-线程池自然要好好招待下,I can I up ,下面是笔者整理一份设计了一份批处理任务的通用开发脚手架包括:
流程图介绍代码演示一些黑科技

改脚手架有如下优点:

  1. 通用模版,任何批量任务场景都可以继承
  2. 专注于批量任务的业务逻辑开发,无需在考虑批量粒度控制,线程控制
  3. 支持多个粒度:时间切分,整数切分【比如分表导出】
  4. 为扩展提供方便,比如execAfter可以进行业务监控或指标收集

浮现

流程图介绍

在这里插入图片描述

code

@Getter
public enum TaskTimeUnit {
    

    /** * 分钟 */
    MINUTES("minutes"),
    HOURS("hours"),
    DAYS("days"),
    MONTHS("months"),
    YEARS("years");

    private final String unit;

    private final static Map<String, TaskTimeUnit> ALL = new HashMap<>();

    static {
    
        TaskTimeUnit[] values = TaskTimeUnit.values();
        for (TaskTimeUnit val : values) {
    
            ALL.put(val.getUnit(), val);
        }
    }

    TaskTimeUnit(String unit) {
    
        this.unit = unit;
    }


    public static TaskTimeUnit instanceOf(String unit) {
    
        return ALL.get(unit);
    }
}


public abstract class EsTaskCtlService<T> {
    
    protected T param;

    /** * apollo 业务key */
    @Resource
    protected ApolloKey apolloKey;

    /** * 参数校验 * * @param t */
    protected void init(T t) {
    
        Objects.requireNonNull(t, "参数非法");
        this.param = t;
    }

    /** * 实际执行的类容 */
    protected abstract void exec();


    public void run(T t) {
    
        init(t);
        exec();
    }

    /** * 处理单个整数任务 */
    protected void dealWithTask(Integer num) {
     }

    /** * 处理某个时间段的任务 * * @param start * @param end */
    protected void dealWithTask(LocalDateTime start, LocalDateTime end) throws IOException {
     }


    /** * 基于有实际业务整数范围之间并发执行任务 * 资源消耗平均:50% 可通过修改<code>getTableConcurrentNum</code>进行自定义适配 * * @param start * @param end * @param threadPoolExecutor */
    protected void assignAndExecuteTask(int start, int end, ExecutorService threadPoolExecutor) {
    
        int tableConcurrentNum = getConcurrentNum();
        log.info("当前机器分配任务数|线程数: {}", tableConcurrentNum);
        try {
    
            int tempEnd = 0;
            while (tempEnd < end) {
    
                tempEnd = start + tableConcurrentNum;
                if (tempEnd > end) {
    
                    tempEnd = end;
                }
                List<Integer> of = OrderTableId.of(start, tempEnd);
                CountDownLatch countDownLatch = new CountDownLatch(of.size());
                for (Integer finalI : of) {
    
                    threadPoolExecutor.execute(() -> {
    
                        dealWithTask(finalI);
                        countDownLatch.countDown();
                    });
                }
                countDownLatch.await();
                start = tempEnd + 1;
            }
        } catch (Exception e) {
    
            log.error(e.getMessage(), e);
        }
    }

    /** * 循环结束条件 * * @return */
    protected boolean isRun() {
    
        return false;
    }


    protected void spiltTaskWithUnit(LocalDateTime startTime, LocalDateTime endTime, TaskTimeUnit jumpUnit, Integer jumpNum) throws IOException {
    
        LocalDateTime tempEnd = null;
        while (isRun()) {
    
            tempEnd = next(startTime, jumpUnit, jumpNum);
            if (startTime.plusNanos(1).isAfter(endTime)) {
    
                break;
            }
            dealWithTask(startTime, tempEnd);

            startTime = next(startTime, jumpUnit, jumpNum);
        }
        execAfter(startTime, endTime);
    }

    /** * hook after exec task, such as deal with release the system memory or resources or log .etc * * @param startTime * @param endTime */
    protected void execAfter(LocalDateTime startTime, LocalDateTime endTime) {
    
    }

    protected LocalDateTime next(LocalDateTime startTime, TaskTimeUnit jumpUnit, Integer jumpNum) {
    
        switch (jumpUnit) {
    
            case MINUTES:
                return startTime.plusMinutes(jumpNum);
            case HOURS:
                return startTime.plusHours(jumpNum);
            case DAYS:
                return startTime.plusDays(jumpNum);
            case MONTHS:
                return startTime.plusMonths(jumpNum);
            case YEARS:
                return startTime.plusYears(jumpNum);
            default:
                return startTime;
        }
    }

    /** * 获取每次任务并发数 * * @return */
    protected static int getConcurrentNum() {
    
        int stand = 10;
        int count = Runtime.getRuntime().availableProcessors();
        int half = count >> 1;
        // 不够就1半
        if (count < stand) {
    
            return half;
        }
        // 多了1半向10取整
        return stand - (half % stand) + half;

    }

如何用?作为一个批量任务,自然少不了的进行参数控制,任务粒度切分;继承即可 show my code:


散落

重复的造轮子过程中,总有些突发奇想的重构思想,或多或少,有则设计,无则等待灵感的到来,于浩浩荡荡的coding上下文中,给自己留下一份可以独秀的舞台。

重构之美,一直创造中。。。

原网站

版权声明
本文为[浮~沉]所创,转载请带上原文链接,感谢
https://blog.csdn.net/lkg5211314/article/details/125380104