Skip to content

Commit

Permalink
Merge remote-tracking branch 'nrel/develop' into lathanh/docker_base_ioc
Browse files Browse the repository at this point in the history
# Conflicts:
#	buildstockbatch/cloud/docker_base.py
#	docs/changelog/changelog_dev.rst
  • Loading branch information
lathanh committed Dec 11, 2023
2 parents a131f32 + 61e32be commit eccb933
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 161 deletions.
123 changes: 2 additions & 121 deletions buildstockbatch/aws/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import base64
import boto3
from botocore.exceptions import ClientError
import csv
from fsspec.implementations.local import LocalFileSystem
import gzip
from joblib import Parallel, delayed
import json
Expand All @@ -24,15 +22,12 @@
import pathlib
import random
from s3fs import S3FileSystem
import shutil
import subprocess
import tarfile
import re
import time
import io
import zipfile

from buildstockbatch import postprocessing
from buildstockbatch.aws.awsbase import AwsJobBase
from buildstockbatch.base import ValidationError
from buildstockbatch.cloud.docker_base import DockerBatchBase
Expand Down Expand Up @@ -1700,37 +1695,7 @@ def run_job(cls, job_id, bucket, prefix, job_name, region):
weather_dir = sim_dir / "weather"
os.makedirs(weather_dir, exist_ok=True)

# Make a lookup of which parameter points to the weather file from options_lookup.tsv
with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f:
tsv_reader = csv.reader(f, delimiter="\t")
next(tsv_reader) # skip headers
param_name = None
epws_by_option = {}
for row in tsv_reader:
row_has_epw = [x.endswith(".epw") for x in row[2:]]
if sum(row_has_epw):
if row[0] != param_name and param_name is not None:
raise RuntimeError(
"The epw files are specified in options_lookup.tsv under more than one parameter type: "
f"{param_name}, {row[0]}"
)
epw_filename = row[row_has_epw.index(True) + 2].split("=")[1]
param_name = row[0]
option_name = row[1]
epws_by_option[option_name] = epw_filename

# Look through the buildstock.csv to find the appropriate location and epw
epws_to_download = set()
building_ids = [x[0] for x in jobs_d["batch"]]
with open(
sim_dir / "lib" / "housing_characteristics" / "buildstock.csv",
"r",
encoding="utf-8",
) as f:
csv_reader = csv.DictReader(f)
for row in csv_reader:
if int(row["Building"]) in building_ids:
epws_to_download.add(epws_by_option[row[param_name]])
epws_to_download = cls.get_epws_to_download(sim_dir, jobs_d)

# Download the epws needed for these simulations
for epw_filename in epws_to_download:
Expand All @@ -1740,92 +1705,8 @@ def run_job(cls, job_id, bucket, prefix, job_name, region):
with open(weather_dir / epw_filename, "wb") as f_out:
logger.debug("Extracting {}".format(epw_filename))
f_out.write(gzip.decompress(f_gz.getvalue()))
asset_dirs = os.listdir(sim_dir)

fs = S3FileSystem()
local_fs = LocalFileSystem()
reporting_measures = cls.get_reporting_measures(cfg)
dpouts = []
simulation_output_tar_filename = sim_dir.parent / "simulation_outputs.tar.gz"
with tarfile.open(str(simulation_output_tar_filename), "w:gz") as simout_tar:
for building_id, upgrade_idx in jobs_d["batch"]:
upgrade_id = 0 if upgrade_idx is None else upgrade_idx + 1
sim_id = f"bldg{building_id:07d}up{upgrade_id:02d}"

# Create OSW
osw = cls.create_osw(cfg, jobs_d["n_datapoints"], sim_id, building_id, upgrade_idx)
with open(os.path.join(sim_dir, "in.osw"), "w") as f:
json.dump(osw, f, indent=4)

# Run Simulation
with open(sim_dir / "os_stdout.log", "w") as f_out:
try:
logger.debug("Running {}".format(sim_id))
subprocess.run(
["openstudio", "run", "-w", "in.osw"],
check=True,
stdout=f_out,
stderr=subprocess.STDOUT,
cwd=str(sim_dir),
)
except subprocess.CalledProcessError:
logger.debug(f"Simulation failed: see {sim_id}/os_stdout.log")

