Skip to content

Commit

Permalink
Add validation ensuring all EPW weather files needed by the batch are…
Browse files Browse the repository at this point in the history
… present.

If any EPW is missing, the script fails before starting the batch (rather than during the batch).

Also now only compresses and uploads EPWs actually used by the batch (rather than all of them).
  • Loading branch information
lathanh committed Dec 13, 2023
1 parent 7f3cae7 commit 0bb3703
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 57 deletions.
2 changes: 1 addition & 1 deletion buildstockbatch/aws/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -1695,7 +1695,7 @@ def run_job(cls, job_id, bucket, prefix, job_name, region):
weather_dir = sim_dir / "weather"
os.makedirs(weather_dir, exist_ok=True)

epws_to_download = cls.get_epws_to_download(sim_dir, jobs_d)
epws_to_download = DockerBatchBase.determine_epws_needed_for_job(sim_dir, jobs_d)

# Download the epws needed for these simulations
for epw_filename in epws_to_download:
Expand Down
160 changes: 111 additions & 49 deletions buildstockbatch/cloud/docker_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import docker
from dataclasses import dataclass
from fsspec.implementations.local import LocalFileSystem
import glob
import gzip
import itertools
from joblib import Parallel, delayed
Expand All @@ -29,7 +30,7 @@
import time

from buildstockbatch import postprocessing
from buildstockbatch.base import BuildStockBatchBase
from buildstockbatch.base import BuildStockBatchBase, ValidationError
from buildstockbatch.utils import ContainerRuntime, calc_hash_for_file, compress_file, read_csv

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -129,9 +130,9 @@ def _run_batch_prep(self, tmppath):
files to the cloud that the batch jobs will use.
This includes:
- Weather files (:func:`_prep_weather_files_for_batch`)
- Sampling, and splitting the samples into (at most) ``self.batch_array_size`` batches,
and bundling other assets needed for running simulations (:func:`_prep_jobs_for_batch`)
- Weather files (:func:`_prep_weather_files_for_batch`)
Those functions place their files to be uploaded into ``tmppath``, and then this will upload
them to the cloud using (:func:`upload_batch_files_to_cloud`).
Expand All @@ -149,33 +150,36 @@ def _run_batch_prep(self, tmppath):
:returns: DockerBatchBase.BatchInfo
"""

# Weather files
logger.info("Prepping weather files...")
epws_to_copy = self._prep_weather_files_for_batch(tmppath)

# Project configuration
logger.info("Writing project configuration for upload")
logger.info("Writing project configuration for upload...")
with open(tmppath / "config.json", "wt", encoding="utf-8") as f:
json.dump(self.cfg, f)

# Collect simulations to queue
batch_info = self._prep_jobs_for_batch(tmppath)
# Collect simulations to queue (along with the EPWs those sims need)
logger.info("Preparing simulation batch jobs...")
batch_info, epws_needed = self._prep_jobs_for_batch(tmppath)

return (epws_to_copy, batch_info)
# Weather files
logger.info("Prepping weather files...")
epws_to_copy = self._prep_weather_files_for_batch(tmppath, epws_needed)

def _prep_weather_files_for_batch(self, tmppath):
"""Downloads, if necessary, and extracts weather files to ``self._weather_dir``.
return (epws_to_copy, batch_info)

