From 0bb3703da79bf7e6546348156c7d69397c837f35 Mon Sep 17 00:00:00 2001
From: Robert LaThanh <lathanh@google.com>
Date: Wed, 13 Dec 2023 14:07:38 -0800
Subject: [PATCH] Add validation ensuring all EPW weather files needed by the
 batch are present.

If any EPW is missing, the script fails before starting the batch (rather than during the batch).

Also now only compresses and uploads EPWs actually used by the batch (rather than all of them).
---
 buildstockbatch/aws/aws.py                    |   2 +-
 buildstockbatch/cloud/docker_base.py          | 160 ++++++++++++------
 buildstockbatch/gcp/gcp.py                    |   2 +-
 buildstockbatch/test/test_docker_base.py      |   4 +-
 .../resources/buildstock_good.csv             |   4 +-
 .../resources/options_lookup.tsv              |   5 +-
 6 files changed, 120 insertions(+), 57 deletions(-)

diff --git a/buildstockbatch/aws/aws.py b/buildstockbatch/aws/aws.py
index 9bdc894a..d03ad947 100644
--- a/buildstockbatch/aws/aws.py
+++ b/buildstockbatch/aws/aws.py
@@ -1695,7 +1695,7 @@ def run_job(cls, job_id, bucket, prefix, job_name, region):
         weather_dir = sim_dir / "weather"
         os.makedirs(weather_dir, exist_ok=True)
 
-        epws_to_download = cls.get_epws_to_download(sim_dir, jobs_d)
+        epws_to_download = DockerBatchBase.determine_epws_needed_for_job(sim_dir, jobs_d)
 
         # Download the epws needed for these simulations
         for epw_filename in epws_to_download:
diff --git a/buildstockbatch/cloud/docker_base.py b/buildstockbatch/cloud/docker_base.py
index 48f81292..68cb6a99 100644
--- a/buildstockbatch/cloud/docker_base.py
+++ b/buildstockbatch/cloud/docker_base.py
@@ -13,6 +13,7 @@
 import docker
 from dataclasses import dataclass
 from fsspec.implementations.local import LocalFileSystem
+import glob
 import gzip
 import itertools
 from joblib import Parallel, delayed
@@ -29,7 +30,7 @@
 import time
 
 from buildstockbatch import postprocessing
-from buildstockbatch.base import BuildStockBatchBase
+from buildstockbatch.base import BuildStockBatchBase, ValidationError
 from buildstockbatch.utils import ContainerRuntime, calc_hash_for_file, compress_file, read_csv
 
 logger = logging.getLogger(__name__)
@@ -129,9 +130,9 @@ def _run_batch_prep(self, tmppath):
         files to the cloud that the batch jobs will use.
 
         This includes:
-            - Weather files (:func:`_prep_weather_files_for_batch`)
             - Sampling, and splitting the samples into (at most) ``self.batch_array_size`` batches,
               and bundling other assets needed for running simulations (:func:`_prep_jobs_for_batch`)
+            - Weather files (:func:`_prep_weather_files_for_batch`)
 
         Those functions place their files to be uploaded into ``tmppath``, and then this will upload
         them to the cloud using (:func:`upload_batch_files_to_cloud`).
@@ -149,33 +150,36 @@ def _run_batch_prep(self, tmppath):
         :returns: DockerBatchBase.BatchInfo
         """
 
-        # Weather files
-        logger.info("Prepping weather files...")
-        epws_to_copy = self._prep_weather_files_for_batch(tmppath)
-
         # Project configuration
-        logger.info("Writing project configuration for upload")
+        logger.info("Writing project configuration for upload...")
         with open(tmppath / "config.json", "wt", encoding="utf-8") as f:
             json.dump(self.cfg, f)
 
-        # Collect simulations to queue
-        batch_info = self._prep_jobs_for_batch(tmppath)
+        # Collect simulations to queue (along with the EPWs those sims need)
+        logger.info("Preparing simulation batch jobs...")
+        batch_info, epws_needed = self._prep_jobs_for_batch(tmppath)
 
-        return (epws_to_copy, batch_info)
+        # Weather files
+        logger.info("Prepping weather files...")
+        epws_to_copy = self._prep_weather_files_for_batch(tmppath, epws_needed)
 
-    def _prep_weather_files_for_batch(self, tmppath):
-        """Downloads, if necessary, and extracts weather files to ``self._weather_dir``.
+        return (epws_to_copy, batch_info)
 
