Enterprise distributed batch processing scheme based on task scheduling
2022-06-09 11:05:00 【Alibaba Cloud Native】
Author: Yao Hui (Qianxi)
Background
Let's start with what distributed batch processing means: a large volume of business data needs batch computation and processing by an application. Running this on a single machine takes a long time and fails to exploit the processing capacity of the other application nodes in the business cluster. With a suitable distributed batch processing scheme, all business application nodes in the cluster can jointly complete one large data-processing task, improving overall throughput and reliability.

Batch model
In a simple stand-alone scenario, multithreading can be used to work on one large task concurrently; with multiple machines, the same task can be processed in parallel across them. A distributed batch processing scheme should therefore shield developers from the cross-cluster coordination logic: task splitting, distribution, parallel execution, result aggregation, failure tolerance, and dynamic scale-out. Users then only need to focus on the business data sharding rules and the per-shard processing logic (the parts highlighted in the figure above).
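To make the model concrete, here is a minimal, illustrative Java sketch of the division of labor a distributed batch framework hides from developers; the interface and method names are hypothetical, not taken from any of the frameworks discussed below.
import java.util.List;

// Hypothetical interfaces: the framework owns distribution, failover, and
// aggregation; the developer supplies only the split rule and the
// per-subtask business logic.
public interface BatchJob<T, R> {

    // Split rule: slice the full workload into subtasks (runs on one node).
    List<T> split();

    // Business logic: process a single subtask (runs on any cluster node).
    R processSubTask(T subTask);

    // Result convergence: merge all subtask results once they complete.
    R reduce(List<R> subTaskResults);
}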
Comparison with big data batch processing
Big data processing scenarios also use the MapReduce model, whose processing logic is essentially the same as the business batch processing logic discussed here. Batch processing in big data scenarios is mainly data-oriented and requires deploying a dedicated big data platform cluster to support data storage and batch processing, so the main goal in that scenario is to build a complete data platform. In contrast, this article focuses on distributed business batch processing: building distributed batch logic on top of an existing business application service cluster. Such a scheme addresses the following requirements:
- Decouple time-consuming business logic to keep core-link services responsive
- Mobilize all application nodes of the business cluster to cooperatively complete batch business processing
- Unlike big data processing, subtask processing may call other online business services as part of the batch work
Open source batch processing schemes
ElasticJob
ElasticJob is a distributed task scheduling framework. Its main characteristic is that, on top of Quartz, it adds scheduled triggering plus the ability to coordinate task processing across a business cluster. The whole architecture relies on ZooKeeper to implement task shard execution, dynamic elastic scheduling of the application cluster, and high availability of subtask execution. Its shard scheduling model evenly distributes large-scale business data processing across the nodes of the business cluster, effectively improving task processing efficiency.

- SimpleJob
A Spring Boot project can define the task through YAML configuration, specifying the task implementation class, the scheduling cron expression, and the sharding information:
elasticjob:
  regCenter:
    serverLists: 127.0.0.1:2181
    namespace: elasticjob-lite-springboot
  jobs:
    simpleJob:
      elasticJobClass: org.example.job.SpringBootSimpleJob
      cron: 0/5 * * * * ?
      overwrite: true
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
The configured org.example.job.SpringBootSimpleJob class needs to implement the execute method of the SimpleJob interface, obtaining its shard of business data through the ShardingContext parameter and running the business logic on it.
@Component
public class SpringBootSimpleJob implements SimpleJob {

    @Override
    public void execute(final ShardingContext shardingContext) {
        String value = shardingContext.getShardingParameter();
        System.out.println("simple.process->" + value);
    }
}
We deploy 3 application instances as the scheduling cluster to handle this task. When the task triggers, ElasticJob assigns the 3 shard tasks to the 3 application instances, which jointly complete the data processing of the whole task.

- DataflowJob
DataflowJob has no essential structural difference from SimpleJob. As the interface below shows, compared with SimpleJob it adds a fetchData method that lets the business side load the data to be processed itself; in effect, SimpleJob's execute method is logically split into two steps. The real difference is that DataflowJob also offers a resident processing mode (called streaming process), in which the task keeps running until fetchData returns no data.
public interface DataflowJob<T> extends ElasticJob {

    /**
     * Fetch to-be-processed data.
     *
     * @param shardingContext sharding context
     * @return to-be-processed data
     */
    List<T> fetchData(ShardingContext shardingContext);

    /**
     * Process data.
     *
     * @param shardingContext sharding context
     * @param data to-be-processed data
     */
    void processData(ShardingContext shardingContext, List<T> data);
}
Adding props: streaming.process=true to the DataflowJob task's YAML configuration enables the streaming-process behavior. When the task triggers, each shard task runs the loop fetchData -> processData -> fetchData until fetchData returns no data. This mode suits two kinds of scenarios:
- A single shard task covers a large amount of data: fetchData reads the shard's data page by page until all of it has been processed
- The shard's data is produced continuously: the task keeps obtaining data through fetchData, staying resident for long-running, continuous business data processing
elasticjob:
  regCenter:
    serverLists: 127.0.0.1:2181
    namespace: elasticjob-lite-springboot
  jobs:
    dataflowJob:
      elasticJobClass: org.example.job.SpringBootDataflowJob
      cron: 0/5 * * * * ?
      overwrite: true
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
      props:
        # enable streaming process
        streaming.process: true
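For illustration, a minimal sketch of the corresponding DataflowJob implementation could look like the following (package names as in ElasticJob 3.x; fetchPendingRecords and markProcessed are hypothetical data-access methods, not part of ElasticJob):
import java.util.Collections;
import java.util.List;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;

public class SpringBootDataflowJob implements DataflowJob<String> {

    @Override
    public List<String> fetchData(ShardingContext shardingContext) {
        // Load one page of pending records belonging to this shard;
        // returning an empty list ends the streaming loop.
        return fetchPendingRecords(shardingContext.getShardingItem(), 100);
    }

    @Override
    public void processData(ShardingContext shardingContext, List<String> data) {
        // Process each record and mark it done so it is not fetched again;
        // otherwise the streaming loop would never terminate.
        data.forEach(this::markProcessed);
    }

    // hypothetical data-access methods, to be backed by the business's own storage
    private List<String> fetchPendingRecords(int shardingItem, int pageSize) { return Collections.emptyList(); }
    private void markProcessed(String record) { }
}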
- Characteristic analysis
Based on its distributed shard scheduling model, ElasticJob conveniently supports common, simple batch processing scenarios and solves the whole coordination process of splitting a large batch of business data for distributed execution. However, it may fall short in the following respects:
The core of the whole architecture depends on ZooKeeper stability:
- Requires additional operations, deployment, and high-availability maintenance
- Task storage and trigger execution depend heavily on ZooKeeper; with a large number of tasks, the ZooKeeper cluster easily becomes the scheduling performance bottleneck
The number of shards is fixed by configuration; dynamic sharding is not supported:
- When the data volumes of the shards differ widely, the load balance of the cluster's processing capacity is easily broken
- If the shard definition is unreasonable, cluster elasticity stops helping once the cluster size far exceeds the number of shards
- Shard definitions are separated from the business logic, making the mapping between the two troublesome to maintain
The console capabilities are weak
Spring Batch batch framework
Spring Batch is a batch processing framework offering lightweight yet comprehensive batch capabilities. It mainly supports two modes: single-process multithreading and distributed multi-process processing. In single-process multithreaded mode, the user defines a Job as the unit of batch work; a Job consists of one or more Steps arranged in series or in parallel, and each Step uses a reader-processor-writer structure to complete that step's reading, processing, and output. The discussion below focuses on the scenario of a Job containing a single Step.
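For reference, a minimal single-process Job with one Step might be declared as below, inside a batch configuration class, in the same Spring Batch 4.x builder-factory style used in the remote chunking example later in this article; the reader, processor, and writer beans are placeholders to be supplied by the business.
@Bean
public Job singleStepJob() {
    return jobBuilderFactory.get("singleStepJob")
        .start(stepBuilderFactory.get("step1")
            .<Integer, Integer>chunk(10)   // commit interval: 10 items per chunk
            .reader(itemReader())          // read: load items one at a time
            .processor(itemProcessor())    // process: per-item business logic
            .writer(itemWriter())          // write: output the processed chunk
            .build())
        .build();
}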

The Spring Batch framework regards single-process multithreading as of limited practical value: for small batch-data tasks, simply using a thread pool takes little effort, so adopting the framework there buys little. This discussion therefore focuses on a business cluster of some scale cooperatively completing batch data processing tasks. Spring Batch provides remote chunking/partitioning capabilities: a Job's Step can be split into multiple subtasks by specific rules and distributed to other workers in the cluster, achieving distributed parallel batch processing. The remote interaction is usually realized via third-party message middleware, which distributes the subtasks and aggregates the execution results.
- Remote chunking (Remote Chunking)
Remote chunking is Spring Batch's distributed solution for processing large-volume data tasks. Within a Step, the ItemReader loads data and builds it into multiple chunks, and the ItemWriter distributes these chunks to cluster nodes through message middleware (or other transports); each cluster application node is then responsible for the business processing of the chunks it receives.

Remote Chunking Example
The master node's ItemReader and ItemWriter above map to the "task split" stage of the batch model discussed in this article. For the ItemWriter, the master node can use the ChunkMessageChannelItemWriter provided by Spring Batch Integration; this component integrates with channels provided by Spring Integration (such as AMQP or JMS) to complete batch data loading and chunk distribution.
@Bean
public Job remoteChunkingJob() {
    return jobBuilderFactory.get("remoteChunkingJob")
        .start(stepBuilderFactory.get("step2")
            .<Integer, Integer>chunk(2) // number of reader records per chunk
            .reader(itemReader())
            // distribute chunks with ChunkMessageChannelItemWriter
            .writer(itemWriter())
            .build())
        .build();
}

@Bean
public ItemReader<Integer> itemReader() {
    return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
}

@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}

// message-middleware channel configuration omitted
The slave nodes mainly run the business logic on the distributed chunks of data (each can be understood as a subtask) and output the results. The subtask processing side therefore needs to configure the ChunkProcessorChunkHandler provided by Spring Batch Integration, which handles receiving subtasks, running the actual business processing, and feeding back the execution results.
// message-middleware channel configuration omitted
// receive chunk task requests and feed back execution results
@Bean
@ServiceActivator(inputChannel = "slaveRequests", outputChannel = "slaveReplies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor = new SimpleChunkProcessor<>(slaveItemProcessor(), slaveItemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}

// processor implementing the actual business task-processing logic
@Bean
public SlaveItemProcessor slaveItemProcessor() { return new SlaveItemProcessor(); }

// writer implementing the actual business task-processing logic
@Bean
public SlaveItemWriter slaveItemWriter() { return new SlaveItemWriter(); }
- Remote partitioning (Remote Partitioning)
The main difference between remote partitioning and remote chunking is that the master node does not load the data itself. Instead, the current Step uses a Partitioner to split off multiple sub-Steps (which can also be understood as subtasks), and a PartitionHandler then distributes the corresponding subtasks to the slave nodes. For this, Spring Batch Integration provides MessageChannelPartitionHandler to realize the subtask distribution, which also relies on message middleware underneath. Each slave node reads its sub-Step's execution context and, based on that information, completes its own ItemReader, ItemProcessor, and ItemWriter processing.
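As a sketch of the splitting side, a Partitioner bean inside the batch configuration class might divide a Step into per-range sub-Steps like this; the id range and the "minId"/"maxId" context keys are illustrative conventions, not fixed by Spring Batch.
@Bean
public Partitioner rangePartitioner() {
    return gridSize -> {
        // Split an assumed id range [0, 10000) into gridSize sub-steps; each
        // slave later reads its own range from the step execution context.
        Map<String, ExecutionContext> partitions = new HashMap<>();
        int rangeSize = 10000 / gridSize;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("minId", i * rangeSize);
            context.putInt("maxId", (i + 1) * rangeSize - 1);
            partitions.put("partition" + i, context);
        }
        return partitions;
    };
}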

- Characteristic analysis
Overall characteristics of the Spring Batch framework:
- Complete batch processing capability: supports single-machine multithreading and distributed multi-process cooperative batch processing, with customizable sharding models.
- No built-in scheduled scheduling: a third-party timing framework must be integrated (e.g., Spring Task, which in turn needs a solution for duplicate triggering across the cluster).
- Weak visual control: Spring Batch tasks are usually configured in code or files; a console must be built separately, and its control capabilities are limited.
- Difficult integration: its distributed batch capability requires integrating additional third-party middleware, or extension development against its interfaces; enterprise-level use based on the officially provided approach requires relatively complex planning and integration.
Enterprise batch processing solution: SchedulerX visualized MapReduce tasks
The SchedulerX task scheduling platform provides a complete, end-to-end solution for enterprise batch processing requirements. Users can directly use this public-cloud service to easily give a business application cluster distributed batch processing capability (business applications deployed outside Alibaba Cloud are also supported), with no additional middleware to deploy, integrate, or maintain.
Principle analysis
In the overall solution, the task scheduling platform provides fully visualized control, highly reliable scheduled triggering, and visualized queries for the tasks users register. On the business application side, integrating the SchedulerX SDK gives quick access to distributed batch processing; the user then only needs to care about the batch model's subtask splitting rules and the processing logic of each subtask. This distributed batch processing has the following features:
- Subtask high availability: when a cluster execution node goes down, automatic failover redistributes the subtasks on the offline machine to other nodes
- Automatic elastic scale-out: newly deployed application nodes in the cluster automatically participate in subsequent task execution
- Visualization: monitoring, operations, and business log queries are provided for the execution of tasks and subtasks

The general workflow is as follows:
- After a MapReduce task is created on the platform, the scheduling service enables highly reliable scheduled triggering for it
- When the MapReduce task triggers, the scheduling service selects one node from the connected business worker nodes as the master node for this run
- The master node runs the user-developed subtask splitting and loading logic, and through the map method evenly distributes subtask processing requests to the other worker nodes in the cluster
- The master node monitors the processing of the whole distributed batch task as well as the health of each worker node, ensuring highly available overall execution
- The other worker nodes receive subtask processing requests and call back into the user-defined business logic until every subtask is completed; the number of threads a single application node uses for concurrent subtask processing is configurable
- When all subtasks finish, the master node aggregates all subtask execution results, calls back the reduce method, and reports the execution result to the scheduling platform for recording
Developers only need to implement a MapReduceJobProcessor abstract class in the business application: in the isRootTask branch, load the list of business subtask data objects to be processed this round; in non-root requests, jobContext.getTask() returns the single subtask object whose information drives the business processing logic. Once the business application is deployed to the cluster nodes, all nodes participate in coordinating the whole distributed batch task when it triggers, until it completes.
public class MapReduceHelloProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult reduce(JobContext jobContext) throws Exception {
        // aggregation callback after all subtasks complete; implementation optional
        jobContext.getTaskResults();
        return new ProcessResult(true, "number of subtask results: " + jobContext.getTaskResults().size());
    }

    @Override
    public ProcessResult process(JobContext jobContext) throws Exception {
        if (isRootTask(jobContext)) {
            // load the list of business subtasks to be processed (hypothetical loader)
            List<String> list = loadSubTaskList();
            // call the map method provided by the SDK to distribute subtasks automatically
            return map(list, "SecondDataProcess");
        } else {
            // obtain the data of a single subtask and run its business processing
            String data = (String) jobContext.getTask();
            // ... business logic here ...
            return new ProcessResult(true, "data processed successfully!");
        }
    }
}
Functional advantages
- Subtask visualization
User dashboard: provides visualized records of all task triggers and runs.

Visualized subtask details: query the details of a task execution record to see the execution status and node of each subtask.

- Subtask business logs
Clicking "Log" in the subtask list shows the log records produced while the subtask was being processed.

- Execution stack view
The execution stack view helps when a subtask is stuck and never finishes: it makes it easy to inspect the corresponding execution thread's stack.


- Custom business labels
Subtask business labels give users a quick, visualized way to view and query subtask business information. In the illustration below, "account name" is the business label attached when this subtask was split off; based on it, users can quickly see the processing status of the corresponding business subtask, and they can also query subtasks by a specified business label.

To configure custom labels for subtasks, simply have the distributed subtask objects passed to map implement the BizSubTask interface and its labelMap method, adding each subtask's own business labels for visualized querying.
public class AccountTransferProcessor extends MapReduceJobProcessor {

    private static final Logger logger = LoggerFactory.getLogger("schedulerxLog");

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true);
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            logger.info("split task list size:20");
            List<AccountInfo> list = new LinkedList<>();
            for (int i = 0; i < 20; i++) {
                list.add(new AccountInfo(i, "CUS" + StringUtils.leftPad(i + "", 4, "0"),
                        "AC" + StringUtils.leftPad(i + "", 12, "0")));
            }
            return map(list, "transfer");
        } else {
            logger.info("start biz process...");
            logger.info("task info:" + context.getTask().toString());
            TimeUnit.SECONDS.sleep(30L);
            logger.info("start biz process end.");
            return new ProcessResult(true);
        }
    }
}
public class AccountInfo implements BizSubTask {

    private long id;
    private String name;
    private String accountId;

    public AccountInfo(long id, String name, String accountId) {
        this.id = id;
        this.name = name;
        this.accountId = accountId;
    }

    // set the subtask label information
    @Override
    public Map<String, String> labelMap() {
        Map<String, String> labelMap = new HashMap<>();
        labelMap.put("account name", name);
        return labelMap;
    }
}
- Compatible with open source
SchedulerX supports executors written against common open source frameworks, including XXL-Job and ElasticJob, and the platform also plans to support scheduling Spring Batch tasks.
Case scenarios
The distributed batch processing model (the visualized MapReduce model) has many demand scenarios in real enterprise applications. Common usage scenarios include:
- Batch parallel processing of sharded-database/sharded-table data: distribute the sub-database or sub-table identifiers across cluster nodes as subtask objects for parallel processing (see the sketch after this list)
- Processing logistics order data by city or region: distribute cities and regions across cluster nodes as subtask objects for parallel processing
- Leveraging the subtask visualization of the visualized MapReduce model: treat key customers or orders as subtasks when producing the corresponding data reports or push messages, enabling visual tracking of the important subtasks
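Taking the first scenario as an example, here is a hedged sketch of distributing sharded-table data with the visualized MapReduce model; the shard naming scheme and shard counts are illustrative only.
public class TableShardBatchProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            // Root task: enumerate every sub-database/sub-table as a subtask object.
            List<String> shards = new ArrayList<>();
            for (int db = 0; db < 4; db++) {
                for (int table = 0; table < 8; table++) {
                    shards.add("order_db_" + db + ".order_" + table); // illustrative shard name
                }
            }
            return map(shards, "tableShardProcess");
        } else {
            // Subtask: scan and process the rows of the one shard assigned to this node.
            String shard = (String) context.getTask();
            // ... business processing of this shard's data ...
            return new ProcessResult(true);
        }
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true);
    }
}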
Fund sales business case
The following fund sales business case shows how the distributed batch processing model can be applied, for users to adapt to their own business scenarios. Case description: fund companies and fund distributors (such as Ant Fortune) need to synchronize investors' account/transaction application data every day, usually exchanged as files; one fund company faces N distributors (and vice versa), and each distributor's data files are completely independent. Every distributor's data file must go through several fixed steps: file verification, interface file parsing, data validation, and data import. For these fixed steps, a fund company is well suited to distributed batch processing to speed up data file handling: each distributor is distributed across the cluster as a subtask object, and all application nodes participate, each parsing the data files of the distributors assigned to it.
@Component
public class FileImportJob extends MapReduceJobProcessor {

    private static final Logger logger = LoggerFactory.getLogger("schedulerx");

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true);
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            // ---------------------------------------------------------
            // Step 1. Read the list of connected distributors
            // ---------------------------------------------------------
            logger.info("Building the subtask list with distributors as the dimension...");
            // Pseudocode: read the distributor list from the database. The Agency
            // class implements the BizSubTask interface and uses the distributor
            // name/code as the subtask label so the console can track it visually.
            List<Agency> agencyList = getAgencyListFromDb();
            return map(agencyList, "fileImport");
        } else {
            // ---------------------------------------------------------
            // Step 2. Process a single distributor's file data
            // ---------------------------------------------------------
            Agency agency = (Agency) context.getTask();
            File file = loadFile(agency);
            logger.info("File loading complete.");
            validFile(file);
            logger.info("File verification passed.");
            List<Request> requests = resolveRequest(file);
            logger.info("File data parsing complete.");
            List<Request> checkedRequests = checkRequest(requests);
            logger.info("Application data check passed.");
            importRequest(checkedRequests);
            logger.info("Application data import complete.");
            return new ProcessResult(true);
        }
    }
}

This case covers one business link in fund transaction clearing, handled with parallel batch processing; each subsequent processing step can be handled the same way. In addition, visualized MapReduce task nodes can be orchestrated through DAG dependencies to build a complete, automated business clearing process.
Summary
The distributed task scheduling platform SchedulerX provides a complete enterprise-grade solution for distributed batch processing. It gives users a fast, easy-to-use integration path and supports scheduled triggering, visualized run tracking, simple operations and control, and highly available scheduling services, together with enterprise-grade monitoring dashboards, log services, and alerting capabilities.
References:
- Spring Batch Integration:
- ElasticJob: https://shardingsphere.apache.org/elasticjob/current/cn/overview/
- Distributed task scheduling SchedulerX user manual: https://help.aliyun.com/document_detail/161998.html
- How SchedulerX helps users solve distributed task scheduling: