Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(commongrid): improve 'compute_MVBS' using flox [all tests ci] #1124

Merged
merged 50 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
149d7da
chore(deps): add flox dependency >=0.7.2
lsetiawan Aug 15, 2023
d911c45
fix(commongrid): fixes 'compute_MVBS' so it can work better and scale
lsetiawan Aug 15, 2023
a052854
docs: add small code comment
lsetiawan Aug 15, 2023
364ebce
refactor: change how ping_time index is retrieved
lsetiawan Aug 15, 2023
a0fb46a
refactor: remove for loop for channel
lsetiawan Aug 15, 2023
d5558e8
test(mvbs): add mock Sv datasets and tests for dims (#2)
leewujung Aug 24, 2023
a1e5ec1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 24, 2023
f30df73
fix: change dask to numpy
lsetiawan Aug 24, 2023
c75d9a2
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Aug 25, 2023
99f2b65
feat: Add method argument
lsetiawan Aug 25, 2023
a536d22
fix(commongrid): Fixed to include lat lon
lsetiawan Aug 29, 2023
3cdfebf
refactor: Set defaults to recommended
lsetiawan Aug 29, 2023
afa418d
feat(commongrid): Add 'range_var' argument to 'compute_MVBS'
lsetiawan Aug 29, 2023
7a861b4
fix: Add missing attributes for lat lon
lsetiawan Aug 29, 2023
41ad126
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Aug 29, 2023
53cebd4
test: Update test to use random generator
lsetiawan Aug 29, 2023
22f03d2
fix: Add case for no 'water_level'
lsetiawan Aug 29, 2023
9e2a0aa
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Aug 30, 2023
0a4dbf3
test(nasc): Remove 'compute_NASC' import to avoid failure
lsetiawan Aug 30, 2023
7c52b28
fix: Removed assumption on echo range max
lsetiawan Aug 31, 2023
828ab6d
test: Extract api test and add markings
lsetiawan Aug 31, 2023
60b025c
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Aug 31, 2023
db53a72
test: Add latlon test for 'compute_MVBS'
lsetiawan Aug 31, 2023
3f24590
test: Add small get_MVBS_along_channels test
lsetiawan Aug 31, 2023
5a963bd
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Aug 31, 2023
f3c3503
refactor: Integrate suggested changes
lsetiawan Sep 1, 2023
7d457d5
test: Added check for position values
lsetiawan Sep 1, 2023
6a33efa
test: Update range_meter_bin to strings
lsetiawan Sep 1, 2023
b64eca3
test: Added 'compute_MVBS' values test
lsetiawan Sep 2, 2023
f00c291
Update echopype/tests/utils/test_processinglevels_integration.py
emiliom Sep 2, 2023
456fb50
test: Add 'nan' sprinkles
lsetiawan Sep 8, 2023
8578432
revert: Revert the use of 'pint'
lsetiawan Sep 8, 2023
31032af
feat: Allow 'range_bin' to have space
lsetiawan Sep 8, 2023
d8dd4f8
fix: Apply suggestions from code review
lsetiawan Sep 8, 2023
5e5e19e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 8, 2023
343b31b
test: Fix test text for wrong unit
lsetiawan Sep 8, 2023
ab89fba
test: Remove the 'e.g.' part on pytest
lsetiawan Sep 8, 2023
09eb7b7
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Sep 9, 2023
d510d25
test: Remove remnant for test_ek.py
lsetiawan Sep 20, 2023
8aa9201
refactor: Extract range_bin parsing and add close arg
lsetiawan Sep 20, 2023
73fb446
refactor: Update arg types to include interval index
lsetiawan Sep 20, 2023
744228d
test: Update tests to have brute force creation
lsetiawan Sep 20, 2023
48573fe
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2023
f1df1d6
test: Fix brute force mvbs gen
lsetiawan Sep 20, 2023
93b17b6
chore: Clean up old code for doing compute MVBS
lsetiawan Sep 20, 2023
9443c2a
chore(pytest): Added custom markers 'unit' and 'integration'
lsetiawan Sep 20, 2023
6eec56b
docs: Update docstring for `compute_MVBS`
lsetiawan Sep 20, 2023
3492b36
refactor: Merge branch 'dev' into fix_mvbs
lsetiawan Sep 20, 2023
56795a5
refactor: Change 'parse_range_bin' to 'parse_x_bin'
lsetiawan Sep 21, 2023
70a4dd8
chore: Update suggested changes
lsetiawan Sep 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 95 additions & 21 deletions echopype/commongrid/api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""
Functions for enhancing the spatial and temporal coherence of data.
"""
import re
from typing import Literal

import numpy as np
import pandas as pd
import xarray as xr

from ..consolidate.api import POSITION_VARIABLES
from ..utils.prov import add_processing_level, echopype_prov_attrs, insert_input_processing_level
from .mvbs import get_MVBS_along_channels

Expand Down Expand Up @@ -63,10 +67,18 @@ def _set_MVBS_attrs(ds):


@add_processing_level("L3*")
def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"):
def compute_MVBS(
ds_Sv: xr.Dataset,
range_var: Literal["echo_range", "depth"] = "echo_range",
range_bin: str = "20m",
ping_time_bin: str = "20S",
method="map-reduce",
**flox_kwargs,
):
"""
Compute Mean Volume Backscattering Strength (MVBS)
based on intervals of range (``echo_range``) and ``ping_time`` specified in physical units.
based on intervals of range (``echo_range``) or depth (``depth``)
and ``ping_time`` specified in physical units.

Output of this function differs from that of ``compute_MVBS_index_binning``, which computes
bin-averaged Sv according to intervals of ``echo_range`` and ``ping_time`` specified as
Expand All @@ -76,41 +88,103 @@ def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"):
----------
ds_Sv : xr.Dataset
dataset containing Sv and ``echo_range`` [m]
range_meter_bin : Union[int, float]
bin size along ``echo_range`` in meters, default to ``20``
range_var: str
The variable to use for range binning.
Must be one of ``echo_range`` or ``depth``.
Note that ``depth`` is only available if the input dataset contains
``depth`` as a data variable.
range_bin : str
bin size along ``echo_range`` or ``depth`` in meters,
default to ``20m``
ping_time_bin : str
bin size along ``ping_time``, default to ``20S``
method: str
The flox strategy for reduction of dask arrays only,
default to ``map-reduce``
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
for more details.
**flox_kwargs
Additional keyword arguments to be passed
to the flox reduction function.

Returns
-------
A dataset containing bin-averaged Sv
"""

# First check for bin types
if not isinstance(range_bin, str):
raise TypeError("range_bin must be a string")

if not isinstance(ping_time_bin, str):
raise TypeError("ping_time_bin must be a string")

# normalize to lower case
# for range_bin
range_bin = range_bin.strip().lower()
# Only matches meters
match_obj = re.match(r"([\d+]*[.,]{0,1}[\d+]*)(\s+)?(m)", range_bin)

# Do some checks on range meter inputs
if match_obj is None:
# This shouldn't be other units
raise ValueError("range_bin must be in meters (e.g., '10m').")

# Convert back to float
range_bin = float(match_obj.group(1))
lsetiawan marked this conversation as resolved.
Show resolved Hide resolved
lsetiawan marked this conversation as resolved.
Show resolved Hide resolved

# Clean up filenames dimension if it exists
# not needed here
if "filenames" in ds_Sv.dims:
ds_Sv = ds_Sv.drop_dims("filenames")

# Check if range_var is valid
if range_var not in ["echo_range", "depth"]:
raise ValueError("range_var must be one of 'echo_range' or 'depth'.")

# Check if range_var exists in ds_Sv
if range_var not in ds_Sv.data_vars:
raise ValueError(f"range_var '{range_var}' does not exist in the input dataset.")

# create bin information for echo_range
range_interval = np.arange(0, ds_Sv["echo_range"].max() + range_meter_bin, range_meter_bin)
# this computes the echo range max since there might NaNs in the data
echo_range_max = ds_Sv[range_var].max()
range_interval = np.arange(0, echo_range_max + range_bin, range_bin)

# create bin information needed for ping_time
ping_interval = (
ds_Sv.ping_time.resample(ping_time=ping_time_bin, skipna=True).asfreq().ping_time.values
d_index = (
ds_Sv["ping_time"]
.resample(ping_time=ping_time_bin, skipna=True)
.first() # Not actually being used, but needed to get the bin groups
.indexes["ping_time"]
)
ping_interval = d_index.union([d_index[-1] + pd.Timedelta(ping_time_bin)]).values

# calculate the MVBS along each channel
MVBS_values = get_MVBS_along_channels(ds_Sv, range_interval, ping_interval)
raw_MVBS = get_MVBS_along_channels(
ds_Sv, range_interval, ping_interval, range_var=range_var, method=method, **flox_kwargs
)

# create MVBS dataset
# by transforming the binned dimensions to regular coords
ds_MVBS = xr.Dataset(
data_vars={"Sv": (["channel", "ping_time", "echo_range"], MVBS_values)},
data_vars={"Sv": (["channel", "ping_time", range_var], raw_MVBS["Sv"].data)},
coords={
"ping_time": ping_interval,
"channel": ds_Sv.channel,
"echo_range": range_interval[:-1],
"ping_time": np.array([v.left for v in raw_MVBS.ping_time_bins.values]),
"channel": raw_MVBS.channel.values,
range_var: np.array([v.left for v in raw_MVBS[f"{range_var}_bins"].values]),
},
)

# TODO: look into why 'filenames' exist here as a variable
# Added this check to support the test in test_process.py::test_compute_MVBS
if "filenames" in ds_MVBS.variables:
ds_MVBS = ds_MVBS.drop_vars("filenames")
# "has_positions" attribute is inserted in get_MVBS_along_channels
# when the dataset has position information
# propagate this to the final MVBS dataset
if raw_MVBS.attrs.get("has_positions", False):
for var in POSITION_VARIABLES:
ds_MVBS[var] = (["ping_time"], raw_MVBS[var].data, ds_Sv[var].attrs)

# Add water level if uses echo_range and it exists in Sv dataset
if range_var == "echo_range" and "water_level" in ds_Sv.data_vars:
ds_MVBS["water_level"] = ds_Sv["water_level"]

# ping_time_bin parsing and conversions
# Need to convert between pd.Timedelta and np.timedelta64 offsets/frequency strings
Expand Down Expand Up @@ -143,17 +217,17 @@ def compute_MVBS(ds_Sv, range_meter_bin=20, ping_time_bin="20S"):

# Attach attributes
_set_MVBS_attrs(ds_MVBS)
ds_MVBS["echo_range"].attrs = {"long_name": "Range distance", "units": "m"}
ds_MVBS[range_var].attrs = {"long_name": "Range distance", "units": "m"}
ds_MVBS["Sv"] = ds_MVBS["Sv"].assign_attrs(
{
"cell_methods": (
f"ping_time: mean (interval: {ping_time_bin_resvalue} {ping_time_bin_resunit_label} " # noqa
"comment: ping_time is the interval start) "
f"echo_range: mean (interval: {range_meter_bin} meter "
"comment: echo_range is the interval start)"
f"{range_var}: mean (interval: {range_bin} meter "
f"comment: {range_var} is the interval start)"
),
"binning_mode": "physical units",
"range_meter_interval": str(range_meter_bin) + "m",
"range_meter_interval": str(range_bin) + "m",
"ping_time_interval": ping_time_bin,
"actual_range": [
round(float(ds_MVBS["Sv"].min().values), 2),
Expand Down
99 changes: 61 additions & 38 deletions echopype/commongrid/mvbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
"""

import warnings
from typing import Tuple, Union
from typing import Literal, Tuple, Union

import dask.array
import numpy as np
import xarray as xr
from flox.xarray import xarray_reduce

from ..consolidate.api import POSITION_VARIABLES
from ..utils.compute import _lin2log, _log2lin


def get_bin_indices(
Expand Down Expand Up @@ -405,8 +409,13 @@ def bin_and_mean_2d(


def get_MVBS_along_channels(
ds_Sv: xr.Dataset, echo_range_interval: np.ndarray, ping_interval: np.ndarray
) -> np.ndarray:
ds_Sv: xr.Dataset,
range_interval: np.ndarray,
ping_interval: np.ndarray,
range_var: Literal["echo_range", "depth"] = "echo_range",
method: str = "map-reduce",
**kwargs
) -> xr.Dataset:
"""
Computes the MVBS of ``ds_Sv`` along each channel for the given
intervals.
Expand All @@ -416,46 +425,60 @@ def get_MVBS_along_channels(
ds_Sv: xr.Dataset
A Dataset containing ``Sv`` and ``echo_range`` data with coordinates
``channel``, ``ping_time``, and ``range_sample``
echo_range_interval: np.ndarray
1D array (used by np.digitize) representing the binning required for ``echo_range``
range_interval: np.ndarray
1D array representing
the bins required for ``range_var``
ping_interval: np.ndarray
1D array (used by np.digitize) representing the binning required for ``ping_time``
1D array representing
the bins required for ``ping_time``
range_var: str
The variable to use for range binning.
Either ``echo_range`` or ``depth``.
method: str
The flox strategy for reduction of dask arrays only.
See flox `documentation <https://flox.readthedocs.io/en/latest/implementation.html>`_
for more details.
**kwargs
Additional keyword arguments to be passed
to the flox reduction function

Returns
-------
np.ndarray
The MVBS value of the input ``ds_Sv`` for all channels

Notes
-----
If the values in ``ds_Sv`` are delayed then the binning and mean of ``Sv`` with
respect to ``echo_range`` will take place, then the delayed result will be computed,
and lastly the binning and mean with respect to ``ping_time`` will be completed. It
is necessary to apply a compute midway through this method because Dask graph layers
get too large and this makes downstream operations very inefficient.
xr.Dataset
The MVBS dataset of the input ``ds_Sv`` for all channels
"""

all_MVBS = []
for chan in ds_Sv.channel:
# squeeze to remove "channel" dim if present
# TODO: not sure why not already removed for the AZFP case. Investigate.
ds = ds_Sv.sel(channel=chan).squeeze()

# average should be done in linear domain
sv = 10 ** (ds["Sv"] / 10)

# get MVBS for channel in linear domain
chan_MVBS = bin_and_mean_2d(
sv.data,
bins_time=ping_interval,
bins_er=echo_range_interval,
times=sv.ping_time.data,
echo_range=ds["echo_range"],
comprehensive_er_check=True,
# average should be done in linear domain
sv = ds_Sv["Sv"].pipe(_log2lin)

# Get positions if exists
# otherwise just use an empty dataset
ds_Pos = xr.Dataset(attrs={"has_positions": False})
if all(v in ds_Sv for v in POSITION_VARIABLES):
ds_Pos = xarray_reduce(
ds_Sv[POSITION_VARIABLES],
ds_Sv["ping_time"],
func="nanmean",
expected_groups=(ping_interval),
isbin=True,
method=method,
)
ds_Pos.attrs["has_positions"] = True

# reduce along ping_time and echo_range or depth
# by binning and averaging
mvbs = xarray_reduce(
sv,
sv["channel"],
ds_Sv["ping_time"],
ds_Sv[range_var],
func="nanmean",
expected_groups=(None, ping_interval, range_interval),
isbin=[False, True, True],
method=method,
**kwargs
)

# apply inverse mapping to get back to the original domain and store values
all_MVBS.append(10 * np.log10(chan_MVBS))

# collect the MVBS values for each channel
return np.stack(all_MVBS, axis=0)
# apply inverse mapping to get back to the original domain and store values
da_MVBS = mvbs.pipe(_lin2log)
return xr.merge([ds_Pos, da_MVBS])
4 changes: 3 additions & 1 deletion echopype/consolidate/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from ..utils.prov import add_processing_level
from .split_beam_angle import add_angle_to_ds, get_angle_complex_samples, get_angle_power_samples

POSITION_VARIABLES = ["latitude", "longitude"]


def swap_dims_channel_frequency(ds: xr.Dataset) -> xr.Dataset:
"""
Expand Down Expand Up @@ -185,7 +187,7 @@ def sel_interp(var, time_dim_name):
f"{datetime.datetime.utcnow()} +00:00. "
"Interpolated or propagated from Platform latitude/longitude." # noqa
)
for da_name in ["latitude", "longitude"]:
for da_name in POSITION_VARIABLES:
interp_ds[da_name] = interp_ds[da_name].assign_attrs({"history": history_attr})

if time_dim_name in interp_ds:
Expand Down
Loading