当前位置:网站首页>dflow入门4——recurse&reuse&conditional
dflow入门4——recurse&reuse&conditional
2022-08-03 08:15:00 【frank_haha】
为了梳理学习dflow时遇到的知识点,我决定开这一个系列记录自己的学习过程。当然了,最好是去看 官方教程 和 文档
本文,我们将阅读教程中recurse、reuse和conditional这三节
阅读原教程
recurse
recurse是dflow开发人员针对递归循环设计的模块,我们先来看recurse
首先定义了一个模板
plus1 = ShellOPTemplate(
name='plus1',
image="alpine:3.15",
script="echo 'This is iter {
{inputs.parameters.iter}}' && "
"echo $(({
{inputs.parameters.iter}}+1)) > /tmp/result.txt")
plus1.inputs.parameters = {
"iter": InputParameter()}
plus1.outputs.parameters = {
"iter": OutputParameter(
value_from_path="/tmp/result.txt")}
该模板是自增1的操作
hello = Step(name="hello", template=plus1, parameters={
"iter": steps.inputs.parameters["iter"]})
定义了一个step,该step使用了模板,自增1,输入是其他模板的输出
下面定义一个steps
steps = Steps(name="iter", inputs=Inputs(
parameters={
"iter": InputParameter(value=0),
"limit": InputParameter(value=3)}))
该steps的输入有两个,一个是iter,一个是limit
最后定义一个新的step
next = Step(name="next", template=steps,
parameters={
"iter": hello.outputs.parameters["iter"]},
when="%s < %s" % (
hello.outputs.parameters["iter"],
steps.inputs.parameters["limit"]))
翻译成普通的python程序:
def add1(input: int):
print(input)
return input + 1
def loops(input_loop: int, limit_loop: int):
while input_loop < limit_loop:
input_loop = add1(input_loop)
loops(input_loop=input_loop, limit_loop=limit_loop)
loops(input_loop=0, limit_loop=3)
我们对比来看,自增1操作即为add1函数
loops函数对应next
我们从上往下看:
- wf 里仅有一个 steps
- 这个 steps add 了两步,add 1 和 loops
- 此外设定了进入函数的条件,也就是while的条件
从小往上看是build的过程:
- 首先定义自增1的模板
- 定义steps,设定好条件
- 定义next模板,该模板接收自增1的输出,再调用steps
- 最后steps添加这两个模板,wf 提交任务
调用方法:
- 把自增1替换为我们要循环的模板即可,next 接收自增1的关键输出,steps设定好阈值。这就是一个for循环
reuse
下面我们看reuse
这一节首先将上一节的shelltemplate替换成了python template
import time
from dflow import InputParameter, Inputs, Step, Steps, Workflow
from dflow.python import OP, OPIO, OPIOSign, PythonOPTemplate
class Plus1(OP):
def __init__(self):
pass
@classmethod
def get_input_sign(cls):
return OPIOSign({
'iter': int
})
@classmethod
def get_output_sign(cls):
return OPIOSign({
'iter': int
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
return OPIO({
'iter': op_in['iter'] + 1
})
if __name__ == "__main__":
steps = Steps(name="iter", inputs=Inputs(
parameters={
"iter": InputParameter(value=0),
"limit": InputParameter(value=5)}))
plus1 = Step(name="plus1",
template=PythonOPTemplate(Plus1,
image="python:3.8"),
parameters={
"iter": steps.inputs.parameters["iter"]},
key="iter-%s" % steps.inputs.parameters["iter"])
steps.add(plus1)
next = Step(name="next", template=steps,
parameters={
"iter": plus1.outputs.parameters["iter"]},
when="%s < %s" % (
plus1.outputs.parameters["iter"],
steps.inputs.parameters["limit"]))
steps.add(next)
wf = Workflow("recurse", steps=steps)
wf.submit()
区别在于limit替换成了5
这样的话,自增1步进行了5次
此外,每一个自增1步分配了一个key
重点在后面
while wf.query_status() in ["Pending", "Running"]:
time.sleep(1)
assert(wf.query_status() == "Succeeded")
step0 = wf.query_step(key="iter-0")[0]
step1 = wf.query_step(key="iter-1")[0]
step1.modify_output_parameter("iter", 3)
wf = Workflow("recurse-resubmit", steps=steps)
wf.submit(reuse_step=[step0, step1])
这里取出了前两步,修改了第二步的输出(从2修改到了3)
重新提交后,原来5次的循环,重新使用之前的两次,由于修改了第二次的输出,所以在这次相当于从原来的第4步开始,运行两步后结束。
这一feature比较有价值的点在于,我们的输入输出文件都是在minio上存着的,只要我们不删容器(e.g. 重启docker),只要有他们的key,这些东西都是可以查询的,可以重用的。
conditional
首先定义了一个条件template
import random
import time
from dflow import (OutputArtifact, OutputParameter, Outputs, Step, Steps,
Workflow, if_expression)
from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate,
upload_packages)
if "__file__" in locals():
upload_packages.append(__file__)
class Random(OP):
@classmethod
def get_input_sign(cls):
return OPIOSign()
@classmethod
def get_output_sign(cls):
return OPIOSign({
"is_head": bool,
"msg1": str,
"msg2": str,
"foo": Artifact(str),
"bar": Artifact(str)
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
open("foo.txt", "w").write("head")
open("bar.txt", "w").write("tail")
if random.random() < 0.5:
is_head = True
else:
is_head = False
return OPIO({
"is_head": is_head,
"msg1": "head",
"msg2": "tail",
"foo": "foo.txt",
"bar": "bar.txt"
})
然后套上壳
steps = Steps("conditional-steps", outputs=Outputs(
parameters={
"msg": OutputParameter()},
artifacts={
"res": OutputArtifact()}))
random_step = Step(
name="random",
template=PythonOPTemplate(Random, image="python:3.8")
)
steps.add(random_step)
steps.outputs.parameters["msg"].value_from_expression = if_expression(
_if=random_step.outputs.parameters["is_head"],
_then=random_step.outputs.parameters["msg1"],
_else=random_step.outputs.parameters["msg2"])
steps.outputs.artifacts["res"].from_expression = if_expression(
_if=random_step.outputs.parameters["is_head"],
_then=random_step.outputs.artifacts["foo"],
_else=random_step.outputs.artifacts["bar"])
wf = Workflow(name="conditional", steps=steps)
wf.submit()
while wf.query_status() in ["Pending", "Running"]:
time.sleep(1)
assert(wf.query_status() == "Succeeded")
结构简单
边栏推荐
猜你喜欢
随机推荐
Evaluate: A detailed introduction to the introduction of huggingface evaluation indicator module
HCIA实验(07)
ArcEngine(三)通过MapControl控件实现放大缩小全图漫游
Gauva的ListenableFuture
vim 折叠函数
《剑指Offer》刷题之打印从1到最大的n位数
How does Mysql query two data tables for the same fields in two tables at the same time
Fortify白盒神器20.1.1下载及安装(非百度网盘)
redis键值出现 xacxedx00x05tx00&的解决方法
Karatsuba大数乘法的Verilog实现
并发之固定运行和交替运行方案
mysql存生僻字奇怪问题,mysql为什么不能辨别mb4字符?
mysql系统变量与状态变量
Dapr 与 NestJs ,实战编写一个 Pub & Sub 装饰器
RViz报错: Error subscribing: Unable to load plugin for transport ‘compressed‘解决方法
HCIP练习(OSPF)
数仓4.0(一)
Shell运维开发基础(一)
@Async注解的坑,小心
sqlite date field plus one day








