当前位置:网站首页>Distributed task scheduling

Distributed task scheduling

2022-06-12 09:09:00 Eden Garden

1、 What is task scheduling

First, we can think about the solutions for the following business scenarios :

  • The e-commerce system needs to work every morning 10 spot , Afternoon 3 spot , evening 8 A batch of coupons will be distributed by click .
  • The banking system needs to send a SMS reminder three days before the due date of the credit card .
  • 12306 After you buy the ticket successfully , Send you a text message two hours before departure .

The solution to these business scenarios is task scheduling

Task scheduling refers to the system in order to automatically complete specific tasks , The process of performing a task at a given time . With task scheduling, more manpower can be liberated , The system automatically performs tasks .

At present, our common distributed task scheduling is :

  1. TBSchedule: Taobao launched a very excellent high-performance distributed scheduling framework , Currently used in Alibaba 、 JD.COM 、 payment
    treasure 、 In the process scheduling system of Gome and many other Internet enterprises . But it has not been updated for many years , The document is missing seriously , Lack of maintenance .
  2. XXL-Job: Public comment distributed task scheduling platform , Is a lightweight distributed task scheduling platform , Its core design goal is
    Rapid development 、 Learn easy 、 Lightweight 、 Easy to expand . Now open source and access to a number of companies online product lines , Open the box .
  3. Elastic-job: Dangdang.com TBSchedule And based on quartz The second development of elastic distributed task scheduling system , Rich in functions
    Powerful , use zookeeper Achieve distributed coordination , High availability of tasks and slicing function .
  4. Saturn: Vipshop is an open source distributed task scheduling platform , be based on Elastic-job, It can be configured uniformly in the whole domain , Unified supervision
    control , High availability of tasks and slicing function .

In this chapter, we mainly introduce Elastic-Job Use .

2、Elastic-Job

Elastic-Job It is a distributed scheduling solution , Open source by Dangdang , It consists of two independent subprojects Elastic-Job-Lite
and Elastic- Job-Cloud form , Use Elastic-Job It can quickly realize distributed task scheduling .

Before we build the environment , Introduce Elastic-Job We need to introduce some important concepts before using :

1. Fragmentation :
Piecemeal execution of tasks , A task needs to be divided into several independent tasks, and then the distributed servers execute one or several shard items respectively .
If there is a task to traverse a table in the database , existing 2 Servers , We divide this task into ten parts , Then each server should perform five sharding tasks , If the servers are distributed evenly A Assigned to the shard item 0,1,2,3,4; The server B Assigned to the shard item 5,6,7,8,9.

2. leader The election :
zookeeper It will ensure that one... Is selected from multiple servers leader,leader If you go offline, it will trigger a re-election .

3. Fragmentation strategy :
AverageAllocationJobShardingStrategy
Partition strategy based on average allocation algorithm , It is also the default fragmentation strategy . If the partition cannot be divided completely , Then the non divisible redundant fragments will be added to the server with small serial number in turn .
OdevitySortByNameJobShardingStrategy
Based on the odd even hash value of the job name IP Slicing strategy of ascending descending algorithm . If the hash value of the job name is odd, then IP Ascending . If the hash value of the job name is even, then IP Descending . It is used to average the load of different jobs to different servers .
( Be careful : Once the number of slices is less than the number of job servers , Assignments will always be assigned to IP The server with the front address , Lead to IP The server at the back of the address is idle .)

4. cron expression :
cron An expression is a string , Used to set timing rules , It consists of seven parts , Each part is separated by a space ;

Part of the meaning Value range
The first part Seconds ( second )0-59
The second part Minutes( branch )0-59
The third part Hours( when )0-23
The fourth part Day-of-Month( God )1-31
The fifth part Month( month )0-11 or JAN-DEC
The sixth part Day-of-Week( week )1-7(1 Means Sunday ) or SUN-SAT
Part seven Year( year ) Optional 1970-2099

cron Expressions also have some special symbols

Symbol meaning
? An indefinite value . When one of the two sub expressions is assigned a value , In order to avoid conflict , You need to set the value of another to “?”. for example : I want to 20 Daily trigger dispatch , No matter 20 What day is the day of the week , It can only be written as follows :0 0 0 20 * ?, In the end, I thought I could only use “?”
* Represents all possible values
, Set multiple values , for example ”26,29,33” It means that 26 branch ,29 Points and 33 Run each task once
- Set value range , for example ”5-20”, From 5 be assigned to 20 Run the task every minute
/ Set the frequency or interval , Such as "1/15" From 1 Point start , every other 15 Run the task once a minute
L For every month , Or weekly , Indicates the last day of each month , Or the last day of the month , for example "6L" Express " The last Friday of the month "
W Represents the working day closest to a given date , for example "15W" On a monthly basis (day-of-month) In the said " From this month 15 The most recent working day "
# Indicates the week of the month X. for example ”6#3” Represents the 3 A Friday

Simple example , Let us have a more profound impression

cron expression meaning
*/5 * * * * ? every other 5 Run the task once per second
0 0 23 * * ? Every day 23 Click to run a task
0 0 1 1 * ? monthly 1 No. In the morning 1 Click to run a task
0 0 23 L * ? Last day of each month 23 Click to run a task
0 26,29,33 * * * ? stay 26 branch 、29 branch 、33 Run the task once
0 0/30 9-17 * * ? Run the task every half an hour during nine to five working hours
0 15 10 ? * 6#3 The third Friday morning of each month 10:15 Run a task

3、 Introductory cases

