diff --git a/src/aiida_sssp_workflow/cli/run.py b/src/aiida_sssp_workflow/cli/run.py index 2af74271..3bec47a9 100644 --- a/src/aiida_sssp_workflow/cli/run.py +++ b/src/aiida_sssp_workflow/cli/run.py @@ -4,16 +4,18 @@ Running verification workchain """ -import os +from typing import List, Tuple +from pathlib import Path import aiida import click from aiida import orm from aiida.cmdline.params import options, types from aiida.cmdline.utils import echo -from aiida.engine import run_get_node, submit +from aiida.engine import ProcessBuilder, run_get_node, submit from aiida.plugins import DataFactory, WorkflowFactory +from aiida_pseudo.data.pseudo.upf import UpfData from aiida_sssp_workflow.cli import cmd_root from aiida_sssp_workflow.workflows.verifications import ( DEFAULT_CONVERGENCE_PROPERTIES_LIST, @@ -21,13 +23,49 @@ DEFAULT_PROPERTIES_LIST, ) -UpfData = DataFactory("pseudo.upf") VerificationWorkChain = WorkflowFactory("sssp_workflow.verification") -# Trigger the launch by running: -# aiida-sssp-workflow launch --property measure.precision --pw-code pw-7.0@localhost --ph-code ph-7.0@localhost --protocol test --cutoff-control test --criteria efficiency --withmpi True -- examples/_static/Si_ONCV_PBE-1.2.upf +def guess_properties_list(property: list) -> Tuple[List[str], str]: + # if the property is not specified, use the default list with all properties calculated. + # otherwise, use the specified properties. 
+ if not property: + properties_list = DEFAULT_PROPERTIES_LIST + extra_desc = "All properties" + elif len(property) == 1 and property[0] == "convergence": + properties_list = DEFAULT_CONVERGENCE_PROPERTIES_LIST + extra_desc = "Convergence" + elif len(property) == 1 and property[0] == "measure": + properties_list = DEFAULT_MEASURE_PROPERTIES_LIST + extra_desc = "Measure" + else: + properties_list = list(property) + extra_desc = f"{properties_list}" + + return properties_list, extra_desc +def guess_is_convergence(properties_list: list) -> bool: + """Check if it is a convergence test""" + return any([c for c in properties_list if c.startswith("convergence")]) + +def guess_is_full_convergence(properties_list: list) -> bool: + """Check if all properties are run for convergence test""" + + return len([c for c in properties_list if c.startswith("convergence")]) == len(DEFAULT_CONVERGENCE_PROPERTIES_LIST) + +def guess_is_measure(properties_list: list) -> bool: + """Check if it is a measure test""" + + return any([c for c in properties_list if c.startswith("measure")]) + +def guess_is_ph(properties_list: list) -> bool: + """Check if it has a measure test""" + + return any([c for c in properties_list if "phonon_frequencies" in c]) + + +# Trigger the launch by running: +# aiida-sssp-workflow launch --property measure.precision --pw-code pw-7.0@localhost --ph-code ph-7.0@localhost --protocol test --cutoff-control test --withmpi True -- examples/_static/Si_ONCV_PBE-1.2.upf @cmd_root.command("launch") @click.argument("pseudo", type=click.Path(exists=True)) @options.OverridableOption( @@ -51,11 +89,6 @@ type=click.FLOAT, help="Oxygen ecutrho to use for oxides precision measure workflow.", ) -@options.OverridableOption( - "--pw-code-large-memory", - "pw_code_large_memory", - type=types.CodeParamType(entry_point="quantumespresso.pw"), -)(required=False) @options.OverridableOption( "--ph-code", "ph_code", type=types.CodeParamType(entry_point="quantumespresso.ph") )(required=False) 
@@ -68,10 +101,9 @@ @click.option( "protocol", "--protocol", - default="acwf", - help="Protocol to use for the verification, (acwf, test).", + default="standard", + help="Protocol to use for the verification, (standard, quick, test).", ) -@click.option("withmpi", "--withmpi", default=True, help="Run with mpi.") @click.option("npool", "--npool", default=1, help="Number of pool.") @click.option("walltime", "--walltime", default=3600, help="Walltime.") @click.option( @@ -100,143 +132,92 @@ @click.option( "ecutwfc", "--ecutwfc", + type=float, help="Cutoff energy for wavefunctions in Rydberg.", ) @click.option( "ecutrho", "--ecutrho", + type=float, help="Cutoff energy for charge density in Rydberg.", ) -# cutoff_control, criteria, configuration is for convergence workflows only -@click.option( - "cutoff_control", - "--cutoff-control", - help="Cutoff control for convergence workflow, (standard, quick, opsp).", -) -@click.option( - "criteria", "--criteria", help="Criteria for convergence (efficiency, precision)." -) # configuration is hard coded for convergence workflow, but here is an interface for experiment purpose +# when this is passed with convergence test, only one can be passed. +# When it is passed with measure test, can be multiple configurations. 
@click.option( "configuration", "--configuration", multiple=True, - default=(), + default=[], help="Configuration of structure, can be: SC, FCC, BCC, Diamond, XO, XO2, XO3, X2O, X2O3, X2O5, GS, RE", ) def launch( - pw_code, - ph_code, - pw_code_large_memory, - property, - protocol, - ecutwfc, - ecutrho, - cutoff_control, - criteria, - configuration, - withmpi, - npool, - walltime, + pw_code: orm.Code, + ph_code: orm.Code, + property: list, + protocol: str, + configuration: list, + npool: int, + walltime: int, resources, - pseudo, - oxygen_pseudo, - oxygen_ecutwfc, - oxygen_ecutrho, - clean_workdir, - daemon, - comment, + pseudo: Path, + clean_workdir: bool, + daemon: bool, + comment: str, ): """Launch the verification workchain.""" - # if the property is not specified, use the default list with all properties calculated. - # otherwise, use the specified properties. - if not property: - properties_list = DEFAULT_PROPERTIES_LIST - extra_desc = "All properties" - elif len(property) == 1 and property[0] == "convergence": - properties_list = DEFAULT_CONVERGENCE_PROPERTIES_LIST - extra_desc = "Convergence" - elif len(property) == 1 and property[0] == "measure": - properties_list = DEFAULT_MEASURE_PROPERTIES_LIST - extra_desc = "Measure" - else: - properties_list = list(property) - extra_desc = f"{properties_list}" - - # validate the options are all provide for the property - is_convergence = False - is_measure = False - is_ph = False - for prop in properties_list: - if prop.startswith("convergence"): - is_convergence = True - if prop.startswith("measure"): - is_measure = True - if "phonon_frequencies" in prop: - is_ph = True - # raise error if the options are not provided - if is_convergence and not (cutoff_control and criteria): - echo.echo_critical( - "cutoff_control, criteria must be provided for convergence workflow." 
- ) + properties_list, extra_desc = guess_properties_list(property) - if is_measure and not (ecutwfc and ecutrho): - echo.echo_critical("ecutwfc and ecutrho must be provided for measure workflow.") + is_convergence = guess_is_convergence(properties_list) + is_full_convergence = guess_is_full_convergence(properties_list) + is_measure = guess_is_measure(properties_list) + is_ph = guess_is_ph(properties_list) if is_ph and not ph_code: - echo.echo_critical("ph_code must be provided for phonon frequencies.") - - # raise warning if the options are over provided, e.g. cutoff_control is provided for measure workflow - if is_measure and (cutoff_control or criteria): - echo.echo_warning("cutoff_control, criteria are not used for measure workflow.") - - # raise warning if pw_code_large_memory is provided for not include cohesive energy convergence workflow - if pw_code_large_memory and ( - not is_convergence or "convergence.cohesive_energy" not in properties_list - ): - echo.echo_warning("pw_code_large_memory is not used for this workflow.") + echo.echo_critical("ph_code must be provided since we run on it for phonon frequencies.") if is_convergence and len(configuration) > 1: echo.echo_critical( "Only one configuration is allowed for convergence workflow." 
) - if is_convergence and (ecutwfc or ecutrho): - echo.echo_warning("ecutwfc and ecutrho are not used for convergence workflow.") + if is_measure and not is_full_convergence: + echo.echo_warning("Full convergence tests are not run, so we use maximum cutoffs for transferability verification.") + # Load the curent AiiDA profile and log to user _profile = aiida.load_profile() echo.echo_info(f"Current profile: {_profile.name}") - basename = os.path.basename(pseudo) - - computer = pw_code.computer.label - label, _ = os.path.splitext(basename) - # convert configuration to list - configuration = list(configuration) + configuration_list = list(configuration) - if len(configuration) == 0: + if len(configuration_list) == 0: conf_label = "default" - elif len(configuration) == 1: - conf_label = configuration[0] + elif len(configuration_list) == 1: + conf_label = configuration_list[0] else: - conf_label = "/".join(configuration) - - pre_label = ( - f"{protocol}" - if not is_convergence - else f"{protocol}-{criteria}-{cutoff_control}" - ) - label = orm.Str(f"({pre_label} at {computer} - {conf_label}) {label}") - - with open(pseudo, "rb") as stream: - pseudo = UpfData(stream) + conf_label = "/".join(configuration_list) resources = dict(resources) if "num_machines" not in resources: resources["num_machines"] = 1 + builder: ProcessBuilder = FullVerificationWorkChain.get_builder( + pseudo=pseudo, + protocol=protocol, + properties_list=properties_list, + configuration_list=configuration_list, + clean_workdir=clean_workdir, + ) + + builder.metadata.label = f"({protocol} at {pw_code.computer.label} - {conf_label}) {pseudo.stem}" + builder.metadata.description = f"""Calculation is run on protocol: {protocol}; on {pw_code.computer.label}; on configuration {conf_label}; on pseudo {pseudo.stem}.""" + + builder.pw_code = pw_code + if is_ph: + builder.ph_code = ph_code + inputs = { "measure": { "protocol": orm.Str(protocol), @@ -257,7 +238,7 @@ def launch( "resources": resources, 
"max_wallclock_seconds": walltime, "withmpi": withmpi, - } + }, ), "parallelization": orm.Dict(dict={"npool": npool}), "clean_workdir": orm.Bool(clean_workdir), diff --git a/src/aiida_sssp_workflow/protocol/convergence.yml b/src/aiida_sssp_workflow/protocol/convergence.yml index 98829079..7f0241c3 100644 --- a/src/aiida_sssp_workflow/protocol/convergence.yml +++ b/src/aiida_sssp_workflow/protocol/convergence.yml @@ -69,41 +69,6 @@ fine: scale_count: 7 scale_increment: 0.02 -acwf: - name: acwf - description: The parameters of EOS is exactly the same as it used in nat.phys.rev 2024 paper. - - base: # base parameters is inherit by other process - occupations: smearing - degauss: 0.0045 # balanced protocol of qe -> gabriel - smearing: fd - conv_thr_per_atom: 1.0e-10 - kpoints_distance: 0.06 # balanced protocol of qe -> gabriel - mixing_beta: 0.4 - - cohesive_energy: - atom_smearing: gaussian - vacuum_length: 12.0 - - phonon_frequencies: - qpoints_list: - - [0.5, 0.5, 0.5] - epsilon: false - tr2_ph: 1.0e-14 - diagonalization: cg - - pressure: - scale_count: 7 - scale_increment: 0.02 - - bands: - init_nbands_factor: 3.0 - fermi_shift: 10.0 - - eos: - scale_count: 7 - scale_increment: 0.02 - test: name: test-only description: Protocol to run test of workflow. diff --git a/src/aiida_sssp_workflow/protocol/criteria.yml b/src/aiida_sssp_workflow/protocol/criteria.yml index b24ab73a..c9d3b15a 100644 --- a/src/aiida_sssp_workflow/protocol/criteria.yml +++ b/src/aiida_sssp_workflow/protocol/criteria.yml @@ -1,6 +1,6 @@ --- -efficiency: - name: SSSP efficiency +standard: + name: initial naive defined criteria description: Protocol to pick the efficiency pseudopotestials. cohesive_energy: @@ -33,37 +33,3 @@ efficiency: eps: 1.0e-3 unit: meV/atom - -precision: - name: SSSP precision - description: Protocol to pick the precision pseudopotestials. 
- - cohesive_energy: - mode: 0 - bounds: [0.0, 2.0] # when relative error < 2.0 meV/atom - eps: 1.0e-3 - unit: meV/atom - - delta: - mode: 0 - bounds: [0.0, 0.1] # when absolute error < 0.1 meV/atom - eps: 1.0e-3 - unit: meV/atom - - phonon_frequencies: - mode: 0 - bounds: [0.0, 1.0] # when error < 1.0 % - eps: 1.0e-3 - unit: "%" - - pressure: - mode: 0 - bounds: [0.0, 0.5] # when relative error < 0.5% - eps: 1.0e-3 - unit: "%" - - bands: - mode: 0 - bounds: [0.0, 15] # when error eta_c < 15 meV - eps: 1.0e-3 - unit: meV/atom diff --git a/src/aiida_sssp_workflow/protocol/transferability.yml b/src/aiida_sssp_workflow/protocol/transferability.yml index b6cea302..f9536fa2 100644 --- a/src/aiida_sssp_workflow/protocol/transferability.yml +++ b/src/aiida_sssp_workflow/protocol/transferability.yml @@ -1,12 +1,12 @@ --- -acwf: +standard: name: acwf description: Delta measure parameters compatible with AiiDA common workflow occupations: smearing degauss: 0.0045 smearing: fd - conv_thr_per_atom: 1.0e-10 + conv_thr_per_atom: 1.0e-9 kpoints_distance: 0.06 mixing_beta: 0.4 scale_count: 7 diff --git a/src/aiida_sssp_workflow/utils/__init__.py b/src/aiida_sssp_workflow/utils/__init__.py index 35d2eb13..de5b1253 100644 --- a/src/aiida_sssp_workflow/utils/__init__.py +++ b/src/aiida_sssp_workflow/utils/__init__.py @@ -7,6 +7,7 @@ __all__ = [ "get_default_configuration", "extract_pseudo_info", + "get_proper_dual", "parse_std_filename", "LANTHANIDE_ELEMENTS", "ACTINIDE_ELEMENTS", diff --git a/src/aiida_sssp_workflow/utils/pseudo.py b/src/aiida_sssp_workflow/utils/pseudo.py index ed496d65..4f6c249d 100644 --- a/src/aiida_sssp_workflow/utils/pseudo.py +++ b/src/aiida_sssp_workflow/utils/pseudo.py @@ -5,6 +5,7 @@ from enum import Enum from aiida.plugins import DataFactory +from aiida_sssp_workflow.utils import HIGH_DUAL_ELEMENTS UpfData = DataFactory("pseudo.upf") @@ -130,6 +131,17 @@ def extract_pseudo_info(pseudo_text: str) -> PseudoInfo: z_valence=upf_info["z_valence"], ) +def 
get_proper_dual(pp_info: PseudoInfo) -> int: + if pp_info.type == 'nc': + dual = 4 + else: + dual = 8 + + if pp_info.element in HIGH_DUAL_ELEMENTS and pp_info.type != 'nc': + dual = 18 + + return dual + def parse_std_filename(filename: str, extension: str = "upf") -> PseudoInfo: """Parse the standard filename of pseudo and return the `PseudoInfo` object. diff --git a/src/aiida_sssp_workflow/workflows/verification.py b/src/aiida_sssp_workflow/workflows/verification.py new file mode 100644 index 00000000..25e3010d --- /dev/null +++ b/src/aiida_sssp_workflow/workflows/verification.py @@ -0,0 +1,412 @@ +# -*- coding: utf-8 -*- +""" +All in one verification workchain +""" +from typing import Tuple +from pathlib import Path + +from aiida import orm +from aiida.engine import if_, ProcessBuilder +from aiida.engine.processes.exit_code import ExitCode +from aiida.engine.processes.functions import calcfunction +from aiida_pseudo.data.pseudo import UpfData + +from aiida_sssp_workflow.utils.protocol import get_protocol +from aiida_sssp_workflow.workflows import SelfCleanWorkChain + +# TODO: simplipy me +from aiida_sssp_workflow.workflows.convergence.bands import ConvergenceBandsWorkChain +from aiida_sssp_workflow.workflows.convergence.cohesive_energy import ConvergenceCohesiveEnergyWorkChain +from aiida_sssp_workflow.workflows.convergence.eos import ConvergenceEOSWorkChain +from aiida_sssp_workflow.workflows.convergence.phonon_frequencies import ConvergencePhononFrequenciesWorkChain +from aiida_sssp_workflow.workflows.convergence.pressure import ConvergencePressureWorkChain +from aiida_sssp_workflow.workflows.convergence.report import ConvergenceReport + +from aiida_sssp_workflow.workflows.measure.bands import BandStructureWorkChain +from aiida_sssp_workflow.workflows.measure.transferability import EOSTransferabilityWorkChain + +from aiida_sssp_workflow.utils.pseudo import extract_pseudo_info, get_proper_dual, parse + + +@calcfunction +def parse_pseudo_info(pseudo): + """parse 
the pseudo info as a Dict""" + try: + info = parse(pseudo.get_content()) + except ValueError: + return ExitCode(100, "cannot parse the info of pseudopotential.") + + return orm.Dict(dict=info) + + +DEFAULT_CONVERGENCE_PROPERTIES_LIST = [ + "convergence.cohesive_energy", + "convergence.pressure", + "convergence.delta", + "convergence.bands", + "convergence.phonon_frequencies", +] + +DEFAULT_MEASURE_PROPERTIES_LIST = [ + "measure.eos", + "measure.bands", +] + +DEFAULT_PROPERTIES_LIST = ( + DEFAULT_MEASURE_PROPERTIES_LIST + DEFAULT_CONVERGENCE_PROPERTIES_LIST +) + +def compute_recommended_cutoffs(workchains: dict, pseudo: UpfData, criteria_name: str='standard'): + """Input is a dict with workchain name and values are the workchain node, + loop over the workchain and apply the criteria to get the recommended cutoffs. + """ + criteria = get_protocol(category='criteria', name=criteria_name) + success_workchains = {k: w for k, w in workchains.items() if w.is_finished_ok} + if len(success_workchains) == len(DEFAULT_CONVERGENCE_PROPERTIES_LIST): + # All convergence test are finished correct use the recommended cutoffs + ecutwfc = -1 + for k, w in success_workchains.items(): + k: str + + recommended_ecutwfc, _ = converge_check(w.outputs.report, criteria[k]) + + ecutwfc = max(ecutwfc, recommended_ecutwfc) + + elif len(workchains) > len(success_workchains): + ecutwfc = 300 + else: + ecutwfc = 200 + + return get_default_cutoff(pseudo, ecutwfc) + +def converge_check(report: ConvergenceReport, criteria: dict) -> Tuple[int, int]: + """From the report, go through evaluation node of reference and convergence test points, + compute the convergence behavior of the convergence run and based on the criteria, + give the recommended cutoff pair. + It gives pair since it will anchor the evaluation workchain where it converged and write out its cutoff pair. + So this is suitable for both ecutwfc and ecutrho check out. 
+ """ + # TODO: + +def get_default_cutoff(pseudo: UpfData, ecutwfc: float) -> Tuple[float, float]: + """Based on the pseudo_type, give the cutoffs pairs""" + pp_info = extract_pseudo_info(pseudo.get_content()) + dual = get_proper_dual(pp_info) + + return ecutwfc, ecutwfc * dual + + +class FullVerificationWorkChain(SelfCleanWorkChain): + """Full verification work chain include to run convergence test, band structure and EOS verification""" + + # This two class attributes will control whether a WF flow is + # run and results write to outputs ports. + _VALID_CONGENCENCE_WF = [ + "convergence.cohesive_energy", + "convergence.phonon_frequencies", + "convergence.pressure", + "convergence.delta", + "convergence.bands", + ] + _VALID_MEASURE_WF = [ + "measure.eos", + "measure.bands", + ] + _CRITERIA = 'standard' + + @classmethod + def define(cls, spec): + super().define(spec) + # Expose all for convergence workchains + spec.expose_inputs(ConvergenceCohesiveEnergyWorkChain, namespace='convergence.cohesive_energy', + exclude=['code', 'pseudo', 'clean_workdir']) + spec.expose_inputs(ConvergencePhononFrequenciesWorkChain, namespace='convergence.phonon_frequencies', + exclude=['pw_code', 'ph_code', 'pseudo', 'clean_workdir']) + spec.expose_inputs(ConvergenceEOSWorkChain, namespace='convergence.eos', + exclude=['code', 'pseudo', 'clean_workdir']) + spec.expose_inputs(ConvergenceBandsWorkChain, namespace='convergence.bands', + exclude=['code', 'pseudo', 'clean_workdir']) + spec.expose_inputs(ConvergencePressureWorkChain, namespace='convergence.pressure', + exclude=['code', 'pseudo', 'clean_workdir']) + + # Expose all for transferability workchains: band structure and EOS + spec.expose_inputs(BandStructureWorkChain, namespace='transferability.bands', + exclude=['code', 'pseudo', 'clean_workdir']) + spec.expose_inputs(EOSTransferabilityWorkChain, namespace='transferability.eos', + exclude=['code', 'pseudo', 'clean_workdir']) + + spec.input('pw_code', valid_type=orm.AbstractCode, + 
help='The `pw.x` code use for the `PwCalculation`.') + spec.input('ph_code', valid_type=orm.AbstractCode, required=False, + help='The `ph.x` code use for the `PhCalculation`.') + spec.input('pseudo', valid_type=UpfData, required=True, + help='Pseudopotential to be verified') + + spec.outline( + cls._setup_code, + cls._parse_pseudo, + cls._init_setup, + if_(cls._do_run_convergence)( + cls._run_convergence, + cls._inspect_convergence, + ), + if_(cls._do_run_measure)( + cls._run_measure, + cls._inspect_measure, + ), + ) + spec.output('pseudo_info', valid_type=orm.Dict, required=True, + help='pseudopotential info') + for wfname in cls._VALID_MEASURE_WF: + spec.output_namespace(wfname, dynamic=True, + help=f'results of {wfname} calculation.') + for wfname in cls._VALID_CONGENCENCE_WF: + spec.output_namespace(wfname, dynamic=True, + help=f'results of {wfname} calculation.') + + spec.exit_code(401, 'ERROR_CACHING_ON_BUT_FAILED', + message='The caching is triggered but failed.') + spec.exit_code(811, 'WARNING_NOT_ALL_SUB_WORKFLOW_OK', + message='The sub-workflows {processes} is not finished ok.') + + @classmethod + def get_builder( + cls, + pw_code: orm.Code, + ph_code: orm.Code, + pseudo: Path, + protocol: str, + properties_list: list, + configuration_list: list, + clean_workdir: bool = True, + ) -> ProcessBuilder: + """Generate builder for the generic convergence workflow""" + builder = super().get_builder() + builder.protocol = orm.Str(protocol) + + builder.pseudo = UpfData.get_or_create(pseudo) + + builder.clean_workdir = orm.Bool(clean_workdir) + + # convergence + for property in [p for p in properties_list if p.startwith('convergence')]: + builder.convergence[property].configuration = configuration_list[0] + builder.convergence[property].cutoff_list = get_cutoff_list(protocol) + builder.convergence[property].protocol = get_convergence_protocol(protocol) + + if 'phonon_frequencies' in property: + builder.convergence[property].pw_code = pw_code + 
builder.convergence[property].ph_code = ph_code + else: + builder.convergence[property].code = pw_code + + # measure + builder.transferability.bands.configuration = configuration_list[0] + builder.transferability.bands.protocol = get_bands_protocol(protocol) + builder.transferability.bands.code = pw_code + + + return builder + + def prepare_subworkchain_builder(self): + """Use input prepare builder for each property subworkchain + It will return a dict further called `builders` has properties name as key. + """ + + def _setup_code(self): + """ + setup resource options and parallelization for `PwCalculation` from inputs + """ + if "options" in self.inputs: + self.ctx.options = self.inputs.options.get_dict() + else: + from aiida_sssp_workflow.utils import get_default_options + + self.ctx.options = get_default_options( + with_mpi=True, + ) + + if "parallelization" in self.inputs: + self.ctx.parallelization = self.inputs.parallelization.get_dict() + else: + self.ctx.parallelization = {} + + def init_setup(self): + """prepare inputs for all verification process""" + + if "label" in self.inputs: + label = self.inputs.label.value + else: + label = self._label_from_pseudo_info(self.ctx.pseudo_info) + + self.node.base.extras.set("label", label) + + # Properties list + valid_list = self._VALID_MEASURE_WF + self._VALID_CONGENCENCE_WF + self.ctx.properties_list = [ + p for p in self.inputs.properties_list.get_list() if p in valid_list + ] + + # Measure workflow: bands measure and precision measure workflows inputs setting + measure_inputs = self.exposed_inputs(_BaseMeasureWorkChain, namespace="measure") + measure_inputs["pseudo"] = self.inputs.pseudo + measure_inputs["code"] = self.inputs.pw_code + measure_inputs["options"] = self.inputs.options + measure_inputs["parallelization"] = self.inputs.parallelization + + measure_inputs["clean_workdir"] = self.inputs.clean_workdir + + self.ctx.measure_inputs = { + "precision": measure_inputs.copy(), + "bands": measure_inputs.copy(), + 
} + + # Convergence inputs setting, the properties of convergence test are: + # 1. cohesive energy + # 2. phonon frequencies + # 3. pressue + # 4. delta + # 5. bands distance + self.ctx.convergence_inputs = dict() + + convergence_inputs = self.exposed_inputs( + _BaseConvergenceWorkChain, namespace="convergence" + ) + convergence_inputs["code"] = self.inputs.pw_code + convergence_inputs["pseudo"] = self.inputs.pseudo + convergence_inputs["options"] = self.inputs.options + convergence_inputs["parallelization"] = self.inputs.parallelization + + convergence_inputs["clean_workdir"] = self.inputs.clean_workdir + + for prop in ["delta", "pressure"]: + self.ctx.convergence_inputs[prop] = convergence_inputs.copy() + + # The cohesive energy evaluation may hit the ran out of memory issue, + # so use the pw_code_large_memory if provided. + if "convergence.cohesive_energy" in self.ctx.properties_list: + inputs_cohesive_energy = convergence_inputs.copy() + if "pw_code_large_memory" in self.inputs: + inputs_cohesive_energy["pw_code_large_memory"] = ( + self.inputs.pw_code_large_memory + ) + + self.ctx.convergence_inputs["cohesive_energy"] = inputs_cohesive_energy + + # Here, the shallow copy can be used since the type of convergence_inputs + # is AttributesDict. + # The deepcopy can't be used, since it will create new data node. + if "convergence.phonon_frequencies" in self.ctx.properties_list: + inputs_phonon_frequencies = convergence_inputs.copy() + inputs_phonon_frequencies.pop("code", None) + inputs_phonon_frequencies["pw_code"] = self.inputs.pw_code + inputs_phonon_frequencies["ph_code"] = self.inputs.ph_code + inputs_phonon_frequencies["clean_workdir"] = orm.Bool( + False + ) # For phonon frequencies convergence workflow, the clean dir is taken care by the the finalize step of the verification workflow. 
+ + self.ctx.convergence_inputs["phonon_frequencies"] = ( + inputs_phonon_frequencies + ) + + if "convergence.bands" in self.ctx.properties_list: + inputs_bands = convergence_inputs.copy() + inputs_bands["clean_workdir"] = orm.Bool( + False + ) # For bands convergence workflow, the clean dir is taken care by the the finalize step of the verification workflow. + + self.ctx.convergence_inputs["bands"] = inputs_bands + + # Caching inputs setting + # The running strategy of caching is: + # 1. run phonon_frequencies/bands convergence workflow + # 2. run cleandir for workchains (which will be the finalize step of phonon_frequencies/bands convergence workflow) + # 3. run cohesive_energy/pressure/delta convergence workflow which will use the cached data and clean on the fly + # 4. get the recommended cutoffs + # 5. run measure workflow using the recommended cutoffs + self.ctx.caching_inputs = convergence_inputs.copy() + self.ctx.caching_inputs["clean_workdir"] = orm.Bool( + False + ) # shouldn't clean until last, default of _caching but do it here explicitly + + # to collect workchains in a dict + self.ctx.workchains = dict() + + # For store the finished_ok workflow + self.ctx.finished_ok_wf = dict() + + def inspect_measure(self): + """Inspect delta measure results""" + return self._report_and_results(wname_list=self._VALID_MEASURE_WF) + + def _do_run_convergence_test(self): + """Whether to run convergence test workflows""" + + return len(self.ctx.convergence_properties_list) > 0 + + def _run_convergence_test(self): + for property in self.ctx.convergence_properties_list: + running = self.submit(builders.get(property)) + self.report( + f"Submit {property} convergence workchain pk={running.pk}" + ) + + self.to_context(_=running) + + self.ctx.convergence_workchains[f"{property}"] = running + + def _inspect_convergence_test(self): + self._report_and_results(workchains=self.ctx.convergence_workchains) + + def _do_run_transferability_verification(self): + return 
len(self.ctx.transferability_properties_list) > 0
+
+    def _set_cutoffs(self):
+        """Set cutoffs for the transferability verification, if full convergence
+        tests are run, then use the maximum cutoff for the transferability run.
+        """
+        for property in self.ctx.transferability_properties_list:
+            wavefunction_cutoff, charge_density_cutoff = compute_recommended_cutoffs(self.ctx.convergence_workchains, criteria=self._CRITERIA)
+            builder = builders.get(property)
+            builder.wavefunction_cutoff, builder.charge_density_cutoff = wavefunction_cutoff, charge_density_cutoff
+
+    def _run_transferability_verification(self):
+        """Run delta measure sub-workflow"""
+        for property in self.ctx.transferability_properties_list:
+            running = self.submit(builders.get(property))
+            self.report(f"Submit {property} measure workchain pk={running.pk}")
+
+            self.to_context(_=running)
+            self.ctx.transferability_workchains[f"{property}"] = running
+
+    def _report_and_results(self, workchains):
+        """result to respective output namespace"""
+
+        not_finished_ok_wf = {}
+        for wname, workchain in workchains.items():
+            # dump all output as it is to verification workflow output
+            self.ctx.finished_ok_wf[wname] = workchain.pk
+            self.out(f"{wname}", workchain.outputs)
+
+            # XXX:??? am I needed?
+            # for label in workchain.outputs:
+            #     # output node and namespace -> verification workflow outputs
+            #     self.out(f"{wname}.{label}", workchain.outputs[label])
+            # map??
+
+            if not workchain.is_finished_ok:
+                self.logger.warning(
+                    f"The sub-workflow {wname} pk={workchain.pk} not finished ok."
+ ) + not_finished_ok_wf[wname] = workchain.pk + + if not_finished_ok_wf: + return self.exit_codes.WARNING_NOT_ALL_SUB_WORKFLOW_OK.format( + processes=not_finished_ok_wf + ) + + def on_terminated(self): + super().on_terminated() + + if not self.inputs.clean_workdir.value: + self.report(f"{type(self)}: remote folders will not be cleaned") diff --git a/src/aiida_sssp_workflow/workflows/verifications.py b/src/aiida_sssp_workflow/workflows/verifications.py deleted file mode 100644 index 229c4b90..00000000 --- a/src/aiida_sssp_workflow/workflows/verifications.py +++ /dev/null @@ -1,413 +0,0 @@ -# -*- coding: utf-8 -*- -""" -All in one verification workchain -""" - -# pylint: disable=cyclic-import -from aiida import orm -from aiida.engine import ToContext, if_ -from aiida.engine.processes.exit_code import ExitCode -from aiida.engine.processes.functions import calcfunction -from aiida.plugins import DataFactory, WorkflowFactory - -from aiida_sssp_workflow.workflows import SelfCleanWorkChain -from aiida_sssp_workflow.workflows.convergence import _BaseConvergenceWorkChain -from aiida_sssp_workflow.workflows.convergence.caching import ( - _CachingConvergenceWorkChain, -) -from aiida_sssp_workflow.workflows.measure import _BaseMeasureWorkChain - -UpfData = DataFactory("pseudo.upf") - - -@calcfunction -def parse_pseudo_info(pseudo): - """parse the pseudo info as a Dict""" - from pseudo_parser.upf_parser import parse - - try: - info = parse(pseudo.get_content()) - except ValueError: - return ExitCode(100, "cannot parse the info of pseudopotential.") - - return orm.Dict(dict=info) - - -_REMOTE_FOLDER_DEPENDENT_CONVERENCE_PROPERTIES_LIST = [ - "convergence.bands", - "convergence.phonon_frequencies", -] - -_REMOTE_FOLDER_INDEPENDENT_CONVERGENCE_PROPERTIES_LIST = [ - "convergence.cohesive_energy", - "convergence.pressure", - "convergence.delta", -] - -DEFAULT_CONVERGENCE_PROPERTIES_LIST = ( - _REMOTE_FOLDER_DEPENDENT_CONVERENCE_PROPERTIES_LIST - + 
_REMOTE_FOLDER_INDEPENDENT_CONVERGENCE_PROPERTIES_LIST -) - -DEFAULT_MEASURE_PROPERTIES_LIST = [ - "measure.precision", - "measure.bands", -] - -DEFAULT_PROPERTIES_LIST = ( - DEFAULT_MEASURE_PROPERTIES_LIST + DEFAULT_CONVERGENCE_PROPERTIES_LIST -) - - -class VerificationWorkChain(SelfCleanWorkChain): - """The verification workflow to run all test for the given pseudopotential""" - - # This two class attributes will control whether a WF flow is - # run and results write to outputs ports. - _VALID_CONGENCENCE_WF = [ - "convergence.cohesive_energy", - "convergence.phonon_frequencies", - "convergence.pressure", - "convergence.delta", - "convergence.bands", - ] - _VALID_MEASURE_WF = [ - "measure.precision", - "measure.bands", - ] - - @classmethod - def define(cls, spec): - super().define(spec) - # yapf: disable - spec.expose_inputs(_BaseMeasureWorkChain, namespace='measure', - exclude=['code', 'pseudo', 'options', 'parallelization', 'clean_workdir']) - spec.expose_inputs(_BaseConvergenceWorkChain, namespace='convergence', - exclude=['code', 'pseudo', 'options', 'parallelization', 'clean_workdir']) - spec.input('pw_code', valid_type=orm.AbstractCode, - help='The `pw.x` code use for the `PwCalculation`.') - spec.input('ph_code', valid_type=orm.AbstractCode, required=False, - help='The `ph.x` code use for the `PhCalculation`.') - spec.input('pw_code_large_memory', valid_type=orm.AbstractCode, required=False, - help='The `pw.x` code use for the `PwCalculation` require large memory.') - spec.input('pseudo', valid_type=UpfData, required=True, - help='Pseudopotential to be verified') - spec.input('wavefunction_cutoff', valid_type=orm.Float, required=False, default=lambda: orm.Float(100.0), - help='The wavefunction cutoff for the Measure properties.') - spec.input('charge_density_cutoff', valid_type=orm.Float, required=False, default=lambda: orm.Float(800.0), - help='The charge density cutoff for the Measure properties.') - spec.input('label', valid_type=orm.Str, 
required=False, - help='label store for display as extra attributes.') - spec.input('properties_list', valid_type=orm.List, - default=lambda: orm.List(list=DEFAULT_PROPERTIES_LIST), - help='The preperties will be calculated, passed as a list.') - spec.input('options', valid_type=orm.Dict, required=False, - help='Optional `options`') - spec.input('parallelization', valid_type=orm.Dict, required=False, - help='Parallelization options') - - spec.outline( - cls.setup_code_resource_options, - cls.parse_pseudo, - cls.init_setup, - if_(cls.is_verify_convergence)( - if_(cls.is_caching)( - cls.run_caching, - cls.inspect_caching, - ), - cls.run_remote_folder_dependent_convergence, - cls.run_remote_folder_independent_convergence, - cls.inspect_convergence, - ), - if_(cls.is_verify_measure)( - cls.run_measure, - cls.inspect_measure, - ), - ) - spec.output('pseudo_info', valid_type=orm.Dict, required=True, - help='pseudopotential info') - for wfname in cls._VALID_MEASURE_WF: - spec.output_namespace(wfname, dynamic=True, - help=f'results of {wfname} calculation.') - for wfname in cls._VALID_CONGENCENCE_WF: - spec.output_namespace(wfname, dynamic=True, - help=f'results of {wfname} calculation.') - - spec.exit_code(401, 'ERROR_CACHING_ON_BUT_FAILED', - message='The caching is triggered but failed.') - spec.exit_code(811, 'WARNING_NOT_ALL_SUB_WORKFLOW_OK', - message='The sub-workflows {processes} is not finished ok.') - # yapf: enable - - def setup_code_resource_options(self): - """ - setup resource options and parallelization for `PwCalculation` from inputs - """ - if "options" in self.inputs: - self.ctx.options = self.inputs.options.get_dict() - else: - from aiida_sssp_workflow.utils import get_default_options - - self.ctx.options = get_default_options( - with_mpi=True, - ) - - if "parallelization" in self.inputs: - self.ctx.parallelization = self.inputs.parallelization.get_dict() - else: - self.ctx.parallelization = {} - - @staticmethod - def _label_from_pseudo_info(pseudo_info) 
-> str: - """derive a label string from pseudo_info dict""" - element = pseudo_info["element"] - pp_type = pseudo_info["pp_type"] - z_valence = pseudo_info["z_valence"] - - return f"{element}.{pp_type}.z_{z_valence}" - - def parse_pseudo(self): - """parse pseudo""" - pseudo_info = parse_pseudo_info(self.inputs.pseudo) - self.ctx.pseudo_info = pseudo_info.get_dict() - self.node.base.extras.set_many( - self.ctx.pseudo_info - ) # set the extra attributes for the node - - self.out("pseudo_info", pseudo_info) - - def init_setup(self): - """prepare inputs for all verification process""" - - if "label" in self.inputs: - label = self.inputs.label.value - else: - label = self._label_from_pseudo_info(self.ctx.pseudo_info) - - self.node.base.extras.set("label", label) - - # Properties list - valid_list = self._VALID_MEASURE_WF + self._VALID_CONGENCENCE_WF - self.ctx.properties_list = [ - p for p in self.inputs.properties_list.get_list() if p in valid_list - ] - - # Measure workflow: bands measure and precision measure workflows inputs setting - measure_inputs = self.exposed_inputs(_BaseMeasureWorkChain, namespace="measure") - measure_inputs["pseudo"] = self.inputs.pseudo - measure_inputs["code"] = self.inputs.pw_code - measure_inputs["options"] = self.inputs.options - measure_inputs["parallelization"] = self.inputs.parallelization - - measure_inputs["clean_workdir"] = self.inputs.clean_workdir - - self.ctx.measure_inputs = { - "precision": measure_inputs.copy(), - "bands": measure_inputs.copy(), - } - - # Convergence inputs setting, the properties of convergence test are: - # 1. cohesive energy - # 2. phonon frequencies - # 3. pressue - # 4. delta - # 5. 
bands distance - self.ctx.convergence_inputs = dict() - - convergence_inputs = self.exposed_inputs( - _BaseConvergenceWorkChain, namespace="convergence" - ) - convergence_inputs["code"] = self.inputs.pw_code - convergence_inputs["pseudo"] = self.inputs.pseudo - convergence_inputs["options"] = self.inputs.options - convergence_inputs["parallelization"] = self.inputs.parallelization - - convergence_inputs["clean_workdir"] = self.inputs.clean_workdir - - for prop in ["delta", "pressure"]: - self.ctx.convergence_inputs[prop] = convergence_inputs.copy() - - # The cohesive energy evaluation may hit the ran out of memory issue, - # so use the pw_code_large_memory if provided. - if "convergence.cohesive_energy" in self.ctx.properties_list: - inputs_cohesive_energy = convergence_inputs.copy() - if "pw_code_large_memory" in self.inputs: - inputs_cohesive_energy["pw_code_large_memory"] = ( - self.inputs.pw_code_large_memory - ) - - self.ctx.convergence_inputs["cohesive_energy"] = inputs_cohesive_energy - - # Here, the shallow copy can be used since the type of convergence_inputs - # is AttributesDict. - # The deepcopy can't be used, since it will create new data node. - if "convergence.phonon_frequencies" in self.ctx.properties_list: - inputs_phonon_frequencies = convergence_inputs.copy() - inputs_phonon_frequencies.pop("code", None) - inputs_phonon_frequencies["pw_code"] = self.inputs.pw_code - inputs_phonon_frequencies["ph_code"] = self.inputs.ph_code - inputs_phonon_frequencies["clean_workdir"] = orm.Bool( - False - ) # For phonon frequencies convergence workflow, the clean dir is taken care by the the finalize step of the verification workflow. 
- - self.ctx.convergence_inputs["phonon_frequencies"] = ( - inputs_phonon_frequencies - ) - - if "convergence.bands" in self.ctx.properties_list: - inputs_bands = convergence_inputs.copy() - inputs_bands["clean_workdir"] = orm.Bool( - False - ) # For bands convergence workflow, the clean dir is taken care by the the finalize step of the verification workflow. - - self.ctx.convergence_inputs["bands"] = inputs_bands - - # Caching inputs setting - # The running strategy of caching is: - # 1. run phonon_frequencies/bands convergence workflow - # 2. run cleandir for workchains (which will be the finalize step of phonon_frequencies/bands convergence workflow) - # 3. run cohesive_energy/pressure/delta convergence workflow which will use the cached data and clean on the fly - # 4. get the recommended cutoffs - # 5. run measure workflow using the recommended cutoffs - self.ctx.caching_inputs = convergence_inputs.copy() - self.ctx.caching_inputs["clean_workdir"] = orm.Bool( - False - ) # shouldn't clean until last, default of _caching but do it here explicitly - - # to collect workchains in a dict - self.ctx.workchains = dict() - - # For store the finished_ok workflow - self.ctx.finished_ok_wf = dict() - - def inspect_measure(self): - """Inspect delta measure results""" - return self._report_and_results(wname_list=self._VALID_MEASURE_WF) - - def is_verify_convergence(self): - """Whether to run convergence test workflows""" - if "_caching" in self.ctx.properties_list: - # for only run caching workflow - return True - - for p in self.ctx.properties_list: - if "convergence" in p: - return True - - return False - - def is_caching(self): - """run caching when more than one convergence test""" - # If the aiida config set pw caching off, then not caching any - from aiida.manage.caching import get_use_cache - - identifier = "aiida.calculations:quantumespresso.pw" - return get_use_cache(identifier=identifier) - - def run_caching(self): - """run pressure verification for caching""" - 
running = self.submit(_CachingConvergenceWorkChain, **self.ctx.caching_inputs) - self.report( - f"The caching is triggered, submit and run caching " - f"workchain pk={running.pk} for following convergence test." - "" - ) - - return ToContext(verify_caching=running) - - def inspect_caching(self): - """Simply check whether caching run finished okay.""" - workchain = self.ctx.verify_caching - - if not workchain.is_finished_ok: - return self.exit_codes.ERROR_CACHING_ON_BUT_FAILED - - def _run_convergence(self, plist): - for property in self.ctx.properties_list: - property_name = property.split(".")[1] - if property in plist: - ConvergenceWorkflow = WorkflowFactory(f"sssp_workflow.{property}") - - running = self.submit( - ConvergenceWorkflow, **self.ctx["convergence_inputs"][property_name] - ) - self.report( - f"Submit {property_name} convergence workchain pk={running.pk}" - ) - - self.to_context(_=running) - self.ctx.workchains[f"{property}"] = running - - def run_remote_folder_dependent_convergence(self): - """ - running convergence workflow that requires remote_folder not cleaned, e.g. phonon_frequencies, bands - """ - self._run_convergence(_REMOTE_FOLDER_DEPENDENT_CONVERENCE_PROPERTIES_LIST) - - def run_remote_folder_independent_convergence(self): - """ - running convergence workflow that requires remote_folder not cleaned, e.g. phonon_frequencies, bands - """ - self._run_convergence(_REMOTE_FOLDER_INDEPENDENT_CONVERGENCE_PROPERTIES_LIST) - - def inspect_convergence(self): - """ - inspect the convergence result - - the list set the avaliable convergence workchain that will be inspected - """ - return self._report_and_results(wname_list=self._VALID_CONGENCENCE_WF) - - def is_verify_measure(self): - """ - Whether to run measure (delta measure, bands distance} workflow. 
- """ - for p in self.ctx.properties_list: - if "measure" in p: - return True - - return False - - def run_measure(self): - """Run delta measure sub-workflow""" - for property in DEFAULT_MEASURE_PROPERTIES_LIST: - property_name = property.split(".")[1] - if property in self.ctx.properties_list: - MeasureWorkflow = WorkflowFactory(f"sssp_workflow.{property}") - - running = self.submit( - MeasureWorkflow, **self.ctx["measure_inputs"][property_name] - ) - self.report(f"Submit {property_name} measure workchain pk={running.pk}") - - self.to_context(_=running) - self.ctx.workchains[f"{property}"] = running - - def _report_and_results(self, wname_list): - """result to respective output namespace""" - - not_finished_ok_wf = {} - for wname, workchain in self.ctx.workchains.items(): - if wname in wname_list: - # dump all output as it is to verification workflow output - self.ctx.finished_ok_wf[wname] = workchain.pk - for label in workchain.outputs: - # output node and namespace -> verification workflow outputs - self.out(f"{wname}.{label}", workchain.outputs[label]) - - if not workchain.is_finished_ok: - self.logger.warning( - f"The sub-workflow {wname} pk={workchain.pk} not finished ok." - ) - not_finished_ok_wf[wname] = workchain.pk - - if not_finished_ok_wf: - return self.exit_codes.WARNING_NOT_ALL_SUB_WORKFLOW_OK.format( - processes=not_finished_ok_wf - ) - - def on_terminated(self): - super().on_terminated() - - if not self.inputs.clean_workdir.value: - self.report(f"{type(self)}: remote folders will not be cleaned")