-        Because there may be duplicate weather files, this also identifies duplicates to avoid
-        redundant compression work and bytes uploaded to the cloud.
+    def _prep_weather_files_for_batch(self, tmppath, epws_needed_set):
+        """Prepare the weather files (EPWs) needed by the batch.
 
-        It will put unique files in the ``tmppath`` (in the 'weather' subdir) which will get
-        uploaded to the cloud along with other batch files. It will also return a list of
-        duplicates. This will allow the duplicates to be quickly recreated on the cloud via copying
-        from-cloud-to-cloud.
+        1. Downloads, if necessary, and extracts weather files to ``self._weather_dir``.
+        2. Ensures that all EPWs needed by the batch are present.
+        3. Identifies weather files thare are duplicates to avoid redundant compression work and
+           bytes uploaded to the cloud.
+            * Puts unique files in the ``tmppath`` (in the 'weather' subdir) which will get uploaded
+              to the cloud along with other batch files.
+            * Returns a list duplicates, which allows them to be quickly recreated on the cloud via
+              copying from-cloud-to-cloud.
 
         :param tmppath: Unique weather files (compressed) will be copied into a 'weather' subdir
             of this path.
+        :param epws_needed_set: A set of weather filenames needed by the batch.
 
         :returns: an array of tuples where the first value is the filename of a file that will be
             uploaded to cloud storage (because it's in the ``tmppath``), and the second value is the
@@ -188,9 +192,23 @@ def _prep_weather_files_for_batch(self, tmppath):
             # Downloads, if necessary, and extracts weather files to ``self._weather_dir``
             self._get_weather_files()
 
+            # Ensure all needed EPWs are present
+            logger.info("Ensuring all needed weather files are present...")
+            epw_files = set(map(lambda x: x.split("/")[-1], glob.glob(f"{self.weather_dir}/*.epw")))
+            missing_epws = set()
+            for needed_epw in epws_needed_set:
+                if needed_epw not in epw_files:
+                    missing_epws.add(needed_epw)
+            if missing_epws:
+                raise ValidationError(
+                    "Not all weather files referenced by the buildstock are available. "
+                    f"{len(missing_epws):,} missing EPWs: {missing_epws}."
+                )
+            logger.debug("...all needed weather files are present.")
+
             # Determine the unique weather files
-            epw_filenames = list(filter(lambda x: x.endswith(".epw"), os.listdir(self.weather_dir)))
             logger.info("Calculating hashes for weather files")
+            epw_filenames = list(epws_needed_set)
             epw_hashes = Parallel(n_jobs=-1, verbose=9)(
                 delayed(calc_hash_for_file)(pathlib.Path(self.weather_dir) / epw_filename)
                 for epw_filename in epw_filenames
@@ -219,28 +237,30 @@ def _prep_weather_files_for_batch(self, tmppath):
             )
 
             # Calculate and print savings of duplicate files
-            total_count = 0
+            upload_bytes = 0
             dupe_count = 0
             dupe_bytes = 0
             for epws in unique_epws.values():
                 count = len(epws)
-                total_count += count
+                bytes = os.path.getsize(str(tmp_weather_out_path / epws[0]) + ".gz")
+                upload_bytes += bytes
                 if count > 1:
                     dupe_count += count - 1
-                    bytes = os.path.getsize(str(tmp_weather_out_path / epws[0]) + ".gz") * dupe_count
-                    dupe_bytes = bytes * (count - 1)
+                    dupe_bytes += bytes * (count - 1)
             logger.info(
-                f"Identified {dupe_count:,} duplicate weather files "
-                f"({len(unique_epws):,} unique, {total_count:,} total); "
-                f"saved from uploading {(dupe_bytes / 1024 / 1024):,.1f} MiB"
+                f"Weather files: {len(epws_needed_set):,}/{len(epw_files):,} referenced; "
+                f"{len(unique_epws):,} unique ({(upload_bytes / 1024 / 1024):,.1f} MiB to upload), "
+                f"{dupe_count:,} duplicates ({(dupe_bytes / 1024 / 1024):,.1f} MiB saved from uploading)"
             )
             return epws_to_copy
 
     def _prep_jobs_for_batch(self, tmppath):
         """Splits simulations into batches, and prepares asset files needed to run them."""
         # Run sampling - generates buildstock.csv
+        logger.debug("Running sampling....")
         buildstock_csv_filename = self.sampler.run_sampling()
 
+        logger.debug("Validating sampled buildstock...")
         df = read_csv(buildstock_csv_filename, index_col=0, dtype=str)
         self.validate_buildstock_csv(self.project_filename, df)
         building_ids = df.index.tolist()
@@ -266,6 +286,10 @@ def _prep_jobs_for_batch(self, tmppath):
 
         os.makedirs(tmppath / "jobs")
 
+        # Ensure all weather files are available
+        logger.debug("Determining which weather files are needed...")
+        epws_needed = self._determine_epws_needed_for_batch(df)
+
         # Write each batch of simulations to a file.
         logger.info("Queueing jobs")
         for i in itertools.count(0):
@@ -315,36 +339,53 @@ def _prep_jobs_for_batch(self, tmppath):
                 "lib/housing_characteristics",
             )
 
-        return DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count)
+        return DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count), epws_needed
 
-    @classmethod
-    def get_epws_to_download(cls, sim_dir, jobs_d):
+    def _determine_epws_needed_for_batch(self, buildstock_df):
         """
