diff --git a/buildstockbatch/aws/aws.py b/buildstockbatch/aws/aws.py index 0b417c8d..f55fd6b0 100644 --- a/buildstockbatch/aws/aws.py +++ b/buildstockbatch/aws/aws.py @@ -34,9 +34,9 @@ import io import yaml -from buildstockbatch.base import ValidationError from buildstockbatch.aws.awsbase import AwsJobBase, boto_client_config -from buildstockbatch.cloud.docker_base import DockerBatchBase +from buildstockbatch.base import ValidationError +from buildstockbatch.cloud import docker_base from buildstockbatch.utils import ( log_error_details, get_project_configuration, @@ -947,7 +947,7 @@ def clean(self): raise error -class AwsBatch(DockerBatchBase): +class AwsBatch(docker_base.DockerBatchBase): def __init__(self, project_filename): super().__init__(project_filename) @@ -1003,10 +1003,6 @@ def validate_project(project_file): def docker_image(self): return "nrel/buildstockbatch" - @property - def weather_dir(self): - return self._weather_dir - @property def results_dir(self): return f"{self.s3_bucket}/{self.s3_bucket_prefix}/results" @@ -1275,36 +1271,7 @@ def run_job(cls, job_id, bucket, prefix, job_name, region): weather_dir = sim_dir / "weather" os.makedirs(weather_dir, exist_ok=True) - # Make a lookup of which parameter points to the weather file from options_lookup.tsv - with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f: - tsv_reader = csv.reader(f, delimiter="\t") - next(tsv_reader) # skip headers - param_name = None - epws_by_option = {} - for row in tsv_reader: - row_has_epw = [x.endswith(".epw") for x in row[2:]] - if sum(row_has_epw): - if row[0] != param_name and param_name is not None: - raise RuntimeError( - f"The epw files are specified in options_lookup.tsv under more than one parameter type: {param_name}, {row[0]}" - ) # noqa: E501 - epw_filename = row[row_has_epw.index(True) + 2].split("=")[1].split("/")[-1] - param_name = row[0] - option_name = row[1] - epws_by_option[option_name] = epw_filename - - # Look through the buildstock.csv to find the appropriate location and epw - epws_to_download = set() - building_ids = [x[0] for x in jobs_d["batch"]] - with open( - sim_dir / "lib" / "housing_characteristics" / "buildstock.csv", - "r", - encoding="utf-8", - ) as f: - csv_reader = csv.DictReader(f) - for row in csv_reader: - if int(row["Building"]) in building_ids: - epws_to_download.add(epws_by_option[row[param_name]]) + epws_to_download = docker_base.determine_weather_files_needed_for_job(sim_dir, jobs_d) # Download the epws needed for these simulations for epw_filename in epws_to_download: diff --git a/buildstockbatch/cloud/docker_base.py b/buildstockbatch/cloud/docker_base.py index 50e17aae..0ca348ec 100644 --- a/buildstockbatch/cloud/docker_base.py +++ b/buildstockbatch/cloud/docker_base.py @@ -30,12 +30,65 @@ import time from buildstockbatch import postprocessing -from buildstockbatch.base import BuildStockBatchBase +from buildstockbatch.base import BuildStockBatchBase, ValidationError from buildstockbatch.utils import ContainerRuntime, calc_hash_for_file, compress_file, read_csv, get_bool_env_var logger = logging.getLogger(__name__) +def determine_weather_files_needed_for_job(sim_dir, jobs_d): + """ + Gets the list of filenames for the weather data required for a job of simulations. + + :param sim_dir: Path to the directory where job files are stored + :param jobs_d: Contents of a single job JSON file; contains the list of buildings to simulate in this job. + + :returns: Set of weather filenames needed for this job of simulations. + """ + # Fetch the mapping for building to weather file from options_lookup.tsv + epws_by_option, param_name = _epws_by_option(sim_dir / "lib" / "resources" / "options_lookup.tsv") + + # ComStock requires these empty files to exist. + files_to_download = set(["empty.epw", "empty.stat", "empty.ddy"]) + + # Look through the buildstock.csv to find the appropriate location and epw + building_ids = [x[0] for x in jobs_d["batch"]] + with open( + sim_dir / "lib" / "housing_characteristics" / "buildstock.csv", + "r", + encoding="utf-8", + ) as f: + csv_reader = csv.DictReader(f) + for row in csv_reader: + if int(row["Building"]) in building_ids: + epw_file = epws_by_option[row[param_name]] + root, _ = os.path.splitext(epw_file) + files_to_download.update((f"{root}.epw", f"{root}.stat", f"{root}.ddy")) + + return files_to_download + + +def _epws_by_option(options_lookup_path): + epws_by_option = {} + with open(options_lookup_path, "r", encoding="utf-8") as f: + tsv_reader = csv.reader(f, delimiter="\t") + next(tsv_reader) # skip headers + param_name = None + for row in tsv_reader: + row_has_epw = [x.endswith(".epw") for x in row[2:]] + if sum(row_has_epw): + if row[0] != param_name and param_name is not None: + raise RuntimeError( + "The epw files are specified in options_lookup.tsv under more than one parameter " + f"type: {param_name}, {row[0]}" + ) # noqa: E501 + epw_filename = row[row_has_epw.index(True) + 2].split("=")[1] + param_name = row[0] + option_name = row[1] + epws_by_option[option_name] = epw_filename + return (epws_by_option, param_name) + + class DockerBatchBase(BuildStockBatchBase): """Base class for implementations that run in Docker containers.""" @@ -95,7 +148,7 @@ def copy_files_at_cloud(self, files_to_copy): :param files_to_copy: a dict where the key is a file on the cloud to copy, and the value is the filename to copy the source file to. Both are relative to the ``tmppath`` used in - ``prep_batches()`` (so the implementation should prepend the bucket name and prefix + ``_run_batch_prep()`` (so the implementation should prepend the bucket name and prefix where they were uploaded to by ``upload_batch_files_to_cloud``). """ raise NotImplementedError @@ -120,14 +173,14 @@ def run_batch(self): """ with tempfile.TemporaryDirectory(prefix="bsb_") as tmpdir: tmppath = pathlib.Path(tmpdir) - epws_to_copy, batch_info = self._run_batch_prep(tmppath) + weather_files_to_copy, batch_info = self._run_batch_prep(tmppath) # Copy all the files to cloud storage logger.info("Uploading files for batch...") self.upload_batch_files_to_cloud(tmppath) logger.info("Copying duplicate weather files...") - self.copy_files_at_cloud(epws_to_copy) + self.copy_files_at_cloud(weather_files_to_copy) self.start_batch_job(batch_info) @@ -136,9 +189,9 @@ def _run_batch_prep(self, tmppath): files to the cloud that the batch jobs will use. This includes: - - Weather files (:func:`_prep_weather_files_for_batch`) - Sampling, and splitting the samples into (at most) ``self.batch_array_size`` batches, and bundling other assets needed for running simulations (:func:`_prep_jobs_for_batch`) + - Weather files (:func:`_prep_weather_files_for_batch`) Those functions place their files to be uploaded into ``tmppath``, and then this will upload them to the cloud using (:func:`upload_batch_files_to_cloud`). @@ -156,33 +209,36 @@ def _run_batch_prep(self, tmppath): :returns: DockerBatchBase.BatchInfo """ - # Weather files - logger.info("Prepping weather files...") - epws_to_copy = self._prep_weather_files_for_batch(tmppath) - # Project configuration - logger.info("Writing project configuration for upload") + logger.info("Writing project configuration for upload...") with open(tmppath / "config.json", "wt", encoding="utf-8") as f: json.dump(self.cfg, f) - # Collect simulations to queue - batch_info = self._prep_jobs_for_batch(tmppath) + # Collect simulations to queue (along with the EPWs those sims need) + logger.info("Preparing simulation batch jobs...") + batch_info, files_needed = self._prep_jobs_for_batch(tmppath) - return (epws_to_copy, batch_info) + # Weather files + logger.info("Prepping weather files...") + epws_to_copy = self._prep_weather_files_for_batch(tmppath, files_needed) - def _prep_weather_files_for_batch(self, tmppath): - """Downloads, if necessary, and extracts weather files to ``self._weather_dir``. + return (epws_to_copy, batch_info) - Because there may be duplicate weather files, this also identifies duplicates to avoid - redundant compression work and bytes uploaded to the cloud. + def _prep_weather_files_for_batch(self, tmppath, weather_files_needed_set): + """Prepare the weather files needed by the batch. - It will put unique files in the ``tmppath`` (in the 'weather' subdir) which will get - uploaded to the cloud along with other batch files. It will also return a list of - duplicates. This will allow the duplicates to be quickly recreated on the cloud via copying - from-cloud-to-cloud. + 1. Downloads, if necessary, and extracts weather files to ``self._weather_dir``. + 2. Ensures that all EPWs needed by the batch are present. + 3. Identifies weather files thare are duplicates to avoid redundant compression work and + bytes uploaded to the cloud. + * Puts unique files in the ``tmppath`` (in the 'weather' subdir) which will get uploaded + to the cloud along with other batch files. + * Returns a list duplicates, which allows them to be quickly recreated on the cloud via + copying from-cloud-to-cloud. :param tmppath: Unique weather files (compressed) will be copied into a 'weather' subdir of this path. + :param weather_files_needed_set: A set of weather filenames needed by the batch. :returns: an array of tuples where the first value is the filename of a file that will be uploaded to cloud storage (because it's in the ``tmppath``), and the second value is the @@ -195,9 +251,23 @@ def _prep_weather_files_for_batch(self, tmppath): # Downloads, if necessary, and extracts weather files to ``self._weather_dir`` self._get_weather_files() + # Ensure all needed weather files are present + logger.info("Ensuring all needed weather files are present...") + weather_files = os.listdir(self.weather_dir) + missing_epws = set() + for needed_epw in weather_files_needed_set: + if needed_epw not in weather_files: + missing_epws.add(needed_epw) + if missing_epws: + raise ValidationError( + "Not all weather files referenced by the sampled buildstock are available. " + f"{len(missing_epws):,} missing EPWs: {missing_epws}." + ) + logger.debug("...all needed weather files are present.") + # Determine the unique weather files - epw_filenames = list(filter(lambda x: x.endswith(".epw"), os.listdir(self.weather_dir))) logger.info("Calculating hashes for weather files") + epw_filenames = list(weather_files_needed_set) epw_hashes = Parallel(n_jobs=-1, verbose=9)( delayed(calc_hash_for_file)(pathlib.Path(self.weather_dir) / epw_filename) for epw_filename in epw_filenames @@ -226,28 +296,30 @@ def _prep_weather_files_for_batch(self, tmppath): ) # Calculate and print savings of duplicate files - total_count = 0 + upload_bytes = 0 dupe_count = 0 dupe_bytes = 0 for epws in unique_epws.values(): count = len(epws) - total_count += count + bytes = os.path.getsize(str(tmp_weather_out_path / epws[0]) + ".gz") + upload_bytes += bytes if count > 1: dupe_count += count - 1 - bytes = os.path.getsize(str(tmp_weather_out_path / epws[0]) + ".gz") * dupe_count - dupe_bytes = bytes * (count - 1) + dupe_bytes += bytes * (count - 1) logger.info( - f"Identified {dupe_count:,} duplicate weather files " - f"({len(unique_epws):,} unique, {total_count:,} total); " - f"saved from uploading {(dupe_bytes / 1024 / 1024):,.1f} MiB" + f"Weather files: {len(weather_files_needed_set):,}/{len(weather_files):,} referenced; " + f"{len(unique_epws):,} unique ({(upload_bytes / 1024 / 1024):,.1f} MiB to upload), " + f"{dupe_count:,} duplicates ({(dupe_bytes / 1024 / 1024):,.1f} MiB saved from uploading)" ) return epws_to_copy def _prep_jobs_for_batch(self, tmppath): """Splits simulations into batches, and prepares asset files needed to run them.""" # Run sampling - generates buildstock.csv + logger.debug("Running sampling....") buildstock_csv_filename = self.sampler.run_sampling() + logger.debug("Validating sampled buildstock...") df = read_csv(buildstock_csv_filename, index_col=0, dtype=str) self.validate_buildstock_csv(self.project_filename, df) building_ids = df.index.tolist() @@ -268,6 +340,10 @@ def _prep_jobs_for_batch(self, tmppath): os.makedirs(tmppath / "jobs") + # Ensure all weather files are available + logger.debug("Determining which weather files are needed...") + files_needed = self._determine_weather_files_needed_for_batch(df) + # Write each batch of simulations to a file. logger.info("Queueing jobs") for i in itertools.count(0): @@ -317,51 +393,40 @@ def _prep_jobs_for_batch(self, tmppath): "lib/housing_characteristics", ) - return DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count) + return ( + DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count), + files_needed, + ) - @classmethod - def get_epws_to_download(cls, sim_dir, jobs_d): + def _determine_weather_files_needed_for_batch(self, buildstock_df): + """ + Gets the list of weather filenames required for a batch of simulations. + :param buildstock_df: DataFrame of the buildstock batch being simulated. + :returns: Set of weather filenames needed for this batch of simulations. """ - Gets the list of filenames for the weather data required for a single batch of simulations. + # Fetch the mapping for building to weather file from options_lookup.tsv + epws_by_option, param_name = _epws_by_option( + pathlib.Path(self.buildstock_dir) / "resources" / "options_lookup.tsv" + ) - :param sim_dir: Path to the directory where job files are stored - :param jobs_d: Contents of a single job JSON file; contains the list of buildings to simulate in this job. + # Iterate over all values in the `param_name` column and collect the referenced EPWs + files_needed = set(["empty.epw", "empty.stat", "empty.ddy"]) + for lookup_value in buildstock_df[param_name]: + if not lookup_value: + raise ValidationError( + f"Encountered a row in buildstock.csv with an empty value in column: {param_name}" + ) - :returns: Set of epw filenames needed for this batch of simulations. - """ - # Make a lookup of which parameter points to the weather file from options_lookup.tsv - with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f: - tsv_reader = csv.reader(f, delimiter="\t") - next(tsv_reader) # skip headers - param_name = None - epws_by_option = {} - for row in tsv_reader: - row_has_epw = [x.endswith(".epw") for x in row[2:]] - if sum(row_has_epw): - if row[0] != param_name and param_name is not None: - raise RuntimeError( - "The epw files are specified in options_lookup.tsv under more than one parameter type: " - f"{param_name}, {row[0]}" - ) - epw_filename = row[row_has_epw.index(True) + 2].split("=")[1] - param_name = row[0] - option_name = row[1] - epws_by_option[option_name] = epw_filename - - # Look through the buildstock.csv to find the appropriate location and epw - epws_to_download = set() - building_ids = [x[0] for x in jobs_d["batch"]] - with open( - sim_dir / "lib" / "housing_characteristics" / "buildstock.csv", - "r", - encoding="utf-8", - ) as f: - csv_reader = csv.DictReader(f) - for row in csv_reader: - if int(row["Building"]) in building_ids: - epws_to_download.add(epws_by_option[row[param_name]]) - - return epws_to_download + epw_path = epws_by_option[lookup_value] + if not epw_path: + raise ValidationError(f"Did not find an EPW for '{lookup_value}'") + + # Add just the filenames (without relative path) + root, _ = os.path.splitext(os.path.basename(epw_path)) + files_needed.update((f"{root}.epw", f"{root}.stat", f"{root}.ddy")) + + logger.debug(f"Unique weather files needed for this buildstock: {len(files_needed):,}") + return files_needed @classmethod def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, output_path): @@ -457,7 +522,7 @@ def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, output_path): elif os.path.isfile(item): os.remove(item) - # Upload simulation outputs tarfile to s3 + # Upload simulation outputs tarfile to bucket fs.put( str(simulation_output_tar_filename), f"{output_path}/results/simulation_output/simulations_job{job_id}.tar.gz", diff --git a/buildstockbatch/test/test_docker_base.py b/buildstockbatch/test/test_docker_base.py index 1f1e089b..bc4c9967 100644 --- a/buildstockbatch/test/test_docker_base.py +++ b/buildstockbatch/test/test_docker_base.py @@ -11,6 +11,7 @@ import tempfile from unittest.mock import MagicMock, PropertyMock +from buildstockbatch.cloud import docker_base from buildstockbatch.cloud.docker_base import DockerBatchBase from buildstockbatch.test.shared_testing_stuff import docker_available from buildstockbatch.utils import get_project_configuration @@ -37,21 +38,22 @@ def test_run_batch_prep(basic_residential_project_file, mocker): with tempfile.TemporaryDirectory(prefix="bsb_") as tmpdir: tmppath = pathlib.Path(tmpdir) - epws_to_copy, batch_info = dbb._run_batch_prep(tmppath) + files_to_copy, batch_info = dbb._run_batch_prep(tmppath) sampler_mock.run_sampling.assert_called_once() - # There are three weather files... + # There are three sets of weather files... # * "G2500210.epw" is unique; check for it (gz'd) in tmppath # * "G2601210.epw" and "G2601390.epw" are dupes. One should be in - # tmppath; one should be copied to the other according to ``epws_to_copy`` + # tmppath; one should be copied to the other according to ``files_to_copy`` + # Same for the .ddy and .stat files. assert os.path.isfile(tmppath / "weather" / "G2500210.epw.gz") assert os.path.isfile(tmppath / "weather" / "G2601210.epw.gz") or os.path.isfile( tmppath / "weather" / "G2601390.epw.gz" ) - src, dest = epws_to_copy[0] - assert src in ("G2601210.epw.gz", "G2601390.epw.gz") - assert dest in ("G2601210.epw.gz", "G2601390.epw.gz") - assert src != dest + assert ("G2601210.epw.gz", "G2601390.epw.gz") in files_to_copy or ( + "G2601390.epw.gz", + "G2601210.epw.gz", + ) in files_to_copy # Three job files should be created, with 10 total simulations, split # into batches of 4, 4, and 2 simulations. @@ -79,7 +81,7 @@ def test_run_batch_prep(basic_residential_project_file, mocker): assert [building, 0] in simulations -def test_get_epws_to_download(): +def test_get_weather_files_to_download(): resources_dir_path = pathlib.Path(resources_dir) options_file = resources_dir_path / "options_lookup.tsv" buildstock_file = resources_dir_path / "buildstock_good.csv" @@ -100,8 +102,18 @@ def test_get_epws_to_download(): ], } - epws = DockerBatchBase.get_epws_to_download(sim_dir, jobs_d) - assert epws == set(["weather/G0100970.epw", "weather/G0100830.epw"]) + files = docker_base.determine_weather_files_needed_for_job(sim_dir, jobs_d) + assert files == { + "empty.epw", + "empty.stat", + "empty.ddy", + "weather/G2500210.epw", + "weather/G2601390.epw", + "weather/G2500210.ddy", + "weather/G2601390.ddy", + "weather/G2500210.stat", + "weather/G2601390.stat", + } def test_run_simulations(basic_residential_project_file): diff --git a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/buildstock_good.csv b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/buildstock_good.csv index d69de02c..0a98f57f 100644 --- a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/buildstock_good.csv +++ b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/buildstock_good.csv @@ -1,6 +1,6 @@ Building,Bedroom,Location,Vintage,State,Insulation Wall,Insulation Slab,Zipcode,County 1,1,AL_Mobile-Rgnl.AP.722230,<1950,CO,Good Option,None,36608,County1 2,3,AL_Mobile-Rgnl.AP.722230,1940s,CO,Good Option,None,36601,County1 -3,2,AL_Mobile-Rgnl.AP.722230,2010s,VA,Good Option,None,36602,County1 +3,2,AL_Mobile-Rgnl.AP.722230,2010s,VA,Good Option,None,36602,County2 4,1,AL_Mobile-Rgnl.AP.722230,2000s,VA,Good Option,None,36603,County2 -5,2,AL_Mobile-Rgnl.AP.722230,1970s,VA,Good Option,None,36604,County2 +5,2,AL_Mobile-Rgnl.AP.722230,1970s,VA,Good Option,None,36604,County3 diff --git a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/options_lookup.tsv b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/options_lookup.tsv index 73032a93..39905259 100644 --- a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/options_lookup.tsv +++ b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/options_lookup.tsv @@ -14,8 +14,9 @@ Vintage 2000s Vintage 2010s State VA State CO -County County1 weather_station_epw_filepath=weather/G0100970.epw -County County2 weather_station_epw_filepath=weather/G0100830.epw +County County1 weather_station_epw_filepath=weather/G2500210.epw +County County2 weather_station_epw_filepath=weather/G2601210.epw +County County3 weather_station_epw_filepath=weather/G2601390.epw Bedroom 1 Bedroom 2 Bedroom 3 diff --git a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/weather.zip b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/weather.zip index 25b5f82f..137e985a 100644 Binary files a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/weather.zip and b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/weather.zip differ diff --git a/docs/changelog/changelog_dev.rst b/docs/changelog/changelog_dev.rst index e6161959..5c92c23c 100644 --- a/docs/changelog/changelog_dev.rst +++ b/docs/changelog/changelog_dev.rst @@ -53,3 +53,10 @@ Development Changelog :pullreq: 437 Add a ``step_failures`` section to json results files with error messages from OpenStudio simulations. + + .. change:: + :tags: general + :pullreq: 436 + + Clean up handling of weather files in GCP/AWS implementations: only upload files that are required, + and fail with clearer messaging if any files are missing.