当前位置:网站首页>dflow入门3——dpdispatcher插件
dflow入门3——dpdispatcher插件
2022-08-03 08:15:00 【frank_haha】
为了梳理学习dflow时遇到的知识点,我决定开这一个系列记录自己的学习过程。当然了,最好是去看 官方教程 和 文档
本来我是准备先把 reuse 和 recurse 、conditional 这种基础架构写完再写调用方式,但是这些涉及到的案例需要用的HPC,所以先写这一节了。
本文将先尝试阅读 test_dpdispatcher.py
,然后尝试修改之前的调用脚本。
原教程
我们首先来看原脚本中的OP
import time
from pathlib import Path
from typing import List
from dflow import Step, Workflow, download_artifact, upload_artifact
from dflow.plugins.dispatcher import DispatcherExecutor
from dflow.python import OP, OPIO, Artifact, OPIOSign, PythonOPTemplate
class Duplicate(OP):
def __init__(self):
pass
@classmethod
def get_input_sign(cls):
return OPIOSign({
'msg': str,
'num': int,
'foo': Artifact(Path),
'idir': Artifact(Path),
})
@classmethod
def get_output_sign(cls):
return OPIOSign({
'msg': List[str],
'bar': Artifact(Path),
'odir': Artifact(Path),
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
op_out = OPIO({
"msg": [op_in["msg"] * op_in["num"]],
"bar": Path("output.txt"),
"odir": Path("todir"),
})
content = open(op_in['foo'], "r").read()
open("output.txt", "w").write(content * op_in["num"])
Path(op_out['odir']).mkdir()
for ii in ['f1', 'f2']:
(op_out['odir']/ii).write_text(op_in['num']
* (op_in['idir']/ii).read_text())
return op_out
我再重复一下,调用dflow的OP可以看作下面函数的变种:
def aaa(in_msg: str, in_artifact: Artifact):
op_in = {
"in_msg": in_msg, "in_artifact": in_artifact}
op_out = execute(op_in)
out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]
return out_msg, out_artifact
我们对照着两块儿代码。
该OP有4个输入,3个输出
前两句话是把foo文件内容读出,重复num次
后面是新建一个odir文件夹,该文件夹里新建两个文件,每个文件中写入输入文件夹对应文件的内容,并重复num次
最后至于msg,是重复num次,随后放到一个列表里输出。
总之就是对输入进行了一定的操作,得到了输出。
下面我们看外边的壳
def make_idir():
idir = Path("tidir")
idir.mkdir(exist_ok=True)
(idir / "f1").write_text("foo")
(idir / "f2").write_text("bar")
if __name__ == "__main__":
wf = Workflow(name="dispatcher")
with open("foo.txt", "w") as f:
f.write("Hi")
make_idir()
artifact0 = upload_artifact("foo.txt")
artifact1 = upload_artifact("tidir")
print(artifact0)
print(artifact1)
dispatcher_executor = DispatcherExecutor(
host="my-host", username="my-user", queue_name="V100")
step = Step(
name="step",
template=PythonOPTemplate(Duplicate, image="python:3.8"),
parameters={
"msg": "Hello", "num": 3},
artifacts={
"foo": artifact0, "idir": artifact1},
executor=dispatcher_executor
)
wf.add(step)
wf.submit()
while wf.query_status() in ["Pending", "Running"]:
time.sleep(1)
assert(wf.query_status() == "Succeeded")
step = wf.query_step(name="step")[0]
assert(step.phase == "Succeeded")
print(download_artifact(step.outputs.artifacts["bar"]))
print(download_artifact(step.outputs.artifacts["odir"]))
print(step.outputs.parameters["msg"].value,
type(step.outputs.parameters["msg"].value))
首先是新建了文件和文件夹,为OP输入做准备。与前文不同的是,壳里加了一个dispatcher_executor
其余的流程和前文一致,所以说,调用dpdispatcher就是配置好一个executor,然后夹到step里。
还有一个VASP计算的教程,后面会用到。就不再细写了。
下面我将尝试修改旧的dpdispatcher代码。
旧代码
import os
import shutil
import sys
import warnings
import networkx as nx
from pathlib import Path
from typing import Optional, Union
from typing import Union
import math
import dpdata
import numpy as np
import pandas as pd
from ase import Atoms, Atom
from ase.io import read, write
from ase.neighborlist import build_neighbor_list
from ase.visualize import view
from fullerenedatapraser.io.recursion import recursion_files
from fullerenedatapraser.io.xyz import simple_read_xyz_xtb
from fullerenedatapraser.molecular.fullerene import FullereneFamily
from tqdm import tqdm
from ase.optimize import BFGS, LBFGS
from ase.optimize.optimize import Optimizer
from ase.calculators.emt import EMT
from ase.io import Trajectory
from torchlightmolnet.caculator import torchCaculator
from torchlightmolnet.lightning.molnet import LightMolNet
from torchlightmolnet.dataset.atomref import refatoms_xTB, get_refatoms
from torchlightmolnet import Properties
import torch
import subprocess
from ase.units import Hartree, eV
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from dpdispatcher import Task, Submission, Machine, Resources
def lazy_rm(a_path):
if os.path.exists(a_path):
shutil.rmtree(a_path)
os.makedirs(exist_ok=True, name=a_path)
return os.path.abspath(a_path)
dump = lazy_rm(r'./dump')
dst = lazy_rm(r'./opt')
xyz_path = os.path.abspath(r'./xyz')
def run_a_batch(path_source: str, path_destination: str, cmd_list: list, out_list: list, sub_batch_size: int=None):
path_destination = os.path.abspath(path_destination)
mach = Machine(batch_type="Torque",
context_type="LocalContext",
remote_root=r'/home/mkliu/test_dpdispatcher/',
remote_profile=None,
local_root=path_destination)
resrc = Resources(number_node=1,
cpu_per_node=6,
gpu_per_node=0,
group_size=1,
queue_name='batch',
envs={
"OMP_STACKSIZE": "4G",
"OMP_NUM_THREADS": "3,1",
"OMP_MAX_ACTIVE_LEVELS": "1",
"MKL_NUM_THREADS": "3"
}
)
task_list = []
for item in os.listdir(path_source):
isomer_name = os.path.splitext(item)[0]
os.makedirs(os.path.join(path_destination, isomer_name), exist_ok=True)
path_item = os.path.join(path_source, item)
shutil.copy(path_item, os.path.join(path_destination, isomer_name, item))
cmd_list.append(item)
a_task = Task(command=' '.join(cmd_list),
task_work_path=f"{
isomer_name}/",
forward_files=[item],
backward_files=out_list
)
task_list.append(a_task)
del cmd_list[-1]
sub_batch_size = None
if sub_batch_size == None:
submission = Submission(work_base=path_destination,
machine=mach,
resources=resrc,
task_list=task_list,
forward_common_files=[],
backward_common_files=[]
)
try:
submission.run_submission()
except Exception as e:
with open('opt_error.log', 'a') as f:
f.write(str(e) + '\n' + str(task_list) + '\n')
else:
num_groups = math.ceil(len(task_list) / sub_batch_size)
for i in range(num_groups):
cursor = i * sub_batch_size
a_task_list = task_list[cursor:cursor + sub_batch_size]
submission = Submission(work_base=path_destination,
machine=mach,
resources=resrc,
task_list=a_task_list,
forward_common_files=[],
backward_common_files=[]
)
try:
submission.run_submission()
except Exception as e:
with open('opt_error.log', 'a') as f:
f.write(str(e) + '\n' + str(task_list) + '\n')
a=1
max_step = 100
# commandlist = [r'/home/mkliu/anaconda3/envs/env001/bin/xtb', '--opt tight']
commandlist = [r'/home/mkliu/anaconda3/envs/env001/bin/xtb', '--opt tight', '--uhf 1']
out_list = ['xtbopt.xyz', 'xtbopt.log']
run_a_batch(path_source=xyz_path,
path_destination=dst,
cmd_list=commandlist,
out_list=out_list)
os.chdir(dump)
for a_folder in os.listdir(dst):
if not os.path.isdir(os.path.join(dst,a_folder)):
continue
# print(a_folder)
shutil.copy(src=os.path.join(dst, a_folder, 'xtbopt.xyz'),
dst=f'{
a_folder}.xyz')
这是我最常用的dpdispatcher代码,他做了下面几件事:
- 我有一个
raw
的文件夹,里面存了好多xyz文件 - 我需要一个
opt
的文件夹,把xyz都优化一下,每个xyz的优化结果以文件夹的形式丢进opt
里 - 最后,我有一个
dump
的文件夹,每个优化过的xyz文件都会丢在dump
里
前面导入的依赖很多,并没有都用,我也不想删了。
下面我将逐步改造这段代码,完成同样的功能。
旧代码改造
完成一个xyz文件的上传和下载
我们在001目录里放了一个文件,下面将完成dpdispatcher的配置,将其传到HPC再下载。
我们首先制作一个template
import os
import time
from pathlib import Path
from typing import List
from dflow import Step, Workflow, download_artifact, upload_artifact
from dflow.plugins.dispatcher import DispatcherExecutor
from dflow.python import OP, OPIO, Artifact, OPIOSign, PythonOPTemplate
class UpDownLoad(OP):
def __init__(self):
pass
@classmethod
def get_input_sign(cls):
return OPIOSign({
'in_f': Artifact(Path),
})
@classmethod
def get_output_sign(cls):
return OPIOSign({
'out_f': Artifact(Path),
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
op_out = OPIO({
"out_f": Path(op_in['in_f']),
})
with open(op_in['in_f'], 'w') as f:
f.write('1111')
return op_out
该OP的作用是,接收一个文件,重写该文件并输出。
下面我们配置executor
if __name__ == "__main__":
wf = Workflow(name="wf001")
dispatcher_executor = DispatcherExecutor(
image="python_diy_3:3.8",
machine_dict={
'remote_root': r'/home/mkliu/test_dpdispatcher/',
'batch_type': 'Torque',
'remote_profile': {
'hostname': "x",
'username': "x",
'password': 'x',
'port': 22
}
},
resources_dict={
'number_node': 1,
'cpu_per_node': 1,
'gpu_per_node': 0,
'group_size': 1,
'queue_name': 'batch',
'envs': {
"OMP_STACKSIZE": "4G",
"OMP_NUM_THREADS": "3,1",
"OMP_MAX_ACTIVE_LEVELS": "1",
"MKL_NUM_THREADS": "3"
},
}
)
首先需要配置镜像,这个镜像环境里需要有dpdispatcher
这个库,需要手动打进去,或者拉官方的库dptechnology/dpdispatcher
executor
里的镜像和下面template
里的镜像可以不一致,但最好都弄成同一个,里面配置做全
最后,我们套上壳
artifact0 = upload_artifact("./temp/C84_11.xyz")
step = Step(
name="dummy",
template=PythonOPTemplate(UpDownLoad, image="python_diy_3:3.8"),
artifacts={
"in_f": artifact0},
executor=dispatcher_executor
)
wf.add(step)
wf.submit()
while wf.query_status() in ["Pending", "Running"]:
time.sleep(1)
assert (wf.query_status() == "Succeeded")
step_end = wf.query_step(name="dummy")[0]
assert (step_end.phase == "Succeeded")
download_artifact(step_end.outputs.artifacts['out_f'])
需要注意的是,当前版本的dflow,workflow和step的命名不能有重复
比如,dummy和dummy001,会导致bug(该bug让我花了两三个小时。。。)
一点感悟:
之前在使用dpdispatcher的时候,从PC端向集群交任务并自动返回结果文件。
是 local 和 cloud 的一对一信息传递
现在使用dflow,是local新建一个container,所有任务在container里完成,然后container和 cloud 进行信息交互,local再从container里下载文件,container类似信号中继站一样。
言归正传,下面我将继续改造旧代码。
完成一个xyz文件的xTB优化,并传回
在执行优化任务之前,我们需要思考一个问题
我们远程调用bin文件进行优化任务,比如说:
/xx/xx/xtb --opt abc.xyz
优化结果会存在哪里?
正常情况下,我们登录节点,会进入一个文件夹,这个文件夹是一个干净的,只有一个输入文件的文件夹。
我们执行
/xx/xx/xtb --opt abc.xyz
优化结果会存在这个文件夹里
所以我们远程调用的时候,可以模仿上述过程,即,我们需要一个干净的文件夹,然后进入这个文件夹,然后执行
/xx/xx/xtb --opt abc.xyz
这个时候,优化结果就存在了这个文件夹里,我们把它download即可
如果直接使用
/xx/xx/xtb --opt 绝对路径/abc.xyz
优化结果就不知道会存在哪里
dflow 一个特点就是,不知道某个文件传在了哪儿,只能用op_in的artifact代替路径
考虑到上述问题,我作了如下的脚本
import os
import time
from pathlib import Path
from typing import List
import subprocess
import shutil
from dflow import Step, Workflow, download_artifact, upload_artifact
from dflow.plugins.dispatcher import DispatcherExecutor
from dflow.python import OP, OPIO, Artifact, OPIOSign, PythonOPTemplate
class simpleOPT(OP):
def __init__(self):
pass
@classmethod
def get_input_sign(cls):
return OPIOSign({
'in_dir': Artifact(Path),
'cmd_list': list
})
@classmethod
def get_output_sign(cls):
return OPIOSign({
'out_f': Artifact(Path),
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
# new_path = os.path.join(os.path.split(op_in['in_dir'])[0], 'xtbopt.xyz')
cwd_ = os.getcwd()
os.chdir(op_in['in_dir'])
input_file = os.listdir('./')[0]
cmd_list = op_in['cmd_list']
cmd_list.append(input_file)
res = subprocess.Popen(' '.join(cmd_list), shell=True)
res.wait()
# shutil.copy(src='xtbopt.xyz', dst='../xtbopt.xyz')
os.chdir(cwd_)
op_out = OPIO({
"out_f": Path(op_in['in_dir']/'xtbopt.xyz'),
})
return op_out
if __name__ == "__main__":
wf = Workflow(name="wf002")
dispatcher_executor = DispatcherExecutor(
image="python_diy_4:3.8",
machine_dict={
'remote_root': r'/home/mkliu/test_dpdispatcher/',
'batch_type': 'Torque',
'remote_profile': {
"hostname": "",
"username": "",
"password": "",
'port': 22
}
},
resources_dict={
'number_node': 1,
'cpu_per_node': 1,
'gpu_per_node': 0,
'group_size': 1,
'queue_name': 'batch',
'envs': {
"OMP_STACKSIZE": "4G",
"OMP_NUM_THREADS": "3,1",
"OMP_MAX_ACTIVE_LEVELS": "1",
"MKL_NUM_THREADS": "3"
},
}
)
artifact0 = upload_artifact("./simple_raw")
simple_cmd_list = ['/home/mkliu/anaconda3/envs/env001/bin/xtb', '--opt', 'tight']
step = Step(
name="dummy",
template=PythonOPTemplate(simpleOPT, image="python_diy_4:3.8"),
artifacts={
"in_dir": artifact0},
parameters={
'cmd_list': simple_cmd_list},
executor=dispatcher_executor
)
wf.add(step)
wf.submit()
while wf.query_status() in ["Pending", "Running"]:
time.sleep(1)
assert (wf.query_status() == "Succeeded")
step_end = wf.query_step(name="dummy")[0]
assert (step_end.phase == "Succeeded")
download_artifact(step_end.outputs.artifacts['out_f'])
过程是:
上传了一个文件夹
这个文件夹在云端的路径就是:op_in['in_dir']
我们不知道具体在哪里,但是可以直接拿它当路径用
我们首先进入这个文件夹,然后感知一下输入文件,再execute
此时的输出文件在op_in['in_dir']
下面,我们给template的op_out一个绝对的路径:Path(op_in['in_dir']/'xtbopt.xyz')
因为退出这个template以后就不知道在哪里了,所以我们直接给绝对路径
这样的话就可以直接download了
其中执行命令的版块有几种可以选
res = subprocess.Popen(' '.join(cmd_list), shell=True)
res.wait()
或者
subprocess.call(' '.join(cmd_list), shell=True)
或者
os.system(' '.join(cmd_list), shell=True)
有趣的是,文件下载到本地时,直接进了上传的目录内
这点让我有些意外,但观察远程任务后发现,我们的输出文件在云端就是存在了一个文件夹里
下载是整个一起下载的。
有没有一种方法可以让文件下载到本地外呢?
一个思路就是在任务执行后,把文件copy到目录外,然后download这个文件即可
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
cwd_ = os.getcwd()
os.chdir(op_in['in_dir'])
input_file = os.listdir('./')[0]
cmd_list = op_in['cmd_list']
cmd_list.append(input_file)
res = subprocess.Popen(' '.join(cmd_list), shell=True)
res.wait()
shutil.copy(src='xtbopt.xyz', dst='../xtbopt.xyz')
os.chdir(cwd_)
new_path = os.path.join(os.path.split(op_in['in_dir'])[0], 'xtbopt.xyz')
op_out = OPIO({
"out_f": Path(new_path),
})
return op_out
我们在跳出目录前把文件拷贝到上一层,然后得到上一层文件的绝对路径
此时文件在下载的时候就会进入目录的上一层
完成一批xyz文件的xTB优化,并传回
我建了两个文件夹,每个里面放一个input
然后把这两个文件夹的母文件夹传上,按照文件夹名称切片
(和前一篇文章差不多,不过是加了一个 dpdispatcher executor )
我对template做了简单的修改:
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
# new_path = os.path.join(os.path.split(op_in['in_dir'])[0], 'xtbopt.xyz')
cwd_ = os.getcwd()
os.chdir(op_in['in_dir'])
os.chdir(op_in['in_para'])
input_file = os.listdir('./')[0]
cmd_list = op_in['cmd_list']
cmd_list.append(input_file)
os.system(' '.join(cmd_list))
os.chdir(cwd_)
op_out = OPIO({
"out_f": Path(os.path.join(op_in['in_dir'], op_in['in_para'], 'xtbopt.xyz')),
})
return op_out
增加了一层目录
套壳的时候做了简单的修改,增加了slice
step = Step(
name="dummy",
template=PythonOPTemplate(simpleOPT, image="python_diy_4:3.8", slices=Slices(
"{
{item}}",
input_parameter=['in_para'],
output_artifact=['out_f']
)),
artifacts={
"in_dir": artifact0},
parameters={
'cmd_list': simple_cmd_list, 'in_para': para_list},
executor=dispatcher_executor,
with_param=argo_range(2),
key='get-e-{
{item}}'
)
注意,我们在使用 dpdispatcher 插件的时候,需要增加两条环境变量(如果使用slice功能的话)
resources_dict={
'number_node': 1,
'cpu_per_node': 3,
'gpu_per_node': 0,
'group_size': 5,
'queue_name': 'batch',
'envs': {
"OMP_STACKSIZE": "4G",
"OMP_NUM_THREADS": "3,1",
"OMP_MAX_ACTIVE_LEVELS": "1",
"MKL_NUM_THREADS": "3",
"DFLOW_WORKFLOW": "{
{workflow.name}}",
"DFLOW_POD": "{
{pod.name}}"
},
由于我们output的结果是一个相对路径,所以传回的时候,文件刚好落在输入文件同一目录里
"out_f": Path(os.path.join(op_in['in_dir'], op_in['in_para'], 'xtbopt.xyz')),
最后是这样的结果
旧代码完全改造
我们的需求如下:
- 我有一个
raw
的文件夹,里面存了好多xyz文件 - 我需要一个
opt
的文件夹,把xyz都优化一下,每个xyz的优化结果以文件夹的形式丢进opt
里 - 最后,我有一个
dump
的文件夹,每个优化过的xyz文件都会丢在dump
里
初始文件夹如下:
raw 里有3个文件
下面梳理一下思路:
- 把raw文件夹和其他两个空文件夹上传(以下在argo里进行)
- 第一个OP,给每一个任务新建一个文件夹,每个里面放一个xyz
- 第二个OP,并行执行优化任务,并把结果传回到container里
- 第三个OP,把
xtbopt.xyz
rename 为old_file_name.xyz
,并dump到新文件夹里 - 把3,4步的output从argo传回到本地
下面是技术总结:
在写第一个OP时,我发现并行的效率太低了,代码可读性也很低,因此最后放弃了slice功能。在经过几天的debug之后,我发现这个slice功能是这样运作的:
并行计算 5 个任务,需要在每个任务里把这 5 个 input copy 到 input 文件夹里,即便是对文件进行slice
对文件进行slice的效果在于,output 文件夹里只有单个文件。
因此 slice 功能不适合超大规模运算。
最终,我决定第一、三步放弃并行计算,第二步自己写了并行计算的脚本。如下:
第一个OP:
class simpleBuild(OP):
def __init__(self):
pass
@classmethod
def get_input_sign(cls):
return OPIOSign({
'src_dir': Artifact(Path),
})
@classmethod
def get_output_sign(cls):
return OPIOSign({
'dst_dir': Artifact(Path),
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
name = 'cooking'
cwd_ = os.getcwd()
dst = os.path.abspath(name)
os.makedirs(dst, exist_ok=True)
raw = os.path.abspath(op_in['src_dir'])
for a_file in os.listdir(raw):
os.chdir(dst)
a_file_name = os.path.splitext(a_file)[0]
os.makedirs(a_file_name, exist_ok=True)
shutil.copy(src=os.path.join(raw, a_file), dst=os.path.join(a_file_name, a_file))
os.chdir(cwd_)
op_out = OPIO({
"dst_dir": Path(name),
})
return op_out
第二个OP
class batchExe(OP):
def __init__(self):
pass
@classmethod
def get_input_sign(cls):
return OPIOSign({
'in_dir': Artifact(Path),
'cmd_list': List[str],
'out_list': List[str],
'num_worker': int,
'cpu_per_worker': int,
'base_node': int,
'poll_interval': Optional[float]
})
@classmethod
def get_output_sign(cls):
return OPIOSign({
'cooking_outs': Artifact(type=List[Path]),
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
def _stack_a_job():
os.chdir(job_list[tracker])
input_f = os.listdir('./')[0]
cmd_list.append(input_f)
cmd_line = ' '.join(cmd_list)
start_node = op_in['base_node'] + op_in['cpu_per_worker'] * worker_id
end_node = op_in['base_node'] + op_in['cpu_per_worker'] * (worker_id + 1) - 1
# print('======================\n')
#
# print(tracker)
# print('\n')
# print(worker_id)
# print('\n')
# print(cmd_line)
# print('\n')
# print(os.getcwd())
# print('\n')
# print('======================\n')
#
# shutil.copy(src=input_f, dst='xtbopt.xyz')
# with open('xtbopt.log', 'w') as f:
# f.write('221122')
#
# res_list[worker_id] = subprocess.Popen('cd .', shell=True)
res_list[worker_id] = subprocess.Popen(f'taskset -c {
start_node}-{
end_node} {
cmd_line}', shell=True)
del cmd_list[-1]
os.chdir('./..')
cmd_list = op_in['cmd_list']
cwd_ = os.getcwd()
os.chdir(op_in['in_dir'])
tracker = 0
job_list = os.listdir('./')
job_num = len(job_list)
res_list = [None] * op_in['num_worker']
while tracker < job_num:
worker_id = tracker
if os.path.isdir(job_list[tracker]):
_stack_a_job()
tracker = tracker + 1
if worker_id == op_in['num_worker'] - 1:
break
while tracker < job_num:
for worker_id, a_proc in enumerate(res_list):
if tracker < job_num:
if a_proc.poll() == 0:
_stack_a_job()
tracker = tracker + 1
else:
break
if op_in['poll_interval']:
time.sleep(op_in['poll_interval'])
for a_proc in res_list:
a_proc.wait()
outs = []
for a_job in job_list:
for an_output in op_in['out_list']:
if os.path.exists(os.path.join(a_job, an_output)):
outs.append(Path(os.path.join(op_in['in_dir'], a_job, an_output)))
os.chdir(cwd_)
op_out = OPIO({
"cooking_outs": outs,
})
return op_out
第三个OP:
class xtbParser(OP):
def __init__(self):
pass
@classmethod
def get_input_sign(cls):
return OPIOSign({
'in_dir': Artifact(Path)
})
@classmethod
def get_output_sign(cls):
return OPIOSign({
'cooked_outs': Artifact(type=List[Path]),
'info': Artifact(Path)
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
cwd_ = os.getcwd()
name = 'cooked'
dst = os.path.abspath(name)
os.makedirs(dst, exist_ok=True)
name_list = []
e_list = []
outs = []
os.chdir(os.path.join(op_in['in_dir'], 'cooking'))
print(os.listdir('./'))
print(os.getcwd())
for a_job in os.listdir('./'):
old_path = os.path.join(a_job, 'xtbopt.xyz')
if os.path.exists(old_path):
name_list.append(a_job)
with open(old_path, 'r') as f:
f.readline()
e_line = f.readline()
e_list.append(float(e_line.split()[1]))
new_path = os.path.join(dst, a_job + '.xyz')
shutil.copy(src=old_path, dst=new_path)
outs.append(Path(new_path))
os.chdir(cwd_)
info = pd.DataFrame({
'name': name_list, 'e': e_list})
info = info.sort_values(by='e')
info.index = sorted(info.index)
info.to_pickle('info.pickle')
op_out = OPIO({
"cooked_outs": outs,
'info': Path('info.pickle')
})
return op_out
这篇文章写了好几天,深刻领略了云编程的艰难,总结一下经验:
- 每一个OP进行单独测试,减少debug时间。云编程最痛苦的莫过于debug周期太长了,而且有时候根本没有bug的相关信息。我的经验是,每一个OP进行单独测试,通过测试的OP就进行剥离,这样减少了debug的时间,第二点是,上云测试之前,使用 dummy command 先确保一下command之外的代码是正确的。(比如,
cd .
代替xtb --opt tight
)然后换上真的command,此时如果再出问题,只能是command本身的问题了,仔细检查command即可。 - 减少没有意义的并行。前文说过原因了,此外,slice的输入输出容易出问题,需要谨慎对接。这里的建议是,在进行 DFT 这种贵的任务的时候,可以上一下slice,代码看着很fancy
- dflow的文件夹体系很乱,尽量避免密集的文件夹操作,如果有需要,请在 cwd_的保护下进行,还可以把要进行的文件夹操作包装成一个函数。
cwd_ = os.getcwd()
# do something
os.chdir(cwd_)
边栏推荐
猜你喜欢
随机推荐
mysql的innodb存储引擎和myisam存储引擎的区别
36氪详情页AES
IDEA的database使用教程(使用mysql数据库)
BOM系列之localStorage
"Swordsman Offer" brush questions print from 1 to the largest n digits
工控机防勒索病毒浅析
Unity编辑器扩展批量修改图片名称
Mysql如何对两张表的相同字段,同时查询两张数据表
swiper分类菜单双层效果demo(整理)
C# 一周入门高级编程之《C#-继承》Day One
Guava-字符串工具
QT中线程调用GUI主线程控件的问题
前缀和(区间和,子矩阵的和)
内存模型之可见性
解决GANs训练中模式崩塌/训练崩溃的十五个方法
sqlite 日期字段加一天
FusionAccess软件架构、FusionAccess必须配置的四个组件、桌面发放流程、虚拟机组类型、桌面组类型
mysqlbinlog: unknown variable 'default-character-set=utf8'
IDEA2021.2安装与配置(持续更新)
ArcEngine (3) zoom in and zoom out through the MapControl control to achieve full-image roaming