-        Gets the list of filenames for the weather data required for a single batch of simulations.
+        Gets the list of EPW filenames required for a batch of simulations.
+
+        :param buildstock_df: DataFrame of the buildstock batch being simulated.
+
+        :returns: Set of EPW filenames needed for this batch of simulations.
+        """
+        # Fetch the mapping for building to weather file from options_lookup.tsv
+        epws_by_option, param_name = DockerBatchBase.epws_by_option(
+            pathlib.Path(self.buildstock_dir) / "resources" / "options_lookup.tsv"
+        )
+
+        # Iterate over all values in the `param_name` column and collect the referenced EPWs
+        epws_needed = set()
+        for lookup_value in buildstock_df[param_name]:
+            if not lookup_value:
+                raise ValidationError(
+                    f"Encountered a row in buildstock.csv with an empty value in column: {param_name}"
+                )
+
+            epw_path = epws_by_option[lookup_value]
+            if not epw_path:
+                raise ValidationError(f"Did not find an EPW for '{lookup_value}'")
+
+            # Add just the filename (without relative path)
+            epws_needed.add(epw_path.split("/")[-1])
+
+        logger.debug(f"Unique EPWs needed for this buildstock: {len(epws_needed):,}")
+        return epws_needed
+
+    @staticmethod
+    def determine_epws_needed_for_job(sim_dir, jobs_d):
+        """
+        Gets the list of filenames for the weather data required for a job of simulations.
 
         :param sim_dir: Path to the directory where job files are stored
         :param jobs_d: Contents of a single job JSON file; contains the list of buildings to simulate in this job.
 
-        :returns: Set of epw filenames needed for this batch of simulations.
+        :returns: Set of epw filenames needed for this job of simulations.
         """
-        # Make a lookup of which parameter points to the weather file from options_lookup.tsv
-        with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f:
-            tsv_reader = csv.reader(f, delimiter="\t")
-            next(tsv_reader)  # skip headers
-            param_name = None
-            epws_by_option = {}
-            for row in tsv_reader:
-                row_has_epw = [x.endswith(".epw") for x in row[2:]]
-                if sum(row_has_epw):
-                    if row[0] != param_name and param_name is not None:
-                        raise RuntimeError(
-                            "The epw files are specified in options_lookup.tsv under more than one parameter type: "
-                            f"{param_name}, {row[0]}"
-                        )
-                    epw_filename = row[row_has_epw.index(True) + 2].split("=")[1]
-                    param_name = row[0]
-                    option_name = row[1]
-                    epws_by_option[option_name] = epw_filename
+        # Fetch the mapping for building to weather file from options_lookup.tsv
+        epws_by_option, param_name = DockerBatchBase.epws_by_option(
+            sim_dir / "lib" / "resources" / "options_lookup.tsv"
+        )
 
         # Look through the buildstock.csv to find the appropriate location and epw
         epws_to_download = set()
@@ -361,6 +402,27 @@ def get_epws_to_download(cls, sim_dir, jobs_d):
 
         return epws_to_download
 
+    @staticmethod
+    def epws_by_option(options_lookup_path):
+        epws_by_option = {}
+        with open(options_lookup_path, "r", encoding="utf-8") as f:
+            tsv_reader = csv.reader(f, delimiter="\t")
+            next(tsv_reader)  # skip headers
+            param_name = None
+            for row in tsv_reader:
+                row_has_epw = [x.endswith(".epw") for x in row[2:]]
+                if sum(row_has_epw):
+                    if row[0] != param_name and param_name is not None:
+                        raise RuntimeError(
+                            "The epw files are specified in options_lookup.tsv under more than one parameter "
+                            f"type: {param_name}, {row[0]}"
+                        )  # noqa: E501
+                    epw_filename = row[row_has_epw.index(True) + 2].split("=")[1]
+                    param_name = row[0]
+                    option_name = row[1]
+                    epws_by_option[option_name] = epw_filename
+        return (epws_by_option, param_name)
+
     @classmethod
     def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, output_path):
         """