Because there may be duplicate weather files, this also identifies duplicates to avoid
redundant compression work and bytes uploaded to the cloud.
def _prep_weather_files_for_batch(self, tmppath, epws_needed_set):
"""Prepare the weather files (EPWs) needed by the batch.
It will put unique files in the ``tmppath`` (in the 'weather' subdir) which will get
uploaded to the cloud along with other batch files. It will also return a list of
duplicates. This will allow the duplicates to be quickly recreated on the cloud via copying
from-cloud-to-cloud.
1. Downloads, if necessary, and extracts weather files to ``self._weather_dir``.
2. Ensures that all EPWs needed by the batch are present.
3. Identifies weather files that are duplicates to avoid redundant compression work and
bytes uploaded to the cloud.
* Puts unique files in the ``tmppath`` (in the 'weather' subdir) which will get uploaded
to the cloud along with other batch files.
* Returns a list of duplicates, which allows them to be quickly recreated on the cloud via
copying from-cloud-to-cloud.
:param tmppath: Unique weather files (compressed) will be copied into a 'weather' subdir
of this path.
:param epws_needed_set: A set of weather filenames needed by the batch.
:returns: an array of tuples where the first value is the filename of a file that will be
uploaded to cloud storage (because it's in the ``tmppath``), and the second value is the
Expand All @@ -188,9 +192,23 @@ def _prep_weather_files_for_batch(self, tmppath):
# Downloads, if necessary, and extracts weather files to ``self._weather_dir``
self._get_weather_files()

# Ensure all needed EPWs are present
logger.info("Ensuring all needed weather files are present...")
epw_files = set(map(lambda x: x.split("/")[-1], glob.glob(f"{self.weather_dir}/*.epw")))
missing_epws = set()
for needed_epw in epws_needed_set:
if needed_epw not in epw_files:
missing_epws.add(needed_epw)
if missing_epws:
raise ValidationError(
"Not all weather files referenced by the buildstock are available. "
f"{len(missing_epws):,} missing EPWs: {missing_epws}."
)
logger.debug("...all needed weather files are present.")

# Determine the unique weather files
epw_filenames = list(filter(lambda x: x.endswith(".epw"), os.listdir(self.weather_dir)))
logger.info("Calculating hashes for weather files")
epw_filenames = list(epws_needed_set)
epw_hashes = Parallel(n_jobs=-1, verbose=9)(
delayed(calc_hash_for_file)(pathlib.Path(self.weather_dir) / epw_filename)
for epw_filename in epw_filenames
Expand Down Expand Up @@ -219,28 +237,30 @@ def _prep_weather_files_for_batch(self, tmppath):
)

# Calculate and print savings of duplicate files
total_count = 0
upload_bytes = 0
dupe_count = 0
dupe_bytes = 0
for epws in unique_epws.values():
count = len(epws)
total_count += count
bytes = os.path.getsize(str(tmp_weather_out_path / epws[0]) + ".gz")
upload_bytes += bytes
if count > 1:
dupe_count += count - 1
bytes = os.path.getsize(str(tmp_weather_out_path / epws[0]) + ".gz") * dupe_count
dupe_bytes = bytes * (count - 1)
dupe_bytes += bytes * (count - 1)
logger.info(
f"Identified {dupe_count:,} duplicate weather files "
f"({len(unique_epws):,} unique, {total_count:,} total); "
f"saved from uploading {(dupe_bytes / 1024 / 1024):,.1f} MiB"
f"Weather files: {len(epws_needed_set):,}/{len(epw_files):,} referenced; "
f"{len(unique_epws):,} unique ({(upload_bytes / 1024 / 1024):,.1f} MiB to upload), "
f"{dupe_count:,} duplicates ({(dupe_bytes / 1024 / 1024):,.1f} MiB saved from uploading)"
)
return epws_to_copy

def _prep_jobs_for_batch(self, tmppath):
"""Splits simulations into batches, and prepares asset files needed to run them."""
# Run sampling - generates buildstock.csv
logger.debug("Running sampling....")
buildstock_csv_filename = self.sampler.run_sampling()

logger.debug("Validating sampled buildstock...")
df = read_csv(buildstock_csv_filename, index_col=0, dtype=str)
self.validate_buildstock_csv(self.project_filename, df)
building_ids = df.index.tolist()
Expand All @@ -266,6 +286,10 @@ def _prep_jobs_for_batch(self, tmppath):

os.makedirs(tmppath / "jobs")

# Ensure all weather files are available
logger.debug("Determining which weather files are needed...")
epws_needed = self._determine_epws_needed_for_batch(df)

# Write each batch of simulations to a file.
logger.info("Queueing jobs")
for i in itertools.count(0):
Expand Down Expand Up @@ -315,36 +339,53 @@ def _prep_jobs_for_batch(self, tmppath):
"lib/housing_characteristics",
)

return DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count)
return DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count), epws_needed

@classmethod
def get_epws_to_download(cls, sim_dir, jobs_d):
def _determine_epws_needed_for_batch(self, buildstock_df):
    """
    Gets the set of EPW filenames required for a batch of simulations.

    :param buildstock_df: DataFrame of the buildstock batch being simulated.
    :returns: Set of EPW filenames (file name only, without any relative path) needed for
        this batch of simulations.
    :raises ValidationError: if options_lookup.tsv does not map any option to an EPW, if a
        row has an empty value in the weather-determining column, or if a value in that
        column has no EPW in options_lookup.tsv.
    """
    # Fetch the mapping for building to weather file from options_lookup.tsv
    epws_by_option, param_name = DockerBatchBase.epws_by_option(
        pathlib.Path(self.buildstock_dir) / "resources" / "options_lookup.tsv"
    )

    # `param_name` stays None when no row of options_lookup.tsv references an EPW; fail with a
    # clear message instead of an opaque KeyError from the DataFrame column lookup below.
    if param_name is None:
        raise ValidationError("No EPW files are specified in options_lookup.tsv")

    # Iterate over all values in the `param_name` column and collect the referenced EPWs
    epws_needed = set()
    for lookup_value in buildstock_df[param_name]:
        if not lookup_value:
            raise ValidationError(
                f"Encountered a row in buildstock.csv with an empty value in column: {param_name}"
            )

        # Use .get() rather than indexing: an option that is absent from the lookup must reach
        # the ValidationError below instead of escaping as a KeyError.
        epw_path = epws_by_option.get(lookup_value)
        if not epw_path:
            raise ValidationError(f"Did not find an EPW for '{lookup_value}'")

        # Add just the filename (without relative path)
        epws_needed.add(epw_path.split("/")[-1])

    logger.debug(f"Unique EPWs needed for this buildstock: {len(epws_needed):,}")
    return epws_needed

@staticmethod
def determine_epws_needed_for_job(sim_dir, jobs_d):
"""
Gets the list of filenames for the weather data required for a job of simulations.
:param sim_dir: Path to the directory where job files are stored
:param jobs_d: Contents of a single job JSON file; contains the list of buildings to simulate in this job.
:returns: Set of epw filenames needed for this batch of simulations.
:returns: Set of epw filenames needed for this job of simulations.
"""
# Make a lookup of which parameter points to the weather file from options_lookup.tsv
with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f:
tsv_reader = csv.reader(f, delimiter="\t")
next(tsv_reader) # skip headers
param_name = None
epws_by_option = {}
for row in tsv_reader:
row_has_epw = [x.endswith(".epw") for x in row[2:]]
if sum(row_has_epw):
if row[0] != param_name and param_name is not None:
raise RuntimeError(
"The epw files are specified in options_lookup.tsv under more than one parameter type: "
f"{param_name}, {row[0]}"
)
epw_filename = row[row_has_epw.index(True) + 2].split("=")[1]
param_name = row[0]
option_name = row[1]
epws_by_option[option_name] = epw_filename
# Fetch the mapping for building to weather file from options_lookup.tsv
epws_by_option, param_name = DockerBatchBase.epws_by_option(
sim_dir / "lib" / "resources" / "options_lookup.tsv"
)

# Look through the buildstock.csv to find the appropriate location and epw
epws_to_download = set()
Expand All @@ -361,6 +402,27 @@ def get_epws_to_download(cls, sim_dir, jobs_d):

return epws_to_download

@staticmethod
def epws_by_option(options_lookup_path):
    """
    Builds the option-to-EPW mapping from an options_lookup.tsv file.

    A row is considered a weather row when any cell from the third column onward ends with
    ".epw"; the EPW path is whatever follows the first "=" in the first such cell.

    :param options_lookup_path: Path to the tab-separated options lookup file. The first line
        is treated as a header and skipped.
    :returns: Tuple of (dict mapping option name to EPW path, name of the parameter whose
        options reference EPWs). The parameter name is None when no row references an EPW
        (including when the file is empty).
    :raises RuntimeError: if EPWs are referenced under more than one parameter name.
    """
    epw_by_option_name = {}
    param_name = None
    # newline="" lets the csv module do its own newline handling, as its docs recommend.
    with open(options_lookup_path, "r", encoding="utf-8", newline="") as f:
        tsv_reader = csv.reader(f, delimiter="\t")
        next(tsv_reader, None)  # skip headers (default avoids StopIteration on an empty file)
        for row in tsv_reader:
            epw_cells = [cell for cell in row[2:] if cell.endswith(".epw")]
            if not epw_cells:
                continue
            if param_name is not None and row[0] != param_name:
                raise RuntimeError(
                    "The epw files are specified in options_lookup.tsv under more than one parameter "
                    f"type: {param_name}, {row[0]}"
                )
            param_name = row[0]
            # Split only on the first "=" so an EPW path that itself contains "=" stays intact.
            epw_by_option_name[row[1]] = epw_cells[0].split("=", 1)[1]
    return (epw_by_option_name, param_name)

@classmethod
def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, output_path):
"""
Expand Down
2 changes: 1 addition & 1 deletion buildstockbatch/gcp/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ def run_task(cls, task_index, job_name, gcs_bucket, gcs_prefix):
weather_dir = sim_dir / "weather"
os.makedirs(weather_dir, exist_ok=True)