# Clean Up simulation directory
cls.cleanup_sim_dir(
sim_dir,
fs,
f"{bucket}/{prefix}/results/simulation_output/timeseries",
upgrade_id,
building_id,
)

# Read data_point_out.json
dpout = postprocessing.read_simulation_outputs(
local_fs, reporting_measures, str(sim_dir), upgrade_id, building_id
)
dpouts.append(dpout)

# Add the rest of the simulation outputs to the tar archive
logger.info("Archiving simulation outputs")
for dirpath, dirnames, filenames in os.walk(sim_dir):
if dirpath == str(sim_dir):
for dirname in set(dirnames).intersection(asset_dirs):
dirnames.remove(dirname)
for filename in filenames:
abspath = os.path.join(dirpath, filename)
relpath = os.path.relpath(abspath, sim_dir)
simout_tar.add(abspath, os.path.join(sim_id, relpath))

# Clear directory for next simulation
logger.debug("Clearing out simulation directory")
for item in set(os.listdir(sim_dir)).difference(asset_dirs):
if os.path.isdir(item):
shutil.rmtree(item)
elif os.path.isfile(item):
os.remove(item)

# Upload simulation outputs tarfile to s3
fs.put(
str(simulation_output_tar_filename),
f"{bucket}/{prefix}/results/simulation_output/simulations_job{job_id}.tar.gz",
)

# Upload aggregated dpouts as a json file
with fs.open(
f"{bucket}/{prefix}/results/simulation_output/results_job{job_id}.json.gz",
"wb",
) as f1:
with gzip.open(f1, "wt", encoding="utf-8") as f2:
json.dump(dpouts, f2)

# Remove files (it helps docker if we don't leave a bunch of files laying around)
os.remove(simulation_output_tar_filename)
for item in os.listdir(sim_dir):
if os.path.isdir(item):
shutil.rmtree(item)
elif os.path.isfile(item):
os.remove(item)
cls.run_simulations(cfg, jobs_d, job_id, sim_dir, S3FileSystem(), bucket, prefix)


