Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Improve parallelisation
Browse files Browse the repository at this point in the history
robbibt committed Oct 31, 2024
1 parent 7cfeac2 commit c90d940
Showing 3 changed files with 7,675 additions and 275 deletions.
348 changes: 202 additions & 146 deletions eo_tides/model.py
Original file line number Diff line number Diff line change
@@ -192,139 +192,6 @@ def list_models(
return available_models, supported_models


def _model_tides(
    model,
    x,
    y,
    time,
    directory,
    crs,
    crop,
    method,
    extrapolate,
    cutoff,
    output_units,
    mode,
):
    """Worker function applied in parallel by `model_tides`. Handles the
    extraction of tide modelling constituents and tide modelling using
    `pyTMD` for a single tide model.

    Parameters
    ----------
    model :
        Name of the tide model. Passed to
        `pyTMD.io.model(directory).elevation()` and written to the
        `tide_model` column of the output.
    x, y :
        Coordinates expressed in `crs`. `.flatten()` is called on both,
        so these are presumably NumPy arrays (TODO confirm against caller).
    time :
        Times to model. Flattened and converted with
        `pyTMD.time.timescale().from_datetime()`.
    directory :
        Tide model directory handed to `pyTMD.io.model`.
    crs :
        Coordinate reference system of `x` and `y`; coordinates are
        reprojected from it to "EPSG:4326" before extraction.
    crop, method, extrapolate, cutoff :
        Forwarded unchanged to the model's `extract_constants` call.
    output_units :
        "m" casts `tide_height` to float32; "cm" and "mm" multiply by
        100 and 1000 respectively and cast to int16. Any other value
        leaves the column as computed.
    mode :
        "one-to-many" models every time at every point. Any other value
        uses repeat counts of 1, i.e. one time per point ("one-to-one").

    Returns
    -------
    pandas.DataFrame
        Indexed by ("time", "x", "y") with `tide_model` and
        `tide_height` columns.

    Raises
    ------
    Exception
        If `extract_constants` raises `IndexError`; the original
        traceback is suppressed (`from None`).
    """
    # Obtain model details
    pytmd_model = pyTMD.io.model(directory).elevation(model)

    # Reproject x, y to latitude/longitude
    transformer = pyproj.Transformer.from_crs(crs, "EPSG:4326", always_xy=True)
    lon, lat = transformer.transform(x.flatten(), y.flatten())

    # Convert datetime
    timescale = pyTMD.time.timescale().from_datetime(time.flatten())

    try:
        # Read tidal constants and interpolate to grid points
        amp, ph, c = pytmd_model.extract_constants(
            lon,
            lat,
            type=pytmd_model.type,
            crop=crop,
            bounds=None,
            method=method,
            extrapolate=extrapolate,
            cutoff=cutoff,
            append_node=False,
            # append_node=True,
        )

        # TODO: Return constituents
        # print(amp.shape, ph.shape, c)
        # print(pd.DataFrame({"amplitude": amp}))

    # Raise error if constituent files do not cover analysis extent
    except IndexError as e:
        # The message is dedented and stripped below, so its leading
        # whitespace does not affect the text that is raised
        error_msg = f"""
The {model} tide model constituent files do not cover the requested analysis extent.
This can occur if you are using clipped model files to improve run times.
Consider using model files that cover your entire analysis area, or set `crop=False`
to reduce the extent of tide model constituent files that is loaded.
"""
        raise Exception(textwrap.dedent(error_msg).strip()) from None

    # Calculate complex phase in radians for Euler's
    # (phase is scaled by pi / 180, i.e. treated as degrees)
    cph = -1j * ph * np.pi / 180.0

    # Calculate constituent oscillation
    hc = amp * np.exp(cph)

    # Compute deltat based on model
    if pytmd_model.corrections in ("OTIS", "ATLAS", "TMD3", "netcdf"):
        # Use delta time at 2000.0 to match TMD outputs
        deltat = np.zeros_like(timescale.tt_ut1)
    else:
        # Use interpolated delta times
        deltat = timescale.tt_ut1

    # Determine the number of points and times to process. If in
    # "one-to-many" mode, these counts are used to repeat our extracted
    # constituents and timesteps so we can extract tides for all
    # combinations of our input times and tide modelling points.
    # If in "one-to-many" mode, repeat constituents to length of time
    # and number of input coords before passing to `predict_tide_drift`
    # If in "one-to-one" mode, we avoid this step by setting counts to 1
    # (e.g. "repeat 1 times")
    points_repeat = len(x) if mode == "one-to-many" else 1
    time_repeat = len(time) if mode == "one-to-many" else 1
    # Times are tiled (whole sequence repeated per point) while the
    # constituents are repeated element-wise along axis 0, so both end up
    # in the same point-major order used for the output index below
    t, hc, deltat = (
        np.tile(timescale.tide, points_repeat),
        hc.repeat(time_repeat, axis=0),
        np.tile(deltat, points_repeat),
    )

    # Create arrays to hold outputs; an entry is masked when any value
    # along axis 1 of `hc` is masked
    tide = np.ma.zeros((len(t)), fill_value=np.nan)
    tide.mask = np.any(hc.mask, axis=1)

    # Predict tidal elevations at time and infer minor corrections
    tide.data[:] = pyTMD.predict.drift(
        t,
        hc,
        c,
        deltat=deltat,
        corrections=pytmd_model.corrections,
    )
    minor = pyTMD.predict.infer_minor(
        t,
        hc,
        c,
        deltat=deltat,
        corrections=pytmd_model.corrections,
        minor=pytmd_model.minor,
    )
    tide.data[:] += minor.data[:]

    # Replace invalid values with fill value
    tide.data[tide.mask] = tide.fill_value

    # Convert data to pandas.DataFrame, and set index to our input
    # time/x/y values
    tide_df = pd.DataFrame({
        "time": np.tile(time, points_repeat),
        "x": np.repeat(x, time_repeat),
        "y": np.repeat(y, time_repeat),
        "tide_model": model,
        "tide_height": tide,
    }).set_index(["time", "x", "y"])

    # Optionally convert outputs to integer units (can save memory)
    if output_units == "m":
        tide_df["tide_height"] = tide_df.tide_height.astype(np.float32)
    elif output_units == "cm":
        tide_df["tide_height"] = (tide_df.tide_height * 100).astype(np.int16)
    elif output_units == "mm":
        tide_df["tide_height"] = (tide_df.tide_height * 1000).astype(np.int16)

    return tide_df


def _ensemble_model(
tide_df,
crs,
@@ -490,6 +357,179 @@ def _ensemble_model(
return pd.concat(ensemble_list)


def _parallel_splits(
total_points,
model_count,
parallel_max=None,
min_points_per_split=1000,
):
"""
Calculates the optimal number of parallel splits for data
processing based on system resources and processing constraints.
Parameters:
-----------
total_points : int
Total number of data points to process
model_count : int
Number of models that will be run in parallel
parallel_max : int, optional
Maximum number of parallel processes to use. If None, uses CPU core count
min_points_per_split : int, default=1000
Minimum number of points that should be processed in each split
"""
# Available CPUs
if parallel_max is None:
try:
import psutil

parallel_max = psutil.cpu_count(logical=False)
except ImportError:
parallel_max = os.cpu_count()

# Calculate optimal number of splits based on constraints
splits_by_size = total_points / min_points_per_split
splits_by_cpu = parallel_max / model_count
optimal_splits = min(splits_by_size, splits_by_cpu)

# Convert to integer and ensure at least 1 split
final_split_count = int(max(1, optimal_splits))
return final_split_count


def _model_tides(
    model,
    x,
    y,
    time,
    directory,
    crs,
    crop,
    method,
    extrapolate,
    cutoff,
    output_units,
    mode,
):
    """Model tides for a single tide model using `pyTMD`.

    This is the worker that `model_tides` applies in parallel. It
    extracts the model's tidal constituents at the input coordinates,
    predicts tide heights (including inferred minor constituents) for the
    input times, and returns them as a `pandas.DataFrame` indexed by
    ("time", "x", "y") with `tide_model` and `tide_height` columns.

    In "one-to-many" mode every time is modelled at every point; in any
    other mode the inputs are used as-is (one time per point).
    `output_units` of "m" casts heights to float32, while "cm" and "mm"
    scale by 100 and 1000 and cast to int16.

    Raises a generic `Exception` if the constituent files do not cover
    the requested extent (signalled by an `IndexError` during
    extraction).
    """
    # Look up the elevation model definition
    pytmd_model = pyTMD.io.model(directory).elevation(model)

    # Input coordinates -> longitude/latitude
    to_lonlat = pyproj.Transformer.from_crs(crs, "EPSG:4326", always_xy=True)
    lon, lat = to_lonlat.transform(x.flatten(), y.flatten())

    # Input datetimes -> pyTMD timescale
    timescale = pyTMD.time.timescale().from_datetime(time.flatten())

    # Extract and interpolate tidal constants at each point
    try:
        amp, ph, c = pytmd_model.extract_constants(
            lon,
            lat,
            type=pytmd_model.type,
            crop=crop,
            bounds=None,
            method=method,
            extrapolate=extrapolate,
            cutoff=cutoff,
            append_node=False,
        )
    except IndexError:
        # Constituent files do not cover the analysis extent. The text is
        # dedented and stripped before raising.
        error_msg = f"""
The {model} tide model constituent files do not cover the requested analysis extent.
This can occur if you are using clipped model files to improve run times.
Consider using model files that cover your entire analysis area, or set `crop=False`
to reduce the extent of tide model constituent files that is loaded.
"""
        raise Exception(textwrap.dedent(error_msg).strip()) from None

    # Constituent oscillation via Euler's formula, with the phase scaled
    # by pi / 180 to convert it to radians
    hc = amp * np.exp(-1j * ph * np.pi / 180.0)

    # These correction types use a zero delta time (value at 2000.0) to
    # match TMD outputs; all others use the interpolated delta times
    fixed_deltat = pytmd_model.corrections in ("OTIS", "ATLAS", "TMD3", "netcdf")
    deltat = np.zeros_like(timescale.tt_ut1) if fixed_deltat else timescale.tt_ut1

    # In "one-to-many" mode, expand times and constituents so that every
    # time/point combination is modelled; otherwise repeat once (no-op)
    if mode == "one-to-many":
        points_repeat, time_repeat = len(x), len(time)
    else:
        points_repeat = time_repeat = 1

    t = np.tile(timescale.tide, points_repeat)
    hc = hc.repeat(time_repeat, axis=0)
    deltat = np.tile(deltat, points_repeat)

    # Masked output array; masked wherever any constituent is masked
    tide = np.ma.zeros((len(t)), fill_value=np.nan)
    tide.mask = np.any(hc.mask, axis=1)

    # Major constituents first, then add the inferred minor constituents
    predict_kwargs = {"deltat": deltat, "corrections": pytmd_model.corrections}
    tide.data[:] = pyTMD.predict.drift(t, hc, c, **predict_kwargs)
    minor = pyTMD.predict.infer_minor(
        t,
        hc,
        c,
        minor=pytmd_model.minor,
        **predict_kwargs,
    )
    tide.data[:] += minor.data[:]

    # Write the fill value into masked entries
    tide.data[tide.mask] = tide.fill_value

    # Assemble output, indexed by the (expanded) input time/x/y values
    columns = {
        "time": np.tile(time, points_repeat),
        "x": np.repeat(x, time_repeat),
        "y": np.repeat(y, time_repeat),
        "tide_model": model,
        "tide_height": tide,
    }
    tide_df = pd.DataFrame(columns).set_index(["time", "x", "y"])

    # Optional unit conversion (integer units can save memory)
    heights = tide_df["tide_height"]
    if output_units == "m":
        tide_df["tide_height"] = heights.astype(np.float32)
    elif output_units in ("cm", "mm"):
        factor = 100 if output_units == "cm" else 1000
        tide_df["tide_height"] = (heights * factor).astype(np.int16)

    return tide_df


def model_tides(
x: float | list[float] | xr.DataArray,
y: float | list[float] | xr.DataArray,
@@ -498,12 +538,13 @@ def model_tides(
directory: str | os.PathLike | None = None,
crs: str = "EPSG:4326",
crop: bool = True,
method: str = "spline",
method: str = "linear",
extrapolate: bool = True,
cutoff: float | None = None,
mode: str = "one-to-many",
parallel: bool = True,
parallel_splits: int = 5,
parallel_splits: int | str = "auto",
parallel_max: int | None = None,
output_units: str = "m",
output_format: str = "long",
ensemble_models: list[str] | None = None,
@@ -564,11 +605,11 @@ def model_tides(
1 degree buffer around all input points. Defaults to True.
method : str, optional
Method used to interpolate tidal constituents
from model files. Options include:
from model files. Defaults to "linear"; options include:
- "spline": scipy bivariate spline interpolation (default)
- "bilinear": quick bilinear interpolation
- "linear", "nearest": scipy regular grid interpolations
- "spline": scipy bivariate spline interpolation
- "bilinear": quick bilinear interpolation
extrapolate : bool, optional
Whether to extrapolate tides for x and y coordinates outside of
the valid tide modelling domain using nearest-neighbor.
@@ -594,12 +635,16 @@ def model_tides(
parallel. Optionally, tide modelling can also be run in parallel
across input x and y coordinates (see "parallel_splits" below).
Default is True.
parallel_splits : int, optional
parallel_splits : str or int, optional
Whether to split the input x and y coordinates into smaller,
evenly-sized chunks that are processed in parallel. This can
provide a large performance boost when processing large numbers
of coordinates. The default is 5 chunks, which will split
coordinates into 5 parallelised chunks.
of coordinates. The default is "auto", which will automatically
attempt to determine optimal splits based on available CPUs,
the number of input points, and the number of models.
parallel_max : int, optional
Maximum number of processes to run in parallel. The default of
None will automatically determine this from your available CPUs.
output_units : str, optional
Whether to return modelled tides in floating point metre units,
or integer centimetre units (i.e. scaled by 100) or integer
@@ -729,13 +774,24 @@ def model_tides(
mode=mode,
)

# Ensure requested parallel splits is not smaller than number of points
parallel_splits = min(parallel_splits, len(x))
# If automatic parallel splits, calculate optimal value
# based on available parallelisation, number of points
# and number of models
if parallel_splits == "auto":
parallel_splits = _parallel_splits(
total_points=len(x),
model_count=len(models_to_process),
parallel_max=parallel_max,
)
elif parallel_splits > len(x):
raise ValueError(f"Parallel splits ({parallel_splits}) cannot be larger than the number of points ({len(x)}).")

# Parallelise if either multiple models or multiple splits requested
if parallel & ((len(models_to_process) > 1) | (parallel_splits > 1)):
with ProcessPoolExecutor() as executor:
print(f"Modelling tides using {', '.join(models_to_process)} in parallel")
with ProcessPoolExecutor(max_workers=parallel_max) as executor:
print(
f"Modelling tides with {', '.join(models_to_process)} in parallel (models: {len(models_to_process)}, splits: {parallel_splits})"
)

# Optionally split lon/lat points into `splits_n` chunks
# that will be applied in parallel
@@ -783,7 +839,7 @@ def model_tides(
model_outputs = []

for model_i in models_to_process:
print(f"Modelling tides using {model_i}")
print(f"Modelling tides with {model_i}")
tide_df = iter_func(model_i, x, y, time)
model_outputs.append(tide_df)

182 changes: 107 additions & 75 deletions tests/test_model.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
import pytest
from pyTMD.compute import tide_elevations

from eo_tides.model import _set_directory, _standardise_time, list_models, model_phases, model_tides
from eo_tides.model import _parallel_splits, _set_directory, _standardise_time, list_models, model_phases, model_tides
from eo_tides.validation import eval_metrics

GAUGE_X = 122.2183
@@ -62,86 +62,36 @@ def test_standardise_time(input_value, expected_output):
assert np.array_equal(result, expected_output)


@pytest.mark.parametrize("time_offset", ["15 min", "20 min"])
def test_model_phases(time_offset):
    """Check the tide phase labels returned by `model_phases`.

    Models hourly phases with EOT20 at a single point across one day and
    compares the `tide_phase` column against a fixed sequence. The same
    sequence is expected for both parametrised `time_offset` values.
    """
    # 25 hourly timesteps: 2020-01-01 00:00 to 2020-01-02 00:00 inclusive
    phase_df = model_phases(
        x=[122.14],
        y=[-17.91],
        time=pd.date_range("2020-01-01", "2020-01-02", freq="h"),
        model=["EOT20"],
        time_offset=time_offset,
    )

    # One expected label per hourly timestep
    assert phase_df.tide_phase.tolist() == [
        "low-flow",
        "low-flow",
        "low-flow",
        "low-flow",
        "high-flow",
        "high-flow",
        "high-flow",
        "high-ebb",
        "high-ebb",
        "high-ebb",
        "low-ebb",
        "low-ebb",
        "low-ebb",
        "low-flow",
        "low-flow",
        "high-flow",
        "high-flow",
        "high-flow",
        "high-flow",
        "high-ebb",
        "high-ebb",
        "high-ebb",
        "low-ebb",
        "low-ebb",
        "low-ebb",
    ]


@pytest.mark.parametrize(
"models,output_format,return_tides,expected_cols",
"total_points, model_count, parallel_max, expected_splits",
[
(["EOT20"], "long", False, ["tide_model", "tide_phase"]),
(["EOT20"], "long", True, ["tide_model", "tide_height", "tide_phase"]),
(["EOT20", "GOT5.5"], "long", False, ["tide_model", "tide_phase"]),
(
["EOT20", "GOT5.5"],
"long",
True,
["tide_model", "tide_height", "tide_phase"],
),
(["EOT20"], "wide", False, ["EOT20"]),
(["EOT20"], "wide", True, [("tide_height", "EOT20"), ("tide_phase", "EOT20")]),
(["EOT20", "GOT5.5"], "wide", False, ["EOT20", "GOT5.5"]),
(
["EOT20", "GOT5.5"],
"wide",
True,
[
("tide_height", "EOT20"),
("tide_height", "GOT5.5"),
("tide_phase", "EOT20"),
("tide_phase", "GOT5.5"),
],
),
# Basic cases
(10000, 2, 8, 4), # Standard case with explicit parallel_max
(5000, 1, 4, 4), # Single model case
# Minimum split size cases
(900, 1, 4, 1), # Less than min_points_per_split
(2000, 2, 2, 1), # Just enough for 1 split with 2 models
# Maximum parallelization cases
(100000, 2, 4, 2), # Limited by CPU cores / model_count
(100000, 4, 8, 2), # Testing with more models
# Edge cases
(1, 1, 1, 1), # Minimum possible values
(999999, 1, 8, 8), # Large number of points
(10000, 8, 8, 1), # Many models relative to cores
],
)
def test_model_phases_format(models, output_format, return_tides, expected_cols):
phase_df = model_phases(
x=[122.14],
y=[-17.91],
time=pd.date_range("2020", "2021", periods=2),
model=models,
output_format=output_format,
return_tides=return_tides,
def test_parallel_splits(total_points, model_count, parallel_max, expected_splits):
"""
Test the _parallel_splits function with various parameter combinations.
"""
result = _parallel_splits(
total_points=total_points,
model_count=model_count,
parallel_max=parallel_max,
)

# Assert expected indexes and columns
assert phase_df.index.names == ["time", "x", "y"]
assert phase_df.columns.tolist() == expected_cols
# Check the returned value
assert result == expected_splits


# Test available tide models
@@ -552,3 +502,85 @@ def test_model_tides_ensemble():
"ensemble-mean-weighted",
"ensemble-mean",
])


@pytest.mark.parametrize("time_offset", ["15 min", "20 min"])
def test_model_phases(time_offset):
    """Verify the hourly EOT20 tide phase sequence over a single day.

    The expected sequence is the same for both parametrised
    `time_offset` values.
    """
    hourly_times = pd.date_range("2020-01-01", "2020-01-02", freq="h")
    phase_df = model_phases(
        x=[122.14],
        y=[-17.91],
        time=hourly_times,
        model=["EOT20"],
        time_offset=time_offset,
    )

    # Expected labels written as (phase, consecutive hours) runs; these
    # expand to one label for each of the 25 hourly timesteps
    phase_runs = [
        ("low-flow", 4),
        ("high-flow", 3),
        ("high-ebb", 3),
        ("low-ebb", 3),
        ("low-flow", 2),
        ("high-flow", 4),
        ("high-ebb", 3),
        ("low-ebb", 3),
    ]
    expected_phases = [label for label, hours in phase_runs for _ in range(hours)]

    assert phase_df.tide_phase.tolist() == expected_phases


# Each case: (models, output_format, return_tides, expected_cols)
_PHASE_FORMAT_CASES = [
    # Long format: one row per model, phases (and optionally heights) as columns
    (["EOT20"], "long", False, ["tide_model", "tide_phase"]),
    (["EOT20"], "long", True, ["tide_model", "tide_height", "tide_phase"]),
    (["EOT20", "GOT5.5"], "long", False, ["tide_model", "tide_phase"]),
    (["EOT20", "GOT5.5"], "long", True, ["tide_model", "tide_height", "tide_phase"]),
    # Wide format: one column per model, or per (variable, model) pair
    # when tide heights are also returned
    (["EOT20"], "wide", False, ["EOT20"]),
    (["EOT20"], "wide", True, [("tide_height", "EOT20"), ("tide_phase", "EOT20")]),
    (["EOT20", "GOT5.5"], "wide", False, ["EOT20", "GOT5.5"]),
    (
        ["EOT20", "GOT5.5"],
        "wide",
        True,
        [
            ("tide_height", "EOT20"),
            ("tide_height", "GOT5.5"),
            ("tide_phase", "EOT20"),
            ("tide_phase", "GOT5.5"),
        ],
    ),
]


@pytest.mark.parametrize(
    "models,output_format,return_tides,expected_cols",
    _PHASE_FORMAT_CASES,
)
def test_model_phases_format(models, output_format, return_tides, expected_cols):
    """Verify the index names and columns of `model_phases` output for
    each combination of models, output format and `return_tides`.
    """
    two_times = pd.date_range("2020", "2021", periods=2)
    result = model_phases(
        x=[122.14],
        y=[-17.91],
        time=two_times,
        model=models,
        output_format=output_format,
        return_tides=return_tides,
    )

    # Assert expected indexes and columns
    assert result.index.names == ["time", "x", "y"]
    assert result.columns.tolist() == expected_cols
7,420 changes: 7,366 additions & 54 deletions tests/testing.ipynb

Large diffs are not rendered by default.

0 comments on commit c90d940

Please sign in to comment.