diff --git a/forcingprocessor/src/forcingprocessor/forcingprocessor.py b/forcingprocessor/src/forcingprocessor/forcingprocessor.py index 6247294c..f6a4d9fb 100644 --- a/forcingprocessor/src/forcingprocessor/forcingprocessor.py +++ b/forcingprocessor/src/forcingprocessor/forcingprocessor.py @@ -1,6 +1,6 @@ import pandas as pd import argparse, os, json, sys -import fsspec +import requests import s3fs import gcsfs from pathlib import Path @@ -137,7 +137,7 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, var_list: list wgt_file: a path to the weights json, filelist: list of filenames (urls for remote, local paths otherwise), var_list: list (list of variable names to read), - jt: the index to place the file. This is used to ensure elements increase in time, regardless of thread number, + fs: an optional file system for cloud storage reads Outputs: df_by_t : (returned for local files) a list (in time) of forcing data. Note that this list may not be consistent in time @@ -151,24 +151,25 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, var_list: list data_list = [] t_list = [] nfiles = len(nwm_files) - if nwm_files[0].find('googleapis') >= 0: - fs = gcsfs.GCSFileSystem() - fs_type = 'google' - elif nwm_files[0].find('s3.amazon') >= 0: - fs_type = 's3' + if fs_type == 'google' : fs = gcsfs.GCSFileSystem() id = os.getpid() if ii_verbose: print(f'{id} extracting data from {nfiles} files',end=None,flush=True) for j, nwm_file in enumerate(nwm_files): t0 = time.perf_counter() eng = "h5netcdf" - if fs is not None: - bucket_key = convert_url2key(nwm_file,fs_type) + if fs: + if nwm_file.find('https://') >= 0: bucket_key = convert_url2key(nwm_file,fs_type) + else: bucket_key = nwm_file file_obj = fs.open(bucket_key, mode='rb') + elif 'https://' in nwm_file: + response = requests.get(nwm_file) + file_obj = BytesIO(response.content) else: file_obj = nwm_file topen += time.perf_counter() - t0 t0 = time.perf_counter() + with xr.open_dataset(file_obj, engine=eng) as nwm_data: txrds += time.perf_counter() - t0 t0 = time.perf_counter() @@ -444,7 +445,7 @@ def prep_ngen_data(conf): "VGRD_10maboveground", "DLWRF_surface", "APCP_surface", - "precip_rate", # BROKEN (Identical to APCP!) + "precip_rate", # HACK RAINRATE * 3600 "TMP_2maboveground", "SPFH_2maboveground", "PRES_surface", @@ -557,17 +558,19 @@ def prep_ngen_data(conf): nfiles = len(nwm_forcing_files) - if nwm_forcing_files[0].find('s3.amazonaws') >= 0: + global fs_type + if 's3://' in nwm_forcing_files[0] or 's3.amazonaws' in nwm_forcing_files[0]: fs = s3fs.S3FileSystem( anon=True, client_kwargs={'region_name': 'us-east-1'} ) fs_type = 's3' - elif nwm_forcing_files[0].find('storage.googleapis') >= 0: + elif 'google' in nwm_forcing_files[0] or 'gs://' in nwm_forcing_files[0] or 'gcs://' in nwm_forcing_files[0]: fs = "google" fs_type = 'google' else: fs = None + fs_type = None if ii_verbose: print(f"NWM file names:") @@ -593,6 +596,7 @@ def prep_ngen_data(conf): jnwm_files = nwm_forcing_files[start:end] t0 = time.perf_counter() if ii_verbose: print(f'Entering data extraction...\n') + # [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, var_list, fs) data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_threads,crosswalk_dict,var_list,fs) t_extract = time.perf_counter() - t0 complexity = (nfiles_tot * ncatchments) / 10000 @@ -624,10 +628,14 @@ def prep_ngen_data(conf): for j, jfile in enumerate(nwm_forcing_files): if j > 10: break if fs: - bucket_key = convert_url2key(jfile, fs_type) + if jfile.find('https://') >= 0: bucket_key = convert_url2key(jfile, fs_type) + else: bucket_key = jfile if fs_type == 'google': fs = gcsfs.GCSFileSystem() + response = fs.open(bucket_key, mode='rb') nwm_file_sizes.append(response.details['size']) + elif jfile.find('https://') >= 0: + nwm_file_sizes = len(requests.get(jfile).content) else: nwm_file_sizes = os.path.getsize(jfile)