@log_error_details()
Expand Down
152 changes: 151 additions & 1 deletion buildstockbatch/cloud/docker_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
:license: BSD-3
"""
import collections
import docker
import csv
from dataclasses import dataclass
import docker
from fsspec.implementations.local import LocalFileSystem
import gzip
import itertools
from joblib import Parallel, delayed
import json
Expand All @@ -20,10 +23,12 @@
import pathlib
import random
import shutil
import subprocess
import tarfile
import tempfile
import time

from buildstockbatch import postprocessing
from buildstockbatch.base import BuildStockBatchBase
from buildstockbatch.utils import ContainerRuntime, calc_hash_for_file, compress_file, read_csv

Expand Down Expand Up @@ -311,3 +316,148 @@ def _prep_jobs_for_batch(self, tmppath):
)

return DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count)

@classmethod
def get_epws_to_download(cls, sim_dir, jobs_d):
"""
Gets the list of filenames for the weather data required for a single batch of simulations.
:param sim_dir: Path to the directory where job files are stored
:param jobs_d: Contents of a single job JSON file; contains the list of buildings to simulate in this job.
:returns: Set of epw filenames needed for this batch of simulations.
"""
# Make a lookup of which parameter points to the weather file from options_lookup.tsv
with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f:
tsv_reader = csv.reader(f, delimiter="\t")
next(tsv_reader) # skip headers
param_name = None
epws_by_option = {}
for row in tsv_reader:
row_has_epw = [x.endswith(".epw") for x in row[2:]]
if sum(row_has_epw):
if row[0] != param_name and param_name is not None:
raise RuntimeError(
"The epw files are specified in options_lookup.tsv under more than one parameter type: "
f"{param_name}, {row[0]}"
)
epw_filename = row[row_has_epw.index(True) + 2].split("=")[1]
param_name = row[0]
option_name = row[1]
epws_by_option[option_name] = epw_filename

# Look through the buildstock.csv to find the appropriate location and epw
epws_to_download = set()
building_ids = [x[0] for x in jobs_d["batch"]]
with open(
sim_dir / "lib" / "housing_characteristics" / "buildstock.csv",
"r",
encoding="utf-8",
) as f:
csv_reader = csv.DictReader(f)
for row in csv_reader:
if int(row["Building"]) in building_ids:
epws_to_download.add(epws_by_option[row[param_name]])

return epws_to_download

@classmethod
def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, output_path):
"""
Run one batch of simulations.
Runs the simulations, writes outputs to the provided storage bucket, and cleans up intermediate files.
:param cfg: Project config contents.
:param job_id: Index of this job.
:param jobs_d: Contents of a single job JSON file; contains the list of buildings to simulate in this job.
:param sim_dir: Path to the (local) directory where job files are stored.
:param fs: Filesystem to use when writing outputs to storage bucket
:param output_path: File path (typically `bucket/prefix`) to write outputs to.
"""
local_fs = LocalFileSystem()
reporting_measures = cls.get_reporting_measures(cfg)
dpouts = []
simulation_output_tar_filename = sim_dir.parent / "simulation_outputs.tar.gz"
asset_dirs = os.listdir(sim_dir)
ts_output_dir = (f"{output_path}/results/simulation_output/timeseries",)

with tarfile.open(str(simulation_output_tar_filename), "w:gz") as simout_tar:
for building_id, upgrade_idx in jobs_d["batch"]:
upgrade_id = 0 if upgrade_idx is None else upgrade_idx + 1
sim_id = f"bldg{building_id:07d}up{upgrade_id:02d}"

# Create OSW
osw = cls.create_osw(cfg, jobs_d["n_datapoints"], sim_id, building_id, upgrade_idx)
with open(os.path.join(sim_dir, "in.osw"), "w") as f:
json.dump(osw, f, indent=4)

# Run Simulation
with open(sim_dir / "os_stdout.log", "w") as f_out:
try:
logger.debug("Running {}".format(sim_id))
subprocess.run(
["openstudio", "run", "-w", "in.osw"],
check=True,
stdout=f_out,
stderr=subprocess.STDOUT,
cwd=str(sim_dir),
)
except subprocess.CalledProcessError:
logger.debug(f"Simulation failed: see {sim_id}/os_stdout.log")

# Clean Up simulation directory
cls.cleanup_sim_dir(
sim_dir,
fs,
ts_output_dir,
upgrade_id,
building_id,
)

# Read data_point_out.json
dpout = postprocessing.read_simulation_outputs(
local_fs, reporting_measures, str(sim_dir), upgrade_id, building_id
)
dpouts.append(dpout)

# Add the rest of the simulation outputs to the tar archive
logger.info("Archiving simulation outputs")
for dirpath, dirnames, filenames in os.walk(sim_dir):
if dirpath == str(sim_dir):
for dirname in set(dirnames).intersection(asset_dirs):
dirnames.remove(dirname)
for filename in filenames:
abspath = os.path.join(dirpath, filename)
relpath = os.path.relpath(abspath, sim_dir)
simout_tar.add(abspath, os.path.join(sim_id, relpath))

# Clear directory for next simulation
logger.debug("Clearing out simulation directory")
for item in set(os.listdir(sim_dir)).difference(asset_dirs):
if os.path.isdir(item):
shutil.rmtree(item)
elif os.path.isfile(item):
os.remove(item)

# Upload simulation outputs tarfile to s3
fs.put(
str(simulation_output_tar_filename),
f"{output_path}/results/simulation_output/simulations_job{job_id}.tar.gz",
)

# Upload aggregated dpouts as a json file
with fs.open(
f"{output_path}/results/simulation_output/results_job{job_id}.json.gz",
"wb",
) as f1:
with gzip.open(f1, "wt", encoding="utf-8") as f2:
json.dump(dpouts, f2)

# Remove files (it helps docker if we don't leave a bunch of files laying around)
os.remove(simulation_output_tar_filename)
for item in os.listdir(sim_dir):
if os.path.isdir(item):
shutil.rmtree(item)
elif os.path.isfile(item):
os.remove(item)
Loading

0 comments on commit eccb933

Please sign in to comment.