Environmental requirements :

  • JDK requirement 1.7 And above
  • Maven requirement 3.0.4 And above
  • zookeeper Required 3.4.6 And above ( Open the box : hold conf In the catalog zoo_sample.cfg Change it to zoo.cfg, function bin In the catalog zkServer.cmd)

We need to prepare the database and corresponding tables , Here you can freely use your existing databases and tables to complete the operation , I use my own database here to demonstrate
1、 newly build maven engineering
 Insert picture description here
2、 Introduce dependencies

   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>

       <dependency>
        <groupId>com.dangdang</groupId>
           <artifactId>elastic-job-lite-spring</artifactId>
           <version>2.1.5</version>
       </dependency>
   
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <optional>true</optional>
    </dependency>
   
       <dependency>
           <groupId>com.baomidou</groupId>
           <artifactId>mybatis-plus-boot-starter</artifactId>
           <version>3.1.1</version>
       </dependency>
   
       <dependency>
           <groupId>mysql</groupId>
           <artifactId>mysql-connector-java</artifactId>
           <version>5.1.47</version>
       </dependency>
   
       <dependency>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-log4j12</artifactId>
       </dependency>
   </dependencies>

3、 To write springBoot Configuration file and startup class

server.port=${
    PORT:57081}
spring.application.name = elastic-job
logging.level.root = info

spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.200.128:3306/mp?useUnicode=true&characterEncoding=utf8&autoReconnect=true&allowMultiQueries=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=root

#  Set up Mapper Interface XML file location 
mybatis-plus.mapper-locations = classpath*:dao/*.xml
#  Set alias package scanning path 
mybatis-plus.type-aliases-package = com.itxw.elasticjob.pojo

# zookeeper Service address 
zookeeper.connString = localhost:2181
#  The name space 
myjob.namespace = elastic-job-example
#  Total number of segments 
myjob.count = 3
# cron expression ( Timing strategy )
myjob.cron = 0/5 * * * * ?
@MapperScan("com.itxw.elasticjob.dao")
@SpringBootApplication
public class ElasticJobApp {
    
    public static void main(String[] args) {
    
     SpringApplication.run(ElasticJobApp.class,args);
    }
}

4、 Entity class

@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("tb_user")
public class User {
    
    @TableId("ID")
    private Long id;
    @TableField("USER_NAME")
    private String userName;
    @TableField("PASSWORD")
    private String password;
    @TableField("NAME")
    private String name;
    @TableField("AGE")
    private Integer age;
}

5、 Data access layer

public interface UserMapper extends BaseMapper<User> {
    
    // Yes id adopt mod Function remainder for fragmentation 
    @Select("SELECT * FROM `tb_user` WHERE MOD(id,#{shardingTotalCount})=#{shardingItem}")
    List<User> queryUserById(@Param("shardingTotalCount") int count, @Param("shardingItem") int item);
 }

6、Elastic-Job The task class

/** *  Data query task  **/
@Component
public class MyJob implements SimpleJob {
    

    @Autowired
    private UserMapper userMapper;

    // Perform scheduled tasks 
    @Override
    public void execute(ShardingContext shardingContext) {
    
        // Get the total number of slices 
        int count = shardingContext.getShardingTotalCount();
        // Get fragment item 
        int item = shardingContext.getShardingItem();
        // Query data 
        List<User> userList = userMapper.queryUserById(count,item);
        // Output results 
        userList.forEach(user -> {
    
            System.out.println(" Work piece by piece :" + item + "--->" + user);
        });
    }
}

7、zookeeper Registry Center

@Configuration
public class ZKRegistryCenterConfig {
    

    //zookeeper Server address 
    @Value("${zookeeper.connString}")
    private String ZOOKEEPER_CONNECTION_STRING;

    // The namespace of the scheduled task 
    @Value("${myjob.namespace}")
    private String JOB_NAMESPACE;

    //zk Configure and create a registry 
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter setUpRegistryCenter() {
    
        //zk To configure 
        ZookeeperConfiguration zookeeperConfiguration = new 
        ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);

        // Create a registry 
        ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);

        return zookeeperRegistryCenter;
    }
}

8、elastic-job Configuration class

@Configuration
public class ElasticJobConfig {
    

    @Autowired
    private MyJob myJob;

    @Autowired
    private ZookeeperRegistryCenter zkRegistryCenterConfig;

    @Value("${myjob.count}")
    private int shardingCount;

    @Value("${myjob.cron}")
    private String cron;

    /** *  Configure task details  * * @param jobClass  Task execution class  * @param cron  Execution strategy  * @param shardingTotalCount  Number of slices  * @return */
    private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                        final String cron,
                                                        final int shardingTotalCount) {
    
        // establish JobCoreConfigurationBuilder
        JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
        JobCoreConfiguration jobCoreConfiguration = jobCoreConfigurationBuilder.build();

        // establish SimpleJobConfiguration
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());

        // establish LiteJobConfiguration
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).jobShardingStrategyClass("com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy").overwrite(true).build();

        return liteJobConfiguration;
    }

    @Bean(initMethod = "init")
    public SpringJobScheduler initSimpleElasticJob() {
    
        SpringJobScheduler springJobScheduler = new SpringJobScheduler(myJob, zkRegistryCenterConfig, createJobConfiguration(myJob.getClass(), cron, shardingCount));
        return springJobScheduler;
    }
}

After the above code is completed , We can start the microservice to observe the corresponding data
I have enabled three services here , Assign different port numbers
 Insert picture description here
After waiting for a period of time, you can see the following effects :
 Insert picture description here
 Insert picture description here
 Insert picture description here

原网站

版权声明
本文为[Eden Garden]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/03/202203010531311560.html