@

架构设计

总体架构

  • MasterServer:MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态. MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理. MasterServer基于netty提供监听服务.The service mainly includes:

    • Distributed Quartz分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作
    • MasterSchedulerThread是一个扫描线程,定时扫描数据库中的 command 表,根据不同的命令类型进行不同的业务操作
    • MasterExecThread主要是负责DAG任务切分、任务提交监控、各种不同命令类型的逻辑处理
    • MasterTaskExecThread主要负责任务的持久化
  • WorkerServer:WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务. WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳. Server基于netty提供监听服务.WorkerThe service mainly includes:
    • FetchTaskThread主要负责不断从Task Queue中领取任务,并根据不同任务类型调用TaskScheduleThread对应执行器.
  • ZooKeeper:ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错.另外系统还基于ZooKeeper进行事件监听和分布式锁. 我们也曾经基于Redis实现过队列,不过我们希望DolphinScheduler依赖到的组件尽量地少,所以最后还是去掉了Redis实现.
  • Task Queue:提供任务队列的操作,目前队列也是基于Zookeeper来实现.由于队列中存的信息较少,不必担心队列里数据过多的情况,实际上我们压测过百万级数据存队列,对系统稳定性和性能没影响.
  • Alert:提供告警相关接口,接口主要包括告警两种类型的告警数据的存储、查询和通知功能.其中通知功能又有邮件通知SNMP(暂未实现)两种.
  • API:API接口层,主要负责处理前端UI层的请求.该服务统一提供RESTful api向外部提供请求服务. 接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等.
  • UI:系统的前端页面,提供系统的各种可视化操作界面.

启动流程图

Architecture design is briefly

DolphinSchedulerThere are so many in the architecture and implementation of good design ideas,Detailed description can refer to the official,Below is the emphasis briefly:

  • 去中心化vs中心化

    • DolphinScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心,并使用Zookeeper分布式锁来选举其中的一台Master或Worker为“管理者”来执行任务.
  • 分布式锁实践
    • DolphinScheduler使用ZooKeeper分布式锁来实现同一时刻只有一台Master执行Scheduler,或者只有一台Worker执行任务的提交.
  • 线程不足循环等待问题
    • 如果一个DAG中没有子流程,则如果Command中的数据条数大于线程池设置的阈值,则直接流程等待或失败.
    • 如果一个大的DAG中嵌套了很多子流程,则会产生“死等”状态.增加一种资源不足的Command类型,如果线程池不足,则将主流程挂起.这样线程池就有了新的线程,可以让资源不足挂起的流程重新唤醒执行.注意:Master Scheduler线程在获取Command的时候是FIFO的方式执行的.
  • 容错设计
    • 容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况.
  • 任务优先级设计
    • 按照不同流程实例优先级优先于同一个流程实例优先级优先于同一流程内任务优先级优先于同一流程内任务提交顺序依次从高到低进行任务处理.

      • The priority of the process definition is given some processes need to be done before other process to deal with,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST.
      • 任务的优先级也分为5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST.
  • Logback和netty实现日志访问
    • DolphinScheduler的轻量级性,所以选择了gRPC实现远程访问日志信息.
    • 使用自定义Logback的FileAppender和Filter功能,实现每个任务实例生成一个日志文件.

