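- MasterServer: MasterServer follows a distributed, decentralized design. It is mainly responsible for splitting DAG tasks and for submitting and monitoring tasks, and it also monitors the health of the other MasterServers and WorkerServers. On startup, MasterServer registers an ephemeral node in ZooKeeper and handles fault tolerance by watching for changes to ZooKeeper ephemeral nodes. MasterServer provides its listening service over Netty. The service mainly includes:
  - Distributed Quartz: the distributed scheduling component, mainly responsible for starting and stopping scheduled tasks. After Quartz triggers a task, a thread pool inside the Master handles the task's subsequent operations.
  - MasterSchedulerThread: a scanning thread that periodically scans the command table in the database and performs different business operations depending on the command type.
  - MasterExecThread: mainly responsible for DAG task splitting, task submission monitoring, and the logic for the various command types.
  - MasterTaskExecThread: mainly responsible for persisting tasks.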
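- WorkerServer: WorkerServer also follows a distributed, decentralized design. It is mainly responsible for executing tasks and providing the log service. On startup, WorkerServer registers an ephemeral node in ZooKeeper and maintains a heartbeat. The server provides its listening service over Netty. The Worker service mainly includes:
  - FetchTaskThread: continuously fetches tasks from the Task Queue and, depending on the task type, calls the corresponding executor of TaskScheduleThread.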
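- ZooKeeper: the ZooKeeper service. The MasterServer and WorkerServer nodes in the system all rely on ZooKeeper for cluster management and fault tolerance. The system also uses ZooKeeper for event listening and distributed locks. We once implemented the queue on Redis as well, but because we want DolphinScheduler to depend on as few components as possible, the Redis implementation was eventually removed.
- Task Queue: provides task queue operations. The queue is currently also implemented on ZooKeeper. Because each queue entry stores little information, there is no need to worry about the queue holding too much data. In stress tests with millions of queued entries, system stability and performance were not affected.
- Alert: provides alert-related interfaces, mainly covering the storage, query, and notification of two types of alert data. Notification comes in two forms: email and SNMP (not yet implemented).
- API: the API interface layer, mainly responsible for handling requests from the front-end UI layer. The service exposes a unified RESTful API to external callers. The interfaces cover creating, defining, querying, modifying, publishing, and taking workflows offline, as well as manual start, stop, pause, resume, run from a given node, and so on.
- UI: the system's front-end pages, which provide the visual interfaces for operating the system.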
Architecture design in brief
DolphinScheduler's architecture and implementation contain many good design ideas. The official documentation describes them in detail; the key points are summarized below:
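- Decentralized vs. centralized
  - DolphinScheduler's decentralization works by having Masters and Workers register in ZooKeeper, so that neither the Master cluster nor the Worker cluster has a center. A ZooKeeper distributed lock is used to elect one Master or Worker as the "manager" that carries out the task.
- Distributed locks in practice
  - DolphinScheduler uses ZooKeeper distributed locks so that, at any given moment, only one Master runs the Scheduler and only one Worker performs task submission.
- Waiting in a loop when threads run short
  - If a DAG contains no sub-processes and the number of entries in Command exceeds the threshold set for the thread pool, the process simply waits or fails.
  - If a large DAG nests many sub-processes, a "dead wait" state can arise. To resolve this, a Command type for insufficient resources is added: when the thread pool is exhausted, the main process is suspended. This frees threads in the pool, so processes suspended for lack of resources can be woken up and resume execution. Note: the Master Scheduler thread fetches Commands in FIFO order.
- Fault-tolerance design
  - Fault tolerance is divided into service-crash fault tolerance and task retry. Service-crash fault tolerance is further divided into Master fault tolerance and Worker fault tolerance.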
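- Task priority design
  - Tasks are handled from high to low in this order of precedence: priority across different process instances, then priority within the same process instance, then task priority within the same process, then task submission order within the same process.
  - Process definition priority exists because some processes need to be handled before others. It can be configured when a process is started manually or on a schedule, and it has five levels: HIGHEST, HIGH, MEDIUM, LOW, and LOWEST.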
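- Log access with Logback and netty
  - To keep DolphinScheduler as lightweight as possible, gRPC was chosen for remote access to log information.
  - A custom Logback FileAppender and Filter are used so that each task instance produces its own log file.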
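
The task priority ordering described above can be sketched as a tuple comparison. This is only an illustration under assumptions: the `QueuedTask` class, its field names, and the use of the process instance id to group tasks are hypothetical and are not taken from DolphinScheduler's source.

```python
import heapq
from dataclasses import dataclass, field
from enum import IntEnum


class Priority(IntEnum):
    # Smaller value = handled earlier; the five level names come from the text.
    HIGHEST = 0
    HIGH = 1
    MEDIUM = 2
    LOW = 3
    LOWEST = 4


@dataclass(order=True)
class QueuedTask:
    # Fields are compared in declaration order, which encodes the precedence.
    process_priority: Priority   # priority of the process instance
    process_instance_id: int     # keeps tasks of one instance together (assumption)
    task_priority: Priority      # priority of the task inside its process
    submit_seq: int              # submission order inside the process
    name: str = field(compare=False)


def drain(tasks):
    """Return task names in the order they would be handled."""
    heap = list(tasks)
    heapq.heapify(heap)
    return [heapq.heappop(heap).name for _ in range(len(heap))]
```

A min-heap keeps the next task to handle at the front, so a task from a HIGH process instance is always popped before any task from a MEDIUM one, whatever the task-level priorities are.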
The DolphinScheduler Master assigns tasks to workers, and the default configuration uses the linear weighted load algorithm. Because routing is done on the client side, that is, in the master service, you can change master.host.selector in master.properties to configure the algorithm, e.g. master.host.selector=random (case-insensitive). Three algorithms are provided:
- Weighted random (random)
- Smooth round-robin (roundrobin)
- Linear load (lowerweight)
Worker load balancing configuration: the configuration file is worker.properties.
- Weight: all of the load algorithms above distribute tasks according to weight, so the weight affects how traffic is split. You can modify the worker.weight value to give different machines different weights.
- Warm-up: with JIT optimization in mind, a worker runs at low power for a period after startup so that it gradually reaches its best state. We call this process warm-up. After startup, a worker's weight therefore rises gradually to its maximum over time (ten minutes by default; no configuration item is provided, but if you need one you can make the change and submit a PR).
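
As a rough illustration of warm-up, the sketch below scales a worker's configured weight by how much of the warm-up window has elapsed. The text only says the weight rises gradually over ten minutes, so the linear ramp, the floor of 1, and the function name `warmed_weight` are assumptions made for this example.

```python
WARMUP_SECONDS = 10 * 60  # the text gives ten minutes as the default


def warmed_weight(weight: int, uptime_seconds: float,
                  warmup_seconds: float = WARMUP_SECONDS) -> int:
    """Scale the configured weight by the fraction of warm-up completed (assumed linear)."""
    if uptime_seconds >= warmup_seconds:
        return weight
    if uptime_seconds <= 0:
        return 1
    scaled = int(weight * uptime_seconds / warmup_seconds)
    # Never drop to zero, so a freshly started worker still receives some tasks.
    return max(1, scaled)
```

With a configured weight of 100, a worker that has been up for five minutes would be treated as having weight 50 under this assumption.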
Load balancing algorithms in detail
- Random (weighted): this algorithm is fairly simple. One worker is picked at random from the eligible workers, and its weight affects its share of the traffic.
- Smooth round-robin (weighted): plain weighted round-robin has an obvious flaw. With certain weight combinations it produces an uneven sequence of instances, and this unsmooth load can cause momentary load spikes on some instances and put the system at risk of going down. To fix this scheduling flaw, a smooth weighted round-robin algorithm is provided. Each worker has two weights: weight (unchanged once warm-up has finished) and current_weight (changes dynamically). On every routing decision, all workers are traversed and each worker's current_weight is increased by its weight; the weights of all workers are also summed into total_weight. The worker with the largest current_weight is chosen for the task, and total_weight is then subtracted from that worker's current_weight.
- Linear weighted (default algorithm): with this algorithm, each worker reports its own load information to the registry at regular intervals.
  - The decision is based on two main pieces of information:
    - load average (default: number of CPU cores * 2)
    - available physical memory (default: 0.3, in GB)
  - If either one fails to meet its configured threshold, the worker does not take part in load balancing (that is, no traffic is routed to it). The following properties in worker.properties customize the thresholds:
    - worker.max.cpuload.avg=-1 (maximum average CPU load for the worker; tasks can be dispatched to the worker only while this value is higher than the system's average CPU load. The default -1 means CPU cores * 2)
    - worker.reserved.memory=0.3 (memory reserved for the worker, in GB; tasks can be dispatched to the worker only while this value is lower than the system's available memory)
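
The smooth weighted round-robin steps described above can be written out in a few lines. This is a minimal sketch that follows the description (weight, current_weight, total_weight); the `Worker` class and `select` function are hypothetical names, not DolphinScheduler's actual classes.

```python
from dataclasses import dataclass


@dataclass
class Worker:
    host: str
    weight: int              # fixed once warm-up has finished
    current_weight: int = 0  # changes on every routing decision


def select(workers):
    """Pick one worker using smooth weighted round-robin."""
    total_weight = 0
    best = None
    for w in workers:
        # Every worker gains its own weight on each routing decision.
        w.current_weight += w.weight
        total_weight += w.weight
        if best is None or w.current_weight > best.current_weight:
            best = w
    # The chosen worker pays back the sum of all weights.
    best.current_weight -= total_weight
    return best
```

For three workers with weights 5, 1, and 1, seven consecutive calls pick the heavy worker five times, but interleaved with the other two rather than five times in a row, which is the smoothing the text refers to.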
During master-server scheduling, a large number of database reads are generated for data such as tenant, user, and processDefinition. These reads put heavy pressure on the DB and also slow down the whole core scheduling process. Because this business data is read often and written rarely, a cache module was introduced to reduce DB read pressure and speed up core scheduling. The cache module uses the spring-cache mechanism; whether the cache is enabled (the default, none, means disabled) and the cache type can be set directly in the Spring configuration file.
- caffeine is currently used for cache management, and cache settings such as cache size and expiration time can be configured freely.
- Cache reads: the cache uses spring-cache annotations, configured in the relevant mapper layer; see TenantMapper for an example.
- Cache updates: business data is updated from api-server, while the cache lives in master-server. Data updates in api-server therefore have to be intercepted (an aspect intercepts @CacheEvict). When a cache entry needs to be evicted, api-server notifies master-server, which evicts the entry after receiving the cacheEvictCommand.
- Note: the fallback strategy for cache updates is the expiration policy the user configures in caffeine, so configure it to suit your business.
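
The read and update paths described above can be simulated in plain Python. The real system uses spring-cache and caffeine in Java; the `MasterCache` and `ApiServer` classes below are hypothetical stand-ins that only show the flow: reads go through the cache, and an update on the API side triggers an eviction on the master side.

```python
class MasterCache:
    """Read-through cache on the master side (a stand-in for caffeine)."""

    def __init__(self, load_from_db):
        self._load_from_db = load_from_db
        self._entries = {}
        self.db_reads = 0  # counts how often the database is actually hit

    def get(self, key):
        if key not in self._entries:
            self.db_reads += 1
            self._entries[key] = self._load_from_db(key)
        return self._entries[key]

    def on_cache_evict_command(self, key):
        # Called when the API side reports that the entry changed.
        self._entries.pop(key, None)


class ApiServer:
    """Updates business data and notifies every master to evict the stale entry."""

    def __init__(self, db, masters):
        self._db = db
        self._masters = masters

    def update_tenant(self, tenant_id, name):
        self._db[tenant_id] = name
        for master in self._masters:
            master.on_cache_evict_command(tenant_id)
```

If the notification were lost, the stale entry would stay until it expired, which is why the expiration policy acts as the fallback mentioned in the note above.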
Parameter values used in DolphinScheduler definitions may come from three sources:
- Global parameters: variables defined when saving the workflow definition page
- Parameters passed from upstream tasks: parameters handed down by upstream tasks
- Local parameters: the node's own variables, defined by the user under "Custom Parameters"; the user can also set the values of these variables when defining the workflow
Because a parameter's value can come from several sources, a priority is needed when parameter names collide. DolphinScheduler's parameter priority, from high to low, is: local parameters > parameters passed from upstream tasks > global parameters
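For parameters passed from upstream tasks, several upstream tasks may pass parameters to the same downstream task. When the parameter names passed from upstream are the same:
- The downstream node prefers the parameter whose value is non-empty
- If several parameters have non-empty values, the upstream tasks are sorted by completion time and the parameter from the upstream task that finished earliest is chosen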
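
The two rules above (source priority, then tie-breaking among upstream tasks) can be combined in a short sketch. The `UpstreamResult` class and both function names are hypothetical, and treating an empty string or None as "empty" is an assumption of this example.

```python
from dataclasses import dataclass


@dataclass
class UpstreamResult:
    task_name: str
    end_time: float    # completion timestamp of the upstream task
    out_params: dict   # parameters the task passed downstream


def merge_upstream(results):
    """Resolve same-named upstream parameters: non-empty first, then earliest finish."""
    merged = {}
    for result in sorted(results, key=lambda r: r.end_time):
        for name, value in result.out_params.items():
            # Keep the earliest value unless it is empty and a later one is not.
            if name not in merged or (not merged[name] and value):
                merged[name] = value
    return merged


def resolve(global_params, upstream_results, local_params):
    """Apply the priority local > upstream > global."""
    resolved = dict(global_params)
    resolved.update(merge_upstream(upstream_results))
    resolved.update(local_params)
    return resolved
```

Applying the sources from lowest to highest priority means each later `update` overwrites same-named entries from the sources before it.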
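| Variable | Declaration | Meaning |
| --- | --- | --- |
| system.biz.date | ${system.biz.date} | The day before the scheduled time of the daily scheduling instance, in yyyyMMdd format |
| system.biz.curdate | ${system.biz.curdate} | The scheduled time of the daily scheduling instance, in yyyyMMdd format |
| system.datetime | ${system.datetime} | The scheduled time of the daily scheduling instance, in yyyyMMddHHmmss format |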
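
To make the three built-in parameters concrete, the sketch below derives them from a scheduled time according to the table. The function name `system_params` is hypothetical; only the parameter names and formats come from the table.

```python
from datetime import datetime, timedelta


def system_params(schedule_time: datetime) -> dict:
    """Build the three built-in parameters from an instance's scheduled time."""
    return {
        # The day before the scheduled time, yyyyMMdd.
        "system.biz.date": (schedule_time - timedelta(days=1)).strftime("%Y%m%d"),
        # The scheduled day itself, yyyyMMdd.
        "system.biz.curdate": schedule_time.strftime("%Y%m%d"),
        # The full scheduled timestamp, yyyyMMddHHmmss.
        "system.datetime": schedule_time.strftime("%Y%m%d%H%M%S"),
    }
```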
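Custom variable names are supported in code and are declared as ${variable_name}. A custom variable can reference the "system parameters".
We define this kind of base variable in the $[...] format. $[yyyyMMddHHmmss] can be split up and recombined freely, for example $[yyyyMMdd], $[HHmmss], and $[yyyy-MM-dd].
Offsets can also be applied in the following two ways:
Use the add_months() function, which adds or subtracts months. The first argument is [yyyyMMdd], the format of the returned time; the second argument is the month offset, that is, how many months to add or subtract.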
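- N years later: $[add_months(yyyyMMdd,12*N)]
- N years earlier: $[add_months(yyyyMMdd,-12*N)]
- N months later: $[add_months(yyyyMMdd,N)]
- N months earlier: $[add_months(yyyyMMdd,-N)]
Add or subtract a number directly: append "+/-" and a number right after the custom format.
- N weeks later: $[yyyyMMdd+7*N]
- N weeks earlier: $[yyyyMMdd-7*N]
- N days later: $[yyyyMMdd+N]
- N days earlier: $[yyyyMMdd-N]
- N hours later: $[HHmmss+N/24]
- N hours earlier: $[HHmmss-N/24]
- N minutes later: $[HHmmss+N/24/60]
- N minutes earlier: $[HHmmss-N/24/60]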
Scope of local parameters: parameters configured on the task definition page apply only to that task by default. If parameter passing is configured, the parameter can also take effect in downstream tasks.
Using the shellDemo workflow defined earlier, add a local parameter to shell-nodeA:
- dt: the parameter name
- IN: IN means the local parameter can be used only in the current node; OUT means the local parameter can be passed downstream
- DATE: the data type, a date
- $[yyyy-MM-dd]: a built-in parameter derived from a custom format
Scope of global parameters: a global parameter is valid for every task node in the whole workflow and is configured on the workflow definition page. A local task can reference a global parameter only if that global parameter has already been defined. It is used in much the same way as a local parameter, except that the parameter's value must be set to the key of the global parameter.
In the scripts of shell-nodeB and shell-nodeC, output echo ${dt}, then click save workflow and add the global variable.
Click the task instance and view the log on the right: shell-nodeA outputs the date 2022-08-02, while shell-nodeB and shell-nodeC output the previous day's date, 2022-08-01.
DolphinScheduler allows parameters to be passed between tasks. Currently only one-way passing to downstream tasks is supported. The task types that currently support this feature are Shell, SQL, and Procedure.
When defining an upstream node, if its result needs to be passed to a downstream node that depends on it, set a variable with direction OUT under [Custom Parameters] in [Current Node Settings]. At present, passing parameters downstream is implemented mainly for SQL and SHELL nodes.
- Note: if there is no dependency between nodes, local parameters cannot be passed from upstream.
The following uses a SHELL task to create a local parameter and pass its value downstream. To pass a parameter, the SHELL script must output a statement in the format ${setValue(key=value)}, where key is the prop of the corresponding parameter and value is the parameter's value. In the shell-nodeA task node, add a custom parameter with the following settings:
In the script of shell-nodeC, output echo ${transfer}. Save the workflow definition, bring it online, and run it, then check the shell-nodeC task instance log. The output 20220701 shows that the parameter was passed through the workflow.
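
To show what the ${setValue(key=value)} convention amounts to, the sketch below scans task output for such statements and collects the pairs. The regular expression and the function name `extract_out_params` are hypothetical and only illustrate the format; they are not DolphinScheduler's implementation.

```python
import re

# Matches ${setValue(key=value)} anywhere in a line of task output.
_SET_VALUE = re.compile(r"\$\{setValue\(([^=)]+)=([^)]*)\)\}")


def extract_out_params(log_lines):
    """Collect key/value pairs announced through ${setValue(key=value)}."""
    params = {}
    for line in log_lines:
        for key, value in _SET_VALUE.findall(line):
            params[key.strip()] = value.strip()
    return params
```

For the example in the text, a shell-nodeA script that prints `${setValue(transfer=20220701)}` would yield `{"transfer": "20220701"}`, the value shell-nodeC later echoes.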
- Click "Data Source Center -> Create Data Source" and create the type of data source you need.
- Click "Test Connection" to check whether the data source can be connected (a data source can be saved only after it passes the connectivity test).
Taking MySQL as an example: to use a MySQL data source, first download the matching version of the JDBC driver from the mysql maven repository and move it into the api-server/libs and worker-server/libs folders, then restart the api-server and worker-server services. If you run DolphinScheduler in containers, likewise mount the JDBC driver at the corresponding path of those two services and restart them.
Because the MySQL driver was already copied to every node (master, worker, api-server, and alert-server) when the DolphinScheduler cluster was deployed earlier, the MySQL data source can be created right away.
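- Data source: select MYSQL
- Data source name: enter a name for the data source
- Description: enter a description of the data source
- IP / host name: enter the IP used to connect to MySQL
- Port: enter the port used to connect to MySQL
- User name: set the user name used to connect to MySQL
- Password: set the password used to connect to MySQL
- Database name: enter the name of the MySQL database to connect to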
- Jdbc connection parameters: parameter settings for the MySQL connection, filled in as JSON
{
  "useSSL": "false",
  "useUnicode": "true",
  "characterEncoding": "utf-8",
  "allowMultiQueries": "true",
  "zeroDateTimeBehavior": "convertToNull",
  "allowPublicKeyRetrieval": "true"
}
Click a data source record to view its parameters.
Here we use the ClickHouse instance deployed earlier. The driver for the CLICKHOUSE data source is supported natively.
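
As an illustration of how JSON connection parameters like the ones listed for MySQL could end up on a connection string, the sketch below appends them to a MySQL JDBC URL as a query string. The mapping is an assumption made for this example (the source does not say how DolphinScheduler applies the parameters), and the function name, host, port, and database used here are hypothetical.

```python
import json
from urllib.parse import urlencode


def build_mysql_url(host, port, database, jdbc_params_json):
    """Append JSON connection parameters to a MySQL JDBC URL as a query string."""
    # json.loads also rejects input that is not valid JSON.
    params = json.loads(jdbc_params_json) if jdbc_params_json else {}
    url = f"jdbc:mysql://{host}:{port}/{database}"
    return f"{url}?{urlencode(params)}" if params else url
```

Parsing the field first is a cheap way to catch a missing brace or comma before the connection test runs.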
CREATE TABLE table01 (
    `id` String,
    `price` Float64,
    `create_time` DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)

insert into table01
values('1001',99.9,'2022-08-02 22:00:00');

CREATE TABLE table02 (
    `id` String,
    `price` Float64,
    `create_time` DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)

insert into table02
values('1002',199.9,'2022-08-02 23:00:00');

CREATE TABLE table03 (
    `id` String,
    `price` Float64,
    `create_time` DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)

insert into table03 select * from table01
union all select * from table02;
Set up the workflow dependencies so that sql_node3 runs only after sql_node1 and sql_node2 have both finished, then save the workflow under the name sqlDemo.
Bring the sqlDemo workflow online, click execute, and check the execution result of the workflow instance.
Check the execution results of the task instances.
Log in to ClickHouse with the ClickHouse client and query the data. The results are correct.
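Most tasks need to run on a schedule, which requires a timed workflow. To create one: click Project Management -> Workflow -> Workflow Definition to open the workflow definition page, bring the workflow online, and click the "Timing" button to open the timing parameter dialog:
Add a schedule that runs once every 5 minutes.
The failure strategy, notification strategy, process priority, Worker group, notification group, recipients, and Cc recipients are the same as the workflow run parameters.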
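
To make "once every 5 minutes" concrete, the sketch below lists the upcoming fire times for such a schedule. It assumes the schedule fires on minute marks that are multiples of five (as a Quartz-style expression such as `0 0/5 * * * ? *` would, which is itself an assumption about what the dialog produces); the function name is hypothetical.

```python
from datetime import datetime, timedelta


def next_fire_times(after: datetime, count: int, every_minutes: int = 5):
    """Return the next `count` fire times aligned to multiples of `every_minutes`."""
    base = after.replace(second=0, microsecond=0)
    # Move forward to the next aligned minute strictly after `after`.
    base += timedelta(minutes=every_minutes - base.minute % every_minutes)
    return [base + timedelta(minutes=every_minutes * i) for i in range(count)]
```

The alignment only works as written when `every_minutes` divides 60 evenly, which holds for the 5-minute case in the text.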
Scenarios supported by the alert module
The user needs to create an alert instance. When creating it, an alert strategy must be selected from three options: send on success, send on failure, or send on both success and failure. When a workflow or task finishes and an alert is triggered, the alert instance's send method is called and performs a check: the alert instance is matched against the task status. If they match, the instance's sending logic runs; if not, the alert is filtered out. After an alert instance is created, it must be associated with an alert group, and one alert group can use several alert instances. The alert module supports the following scenarios:
Email alert example
Prepare a mailbox and enable the POP3/SMTP service (SMTP is what is mainly used for sending). Obtain the authorization code and save the change.
Log in as the administrator, go to the Security Center, select Alert Instance Management, and create an alert instance. Then select the EMAIL alert plugin and fill in the relevant parameters.
Then select Alert Group Management, create an alert group, and select the corresponding alert instance.
In the run settings of the workflow definition, set the notification strategy to send on both success and failure, and select the alert group created earlier to test the alert.
Confirm that the email was received and check its contents.
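
The strategy matching described in the alert module section (send on success, send on failure, or send on both) can be sketched as a small filter. The `AlertStrategy` enum and both function names are hypothetical and only mirror the three options named in the text.

```python
from enum import Enum


class AlertStrategy(Enum):
    SUCCESS = "success"   # send only when the workflow or task succeeded
    FAILURE = "failure"   # send only when it failed
    ALL = "all"           # send in both cases


def should_send(strategy: AlertStrategy, task_succeeded: bool) -> bool:
    """Match an alert instance's strategy against the task status."""
    if strategy is AlertStrategy.ALL:
        return True
    if strategy is AlertStrategy.SUCCESS:
        return task_succeeded
    return not task_succeeded


def matching_instances(alert_group, task_succeeded):
    """Keep the names of the alert instances in a group whose strategy matches."""
    return [name for name, strategy in alert_group
            if should_send(strategy, task_succeeded)]
```

Because one alert group can hold several instances, the filter runs per instance: a failed task reaches the failure-only and send-on-both instances and skips the success-only ones.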
