Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(commongrid): improve 'compute_MVBS' using flox [all tests ci] #1124

Merged
merged 50 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
149d7da
chore(deps): add flox dependency >=0.7.2
lsetiawan Aug 15, 2023
d911c45
fix(commongrid): fixes 'compute_MVBS' so it can work better and scale
lsetiawan Aug 15, 2023
a052854
docs: add small code comment
lsetiawan Aug 15, 2023
364ebce
refactor: change how ping_time index is retrieved
lsetiawan Aug 15, 2023
a0fb46a
refactor: remove for loop for channel
lsetiawan Aug 15, 2023
d5558e8
test(mvbs): add mock Sv datasets and tests for dims (#2)
leewujung Aug 24, 2023
a1e5ec1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 24, 2023
f30df73
fix: change dask to numpy
lsetiawan Aug 24, 2023
c75d9a2
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Aug 25, 2023
99f2b65
feat: Add method argument
lsetiawan Aug 25, 2023
a536d22
fix(commongrid): Fixed to include lat lon
lsetiawan Aug 29, 2023
3cdfebf
refactor: Set defaults to recommended
lsetiawan Aug 29, 2023
afa418d
feat(commongrid): Add 'range_var' argument to 'compute_MVBS'
lsetiawan Aug 29, 2023
7a861b4
fix: Add missing attributes for lat lon
lsetiawan Aug 29, 2023
41ad126
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Aug 29, 2023
53cebd4
test: Update test to use random generator
lsetiawan Aug 29, 2023
22f03d2
fix: Add case for no 'water_level'
lsetiawan Aug 29, 2023
9e2a0aa
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Aug 30, 2023
0a4dbf3
test(nasc): Remove 'compute_NASC' import to avoid failure
lsetiawan Aug 30, 2023
7c52b28
fix: Removed assumption on echo range max
lsetiawan Aug 31, 2023
828ab6d
test: Extract api test and add markings
lsetiawan Aug 31, 2023
60b025c
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Aug 31, 2023
db53a72
test: Add latlon test for 'compute_MVBS'
lsetiawan Aug 31, 2023
3f24590
test: Add small get_MVBS_along_channels test
lsetiawan Aug 31, 2023
5a963bd
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Aug 31, 2023
f3c3503
refactor: Integrate suggested changes
lsetiawan Sep 1, 2023
7d457d5
test: Added check for position values
lsetiawan Sep 1, 2023
6a33efa
test: Update range_meter_bin to strings
lsetiawan Sep 1, 2023
b64eca3
test: Added 'compute_MVBS' values test
lsetiawan Sep 2, 2023
f00c291
Update echopype/tests/utils/test_processinglevels_integration.py
emiliom Sep 2, 2023
456fb50
test: Add 'nan' sprinkles
lsetiawan Sep 8, 2023
8578432
revert: Revert the use of 'pint'
lsetiawan Sep 8, 2023
31032af
feat: Allow 'range_bin' to have space
lsetiawan Sep 8, 2023
d8dd4f8
fix: Apply suggestions from code review
lsetiawan Sep 8, 2023
5e5e19e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 8, 2023
343b31b
test: Fix test text for wrong unit
lsetiawan Sep 8, 2023
ab89fba
test: Remove the 'e.g.' part on pytest
lsetiawan Sep 8, 2023
09eb7b7
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Sep 9, 2023
d510d25
test: Remove remnant for test_ek.py
lsetiawan Sep 20, 2023
8aa9201
refactor: Extract range_bin parsing and add close arg
lsetiawan Sep 20, 2023
73fb446
refactor: Update arg types to include interval index
lsetiawan Sep 20, 2023
744228d
test: Update tests to have brute force creation
lsetiawan Sep 20, 2023
48573fe
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2023
f1df1d6
test: Fix brute force mvbs gen
lsetiawan Sep 20, 2023
93b17b6
chore: Clean up old code for doing compute MVBS
lsetiawan Sep 20, 2023
9443c2a
chore(pytest): Added custom markers 'unit' and 'integration'
lsetiawan Sep 20, 2023
6eec56b
docs: Update docstring for `compute_MVBS`
lsetiawan Sep 20, 2023
3492b36
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Sep 20, 2023
56795a5
refactor: Change 'parse_range_bin' to 'parse_x_bin'
lsetiawan Sep 21, 2023
70a4dd8
chore: Update suggested changes
lsetiawan Sep 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 192 additions & 24 deletions echopype/commongrid/api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""
Functions for enhancing the spatial and temporal coherence of data.
"""
import re
from typing import Literal

import numpy as np
import pandas as pd
import xarray as xr

from ..consolidate.api import POSITION_VARIABLES
from ..utils.prov import add_processing_level, echopype_prov_attrs, insert_input_processing_level
from .mvbs import get_MVBS_along_channels

Expand Down Expand Up @@ -62,11 +66,117 @@ def _set_MVBS_attrs(ds):
)


def _convert_bins_to_interval_index(
bins: list, closed: Literal["left", "right"] = "left"
) -> pd.IntervalIndex:
"""
Convert bins to sorted pandas IntervalIndex
with specified closed end

Parameters
----------
bins : list
The bin edges
closed : {'left', 'right'}, default 'left'
Which side of bin interval is closed

Returns
-------
pd.IntervalIndex
The resulting IntervalIndex
"""
return pd.IntervalIndex.from_breaks(bins, closed=closed).sort_values()


def _parse_x_bin(x_bin: str, x_label="range_bin") -> float:
"""
Parses x bin string, check unit,
and returns x bin in the specified unit.

Currently only available for:
range_bin: meters (m)
dist_bin: nautical miles (nmi)

Parameters
----------
x_bin : str
X bin string, e.g., "0.5nmi" or "10m"
x_label : {"range_bin", "dist_bin"}, default "range_bin"
The label of the x bin.

Returns
-------
float
The resulting x bin value in x unit,
based on label.

Raises
------
ValueError
If the x bin string doesn't include unit value.
TypeError
If the x bin is not a type string.
KeyError
If the x label is not one of the available labels.
"""
x_bin_map = {
"range_bin": {
"name": "Range bin",
"unit": "m",
"ex": "10m",
"unit_label": "meters",
"pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(m)",
},
"dist_bin": {
"name": "Distance bin",
"unit": "nmi",
"ex": "0.5nmi",
"unit_label": "nautical miles",
"pattern": r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(nmi)",
},
}
x_bin_info = x_bin_map.get(x_label, None)

if x_bin_info is None:
raise KeyError(f"x_label must be one of {list(x_bin_map.keys())}")

# First check for bin types
if not isinstance(x_bin, str):
raise TypeError("'x_bin' must be a string")
# normalize to lower case
# for x_bin
x_bin = x_bin.strip().lower()
# Only matches meters
match_obj = re.match(x_bin_info["pattern"], x_bin)

# Do some checks on x_bin inputs
if match_obj is None:
# This shouldn't be other units
raise ValueError(
f"{x_bin_info['name']} must be in "
f"{x_bin_info['unit_label']} "
f"(e.g., '{x_bin_info['ex']}')."
)

# Convert back to float
x_bin = float(match_obj.group(1))
return x_bin


@add_processing_level("L3*")
def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"):
def compute_MVBS(
ds_Sv: xr.Dataset,
range_var: Literal["echo_range", "depth"] = "echo_range",
range_bin: str = "20m",
ping_time_bin: str = "20S",
method="map-reduce",
closed: Literal["left", "right"] = "left",
**flox_kwargs,
):
"""
Compute Mean Volume Backscattering Strength (MVBS)
based on intervals of range (``echo_range``) and ``ping_time`` specified in physical units.
based on intervals of range (``echo_range``) or depth (``depth``)
and ``ping_time`` specified in physical units.

Output of this function differs from that of ``compute_MVBS_index_binning``, which computes
bin-averaged Sv according to intervals of ``echo_range`` and ``ping_time`` specified as
Expand All @@ -76,41 +186,99 @@ def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"):
----------
ds_Sv : xr.Dataset
dataset containing Sv and ``echo_range`` [m]
range_meter_bin : Union[int, float]
bin size along ``echo_range`` in meters, default to ``20``
ping_time_bin : str
bin size along ``ping_time``, default to ``20S``
range_var: {'echo_range', 'depth'}, default 'echo_range'
The variable to use for range binning.
Must be one of ``echo_range`` or ``depth``.
Note that ``depth`` is only available if the input dataset contains
``depth`` as a data variable.
range_bin : str, default '20m'
bin size along ``echo_range`` or ``depth`` in meters.
ping_time_bin : str, default '20S'
bin size along ``ping_time``
method: str, default 'map-reduce'
The flox strategy for reduction of dask arrays only.
lsetiawan marked this conversation as resolved.
Show resolved Hide resolved
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
for more details.
closed: {'left', 'right'}, default 'left'
Which side of bin interval is closed.
**kwargs
Additional keyword arguments to be passed
to flox reduction function.

Returns
-------
A dataset containing bin-averaged Sv
"""

if not isinstance(ping_time_bin, str):
raise TypeError("ping_time_bin must be a string")

range_bin = _parse_x_bin(range_bin, "range_bin")

# Clean up filenames dimension if it exists
# not needed here
if "filenames" in ds_Sv.dims:
ds_Sv = ds_Sv.drop_dims("filenames")

# Check if range_var is valid
if range_var not in ["echo_range", "depth"]:
raise ValueError("range_var must be one of 'echo_range' or 'depth'.")

# Check if range_var exists in ds_Sv
if range_var not in ds_Sv.data_vars:
raise ValueError(f"range_var '{range_var}' does not exist in the input dataset.")

# Check for closed values
if closed not in ["right", "left"]:
raise ValueError(f"{closed} is not a valid option. Options are 'left' or 'right'.")

# create bin information for echo_range
range_interval = np.arange(0, ds_Sv["echo_range"].max() + range_meter_bin, range_meter_bin)
# this computes the echo range max since there might NaNs in the data
echo_range_max = ds_Sv[range_var].max()
range_interval = np.arange(0, echo_range_max + range_bin, range_bin)

# create bin information needed for ping_time
ping_interval = (
ds_Sv.ping_time.resample(ping_time=ping_time_bin, skipna=True).asfreq().ping_time.values
d_index = (
ds_Sv["ping_time"]
.resample(ping_time=ping_time_bin, skipna=True)
.first() # Not actually being used, but needed to get the bin groups
.indexes["ping_time"]
)
ping_interval = d_index.union([d_index[-1] + pd.Timedelta(ping_time_bin)]).values

# Set interval index for groups
ping_interval = _convert_bins_to_interval_index(ping_interval, closed=closed)
range_interval = _convert_bins_to_interval_index(range_interval, closed=closed)
raw_MVBS = get_MVBS_along_channels(
ds_Sv,
range_interval,
ping_interval,
range_var=range_var,
method=method,
**flox_kwargs,
)

# calculate the MVBS along each channel
MVBS_values = get_MVBS_along_channels(ds_Sv, range_interval, ping_interval)

# create MVBS dataset
# by transforming the binned dimensions to regular coords
ds_MVBS = xr.Dataset(
data_vars={"Sv": (["channel", "ping_time", "echo_range"], MVBS_values)},
data_vars={"Sv": (["channel", "ping_time", range_var], raw_MVBS["Sv"].data)},
coords={
"ping_time": ping_interval,
"channel": ds_Sv.channel,
"echo_range": range_interval[:-1],
"ping_time": np.array([v.left for v in raw_MVBS.ping_time_bins.values]),
"channel": raw_MVBS.channel.values,
range_var: np.array([v.left for v in raw_MVBS[f"{range_var}_bins"].values]),
},
)

# TODO: look into why 'filenames' exist here as a variable
# Added this check to support the test in test_process.py::test_compute_MVBS
if "filenames" in ds_MVBS.variables:
ds_MVBS = ds_MVBS.drop_vars("filenames")
# "has_positions" attribute is inserted in get_MVBS_along_channels
# when the dataset has position information
# propagate this to the final MVBS dataset
if raw_MVBS.attrs.get("has_positions", False):
for var in POSITION_VARIABLES:
ds_MVBS[var] = (["ping_time"], raw_MVBS[var].data, ds_Sv[var].attrs)

# Add water level if uses echo_range and it exists in Sv dataset
if range_var == "echo_range" and "water_level" in ds_Sv.data_vars:
ds_MVBS["water_level"] = ds_Sv["water_level"]

# ping_time_bin parsing and conversions
# Need to convert between pd.Timedelta and np.timedelta64 offsets/frequency strings
Expand Down Expand Up @@ -143,17 +311,17 @@ def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"):

# Attach attributes
_set_MVBS_attrs(ds_MVBS)
ds_MVBS["echo_range"].attrs = {"long_name": "Range distance", "units": "m"}
ds_MVBS[range_var].attrs = {"long_name": "Range distance", "units": "m"}
ds_MVBS["Sv"] = ds_MVBS["Sv"].assign_attrs(
{
"cell_methods": (
f"ping_time: mean (interval: {ping_time_bin_resvalue} {ping_time_bin_resunit_label} " # noqa
"comment: ping_time is the interval start) "
f"echo_range: mean (interval: {range_meter_bin} meter "
"comment: echo_range is the interval start)"
f"{range_var}: mean (interval: {range_bin} meter "
f"comment: {range_var} is the interval start)"
),
"binning_mode": "physical units",
"range_meter_interval": str(range_meter_bin) + "m",
"range_meter_interval": str(range_bin) + "m",
"ping_time_interval": ping_time_bin,
"actual_range": [
round(float(ds_MVBS["Sv"].min().values), 2),
Expand Down
Loading