负载均衡

  • DolphinScheduler-Master Assigned tasks to worker,The default configuration for the linear weighted load.Because the route is in the client do,即 master 服务,可以更改 master.properties 中的 master.host.selector To configure the algorithm.eg:master.host.selector=random(不区分大小写);提供了三种算法:

    • 加权随机(random)
    • Smooth poll(roundrobin)
    • 线性负载(lowerweight)
  • Worker 负载均衡配置:配置文件 worker.properties,权重

    • All the above load algorithm is based on the weight for the weighted distribution,Weight affect the shunt result.你可以在 修改 worker.weight The value of the different weights to different machine Settings.
    • 预热:考虑到 JIT 优化,我们会让 worker In the running for a period of time in low power since the start of,Make it gradually to achieve the best state,We call this process preheating.因此 worker 在启动后,His weight will gradually achieve maximum over time(默认十分钟,We did not provide configuration items,如果需要,You can modify and submit the related PR)
  • Load balancing algorithm described

    • 随机(加权):该算法比较简单,In accord with the worker Randomly selected from a(The proportion of weight will affect his).
    • Smooth poll(加权):Weighted polling algorithm an obvious flaws.Namely under the weight of some particular,Weighted polling dispatch meetings to generate non-uniform instance sequence,这种不平滑的负载可能会使某些实例出现瞬时高负载的现象,导致系统存在宕机的风险.In order to solve the scheduling defect,We provide a smooth weighted polling algorithm.每台 worker There are two weights,即 weight(After the completion of the preheating remains the same),current_weight(动态变化),Each routing.都会遍历所有的 worker,使其 current_weight+weight,At the same time accumulate all worker 的 weight,计为 total_weight,然后挑选 current_weight The biggest as the mission worker,与此同时,将这台 worker 的 current_weight-total_weight.
    • 线性加权(默认算法):Every once in a while the algorithm will report to the registry own load information.
      • We according to the two main information to determine

        • load 平均值(默认是 CPU 核数 *2)
        • 可用物理内存(默认是 0.3,单位是 G)
      • If any of the below configuration items,那么这台 worker It will not participate in the load.(Neither distribution flow),可以在 worker.properties Modify the following properties to customize the configuration
        • worker.max.cpuload.avg=-1 (worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为-1: cpu cores * 2)
        • worker.reserved.memory=0.3 (worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为G)

    缓存

    由于在master-server调度过程中,Will produce a large number of database read operations,如tenant,user,processDefinition等,一方面对DBGreat read pressure,On the other hand, can make whole core scheduling process becomes slow;Consider this part of the business data is read more write less scene,It introduces the cache module,以减少DB读压力,To speed up the core scheduling process;缓存模块采用spring-cache机制,可直接在springWhether that is configured in the configuration file open cache(默认none关闭), 缓存类型;

    • 目前采用caffeine进行缓存管理,Are free to set the cache related configuration,如缓存大小、过期时间等;
    • 缓存读取:缓存采用spring-cache的注解,Configuration in the relevantmapper层,Refer to as:TenantMapper.
    • 缓存更新:Business data updates fromapi-server, The cache end inmaster-server, 故需要对api-serverThe data update to do listening(aspect切面拦截@CacheEvict),When you need to cache deportation will informmaster-server,master-server接收到cacheEvictCommandAfter the cache to expel;
    • 需要注意的是:Out of the cache update strategy from the users in thecaffeineThe expiration policy configuration,Please combine the configurations of business;

    实战使用

参数

参数优先级

DolphinScheduler Involved in the definition of parameter values may come from three types of:

  • 全局参数:Save the page in workflow definition when the definition of the variable
  • 上游任务传递的参数:The parameters of the upstream task passed
  • 本地参数:The node's own variables,用户在“自定义参数”定义的变量,And the user can in the workflow definition when the definition the part of the value of the variable

因为参数的值存在多个来源,当参数名相同时,You need will parameter priority problems.DolphinScheduler Parameters of priority from high to low as:本地参数 > 上游任务传递的参数 > 全局参数

In the upstream task parameters passed,由于上游可能存在多个任务向下游传递参数,当上游传递的参数名称相同时:

  • 下游节点会优先使用值为非空的参数
  • 如果存在多个值为非空的参数,则按照上游任务的完成时间排序,选择完成时间最早的上游任务对应的参数

内置参数

基础内置参数

变量名声明方式含义
system.biz.date${system.biz.date}日常调度实例定时的定时时间前一天,格式为 yyyyMMdd
system.biz.curdate${system.biz.curdate}日常调度实例定时的定时时间,格式为 yyyyMMdd
system.datetime${system.datetime}日常调度实例定时的定时时间,格式为 yyyyMMddHHmmss

