From 1217a9384d6593856b1ee6096c14474132ab6dee Mon Sep 17 00:00:00 2001
From: romainsacchi
Date: Sun, 30 Jul 2023 15:17:47 +0200
Subject: [PATCH] Fix multiprocessing implementation for matrices export.

---
 premise/__init__.py               |   3 +-
 premise/ecoinvent_modification.py |  55 ++++++++++++---
 premise/export.py                 |   1 +
 premise/pathways.py               | 107 ++++++++++++++++++++++++++++++
 4 files changed, 154 insertions(+), 12 deletions(-)
 create mode 100644 premise/pathways.py

diff --git a/premise/__init__.py b/premise/__init__.py
index bbd13bf0..b49f206e 100644
--- a/premise/__init__.py
+++ b/premise/__init__.py
@@ -1,4 +1,4 @@
-__all__ = ("NewDatabase", "clear_cache", "get_regions_definition")
+__all__ = ("NewDatabase", "clear_cache", "get_regions_definition", "PathwaysDataPackage")
 __version__ = (1, 5, 9)
 
 from pathlib import Path
@@ -8,4 +8,5 @@
 VARIABLES_DIR = Path(__file__).resolve().parent / "iam_variables_mapping"
 
 from .ecoinvent_modification import NewDatabase
+from .pathways import PathwaysDataPackage
 from .utils import clear_cache, get_regions_definition
diff --git a/premise/ecoinvent_modification.py b/premise/ecoinvent_modification.py
index 428abdf9..0d5b41b3 100644
--- a/premise/ecoinvent_modification.py
+++ b/premise/ecoinvent_modification.py
@@ -435,6 +435,9 @@ def _update_all(
     return scenario, modified_datasets
 
 
+def _export_to_matrices(obj):
+    obj.export_db_to_matrices()
+
 class NewDatabase:
     """
     Class that represents a new wurst inventory database, modified according to IAM data.
@@ -528,7 +531,7 @@ def __init__(
             self.database = self.__find_cached_db(
                 source_db, keep_uncertainty_data=keep_uncertainty_data
             )
-            # print("Done!")
+            print("Done!")
         else:
             self.database = self.__clean_database(
                 keep_uncertainty_data=keep_uncertainty_data
@@ -547,7 +550,7 @@ def __init__(
             data = self.__import_additional_inventories(self.additional_inventories)
             self.database.extend(data)
 
-        # print("Done!")
+        print("Done!")
 
         print("\n/////////////////////// EXTRACTING IAM DATA ////////////////////////")
@@ -575,7 +578,7 @@ def _fetch_iam_data(scenario):
         with Pool(processes=multiprocessing.cpu_count()) as pool:
             pool.map(_fetch_iam_data, self.scenarios)
 
-        # print("Done!")
+        print("Done!")
 
     def __find_cached_db(self, db_name: str, keep_uncertainty_data: bool) -> List[dict]:
         """
@@ -807,6 +810,8 @@ def update_electricity(self) -> None:
             self.scenarios[s] = results[s][0]
             self.modified_datasets.update(results[s][1])
 
+        print("Done!\n")
+
     def update_dac(self) -> None:
         """
         This method will update the Direct Air Capture (DAC) inventories
@@ -833,6 +838,8 @@ def update_dac(self) -> None:
             self.scenarios[s] = results[s][0]
             self.modified_datasets.update(results[s][1])
 
+        print("Done!\n")
+
     def update_fuels(self) -> None:
         """
         This method will update the fuels inventories
@@ -857,6 +864,8 @@ def update_fuels(self) -> None:
             self.scenarios[s] = results[s][0]
             self.modified_datasets.update(results[s][1])
 
+        print("Done!\n")
+
     def update_cement(self) -> None:
         """
         This method will update the cement inventories
@@ -881,6 +890,8 @@ def update_cement(self) -> None:
             self.scenarios[s] = results[s][0]
             self.modified_datasets.update(results[s][1])
 
+        print("Done!\n")
+
     def update_steel(self) -> None:
         """
         This method will update the steel inventories
@@ -905,6 +916,8 @@ def update_steel(self) -> None:
             self.scenarios[s] = results[s][0]
             self.modified_datasets.update(results[s][1])
 
+        print("Done!\n")
+
     def update_cars(self) -> None:
         """
         This method will update the cars inventories
@@ -930,6 +943,8 @@ def update_cars(self) -> None:
             self.scenarios[s] = results[s][0]
             self.modified_datasets.update(results[s][1])
 
+        print("Done!\n")
+
     def update_two_wheelers(self) -> None:
         """
         This method will update the two-wheelers inventories
@@ -955,6 +970,8 @@ def update_two_wheelers(self) -> None:
             self.scenarios[s] = results[s][0]
             self.modified_datasets.update(results[s][1])
 
+        print("Done!\n")
+
     def update_trucks(self) -> None:
         """
         This method will update the trucks inventories
@@ -982,6 +999,8 @@ def update_trucks(self) -> None:
             self.scenarios[s] = results[s][0]
             self.modified_datasets.update(results[s][1])
 
+        print("Done!\n")
+
     def update_buses(self) -> None:
         """
         This method will update the buses inventories
@@ -1008,6 +1027,8 @@ def update_buses(self) -> None:
             self.scenarios[s] = results[s][0]
             self.modified_datasets.update(results[s][1])
 
+        print("Done!\n")
+
     def update_external_scenario(self):
         if self.datapackages:
             for i, scenario in enumerate(self.scenarios):
@@ -1052,6 +1073,8 @@ def update_external_scenario(self):
                 scenario["database"] = external_scenario.database
 
         print(f"Log file of exchanges saved under {DATA_DIR / 'logs'}.")
 
+        print("Done!\n")
+
     def update_emissions(self) -> None:
         """
         This method will update the hot pollutants emissions
@@ -1078,6 +1101,8 @@ def update_emissions(self) -> None:
             self.scenarios[s] = results[s][0]
             self.modified_datasets.update(results[s][1])
 
+        print("Done!\n")
+
     def update_all(self) -> None:
         """
         Shortcut method to execute all transformation functions.
@@ -1135,7 +1160,7 @@ def write_superstructure_db_to_brightway(
         cache = {}
 
         # use multiprocessing to speed up the process
-        with Pool(processes=multiprocessing.cpu_count()) as pool:
+        with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
             args = [
                 (
                     scenario,
@@ -1212,7 +1237,7 @@ def write_db_to_brightway(self, name: [str, List[str]] = None):
         cache = {}
 
         # use multiprocessing to speed up the process
-        with Pool(processes=multiprocessing.cpu_count()) as pool:
+        with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
             args = [
                 (
                     scenario,
@@ -1278,9 +1303,11 @@ def write_db_to_matrices(self, filepath: str = None):
         cache = {}
 
+
+        # use multiprocessing to speed up the process
         # use multiprocessing to speed up the process
-        with Pool(processes=multiprocessing.cpu_count()) as pool:
+        with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
             args = [
                 (
                     scenario,
@@ -1291,12 +1318,18 @@ def write_db_to_matrices(self, filepath: str = None):
                 )
                 for scenario in self.scenarios
             ]
-            self.scenarios, cache = pool.starmap(_prepare_database, args)
+            results = pool.starmap(_prepare_database, args)
+
+        for s, scenario in enumerate(self.scenarios):
+            self.scenarios[s] = results[s][0]
+            cache.update(results[s][1])
+
+        with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
             args = [
-                (scenario, filepath[scen], self.version)
+                Export(scenario, filepath[scen], self.version)
                 for scen, scenario in enumerate(self.scenarios)
             ]
-            pool.starmap(Export().export_db_to_matrices, args)
+            pool.map(_export_to_matrices, args)
 
         # generate scenario report
         self.generate_scenario_report()
@@ -1322,7 +1355,7 @@ def write_db_to_simapro(self, filepath: str = None):
         cache = {}
 
         # use multiprocessing to speed up the process
-        with Pool(processes=multiprocessing.cpu_count()) as pool:
+        with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
             args = [
                 (
                     scenario,
@@ -1359,7 +1392,7 @@ def write_datapackage(self, name: str = f"datapackage_{date.today()}"):
         cache = {}
 
         # use multiprocessing to speed up the process
-        with Pool(processes=multiprocessing.cpu_count()) as pool:
+        with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
             args = [
                 (
                     scenario,
diff --git a/premise/export.py b/premise/export.py
index 3784e673..b11923c7 100644
--- a/premise/export.py
+++ b/premise/export.py
@@ -992,6 +992,7 @@ def __init__(
         filepath: Union[list[Path], list[Union[Path, Any]]] = None,
         version: str = None,
     ):
+
         self.db = scenario["database"]
         self.model = scenario["model"]
         self.scenario = scenario["pathway"]
diff --git a/premise/pathways.py b/premise/pathways.py
new file mode 100644
index 00000000..248d7ec4
--- /dev/null
+++ b/premise/pathways.py
@@ -0,0 +1,107 @@
+from typing import List
+import copy
+from datetime import date
+from multiprocessing import Pool as ProcessPool
+import multiprocessing
+from pathlib import Path
+
+from datapackage import Package
+
+from . import __version__
+from .ecoinvent_modification import NewDatabase
+from .export import (
+    Export,
+    _prepare_database,
+    build_datapackage,
+    generate_scenario_factor_file,
+    generate_superstructure_db,
+)
+
+
+class PathwaysDataPackage:
+    def __init__(
+        self,
+        scenarios: List[dict],
+        source_version: str = "3.9",
+        source_type: str = "brightway",
+        key: bytes = None,
+        source_db: str = None,
+        source_file_path: str = None,
+        additional_inventories: List[dict] = None,
+        system_model: str = "cutoff",
+        system_args: dict = None,
+        external_scenarios: list = None,
+        gains_scenario="CLE",
+        use_absolute_efficiency=False,
+    ):
+        self.scenarios = scenarios
+        self.source_db = source_db
+        self.source_version = source_version
+        self.key = key
+        self.years = None
+
+        self.datapackage = NewDatabase(
+            scenarios=scenarios,
+            source_version=source_version,
+            source_type=source_type,
+            key=key,
+            source_db=source_db,
+            source_file_path=source_file_path,
+            additional_inventories=additional_inventories,
+            system_model=system_model,
+            system_args=system_args,
+            external_scenarios=external_scenarios,
+            gains_scenario=gains_scenario,
+            use_absolute_efficiency=use_absolute_efficiency,
+        )
+
+    def create_datapackage(self, name: str = f"pathways_{date.today()}"):
+        self.datapackage.update_all()
+        self.export_datapackage(name)
+
+    def export_datapackage(self, name: str):
+        # create matrices in current directory
+        self.datapackage.write_db_to_matrices()
+        self.build_datapackage(name)
+
+    def build_datapackage(self, name):
+        """
+        Create and export a scenario datapackage.
+        """
+
+        package = Package(base_path=Path.cwd())
+        package.infer("**/*.csv")
+        package.descriptor["name"] = name
+        package.descriptor["title"] = name.capitalize()
+        package.descriptor[
+            "description"
+        ] = f"Data package generated by premise {__version__}."
+        package.descriptor["premise version"] = str(__version__)
+        package.descriptor["scenarios"] = [
+            {
+                "name": f"{s['model'].upper()} - {s['pathway']}",
+                "description": f"Prospective db, "
+                f"based on {s['model'].upper()}, "
+                f"pathway {s['pathway']}.",
+            }
+            for s in self.scenarios
+        ]
+        package.descriptor["keywords"] = [
+            "ecoinvent",
+            "scenario",
+            "data package",
+            "premise",
+            "pathways",
+        ]
+        package.descriptor["licenses"] = [
+            {
+                "id": "CC0-1.0",
+                "title": "CC0 1.0",
+                "url": "https://creativecommons.org/publicdomain/zero/1.0/",
+            }
+        ]
+        package.commit()
+
+        # save the datapackage
+        package.save(f"{name}.zip")
+
+        print(f"Data package saved at {Path.cwd() / f'{name}.zip'}")
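
Note on the matrices-export fix in write_db_to_matrices(): the old code unpacked pool.starmap()'s result list as if it were a single (scenarios, cache) pair, and then handed a second starmap the bound method Export().export_db_to_matrices of an empty Export instance. The patch collects the starmap results per scenario instead, builds one fully-initialised Export object per scenario, and maps the new module-level helper _export_to_matrices over them with pool.map. Module-level functions are pickled by reference, so they are always safe to ship to worker processes. Below is a minimal, self-contained sketch of that pattern; the Export class here is a stand-in for illustration, not premise's real exporter.

import multiprocessing
from multiprocessing import Pool as ProcessPool


class Export:
    # Stand-in for premise's Export: plain attributes, so instances pickle cleanly.
    def __init__(self, scenario, filepath, version):
        self.scenario = scenario
        self.filepath = filepath
        self.version = version

    def export_db_to_matrices(self):
        print(f"exporting {self.scenario} to {self.filepath} (premise {self.version})")


def _export_to_matrices(obj):
    # Module-level worker: picklable by reference, unlike a lambda or a bound
    # method of a locally created instance; each Export travels as the payload.
    obj.export_db_to_matrices()


if __name__ == "__main__":
    exports = [Export(f"scenario-{i}", f"export/{i}", "3.9") for i in range(4)]
    with ProcessPool(processes=multiprocessing.cpu_count()) as pool:
        pool.map(_export_to_matrices, exports)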
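
For context, a hypothetical usage sketch of the new PathwaysDataPackage class added in premise/pathways.py; the scenario dict, source database name, and decryption key below are placeholders following NewDatabase's usual conventions, not values taken from this patch.

from premise import PathwaysDataPackage

pdp = PathwaysDataPackage(
    scenarios=[
        # placeholder scenario; keys follow the NewDatabase convention
        {"model": "remind", "pathway": "SSP2-Base", "year": 2050},
    ],
    source_db="ecoinvent 3.9.1 cutoff",  # placeholder database name
    source_version="3.9",
    key=b"...",  # placeholder decryption key
)

# Runs update_all() on the underlying NewDatabase, writes the matrices to the
# current directory, and zips them into a datapackage named pathways_<date>.zip.
pdp.create_datapackage()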