epws_to_download = cls.get_epws_to_download(sim_dir, jobs_d)
epws_to_download = DockerBatchBase.determine_epws_needed_for_job(sim_dir, jobs_d)

# Download and unzip the epws needed for these simulations
for epw_filename in epws_to_download:
Expand Down
4 changes: 2 additions & 2 deletions buildstockbatch/test/test_docker_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ def test_get_epws_to_download():
],
}

epws = DockerBatchBase.get_epws_to_download(sim_dir, jobs_d)
assert epws == set(["weather/G0100970.epw", "weather/G0100830.epw"])
epws = DockerBatchBase.determine_epws_needed_for_job(sim_dir, jobs_d)
assert epws == {"weather/G2500210.epw", "weather/G2601390.epw"}


def test_run_simulations(basic_residential_project_file):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Building,Bedroom,Location,Vintage,State,Insulation Wall,Insulation Slab,Zipcode,County
1,1,AL_Mobile-Rgnl.AP.722230,<1950,CO,Good Option,None,36608,County1
2,3,AL_Mobile-Rgnl.AP.722230,1940s,CO,Good Option,None,36601,County1
3,2,AL_Mobile-Rgnl.AP.722230,2010s,VA,Good Option,None,36602,County1
3,2,AL_Mobile-Rgnl.AP.722230,2010s,VA,Good Option,None,36602,County2
4,1,AL_Mobile-Rgnl.AP.722230,2000s,VA,Good Option,None,36603,County2
5,2,AL_Mobile-Rgnl.AP.722230,1970s,VA,Good Option,None,36604,County2
5,2,AL_Mobile-Rgnl.AP.722230,1970s,VA,Good Option,None,36604,County3
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ Vintage 2000s
Vintage 2010s
State VA
State CO
County County1 weather_station_epw_filepath=weather/G0100970.epw
County County2 weather_station_epw_filepath=weather/G0100830.epw
County County1 weather_station_epw_filepath=weather/G2500210.epw
County County2 weather_station_epw_filepath=weather/G2601210.epw
County County3 weather_station_epw_filepath=weather/G2601390.epw
Bedroom 1
Bedroom 2
Bedroom 3
Expand Down

0 comments on commit 0bb3703

Please sign in to comment.