Skip to content

Commit

Permalink
Merge pull request #44 from nsidc/typing-enhancements
Browse files Browse the repository at this point in the history
Typing enhancements
  • Loading branch information
mfisher87 committed Jul 22, 2024
2 parents 4c4eb58 + 6ce8068 commit 0c6f7e3
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 87 deletions.
136 changes: 79 additions & 57 deletions antarctica_today/compute_mean_climatology.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import datetime
import os
import pickle
from pathlib import Path
from typing import Dict, Literal, Optional, Tuple, Union

import numpy
import pandas
Expand All @@ -26,13 +28,13 @@


def compute_daily_climatology_pixel_averages(
baseline_start_year=1990,
melt_start_mmdd=(10, 1),
baseline_end_year=2020,
melt_end_mmdd=(4, 30),
output_picklefile=daily_melt_averages_picklefile,
verbose=True,
):
baseline_start_year: int = 1990,
melt_start_mmdd: Tuple[int, int] = (10, 1),
baseline_end_year: int = 2020,
melt_end_mmdd: Tuple[int, int] = (4, 30),
output_picklefile: Path = daily_melt_averages_picklefile,
verbose: bool = True,
) -> Tuple[numpy.ndarray, Dict[Tuple[int, int], int]]:
"""Compute fraction of days in the baseline period in which each give pixel melts.
Use the baseline period. Calculate, of the days with data (ignoring
Expand All @@ -46,7 +48,7 @@ def compute_daily_climatology_pixel_averages(
melt_array, datetimes_dict = read_model_array_picklefile(resample_melt_codes=True)

# Recode melt array from (-1, 0, 1, 2), to (nan, nan, 0.0, 1.0), and convert to floating-point
melt_array_nan_filled = numpy.array(melt_array, dtype=numpy.float32)
melt_array_nan_filled: numpy.ndarray = numpy.array(melt_array, dtype=numpy.float32)
melt_array_nan_filled[melt_array_nan_filled == -1.0] = numpy.nan
melt_array_nan_filled[melt_array_nan_filled == 0.0] = numpy.nan
melt_array_nan_filled[melt_array_nan_filled == 1.0] = 0.0
Expand Down Expand Up @@ -112,14 +114,14 @@ def compute_daily_climatology_pixel_averages(
# baseline_dt_list_day_of_months = numpy.array([dt.day for dt in dt_list_melt_season], dtype=numpy.uint8)

# Generate an empty MxNxT array with
average_melt_array = numpy.zeros(
average_melt_array: numpy.ndarray = numpy.zeros(
(melt_array.shape[0], melt_array.shape[1], len(baseline_filler_dt_list)),
dtype=float,
)

# Now, compute the average odds (0-1) of melt on any given day for any given pixel, over the baseline period.
for i, bdt in enumerate(baseline_filler_dt_list):
bdt_day_mask = numpy.array(
bdt_day_mask: numpy.ndarray = numpy.array(
[
((dt.month == bdt.month) and (dt.day == bdt.day))
for dt in dt_list_melt_season
Expand Down Expand Up @@ -175,9 +177,9 @@ def compute_daily_climatology_pixel_averages(


def read_daily_melt_averages_picklefile(
build_picklefile_if_not_present=True,
daily_climatology_picklefile=daily_melt_averages_picklefile,
verbose=True,
build_picklefile_if_not_present: bool = True,
daily_climatology_picklefile: Path = daily_melt_averages_picklefile,
verbose: bool = True,
):
"""Read the daily climatology averages picklefile."""
if not os.path.exists(daily_climatology_picklefile):
Expand All @@ -192,18 +194,18 @@ def read_daily_melt_averages_picklefile(

if verbose:
print("Reading", daily_climatology_picklefile)
f = open(daily_climatology_picklefile, "rb")
array, dt_dict = pickle.load(f)
f.close()

with open(daily_climatology_picklefile, "rb") as f:
array, dt_dict = pickle.load(f)

return array, dt_dict


def compute_daily_sum_pixel_averages(
daily_picklefile=daily_melt_averages_picklefile,
sum_picklefile=daily_cumulative_melt_averages_picklefile,
verbose=True,
):
daily_picklefile: Path = daily_melt_averages_picklefile,
sum_picklefile: Path = daily_cumulative_melt_averages_picklefile,
verbose: bool = True,
) -> None:
"""Compute a mean daily cumulative melt-day value for each pixel throughout the melt season.
{(mm,dd):(MxN array of integer melt days)}
Expand All @@ -217,7 +219,7 @@ def compute_daily_sum_pixel_averages(
"""
# First, read the daily melt value picklefile.
daily_array, dt_dict = read_daily_melt_averages_picklefile(verbose=verbose)
daily_sum_array = numpy.zeros(daily_array.shape, dtype=numpy.int32)
daily_sum_array: numpy.ndarray = numpy.zeros(daily_array.shape, dtype=numpy.int32)
for dt in dt_dict:
daily_sum_array[:, :, dt_dict[dt]] = numpy.array(
numpy.round(
Expand All @@ -228,17 +230,18 @@ def compute_daily_sum_pixel_averages(

if verbose:
print("Writing", sum_picklefile, end="...")
f = open(sum_picklefile, "wb")
pickle.dump((daily_sum_array, dt_dict), f)
f.close()

with open(sum_picklefile, "wb") as f:
pickle.dump((daily_sum_array, dt_dict), f)

if verbose:
print("Done.")


def read_daily_sum_melt_averages_picklefile(
build_picklefile_if_not_present=True,
daily_sum_picklefile=daily_cumulative_melt_averages_picklefile,
verbose=True,
build_picklefile_if_not_present: bool = True,
daily_sum_picklefile: Path = daily_cumulative_melt_averages_picklefile,
verbose: bool = True,
):
"""Read the daily climatology averages picklefile."""
if not os.path.exists(daily_sum_picklefile):
Expand All @@ -261,14 +264,14 @@ def read_daily_sum_melt_averages_picklefile(


def create_baseline_climatology_tif(
start_date=datetime.datetime(1990, 10, 1),
end_date=datetime.datetime(2020, 4, 30),
f_out_mean=mean_climatology_geotiff,
f_out_std=std_climatology_geotiff,
round_to_integers=True,
gap_filled=True,
verbose=True,
):
start_date: datetime.datetime = datetime.datetime(1990, 10, 1),
end_date: datetime.datetime = datetime.datetime(2020, 4, 30),
f_out_mean: str = mean_climatology_geotiff,
f_out_std: str = std_climatology_geotiff,
round_to_integers: bool = True,
gap_filled: bool = True,
verbose: bool = True,
) -> numpy.ndarray:
"""Generate a "mean annual melt" map over the baseline period.
The melt year for each season is defined from the (mm,dd) from the "start_date"
Expand All @@ -287,7 +290,9 @@ def create_baseline_climatology_tif(
num_years = int((end_date - start_date).days / 365.25)
# print(num_years)

annual_sum_grids = numpy.empty(model_array.shape[0:2] + (num_years,), dtype=int)
annual_sum_grids: numpy.ndarray = numpy.empty(
model_array.shape[0:2] + (num_years,), dtype=int
)

if gap_filled:
model_melt_days = model_array
Expand All @@ -306,7 +311,7 @@ def create_baseline_climatology_tif(

# print(i, dt1, dt2)

dates_mask = numpy.array(
dates_mask: numpy.ndarray = numpy.array(
[(dt >= dt1) & (dt <= dt2) for dt in datetimes], dtype=bool
)

Expand Down Expand Up @@ -340,17 +345,27 @@ def create_baseline_climatology_tif(


def create_partial_year_melt_anomaly_tif(
current_datetime=None, dest_fname=None, gap_filled=True, verbose=True
):
"""Create a tif of melt anomlay compared to baseline climatology for that day of the melt season."""
current_datetime: Optional[datetime.datetime] = None,
dest_fname: Optional[str] = None,
gap_filled: bool = True,
verbose: bool = True,
) -> numpy.ndarray:
"""Create a tif of melt anomaly compared to baseline climatology for that day of the melt season."""
# If no datetime is given, use "today"
if current_datetime is None:
now = datetime.datetime.today()
# Strip of the hour,min,second
# Strip off the hour,min,second
# TODO: Why not use a datetime.date object instead?
current_datetime = datetime.datetime(
year=now.year, month=now.month, day=now.day
)

if not isinstance(current_datetime, datetime.datetime):
raise ValueError(
f"Unexpected value for current_datetime: {current_datetime}."
"This should never happen, but this helps the typechecker narrow."
)

daily_melt_sums, daily_sums_dt_dict = read_daily_sum_melt_averages_picklefile()

first_mmdd_of_melt_season = list(daily_sums_dt_dict.keys())[0]
Expand All @@ -376,7 +391,7 @@ def create_partial_year_melt_anomaly_tif(
)

dt_list = sorted(list(dt_dict.keys()))
dt_mask = numpy.array(
dt_mask: numpy.ndarray = numpy.array(
[
((dt >= first_dt_of_present_melt_season) and (dt <= current_datetime))
for dt in dt_list
Expand Down Expand Up @@ -413,8 +428,9 @@ def create_partial_year_melt_anomaly_tif(
anomaly_this_season_so_far[ice_mask == 0] = -999

# Round to integers, if it isn't already.
anomalies_int = numpy.array(
numpy.round(anomaly_this_season_so_far), dtype=numpy.int32
anomalies_int: numpy.ndarray = numpy.array(
numpy.round(anomaly_this_season_so_far),
dtype=numpy.int32,
)

# If dest_fname is None, create it.
Expand All @@ -434,7 +450,11 @@ def create_partial_year_melt_anomaly_tif(


def create_annual_melt_anomaly_tif(
year, year_melt_tif=None, baseline_melt_tif=None, gap_filled=True, verbose=True
year: int,
year_melt_tif: Optional[str] = None,
baseline_melt_tif: Optional[str] = None,
gap_filled: bool = True,
verbose: bool = True,
):
"""Create a tif of annual melt anomaly compared to baseline climatology.
Expand Down Expand Up @@ -522,7 +542,11 @@ def get_baseline_climatology_array(fname=None, gap_filled=True):
return create_baseline_climatology_tif(gap_filled=gap_filled)


def get_annual_melt_sum_array(year, fname=None, gap_filled=True):
def get_annual_melt_sum_array(
year: int,
fname: Optional[str] = None,
gap_filled: bool = True,
):
"""Retrieve the melt year array from the tif.
If it's not available, create it and write the file, then return it.
Expand All @@ -545,13 +569,13 @@ def get_annual_melt_sum_array(year, fname=None, gap_filled=True):


def create_annual_melt_sum_tif(
year="all",
output_tif=None,
melt_start_mmdd=(10, 1),
melt_end_mmdd=(4, 30),
gap_filled=True,
verbose=True,
):
year: Union[Literal["all"], int] = "all",
output_tif: Optional[str] = None,
melt_start_mmdd: Tuple[int, int] = (10, 1),
melt_end_mmdd: Tuple[int, int] = (4, 30),
gap_filled: bool = True,
verbose: bool = True,
) -> Optional[numpy.ndarray]:
"""Create an integer tif file of that year's annual sum of melt-days, per pixel.
If gap_filled, create a floating-point tif file of the same.
Expand All @@ -568,9 +592,7 @@ def create_annual_melt_sum_tif(
dt_list = list(datetimes_dict.keys())

if year == "all":
years = numpy.unique([dt.year for dt in dt_list])
years.sort()

years = sorted({dt.year for dt in dt_list})
else:
assert year == int(year)
years = [year]
Expand All @@ -589,7 +611,7 @@ def create_annual_melt_sum_tif(
day=melt_end_mmdd[1],
)

dates_mask = numpy.array(
dates_mask: numpy.ndarray = numpy.array(
[((dt >= start_date) and (dt <= end_date)) for dt in dt_list],
dtype=bool,
)
Expand Down
12 changes: 6 additions & 6 deletions antarctica_today/melt_array_picklefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pickle
import re
from pathlib import Path
from typing import Any
from typing import Any, Dict, Tuple

import numpy
from osgeo import gdal
Expand Down Expand Up @@ -323,15 +323,15 @@ def _filter_out_erroneous_swaths(model_array, datetimes_dict):


def read_gap_filled_melt_picklefile(
picklefile=gap_filled_melt_picklefile, verbose=True
):
picklefile: Path = gap_filled_melt_picklefile,
verbose: bool = True,
) -> Tuple[numpy.ndarray, Dict[datetime.datetime, int]]:
"""Read the gap-filled picklefile, return to user."""
if verbose:
print("Reading", picklefile)

f = open(picklefile, "rb")
array, dt_dict = pickle.load(f)
f.close()
with open(picklefile, "rb") as f:
array, dt_dict = pickle.load(f)

return array, dt_dict

Expand Down
Loading

0 comments on commit 0c6f7e3

Please sign in to comment.