diff --git a/buildstockbatch/gcp/gcp.py b/buildstockbatch/gcp/gcp.py
index 8bf73996..be8a3de8 100644
--- a/buildstockbatch/gcp/gcp.py
+++ b/buildstockbatch/gcp/gcp.py
@@ -720,7 +720,7 @@ def run_task(cls, task_index, job_name, gcs_bucket, gcs_prefix):
         weather_dir = sim_dir / "weather"
         os.makedirs(weather_dir, exist_ok=True)
 
-        epws_to_download = cls.get_epws_to_download(sim_dir, jobs_d)
+        epws_to_download = DockerBatchBase.determine_epws_needed_for_job(sim_dir, jobs_d)
 
         # Download and unzip the epws needed for these simulations
         for epw_filename in epws_to_download:
diff --git a/buildstockbatch/test/test_docker_base.py b/buildstockbatch/test/test_docker_base.py
index cf4d4b25..4ee97c9a 100644
--- a/buildstockbatch/test/test_docker_base.py
+++ b/buildstockbatch/test/test_docker_base.py
@@ -97,8 +97,8 @@ def test_get_epws_to_download():
             ],
         }
 
-        epws = DockerBatchBase.get_epws_to_download(sim_dir, jobs_d)
-        assert epws == set(["weather/G0100970.epw", "weather/G0100830.epw"])
+        epws = DockerBatchBase.determine_epws_needed_for_job(sim_dir, jobs_d)
+        assert epws == {"weather/G2500210.epw", "weather/G2601390.epw"}
 
 
 def test_run_simulations(basic_residential_project_file):
diff --git a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/buildstock_good.csv b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/buildstock_good.csv
index d69de02c..0a98f57f 100644
--- a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/buildstock_good.csv
+++ b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/buildstock_good.csv
@@ -1,6 +1,6 @@
 Building,Bedroom,Location,Vintage,State,Insulation Wall,Insulation Slab,Zipcode,County
 1,1,AL_Mobile-Rgnl.AP.722230,<1950,CO,Good Option,None,36608,County1
 2,3,AL_Mobile-Rgnl.AP.722230,1940s,CO,Good Option,None,36601,County1
-3,2,AL_Mobile-Rgnl.AP.722230,2010s,VA,Good Option,None,36602,County1
+3,2,AL_Mobile-Rgnl.AP.722230,2010s,VA,Good Option,None,36602,County2
 4,1,AL_Mobile-Rgnl.AP.722230,2000s,VA,Good Option,None,36603,County2
-5,2,AL_Mobile-Rgnl.AP.722230,1970s,VA,Good Option,None,36604,County2
+5,2,AL_Mobile-Rgnl.AP.722230,1970s,VA,Good Option,None,36604,County3
diff --git a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/options_lookup.tsv b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/options_lookup.tsv
index 73032a93..39905259 100644
--- a/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/options_lookup.tsv
+++ b/buildstockbatch/test/test_inputs/test_openstudio_buildstock/resources/options_lookup.tsv
@@ -14,8 +14,9 @@ Vintage	2000s
 Vintage	2010s
 State	VA
 State	CO
-County	County1	weather_station_epw_filepath=weather/G0100970.epw
-County	County2	weather_station_epw_filepath=weather/G0100830.epw
+County	County1	weather_station_epw_filepath=weather/G2500210.epw
+County	County2	weather_station_epw_filepath=weather/G2601210.epw
+County	County3	weather_station_epw_filepath=weather/G2601390.epw
 Bedroom	1
 Bedroom	2
 Bedroom	3