diff --git a/examples/0xx_parallel_computation.py b/examples/0xx_parallel_computation.py deleted file mode 100644 index 98a74e3f4..000000000 --- a/examples/0xx_parallel_computation.py +++ /dev/null @@ -1,117 +0,0 @@ -"""Example: Compare parallel interfaces -""" - -from time import perf_counter as timerpc - -import numpy as np - -from floris import ( - FlorisModel, - TimeSeries, - WindRose, -) -from floris.parallel_floris_model import ParallelFlorisModel as ParallelFlorisModel_orig -from floris.parallel_floris_model_2 import ParallelFlorisModel as ParallelFlorisModel_new - - -if __name__ == "__main__": - # Parallelization parameters - parallel_interface = "multiprocessing" - max_workers = 16 - - # Load the wind rose from csv - wind_rose = WindRose.read_csv_long( - "inputs/wind_rose.csv", wd_col="wd", ws_col="ws", freq_col="freq_val", - ti_col_or_value=0.06 - ) - fmodel = FlorisModel("inputs/gch.yaml") - - # Specify wind farm layout and update in the floris object - N = 12 # number of turbines per row and per column - X, Y = np.meshgrid( - 5.0 * fmodel.core.farm.rotor_diameters_sorted[0][0] * np.arange(0, N, 1), - 5.0 * fmodel.core.farm.rotor_diameters_sorted[0][0] * np.arange(0, N, 1), - ) - fmodel.set(layout_x=X.flatten(), layout_y=Y.flatten()) - - # Set up original parallel Floris model - pfmodel_orig = ParallelFlorisModel_orig( - fmodel=fmodel, - max_workers=max_workers, - n_wind_condition_splits=max_workers, - interface=parallel_interface, - print_timings=True - ) - - - # Set up new parallel Floris model - pfmodel_new = ParallelFlorisModel_new( - "inputs/gch.yaml", - max_workers=max_workers, - n_wind_condition_splits=max_workers, - interface=parallel_interface, - print_timings=True, - ) - - # Set up new parallel Floris model using only powers - pfmodel_new_p = ParallelFlorisModel_new( - "inputs/gch.yaml", - max_workers=max_workers, - n_wind_condition_splits=max_workers, - interface=parallel_interface, - return_turbine_powers_only=True, - print_timings=True, - ) - - # Set layout, wind data on all models - fmodel.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=wind_rose) - pfmodel_orig.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=wind_rose) - pfmodel_new.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=wind_rose) - pfmodel_new_p.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=wind_rose) - - # Run and evaluate farm over the wind rose - t0 = timerpc() - fmodel.run() - aep_fmodel = fmodel.get_farm_AEP() - t_fmodel = timerpc() - t0 - - t0 = timerpc() - #pfmodel_orig.run() - aep_pfmodel_orig = pfmodel_orig.get_farm_AEP(freq=wind_rose.unpack_freq()) - t_pfmodel_orig = timerpc() - t0 - - t0 = timerpc() - pfmodel_new.run() - aep_pfmodel_new = pfmodel_new.get_farm_AEP() - t_pfmodel_new = timerpc() - t0 - - t0 = timerpc() - pfmodel_new.run() - aep_pfmodel_new = pfmodel_new.get_farm_AEP() - t_pfmodel_new2 = timerpc() - t0 - - t0 = timerpc() - pfmodel_new_p.run() - aep_pfmodel_new_p = pfmodel_new_p.get_farm_AEP() - t_pfmodel_new_p = timerpc() - t0 - - print("FlorisModel AEP calculation took {:.2f} seconds.".format(t_fmodel)) - print("Original ParallelFlorisModel AEP calculation took {:.2f} seconds.".format( - t_pfmodel_orig - ) - ) - print("New ParallelFlorisModel AEP calculation took {:.2f} seconds.".format(t_pfmodel_new)) - print("New ParallelFlorisModel AEP calculation took {:.2f} seconds the second time.".format( - t_pfmodel_new2 - ) - ) - print("New ParallelFlorisModel (powers only) AEP calculation took {:.2f} seconds.".format( - t_pfmodel_new_p - ) - ) - - print("\n") - print("FlorisModel AEP: {:.2f} GWh.".format(aep_fmodel/1E9)) - print("Original ParallelFlorisModel AEP: {:.2f} GWh.".format(aep_pfmodel_orig/1E9)) - print("New ParallelFlorisModel AEP: {:.2f} GWh.".format(aep_pfmodel_new/1E9)) - print("New ParallelFlorisModel (powers only) AEP: {:.2f} GWh.".format(aep_pfmodel_new/1E9)) diff --git a/examples/examples_parallel/000_parallel_timing.py b/examples/examples_parallel/000_parallel_timing.py new file mode 100644 index 000000000..44e114991 --- /dev/null +++ b/examples/examples_parallel/000_parallel_timing.py @@ -0,0 +1,161 @@ +"""Example: Timing tests for parallel computation interfaces. + +Tests: +- max_workers specified, small. +- max_workers specified, large. +- max_workers unspecified. + +- various n_findex +- various n_turbines + +- return_turbine_powers_only=True +- return_turbine_powers_only=False +""" + +from time import perf_counter as timerpc + +import numpy as np +import pandas as pd + +from floris import ( + FlorisModel, + TimeSeries, +) +from floris.parallel_floris_model import ParallelFlorisModel as ParallelFlorisModel_orig +from floris.parallel_floris_model_2 import ParallelFlorisModel as ParallelFlorisModel_new + + +DEBUG = True + +if __name__ == "__main__": + max_workers_options = [2, 16, -1] + n_findex_options = [100, 1000, 10000] + n_turbines_options = [5, 10, 15] # Will be squared! + # Parallelization parameters + + def set_up_and_run_models(n_turbs, n_findex, max_workers): + # Create random wind data + np.random.seed(0) + wind_speeds = np.random.normal(loc=8.0, scale=2.0, size=n_findex) + wind_directions = np.random.normal(loc=270.0, scale=15.0, size=n_findex) + turbulence_intensities = 0.06*np.ones_like(wind_speeds) + + time_series = TimeSeries( + wind_directions=wind_directions, + wind_speeds=wind_speeds, + turbulence_intensities=turbulence_intensities, + ) + + # Clip wind_rose to specified n_findex + + fmodel = FlorisModel("../inputs/gch.yaml") + + # Specify wind farm layout and update in the floris object + N = n_turbs + + X, Y = np.meshgrid( + 5.0 * fmodel.core.farm.rotor_diameters_sorted[0][0] * np.arange(0, N, 1), + 5.0 * fmodel.core.farm.rotor_diameters_sorted[0][0] * np.arange(0, N, 1), + ) + fmodel.set(layout_x=X.flatten(), layout_y=Y.flatten()) + + # Set up original parallel Floris model + parallel_interface = "multiprocessing" + pfmodel_orig = ParallelFlorisModel_orig( + fmodel=fmodel, + max_workers=100 if max_workers < 0 else max_workers, + n_wind_condition_splits=100 if max_workers < 0 else max_workers, + interface=parallel_interface, + print_timings=True + ) + + # Set up new parallel Floris model + pfmodel_new = ParallelFlorisModel_new( + "../inputs/gch.yaml", + max_workers=max_workers, + n_wind_condition_splits=max_workers, + interface="pathos", + print_timings=True, + ) + + # Set up new parallel Floris model using only powers + pfmodel_new_p = ParallelFlorisModel_new( + "../inputs/gch.yaml", + max_workers=max_workers, + n_wind_condition_splits=max_workers, + interface=parallel_interface, + return_turbine_powers_only=True, + print_timings=True, + ) + + # Set layout, wind data on all models + fmodel.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=time_series) + pfmodel_orig.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=time_series) + pfmodel_new.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=time_series) + pfmodel_new_p.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=time_series) + + # Limit to a subset of the wind rose, maybe. + + + # Run and evaluate farm over the wind rose + t0 = timerpc() + fmodel.run() + aep_fmodel = fmodel.get_farm_AEP() + t_fmodel = timerpc() - t0 + + t0 = timerpc() + #pfmodel_orig.run() + aep_pfmodel_orig = pfmodel_orig.get_farm_AEP(freq=time_series.unpack_freq()) + t_pfmodel_orig = timerpc() - t0 + + t0 = timerpc() + pfmodel_new.run() + aep_pfmodel_new = pfmodel_new.get_farm_AEP() + t_pfmodel_new = timerpc() - t0 + + t0 = timerpc() + pfmodel_new_p.run() + aep_pfmodel_new_p = pfmodel_new_p.get_farm_AEP() + t_pfmodel_new_p = timerpc() - t0 + + # Save the data + df = pd.DataFrame({ + "model": ["FlorisModel", "ParallelFlorisModel_orig", "ParallelFlorisModel_new", + "ParallelFlorisModel_new_poweronly"], + "AEP": [aep_fmodel, aep_pfmodel_orig, aep_pfmodel_new, aep_pfmodel_new_p], + "time": [t_fmodel, t_pfmodel_orig, t_pfmodel_new, t_pfmodel_new_p], + }) + + df.to_csv(f"comptime_maxworkers{mw}_nturbs{n_turbs}_nfindex{n_findex}.csv") + + return None + + # First run max_workers tests + for mw in max_workers_options: + # Set up models + n_turbs = 2 if DEBUG else 10 # Will be squared + n_findex = 1000 + set_up_and_run_models( + n_turbs=n_turbs, n_findex=n_findex, max_workers=mw + ) + + # Then run n_turbines tests + for nt in n_turbines_options: + # Set up models + n_findex = 10 if DEBUG else 1000 + max_workers = 16 + + set_up_and_run_models( + n_turbs=nt, n_findex=n_findex, max_workers=max_workers + ) + + # Then run n_findex tests + for nf in n_findex_options: + # Set up models + n_turbs = 2 if DEBUG else 10 # Will be squared + max_workers = 16 + + set_up_and_run_models( + n_turbs=n_turbs, n_findex=nf, max_workers=max_workers + ) + diff --git a/examples/examples_parallel/001_parallel_timing_output.py b/examples/examples_parallel/001_parallel_timing_output.py new file mode 100644 index 000000000..e1264f199 --- /dev/null +++ b/examples/examples_parallel/001_parallel_timing_output.py @@ -0,0 +1,151 @@ +"""Example: Timing tests for parallel computation interfaces. + +Tests: +- max_workers specified, small. +- max_workers specified, large. +- max_workers unspecified. + +- various n_findex +- various n_turbines + +- return_turbine_powers_only=True +- return_turbine_powers_only=False +""" + +from time import perf_counter as timerpc + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd + + +max_workers_options = [2, 16, -1] +n_findex_options = [100, 1000, 10000] +n_turbines_options = [5, 10, 15] # Will be squared! +# Parallelization parameters + +DEBUG = True + +# First run max_workers tests +timing_data = [] +for mw in max_workers_options: + # Set up models + n_turbs = 2 if DEBUG else 10 # Will be squared + n_findex = 1000 + + df = pd.read_csv(f"outputs/comptime_maxworkers{mw}_nturbs{n_turbs}_nfindex{n_findex}.csv") + + timing_data.append(df.time.values) + +timing_data = np.array(timing_data).T + +x = np.arange(len(max_workers_options)) +width = 0.2 +multiplier = 0 + +fig, ax = plt.subplots(1,1) + +for dat, lab in zip(timing_data.tolist(), df.model.values): + offset = width * multiplier + rects = ax.bar(x + offset, dat, width, label=lab) + ax.bar_label(rects, padding=3, fmt='%.1f') + multiplier += 1 + +# Add some text for labels, title and custom x-axis tick labels, etc. +ax.set_xlabel('Max. workers [-]') +ax.set_xticks(x + width, max_workers_options) +ax.set_ylabel('Time [s]') +ax.legend(loc='upper left', ncols=2) +ax.set_yscale('log') +fig.savefig("outputs/max_workers_timing.png", format='png', dpi=300) + + +# Similar now for n_turbs +timing_data = [] +for nt in n_turbines_options: + # Set up models + n_findex = 10 if DEBUG else 1000 + max_workers = -1 + df = pd.read_csv(f"outputs/comptime_maxworkers{max_workers}_nturbs{nt}_nfindex{n_findex}.csv") + timing_data.append(df.time.values) + +timing_data = np.array(timing_data).T + +x = np.arange(len(n_turbines_options)) +width = 0.2 +multiplier = 0 + +fig, ax = plt.subplots(1,1) + +for dat, lab in zip(timing_data.tolist(), df.model.values): + offset = width * multiplier + rects = ax.bar(x + offset, dat, width, label=lab) + ax.bar_label(rects, padding=3, fmt='%.1f') + multiplier += 1 + +# Add some text for labels, title and custom x-axis tick labels, etc. +ax.set_xlabel('n_turbines [-]') +ax.set_xticks(x + width, np.array(n_turbines_options)**2) +ax.set_ylabel('Time [s]') +ax.legend(loc='upper left', ncols=2) +ax.set_yscale('log') +fig.savefig("outputs/n_turbines_timing.png", format='png', dpi=300) + + +# Similar now for n_findex +timing_data = [] +for nf in n_findex_options: + # Set up models + n_turbs = 2 if DEBUG else 10 # Will be squared + max_workers = -1 + df = pd.read_csv(f"outputs/comptime_maxworkers{max_workers}_nturbs{n_turbs}_nfindex{nf}.csv") + timing_data.append(df.time.values) + +timing_data = np.array(timing_data).T + +x = np.arange(len(n_findex_options)) +width = 0.2 +multiplier = 0 + +fig, ax = plt.subplots(1,1) + +for dat, lab in zip(timing_data.tolist(), df.model.values): + offset = width * multiplier + rects = ax.bar(x + offset, dat, width, label=lab) + ax.bar_label(rects, padding=3, fmt='%.1f') + multiplier += 1 + +# Add some text for labels, title and custom x-axis tick labels, etc. +ax.set_xlabel('n_findex [-]') +ax.set_xticks(x + width, n_findex_options) +ax.set_ylabel('Time [s]') +ax.legend(loc='upper left', ncols=2) +ax.set_yscale('log') +fig.savefig("outputs/n_findex_timing.png", format='png', dpi=300) + + +plt.show() + + + + + # # Then run n_turbines tests + # for nt in n_turbines_options: + # # Set up models + # n_findex = 10 if DEBUG else 1000 + # max_workers = 16 + + # set_up_and_run_models( + # n_turbs=nt, n_findex=n_findex, max_workers=max_workers + # ) + + # # Then run n_findex tests + # for nf in n_findex_options: + # # Set up models + # n_turbs = 2 if DEBUG else 10 # Will be squared + # max_workers = 16 + + # set_up_and_run_models( + # n_turbs=n_turbs, n_findex=nf, max_workers=max_workers + # ) + diff --git a/examples/examples_parallel/002_worker_pool.py b/examples/examples_parallel/002_worker_pool.py new file mode 100644 index 000000000..090300b66 --- /dev/null +++ b/examples/examples_parallel/002_worker_pool.py @@ -0,0 +1,107 @@ +"""Example: Timing tests for parallel computation interfaces. + +Tests: +- max_workers specified, small. +- max_workers specified, large. +- max_workers unspecified. + +- various n_findex +- various n_turbines + +- return_turbine_powers_only=True +- return_turbine_powers_only=False +""" + +from time import perf_counter as timerpc + +import numpy as np +import pandas as pd + +from floris import ( + FlorisModel, + TimeSeries, +) +from floris.parallel_floris_model import ParallelFlorisModel as ParallelFlorisModel_orig +from floris.parallel_floris_model_2 import ParallelFlorisModel as ParallelFlorisModel_new + + +DEBUG = True + +if __name__ == "__main__": + # Create random wind data + np.random.seed(0) + n_findex = 10 if DEBUG else 1000 + wind_speeds = np.random.normal(loc=8.0, scale=2.0, size=n_findex) + wind_directions = np.random.normal(loc=270.0, scale=15.0, size=n_findex) + turbulence_intensities = 0.06*np.ones_like(wind_speeds) + + time_series = TimeSeries( + wind_directions=wind_directions, + wind_speeds=wind_speeds, + turbulence_intensities=turbulence_intensities, + ) + + # Clip wind_rose to specified n_findex + + fmodel = FlorisModel("../inputs/gch.yaml") + + # Specify wind farm layout and update in the floris object + N = 20 if DEBUG else 100 + + X, Y = np.meshgrid( + 5.0 * fmodel.core.farm.rotor_diameters_sorted[0][0] * np.arange(0, N, 1), + 5.0 * fmodel.core.farm.rotor_diameters_sorted[0][0] * np.arange(0, N, 1), + ) + + # Set up new parallel Floris model + print("Beginning multiprocessing test") + t0 = timerpc() + pfmodel_mp = ParallelFlorisModel_new( + "../inputs/gch.yaml", + max_workers=-1, + n_wind_condition_splits=-1, + interface="multiprocessing", + print_timings=True, + ) + pfmodel_mp.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=time_series) + t1 = timerpc() + pfmodel_mp.run() + aep1 = pfmodel_mp.get_farm_AEP() + t2 = timerpc() + pfmodel_mp.set(layout_x=X.flatten()+10, layout_y=Y.flatten()) + pfmodel_mp.run() + aep2 = pfmodel_mp.get_farm_AEP() + t3 = timerpc() + + print(f"Multiprocessing (max_workers={pfmodel_mp.max_workers})") + print(f"Time to set up: {t1-t0}") + print(f"Time to run first: {t2-t1}") + print(f"Time to run second: {t3-t2}") + + # When is the worker pool released, though?? + print("Beginning pathos test") + t0 = timerpc() + pfmodel_pathos = ParallelFlorisModel_new( + "../inputs/gch.yaml", + max_workers=-1, + n_wind_condition_splits=-1, + interface="pathos", + print_timings=True, + ) + pfmodel_pathos.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=time_series) + t1 = timerpc() + pfmodel_pathos.run() + aep3 = pfmodel_pathos.get_farm_AEP() + t2 = timerpc() + pfmodel_pathos.set(layout_x=X.flatten()+10, layout_y=Y.flatten()) + pfmodel_pathos.run() + aep4 = pfmodel_pathos.get_farm_AEP() + t3 = timerpc() + + print(f"Pathos (max_workers={pfmodel_pathos.max_workers})") + print(f"Time to set up: {t1-t0}") + print(f"Time to run first: {t2-t1}") + print(f"Time to run second: {t3-t2}") + + if np.isclose(aep1 + aep2 + aep3 + aep4, 4*aep4): + print("AEPs are equal!") diff --git a/floris/parallel_floris_model_2.py b/floris/parallel_floris_model_2.py index a20e1ff8f..094af0592 100644 --- a/floris/parallel_floris_model_2.py +++ b/floris/parallel_floris_model_2.py @@ -1,16 +1,13 @@ from __future__ import annotations import copy -import warnings from pathlib import Path from time import perf_counter as timerpc import numpy as np -import pandas as pd from floris.core import State from floris.floris_model import FlorisModel -from floris.optimization.yaw_optimization.yaw_optimizer_sr import YawOptimizationSR class ParallelFlorisModel(FlorisModel): @@ -40,7 +37,7 @@ def __init__( - **wake**: See `floris.simulation.wake.WakeManager` for more details. - **logging**: See `floris.simulation.core.Core` for more details. interface: The parallelization interface to use. Options are "multiprocessing", - with possible future support for "mpi4py" and "concurrent" + "pathos", and "concurrent", with possible future support for "mpi4py" max_workers: The maximum number of workers to use. Defaults to -1, which then takes the number of CPUs available. n_wind_condition_splits: The number of wind conditions to split the simulation over. @@ -65,7 +62,18 @@ def __init__( if max_workers == -1: max_workers = mp.cpu_count() # TODO: test spinning up the worker pool at this point - elif interface in ["mpi4py", "concurrent"]: + elif interface == "pathos": + import pathos + if max_workers == -1: + max_workers = pathos.helpers.cpu_count() + self.pathos_pool = pathos.pools.ProcessPool(nodes=max_workers) + elif interface == "concurrent": + from concurrent.futures import ProcessPoolExecutor + if max_workers == -1: + from multiprocessing import cpu_count + max_workers = cpu_count() + self._PoolExecutor = ProcessPoolExecutor + elif interface in ["mpi4py"]: raise NotImplementedError( f"Parallelization interface {interface} not yet supported." ) @@ -80,10 +88,10 @@ def __init__( else: raise ValueError( f"Invalid parallelization interface {interface}. " - "Options are 'multiprocessing', 'mpi4py' or 'concurrent'." + "Options are 'multiprocessing', 'pathos', or 'concurrent'." ) - self.interface = interface + self._interface = interface self.max_workers = max_workers if n_wind_condition_splits == -1: self.n_wind_condition_splits = max_workers @@ -128,6 +136,50 @@ def run(self) -> None: self.core.farm.finalize(self.core.grid.unsorted_indices) self.core.state = State.USED t3 = timerpc() + elif self.interface == "pathos": + t0 = timerpc() + self.core.initialize_domain() + parallel_run_inputs = self._preprocessing() + t1 = timerpc() + if self.return_turbine_powers_only: + self._turbine_powers_split = self.pathos_pool.map( + _parallel_run_powers_only_map, + parallel_run_inputs + ) + else: + self._fmodels_split = self.pathos_pool.map( + _parallel_run_map, + parallel_run_inputs + ) + t2 = timerpc() + self._postprocessing() + self.core.farm.finalize(self.core.grid.unsorted_indices) + self.core.state = State.USED + t3 = timerpc() + elif self.interface == "concurrent": + t0 = timerpc() + self.core.initialize_domain() + parallel_run_inputs = self._preprocessing() + t1 = timerpc() + if self.return_turbine_powers_only: + with self._PoolExecutor(self.max_workers) as p: + self._turbine_powers_split = p.map( + _parallel_run_powers_only_map, + parallel_run_inputs + ) + self._turbine_powers_split = list(self._turbine_powers_split) + else: + with self._PoolExecutor(self.max_workers) as p: + self._fmodels_split = p.map( + _parallel_run_map, + parallel_run_inputs + ) + self._fmodels_split = list(self._fmodels_split) + t2 = timerpc() + self._postprocessing() + self.core.farm.finalize(self.core.grid.unsorted_indices) + self.core.state = State.USED + t3 = timerpc() if self.print_timings: print("===============================================================================") if self.interface is None: @@ -249,6 +301,21 @@ def fmodel(self): "attributes and methods of FlorisModel directly." ) + @property + def interface(self): + """ + The parallelization interface used. + """ + return self._interface + + @interface.setter + def interface(self, value): + """ + Raise error regarding setting the interface. + """ + raise AttributeError( + "The parallelization interface cannot be changed after instantiation." + ) def _parallel_run(fmodel_dict, set_kwargs) -> FlorisModel: """ @@ -275,3 +342,15 @@ def _parallel_run_powers_only(fmodel_dict, set_kwargs) -> np.ndarray: fmodel.set(**set_kwargs) fmodel.run() return fmodel.get_turbine_powers() + +def _parallel_run_map(x): + """ + Wrapper for unpacking inputs to _parallel_run() for use with map(). + """ + return _parallel_run(*x) + +def _parallel_run_powers_only_map(x): + """ + Wrapper for unpacking inputs to _parallel_run_powers_only() for use with map(). + """ + return _parallel_run_powers_only(*x) diff --git a/setup.py b/setup.py index b3532ffd5..7e3a11be4 100644 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ # utilities "coloredlogs~=15.0", + "pathos~=0.3", ] # What packages are optional? diff --git a/tests/parallel_floris_model_2_unit_test.py b/tests/parallel_floris_model_2_unit_test.py index 9b35b2cb7..1a2e254d2 100644 --- a/tests/parallel_floris_model_2_unit_test.py +++ b/tests/parallel_floris_model_2_unit_test.py @@ -63,6 +63,78 @@ def test_multiprocessing_interface(sample_inputs_fixture): assert np.allclose(f_turb_powers, pf_turb_powers) +def test_pathos_interface(sample_inputs_fixture): + """ + With interface="pathos", the ParallelFlorisModel should return the same powers + as the FlorisModel. + """ + sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL + sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL + + fmodel = FlorisModel(sample_inputs_fixture.core) + pfmodel = ParallelFlorisModel( + sample_inputs_fixture.core, + interface="pathos", + n_wind_condition_splits=2 + ) + + fmodel.run() + pfmodel.run() + + f_turb_powers = fmodel.get_turbine_powers() + pf_turb_powers = pfmodel.get_turbine_powers() + + assert np.allclose(f_turb_powers, pf_turb_powers) + + # Run in powers_only mode + pfmodel = ParallelFlorisModel( + sample_inputs_fixture.core, + interface="pathos", + n_wind_condition_splits=2, + return_turbine_powers_only=True + ) + + pfmodel.run() + pf_turb_powers = pfmodel.get_turbine_powers() + + assert np.allclose(f_turb_powers, pf_turb_powers) + +def test_concurrent_interface(sample_inputs_fixture): + """ + With interface="concurrent", the ParallelFlorisModel should return the same powers + as the FlorisModel. + """ + sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL + sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL + + fmodel = FlorisModel(sample_inputs_fixture.core) + pfmodel = ParallelFlorisModel( + sample_inputs_fixture.core, + interface="concurrent", + n_wind_condition_splits=2, + ) + + fmodel.run() + pfmodel.run() + + f_turb_powers = fmodel.get_turbine_powers() + pf_turb_powers = pfmodel.get_turbine_powers() + + assert np.allclose(f_turb_powers, pf_turb_powers) + + # Run in powers_only mode + pfmodel = ParallelFlorisModel( + sample_inputs_fixture.core, + interface="concurrent", + n_wind_condition_splits=2, + return_turbine_powers_only=True + ) + + pfmodel.run() + pf_turb_powers = pfmodel.get_turbine_powers() + + assert np.allclose(f_turb_powers, pf_turb_powers) + def test_return_turbine_powers_only(sample_inputs_fixture): """ With return_turbine_powers_only=True, the ParallelFlorisModel should return only the