From 2e17cf97bfb3c6b1666304db91ca99342d78be23 Mon Sep 17 00:00:00 2001 From: zhenyu <76582286+wangzyphysics@users.noreply.github.com> Date: Mon, 1 Apr 2024 22:38:59 +0800 Subject: [PATCH] Fix dp optim parallel (#208) Signed-off-by: zjgemi Co-authored-by: zjgemi Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- dpgen2/entrypoint/submit.py | 15 +- dpgen2/op/__init__.py | 7 +- dpgen2/op/collect_run_caly.py | 25 +- dpgen2/op/prep_caly_dp_optim.py | 163 ++++++++++ ...p_run_dp_optim.py => run_caly_dp_optim.py} | 68 ++-- dpgen2/superop/caly_evo_step.py | 98 +++--- dpgen2/superop/prep_run_calypso.py | 8 +- .../{train.json => dp_dpa1_train.json} | 0 examples/calypso/dpa2_train.json | 96 ++++++ examples/calypso/input.test.json | 303 ++++++++++++++++-- tests/mocked_ops.py | 123 +++++-- tests/op/test_collect_run_caly.py | 8 +- tests/op/test_prep_caly_dp_optim.py | 153 +++++++++ tests/op/test_prep_run_dp_optim.py | 181 ----------- tests/op/test_run_caly_dp_optim.py | 169 ++++++++++ tests/test_caly_evo_step.py | 88 ++--- tests/test_prep_run_caly.py | 37 ++- 17 files changed, 1114 insertions(+), 428 deletions(-) create mode 100644 dpgen2/op/prep_caly_dp_optim.py rename dpgen2/op/{prep_run_dp_optim.py => run_caly_dp_optim.py} (62%) rename examples/calypso/{train.json => dp_dpa1_train.json} (100%) create mode 100644 examples/calypso/dpa2_train.json create mode 100644 tests/op/test_prep_caly_dp_optim.py delete mode 100644 tests/op/test_prep_run_dp_optim.py create mode 100644 tests/op/test_run_caly_dp_optim.py diff --git a/dpgen2/entrypoint/submit.py b/dpgen2/entrypoint/submit.py index d8fd7e73..5d81b041 100644 --- a/dpgen2/entrypoint/submit.py +++ b/dpgen2/entrypoint/submit.py @@ -92,10 +92,11 @@ from dpgen2.op import ( CollectData, CollRunCaly, + PrepCalyDPOptim, PrepCalyInput, PrepDPTrain, PrepLmp, - PrepRunDPOptim, + RunCalyDPOptim, RunCalyModelDevi, RunDPTrain, RunLmp, @@ -172,7 +173,8 @@ def make_concurrent_learning_op( caly_evo_step_op = CalyEvoStep( "caly-evo-step", collect_run_caly=CollRunCaly, - prep_run_dp_optim=PrepRunDPOptim, + prep_dp_optim=PrepCalyDPOptim, + run_dp_optim=RunCalyDPOptim, prep_config=prep_explore_config, run_config=run_explore_config, upload_python_packages=upload_python_packages, @@ -797,8 +799,10 @@ def get_superop(key): return key.replace("prep-caly-input", "prep-run-explore") elif "collect-run-calypso-" in key: return re.sub("collect-run-calypso-[0-9]*-[0-9]*", "prep-run-explore", key) - elif "prep-run-dp-optim-" in key: - return re.sub("prep-run-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key) + elif "prep-dp-optim-" in key: + return re.sub("prep-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key) + elif "run-dp-optim-" in key: + return re.sub("run-dp-optim-[0-9]*-[0-9]*-[0-9]*", "prep-run-explore", key) elif "run-caly-model-devi" in key: return key.replace("run-caly-model-devi", "prep-run-explore") return None @@ -843,7 +847,8 @@ def get_resubmit_keys( "modify-train-script", "prep-caly-input", "collect-run-calypso", - "prep-run-dp-optim", + "prep-dp-optim", + "run-dp-optim", "run-caly-model-devi", "prep-run-explore", "prep-lmp", diff --git a/dpgen2/op/__init__.py b/dpgen2/op/__init__.py index a871f9d8..573b8a07 100644 --- a/dpgen2/op/__init__.py +++ b/dpgen2/op/__init__.py @@ -4,6 +4,9 @@ from .collect_run_caly import ( CollRunCaly, ) +from .prep_caly_dp_optim import ( + PrepCalyDPOptim, +) from .prep_caly_input import ( PrepCalyInput, ) @@ -13,8 +16,8 @@ from .prep_lmp import ( PrepLmp, ) -from .prep_run_dp_optim import ( - PrepRunDPOptim, +from .run_caly_dp_optim import ( + RunCalyDPOptim, ) from .run_caly_model_devi import ( RunCalyModelDevi, diff --git a/dpgen2/op/collect_run_caly.py b/dpgen2/op/collect_run_caly.py index b0b82e14..80332c5c 100644 --- a/dpgen2/op/collect_run_caly.py +++ b/dpgen2/op/collect_run_caly.py @@ -67,7 +67,7 @@ def get_input_sign(cls): type=Path, optional=True ), # dir named results for evo "opt_results_dir": Artifact( - type=Path, optional=True + type=List[Path], optional=True ), # dir contains POSCAR* CONTCAR* OUTCAR* "qhull_input": Artifact(type=Path, optional=True), # for vsc } @@ -141,11 +141,16 @@ def execute( results = ( ip["results"].resolve() if ip["results"] is not None else ip["results"] ) - opt_results_dir = ( - ip["opt_results_dir"].resolve() - if ip["opt_results_dir"] is not None - else ip["opt_results_dir"] - ) + # opt_results_dir = ( + # ip["opt_results_dir"].resolve() + # if ip["opt_results_dir"] is not None + # else ip["opt_results_dir"] + # ) + opt_results_dir = [] + if ip["opt_results_dir"] is not None: + for temp in ip["opt_results_dir"]: + opt_results_dir.append(Path(temp).resolve()) + qhull_input = ( ip["qhull_input"].resolve() if ip["qhull_input"] is not None @@ -227,11 +232,13 @@ def normalize_config(data={}): def prep_last_calypso_file(step, results, opt_results_dir, qhull_input, vsc): - if step is not None and results is not None or opt_results_dir is not None: + if step is not None and results is not None and opt_results_dir is not None: Path(step.name).symlink_to(step) Path(results.name).symlink_to(results) - for file_name in opt_results_dir.iterdir(): - Path(file_name.name).symlink_to(file_name) + assert isinstance(opt_results_dir, list), "opt_results_dir should be a list." + for opt_results_name in opt_results_dir: + for file_name in opt_results_name.iterdir(): + Path(file_name.name).symlink_to(file_name) if vsc and qhull_input is not None: Path(qhull_input.name).symlink_to(qhull_input) diff --git a/dpgen2/op/prep_caly_dp_optim.py b/dpgen2/op/prep_caly_dp_optim.py new file mode 100644 index 00000000..c982456e --- /dev/null +++ b/dpgen2/op/prep_caly_dp_optim.py @@ -0,0 +1,163 @@ +import json +import logging +import pickle +import shutil +from pathlib import ( + Path, +) +from typing import ( + List, + Tuple, +) + +from dflow.python import ( + OP, + OPIO, + Artifact, + BigParameter, + OPIOSign, + Parameter, + TransientError, +) + +from dpgen2.constants import ( + calypso_check_opt_file, + calypso_opt_dir_name, + calypso_run_opt_file, + model_name_pattern, +) +from dpgen2.exploration.task import ( + ExplorationTaskGroup, +) +from dpgen2.utils import ( + BinaryFileInput, + set_directory, +) +from dpgen2.utils.run_command import ( + run_command, +) + + +class PrepCalyDPOptim(OP): + r"""Prepare the working directories and input file according to slices information + for structure optimization with DP. + + `POSCAR_*`, `frozen_model.pb` or `model.ckpt.pt`, `calypso_run_opt.py` + and `calypso_check_opt.py` will be copied or symlink to each optimization directory + from `ip["work_path"]`, according to the group size of ip["template_slice_config"]. + The POSCAR_* will be splited into group_size parts and the name of each path will be returned + in a `task_names` list and `task_dirs` list. + """ + + @classmethod + def get_input_sign(cls): + return OPIOSign( + { + "task_name": Parameter(str), # calypso_task.idx + "finished": Parameter(str), + "template_slice_config": Parameter(dict), + "poscar_dir": Artifact( + Path + ), # from run_calypso first, then from collect_run_caly + "models_dir": Artifact(Path), # + "caly_run_opt_file": Artifact(Path), # from prep_caly_input + "caly_check_opt_file": Artifact(Path), # from prep_caly_input + } + ) + + @classmethod + def get_output_sign(cls): + return OPIOSign( + { + "task_names": Parameter(List[str]), + "task_dirs": Artifact(List[Path]), + "caly_run_opt_file": Artifact(Path), # from prep_caly_input + "caly_check_opt_file": Artifact(Path), # from prep_caly_input + } + ) + + @OP.exec_sign_check + def execute( + self, + ip: OPIO, + ) -> OPIO: + r"""Execute the OP. + + Parameters + ---------- + ip : dict + Input dict with components: + - `task_name` : (`str`) + - `finished` : (`str`) + - `template_slice_config` : (`dict`) + - `poscar_dir` : (`Path`) + - `models_dir` : (`Path`) + - `caly_run_opt_file` : (`Path`) + - `caly_check_opt_file` : (`Path`) + + Returns + ------- + op : dict + Output dict with components: + + - `task_names`: (`List[str]`) + - `task_dirs`: (`Artifact(List[Path])`) + - `caly_run_opt_file` : (`Path`) + - `caly_check_opt_file` : (`Path`) + + """ + group_size = ip["template_slice_config"]["group_size"] + + finished = ip["finished"] + + work_dir = Path(ip["task_name"]) + poscar_dir = ip["poscar_dir"] + models_dir = ip["models_dir"] + _caly_run_opt_file = ip["caly_run_opt_file"] + _caly_check_opt_file = ip["caly_check_opt_file"] + caly_run_opt_file = _caly_run_opt_file.resolve() + caly_check_opt_file = _caly_check_opt_file.resolve() + poscar_list = [poscar.resolve() for poscar in poscar_dir.rglob("POSCAR_*")] + poscar_list = sorted(poscar_list, key=lambda x: int(x.name.strip("POSCAR_"))) + model_name = "frozen_model.pb" + model_list = [model.resolve() for model in models_dir.rglob(model_name)] + if len(model_list) == 0: + model_name = "model.ckpt.pt" + model_list = [model.resolve() for model in models_dir.rglob(model_name)] + model_list = sorted(model_list, key=lambda x: str(x).split(".")[1]) + model_file = model_list[0] + + with set_directory(work_dir): + Path(caly_run_opt_file.name).symlink_to(caly_run_opt_file) + Path(caly_check_opt_file.name).symlink_to(caly_check_opt_file) + if finished == "false": + grouped_poscar_list = [ + poscar_list[i : i + group_size] + for i in range(0, len(poscar_list), group_size) + ] + + task_dirs = [] + for idx, _poscar_list in enumerate(grouped_poscar_list): + opt_path = Path(f"opt_path_{idx}") + task_dirs.append(work_dir / opt_path) + with set_directory(opt_path): + for poscar in _poscar_list: + Path(poscar.name).symlink_to(poscar) + Path(model_name).symlink_to(model_file) + Path(caly_run_opt_file.name).symlink_to(caly_run_opt_file) + Path(caly_check_opt_file.name).symlink_to(caly_check_opt_file) + task_names = [str(task_dir) for task_dir in task_dirs] + else: + temp_dir = work_dir / "opt_path" + temp_dir.mkdir(parents=True, exist_ok=True) + task_dirs = [temp_dir] + task_names = [str(task_dir) for task_dir in task_dirs] + + return OPIO( + { + "task_names": task_names, + "task_dirs": task_dirs, + "caly_run_opt_file": work_dir / caly_run_opt_file.name, + "caly_check_opt_file": work_dir / caly_check_opt_file.name, + } + ) diff --git a/dpgen2/op/prep_run_dp_optim.py b/dpgen2/op/run_caly_dp_optim.py similarity index 62% rename from dpgen2/op/prep_run_dp_optim.py rename to dpgen2/op/run_caly_dp_optim.py index 512cccc6..7b5c5086 100644 --- a/dpgen2/op/prep_run_dp_optim.py +++ b/dpgen2/op/run_caly_dp_optim.py @@ -21,8 +21,8 @@ ) from dpgen2.constants import ( - calypso_opt_dir_name, - model_name_pattern, + calypso_check_opt_file, + calypso_run_opt_file, ) from dpgen2.exploration.task import ( ExplorationTaskGroup, @@ -36,14 +36,11 @@ ) -class PrepRunDPOptim(OP): - r"""Prepare the working directories and input file for structure optimization with DP. - - `POSCAR_*`, `model.000.pb`, `calypso_run_opt.py` and `calypso_check_opt.py` will be copied - or symlink to each optimization directory from `ip["work_path"]`, according to the - popsize `ip["caly_input"]["PopSize"]`. - The paths of these optimization directory will be returned as `op["optim_paths"]`. +class RunCalyDPOptim(OP): + r"""Perform structure optimization with DP in `ip["work_path"]`. + The `optim_results_dir` and `traj_results` will be returned as `op["optim_results_dir"]` + and `op["traj_results"]`. """ @classmethod @@ -54,12 +51,7 @@ def get_input_sign(cls): "task_name": Parameter(str), # calypso_task.idx "finished": Parameter(str), "cnt_num": Parameter(int), - "poscar_dir": Artifact( - Path - ), # from run_calypso first, then from collect_run_caly - "models_dir": Artifact(Path), # - "caly_run_opt_file": Artifact(Path), # from prep_caly_input - "caly_check_opt_file": Artifact(Path), # from prep_caly_input + "task_dir": Artifact(Path), # ready to run structure optimization } ) @@ -70,8 +62,6 @@ def get_output_sign(cls): "task_name": Parameter(str), "optim_results_dir": Artifact(Path), "traj_results": Artifact(Path), - "caly_run_opt_file": Artifact(Path), - "caly_check_opt_file": Artifact(Path), } ) @@ -90,10 +80,7 @@ def execute( - `task_name` : (`str`) - `finished` : (`str`) - `cnt_num` : (`int`) - - `poscar_dir` : (`Path`) - - `models_dir` : (`Path`) - - `caly_run_opt_file` : (`Path`) - - `caly_check_opt_file` : (`Path`) + - `task_dir` : (`Path`) Returns ------- @@ -103,41 +90,28 @@ def execute( - `task_name`: (`str`) - `optim_results_dir`: (`List[str]`) - `traj_results`: (`Artifact(List[Path])`) - - `caly_run_opt_file` : (`Path`) - - `caly_check_opt_file` : (`Path`) """ finished = ip["finished"] cnt_num = ip["cnt_num"] - work_dir = Path(ip["task_name"]) - poscar_dir = ip["poscar_dir"] - models_dir = ip["models_dir"] - _caly_run_opt_file = ip["caly_run_opt_file"] - _caly_check_opt_file = ip["caly_check_opt_file"] - caly_run_opt_file = _caly_run_opt_file.resolve() - caly_check_opt_file = _caly_check_opt_file.resolve() - poscar_list = [poscar.resolve() for poscar in poscar_dir.rglob("POSCAR_*")] - - model_name = "frozen_model.pb" - model_list = [model.resolve() for model in models_dir.rglob(model_name)] - if len(model_list) == 0: - model_name = "model.ckpt.pt" - model_list = [model.resolve() for model in models_dir.rglob(model_name)] - - model_list = sorted(model_list, key=lambda x: str(x).split(".")[1]) - model_file = model_list[0] + task_path = ip["task_dir"] + if task_path is not None: + input_files = [ii.resolve() for ii in Path(task_path).iterdir()] + else: + input_files = [] config = ip["config"] if ip["config"] is not None else {} command = config.get( - "run_opt_command", f"python -u calypso_run_opt.py {model_name}" + f"run_opt_command", "python -u calypso_run_opt.py {model_name}" ) + work_dir = Path(ip["task_name"]) + with set_directory(work_dir): - for idx, poscar in enumerate(poscar_list): - Path(poscar.name).symlink_to(poscar) - Path(model_name).symlink_to(model_file) - Path(caly_run_opt_file.name).symlink_to(caly_run_opt_file) - Path(caly_check_opt_file.name).symlink_to(caly_check_opt_file) + # link input files + for ii in input_files: + iname = ii.name + Path(iname).symlink_to(ii) if finished == "false": ret, out, err = run_command(command, shell=True) @@ -188,7 +162,5 @@ def execute( "task_name": str(work_dir), "optim_results_dir": work_dir / optim_results_dir, "traj_results": work_dir / traj_results_dir, - "caly_run_opt_file": work_dir / caly_run_opt_file.name, - "caly_check_opt_file": work_dir / caly_check_opt_file.name, } ) diff --git a/dpgen2/superop/caly_evo_step.py b/dpgen2/superop/caly_evo_step.py index c323dc08..547cd1ea 100644 --- a/dpgen2/superop/caly_evo_step.py +++ b/dpgen2/superop/caly_evo_step.py @@ -52,7 +52,8 @@ def __init__( self, name: str, collect_run_caly: Type[OP], - prep_run_dp_optim: Type[OP], + prep_dp_optim: Type[OP], + run_dp_optim: Type[OP], prep_config: dict = normalize_step_dict({}), run_config: dict = normalize_step_dict({}), upload_python_packages: Optional[List[os.PathLike]] = None, @@ -75,9 +76,7 @@ def __init__( "opt_results_dir": InputArtifact(optional=True), "qhull_input": InputArtifact(optional=True), } - self._output_parameters = { - # "task_name": OutputParameter(), - } + self._output_parameters = {} self._output_artifacts = { "traj_results": OutputArtifact(), } @@ -94,28 +93,14 @@ def __init__( ), ) - self.collect_run_calypso_keys = [ - "%s--collect-run-calypso-%s-%s" % (self.inputs.parameters["block_id"], i, j) - for i in range(5) - for j in range(10) - ] - self.prep_run_dp_optim_keys = [ - "%s--prep-run-dp-optim-%s-%s" % (self.inputs.parameters["block_id"], i, j) - for i in range(5) - for j in range(10) - ] - self._keys = self.collect_run_calypso_keys + self.prep_run_dp_optim_keys self.step_keys = {} - for ii in self._keys: - self.step_keys[ii] = "--".join( - ["%s" % self.inputs.parameters["block_id"], ii] - ) - + self._keys = [] self = _caly_evo_step( self, self.step_keys, collect_run_caly, - prep_run_dp_optim, + prep_dp_optim, + run_dp_optim, prep_config=prep_config, run_config=run_config, upload_python_packages=upload_python_packages, @@ -146,7 +131,8 @@ def _caly_evo_step( caly_evo_step_steps, step_keys, collect_run_calypso_op: Type[OP], - prep_run_dp_optim_op: Type[OP], + prep_dp_optim_op: Type[OP], + run_dp_optim_op: Type[OP], prep_config: dict = normalize_step_dict({}), run_config: dict = normalize_step_dict({}), upload_python_packages: Optional[List[os.PathLike]] = None, @@ -155,7 +141,6 @@ def _caly_evo_step( run_config = deepcopy(run_config) prep_template_config = prep_config.pop("template_config") run_template_config = run_config.pop("template_config") - # caly_config = run_template_config.pop("caly_config") prep_executor = init_executor(prep_config.pop("executor")) run_executor = init_executor(run_config.pop("executor")) template_slice_config = run_config.pop("template_slice_config", {}) @@ -191,19 +176,18 @@ def _caly_evo_step( ) caly_evo_step_steps.add(collect_run_calypso) - # prep_run_dp_optim - prep_run_dp_optim = Step( - "prep-run-dp-optim", + # prep_dp_optim + prep_dp_optim = Step( + "prep-dp-optim", template=PythonOPTemplate( - prep_run_dp_optim_op, + prep_dp_optim_op, python_packages=upload_python_packages, **run_template_config, ), parameters={ - "config": caly_evo_step_steps.inputs.parameters["expl_config"], "task_name": caly_evo_step_steps.inputs.parameters["task_name"], "finished": collect_run_calypso.outputs.parameters["finished"], - "cnt_num": caly_evo_step_steps.inputs.parameters["cnt_num"], + "template_slice_config": template_slice_config, }, artifacts={ "poscar_dir": collect_run_calypso.outputs.artifacts["poscar_dir"], @@ -215,17 +199,49 @@ def _caly_evo_step( "caly_check_opt_file" ], }, - key="%s--prep-run-dp-optim-%s-%s" + key="%s--prep-dp-optim-%s-%s" % ( caly_evo_step_steps.inputs.parameters["block_id"], caly_evo_step_steps.inputs.parameters["iter_num"], caly_evo_step_steps.inputs.parameters["cnt_num"], ), executor=prep_executor, # cpu is enough to run calypso.x, default step config is c2m4 - # when="%s == false" % (collect_run_calypso.outputs.parameters["finished"]), **run_config, ) - caly_evo_step_steps.add(prep_run_dp_optim) + caly_evo_step_steps.add(prep_dp_optim) + + # run_dp_optim + run_dp_optim = Step( + "run-dp-optim", + template=PythonOPTemplate( + run_dp_optim_op, + slices=Slices( + input_parameter=["task_name"], + input_artifact=["task_dir"], + output_artifact=["traj_results", "optim_results_dir"], + ), + python_packages=upload_python_packages, + **run_template_config, + ), + parameters={ + "config": caly_evo_step_steps.inputs.parameters["expl_config"], + "task_name": prep_dp_optim.outputs.parameters["task_names"], + "finished": collect_run_calypso.outputs.parameters["finished"], + "cnt_num": caly_evo_step_steps.inputs.parameters["cnt_num"], + }, + artifacts={ + "task_dir": prep_dp_optim.outputs.artifacts["task_dirs"], + }, + key="%s--run-dp-optim-%s-%s-{{item}}" + % ( + caly_evo_step_steps.inputs.parameters["block_id"], + caly_evo_step_steps.inputs.parameters["iter_num"], + caly_evo_step_steps.inputs.parameters["cnt_num"], + ), + executor=run_executor, + **run_config, + ) + caly_evo_step_steps.add(run_dp_optim) name = "calypso-block" next_step = Step( @@ -236,8 +252,6 @@ def _caly_evo_step( "cnt_num": caly_evo_step_steps.inputs.parameters["cnt_num"] + 1, "block_id": caly_evo_step_steps.inputs.parameters["block_id"], "expl_config": caly_evo_step_steps.inputs.parameters["expl_config"], - # "task_name": caly_evo_step_steps.inputs.parameters["task_name"] + "", - # "task_name": prep_run_dp_optim.outputs.parameters["task_name"], "task_name": collect_run_calypso.outputs.parameters["task_name"], }, artifacts={ @@ -248,11 +262,9 @@ def _caly_evo_step( "results": collect_run_calypso.outputs.artifacts["results"], "step": collect_run_calypso.outputs.artifacts["step"], "qhull_input": collect_run_calypso.outputs.artifacts["qhull_input"], - "opt_results_dir": prep_run_dp_optim.outputs.artifacts["optim_results_dir"], - "caly_run_opt_file": prep_run_dp_optim.outputs.artifacts[ - "caly_run_opt_file" - ], - "caly_check_opt_file": prep_run_dp_optim.outputs.artifacts[ + "opt_results_dir": run_dp_optim.outputs.artifacts["optim_results_dir"], + "caly_run_opt_file": prep_dp_optim.outputs.artifacts["caly_run_opt_file"], + "caly_check_opt_file": prep_dp_optim.outputs.artifacts[ "caly_check_opt_file" ], }, @@ -260,18 +272,12 @@ def _caly_evo_step( ) caly_evo_step_steps.add(next_step) - # caly_evo_step_steps.outputs.parameters[ - # "task_name" - # ].value_from_parameter = collect_run_calypso.outputs.parameters["task_name"] - caly_evo_step_steps.outputs.artifacts[ "traj_results" ].from_expression = if_expression( _if=(collect_run_calypso.outputs.parameters["finished"] == "false"), _then=(next_step.outputs.artifacts["traj_results"]), - # _then=(prep_run_dp_optim.outputs.artifacts["traj_results_dir"]), - _else=(prep_run_dp_optim.outputs.artifacts["traj_results"]), - # _else=(collect_run_calypso.outputs.artifacts["fake_traj_results_dir"]), + _else=(run_dp_optim.outputs.artifacts["traj_results"]), ) return caly_evo_step_steps diff --git a/dpgen2/superop/prep_run_calypso.py b/dpgen2/superop/prep_run_calypso.py index 938efad8..ae3bb2bb 100644 --- a/dpgen2/superop/prep_run_calypso.py +++ b/dpgen2/superop/prep_run_calypso.py @@ -180,7 +180,6 @@ def _prep_run_caly( name="caly-evo-step", template=caly_evo_step_op, slices=Slices( - "int('{{item}}')", input_parameter=[ "task_name", ], @@ -193,7 +192,6 @@ def _prep_run_caly( "caly_check_opt_file", ], output_artifact=["traj_results"], - **template_slice_config, ), parameters={ "block_id": prep_run_caly_steps.inputs.parameters["block_id"], @@ -216,10 +214,6 @@ def _prep_run_caly( "qhull_input": temp_value, }, key=step_keys["caly-evo-step-{{item}}"], - with_sequence=argo_sequence( - argo_len(prep_caly_input.outputs.parameters["task_names"]), - format=calypso_index_pattern, - ), executor=prep_executor, **prep_config, ) @@ -242,7 +236,7 @@ def _prep_run_caly( "models": prep_run_caly_steps.inputs.artifacts["models"], }, key=step_keys["run-caly-model-devi"], - executor=prep_executor, + executor=run_executor, **prep_config, ) prep_run_caly_steps.add(run_caly_model_devi) diff --git a/examples/calypso/train.json b/examples/calypso/dp_dpa1_train.json similarity index 100% rename from examples/calypso/train.json rename to examples/calypso/dp_dpa1_train.json diff --git a/examples/calypso/dpa2_train.json b/examples/calypso/dpa2_train.json new file mode 100644 index 00000000..e6019597 --- /dev/null +++ b/examples/calypso/dpa2_train.json @@ -0,0 +1,96 @@ +{ + "_comment": "that's all", + "model": { + "type_map": [ + "Mg", + "Al" + ], + "descriptor": { + "type": "dpa2", + "tebd_dim": 8, + "repinit_rcut": 9.0, + "repinit_rcut_smth": 8.0, + "repinit_nsel": 120, + "repformer_rcut": 4.0, + "repformer_rcut_smth": 3.5, + "repformer_nsel": 40, + "repinit_neuron": [ + 25, + 50, + 100 + ], + "repinit_axis_neuron": 12, + "repinit_activation": "tanh", + "repformer_nlayers": 12, + "repformer_g1_dim": 128, + "repformer_g2_dim": 32, + "repformer_attn2_hidden": 32, + "repformer_attn2_nhead": 4, + "repformer_attn1_hidden": 128, + "repformer_attn1_nhead": 4, + "repformer_axis_dim": 4, + "repformer_update_h2": false, + "repformer_update_g1_has_conv": true, + "repformer_update_g1_has_grrg": true, + "repformer_update_g1_has_drrd": true, + "repformer_update_g1_has_attn": true, + "repformer_update_g2_has_g1g1": true, + "repformer_update_g2_has_attn": true, + "repformer_attn2_has_gate": true, + "repformer_add_type_ebd_to_seq": false + }, + "fitting_net": { + "neuron": [ + 240, + 240, + 240 + ], + "resnet_dt": true, + "seed": 1, + "_comment": " that's all" + }, + "_comment": " that's all" + }, + "learning_rate": { + "type": "exp", + "decay_steps": 5000, + "start_lr": 0.0002, + "stop_lr": 3.51e-08, + "_comment": "that's all" + }, + "loss": { + "type": "ener", + "start_pref_e": 0.02, + "limit_pref_e": 1, + "start_pref_f": 1000, + "limit_pref_f": 1, + "start_pref_v": 0, + "limit_pref_v": 0, + "_comment": " that's all" + }, + "training": { + "stat_file": "./dpa2", + "training_data": { + "systems": [ + "/personal/workplace/DP/dpgen2/Mg12Al8/deepmd" + ], + "batch_size": 1, + "_comment": "that's all" + }, + "_validation_data": { + "systems": [ + "/personal/workplace/DP/dpgen2/Mg12Al8/deepmd" + ], + "batch_size": 1, + "_comment": "that's all" + }, + "numb_steps": 20, + "warmup_steps": 0, + "gradient_max_norm": 5.0, + "seed": 10, + "disp_file": "lcurve.out", + "disp_freq": 100, + "save_freq": 200, + "_comment": "that's all" + } +} diff --git a/examples/calypso/input.test.json b/examples/calypso/input.test.json index 12bdfabe..6c745f02 100644 --- a/examples/calypso/input.test.json +++ b/examples/calypso/input.test.json @@ -1,8 +1,8 @@ { "bohrium_config": { - "username": "your@email", - "password": "your_passwd", - "project_id": 00000, + "username": "x@x.cn", + "password": "xxx", + "project_id": 111111, "_host": "https://workflow.dp.tech/", "_k8s_api_server": "https://workflows.deepmodeling.com", "_repo_key": "oss-bohrium", @@ -10,7 +10,7 @@ }, "default_step_config": { "template_config": { - "image": "registry.dp.tech/dptech/prod-11265/dpgen2-calypso:v3", + "image": "registry.dp.tech/dptech/prod-11265/deepmdv3-dpgen2-calypso:v0.2", "_comment": "all" }, "executor": { @@ -33,7 +33,7 @@ "step_configs": { "run_train_config": { "template_config": { - "image": "registry.dp.tech/dptech/prod-11265/dpgen2-calypso:v3", + "image": "registry.dp.tech/dptech/prod-11265/deepmdv3-dpgen2-calypso:v0.2", "_comment": "all" }, "executor": { @@ -47,8 +47,7 @@ "input_data": { "job_type": "container", "platform": "ali", - "_scass_type": "1 * NVIDIA V100_16g", - "scass_type": "c16_m64_cpu" + "scass_type": "1 * NVIDIA V100_32g" } } } @@ -61,9 +60,10 @@ }, "run_explore_config": { "template_config": { - "image": "registry.dp.tech/dptech/prod-11265/dpgen2-calypso:v3", + "image": "registry.dp.tech/dptech/prod-11265/deepmdv3-dpgen2-calypso:v0.2", "_comment": "all" }, + "_continue_on_success_ratio": 0.8, "executor": { "type": "dispatcher", "retry_on_submission_error": 10, @@ -75,13 +75,13 @@ "input_data": { "job_type": "container", "platform": "ali", - "scass_type": "c16_m64_cpu" + "scass_type": "1 * NVIDIA T4_16g" } } } }, - "_template_slice_config": { - "group_size": 50, + "template_slice_config": { + "group_size": 2, "pool_size": 1 }, "_comment": "all" @@ -116,31 +116,276 @@ }, "_comment": "all" }, - "upload_python_packages": [ - "/root/dpgen2/dpgen2" + "_upload_python_packages": [ + "/root/dpgen2/dpgen2", + "/opt/re_e_bias_new/deepmd-kit/deepmd", + "/root/dflow/src/dflow" ], "inputs": { + "mixed_type": true, + "do_finetune": true, "type_map": [ + "H", + "He", + "Li", + "Be", + "B", + "C", + "N", + "O", + "F", + "Ne", + "Na", "Mg", - "Al" + "Al", + "Si", + "P", + "S", + "Cl", + "Ar", + "K", + "Ca", + "Sc", + "Ti", + "V", + "Cr", + "Mn", + "Fe", + "Co", + "Ni", + "Cu", + "Zn", + "Ga", + "Ge", + "As", + "Se", + "Br", + "Kr", + "Rb", + "Sr", + "Y", + "Zr", + "Nb", + "Mo", + "Tc", + "Ru", + "Rh", + "Pd", + "Ag", + "Cd", + "In", + "Sn", + "Sb", + "Te", + "I", + "Xe", + "Cs", + "Ba", + "La", + "Ce", + "Pr", + "Nd", + "Pm", + "Sm", + "Eu", + "Gd", + "Tb", + "Dy", + "Ho", + "Er", + "Tm", + "Yb", + "Lu", + "Hf", + "Ta", + "W", + "Re", + "Os", + "Ir", + "Pt", + "Au", + "Hg", + "Tl", + "Pb", + "Bi", + "Po", + "At", + "Rn", + "Fr", + "Ra", + "Ac", + "Th", + "Pa", + "U", + "Np", + "Pu", + "Am", + "Cm", + "Bk", + "Cf", + "Es", + "Fm", + "Md", + "No", + "Lr", + "Rf", + "Db", + "Sg", + "Bh", + "Hs", + "Mt", + "Ds", + "Rg", + "Cn", + "Nh", + "Fl", + "Mc", + "Lv", + "Ts", + "Og" ], "mass_map": [ - 24, - 27 + 4.0, + 4.0026, + 6.94, + 9.0122, + 10.81, + 12.011, + 14.007, + 15.999, + 18.998, + 20.18, + 22.99, + 24.305, + 26.982, + 28.0855, + 30.974, + 32.06, + 35.45, + 39.95, + 39.098, + 40.078, + 44.956, + 47.867, + 50.942, + 51.996, + 54.938, + 55.845, + 58.933, + 58.693, + 63.546, + 65.38, + 69.723, + 72.63, + 74.922, + 78.971, + 79.904, + 83.798, + 85.468, + 87.62, + 88.906, + 91.224, + 92.906, + 95.95, + 97, + 101.07, + 102.91, + 106.42, + 107.87, + 112.41, + 114.82, + 118.71, + 121.76, + 127.6, + 126.9, + 131.29, + 132.91, + 137.33, + 138.91, + 140.12, + 140.91, + 144.24, + 145, + 150.36, + 151.96, + 157.25, + 158.93, + 162.5, + 164.93, + 167.26, + 168.93, + 173.05, + 174.97, + 178.49, + 180.95, + 183.84, + 186.21, + 190.23, + 192.22, + 195.08, + 196.97, + 200.59, + 204.38, + 207.2, + 208.98, + 209, + 210, + 222, + 223, + 226, + 227, + 232.04, + 231.04, + 238.03, + 237, + 244, + 243, + 247, + 247, + 251, + 252, + 257, + 258, + 259, + 262, + 267, + 268, + 269, + 270, + 269, + 277, + 281, + 282, + 285, + 286, + 290, + 290, + 293, + 294, + 294 ], "init_data_prefix": null, "init_data_sys": [ - "/personal/workplace/DP/dpgen2/Mg12Al8/deepmd" + "/personal/workplace/DP/dpgen2/Mg10Al54/deepmd" ], "_comment": "all" }, "train": { "type": "dp", + "numb_models": 4, + "init_models_paths": [ + "model-ckpt-new.pt", + "model-ckpt-new.pt", + "model-ckpt-new.pt", + "model-ckpt-new.pt" + ], "config": { + "impl":"pytorch", + "finetune_args":"--model-branch Domains_Alloy", "init_model_policy": "yes", - "init_model_old_ratio": 0.5, - "init_model_numb_steps": 20000, - "init_model_start_lr": 1e-4, + "init_model_old_ratio": 0.9, + "init_model_numb_steps": 20, + "init_model_start_lr": 2e-5, "init_model_start_pref_e": 0.25, "init_model_start_pref_f": 100, "_comment": "all" @@ -171,7 +416,7 @@ "config": { "_command": "lmp -var restart 0", "run_calypso_command": "calypso.x", - "run_opt_command": "python -u calypso_run_opt.py" + "run_opt_command": "python -u calypso_run_opt.py model.ckpt.pt" }, "convergence": { "type": "fixed-levels", @@ -207,8 +452,8 @@ "atomic_number": [ 3 ], - "pop_size": 5, - "max_step": 2, + "pop_size": 50, + "max_step": 3, "distance_of_ions": [ [ 1.0 @@ -229,8 +474,8 @@ 3, 37 ], - "pop_size": 5, - "max_step": 2, + "pop_size": 50, + "max_step": 3, "distance_of_ions": [ [ 1.0, @@ -255,8 +500,8 @@ "atomic_number": [ 3 ], - "pop_size": 5, - "max_step": 2, + "pop_size": 50, + "max_step": 3, "distance_of_ions": [ [ 1.0 @@ -277,8 +522,8 @@ 3, 37 ], - "pop_size": 5, - "max_step": 2, + "pop_size": 50, + "max_step": 3, "distance_of_ions": [ [ 1.0, diff --git a/tests/mocked_ops.py b/tests/mocked_ops.py index afab7420..89f570bf 100644 --- a/tests/mocked_ops.py +++ b/tests/mocked_ops.py @@ -30,6 +30,8 @@ # case of upload everything to argo, no context needed pass from dpgen2.constants import ( + calypso_check_opt_file, + calypso_run_opt_file, fp_task_pattern, lmp_conf_name, lmp_input_name, @@ -71,14 +73,17 @@ from dpgen2.op.collect_run_caly import ( CollRunCaly, ) +from dpgen2.op.prep_caly_dp_optim import ( + PrepCalyDPOptim, +) from dpgen2.op.prep_dp_train import ( PrepDPTrain, ) from dpgen2.op.prep_lmp import ( PrepExplorationTaskGroup, ) -from dpgen2.op.prep_run_dp_optim import ( - PrepRunDPOptim, +from dpgen2.op.run_caly_dp_optim import ( + RunCalyDPOptim, ) from dpgen2.op.run_caly_model_devi import ( RunCalyModelDevi, @@ -978,22 +983,30 @@ def execute( results = ( ip["results"].resolve() if ip["results"] is not None else ip["results"] ) - opt_results_dir = ( - ip["opt_results_dir"].resolve() - if ip["opt_results_dir"] is not None - else ip["opt_results_dir"] - ) + # opt_results_dir = ( + # ip["opt_results_dir"].resolve() + # if ip["opt_results_dir"] is not None + # else ip["opt_results_dir"] + # ) + opt_results_dir = [] + if ip["opt_results_dir"] is not None: + for temp in ip["opt_results_dir"]: + opt_results_dir.append(Path(temp).resolve()) os.chdir(work_dir) Path(input_file.name).symlink_to(input_file) if step is not None and results is not None and opt_results_dir is not None: step = ip["step"].resolve() results = ip["results"].resolve() - opt_results_dir = ip["opt_results_dir"].resolve() + + for opt_results_name in opt_results_dir: + for file_name in opt_results_name.iterdir(): + Path(file_name.name).symlink_to(file_name) + # opt_results_dir = ip["opt_results_dir"].resolve() Path(step.name).symlink_to(step) Path(results.name).symlink_to(results) - Path(opt_results_dir.name).symlink_to(opt_results_dir) + # Path(opt_results_dir.name).symlink_to(opt_results_dir) for i in range(5): Path(f"POSCAR_{str(i)}").write_text(f"POSCAR_{str(i)}") @@ -1027,9 +1040,6 @@ def execute( target = poscar_dir.joinpath(poscar.name) shutil.copyfile(poscar, target) finished = "true" if int(cnt_num) == int(max_step) else "false" - # print(f"-------------cnt_num: {cnt_num}, -------max_step---:{max_step}") - # print(f"-------------step_num: {step_num}") - # print(f"-------------finished: {finished}") os.chdir(cwd) ret_dict = { @@ -1044,7 +1054,7 @@ def execute( return OPIO(ret_dict) -class MockedPrepRunDPOptim(PrepRunDPOptim): +class MockedPrepCalyDPOptim(PrepCalyDPOptim): @OP.exec_sign_check def execute( self, @@ -1054,8 +1064,6 @@ def execute( finished = ip["finished"] work_dir = Path(ip["task_name"]) - cnt_num = ip["cnt_num"] - # print(f"--------=---------task_name: {work_dir}") work_dir.mkdir(parents=True, exist_ok=True) poscar_dir = ip["poscar_dir"] @@ -1067,23 +1075,84 @@ def execute( model_list = sorted(model_list, key=lambda x: str(x).split(".")[1]) model_file = model_list[0] + os.chdir(work_dir) + + group_size = 2 + + if finished == "false": + grouped_poscar_list = [ + poscar_list[i : i + group_size] + for i in range(0, len(poscar_list), group_size) + ] + task_dirs = [] + for idx, poscar_list in enumerate(grouped_poscar_list): + opt_path = Path(f"opt_path_{idx}") + task_dirs.append(work_dir / opt_path) + + cwd = os.getcwd() + os.chdir(opt_path) + for poscar in poscar_list: + Path(poscar.name).symlink_to(poscar) + Path(model_file.name).symlink_to(model_file) + Path(caly_run_opt_file.name).symlink_to(caly_run_opt_file) + Path(caly_check_opt_file.name).symlink_to(caly_check_opt_file) + os.chdir(cwd) + + task_names = [str(task_dir) for task_dir in task_dirs] + else: + temp_dir = work_dir / "opt_path" + temp_dir.mkdir(parents=True, exist_ok=True) + task_dirs = [temp_dir] + task_names = [str(task_dir) for task_dir in task_dirs] + + os.chdir(cwd) + return OPIO( + { + "task_name": task_names, + "task_dirs": task_dirs, + } + ) + + +class MockedRunCalyDPOptim(RunCalyDPOptim): + @OP.exec_sign_check + def execute( + self, + ip: OPIO, + ) -> OPIO: + cwd = os.getcwd() + + finished = ip["finished"] + work_dir = Path(ip["task_name"]) + cnt_num = ip["cnt_num"] + work_dir.mkdir(parents=True, exist_ok=True) + config = ip["config"] if ip["config"] is not None else {} command = config.get("run_opt_command", "python -u calypso_run_opt.py") - os.chdir(work_dir) - - for idx, poscar in enumerate(poscar_list): - Path(poscar.name).symlink_to(poscar) - Path("frozen_model.pb").symlink_to(model_file) - Path(caly_run_opt_file.name).symlink_to(caly_run_opt_file) - Path(caly_check_opt_file.name).symlink_to(caly_check_opt_file) + task_path = ip["task_dir"] + if task_path is not None: + input_files = [ii.resolve() for ii in Path(task_path).iterdir()] + else: + pass - for i in range(1, 6): - Path().joinpath(f"CONTCAR_{str(i)}").write_text(f"CONTCAR_{str(i)}") - Path().joinpath(f"OUTCAR_{str(i)}").write_text(f"OUTCAR_{str(i)}") - Path().joinpath(f"{str(i)}.traj").write_text(f"{str(i)}.traj") + os.chdir(work_dir) if finished == "false": + for ii in input_files: + iname = ii.name + Path(iname).symlink_to(ii) + + poscar_list = sorted(Path().rglob("POSCAR_*")) + cnt = 0 + for poscar in poscar_list: + cnt += 1 + poscar_name = poscar.name + num = poscar_name.strip("POSCAR_") + Path(poscar_name.replace("POSCAR", "CONTCAR")).write_text("") + Path(poscar_name.replace("POSCAR", "OUTCAR")).write_text("") + Path(f"{num}.traj").write_text("") + optim_results_dir = Path("optim_results_dir") optim_results_dir.mkdir(parents=True, exist_ok=True) for poscar in Path().glob("POSCAR_*"): @@ -1114,8 +1183,6 @@ def execute( "task_name": str(work_dir), "optim_results_dir": work_dir / optim_results_dir, "traj_results": work_dir / traj_results_dir, - "caly_run_opt_file": work_dir / caly_run_opt_file.name, - "caly_check_opt_file": work_dir / caly_check_opt_file.name, } ) diff --git a/tests/op/test_collect_run_caly.py b/tests/op/test_collect_run_caly.py index aec481ec..22ec37db 100644 --- a/tests/op/test_collect_run_caly.py +++ b/tests/op/test_collect_run_caly.py @@ -102,7 +102,7 @@ def side_effect(*args, **kwargs): "input_file": self.input_file, "step": self.step_file, "results": self.results_dir, - "opt_results_dir": self.opt_results_dir, + "opt_results_dir": [self.opt_results_dir], } ) ) @@ -138,7 +138,7 @@ def side_effect(*args, **kwargs): "input_file": self.input_file, "step": self.step_file, "results": self.results_dir, - "opt_results_dir": self.opt_results_dir, + "opt_results_dir": [self.opt_results_dir], } ) ) @@ -169,7 +169,7 @@ def side_effect(*args, **kwargs): "input_file": self.input_file, "step": self.step_file, "results": self.results_dir, - "opt_results_dir": self.opt_results_dir, + "opt_results_dir": [self.opt_results_dir], } ) ) @@ -203,7 +203,7 @@ def side_effect(*args, **kwargs): "input_file": self.input_file, "step": self.step_file, "results": self.results_dir, - "opt_results_dir": self.opt_results_dir, + "opt_results_dir": [self.opt_results_dir], } ), ) diff --git a/tests/op/test_prep_caly_dp_optim.py b/tests/op/test_prep_caly_dp_optim.py new file mode 100644 index 00000000..8905e229 --- /dev/null +++ b/tests/op/test_prep_caly_dp_optim.py @@ -0,0 +1,153 @@ +import os +import shutil +import unittest +from pathlib import ( + Path, +) + +import numpy as np +from dflow.python import ( + OP, + OPIO, + Artifact, + OPIOSign, + TransientError, +) +from mock import ( + call, + mock, + patch, +) + +# isort: off +from .context import ( + dpgen2, +) +from dpgen2.constants import ( + calypso_task_pattern, + model_name_pattern, + calypso_run_opt_file, + calypso_check_opt_file, +) +from dpgen2.op import PrepCalyDPOptim +from dpgen2.utils import ( + BinaryFileInput, +) + +# isort: on + + +class TestPrepDPOptim(unittest.TestCase): + def setUp(self): + self.config_1 = {"run_calypso_command": "echo 1"} + self.config_2 = {"run_calypso_command": None} + + self.poscar_dir = Path("poscar_dir") + self.poscar_dir.mkdir(parents=True, exist_ok=True) + nposcar = 10 + for i in range(1, nposcar + 1): + self.poscar_dir.joinpath(f"POSCAR_{str(i)}").write_text(f"POSCAR_{str(i)}") + + self.models_dir = Path("models_dir") + self.models_dir.mkdir(parents=True, exist_ok=True) + for i in range(4): + model_path = self.models_dir.joinpath(f"task_name.{i}") + model_path.mkdir(parents=True, exist_ok=True) + model_path.joinpath("frozen_model.pb").write_text("pb") + + self.task_name = calypso_task_pattern % 0 + + self.other_files = Path("other_files") + self.other_files.mkdir(parents=True, exist_ok=True) + self.caly_run_opt_file = self.other_files.joinpath(calypso_run_opt_file) + self.caly_run_opt_file.write_text("run_opt_file") + self.caly_check_opt_file = self.other_files.joinpath(calypso_check_opt_file) + self.caly_check_opt_file.write_text("check_opt_file") + + self.template_slice_config = {"group_size": 3} + self.group_size = self.template_slice_config["group_size"] + + grouped_poscar_list = [i for i in range(0, nposcar, self.group_size)] + self.ngrouped = len(grouped_poscar_list) + self.ref_task_dirs = [] + for i in range(0, self.ngrouped): + self.ref_task_dirs += [self.task_name / Path(f"opt_path_{i}")] + self.ref_task_names = [str(task_dir) for task_dir in self.ref_task_dirs] + + def tearDown(self): + shutil.rmtree(self.other_files) + shutil.rmtree(self.task_name) + shutil.rmtree(self.poscar_dir) + + def test_success_00(self): + if Path(self.task_name).is_dir(): + shutil.rmtree(Path(self.task_name)) + + op = PrepCalyDPOptim() + out = op.execute( + OPIO( + { + "task_name": calypso_task_pattern % 0, + "finished": "false", + "template_slice_config": self.template_slice_config, + "poscar_dir": self.poscar_dir, + "models_dir": self.models_dir, + "caly_run_opt_file": self.caly_run_opt_file, + "caly_check_opt_file": self.caly_check_opt_file, + } + ) + ) + # check output + self.assertEqual(len(out["task_names"]), self.ngrouped) + self.assertEqual(len(out["task_dirs"]), self.ngrouped) + self.assertEqual(out["task_names"], self.ref_task_names) + + first_opt_path, last_opt_path = out["task_dirs"][0], out["task_dirs"][-1] + + ref_common_opt_path_file_check = [ + "frozen_model.pb", + "calypso_run_opt.py", + "calypso_check_opt.py", + ] + ref_first_opt_path_file_check = [ + f"POSCAR_{i}" for i in range(1, self.ngrouped + 1) + ] + first_opt_path_files = sorted(Path(first_opt_path).iterdir()) + for file_name in first_opt_path_files: + self.assertTrue( + file_name.name + in ref_first_opt_path_file_check + ref_common_opt_path_file_check + ) + + ref_last_opt_path_file_check = ["POSCAR_10"] + last_opt_path_files = sorted(last_opt_path.iterdir()) + for file_name in last_opt_path_files: + self.assertTrue( + file_name.name + in ref_last_opt_path_file_check + ref_common_opt_path_file_check + ) + + def test_01_success(self): + if Path(self.task_name).is_dir(): + shutil.rmtree(Path(self.task_name)) + + op = PrepCalyDPOptim() + out = op.execute( + OPIO( + { + "task_name": calypso_task_pattern % 0, + "finished": "true", + "template_slice_config": self.template_slice_config, + "poscar_dir": self.poscar_dir, + "models_dir": self.models_dir, + "caly_run_opt_file": self.caly_run_opt_file, + "caly_check_opt_file": self.caly_check_opt_file, + } + ) + ) + # check output + self.assertEqual(len(out["task_names"]), 1) + self.assertEqual(len(out["task_dirs"]), 1) + self.assertEqual( + out["task_names"], [str(Path(self.task_name) / Path("opt_path"))] + ) diff --git a/tests/op/test_prep_run_dp_optim.py b/tests/op/test_prep_run_dp_optim.py deleted file mode 100644 index 60f5667b..00000000 --- a/tests/op/test_prep_run_dp_optim.py +++ /dev/null @@ -1,181 +0,0 @@ -import os -import shutil -import unittest -from pathlib import ( - Path, -) - -import numpy as np -from dflow.python import ( - OP, - OPIO, - Artifact, - OPIOSign, - TransientError, -) -from mock import ( - call, - mock, - patch, -) - -# isort: off -from .context import ( - dpgen2, -) -from dpgen2.constants import ( - calypso_task_pattern, - model_name_pattern, - calypso_run_opt_file, - calypso_check_opt_file, -) -from dpgen2.op.prep_run_dp_optim import PrepRunDPOptim -from dpgen2.utils import ( - BinaryFileInput, -) - -# isort: on - - -class TestPrepRunDPOptim(unittest.TestCase): - def setUp(self): - self.config_1 = {"run_calypso_command": "echo 1"} - self.config_2 = {"run_calypso_command": None} - - self.poscar_dir = Path("poscar_dir") - self.poscar_dir.mkdir(parents=True, exist_ok=True) - for i in range(1, 6): - self.poscar_dir.joinpath(f"POSCAR_{str(i)}").write_text(f"POSCAR_{str(i)}") - - self.models_dir = Path("models_dir") - self.models_dir.mkdir(parents=True, exist_ok=True) - for i in range(4): - path_name = self.models_dir.joinpath(f"task.{i}") - path_name.mkdir(exist_ok=True, parents=True) - path_name.joinpath("frozen_model.pb").write_text("frozen_model.pb") - - self.task_name = calypso_task_pattern % 0 - - self.other_files = Path("other_files") - self.other_files.mkdir(parents=True, exist_ok=True) - self.caly_run_opt_file = self.other_files.joinpath(calypso_run_opt_file) - self.caly_run_opt_file.write_text("run_opt_file") - self.caly_check_opt_file = self.other_files.joinpath(calypso_check_opt_file) - self.caly_check_opt_file.write_text("check_opt_file") - - self.ref_optim_results_dir = Path("optim_results_dir") - self.ref_optim_results_dir.mkdir(parents=True, exist_ok=True) - for temp in self.poscar_dir.iterdir(): - self.ref_optim_results_dir.joinpath(temp.name).symlink_to(temp.resolve()) - self.ref_optim_results_dir.joinpath( - str(temp.name).replace("POSCAR", "CONTCAR") - ).write_text("foo") - self.ref_optim_results_dir.joinpath( - str(temp.name).replace("POSCAR", "OUTCAR") - ).write_text("foo") - - self.ref_traj_results_dir = Path("traj_results_dir") - self.ref_traj_results_dir.mkdir(parents=True, exist_ok=True) - for i in range(1, len(list(self.poscar_dir.iterdir())) + 1): - self.ref_optim_results_dir.joinpath(f"{str(i+1)}.traj").write_text("foo") - - def tearDown(self): - shutil.rmtree(self.other_files) - shutil.rmtree(self.ref_optim_results_dir) - shutil.rmtree(self.ref_traj_results_dir) - shutil.rmtree(Path(self.task_name)) - shutil.rmtree(self.poscar_dir) - shutil.rmtree(self.models_dir) - - @patch("dpgen2.op.prep_run_dp_optim.run_command") - def test_success_00(self, mocked_run): - if Path(self.task_name).is_dir(): - shutil.rmtree(Path(self.task_name)) - - def side_effect(*args, **kwargs): - for i in range(1, 6): - Path().joinpath(f"CONTCAR_{str(i)}").write_text(f"CONTCAR_{str(i)}") - Path().joinpath(f"OUTCAR_{str(i)}").write_text(f"OUTCAR_{str(i)}") - Path().joinpath(f"{str(i)}.traj").write_text(f"{str(i)}.traj") - return (0, "foo\n", "") - - mocked_run.side_effect = side_effect - op = PrepRunDPOptim() - out = op.execute( - OPIO( - { - "config": {"run_calypso_command": "echo 1"}, - "task_name": calypso_task_pattern % 0, - "finished": "false", - "cnt_num": 0, - "poscar_dir": self.poscar_dir, - "models_dir": self.models_dir, - "caly_run_opt_file": self.caly_run_opt_file, - "caly_check_opt_file": self.caly_check_opt_file, - } - ) - ) - # check output - self.assertEqual(out["task_name"], self.task_name) - - optim_results_dir = out["optim_results_dir"] - list_optim_results_dir = list(optim_results_dir.iterdir()) - counts_optim_results_dir = len(list_optim_results_dir) - counts_outcar_in_optim_results_dir = len( - list(optim_results_dir.rglob("OUTCAR_*")) - ) - - self.assertTrue(optim_results_dir, Path(self.task_name) / "optim_results_dir") - self.assertEqual(counts_optim_results_dir, 15) - self.assertEqual(counts_outcar_in_optim_results_dir, 5) - self.assertTrue( - Path(self.task_name) / "optim_results_dir" / "CONTCAR_4" - in list_optim_results_dir - ) - - traj_results_dir = out["traj_results"] - list_traj_results_dir = list(traj_results_dir.glob("*.traj")) - counts_traj = len(list_traj_results_dir) - self.assertEqual(traj_results_dir, Path(self.task_name) / "traj_results") - self.assertEqual(counts_traj, 5) - self.assertTrue( - Path(self.task_name) / "traj_results" / "0.3.traj" in list_traj_results_dir - ) - - self.assertEqual( - Path(self.task_name) / calypso_run_opt_file, out["caly_run_opt_file"] - ) - self.assertEqual( - Path(self.task_name) / calypso_check_opt_file, out["caly_check_opt_file"] - ) - - @patch("dpgen2.op.prep_run_dp_optim.run_command") - def test_error_01(self, mocked_run): - if Path(self.task_name).is_dir(): - shutil.rmtree(Path(self.task_name)) - - def side_effect(*args, **kwargs): - for i in range(1, 6): - Path().joinpath(f"CONTCAR_{str(i)}").write_text(f"CONTCAR_{str(i)}") - Path().joinpath(f"OUTCAR_{str(i)}").write_text(f"OUTCAR_{str(i)}") - Path().joinpath(f"{str(i)}.traj").write_text(f"{str(i)}.traj") - return (1, "foo\n", "") - - mocked_run.side_effect = side_effect - op = PrepRunDPOptim() - self.assertRaises( - TransientError, - op.execute, - OPIO( - { - "config": {"run_calypso_command": "echo 1"}, - "task_name": calypso_task_pattern % 0, - "finished": "false", - "cnt_num": 0, - "poscar_dir": self.poscar_dir, - "models_dir": self.models_dir, - "caly_run_opt_file": self.caly_run_opt_file, - "caly_check_opt_file": self.caly_check_opt_file, - } - ), - ) diff --git a/tests/op/test_run_caly_dp_optim.py b/tests/op/test_run_caly_dp_optim.py new file mode 100644 index 00000000..ee6afd43 --- /dev/null +++ b/tests/op/test_run_caly_dp_optim.py @@ -0,0 +1,169 @@ +import os +import shutil +import unittest +from pathlib import ( + Path, +) + +import numpy as np +from dflow.python import ( + OP, + OPIO, + Artifact, + OPIOSign, + TransientError, +) +from mock import ( + call, + mock, + patch, +) + +# isort: off +from .context import ( + dpgen2, +) +from dpgen2.constants import ( + calypso_task_pattern, + model_name_pattern, + calypso_run_opt_file, + calypso_check_opt_file, +) +from dpgen2.op import RunCalyDPOptim +from dpgen2.utils import ( + BinaryFileInput, +) + +# isort: on + + +class TestRunDPOptim(unittest.TestCase): + def setUp(self): + self.task_name = "task_dir" + self.task_dir = Path("pre_run_optim") + + self.task_dir.mkdir(parents=True, exist_ok=True) + self.task_dir.joinpath("frozen_model.pb").write_text("1") + self.task_dir.joinpath("calypso_run_opt.py").write_text("1") + self.task_dir.joinpath("calypso_check_opt.py").write_text("1") + for i in range(1, 11): + self.task_dir.joinpath(f"POSCAR_{i}").write_text("1") + + def tearDown(self): + shutil.rmtree(self.task_name) + shutil.rmtree(self.task_dir) + + @patch("dpgen2.op.run_caly_dp_optim.run_command") + def test_00_success(self, mocked_run): + def side_effect(*args, **kwargs): + for i in range(1, 11): + Path().joinpath(f"CONTCAR_{str(i)}").write_text(f"CONTCAR_{str(i)}") + Path().joinpath(f"OUTCAR_{str(i)}").write_text(f"OUTCAR_{str(i)}") + Path().joinpath(f"{str(i)}.traj").write_text(f"{str(i)}.traj") + return (0, "foo\n", "") + + mocked_run.side_effect = side_effect + op = RunCalyDPOptim() + out = op.execute( + OPIO( + { + "config": {"run_calypso_command": "echo 1"}, + "task_name": self.task_name, + "finished": "false", + "cnt_num": 0, + "task_dir": self.task_dir, + } + ) + ) + # check output + self.assertEqual(out["task_name"], self.task_name) + + optim_results_dir = out["optim_results_dir"] + list_optim_results_dir = list(optim_results_dir.iterdir()) + counts_optim_results_dir = len(list_optim_results_dir) + counts_outcar_in_optim_results_dir = len( + list(optim_results_dir.rglob("OUTCAR_*")) + ) + + self.assertTrue(optim_results_dir, Path(self.task_name) / "optim_results_dir") + self.assertEqual(counts_optim_results_dir, 30) + self.assertEqual(counts_outcar_in_optim_results_dir, 10) + self.assertTrue( + Path(self.task_name) / "optim_results_dir" / "CONTCAR_4" + in list_optim_results_dir + ) + + traj_results_dir = out["traj_results"] + list_traj_results_dir = list(traj_results_dir.glob("*.traj")) + counts_traj = len(list_traj_results_dir) + self.assertEqual(traj_results_dir, Path(self.task_name) / "traj_results") + self.assertEqual(counts_traj, 10) + self.assertTrue( + Path(self.task_name) / "traj_results" / "0.3.traj" in list_traj_results_dir + ) + + @patch("dpgen2.op.run_caly_dp_optim.run_command") + def test_01_error(self, mocked_run): + def side_effect(*args, **kwargs): + for i in range(1, 6): + Path().joinpath(f"CONTCAR_{str(i)}").write_text(f"CONTCAR_{str(i)}") + Path().joinpath(f"OUTCAR_{str(i)}").write_text(f"OUTCAR_{str(i)}") + Path().joinpath(f"{str(i)}.traj").write_text(f"{str(i)}.traj") + return (1, "foo\n", "") + + mocked_run.side_effect = side_effect + op = RunCalyDPOptim() + self.assertRaises( + TransientError, + op.execute, + OPIO( + { + "config": {"run_calypso_command": "echo 1"}, + "task_name": self.task_name, + "finished": "false", + "cnt_num": 0, + "task_dir": self.task_dir, + } + ), + ) + + def test_02_success(self): + op = RunCalyDPOptim() + out = op.execute( + OPIO( + { + "config": {"run_calypso_command": "echo 1"}, + "task_name": self.task_name, + "finished": "true", + "cnt_num": 0, + "task_dir": self.task_dir, + } + ) + ) + # check output + self.assertEqual(out["task_name"], self.task_name) + + optim_results_dir = out["optim_results_dir"] + list_optim_results_dir = list(optim_results_dir.iterdir()) + counts_optim_results_dir = len(list_optim_results_dir) + counts_outcar_in_optim_results_dir = len( + list(optim_results_dir.rglob("OUTCAR_*")) + ) + + self.assertTrue(optim_results_dir, Path(self.task_name) / "optim_results_dir") + self.assertEqual(counts_optim_results_dir, 0) + self.assertEqual(counts_outcar_in_optim_results_dir, 0) + self.assertTrue( + Path(self.task_name) / "optim_results_dir" / "CONTCAR_4" + not in list_optim_results_dir + ) + + traj_results_dir = out["traj_results"] + list_traj_results_dir = list(traj_results_dir.glob("*.traj")) + counts_traj = len(list_traj_results_dir) + self.assertEqual(traj_results_dir, Path(self.task_name) / "traj_results") + self.assertEqual(counts_traj, 0) + self.assertTrue( + Path(self.task_name) / "traj_results" / "0.3.traj" + not in list_traj_results_dir + ) diff --git a/tests/test_caly_evo_step.py b/tests/test_caly_evo_step.py index 06937ffc..25759973 100644 --- a/tests/test_caly_evo_step.py +++ b/tests/test_caly_evo_step.py @@ -60,7 +60,8 @@ ) from mocked_ops import ( MockedCollRunCaly, - MockedPrepRunDPOptim, + MockedPrepCalyDPOptim, + MockedRunCalyDPOptim, mocked_numb_models, ) @@ -80,15 +81,16 @@ ExplorationTask, ExplorationTaskGroup, ) +from dpgen2.op import ( + PrepCalyDPOptim, + RunCalyDPOptim, +) from dpgen2.op.collect_run_caly import ( CollRunCaly, ) from dpgen2.op.prep_caly_input import ( PrepCalyInput, ) -from dpgen2.op.prep_run_dp_optim import ( - PrepRunDPOptim, -) from dpgen2.superop.caly_evo_step import ( CalyEvoStep, ) @@ -98,7 +100,11 @@ { "template_config": { "image": default_image, - } + }, + "template_slice_config": { + "group_size": 2, + "pool_size": 1, + }, } ) @@ -115,12 +121,6 @@ def setUp(self) -> None: self.results_dir = None self.opt_results_dir = None self.cnt_num = 0 - # self.step_file = self.file_storage.joinpath("step_none") - # self.step_file.write_text("step_none") - # self.results_dir = self.file_storage.joinpath("results_none") - # self.results_dir.mkdir(parents=True, exist_ok=True) - # self.opt_results_dir = self.file_storage.joinpath("opt_results_dir_none") - # self.opt_results_dir.mkdir(parents=True, exist_ok=True) self.finished = str(False) def tearDown(self) -> None: @@ -159,20 +159,15 @@ def test_mocked_coll_run_caly_00(self): self.assertTrue(out["results"] == Path(self.task_name).joinpath("results")) -class TestMockedPrepRunDPOptim(unittest.TestCase): +class TestMockedRunDPOptim(unittest.TestCase): def setUp(self) -> None: self.config = {} self.task_name = "task_name" self.file_storage = Path("storge_files") self.file_storage.mkdir(parents=True, exist_ok=True) - self.poscar_dir = self.file_storage.joinpath("poscar_dir") - self.poscar_dir.mkdir(parents=True, exist_ok=True) for i in range(5): - self.poscar_dir.joinpath(f"POSCAR_{i}").write_text(f"POSCAR_{i}") - self.models_dir = self.file_storage.joinpath("models_dir") - self.models_dir.mkdir(parents=True, exist_ok=True) - for i in range(2): - self.models_dir.joinpath(f"model.{i}.pb").write_text(f"model.{i}.pb") + self.file_storage.joinpath(f"POSCAR_{i}").write_text(f"POSCAR_{i}") + self.file_storage.joinpath(f"frozen_model.pb").write_text(f"model.{i}.pb") self.caly_run_opt_file = self.file_storage.joinpath(calypso_run_opt_file) self.caly_run_opt_file.write_text("caly_run_opt_script") self.caly_check_opt_file = self.file_storage.joinpath(calypso_check_opt_file) @@ -182,8 +177,8 @@ def tearDown(self) -> None: shutil.rmtree(self.file_storage, ignore_errors=True) shutil.rmtree(Path(self.task_name), ignore_errors=True) - def test_mocked_prep_run_dp_optim(self): - op = MockedPrepRunDPOptim() + def test_mocked_run_dp_optim(self): + op = MockedRunCalyDPOptim() out = op.execute( OPIO( { @@ -191,10 +186,7 @@ def test_mocked_prep_run_dp_optim(self): "finished": "false", "cnt_num": 0, "task_name": self.task_name, - "poscar_dir": self.poscar_dir, - "models_dir": self.models_dir, - "caly_run_opt_file": self.caly_run_opt_file, - "caly_check_opt_file": self.caly_check_opt_file, + "task_dir": self.file_storage, } ) ) @@ -226,13 +218,6 @@ def test_mocked_prep_run_dp_optim(self): Path(self.task_name) / "traj_results" / "3.traj", list_traj_results_dir ) - self.assertEqual( - Path(self.task_name) / calypso_run_opt_file, out["caly_run_opt_file"] - ) - self.assertEqual( - Path(self.task_name) / calypso_check_opt_file, out["caly_check_opt_file"] - ) - # @unittest.skip("temporary pass") @unittest.skipIf(skip_ut_with_dflow, skip_ut_with_dflow_reason) @@ -246,7 +231,9 @@ def setUp(self): self.nmodels = mocked_numb_models self.model_list = [] for ii in range(self.nmodels): - model = self.work_dir.joinpath(f"model.{ii}.pb") + model_path = self.work_dir.joinpath(f"task.{ii}") + model_path.mkdir(exist_ok=True, parents=True) + model = model_path.joinpath(f"model.ckpt.pt") model.write_text(f"model {ii}") self.model_list.append(model) self.models = upload_artifact(self.model_list) @@ -262,19 +249,8 @@ def setUp(self): self.input_file_list = upload_artifact([input_file, input_file]) self.step = None - # step_file = self.work_dir.joinpath("step-none") - # step_file.write_text("None") - # self.step = upload_artifact(step_file) - self.results = None - # results_dir = self.work_dir.joinpath("results-none") - # results_dir.mkdir(parents=True, exist_ok=True) - # self.results = upload_artifact(results_dir) - self.opt_results_dir = None - # opt_results_dir = self.work_dir.joinpath("opt-results-none") - # opt_results_dir.mkdir(parents=True, exist_ok=True) - # self.opt_results_dir = upload_artifact(opt_results_dir) caly_run_opt_file = self.work_dir.joinpath("caly_run_opt.py") caly_run_opt_file.write_text("caly_run_opt") @@ -303,7 +279,8 @@ def test_00(self): steps = CalyEvoStep( "caly-evo-run", MockedCollRunCaly, - MockedPrepRunDPOptim, + PrepCalyDPOptim, + MockedRunCalyDPOptim, prep_config=default_config, run_config=default_config, upload_python_packages=upload_python_packages, @@ -333,26 +310,17 @@ def test_00(self): wf.add(caly_evo_step) wf.submit() - # while wf.query_status() in ["Pending", "Running"]: - # time.sleep(4) - self.assertEqual(wf.query_status(), "Succeeded") step = wf.query_step(name="caly-evo-step")[0] self.assertEqual(step.phase, "Succeeded") - # download_artifact(step.outputs.artifacts["model_devis"]) - # download_artifact(step.outputs.artifacts["trajs"]) - # download_artifact(step.outputs.artifacts["logs"]) - - # for ii in step.outputs.parameters["task_names"].value: - # self.check_run_lmp_output(ii, self.model_list) - # @unittest.skip("temp skit") - def test_01(self): + def test_caly_evo_step(self): steps = CalyEvoStep( "caly-evo-run", MockedCollRunCaly, - MockedPrepRunDPOptim, + PrepCalyDPOptim, + MockedRunCalyDPOptim, prep_config=default_config, run_config=default_config, upload_python_packages=upload_python_packages, @@ -361,7 +329,6 @@ def test_01(self): "caly-evo-step", template=steps, slices=Slices( - "int('{{item}}')", input_parameter=[ "task_name", ], @@ -405,10 +372,13 @@ def test_01(self): download_artifact(step.outputs.artifacts["traj_results"]) + cwd = Path().cwd() for idx, name in enumerate(self.task_name_list): cwd = Path().cwd() os.chdir(Path(name)) traj_list = list(Path().rglob("*.traj")) self.assertEqual(len(traj_list), 5 * self.max_step) - self.assertTrue(Path("traj_results").joinpath(f"{idx}.2.traj") in traj_list) + self.assertTrue( + Path("opt_path_0/traj_results").joinpath(f"{idx}.0.traj") in traj_list + ) os.chdir(cwd) diff --git a/tests/test_prep_run_caly.py b/tests/test_prep_run_caly.py index 542cb0e3..ee7d06a7 100644 --- a/tests/test_prep_run_caly.py +++ b/tests/test_prep_run_caly.py @@ -60,7 +60,7 @@ ) from mocked_ops import ( MockedCollRunCaly, - MockedPrepRunDPOptim, + MockedRunCalyDPOptim, MockedRunCalyModelDevi, mocked_numb_models, ) @@ -69,6 +69,9 @@ BaseExplorationTaskGroup, ExplorationTask, ) +from dpgen2.op.prep_caly_dp_optim import ( + PrepCalyDPOptim, +) from dpgen2.op.prep_caly_input import ( PrepCalyInput, ) @@ -83,11 +86,22 @@ ) from dpgen2.utils.step_config import normalize as normalize_step_dict -default_config = normalize_step_dict( +prep_default_config = normalize_step_dict( + { + "template_config": { + "image": default_image, + }, + } +) +run_default_config = normalize_step_dict( { "template_config": { "image": default_image, - } + }, + "template_slice_config": { + "group_size": 2, + "pool_size": 1, + }, } ) @@ -105,7 +119,7 @@ def make_task_group_list(njobs): # @unittest.skip("temporary pass") @unittest.skipIf(skip_ut_with_dflow, skip_ut_with_dflow_reason) -class TestCalyEvoStep(unittest.TestCase): +class TestPrepRunCaly(unittest.TestCase): def setUp(self): self.expl_config = {} self.work_dir = Path("storge_files") @@ -114,7 +128,9 @@ def setUp(self): self.nmodels = mocked_numb_models self.model_list = [] for ii in range(self.nmodels): - model = self.work_dir.joinpath(f"model.{ii}.pb") + model_path = self.work_dir.joinpath(f"task.{ii}") + model_path.mkdir(parents=True, exist_ok=True) + model = model_path.joinpath(f"frozen_model.pb") model.write_text(f"model {ii}") self.model_list.append(model) self.models = upload_artifact(self.model_list) @@ -133,9 +149,10 @@ def test(self): caly_evo_step_op = CalyEvoStep( "caly-evo-run", MockedCollRunCaly, - MockedPrepRunDPOptim, - prep_config=default_config, - run_config=default_config, + PrepCalyDPOptim, + MockedRunCalyDPOptim, + prep_config=prep_default_config, + run_config=run_default_config, upload_python_packages=upload_python_packages, ) prep_run_caly_op = PrepRunCaly( @@ -143,8 +160,8 @@ def test(self): PrepCalyInput, caly_evo_step_op, MockedRunCalyModelDevi, - prep_config=default_config, - run_config=default_config, + prep_config=prep_default_config, + run_config=run_default_config, upload_python_packages=upload_python_packages, ) prep_run_caly_step = Step(