当前位置:网站首页>dflow入门4——recurse&reuse&conditional
dflow入门4——recurse&reuse&conditional
2022-08-03 08:15:00 【frank_haha】
为了梳理学习dflow时遇到的知识点,我决定开这一个系列记录自己的学习过程。当然了,最好是去看 官方教程 和 文档
本文,我们将阅读教程中recurse、reuse和conditional这三节
阅读原教程
recurse
recurse是dflow开发人员针对递归循环设计的模块,我们先来看recurse
首先定义了一个模板
plus1 = ShellOPTemplate(
name='plus1',
image="alpine:3.15",
script="echo 'This is iter {
{inputs.parameters.iter}}' && "
"echo $(({
{inputs.parameters.iter}}+1)) > /tmp/result.txt")
plus1.inputs.parameters = {
"iter": InputParameter()}
plus1.outputs.parameters = {
"iter": OutputParameter(
value_from_path="/tmp/result.txt")}
该模板是自增1的操作
hello = Step(name="hello", template=plus1, parameters={
"iter": steps.inputs.parameters["iter"]})
定义了一个step,该step使用了模板,自增1,输入是其他模板的输出
下面定义一个steps
steps = Steps(name="iter", inputs=Inputs(
parameters={
"iter": InputParameter(value=0),
"limit": InputParameter(value=3)}))
该steps的输入有两个,一个是iter,一个是limit
最后定义一个新的step
next = Step(name="next", template=steps,
parameters={
"iter": hello.outputs.parameters["iter"]},
when="%s < %s" % (
hello.outputs.parameters["iter"],
steps.inputs.parameters["limit"]))
翻译成普通的python
程序:
def add1(input: int):
print(input)
return input + 1
def loops(input_loop: int, limit_loop: int):
while input_loop < limit_loop:
input_loop = add1(input_loop)
loops(input_loop=input_loop, limit_loop=limit_loop)
loops(input_loop=0, limit_loop=3)
我们对比来看,自增1操作即为add1函数
loops函数对应next
我们从上往下看:
- wf 里仅有一个 steps
- 这个 steps add 了两步,add 1 和 loops
- 此外设定了进入函数的条件,也就是while的条件
从小往上看是build的过程:
- 首先定义自增1的模板
- 定义steps,设定好条件
- 定义next模板,该模板接收自增1的输出,再调用steps
- 最后steps添加这两个模板,wf 提交任务
调用方法:
- 把自增1替换为我们要循环的模板即可,next 接收自增1的关键输出,steps设定好阈值。这就是一个for循环
reuse
下面我们看reuse
这一节首先将上一节的shelltemplate替换成了python template
import time
from dflow import InputParameter, Inputs, Step, Steps, Workflow
from dflow.python import OP, OPIO, OPIOSign, PythonOPTemplate
class Plus1(OP):
def __init__(self):
pass
@classmethod
def get_input_sign(cls):
return OPIOSign({
'iter': int
})
@classmethod
def get_output_sign(cls):
return OPIOSign({
'iter': int
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
return OPIO({
'iter': op_in['iter'] + 1
})
if __name__ == "__main__":
steps = Steps(name="iter", inputs=Inputs(
parameters={
"iter": InputParameter(value=0),
"limit": InputParameter(value=5)}))
plus1 = Step(name="plus1",
template=PythonOPTemplate(Plus1,
image="python:3.8"),
parameters={
"iter": steps.inputs.parameters["iter"]},
key="iter-%s" % steps.inputs.parameters["iter"])
steps.add(plus1)
next = Step(name="next", template=steps,
parameters={
"iter": plus1.outputs.parameters["iter"]},
when="%s < %s" % (
plus1.outputs.parameters["iter"],
steps.inputs.parameters["limit"]))
steps.add(next)
wf = Workflow("recurse", steps=steps)
wf.submit()
区别在于limit替换成了5
这样的话,自增1步进行了5次
此外,每一个自增1步分配了一个key
重点在后面
while wf.query_status() in ["Pending", "Running"]:
time.sleep(1)
assert(wf.query_status() == "Succeeded")
step0 = wf.query_step(key="iter-0")[0]
step1 = wf.query_step(key="iter-1")[0]
step1.modify_output_parameter("iter", 3)
wf = Workflow("recurse-resubmit", steps=steps)
wf.submit(reuse_step=[step0, step1])
这里取出了前两步,修改了第二步的输出(从2修改到了3)
重新提交后,原来5次的循环,重新使用之前的两次,由于修改了第二次的输出,所以在这次相当于从原来的第4步开始,运行两步后结束。
这一feature比较有价值的点在于,我们的输入输出文件都是在minio上存着的,只要我们不删容器(e.g. 重启docker),只要有他们的key,这些东西都是可以查询的,可以重用的。
conditional
首先定义了一个条件template
import random
import time
from dflow import (OutputArtifact, OutputParameter, Outputs, Step, Steps,
Workflow, if_expression)
from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate,
upload_packages)
if "__file__" in locals():
upload_packages.append(__file__)
class Random(OP):
@classmethod
def get_input_sign(cls):
return OPIOSign()
@classmethod
def get_output_sign(cls):
return OPIOSign({
"is_head": bool,
"msg1": str,
"msg2": str,
"foo": Artifact(str),
"bar": Artifact(str)
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
open("foo.txt", "w").write("head")
open("bar.txt", "w").write("tail")
if random.random() < 0.5:
is_head = True
else:
is_head = False
return OPIO({
"is_head": is_head,
"msg1": "head",
"msg2": "tail",
"foo": "foo.txt",
"bar": "bar.txt"
})
然后套上壳
steps = Steps("conditional-steps", outputs=Outputs(
parameters={
"msg": OutputParameter()},
artifacts={
"res": OutputArtifact()}))
random_step = Step(
name="random",
template=PythonOPTemplate(Random, image="python:3.8")
)
steps.add(random_step)
steps.outputs.parameters["msg"].value_from_expression = if_expression(
_if=random_step.outputs.parameters["is_head"],
_then=random_step.outputs.parameters["msg1"],
_else=random_step.outputs.parameters["msg2"])
steps.outputs.artifacts["res"].from_expression = if_expression(
_if=random_step.outputs.parameters["is_head"],
_then=random_step.outputs.artifacts["foo"],
_else=random_step.outputs.artifacts["bar"])
wf = Workflow(name="conditional", steps=steps)
wf.submit()
while wf.query_status() in ["Pending", "Running"]:
time.sleep(1)
assert(wf.query_status() == "Succeeded")
结构简单
边栏推荐
猜你喜欢
随机推荐
推荐系统-排序层-模型:Wide&Deep
Unity编辑器扩展批量修改图片名称
并发之固定运行和交替运行方案
面渣逆袭:MySQL六十六问,两万字+五十图详解
Docker启动mysql
The use of the database table structure document generation tool screw
ArcEngine(八)用IWorkspaceFactory加载矢量数据
PowerShell:执行 Install-Module 时,不能从 URI 下载
mysql的innodb存储引擎和myisam存储引擎的区别
Pop Harmony Basics Big Notes
计算机网络之网络安全
热部署系统实现
Taro框架-微信小程序-调用微信支付
frp:开源内网穿透工具
mysql存生僻字奇怪问题,mysql为什么不能辨别mb4字符?
mysql备份时的快照原理
五、《图解HTTP》报文首部和HTTP缓存
【TPC-DS】DF的SQL(Data Maintenance部分)
LAN技术-2免费ARP
AI mid-stage sequence labeling task: three data set construction process records