当前位置:网站首页>dflow入门1——HelloWorld!
dflow入门1——HelloWorld!
2022-08-03 08:15:00 【frank_haha】
为了梳理学习dflow时遇到的知识点,我决定开这一个系列记录自己的学习过程。当然了,最好是去看官方教程 和 文档
写在前面
dflow是干啥的
是做工作流的,其实就是argo的一个python包装,核心卖点就是可以用argo的UI,同时帮人解决一些工程问题,使代码标准化。
dflow怎么用
所谓工作流就是,预先定好任务执行流程的一整套任务的集合。说人话就是,填空。
使用dflow就是填一个个空。
在python是语境下就是,dflow预先定义好了很多的类,这些类已经可以实现很复杂的,工程向的功能。比如,任务调度、切片、循环、异常处理等等。
我们使用dflow最核心的方式就是调用它的类,增加这些类的科学向的功能。比如分子建模,制作量化软件的输入文件等等,所以学习dflow就是学习调用类的方式。
dflow架构浅析
如果把一整条工作流想象成一座生产标准工业品的工厂,那么step就是一个个顺序拼接起来的生产车间。
车间
每一个生产车间可以完成零部件的加工,在外界看来,step就是一个没有感情的,重复某项工作的一个黑盒。他所需要的只是生产所需的两种元素,一种是message,一种是artifact。前者类似指令,后者类似生产原料。
小结,step的上级是整个工厂(workflow),服从工厂调度,内部黑箱。
工人
那么生产车间内部是什么呢?是一个叫做template的工人。template收到step的原料和指令,开始按照既定的功能开展工作,吭哧吭哧干完后把成果交给step,作为输出。
小结,工人的上级是车间(step),服从车间调度,内部黑箱。
工序
工人干活,比如说,调用gaussian完成一项优化任务。gaussian正版软件要几十万,所以这些东西哪里来?作为一名工人,他需要收集各种工具,处理一些杂事(也就是调取image)。正儿八经要干的活儿,被抽象成了标准的操作手册:OP(operator)。在执行OP的时候,所需的一切条件都已经就位了(container里面环境啥的都配好了)。干完活儿后,把产品交给工人。
小结,工序的上级是工人(template),服从工人安排,内部黑箱。
工作流
我们从头捋一下。
工厂接到了订单,按照指令和原料生产某件产品,并把产品输出。
这是整个工作流的使命,对外黑箱,内部的流程是这样的:
生产流程:
工厂把任务(原料artifact和指令message)分给各个车间(step),车间把任务(原料artifact和指令message)分给工人(template),工人收集工具(image),按照操作手册(OP)生产产品,OP其实也是接收了任务(原料artifact和指令message)。
所以生产的时候,原料artifact和指令message是一直往下传的。
交货流程:
工人完成产品后,该产品对外就是整个车间(step)的output,车间和车间会在工厂的调度下相互耦合,交换产品等。工作流跑完后,工厂可以把最后一个车间的产品交付(download),当然了,也可以交中间车间的产品,这个可以任选,但都是以车间为单位的。
交货的时候,产品(artifact和message)也是一层层向上传的。
案例一:HelloWorld
下面我将从最底层的类开始,逐级向上,并完成helloworld。
制作OP
我们先制作最底层的工序(OP)。
下面是一个OP和他的依赖。
from pathlib import Path
from dflow.python import (OP, OPIO, Artifact, OPIOSign)
from typing import List
class WriteFile(OP):
def __init__(self):
pass
@classmethod
def get_input_sign(cls):
return OPIOSign({
'msg': str,
'number': int,
})
@classmethod
def get_output_sign(cls):
return OPIOSign({
"out_art": Artifact(List[Path])
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
f = open("msg.txt","w")
f.write(op_in["msg"])
f.close()
g = open("results.txt","w")
g.write(str(op_in["number"]))
g.close()
op_out = OPIO({
"out_art": [Path("msg.txt"), Path("results.txt")],
})
return op_out
我们在制作OP的时候要套用这个模板,他一定要有三个类方法,这些方法的名字不能动。
前两个是定义一下,工序正常运作所需要的原料和指令(get_input_sign),以及正常运作的输出(get_output_sign)
就好比python写一个函数
def aaa(in_msg: str, in_artifact: Artifact):
op_in = {
"in_msg": in_msg, "in_artifact": in_artifact}
op_out = execute(op_in)
out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]
return out_msg, out_artifact
我们写OP,其实就是写一个普通的函数,只不过格式上有区别,首先定义输入输出,然后把输入输出打包成dict给op_in,execute使用其中的东西完成业务逻辑,输出op_out,最后的op_out就是整个工序(OP)的输出,也是整个工人(template)。车间(step)的输出,他可以理解为一个字典。
具体的内部细节不用深究,我们只需要学会套用这个模板即可。
制作template
制作template需要考虑选择什么样的工序,以及该工序所需要的工具(image)
from dflow.python import PythonOPTemplate
a_tmp = PythonOPTemplate(WriteFile, image="python:3.8")
制作step
from dflow import Step
step0 = Step(
name="step0",
template=a_tmp,
parameters={
"msg": "HelloWorld!", "number": 1},
)
制作step就是把输入准备好即可,内部黑箱,取个名字方便调用。
制作工作流
from dflow import Workflow
wf = Workflow(name="python")
wf.add(step0)
wf.submit()
while wf.query_status() in ["Pending", "Running"]:
time.sleep(1)
assert (wf.query_status() == "Succeeded")
step = wf.query_step(name="step")[0]
assert (step.phase == "Succeeded")
from dflow import download_artifact
download_artifact(step.outputs.artifacts["out_art"])
工作流就是一个个step组合而成的,我们通过句柄wf来监测任务,是更高级的接口。
任务提交后可以在argo上查看任务状态。
最后放一张PPT,帮助理解:
此文仅为个人理解,不足之处请指正
边栏推荐
猜你喜欢
随机推荐
用diskpart的offline命令弹出顽固硬盘
Logic Pro X built-in sound library list
数据监控平台
mysql存生僻字奇怪问题,mysql为什么不能辨别mb4字符?
【无标题】
Gauva的ListenableFuture
实时目标检测新高地之#YOLOv7#更快更强的目标检测器
Taro框架-微信小程序-内嵌h5页面
【愚公系列】2022年07月 Go教学课程 026-结构体
NFT到底有哪些实际用途?
解决GANs训练中模式崩塌/训练崩溃的十五个方法
《剑指Offer》刷题之打印从1到最大的n位数
服务器资源监控工具-nmon、nmon_analyser
[Kaggle combat] Prediction of the number of survivors of the Titanic (from zero to submission to Kaggle to model saving and restoration)
The use of the database table structure document generation tool screw
集群
【TPC-DS】25张表的详细介绍,SQL的查询特征
mysqlbinlog: unknown variable 'default-character-set=utf8'
【收获合辑】k-NN与检索任务的异同+jupyter转pdf
sqlite 日期字段加一天