衍生内置参数

  • 支持代码中自定义变量名,声明方式:${变量名}.可以是引用 "系统参数"

  • 我们定义这种基准变量为 \([...] 格式的,\)[yyyyMMddHHmmss] 是可以任意分解组合的,比如:$[yyyyMMdd], $[HHmmss], $[yyyy-MM-dd] 等

  • Can also through the following two ways:

    • 使用add_months()函数,该函数用于加减月份, 第一个入口参数为[yyyyMMdd],表示返回时间的格式 第二个入口参数为月份偏移量,表示加减多少个月

      • 后 N 年:$[add_months(yyyyMMdd,12*N)]
      • 前 N 年:$[add_months(yyyyMMdd,-12*N)]
      • 后 N 月:$[add_months(yyyyMMdd,N)]
      • 前 N 月:$[add_months(yyyyMMdd,-N)]
    • 直接加减数字 在自定义格式后直接“+/-”数字

      • 后 N 周:$[yyyyMMdd+7*N]
      • 前 N 周:$[yyyyMMdd-7*N]
      • 后 N 天:$[yyyyMMdd+N]
      • 前 N 天:$[yyyyMMdd-N]
      • 后 N 小时:$[HHmmss+N/24]
      • 前 N 小时:$[HHmmss-N/24]
      • 后 N 分钟:$[HHmmss+N/24/60]
      • 前 N 分钟:$[HHmmss-N/24/60]

本地参数和全局参数

The scope of the local parameters:The task definition page configuration parameters,The default scope only the task,If the configuration parameter, this parameter can be effect to downstream tasks.

使用前面shellDemo workflow定义,在shell-nodeAAdd the local parameters:

  • dt:参数名
  • IN:IN According to local parameter can only be used in the current node,OUT According to local parameter can be passed to the downstream
  • DATE:数据类型,日期
  • $[yyyy-MM-dd]:The custom format derived built-in parameters

The global parameters scope:The global parameter is a pointer to整个工作流All the task node of effective parameters,In the workflow definition page configuration.Local tasks refer to global parameters is the premise of have defined the global parameters,Use similar way and the use of local parameter,But the value of the parameter need to be configured into the global parameters key.

在shell-nodeB和shell-nodeCThe script in the outputecho ${dt},And then click save workflow,添加全局变量

Click task instance from the right view the log,可以看到shell-nodeAThe output is the date2022-08-02,而shell-nodeB和shell-nodeCThe output is the day before the date2022-08-01.

工作流传参

DolphinScheduler Allow arguments between tasks,Current delivery direction support only one-way passed to the downstream.目前支持这个特性的任务类型有Shell、SQL、Procedure.

When defining the upstream node,If there is a need to pass the result of the node to have dependencies of the downstream node,需要在【当前节点设置】的【自定义参数】Set a direction is OUT 的变量.Now we're getting at SQL 和 SHELL Node to do the function can pass down.

  • 注:If no dependencies between node,The local parameter can't be transmitted by upstream.

下面通过 SHELL Task to create local parameter and assign a value to the downstream,The user needs to pass parameters,在定义 SHELL 脚本时,Need the output format for ${setValue(key=value)} 的语句,key For the corresponding parameters of prop,value 为该参数的值.在shell-nodeAAdd a custom parameter Settings in the task node parameters as follows:

在shell-nodeCIn the script outputecho ${transfer},保存工作流定义-上线-运行,查看shell-nodeCTask sample log,Can get the output20220701To achieve workflow parameters.

数据源管理

支持数据源

数据源中心支持MySQL、POSTGRESQL、HIVE/IMPALA、SPARK、CLICKHOUSE、ORACLE、SQLSERVER等数据源.

  • 点击"数据源中心->创建数据源",According to the demand to create different types of data sources
  • 点击"测试连接",Whether can test the data source connection is successful(Only when the data source through the connectivity test before they can save data source).

以 MySQL 为例,如果想要使用 MySQL 数据源,需要先在 mysql maven 仓库 中下载对应版本的 JDBC 驱动,将其移入 api-server/libs 以及 worker-server/libs 文件夹中,最后重启 api-serverworker-server 服务,即可使用 MySQL 数据源.If you use a container to start DolphinScheduler,同样也是将 JDBC Drive mounted on the above two service after the corresponding path,Restart the driver can be.

创建MySQL数据源

由于前面部署DolphinSchedulerThe cluster hasMySQLDrive copied to all nodesmaster和worker、api-server、alert-server上了,So here you can begin to createMySQL数据源

  • 数据源:选择 MYSQL
  • 数据源名称:输入数据源的名称
  • 描述:输入数据源的描述
  • IP 主机名:输入连接 MySQL 的 IP
  • 端口:输入连接 MySQL 的端口
  • 用户名:设置连接 MySQL 的用户名
  • 密码:设置连接 MySQL 的密码
  • 数据库名:输入连接 MySQL 的数据库名称
  • Jdbc 连接参数:用于 MySQL 连接的参数设置,以 JSON 形式填写

