
Enterprise distributed batch processing scheme based on task scheduling

2022-06-09 11:05:00 Alibaba Cloud Native

Author: Yao Hui (Qianxi)

Background

Let's start with what distributed batch processing means. Literally, it refers to scenarios in which a large volume of business data must be computed and processed in batches by an application. Running such a job on a single machine takes a long time and fails to exploit the processing capacity of the other application nodes in the business cluster. A well-designed distributed batch processing scheme lets all application nodes in the business cluster cooperate on a single large data processing task, improving both overall throughput and reliability.

Batch model

In a simple stand-alone scenario, a large task can be processed concurrently with multiple threads; with multiple machines, the same task can be processed in parallel across the cluster. A distributed batch processing scheme should therefore hide the coordination logic among the business application cluster, such as task splitting and distribution, parallel execution, result aggregation, failure tolerance, and dynamic scale-out, from developers at the code level, so that users only need to focus on the sharding rules of the business logic and the processing logic of each subtask.

Comparison with big data batch processing

The MapReduce model used in big data processing is, in its processing logic, essentially the same as the business batch processing we discuss here. Batch processing in the big data scenario, however, is data-oriented: it requires deploying a dedicated big data platform cluster to support data storage and batch computation, so the main goal there is to build a complete data platform. In contrast, this article focuses on the distributed business batch processing scenario, building distributed batch processing logic on top of an existing business application cluster. A distributed batch processing scheme can address the following requirements:

  • Decouple time-consuming business logic so that the core-link services keep responding quickly
  • Schedule all application nodes of the business cluster to cooperatively complete batch business processing
  • Unlike big data processing, subtask processing may call other online business services as part of the batch

Open source batch processing scheme

ElasticJob

ElasticJob is a distributed task scheduling framework. Built on top of Quartz, its main feature is adding scheduled dispatch plus the ability to coordinate task processing across a business cluster. The whole architecture relies on Zookeeper to implement sharded task execution, dynamic elastic scheduling of the application cluster, and high availability of subtask execution. Its sharded scheduling model distributes large-scale business data processing evenly across the nodes of the business cluster, effectively improving task processing efficiency.

  • SimpleJob

In a Spring Boot project, a task definition can be configured in YAML, specifying the task implementation class, the scheduling cron expression, and the sharding information:

elasticjob:
  regCenter:
    serverLists: 127.0.0.1:2181
    namespace: elasticjob-lite-springboot
  jobs:
    simpleJob:
      elasticJobClass: org.example.job.SpringBootSimpleJob
      cron: 0/5 * * * * ?
      overwrite: true
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou

The configured org.example.job.SpringBootSimpleJob class needs to implement the execute method of the SimpleJob interface and obtain its shard of the business data through the ShardingContext parameter for processing:

@Component
public class SpringBootSimpleJob implements SimpleJob {
    @Override
    public void execute(final ShardingContext shardingContext) {
        String value = shardingContext.getShardingParameter();
        System.out.println("simple.process->"+value);
    }
}

Suppose we deploy three application instances as the scheduling cluster for this task. When the task triggers, ElasticJob assigns the corresponding three shard tasks to the three application instances, which together complete the data processing of the whole task.

  • DataflowJob

DataflowJob is, in its overall structure, not essentially different from SimpleJob. As the interface below shows, compared with SimpleJob it adds a fetchData method so that the business side loads the data to be processed itself; in effect, SimpleJob's execute method is split into two logical steps. The one real difference is that DataflowJob also provides a resident data processing mode (the so-called streaming process), in which the task stays running until fetchData returns empty.

public interface DataflowJob<T> extends ElasticJob {

    /**
     * Fetch to be processed data.
     *
     * @param shardingContext sharding context
     * @return to be processed data
     */
    List<T> fetchData(ShardingContext shardingContext);

    /**
     * Process data.
     *
     * @param shardingContext sharding context
     * @param data to be processed data
     */
    void processData(ShardingContext shardingContext, List<T> data);
}

Adding props: streaming.process: true to the DataflowJob task's YAML configuration enables the streaming process mode. When the task triggers, each shard task runs the loop fetchData -> processData -> fetchData until fetchData returns empty. Typical scenarios for this mode:

  • A single shard has a large amount of data: fetchData reads one page of the shard's data at a time until all of it has been processed
  • A shard continuously produces data: the task keeps obtaining data through fetchData, staying resident and processing business data continuously (a sketch of such an implementation follows the configuration below)

elasticjob:
  regCenter:
    serverLists: 127.0.0.1:2181
    namespace: elasticjob-lite-springboot
  jobs:
    dataflowJob:
      elasticJobClass: org.example.job.SpringBootDataflowJob
      cron: 0/5 * * * * ?
      overwrite: true
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
      props:
        # enable the streaming process mode
        streaming.process: true
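
The SpringBootDataflowJob class referenced in this configuration is not shown in the original text. Below is a minimal sketch, assuming an in-memory backlog in place of a real data source (in practice fetchData would page through a database or queue by shard item); the backlog field and the page size are illustrative assumptions:

@Component
public class SpringBootDataflowJob implements DataflowJob<String> {

    // hypothetical in-memory backlog standing in for a real data source
    private final ConcurrentLinkedQueue<String> backlog =
            new ConcurrentLinkedQueue<>(Arrays.asList("r1", "r2", "r3", "r4", "r5"));

    @Override
    public List<String> fetchData(final ShardingContext shardingContext) {
        // return at most one page for this shard; an empty list ends the
        // streaming loop (fetchData -> processData -> fetchData ...)
        List<String> page = new ArrayList<>();
        String record;
        while (page.size() < 2 && (record = backlog.poll()) != null) {
            page.add(record);
        }
        return page;
    }

    @Override
    public void processData(final ShardingContext shardingContext, final List<String> data) {
        data.forEach(d -> System.out.println(
                "dataflow.process->" + shardingContext.getShardingParameter() + ":" + d));
    }
}
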
  • Characteristic analysis

ElasticJob's distributed sharded scheduling model conveniently supports common, simple batch processing scenarios and handles the whole coordination needed to split a large batch of business data processing across a cluster. It may, however, fall short in the following respects:

  • The whole architecture depends on the stability of ZK (Zookeeper)

    • Additional operations work is needed to deploy ZK and keep it highly available
    • Task storage and trigger execution all run through ZK; with a large number of tasks, the ZK cluster easily becomes the scheduling performance bottleneck
  • The number of shards is fixed in configuration; dynamic sharding is not supported

    • When the data volume to be processed differs greatly between shards, the balance of cluster processing capacity is easily broken
    • If shards are defined poorly, cluster elasticity stops helping once the cluster size far exceeds the shard count
    • Shard definitions are separated from the business logic, making the mapping between the two troublesome to maintain
  • The console is weak

Spring Batch Batch framework

Spring Batch is a batch processing framework that offers lightweight yet fairly complete batch processing capabilities. It mainly provides two processing modes: single-process multi-threaded, and distributed multi-process. In the single-process multi-threaded mode, the user defines a Job as the unit of batch work; a Job consists of one or more Steps, executed serially or in parallel, and each Step completes its reading, processing, and output through a reader, processor, and writer. The discussion below focuses on the scenario where a Job contains a single Step.
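
To make this structure concrete before moving to the distributed modes, here is a minimal single-process sketch in the same builder-factory style as the remote chunking example later in this article; the singleStepJob name and the sample data are illustrative assumptions:

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job singleStepJob() {
        return jobBuilderFactory.get("singleStepJob")
                .start(stepBuilderFactory.get("step1")
                        .<String, String>chunk(3) // commit interval: 3 records per chunk
                        .reader(new ListItemReader<>(Arrays.asList("a", "b", "c", "d")))
                        .processor((ItemProcessor<String, String>) String::toUpperCase)
                        .writer(items -> items.forEach(System.out::println))
                        .build())
                .build();
    }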

The Spring Batch framework takes the view that multi-threading within a single process is of limited practical value: for small batch data tasks, simply opening a thread pool solves the problem without pulling in the framework. This discussion therefore focuses on the scenario where a business cluster of some scale cooperatively completes batch data processing tasks. For that, Spring Batch provides remote chunking and remote partitioning: a Job's Step can be split into multiple subtasks that are distributed, according to specific rules, to other workers in the cluster, achieving distributed parallel batch processing. The remote interaction is usually implemented with third-party message middleware, which distributes the subtasks and aggregates the execution results.

  • Remote chunking (Remote Chunking)

Remote chunking is Spring Batch's distributed solution for processing large-volume data tasks. Within a Step, an ItemReader on the master loads the data and groups it into chunks, and an ItemWriter distributes those chunks, via message middleware or another transport, to the cluster nodes, each of which performs the business processing for the chunks it receives.

Remote Chunking example

The master node's ItemReader and ItemWriter above map onto the task-split stage of the batch model discussed in this article. For the ItemWriter, the master can use the ChunkMessageChannelItemWriter provided by Spring Batch Integration; this component works with the channels provided by Spring Integration (such as AMQP or JMS) to load the batch task data and distribute the chunks.


    @Bean
    public Job remoteChunkingJob() {
         return jobBuilderFactory.get("remoteChunkingJob")
             .start(stepBuilderFactory.get("step2")
                     .<Integer, Integer>chunk(2) // each chunk holds 2 records loaded by the reader
                     .reader(itemReader())
                     // distribute chunks via ChunkMessageChannelItemWriter
                     .writer(itemWriter())
                     .build())
             .build();
     }

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        MessagingTemplate messagingTemplate = new MessagingTemplate();
        messagingTemplate.setDefaultChannel(requests());
        ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
        chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
        chunkMessageChannelItemWriter.setReplyChannel(replies());
        return chunkMessageChannelItemWriter;
    }
    // message middleware channel configuration omitted (a minimal sketch follows)
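
    // A minimal sketch of the omitted requests()/replies() beans, assuming
    // plain Spring Integration channels; in production these would be bridged
    // to a real broker (e.g. via the Spring Integration AMQP/JMS adapters).
    @Bean
    public DirectChannel requests() {
        // master-side channel on which chunk requests are sent out
        return new DirectChannel();
    }

    @Bean
    public QueueChannel replies() {
        // pollable channel from which ChunkMessageChannelItemWriter collects results
        return new QueueChannel();
    }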

The slave nodes perform the business logic processing and result output for each chunk of data (which can be understood as a subtask) distributed to them. The subtask processing side therefore needs to configure the ChunkProcessorChunkHandler provided by Spring Batch Integration, which receives the subtasks, runs the actual business processing, and feeds back the execution results.

    // message middleware channel configuration omitted

    // receive chunk tasks and feed back the execution results
    @Bean
    @ServiceActivator(inputChannel = "slaveRequests", outputChannel = "slaveReplies")
    public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
        ChunkProcessor<Integer> chunkProcessor =
                new SimpleChunkProcessor<>(slaveItemProcessor(), slaveItemWriter());
        ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>();
        chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
        return chunkProcessorChunkHandler;
    }

    // the business-specific processor (a sketch follows below)
    @Bean
    public SlaveItemProcessor slaveItemProcessor() { return new SlaveItemProcessor(); }

    // the business-specific writer (a sketch follows below)
    @Bean
    public SlaveItemWriter slaveItemWriter() { return new SlaveItemWriter(); }
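
SlaveItemProcessor and SlaveItemWriter are the business-specific classes referenced above but not shown in the original text; hypothetical minimal sketches, with placeholder business logic, might look like this:

public class SlaveItemProcessor implements ItemProcessor<Integer, Integer> {
    @Override
    public Integer process(Integer item) {
        // placeholder business transformation for each chunk record
        return item * 2;
    }
}

public class SlaveItemWriter implements ItemWriter<Integer> {
    @Override
    public void write(List<? extends Integer> items) {
        // placeholder result output
        items.forEach(i -> System.out.println("slave.write -> " + i));
    }
}
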
  • Remote partitioning (Remote Partitioning)

The main difference between remote partitioning and remote chunking is that with partitioning the master node does not load the data. Instead, the current Step uses a Partitioner to split itself into multiple sub-Steps (which can also be understood as subtasks), and a PartitionHandler distributes those subtasks to the slave nodes; for this, Spring Batch Integration provides MessageChannelPartitionHandler, which again relies on message middleware underneath for the distribution. Each slave node reads the execution context of its sub-Step and, based on that information, completes its own ItemReader, ItemProcessor, and ItemWriter processing.
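
As a sketch of the splitting side, a minimal Partitioner can derive one sub-Step execution context per partition as follows; the partitionIndex key is an assumption for illustration, and the distribution through MessageChannelPartitionHandler plus the middleware wiring are omitted here, as above:

public class RangePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext ctx = new ExecutionContext();
            // each slave reads this index from its sub-Step context and
            // derives its own data range from it
            ctx.putInt("partitionIndex", i);
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    }
}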

  • Characteristic analysis

An overall analysis of the Spring Batch framework's characteristics:

  • Complete batch processing capability: supports single-machine multi-threaded and distributed multi-process cooperative batch processing, with customizable sharding models.
  • No built-in scheduling: there is no native scheduled-trigger capability, so a third-party scheduling framework must be integrated (e.g. Spring Task, which in turn requires solving duplicate triggering across a cluster; see the sketch after this list).
  • Weak visual control: tasks are configured in code or configuration files, and a console has to be built separately, with limited control capability.
  • Non-trivial integration: the distributed batch capability requires integrating additional third-party middleware, or extension development against its interfaces; building enterprise-grade usage from the officially provided pieces takes relatively complex planning and integration.
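
As an illustration of the scheduling gap, a minimal sketch pairing Spring Task with a Spring Batch JobLauncher (reusing the remoteChunkingJob bean defined earlier; the bean wiring here is an assumption) might be:

@Configuration
@EnableScheduling
public class BatchScheduleConfig {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job remoteChunkingJob;

    // Spring Task fires this on every node independently, so in a cluster a
    // distributed lock or leader election is still needed to avoid duplicate
    // triggering; unique parameters make each run a new JobInstance
    @Scheduled(cron = "0 0/5 * * * *")
    public void launchJob() throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addLong("triggerTime", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(remoteChunkingJob, params);
    }
}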

Enterprise batch processing scheme: SchedulerX visualized MapReduce tasks

The SchedulerX task scheduling platform provides a complete solution for enterprise batch processing requirements. Users can consume the public cloud platform service directly to give a business application cluster distributed batch processing capability (business applications not deployed on Alibaba Cloud can also be connected), with no extra middleware to deploy, integrate, or maintain.

Principle analysis

In this solution, the task scheduling platform provides fully visual control, highly reliable scheduled dispatch, and visual query capabilities for every task a user registers. On the business application side, integrating the SchedulerX SDK gives quick access to distributed batch processing; the user then only needs to care about the subtask splitting rules of the batch model and the processing logic of each subtask. This distributed batch processing has the following features:

  • Subtask high availability: if a cluster node goes down, automatic failover redistributes the subtasks on the offline machine to other nodes
  • Automatic elastic scale-out: newly deployed application nodes automatically take part in executing subsequent tasks
  • Visualization: monitoring, operations, and business log query capabilities are provided for the execution of tasks and their subtasks

The general flow is as follows:

  • After a MapReduce task is created on the platform, the scheduled dispatch service triggers its execution reliably on schedule
  • When the MapReduce task is triggered, the scheduling service selects one of the connected business worker nodes as the master node for this run
  • The master node runs the user-developed subtask splitting and loading logic, and via the map method distributes subtask processing requests evenly to the other worker nodes in the cluster
  • The master node monitors the processing of the whole distributed batch task, as well as the health of every worker node, to keep the overall run highly available
  • The other worker nodes, upon receiving subtask processing requests, call back into the user-defined business logic and complete the processing of each subtask; the number of threads a single application node uses to process subtasks in parallel is configurable
  • When all subtasks are finished, the master node aggregates all subtask execution results, calls back the reduce method, and reports the execution result to the scheduling platform

A developer only needs to implement the MapReduceJobProcessor abstract class in the business application: in the isRootTask branch, load the list of business subtask data objects to be processed in this run; in non-root requests, obtain the single subtask object via jobContext.getTask() and execute the business processing logic against it. Once the application is deployed to the cluster nodes, every node in the cluster participates in coordinating and executing the whole distributed batch task each time it triggers, until the run completes.

public class MapReduceHelloProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult reduce(JobContext jobContext) throws Exception {
        // aggregation callback invoked after all subtasks complete; optional to implement
        jobContext.getTaskResults();
        return new ProcessResult(true, "subtask result count: " + jobContext.getTaskResults().size());
    }

    @Override
    public ProcessResult process(JobContext jobContext) throws Exception {
        if (isRootTask(jobContext)) {
            // load the list of business subtasks to be processed; the sample
            // data here stands in for a business-specific loader
            List<String> list = Arrays.asList("task-1", "task-2", "task-3");
            // call the map method provided by the SDK to distribute the subtasks
            ProcessResult result = map(list, "SecondDataProcess");
            return result;
        } else {
            // obtain the data of a single subtask and run its business processing
            String data = (String) jobContext.getTask();
            // ... business logic goes here ...
            return new ProcessResult(true, "data processed successfully!");
        }
    }
}

Functional advantages

  • Subtask visualization

Task dashboard: visual records of the triggering and execution of all tasks.

Visual subtask details: from a task execution record, you can drill down to the execution status and executing node of each subtask.

  • Subtask business logs

Clicking "Log" in the subtask list shows the log records produced while the current subtask was being processed.

  • Execution stack view

The execution stack view helps when a subtask is stuck and never finishes: it shows the thread stack information of the corresponding execution thread.

  • Custom business labels

Subtask business labels give users a fast, visual way to view and query subtask business information. In this example, an "account name" label is attached to each subtask when the task is split; with it, a user can quickly see the processing status of the corresponding business subtask, and can also query the processing status of subtasks carrying a specific label value.

To configure a custom label for subtasks, simply have the subtask objects distributed by map implement the BizSubTask interface, and in its labelMap method add the business-specific labels that each subtask should expose for visual query:

public class AccountTransferProcessor extends MapReduceJobProcessor {

    private static final Logger logger = LoggerFactory.getLogger("schedulerxLog");

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true);
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if(isRootTask(context)){
            logger.info("split task list size:20");
            List<AccountInfo> list = new LinkedList<>();
            for(int i=0; i < 20; i++){
                list.add(new AccountInfo(i, "CUS"+StringUtils.leftPad(i+"", 4, "0"),
                        "AC"+StringUtils.leftPad(i+"", 12, "0")));
            }
            return map(list, "transfer");
        }else {
            logger.info("start biz process...");
            logger.info("task info:"+context.getTask().toString());
            TimeUnit.SECONDS.sleep(30L);
            logger.info("start biz process end.");
            return new ProcessResult(true);
        }
    }
}

public class AccountInfo implements BizSubTask {

    private long id;

    private String name;

    private String accountId;

    public AccountInfo(long id, String name, String accountId) {
        this.id = id;
        this.name = name;
        this.accountId = accountId;
    }

    // set the business label information for this subtask
    @Override
    public Map<String, String> labelMap() {
        Map<String, String> labelMap = new HashMap<>();
        labelMap.put("account name", name);
        return labelMap;
    }
}

  • Compatible with open source

SchedulerX supports executors written against common open source frameworks, including XXL-Job and ElasticJob, and support for scheduling Spring Batch tasks is also planned on the platform.

Case scenario

The distributed batch processing model (the visualized MapReduce model) fits a great many demand scenarios in real enterprise applications. Some common ones:

  • Batch parallel processing of sharded databases and tables: distribute the shard (database or table) information across cluster nodes as subtask objects for parallel processing

  • Processing logistics order data by city or region: distribute the cities and regions across cluster nodes as subtask objects for parallel processing

  • Exploiting the subtask visualization of the visualized MapReduce model: process key customers or orders as subtasks when building data reports or pushing information, so that important subtasks can be tracked visually

  • Fund sales business case

The following fund sales case illustrates how the distributed batch processing model can be applied; users can adapt it freely to their own business scenarios. Case description: fund companies and fund distributors (such as Ant Fortune) synchronize investor account and transaction application data every day, usually by exchanging data files. A fund company interfaces with N sales agencies (and vice versa), and each agency's data files are completely independent; every agency's file goes through the same fixed steps of file verification, interface file parsing, data validation, and data import. These fixed steps make the fund company side well suited to distributed batch processing to speed up file handling: each agency is distributed to the cluster as a subtask object, and every application node takes part, parsing the data files of the agencies assigned to it.

@Component
public class FileImportJob extends MapReduceJobProcessor {

    private static final Logger logger = LoggerFactory.getLogger("schedulerx");

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true);
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            // ---------------------------------------------------------
            // Step 1. Load the list of connected agencies
            // ---------------------------------------------------------
            logger.info("building the subtask list, one subtask per agency...");

            // pseudo-code: read the agency list from the database; Agency
            // implements BizSubTask and uses the agency name/code as the
            // subtask label so the console can track each agency visually
            List<Agency> agencyList = getAgencyListFromDb();
            return map(agencyList, "fileImport");
        } else {
            // ---------------------------------------------------------
            // Step 2. Process the data file of a single agency
            // ---------------------------------------------------------
            Agency agency = (Agency) context.getTask();
            File file = loadFile(agency);
            logger.info("file loaded.");
            validFile(file);
            logger.info("file verification passed.");
            List<Request> requests = resolveRequest(file);
            logger.info("file data parsed.");
            List<Request> checkedRequests = checkRequest(requests);
            logger.info("application data check passed.");
            importRequest(checkedRequests);
            logger.info("application data imported.");
            return new ProcessResult(true);
        }
    }
}

This case covers one business link in fund transaction clearing, handled with parallel batch processing; each subsequent processing step can be handled in a similar way. In addition, the visualized MapReduce task nodes can be orchestrated through DAG dependencies to build a complete, automated business clearing pipeline.

Summary

The distributed task scheduling platform SchedulerX provides a complete enterprise-grade solution for distributed batch processing. It gives users fast, easy access together with scheduled dispatch, visual run tracking, simple operations and control, and a highly available scheduling service, backed by an enterprise-grade monitoring dashboard, log service, and monitoring and alerting capabilities.

References:

Spring Batch Integration: https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#springBatchIntegration

ElasticJob: https://shardingsphere.apache.org/elasticjob/current/cn/overview/

Distributed Task Scheduling SchedulerX user manual: https://help.aliyun.com/document_detail/161998.html

How SchedulerX helps users solve distributed task scheduling: https://mp.weixin.qq.com/s/EgyfS1Vuv4itnuxbiT7KwA
