From 41bb114dc71c4cfc48b2c32b094c53122e9ff585 Mon Sep 17 00:00:00 2001 From: romainsacchi Date: Sun, 30 Jul 2023 15:19:49 +0200 Subject: [PATCH] Fix multiprocessing implementation for matrices export. --- premise/ecoinvent_modification.py | 55 ++++++++++++++++++++++++------- premise/export.py | 1 + 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/premise/ecoinvent_modification.py b/premise/ecoinvent_modification.py index 428abdf9..0d5b41b3 100644 --- a/premise/ecoinvent_modification.py +++ b/premise/ecoinvent_modification.py @@ -435,6 +435,9 @@ def _update_all( return scenario, modified_datasets +def _export_to_matrices(obj): + obj.export_db_to_matrices() + class NewDatabase: """ Class that represents a new wurst inventory database, modified according to IAM data. @@ -528,7 +531,7 @@ def __init__( self.database = self.__find_cached_db( source_db, keep_uncertainty_data=keep_uncertainty_data ) - # print("Done!") + print("Done!") else: self.database = self.__clean_database( keep_uncertainty_data=keep_uncertainty_data @@ -547,7 +550,7 @@ def __init__( data = self.__import_additional_inventories(self.additional_inventories) self.database.extend(data) - # print("Done!") + print("Done!") print("\n/////////////////////// EXTRACTING IAM DATA ////////////////////////") @@ -575,7 +578,7 @@ def _fetch_iam_data(scenario): with Pool(processes=multiprocessing.cpu_count()) as pool: pool.map(_fetch_iam_data, self.scenarios) - # print("Done!") + print("Done!") def __find_cached_db(self, db_name: str, keep_uncertainty_data: bool) -> List[dict]: """ @@ -807,6 +810,8 @@ def update_electricity(self) -> None: self.scenarios[s] = results[s][0] self.modified_datasets.update(results[s][1]) + print("Done!\n") + def update_dac(self) -> None: """ This method will update the Direct Air Capture (DAC) inventories @@ -833,6 +838,8 @@ def update_dac(self) -> None: self.scenarios[s] = results[s][0] self.modified_datasets.update(results[s][1]) + print("Done!\n") + def update_fuels(self) -> None: """ This method will update the fuels inventories @@ -857,6 +864,8 @@ def update_fuels(self) -> None: self.scenarios[s] = results[s][0] self.modified_datasets.update(results[s][1]) + print("Done!\n") + def update_cement(self) -> None: """ This method will update the cement inventories @@ -881,6 +890,8 @@ def update_cement(self) -> None: self.scenarios[s] = results[s][0] self.modified_datasets.update(results[s][1]) + print("Done!\n") + def update_steel(self) -> None: """ This method will update the steel inventories @@ -905,6 +916,8 @@ def update_steel(self) -> None: self.scenarios[s] = results[s][0] self.modified_datasets.update(results[s][1]) + print("Done!\n") + def update_cars(self) -> None: """ This method will update the cars inventories @@ -930,6 +943,8 @@ def update_cars(self) -> None: self.scenarios[s] = results[s][0] self.modified_datasets.update(results[s][1]) + print("Done!\n") + def update_two_wheelers(self) -> None: """ This method will update the two-wheelers inventories @@ -955,6 +970,8 @@ def update_two_wheelers(self) -> None: self.scenarios[s] = results[s][0] self.modified_datasets.update(results[s][1]) + print("Done!\n") + def update_trucks(self) -> None: """ This method will update the trucks inventories @@ -982,6 +999,8 @@ def update_trucks(self) -> None: self.scenarios[s] = results[s][0] self.modified_datasets.update(results[s][1]) + print("Done!\n") + def update_buses(self) -> None: """ This method will update the buses inventories @@ -1008,6 +1027,8 @@ def update_buses(self) -> None: self.scenarios[s] = results[s][0] self.modified_datasets.update(results[s][1]) + print("Done!\n") + def update_external_scenario(self): if self.datapackages: for i, scenario in enumerate(self.scenarios): @@ -1052,6 +1073,8 @@ def update_external_scenario(self): scenario["database"] = external_scenario.database print(f"Log file of exchanges saved under {DATA_DIR / 'logs'}.") + print("Done!\n") + def update_emissions(self) -> None: """ This method will update the hot pollutants emissions @@ -1078,6 +1101,8 @@ def update_emissions(self) -> None: self.scenarios[s] = results[s][0] self.modified_datasets.update(results[s][1]) + print("Done!\n") + def update_all(self) -> None: """ Shortcut method to execute all transformation functions. @@ -1135,7 +1160,7 @@ def write_superstructure_db_to_brightway( cache = {} # use multiprocessing to speed up the process - with Pool(processes=multiprocessing.cpu_count()) as pool: + with ProcessPool(processes=multiprocessing.cpu_count()) as pool: args = [ ( scenario, @@ -1212,7 +1237,7 @@ def write_db_to_brightway(self, name: [str, List[str]] = None): cache = {} # use multiprocessing to speed up the process - with Pool(processes=multiprocessing.cpu_count()) as pool: + with ProcessPool(processes=multiprocessing.cpu_count()) as pool: args = [ ( scenario, @@ -1278,9 +1303,11 @@ def write_db_to_matrices(self, filepath: str = None): cache = {} + + # use multiprocessing to speed up the process # use multiprocessing to speed up the process - with Pool(processes=multiprocessing.cpu_count()) as pool: + with ProcessPool(processes=multiprocessing.cpu_count()) as pool: args = [ ( scenario, @@ -1291,12 +1318,18 @@ def write_db_to_matrices(self, filepath: str = None): ) for scenario in self.scenarios ] - self.scenarios, cache = pool.starmap(_prepare_database, args) + results = pool.starmap(_prepare_database, args) + + for s, scenario in enumerate(self.scenarios): + self.scenarios[s] = results[s][0] + cache.update(results[s][1]) + + with ProcessPool(processes=multiprocessing.cpu_count()) as pool: args = [ - (scenario, filepath[scen], self.version) + Export(scenario, filepath[scen], self.version) for scen, scenario in enumerate(self.scenarios) ] - pool.starmap(Export().export_db_to_matrices, args) + pool.map(_export_to_matrices, args) # generate scenario report self.generate_scenario_report() @@ -1322,7 +1355,7 @@ def write_db_to_simapro(self, filepath: str = None): cache = {} # use multiprocessing to speed up the process - with Pool(processes=multiprocessing.cpu_count()) as pool: + with ProcessPool(processes=multiprocessing.cpu_count()) as pool: args = [ ( scenario, @@ -1359,7 +1392,7 @@ def write_datapackage(self, name: str = f"datapackage_{date.today()}"): cache = {} # use multiprocessing to speed up the process - with Pool(processes=multiprocessing.cpu_count()) as pool: + with ProcessPool(processes=multiprocessing.cpu_count()) as pool: args = [ ( scenario, diff --git a/premise/export.py b/premise/export.py index 3784e673..b11923c7 100644 --- a/premise/export.py +++ b/premise/export.py @@ -992,6 +992,7 @@ def __init__( filepath: Union[list[Path], list[Union[Path, Any]]] = None, version: str = None, ): + self.db = scenario["database"] self.model = scenario["model"] self.scenario = scenario["pathway"]