Skip to content

Commit

Permalink
Factorize looking for a band in a product
Browse files Browse the repository at this point in the history
  • Loading branch information
remi-braun committed Jan 20, 2025
1 parent 36fe223 commit 1e74912
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 48 deletions.
58 changes: 13 additions & 45 deletions eosets/mosaic.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
import contextlib
import logging
import os
import shutil
from collections import defaultdict
from enum import unique
from glob import glob
from typing import Union

import geopandas as gpd
import xarray as xr
from eoreader import cache, utils
from eoreader.bands import is_spectral_band, to_band, to_str
from eoreader.bands import to_band, to_str
from eoreader.products import Product
from eoreader.reader import Reader
from eoreader.utils import UINT16_NODATA
Expand All @@ -38,7 +36,7 @@
from eosets import EOSETS_NAME
from eosets.exceptions import IncompatibleProducts
from eosets.set import GeometryCheck, GeometryCheckType, Set
from eosets.utils import AnyProductType, BandsType, stack
from eosets.utils import AnyProductType, BandsType, look_for_prod_band_file, stack

READER = Reader()

Expand Down Expand Up @@ -143,7 +141,6 @@ def __open(prod_or_path: AnyProductType) -> Product:
prod_: Product = READER.open(
prod_or_path,
remove_tmp=self._remove_tmp,
output_path=self._get_tmp_folder(writable=True),
**kwargs,
)
elif isinstance(prod_or_path, Product):
Expand Down Expand Up @@ -370,70 +367,41 @@ def load(
pixel_size = prod.pixel_size

# Load bands
prod.load(bands_to_load, pixel_size, **kwargs).keys()
prod.load(bands_to_load, pixel_size, **kwargs)

# Store paths
for band in bands_to_load:
if is_spectral_band(band):
band_path = prod.get_band_paths([band], pixel_size, **kwargs)[
band
]
else:
band_regex = f"*{prod.condensed_name}*_{to_str(band)[0]}_*"
# Use glob fct as _get_band_folder is a tmpDirectory
try:
# Check if the band exists in a non-writable directory
band_path = glob(
os.path.join(
prod._get_band_folder(writable=False), band_regex
)
)[0]
except IndexError:
# Check if the band exists in a writable directory
band_path = glob(
os.path.join(
prod._get_band_folder(writable=True), band_regex
)
)[0]

band_path = look_for_prod_band_file(
prod, band, pixel_size, **kwargs
)
prod_band_paths[band].append(str(band_path))

# Merge
merged_dict = {}

for band in bands_path:
output_path = bands_path[band]
if not output_path.is_file():
if self.nof_prods > 1:
LOGGER.debug(f"Merging bands {to_str(band)[0]}")
if self.mosaic_method == MosaicMethod.VRT:
prod_paths = []
for prod_path in prod_band_paths[band]:
for band_path in prod_band_paths[band]:
out_path, exists = self._get_out_path(
os.path.basename(prod_path)
os.path.basename(band_path)
)
if not exists:
if AnyPath(prod_path).parent.is_relative_to(
self.output
):
# If EOReader's band: move
shutil.move(prod_path, out_path)
else:
# If raw product's band: copy
files.copy(prod_path, out_path)
# Copy any band, even EOReader's as this band may be needed several times in indices computation (don't move it)
files.copy(band_path, out_path)
prod_paths.append(out_path)
else:
prod_paths = prod_band_paths[band]

# Don't pass kwargs here because of unwanted errors
merge_fct(prod_paths, output_path)
else:
prod_path = prod_band_paths[band][0]
if AnyPath(prod_path).parent.is_relative_to(self.output):
# If EOReader's band: move
shutil.move(prod_path, output_path)
else:
# If raw product's band: copy
files.copy(prod_path, output_path)
# Copy any band, even EOReader's as this band may be needed several times in indices computation (don't move it)
files.copy(prod_band_paths[band][0], output_path)

# Load in memory and update attribute
merged_dict[band] = self._update_attrs(
Expand Down
91 changes: 88 additions & 3 deletions eosets/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
# limitations under the License.
"""Utils file"""

import contextlib
from typing import Union

from eoreader import utils
from eoreader.products import Product
from sertit.types import AnyPathStrType
from sertit import AnyPath
from sertit.types import AnyPathStrType, AnyPathType

read = utils.read
write = utils.write
Expand All @@ -28,12 +30,95 @@
AnyProductType = Union[AnyPathStrType, Product]
""" Any Product Type, either a path or an eoreader.Product"""

# Bands Type
# Band(s) Type
try:
from eoreader.bands import BandsType
from eoreader.bands import (
BandNames,
BandsType,
CloudsBandNames,
DemBandNames,
SarBandNames,
SpectralBandNames,
is_spectral_band,
to_str,
)

BandType = Union[
str, SpectralBandNames, SarBandNames, CloudsBandNames, DemBandNames, BandNames
]
BandsType = BandsType
except ImportError:
from eoreader.bands import BandType

BandsType = Union[list, BandType]


def look_for_prod_band_file(prod: Product, band: BandType, pixel_size: float, **kwargs):
"""
Look for a product's band file
Args:
prod (Product): Product to look in
band (BandType): Band to look for
pixel_size (float): Pixel size in meters (if needed)
**kwargs: Other args
Returns:
AnyPathType: Band file path
"""
band_path = _look_for_prod_band_file(
prod, band, pixel_size, writable=False, **kwargs
)

if band_path is None:
band_path = _look_for_prod_band_file(
prod, band, pixel_size, writable=True, **kwargs
)

if band_path is None:
raise FileNotFoundError(
f"Non-existing processed band {to_str(band)[0]} in {prod.condensed_name}!"
)

return band_path


def _look_for_prod_band_file(
prod: Product, band: BandType, pixel_size: float, writable: bool, **kwargs
) -> AnyPathType:
"""
Look for a product's band file
Args:
prod (Product): Product to look in
band (BandType): Band to look for
pixel_size (float): Pixel size in meters (if needed)
writable (bool): Whether to force look in writable folder or not
**kwargs: Other args
Returns:
AnyPathType: Band file path
"""
band_path = None

# Get the band name
band_name = to_str(band)[0]

# Spectral band case : use dedicated function
if is_spectral_band(band):
band_path = prod.get_band_paths(
[band], pixel_size, writable=writable, **kwargs
)[band]

# Check if the band exists in a writable directory if not existing in the default one
if not AnyPath(band_path).is_file():
band_path = None

else:
# No window in non-spectral paths (for now ?)
with contextlib.suppress(StopIteration):
# Check if the band exists in a non-writable directory
band_regex = f"*{prod.condensed_name}*_{band_name}_*"
band_path = next(prod._get_band_folder(writable=writable).glob(band_regex))

return band_path

0 comments on commit 1e74912

Please sign in to comment.