diff --git a/forcingprocessor/src/forcingprocessor/forcingprocessor.py b/forcingprocessor/src/forcingprocessor/forcingprocessor.py
index 4b27dd0b..5f94143e 100644
--- a/forcingprocessor/src/forcingprocessor/forcingprocessor.py
+++ b/forcingprocessor/src/forcingprocessor/forcingprocessor.py
@@ -180,6 +180,7 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, fs=None):
     data_list = []
     t_list = []
     nfiles = len(nwm_files)
+    nvar = len(nwm_variables)
     if fs_type == 'google' : fs = gcsfs.GCSFileSystem()
     id = os.getpid()
     if ii_verbose: print(f'Process #{id} extracting data from {nfiles} files',end=None,flush=True)
@@ -203,7 +204,7 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, fs=None):
             txrds += time.perf_counter() - t0
             t0 = time.perf_counter()
             shp = nwm_data["U2D"].shape
-            data_allvars = np.zeros(shape=(len(nwm_variables), shp[1], shp[2]), dtype=np.float32)
+            data_allvars = np.zeros(shape=(nvar, shp[1], shp[2]), dtype=np.float32)
             for var_dx, jvar in enumerate(nwm_variables):
                 if jvar == 'RAINRATE': # HACK CONVERSION
                     data_allvars[var_dx, :, :] = 3600 * np.squeeze(nwm_data[jvar].values)
@@ -214,15 +215,21 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, fs=None):
             t = time_splt[0] + " " + time_splt[1]
             t_list.append(t)
             del nwm_data
-            tfill += time.perf_counter() - t0
+        tfill += time.perf_counter() - t0

         t0 = time.perf_counter()
+        data_allvars = data_allvars.reshape(nvar, shp[1] * shp[2])
         ncatch = len(crosswalk_dict)
-        nvar = len(nwm_variables)
         data_array = np.zeros((nvar,ncatch), dtype=np.float32)
         jcatch = 0
         for key, value in crosswalk_dict.items():
-            data_array[:,jcatch] = np.nanmean(data_allvars[:, value[0], value[1]], axis=1)
+            weights = value[0]
+            coverage = np.array(value[1])
+            coverage_mat = np.repeat(coverage[None,:],nvar,axis=0)
+            jcatch_data_mask = data_allvars[:,weights]
+            weight_sum = np.sum(coverage)
+            data_array[:,jcatch] = np.sum(coverage_mat * jcatch_data_mask ,axis=1) / weight_sum
+
             jcatch += 1
         data_list.append(data_array)
         tdata += time.perf_counter() - t0
@@ -520,7 +527,7 @@ def prep_ngen_data(conf):
                 Bucket=output_bucket,
                 Key=f"{output_path}/metadata/forcings_metadata/conf.json"
             )
-
+
    if ii_verbose: print(f'Opening weight file...\n')
    ii_weights_in_bucket = weight_file.find('//') >= 0
    if ii_weights_in_bucket:
@@ -587,8 +594,8 @@ def prep_ngen_data(conf):
         jnwm_files = nwm_forcing_files[start:end]
         t0 = time.perf_counter()
         if ii_verbose: print(f'Entering data extraction...\n')
-        # [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, fs)
-        data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_process,crosswalk_dict,fs)
+        [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, fs)
+        # data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_process,crosswalk_dict,fs)
         t_extract = time.perf_counter() - t0
         complexity = (nfiles_tot * ncatchments) / 10000
         score = complexity / t_extract
diff --git a/forcingprocessor/src/forcingprocessor/weights_parq2json.py b/forcingprocessor/src/forcingprocessor/weights_parq2json.py
new file mode 100644
index 00000000..892df30a
--- /dev/null
+++ b/forcingprocessor/src/forcingprocessor/weights_parq2json.py
@@ -0,0 +1,69 @@
+import pandas as pd
+import concurrent.futures as cf
+import json
+import os, time
+
+import pyarrow as pa
+import pyarrow.compute as pc
+import pyarrow.dataset
+
+def get_weight_json(catchments,jproc):
+
+    weight_data = {}
+    ncatch = len(catchments)
+
+    w = pa.dataset.dataset(
+        's3://lynker-spatial/v20.1/forcing_weights.parquet',
+        format='parquet'
+    ).filter(
+        pc.field('divide_id').isin(catchments)
+    ).to_batches()
+    batch: pa.RecordBatch
+    for batch in w:
+        tbl = batch.to_pandas()
+        if tbl.empty:
+            continue
+
+        for j, jcatch in enumerate(catchments):
+            t0 = time.perf_counter()
+            df_jcatch = tbl.loc[tbl['divide_id'] == jcatch]
+            if df_jcatch.empty:
+                continue
+            weight_data[jcatch] = [[int(x) for x in list(df_jcatch['cell'])],list(df_jcatch['coverage_fraction'])]
+            if len(list(df_jcatch['cell'])) == 0: print(f'{j} {jcatch} {df_jcatch} {tbl.empty}')
+            t1 = time.perf_counter() - t0
+            if j % 100 == 0 and jproc == 0: print(f'{j} {jcatch} {100*j/ncatch:.2f}% {t1*ncatch/3600:.2f}')
+
+    return (weight_data)
+
+uri = "s3://lynker-spatial/v20.1/forcing_weights.parquet"
+weights_df = pd.read_parquet(uri)
+catchment_list = list(weights_df.divide_id.unique())
+del weights_df
+
+nprocs = os.cpu_count() - 2
+print(nprocs)
+catchment_list_list = []
+ncatchments = len(catchment_list)
+nper = ncatchments // nprocs
+nleft = ncatchments - (nper * nprocs)
+i = 0
+k = 0
+for _ in range(nprocs):
+    k = nper + i + nleft
+    catchment_list_list.append(catchment_list[i:k])
+    i = k
+
+with cf.ProcessPoolExecutor(max_workers=nprocs) as pool:
+    results = pool.map(get_weight_json, catchment_list_list,[x for x in range(nprocs)])
+
+# Aggregate results
+weights = {}
+for jweights in results:
+    weights = weights | jweights
+
+print(len(weights))
+
+data = json.dumps(weights)
+with open('./weights_conus_v21.json','w') as fp:
+    fp.write(data)
\ No newline at end of file
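
Note on the forcing_grid2catchment change: the per-catchment aggregation switches from an unweighted np.nanmean over covered grid cells to a mean weighted by each cell's coverage fraction, applied to the flattened (nvar, ny*nx) array. A minimal self-contained sketch of the same computation, with made-up sizes and a hypothetical crosswalk entry (all names and values below are illustrative, not from the module):

import numpy as np

# Hypothetical setup: 3 forcing variables on a 4x5 grid, flattened to 20 cells.
nvar, ny, nx = 3, 4, 5
data_allvars = np.random.rand(nvar, ny * nx).astype(np.float32)

# One crosswalk entry: flattened cell indices and the fraction of each cell
# covered by the catchment polygon.
cells = [2, 3, 7]                      # corresponds to value[0] in crosswalk_dict
coverage = np.array([1.0, 0.5, 0.25])  # corresponds to value[1] in crosswalk_dict

# Old behavior: every covered cell counts equally.
unweighted = np.nanmean(data_allvars[:, cells], axis=1)

# New behavior: cells contribute in proportion to their coverage fraction.
# NumPy broadcasting does the same job as the np.repeat in the diff.
weighted = (data_allvars[:, cells] * coverage).sum(axis=1) / coverage.sum()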
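
weights_parq2json.py materializes the lynker-spatial parquet weights as one JSON file keyed by divide_id, where each value is a pair of lists: flattened grid-cell indices and their coverage fractions. That pair is exactly the value[0]/value[1] unpacked per catchment in forcing_grid2catchment. An illustrative entry (IDs and numbers fabricated for the example):

import json

# divide_id -> [[flattened cell indices], [coverage fractions]]
weights = {
    "cat-1": [[100432, 100433, 101918], [1.0, 0.75, 0.1]],
    "cat-2": [[101919, 101920], [0.6, 0.4]],
}

with open('./weights_example.json', 'w') as fp:
    fp.write(json.dumps(weights))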
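
Worth noting in get_weight_json: each record batch rebuilds the [cells, fractions] lists for every catchment it touches, so a divide_id whose rows straddle two batches appears to keep only the rows from the batch processed last. A sketch of an equivalent single-pass aggregation with pandas groupby that sidesteps per-batch rebuilding, assuming the same three columns the script reads:

import pandas as pd

uri = "s3://lynker-spatial/v20.1/forcing_weights.parquet"
df = pd.read_parquet(uri, columns=["divide_id", "cell", "coverage_fraction"])

# Group once over the full frame so no divide can be split across batches.
weights = {
    divide_id: [[int(c) for c in grp["cell"]], list(grp["coverage_fraction"])]
    for divide_id, grp in df.groupby("divide_id")
}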
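
The driver splits the divide list across nprocs workers with k = nper + i + nleft, which hands the remainder to every chunk; Python slicing past the end of the list keeps this correct (each divide is assigned exactly once) but leaves the final chunks short or empty. A sketch of an even split, using only the standard library:

def split_even(items, nchunks):
    # Spread the remainder one element at a time over the leading chunks.
    nper, nleft = divmod(len(items), nchunks)
    chunks, i = [], 0
    for n in range(nchunks):
        k = i + nper + (1 if n < nleft else 0)
        chunks.append(items[i:k])
        i = k
    return chunks

# e.g. split_even(list(range(10)), 4) -> [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]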