Merge branch 'dev_concurrent_download'
j-haacker committed Mar 1, 2024
2 parents 5795242 + 16538a6 commit 9d32ac7
Showing 3 changed files with 193 additions and 77 deletions.
157 changes: 110 additions & 47 deletions cryoswath/l1b.py
@@ -1,4 +1,3 @@
from dateutil.relativedelta import relativedelta
import fnmatch
import ftplib
import geopandas as gpd
@@ -7,14 +6,13 @@
import os
import pandas as pd
from pyproj import Transformer
import rasterio as rio
import rioxarray as rioxr
from scipy.spatial.transform import Rotation
import shapely
from threading import Event, Thread
import time
import warnings
import xarray as xr

from . import gis
from .misc import *

__all__ = list()
@@ -37,7 +35,19 @@ def __init__(self, l1b_filename: str, *,
# ! tbi customize or drop misleading attributes of xr.Dataset
# currently only originally named CryoSat-2 SARIn files implemented
assert(fnmatch.fnmatch(l1b_filename, "*CS_????_SIR_SIN_1B_*.nc"))
tmp = xr.open_dataset(l1b_filename)#, chunks={"time_20_ku": 256}
try:
tmp = xr.open_dataset(l1b_filename)#, chunks={"time_20_ku": 256}
except (OSError, ValueError) as err:
if isinstance(err, OSError):
if not err.errno == -101:
raise err
else:
warnings.warn(err.strerror+" was raised. Downloading file again.")
else:
warnings.warn(str(err)+" was raised. Downloading file again.")
os.remove(l1b_filename)
download_single_file(os.path.split(l1b_filename)[-1][19:34])
tmp = xr.open_dataset(l1b_filename)
# at least until baseline E ns_20_ku needs to be made a coordinate
tmp = tmp.assign_coords(ns_20_ku=("ns_20_ku", np.arange(len(tmp.ns_20_ku))))
# first: get azimuth bearing from smoothed incremental azimuths.
@@ -485,73 +495,126 @@ def flag_func(int_code: int):
__all__.append("build_flag_mask")


def download_files(region_of_interest: str|shapely.Polygon,
start_datetime: str|pd.Timestamp = "2010-10-10",
end_datetime: str|pd.Timestamp = "2011-11-11", *,
def download_wrapper(region_of_interest: str|shapely.Polygon = None,
start_datetime: str|pd.Timestamp = "2010",
end_datetime: str|pd.Timestamp = "2035", *,
buffer_region_by: float = None,
track_idx: pd.DatetimeIndex|str = None,
stop_event: Event = None,
num_processes: int = 2,
#baseline: str = "latest",
):
start_datetime, end_datetime = pd.to_datetime([start_datetime, end_datetime])
cs_tracks = load_cs_ground_tracks(region_of_interest, start_datetime, end_datetime, buffer_region_by=buffer_region_by)
for period in pd.date_range(start_datetime,
end_datetime+np.timedelta64(1, 'm'), freq="M"):
if track_idx is None:
start_datetime, end_datetime = pd.to_datetime([start_datetime, end_datetime])
track_idx = load_cs_ground_tracks(region_of_interest, start_datetime, end_datetime, buffer_region_by=buffer_region_by).index
else:
track_idx = track_idx.sort_values()
start_datetime, end_datetime = track_idx[[0,-1]]
months = pd.date_range(start_datetime.normalize(), end_datetime+pd.offsets.MonthBegin(), freq="M")
if len(track_idx) > 100:
if stop_event is None:
stop_event = Event()
download_threads = []
for i in range(min(num_processes, len(months))):
idx_selection = track_idx[track_idx.snap("M").normalize().isin(months[i::num_processes])]
download_threads.append(Thread(target=download_files,
kwargs=dict(track_idx=idx_selection, stop_event=stop_event),
name=f"dl_l1b_child_thread_{i}",
daemon=False))
download_threads[-1].start()
print(f"Started {num_processes} download threads;",
f"each ensuring {len(months)/num_processes:.1f} months' availability.")
while not stop_event.is_set() and any([thread.is_alive() for thread in download_threads]):
stop_event.wait(60)
if any([thread.is_alive() for thread in download_threads]):
for thread in download_threads:
thread.join(30)
print("Closed all download threads. Likely not all files were downloaded.")
return 1
else:
print("All downloads finished.")
else:
download_files(track_idx, stop_event=stop_event)
__all__.append("download_wrapper")


def download_files(track_idx: pd.DatetimeIndex|str,
stop_event: Event = None,
#baseline: str = "latest",
):
year_month_str_list = [month.strftime(f"%Y{os.path.sep}%m") for month in track_idx.snap("M").normalize().unique()]
print(f"Start downloading {len(track_idx)} L1b .nc files if not present for months:", year_month_str_list)
for year_month_str in year_month_str_list:
if stop_event is not None and stop_event.is_set():
return
try:
currently_present_files = os.listdir("../data/L1b/"+period.strftime("%Y/%m"))
currently_present_files = os.listdir(os.path.join(data_path, "L1b", year_month_str))
except FileNotFoundError:
os.makedirs("../data/L1b/"+period.strftime("%Y/%m"))
os.makedirs("../data/L1b/"+year_month_str)
currently_present_files = []
with ftplib.FTP("science-pds.cryosat.esa.int") as ftp:
ftp.login(passwd=personal_email)
try:
ftp.cwd("/SIR_SIN_L1/"+period.strftime("%Y/%m"))
# ! this will fail on windows/non-UNIX
ftp.cwd("/SIR_SIN_L1/"+year_month_str)
except ftplib.error_perm:
warnings.warn("Directory /SIR_SIN_L1/"+period.strftime("%Y/%m")+" couldn't be accessed.")
warnings.warn("Directory /SIR_SIN_L1/"+year_month_str+" couldn't be accessed.")
continue
else:
print("\n_______\nentering", period.strftime("%Y - %m"))
for remote_file in ftp.nlst():
if stop_event is not None and stop_event.is_set():
return
if remote_file[-3:] == ".nc" \
and pd.to_datetime(remote_file[19:34]) in cs_tracks.index \
and remote_file not in currently_present_files:
local_path = os.path.join("../data/L1b/", period.strftime("%Y/%m"), remote_file)
and pd.to_datetime(remote_file[19:34]) in track_idx \
and remote_file not in currently_present_files:
local_path = os.path.join("../data/L1b/", year_month_str, remote_file)
try:
with open(local_path, "wb") \
as local_file:
print("___\ndownloading "+remote_file)
with open(local_path, "wb") as local_file:
print("downloading", remote_file)
ftp.retrbinary("RETR "+remote_file, local_file.write)
print("done")
except:
print("download failed for", remote_file)
if os.path.isfile(local_path):
os.remove(local_path)
raise
print("finished downloading tracks for months:\n", year_month_str_list)
__all__.append("download_files")


def download_single_file(track_id: str) -> str:
# currently only CryoSat-2
with ftplib.FTP("science-pds.cryosat.esa.int") as ftp:
ftp.login(passwd=personal_email)
ftp.cwd("/SIR_SIN_L1/"+pd.to_datetime(track_id).strftime("%Y/%m"))
for remote_file in ftp.nlst():
if remote_file[-3:] == ".nc" \
and remote_file[19:34] == track_id:
local_path = os.path.join(data_path, "L1b", pd.to_datetime(track_id).strftime("%Y/%m"))
if not os.path.isdir(local_path):
os.makedirs(local_path)
local_path = os.path.join(local_path, remote_file)
try:
with open(local_path, "wb") as local_file:
print("___\ndownloading "+remote_file)
ftp.retrbinary("RETR "+remote_file, local_file.write)
print("done")
return local_path
except:
if os.path.isfile(local_path):
os.remove(local_path)
raise
print(f"File for id {track_id} couldn't be found in remote dir {ftp.pwd()}.")
raise FileNotFoundError()
retries = 10
while retries > 0:
try:
with ftplib.FTP("science-pds.cryosat.esa.int") as ftp:
ftp.login(passwd=personal_email)
ftp.cwd("/SIR_SIN_L1/"+pd.to_datetime(track_id).strftime("%Y/%m"))
for remote_file in ftp.nlst():
if remote_file[-3:] == ".nc" \
and remote_file[19:34] == track_id:
local_path = os.path.join(data_path, "L1b", pd.to_datetime(track_id).strftime("%Y/%m"))
if not os.path.isdir(local_path):
os.makedirs(local_path)
local_path = os.path.join(local_path, remote_file)
try:
with open(local_path, "wb") as local_file:
print("downloading "+remote_file)
ftp.retrbinary("RETR "+remote_file, local_file.write)
return local_path
except:
print("download failed for", remote_file)
if os.path.isfile(local_path):
os.remove(local_path)
raise
print(f"File for id {track_id} couldn't be found in remote dir {ftp.pwd()}.")
# ! should this raise an error?
raise FileNotFoundError()
except ftplib.error_temp as err:
print(str(err), f"raised. Retrying to download file with id {track_id} in 10 s for the {11-retries}. time.")
time.sleep(10)
retries -= 1
__all__.append("download_single_file")


def drop_waveform(cs_l1b_ds, time_20_ku_mask):
"""Use mask along time dim to drop waveforms.
95 changes: 65 additions & 30 deletions cryoswath/l2.py
@@ -7,6 +7,7 @@
from pyproj import CRS
import re
import shapely
from threading import Event, Thread
import warnings

from .misc import *
@@ -27,31 +28,58 @@ def from_id(track_idx: pd.DatetimeIndex|str, *,
# the number of cores. 8 cores worked for me, 16 was too many
if not isinstance(track_idx, pd.DatetimeIndex):
track_idx = pd.DatetimeIndex(track_idx if isinstance(track_idx, list) else [track_idx])
if track_idx.tz == None: track_idx.tz_localize("UTC")
start_datetime, end_datetime = track_idx.sort_values()[[0,-1]]
swath_list = []
poca_list = []
for current_month in pd.date_range(start_datetime.normalize()-pd.offsets.MonthBegin(), end_datetime,freq="MS"):
current_subdir = current_month.strftime(f"%Y{os.path.sep}%m")
l2_paths = pd.DataFrame(columns=["swath", "poca"])
for l2_type in ["swath", "poca"]:
if os.path.isdir(os.path.join(data_path, f"L2_{l2_type}", current_subdir)):
for filename in os.listdir(os.path.join(data_path, f"L2_{l2_type}", current_subdir)):
match = re.search(cryosat_id_pattern, filename)
if match is not None:
l2_paths.loc[cs_id_to_time(match.group()), l2_type] = filename
else:
os.makedirs(os.path.join(data_path, f"L2_{l2_type}", current_subdir))
with Pool(processes=cores) as p:
# function is defined at the bottom of this module
collective_swath_poca_list = p.starmap(process_track, [(idx, reprocess, l2_paths, save_or_return, data_path, current_subdir, kwargs) for idx in pd.Series(index=track_idx).loc[current_month:current_month+pd.offsets.MonthBegin(1)].index]) # indices per month with work-around :/ should be easier
if track_idx.tz == None:
track_idx.tz_localize("UTC")
# Somehow the download thread prevents the processing of tracks,
# possibly because of the GIL. For now it is disabled, so data have to
# be downloaded in advance. Downloading on the fly remains possible,
# but with parallel processing it can cause issues because ESA blocks
# FTP connections when there are too many of them.
# stop_event = Event()
# download_thread = Thread(target=l1b.download_wrapper,
# kwargs=dict(track_idx=track_idx, num_processes=8, stop_event=stop_event),
# name="dl_l1b_mother_thread",
# daemon=False)
# download_thread.start()
try:
start_datetime, end_datetime = track_idx.sort_values()[[0,-1]]
swath_list = []
poca_list = []
kwargs["cs_full_file_names"] = load_cs_full_file_names(update="no")
for current_month in pd.date_range(start_datetime.normalize()-pd.offsets.MonthBegin(), end_datetime, freq="MS"):
current_subdir = current_month.strftime(f"%Y{os.path.sep}%m")
l2_paths = pd.DataFrame(columns=["swath", "poca"])
for l2_type in ["swath", "poca"]:
if os.path.isdir(os.path.join(data_path, f"L2_{l2_type}", current_subdir)):
for filename in os.listdir(os.path.join(data_path, f"L2_{l2_type}", current_subdir)):
match = re.search(cryosat_id_pattern, filename)
if match is not None:
l2_paths.loc[cs_id_to_time(match.group()), l2_type] = filename
else:
os.makedirs(os.path.join(data_path, f"L2_{l2_type}", current_subdir))
print("start processing", current_month)
with Pool(processes=cores) as p:
# function is defined at the bottom of this module
collective_swath_poca_list = p.starmap(
process_track,
[(idx, reprocess, l2_paths, save_or_return, data_path, current_subdir, kwargs)
for idx
# indices per month with work-around :/ should be easier
in pd.Series(index=track_idx).loc[current_month:current_month+pd.offsets.MonthBegin(1)].index],
chunksize=1)
if save_or_return != "save":
for swath_poca_tuple in collective_swath_poca_list: # .get()
swath_list.append(swath_poca_tuple[0])
poca_list.append(swath_poca_tuple[1])
print("done processing", current_month)
if save_or_return != "save":
for swath_poca_tuple in collective_swath_poca_list:
swath_list.append(swath_poca_tuple[0])
poca_list.append(swath_poca_tuple[1])
print("done processing", current_month)
if save_or_return != "save":
return pd.concat(swath_list), pd.concat(poca_list)
return pd.concat(swath_list), pd.concat(poca_list)
except:
# print("Waiting for download threads to join.")
# stop_event.set()
# print("Waiting for download threads to join.")
# download_thread.join(30)
raise
__all__.append("from_id")


@@ -155,18 +183,22 @@ def process_and_save(region_of_interest: str|shapely.Polygon,
# local helper function. can't be defined where it is needed because of namespace issues
def process_track(idx, reprocess, l2_paths, save_or_return, data_path, current_subdir, kwargs):
print("getting", idx)
# print("kwargs", wargs)
try:
if reprocess or any(l2_paths.loc[idx,:].isnull()):
raise FileNotFoundError()
if save_or_return != "save":
swath_poca_tuple = (
gpd.read_feather(os.path.join(data_path, "L2_swath", current_subdir,
gpd.read_feather(os.path.join(l2_swath_path, current_subdir,
l2_paths.loc[idx, "swath"])),
gpd.read_feather(os.path.join(data_path, "L2_poca", current_subdir,
gpd.read_feather(os.path.join(l2_poca_path, current_subdir,
l2_paths.loc[idx, "poca"])))
except (KeyError, FileNotFoundError):
if "cs_full_file_names" not in locals():
cs_full_file_names = load_cs_full_file_names(update="no")
if "cs_full_file_names" in kwargs:
cs_full_file_names = kwargs.pop("cs_full_file_names")
else:
cs_full_file_names = load_cs_full_file_names(update="no")
# filter l1b_data kwargs
params = inspect.signature(l1b.l1b_data).parameters
l1b_kwargs = {k: v for k, v in kwargs.items() if k in params}
@@ -176,15 +208,16 @@ def process_track(idx, reprocess, l2_paths, save_or_return, data_path, current_s
swath_poca_tuple = l1b.l1b_data.from_id(cs_time_to_id(idx), **l1b_kwargs)\
.to_l2(swath_or_poca="both", **to_l2_kwargs)
if save_or_return != "return":
print("saving", idx)
# ! consider writing empty files
# The code below skips saving if there are no data, which means that
# processing will be attempted again next time. I consider this safer,
# and the performance loss is on the order of seconds; there might,
# however, be better options.
try:
swath_poca_tuple[0].to_feather(os.path.join(data_path, "L2_swath", current_subdir,
swath_poca_tuple[0].to_feather(os.path.join(l2_swath_path, current_subdir,
cs_full_file_names.loc[idx]+".feather"))
swath_poca_tuple[1].to_feather(os.path.join(data_path, "L2_poca", current_subdir,
swath_poca_tuple[1].to_feather(os.path.join(l2_poca_path, current_subdir,
cs_full_file_names.loc[idx]+".feather"))
except ValueError:
if swath_poca_tuple[0].empty:
@@ -194,4 +227,6 @@ def process_track(idx, reprocess, l2_paths, save_or_return, data_path, current_s
warnings.warn(f"{which} points for {cs_time_to_id(idx)}.")
if save_or_return != "save":
return swath_poca_tuple

else: # not sure that it's necessary
return 0
