Commit f9cf6ac
Merge pull request NREL#436 from rewiringamerica/epw_checks
Improve handling of weather files in AWS/GCP
nweires authored Mar 26, 2024
2 parents 59a307d + c0b84f1 commit f9cf6ac
Showing 7 changed files with 174 additions and 122 deletions.
41 changes: 4 additions & 37 deletions buildstockbatch/aws/aws.py
@@ -34,9 +34,9 @@
 import io
 import yaml
 
-from buildstockbatch.base import ValidationError
 from buildstockbatch.aws.awsbase import AwsJobBase, boto_client_config
-from buildstockbatch.cloud.docker_base import DockerBatchBase
+from buildstockbatch.base import ValidationError
+from buildstockbatch.cloud import docker_base
 from buildstockbatch.utils import (
     log_error_details,
     get_project_configuration,
@@ -947,7 +947,7 @@ def clean(self):
             raise error
 
 
-class AwsBatch(DockerBatchBase):
+class AwsBatch(docker_base.DockerBatchBase):
     def __init__(self, project_filename):
         super().__init__(project_filename)
 
@@ -1003,10 +1003,6 @@ def validate_project(project_file):
     def docker_image(self):
         return "nrel/buildstockbatch"
 
-    @property
-    def weather_dir(self):
-        return self._weather_dir
-
     @property
     def results_dir(self):
         return f"{self.s3_bucket}/{self.s3_bucket_prefix}/results"
@@ -1275,36 +1271,7 @@ def run_job(cls, job_id, bucket, prefix, job_name, region):
         weather_dir = sim_dir / "weather"
         os.makedirs(weather_dir, exist_ok=True)
 
-        # Make a lookup of which parameter points to the weather file from options_lookup.tsv
-        with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f:
-            tsv_reader = csv.reader(f, delimiter="\t")
-            next(tsv_reader)  # skip headers
-            param_name = None
-            epws_by_option = {}
-            for row in tsv_reader:
-                row_has_epw = [x.endswith(".epw") for x in row[2:]]
-                if sum(row_has_epw):
-                    if row[0] != param_name and param_name is not None:
-                        raise RuntimeError(
-                            f"The epw files are specified in options_lookup.tsv under more than one parameter type: {param_name}, {row[0]}"  # noqa: E501
-                        )
-                    epw_filename = row[row_has_epw.index(True) + 2].split("=")[1].split("/")[-1]
-                    param_name = row[0]
-                    option_name = row[1]
-                    epws_by_option[option_name] = epw_filename
-
-        # Look through the buildstock.csv to find the appropriate location and epw
-        epws_to_download = set()
-        building_ids = [x[0] for x in jobs_d["batch"]]
-        with open(
-            sim_dir / "lib" / "housing_characteristics" / "buildstock.csv",
-            "r",
-            encoding="utf-8",
-        ) as f:
-            csv_reader = csv.DictReader(f)
-            for row in csv_reader:
-                if int(row["Building"]) in building_ids:
-                    epws_to_download.add(epws_by_option[row[param_name]])
+        epws_to_download = docker_base.determine_weather_files_needed_for_job(sim_dir, jobs_d)
 
         # Download the epws needed for these simulations
         for epw_filename in epws_to_download:
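On the AWS side, the net effect is that roughly thirty lines of inline options_lookup.tsv/buildstock.csv parsing collapse into one call to the shared helper defined in docker_base.py below. A minimal sketch of the new call site, assuming the sim_dir layout from the diff and a hypothetical jobs_d payload:

```python
import pathlib

from buildstockbatch.cloud import docker_base

# Hypothetical job payload: "batch" pairs building IDs with upgrade indices,
# matching the jobs_d["batch"] access inside the helper.
jobs_d = {"job_num": 0, "batch": [[1, None], [2, 0]]}
sim_dir = pathlib.Path("/var/simdata/openstudio")  # illustrative path

# Returns the .epw/.stat/.ddy filenames for each sampled building, plus the
# empty.* files ComStock expects.
epws_to_download = docker_base.determine_weather_files_needed_for_job(sim_dir, jobs_d)
```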
207 changes: 136 additions & 71 deletions buildstockbatch/cloud/docker_base.py
@@ -30,12 +30,65 @@
 import time
 
 from buildstockbatch import postprocessing
-from buildstockbatch.base import BuildStockBatchBase
+from buildstockbatch.base import BuildStockBatchBase, ValidationError
 from buildstockbatch.utils import ContainerRuntime, calc_hash_for_file, compress_file, read_csv, get_bool_env_var
 
 logger = logging.getLogger(__name__)
 
 
+def determine_weather_files_needed_for_job(sim_dir, jobs_d):
+    """
+    Gets the list of filenames for the weather data required for a job of simulations.
+
+    :param sim_dir: Path to the directory where job files are stored
+    :param jobs_d: Contents of a single job JSON file; contains the list of buildings to simulate in this job.
+
+    :returns: Set of weather filenames needed for this job of simulations.
+    """
+    # Fetch the mapping for building to weather file from options_lookup.tsv
+    epws_by_option, param_name = _epws_by_option(sim_dir / "lib" / "resources" / "options_lookup.tsv")
+
+    # ComStock requires these empty files to exist.
+    files_to_download = set(["empty.epw", "empty.stat", "empty.ddy"])
+
+    # Look through the buildstock.csv to find the appropriate location and epw
+    building_ids = [x[0] for x in jobs_d["batch"]]
+    with open(
+        sim_dir / "lib" / "housing_characteristics" / "buildstock.csv",
+        "r",
+        encoding="utf-8",
+    ) as f:
+        csv_reader = csv.DictReader(f)
+        for row in csv_reader:
+            if int(row["Building"]) in building_ids:
+                epw_file = epws_by_option[row[param_name]]
+                root, _ = os.path.splitext(epw_file)
+                files_to_download.update((f"{root}.epw", f"{root}.stat", f"{root}.ddy"))
+
+    return files_to_download
+
+
+def _epws_by_option(options_lookup_path):
+    epws_by_option = {}
+    with open(options_lookup_path, "r", encoding="utf-8") as f:
+        tsv_reader = csv.reader(f, delimiter="\t")
+        next(tsv_reader)  # skip headers
+        param_name = None
+        for row in tsv_reader:
+            row_has_epw = [x.endswith(".epw") for x in row[2:]]
+            if sum(row_has_epw):
+                if row[0] != param_name and param_name is not None:
+                    raise RuntimeError(
+                        "The epw files are specified in options_lookup.tsv under more than one parameter "
+                        f"type: {param_name}, {row[0]}"
+                    )
+                epw_filename = row[row_has_epw.index(True) + 2].split("=")[1]
+                param_name = row[0]
+                option_name = row[1]
+                epws_by_option[option_name] = epw_filename
+    return (epws_by_option, param_name)
+
+
 class DockerBatchBase(BuildStockBatchBase):
     """Base class for implementations that run in Docker containers."""
 
@@ -95,7 +148,7 @@ def copy_files_at_cloud(self, files_to_copy):
         :param files_to_copy: a dict where the key is a file on the cloud to copy, and the value is
             the filename to copy the source file to. Both are relative to the ``tmppath`` used in
-            ``prep_batches()`` (so the implementation should prepend the bucket name and prefix
+            ``_run_batch_prep()`` (so the implementation should prepend the bucket name and prefix
             where they were uploaded to by ``upload_batch_files_to_cloud``).
         """
         raise NotImplementedError
 
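A sketch of the mapping `copy_files_at_cloud` receives, with illustrative paths: the key was already uploaded from `tmppath`, and the value is the duplicate to recreate with a cloud-to-cloud copy rather than a second upload:

```python
files_to_copy = {
    # Key: file already uploaded from tmppath; value: duplicate to create.
    "weather/G0100010.epw.gz": "weather/G0100030.epw.gz",
}
```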
@@ -120,14 +173,14 @@ def run_batch(self):
         """
         with tempfile.TemporaryDirectory(prefix="bsb_") as tmpdir:
             tmppath = pathlib.Path(tmpdir)
-            epws_to_copy, batch_info = self._run_batch_prep(tmppath)
+            weather_files_to_copy, batch_info = self._run_batch_prep(tmppath)
 
             # Copy all the files to cloud storage
             logger.info("Uploading files for batch...")
             self.upload_batch_files_to_cloud(tmppath)
 
             logger.info("Copying duplicate weather files...")
-            self.copy_files_at_cloud(epws_to_copy)
+            self.copy_files_at_cloud(weather_files_to_copy)
 
             self.start_batch_job(batch_info)
 
@@ -136,9 +189,9 @@ def _run_batch_prep(self, tmppath):
         files to the cloud that the batch jobs will use.
 
         This includes:
-        - Weather files (:func:`_prep_weather_files_for_batch`)
         - Sampling, and splitting the samples into (at most) ``self.batch_array_size`` batches,
           and bundling other assets needed for running simulations (:func:`_prep_jobs_for_batch`)
+        - Weather files (:func:`_prep_weather_files_for_batch`)
 
         Those functions place their files to be uploaded into ``tmppath``, and then this will upload
         them to the cloud using (:func:`upload_batch_files_to_cloud`).
@@ -156,33 +209,36 @@
         :returns: DockerBatchBase.BatchInfo
         """
 
-        # Weather files
-        logger.info("Prepping weather files...")
-        epws_to_copy = self._prep_weather_files_for_batch(tmppath)
-
         # Project configuration
-        logger.info("Writing project configuration for upload")
+        logger.info("Writing project configuration for upload...")
         with open(tmppath / "config.json", "wt", encoding="utf-8") as f:
             json.dump(self.cfg, f)
 
-        # Collect simulations to queue
-        batch_info = self._prep_jobs_for_batch(tmppath)
+        # Collect simulations to queue (along with the EPWs those sims need)
+        logger.info("Preparing simulation batch jobs...")
+        batch_info, files_needed = self._prep_jobs_for_batch(tmppath)
 
-        return (epws_to_copy, batch_info)
+        # Weather files
+        logger.info("Prepping weather files...")
+        epws_to_copy = self._prep_weather_files_for_batch(tmppath, files_needed)
+
+        return (epws_to_copy, batch_info)
 
-    def _prep_weather_files_for_batch(self, tmppath):
-        """Downloads, if necessary, and extracts weather files to ``self._weather_dir``.
-
-        Because there may be duplicate weather files, this also identifies duplicates to avoid
-        redundant compression work and bytes uploaded to the cloud.
-
-        It will put unique files in the ``tmppath`` (in the 'weather' subdir) which will get
-        uploaded to the cloud along with other batch files. It will also return a list of
-        duplicates. This will allow the duplicates to be quickly recreated on the cloud via copying
-        from-cloud-to-cloud.
+    def _prep_weather_files_for_batch(self, tmppath, weather_files_needed_set):
+        """Prepare the weather files needed by the batch.
+
+        1. Downloads, if necessary, and extracts weather files to ``self._weather_dir``.
+        2. Ensures that all EPWs needed by the batch are present.
+        3. Identifies weather files that are duplicates to avoid redundant compression work and
+           bytes uploaded to the cloud.
+           * Puts unique files in the ``tmppath`` (in the 'weather' subdir) which will get uploaded
+             to the cloud along with other batch files.
+           * Returns a list of duplicates, which allows them to be quickly recreated on the cloud
+             via copying from-cloud-to-cloud.
 
         :param tmppath: Unique weather files (compressed) will be copied into a 'weather' subdir
             of this path.
+        :param weather_files_needed_set: A set of weather filenames needed by the batch.
 
         :returns: an array of tuples where the first value is the filename of a file that will be
             uploaded to cloud storage (because it's in the ``tmppath``), and the second value is the
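The return contract described in that docstring (truncated here by the diff fold), sketched as a literal with hypothetical filenames — each tuple pairs an uploaded unique file with a duplicate to recreate on the cloud:

```python
epws_to_copy = [
    ("G0100010.epw.gz", "G0100030.epw.gz"),  # upload the first, copy it to the second
    ("G0100010.epw.gz", "G0100050.epw.gz"),
]
```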
@@ -195,9 +251,23 @@ def _prep_weather_files_for_batch(self, tmppath):
         # Downloads, if necessary, and extracts weather files to ``self._weather_dir``
         self._get_weather_files()
 
+        # Ensure all needed weather files are present
+        logger.info("Ensuring all needed weather files are present...")
+        weather_files = os.listdir(self.weather_dir)
+        missing_epws = set()
+        for needed_epw in weather_files_needed_set:
+            if needed_epw not in weather_files:
+                missing_epws.add(needed_epw)
+        if missing_epws:
+            raise ValidationError(
+                "Not all weather files referenced by the sampled buildstock are available. "
+                f"{len(missing_epws):,} missing EPWs: {missing_epws}."
+            )
+        logger.debug("...all needed weather files are present.")
+
         # Determine the unique weather files
-        epw_filenames = list(filter(lambda x: x.endswith(".epw"), os.listdir(self.weather_dir)))
         logger.info("Calculating hashes for weather files")
+        epw_filenames = list(weather_files_needed_set)
         epw_hashes = Parallel(n_jobs=-1, verbose=9)(
             delayed(calc_hash_for_file)(pathlib.Path(self.weather_dir) / epw_filename)
             for epw_filename in epw_filenames
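The dedup step groups weather files by content hash so only one representative per hash is compressed and uploaded. A standalone sketch of the pattern, with a stand-in for `calc_hash_for_file`:

```python
import hashlib
import pathlib
from collections import defaultdict

def calc_hash(path):
    # Stand-in for buildstockbatch.utils.calc_hash_for_file.
    return hashlib.sha256(path.read_bytes()).hexdigest()

weather_dir = pathlib.Path("weather")  # hypothetical directory of EPWs
unique_epws = defaultdict(list)  # hash -> every filename with that content
for epw in sorted(weather_dir.glob("*.epw")):
    unique_epws[calc_hash(epw)].append(epw.name)

# Compress/upload names[0] for each group; names[1:] become cloud-side copies.
for names in unique_epws.values():
    representative, duplicates = names[0], names[1:]
```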
@@ -226,28 +296,30 @@
         )
 
         # Calculate and print savings of duplicate files
-        total_count = 0
+        upload_bytes = 0
         dupe_count = 0
         dupe_bytes = 0
         for epws in unique_epws.values():
             count = len(epws)
-            total_count += count
+            bytes = os.path.getsize(str(tmp_weather_out_path / epws[0]) + ".gz")
+            upload_bytes += bytes
             if count > 1:
                 dupe_count += count - 1
-                bytes = os.path.getsize(str(tmp_weather_out_path / epws[0]) + ".gz") * dupe_count
-                dupe_bytes = bytes * (count - 1)
+                dupe_bytes += bytes * (count - 1)
         logger.info(
-            f"Identified {dupe_count:,} duplicate weather files "
-            f"({len(unique_epws):,} unique, {total_count:,} total); "
-            f"saved from uploading {(dupe_bytes / 1024 / 1024):,.1f} MiB"
+            f"Weather files: {len(weather_files_needed_set):,}/{len(weather_files):,} referenced; "
+            f"{len(unique_epws):,} unique ({(upload_bytes / 1024 / 1024):,.1f} MiB to upload), "
+            f"{dupe_count:,} duplicates ({(dupe_bytes / 1024 / 1024):,.1f} MiB saved from uploading)"
        )
         return epws_to_copy
 
     def _prep_jobs_for_batch(self, tmppath):
         """Splits simulations into batches, and prepares asset files needed to run them."""
         # Run sampling - generates buildstock.csv
+        logger.debug("Running sampling....")
         buildstock_csv_filename = self.sampler.run_sampling()
 
+        logger.debug("Validating sampled buildstock...")
         df = read_csv(buildstock_csv_filename, index_col=0, dtype=str)
         self.validate_buildstock_csv(self.project_filename, df)
         building_ids = df.index.tolist()
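This hunk also corrects the savings math: each unique file is charged one upload, and its remaining `count - 1` copies count as bytes saved. The old code multiplied the file size by the running `dupe_count` and assigned rather than accumulated `dupe_bytes`, inflating or clobbering the total. A worked check with hypothetical sizes:

```python
# One EPW with three identical copies (2.0 MiB gzipped) and one unique EPW (1.5 MiB).
unique_epws = {"hash_a": ["a1.epw", "a2.epw", "a3.epw"], "hash_b": ["b1.epw"]}
size_mib = {"hash_a": 2.0, "hash_b": 1.5}

upload_mib = sum(size_mib[h] for h in unique_epws)  # 3.5 MiB uploaded
dupe_mib = sum(size_mib[h] * (len(files) - 1) for h, files in unique_epws.items())  # 2.0 * 2 = 4.0 MiB saved
```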
@@ -268,6 +340,10 @@
 
         os.makedirs(tmppath / "jobs")
 
+        # Ensure all weather files are available
+        logger.debug("Determining which weather files are needed...")
+        files_needed = self._determine_weather_files_needed_for_batch(df)
+
         # Write each batch of simulations to a file.
         logger.info("Queueing jobs")
         for i in itertools.count(0):
@@ -317,51 +393,40 @@
                 "lib/housing_characteristics",
             )
 
-        return DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count)
+        return (
+            DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count),
+            files_needed,
+        )
 
-    @classmethod
-    def get_epws_to_download(cls, sim_dir, jobs_d):
+    def _determine_weather_files_needed_for_batch(self, buildstock_df):
         """
-        Gets the list of filenames for the weather data required for a single batch of simulations.
-
-        :param sim_dir: Path to the directory where job files are stored
-        :param jobs_d: Contents of a single job JSON file; contains the list of buildings to simulate in this job.
-
-        :returns: Set of epw filenames needed for this batch of simulations.
+        Gets the list of weather filenames required for a batch of simulations.
+        :param buildstock_df: DataFrame of the buildstock batch being simulated.
+        :returns: Set of weather filenames needed for this batch of simulations.
         """
-        # Make a lookup of which parameter points to the weather file from options_lookup.tsv
-        with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f:
-            tsv_reader = csv.reader(f, delimiter="\t")
-            next(tsv_reader)  # skip headers
-            param_name = None
-            epws_by_option = {}
-            for row in tsv_reader:
-                row_has_epw = [x.endswith(".epw") for x in row[2:]]
-                if sum(row_has_epw):
-                    if row[0] != param_name and param_name is not None:
-                        raise RuntimeError(
-                            "The epw files are specified in options_lookup.tsv under more than one parameter type: "
-                            f"{param_name}, {row[0]}"
-                        )
-                    epw_filename = row[row_has_epw.index(True) + 2].split("=")[1]
-                    param_name = row[0]
-                    option_name = row[1]
-                    epws_by_option[option_name] = epw_filename
+        # Fetch the mapping for building to weather file from options_lookup.tsv
+        epws_by_option, param_name = _epws_by_option(
+            pathlib.Path(self.buildstock_dir) / "resources" / "options_lookup.tsv"
+        )
 
-        # Look through the buildstock.csv to find the appropriate location and epw
-        epws_to_download = set()
-        building_ids = [x[0] for x in jobs_d["batch"]]
-        with open(
-            sim_dir / "lib" / "housing_characteristics" / "buildstock.csv",
-            "r",
-            encoding="utf-8",
-        ) as f:
-            csv_reader = csv.DictReader(f)
-            for row in csv_reader:
-                if int(row["Building"]) in building_ids:
-                    epws_to_download.add(epws_by_option[row[param_name]])
-
-        return epws_to_download
+        # Iterate over all values in the `param_name` column and collect the referenced EPWs
+        files_needed = set(["empty.epw", "empty.stat", "empty.ddy"])
+        for lookup_value in buildstock_df[param_name]:
+            if not lookup_value:
+                raise ValidationError(
+                    f"Encountered a row in buildstock.csv with an empty value in column: {param_name}"
+                )
+
+            epw_path = epws_by_option[lookup_value]
+            if not epw_path:
+                raise ValidationError(f"Did not find an EPW for '{lookup_value}'")
+
+            # Add just the filenames (without relative path)
+            root, _ = os.path.splitext(os.path.basename(epw_path))
+            files_needed.update((f"{root}.epw", f"{root}.stat", f"{root}.ddy"))
+
+        logger.debug(f"Unique weather files needed for this buildstock: {len(files_needed):,}")
+        return files_needed
 
     @classmethod
     def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, output_path):
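A usage sketch of the new batch-level helper, reusing the hypothetical County mapping from the `_epws_by_option` example above:

```python
import pandas as pd

# Hypothetical sampled buildstock; "County" is the parameter that
# options_lookup.tsv maps to weather files in the earlier sketch.
buildstock_df = pd.DataFrame(
    {"County": ["AL, Autauga County", "AL, Baldwin County"]},
    index=pd.Index([1, 2], name="Building"),
)

# self._determine_weather_files_needed_for_batch(buildstock_df) would then
# return the nine filenames:
#   {"G0100010.epw", "G0100010.stat", "G0100010.ddy",
#    "G0100030.epw", "G0100030.stat", "G0100030.ddy",
#    "empty.epw", "empty.stat", "empty.ddy"}
```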
@@ -457,7 +522,7 @@ def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, output_path):
             elif os.path.isfile(item):
                 os.remove(item)
 
-        # Upload simulation outputs tarfile to s3
+        # Upload simulation outputs tarfile to bucket
         fs.put(
             str(simulation_output_tar_filename),
             f"{output_path}/results/simulation_output/simulations_job{job_id}.tar.gz",
[Diffs for the remaining 5 changed files are not shown.]
