From 19ee8d55c74a029071cbc235c48a6872ecbfb09c Mon Sep 17 00:00:00 2001 From: zjgemi Date: Fri, 25 Oct 2024 14:52:33 +0800 Subject: [PATCH] sort keys in showkey by virtue of dflow; fold resubmit steps by virtue of dflow Signed-off-by: zjgemi --- dpgen2/entrypoint/submit.py | 165 ++++++-------------------- pyproject.toml | 2 +- tests/entrypoint/test_submit.py | 199 +++++++++++++++----------------- 3 files changed, 129 insertions(+), 237 deletions(-) diff --git a/dpgen2/entrypoint/submit.py b/dpgen2/entrypoint/submit.py index a46b16dd..ad29f883 100644 --- a/dpgen2/entrypoint/submit.py +++ b/dpgen2/entrypoint/submit.py @@ -808,136 +808,39 @@ def print_list_steps( return "\n".join(ret) -def successful_step_keys(wf): - all_step_keys = [] - steps = wf.query_step() - # For reused steps whose startedAt are identical, sort them by key - steps.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key)) - for step in steps: - if step.key is not None and step.phase == "Succeeded": - all_step_keys.append(step.key) - return all_step_keys - - -def get_superop(key): - if "prep-train" in key: - return key.replace("prep-train", "prep-run-train") - elif "run-train-" in key: - return re.sub("run-train-[0-9]*", "prep-run-train", key) - elif "prep-lmp" in key: - return key.replace("prep-lmp", "prep-run-explore") - elif "run-lmp-" in key: - return re.sub("run-lmp-[0-9]*", "prep-run-explore", key) - elif "prep-fp" in key: - return key.replace("prep-fp", "prep-run-fp") - elif "run-fp-" in key: - return re.sub("run-fp-[0-9]*", "prep-run-fp", key) - elif "prep-caly-input" in key: - return key.replace("prep-caly-input", "prep-run-explore") - elif "collect-run-calypso-" in key: - return re.sub("collect-run-calypso-[0-9]*-[0-9]*", "prep-run-explore", key) - elif "prep-dp-optim-" in key: - return re.sub("prep-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key) - elif "run-dp-optim-" in key: - return re.sub("run-dp-optim-[0-9]*-[0-9]*-[0-9]*", "prep-run-explore", key) - elif "prep-caly-model-devi" in key: - return key.replace("prep-caly-model-devi", "prep-run-explore") - elif "run-caly-model-devi" in key: - return re.sub("run-caly-model-devi-[0-9]*", "prep-run-explore", key) - elif "caly-evo-step" in key: - return re.sub("caly-evo-step-[0-9]*", "prep-run-explore", key) - elif "diffcsp-gen-" in key: - return re.sub("diffcsp-gen-[0-9]*", "prep-run-explore", key) - elif "prep-relax" in key: - return re.sub("prep-relax", "prep-run-explore", key) - elif "run-relax-" in key: - return re.sub("run-relax-[0-9]*", "prep-run-explore", key) - return None - - -def fold_keys(all_step_keys): - folded_keys = {} - for key in all_step_keys: - is_superop = False - for superop in ["prep-run-train", "prep-run-explore", "prep-run-fp"]: - if superop in key: - if key not in folded_keys: - folded_keys[key] = [] - is_superop = True - break - if is_superop: - continue - superop = get_superop(key) - # if its super OP is succeeded, fold it into its super OP - if superop is not None and superop in all_step_keys: - if superop not in folded_keys: - folded_keys[superop] = [] - folded_keys[superop].append(key) - else: - folded_keys[key] = [key] - for k, v in folded_keys.items(): - if v == []: - folded_keys[k] = [k] - return folded_keys - - def get_resubmit_keys( wf, ): - all_step_keys = successful_step_keys(wf) - step_keys = [ + wf_info = wf.query() + all_steps = [step for step in wf_info.get_step(sort_by_generation=True) if step.key is not None] + super_keys = [ "prep-run-train", - "prep-train", - "run-train", - "prep-caly-input", - "prep-caly-model-devi", - "run-caly-model-devi", "prep-run-explore", - "prep-lmp", - "run-lmp", - "diffcsp-gen", - "prep-relax", - "run-relax", + "prep-run-fp" + ] + other_keys = [ "select-confs", - "prep-run-fp", - "prep-fp", - "run-fp", "collect-data", "scheduler", "id", ] - if ( - len( - matched_step_key( - all_step_keys, - [ - "collect-run-calypso", - "prep-dp-optim", - "run-dp-optim", - ], - ) - ) - > 0 - ): - # calypso default mode - step_keys += [ - "collect-run-calypso", - "prep-dp-optim", - "run-dp-optim", - ] - else: - # calypso merge mode - step_keys.append("caly-evo-step") - all_step_keys = matched_step_key( - all_step_keys, - step_keys, - ) - all_step_keys = sort_slice_ops( - all_step_keys, - ["run-train", "run-lmp", "run-fp", "diffcsp-gen", "run-relax"], - ) - folded_keys = fold_keys(all_step_keys) + folded_keys = {} + for step in all_steps: + if len(matched_step_key([step.key], super_keys)) > 0: + sub_steps = wf_info.get_step(parent_id=step.id, sort_by_generation=True) + sub_keys = [step.key for step in sub_steps if step.key is not None and step.phase == "Succeeded"] + sub_keys = sort_slice_ops( + sub_keys, + ["run-train", "run-lmp", "run-fp", "diffcsp-gen", "run-relax"], + ) + if step.phase == "Succeeded": + folded_keys[step.key] = sub_keys + else: + for key in sub_keys: + folded_keys[key] = [key] + elif len(matched_step_key([step.key], other_keys)) > 0: + folded_keys[step.key] = [step.key] return folded_keys @@ -955,7 +858,12 @@ def resubmit_concurrent_learning( old_wf = Workflow(id=wfid) folded_keys = get_resubmit_keys(old_wf) - all_step_keys = sum(folded_keys.values(), []) + all_step_keys = [] + super_keys = {} + for super_key, keys in folded_keys.items(): + all_step_keys += keys + for key in keys: + super_keys[key] = super_key if list_steps: prt_str = print_keys_in_nice_format( @@ -971,21 +879,16 @@ def resubmit_concurrent_learning( if fold: reused_folded_keys = {} for key in reused_keys: - superop = get_superop(key) - if superop is not None: - if superop not in reused_folded_keys: - reused_folded_keys[superop] = [] - reused_folded_keys[superop].append(key) - else: - reused_folded_keys[key] = [key] + super_key = super_keys[key] + if super_key not in reused_folded_keys: + reused_folded_keys[super_key] = [] + reused_folded_keys[super_key].append(key) for k, v in reused_folded_keys.items(): # reuse the super OP iif all steps within it are reused - if v != [k] and k in folded_keys and set(v) == set(folded_keys[k]): + if set(v) == set(folded_keys[k]): reused_folded_keys[k] = [k] reused_keys = sum(reused_folded_keys.values(), []) - reuse_step = old_wf.query_step(key=reused_keys) - # For reused steps whose startedAt are identical, sort them by key - reuse_step.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key)) + reuse_step = old_wf.query_step(key=reused_keys, sort_by_generation=True) wf = submit_concurrent_learning( wf_config, diff --git a/pyproject.toml b/pyproject.toml index 297ff8a7..4a3708ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ classifiers = [ dependencies = [ 'numpy', 'dpdata>=0.2.20', - 'pydflow>=1.8.95', + 'pydflow>=1.8.97', 'dargs>=0.3.1', 'scipy', 'lbg', diff --git a/tests/entrypoint/test_submit.py b/tests/entrypoint/test_submit.py index 6352df90..9f0c547a 100644 --- a/tests/entrypoint/test_submit.py +++ b/tests/entrypoint/test_submit.py @@ -19,7 +19,7 @@ from dpgen2.entrypoint.submit import ( copy_scheduler_plans, expand_idx, - fold_keys, + get_resubmit_keys, print_list_steps, submit_concurrent_learning, update_reuse_step_scheduler, @@ -959,109 +959,98 @@ def test(self): ) -def test_fold_keys_lmp(): - all_step_keys = [ - "init--scheduler", - "init--id", - "iter-000000--prep-run-train", - "iter-000000--prep-train", - "iter-000000--run-train-0000", - "iter-000000--run-train-0001", - "iter-000000--run-train-0002", - "iter-000000--run-train-0003", - "iter-000000--prep-run-explore", - "iter-000000--prep-lmp", - "iter-000000--run-lmp-000000", - "iter-000000--run-lmp-000001", - "iter-000000--select-confs", - "iter-000000--prep-run-fp", - "iter-000000--prep-fp", - "iter-000000--run-fp-000000", - "iter-000000--run-fp-000001", - "iter-000000--run-fp-000002", - ] - folded_keys = fold_keys(all_step_keys) - assert folded_keys == { - "init--scheduler": ["init--scheduler"], - "init--id": ["init--id"], - "iter-000000--prep-run-train": [ - "iter-000000--prep-train", - "iter-000000--run-train-0000", - "iter-000000--run-train-0001", - "iter-000000--run-train-0002", - "iter-000000--run-train-0003", - ], - "iter-000000--prep-run-explore": [ - "iter-000000--prep-lmp", - "iter-000000--run-lmp-000000", - "iter-000000--run-lmp-000001", - ], - "iter-000000--select-confs": ["iter-000000--select-confs"], - "iter-000000--prep-run-fp": [ - "iter-000000--prep-fp", - "iter-000000--run-fp-000000", - "iter-000000--run-fp-000001", - "iter-000000--run-fp-000002", - ], - } +class MockedArgoStep: + def __init__(self, key, phase): + self.key = key + self.id = key + self.phase = phase + + +steps = [ + MockedArgoStep(key='init--scheduler', phase="Succeeded"), + MockedArgoStep(key='init--id', phase="Succeeded"), + MockedArgoStep(key='iter-000000--loop', phase="Succeeded"), + MockedArgoStep(key='iter-000000--block', phase="Succeeded"), + MockedArgoStep(key='iter-000000--prep-run-train', phase="Succeeded"), + MockedArgoStep(key='iter-000000--prep-run-explore', phase="Succeeded"), + MockedArgoStep(key='iter-000000--select-confs', phase="Succeeded"), + MockedArgoStep(key='iter-000000--prep-run-fp', phase="Succeeded"), + MockedArgoStep(key='iter-000000--collect-data', phase="Succeeded"), + MockedArgoStep(key='iter-000000--scheduler', phase="Succeeded"), + MockedArgoStep(key='iter-000000--id', phase="Succeeded"), + MockedArgoStep(key='iter-000001--block', phase="Failed"), + MockedArgoStep(key='iter-000001--prep-run-train', phase="Succeeded"), + MockedArgoStep(key='iter-000001--prep-train', phase="Succeeded"), + MockedArgoStep(key='iter-000001--run-train-0000', phase="Succeeded"), + MockedArgoStep(key='iter-000001--run-train-0001', phase="Succeeded"), + MockedArgoStep(key='iter-000001--run-train-0002', phase="Succeeded"), + MockedArgoStep(key='iter-000001--run-train-0003', phase="Succeeded"), + MockedArgoStep(key='iter-000001--prep-run-explore', phase="Succeeded"), + MockedArgoStep(key='iter-000001--prep-lmp', phase="Succeeded"), + MockedArgoStep(key='iter-000001--run-lmp-000000', phase="Succeeded"), + MockedArgoStep(key='iter-000001--run-lmp-000001', phase="Succeeded"), + MockedArgoStep(key='iter-000001--run-lmp-000002', phase="Succeeded"), + MockedArgoStep(key='iter-000001--select-confs', phase="Succeeded"), + MockedArgoStep(key='iter-000001--prep-run-fp', phase="Failed"), + MockedArgoStep(key='iter-000001--prep-fp', phase="Succeeded"), + MockedArgoStep(key='iter-000001--run-fp-000000', phase="Succeeded"), + MockedArgoStep(key='iter-000001--run-fp-000001', phase="Failed"), +] + + +class MockedWorkflowInfo: + def get_step(self, parent_id=None, sort_by_generation=False): + if parent_id is None: + return steps + if parent_id == "iter-000000--prep-run-train": + return [steps[4]] + if parent_id == "iter-000000--prep-run-explore": + return [steps[5]] + if parent_id == "iter-000000--prep-run-fp": + return [steps[7]] + if parent_id == "iter-000001--prep-run-train": + return steps[13:18] + if parent_id == "iter-000001--prep-run-explore": + return steps[19:23] + if parent_id == "iter-000001--prep-run-fp": + return steps[25:28] + + +class MockedWorkflow: + def query(self): + return MockedWorkflowInfo() + + +expected_folded_keys = { + 'init--scheduler': ['init--scheduler'], + 'init--id': ['init--id'], + 'iter-000000--prep-run-train': ['iter-000000--prep-run-train'], + 'iter-000000--prep-run-explore': ['iter-000000--prep-run-explore'], + 'iter-000000--select-confs': ['iter-000000--select-confs'], + 'iter-000000--prep-run-fp': ['iter-000000--prep-run-fp'], + 'iter-000000--collect-data': ['iter-000000--collect-data'], + 'iter-000000--scheduler': ['iter-000000--scheduler'], + 'iter-000000--id': ['iter-000000--id'], + 'iter-000001--prep-run-train': [ + 'iter-000001--prep-train', + 'iter-000001--run-train-0000', + 'iter-000001--run-train-0001', + 'iter-000001--run-train-0002', + 'iter-000001--run-train-0003' + ], + 'iter-000001--prep-run-explore': [ + 'iter-000001--prep-lmp', + 'iter-000001--run-lmp-000000', + 'iter-000001--run-lmp-000001', + 'iter-000001--run-lmp-000002' + ], + 'iter-000001--select-confs': ['iter-000001--select-confs'], + 'iter-000001--prep-fp': ['iter-000001--prep-fp'], + 'iter-000001--run-fp-000000': ['iter-000001--run-fp-000000'], +} -def test_fold_keys_caly(): - all_step_keys = [ - "init--scheduler", - "init--id", - "iter-000000--prep-run-train", - "iter-000000--prep-train", - "iter-000000--run-train-0000", - "iter-000000--run-train-0001", - "iter-000000--run-train-0002", - "iter-000000--run-train-0003", - "iter-000000--prep-run-explore", - "iter-000000--prep-caly-input", - "iter-000000--prep-run-dp-optim-000000-0", - "iter-000000--prep-run-dp-optim-000000-1", - "iter-000000--prep-run-dp-optim-000001-0", - "iter-000000--prep-run-dp-optim-000001-1", - "iter-000000--collect-run-calypso-000000-0", - "iter-000000--collect-run-calypso-000000-1", - "iter-000000--collect-run-calypso-000001-0", - "iter-000000--collect-run-calypso-000001-1", - "iter-000000--run-caly-model-devi", - "iter-000000--select-confs", - "iter-000000--prep-run-fp", - "iter-000000--prep-fp", - "iter-000000--run-fp-000000", - "iter-000000--run-fp-000001", - "iter-000000--run-fp-000002", - ] - folded_keys = fold_keys(all_step_keys) - assert folded_keys == { - "init--scheduler": ["init--scheduler"], - "init--id": ["init--id"], - "iter-000000--prep-run-train": [ - "iter-000000--prep-train", - "iter-000000--run-train-0000", - "iter-000000--run-train-0001", - "iter-000000--run-train-0002", - "iter-000000--run-train-0003", - ], - "iter-000000--prep-run-explore": [ - "iter-000000--prep-caly-input", - "iter-000000--prep-run-dp-optim-000000-0", - "iter-000000--prep-run-dp-optim-000000-1", - "iter-000000--prep-run-dp-optim-000001-0", - "iter-000000--prep-run-dp-optim-000001-1", - "iter-000000--collect-run-calypso-000000-0", - "iter-000000--collect-run-calypso-000000-1", - "iter-000000--collect-run-calypso-000001-0", - "iter-000000--collect-run-calypso-000001-1", - "iter-000000--run-caly-model-devi", - ], - "iter-000000--select-confs": ["iter-000000--select-confs"], - "iter-000000--prep-run-fp": [ - "iter-000000--prep-fp", - "iter-000000--run-fp-000000", - "iter-000000--run-fp-000001", - "iter-000000--run-fp-000002", - ], - } +def test_get_resubmit_keys(): + wf = MockedWorkflow() + folded_keys = get_resubmit_keys(wf) + assert folded_keys == expected_folded_keys