jdbc连接参数如下:

{
"useSSL": "false",
"useUnicode": "true",
"characterEncoding": "utf-8",
"allowMultiQueries": "true",
"zeroDateTimeBehavior": "convertToNull",
"allowPublicKeyRetrieval": "true"
}

Click the data source record data source parameters

创建ClickHouse数据源

Here we use the deployment goodClickHouse,CLICKHOUSEData source driver native has support

工作流实践

SQL工作流

拖拉SQL引擎图标,创建名称为sql_node1任务定义

insert into table01
values('1001',99.9,'2022-08-02 22:00:00'); CREATE TABLE table01
(
`id` String,
`price` Float64,
`create_time` DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY id;

创建名称为sql_node2任务定义

insert into table02
values('1002',199.9,'2022-08-02 23:00:00'); CREATE TABLE table02
(
`id` String,
`price` Float64,
`create_time` DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY id;



创建名称为sql_node3任务定义

insert into table03 select * from table01
union all select * from table02; CREATE TABLE table03
(
`id` String,
`price` Float64,
`create_time` DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY id;

Edit good workflow dependence of,sql_node1和sql_node2都完成后再执行sql_node3,保存名称为sqlDemo workflow.

上线sqlTo demonstrate the workflow and click execute,Query workflow instance execution result

The query task instance execution result

使用ClickHouse的客户端登录ClickHouse查询数据,The result has shown that the correct

工作流定时

Most of the tasks will be regularly run demand,This requires timing workflow,创建步骤:点击项目管理->工作流->工作流定义,进入工作流定义页面,上线工作流,点击"定时"按钮,弹出定时参数设置弹框:

  • 选择起止时间.在起止时间范围内,定时运行工作流;不在起止时间范围内,不再产生定时工作流实例.

  • Add a every 5 Minutes to perform a timing.

  • 失败策略、通知策略、流程优先级、Worker 分组、通知组、收件人、Cc people with workflow operation parameters.

    点击"创建"按钮,创建定时成功,此时定时状态为"下线",定时需上线才生效.

    定时上线:点击"定时管理"按钮,进入定时管理页面,点击"上线"按钮,定时状态变为"上线",如下图所示,工作流定时生效.

告警

The alarm module support scenario

The user needs to create alarm instance,In the create instances when the alarm,Need to select the alarm strategy,有三个选项,成功发、失败发,以及成功和失败都发.在执行完工作流或任务时,如果触发告警,Call the alarm instance sends method will be logical judgment,将告警实例与任务状态进行匹配,匹配则执行该告警实例发送逻辑,不匹配则过滤.After create the alarm instance,Needs associated with the alarm set,An alarm group can use multiple alarm instance. The alarm module support scenarios as follows:

Email warning example

Prepare for an E-mail,开启POP3/SMTP服务,Because it is to be used for sending mainlySMTP,得到授权码,保存修改.

使用管理员用户登录,进入到安全中心,Select the alarm instance management,Create a warning instance,Then select the corresponding alarm plug-insEMAIL,Fill in the following related parameters.

Then select the alarm group management,创建告警组,Select the corresponding alarm instance.

In the operation of the workflow definition configuration to inform strategies for success or failure is just,The alarm groups to create a test in front of the alarm

Have confirmed that they received the email check E-mail information

**本人博客网站 **IT小神 www.itxiaoshen.com

Apache DolphinSchedulerA new generation of distributed workflow task scheduling platform of actual combat-中的更多相关文章

  1. 开源分布式工作流任务调度系统Easy Scheduler Release 1.0.2发布

    Easy Scheduler Release 1.0.2===Easy Scheduler 1.0.2是1.x系列中的第三个版本.此版本增加了调度开放接口.worker分组(指定任务运行的机器组).任 ...

  2. 分布式工作流任务调度系统Easy Scheduler正式开源

    分布式工作流任务调度系统Easy Scheduler正式开源 1.背景 在多位技术小伙伴的努力下,经过近2年的研发迭代.内部业务剥离及重构,也经历一批种子用户试用一段时间后,EasyScheduler ...

  3. 宜信开源|分布式任务调度平台SIA-TASK的架构设计与运行流程

    一.分布式任务调度的背景 无论是互联网应用或者企业级应用,都充斥着大量的批处理任务.我们常常需要一些任务调度系统来帮助解决问题.随着微服务化架构的逐步演进,单体架构逐渐演变为分布式.微服务架构.在此背 ...

  4. .net Task scheduling of distributed architecture platform

    开源地址:http://git.oschina.net/chejiangyi/Dyd.BaseService.TaskManager .net 任务调度平台 用于.net dll,exeThe task of the mount, ...

  5. 宜信开源微服务任务调度平台(SIA-TASK)

    背景 无论是互联网应用或者企业级应用,都充斥着大量的批处理任务.常常需要一些任务调度系统帮助开发者解决问题.随着微服务化架构的逐步演进,单体架构逐渐演变为分布式.微服务架构.在此的背景下,很多原先的任 ...

  6. 宜信开源|微服务任务调度平台SIA-TASK入手实践

    引言 Appropriate letter open source recently micro service task scheduling platformSIA-TASK,SIA-TASKBelong to the distributed task scheduling platform,使用起来简单方便,非常容易入手,Deployment build goodSIA-TASKAfter the task scheduling platform,编写TASK后配置JOB进行 ...

  7. .net任务调度平台 Dyd.BaseService.TaskManager

    Slow network abroad,The latest version of the migration tohttp://git.oschina.net/chejiangyi/Dyd.BaseService.TaskManager .net Simple task scheduling platform 用于.net dll, ...

  8. 分布式任务调度平台 → XXL-JOB 实战

    开心一刻 老师:谁知道鞭炮用英语怎么说? 甲:老师!老师!我知道,鞭炮的英文是pilipala. 老师:那闪电呢? 乙:kucha kucha 老师:那舞狮呢? 丙:dong dong qiang 老 ...

  9. 分布式任务调度平台XXL-JOB搭建教程

    关于分布式任务调度平台XXL-JOB,其实作者 许雪里 在其发布的中文教程中已经介绍的很清楚了,这里我就不做过多的介绍了,关于其搭建教程,本人依照其文档搭建起来基本上也没遇到啥问题,这里通过博客的形式 ...

  10. XXL-JOB分布式任务调度平台安装与部署

    配XXL-JOB分布式任务调度平台安装与部署

随机推荐

  1. R 语言编码风格指南

    R 语言是一门主要用于统计计算和绘图的高级编程语言.这份 R 语言编码风格指南旨在让我们的 R代码更容易阅读.分享和检查.以下规则系与 Google 的 R User community collaborative design and. 概要: R编码风格 ...

  2. 推荐一些常用感觉不错的jQuery插件

    转:http://www.cnblogs.com/v10258/p/3263939.html JQuery插件繁多,下面是个人在工作和学习中用到感觉不错的,特此记录. UI: jquery UI(官方 ...

  3. Java中的Annotation(2)----Annotation工作原理

    Java中的Annotation(2)----Annotation工作原理 分类: 编程语言2013-03-18 01:06 3280人阅读 评论(6) 收藏 举报 In an article has introduced how to useJDK ...

  4. git之reset图解

    https://blog.csdn.net/longintchar/article/details/81843048 1.三棵树. 此时如果我们运行 git status,会发现没有任何改动,因为现在 ...

  5. Vue Routing and routing default jump

    Routing is to make the root component dynamic have to mount the other components: 步骤: //路由配置: //.安装 npm install vue-router --save / cnpm install vue-router --save ...

  6. Tree总结

    The tree structure for easy to write solution,So often appear in the interview questions 1. 树的种类 1) Tree 2) Binary Trees 3) Binary Search Trees(BST) : used to sort ...

  7. delphi android 自动升级

    用IdHTTP1Download the file to my phone. Call not installed automatically. First chance exception at $DFC22519. Exception class EJNIException with ...

  8. 安装ubuntu系统 ——分区

    安装ubuntu 系统主要分四个区 目录 建议大小 格式 描述 / 10G-20G ext4 根目录 swap <2048M swap 交换空间 /boot 400M左右 ext4 Linux的 ...

  9. Oracle EBS Get full value set information

    SELECT fvs1.flex_value_set_name, fvs1.description, decode(upper(fvs1.longlist_flag), 'N', '值列表', 'Y' ...

  10. [翻译] LTInfiniteScrollView

    LTInfiniteScrollView 效果: Usage - 使用 Create the scroll view by: Through the following ways to createscroll view self.scrollView ...