
Commit

update forcingprocessor to use lynker spatial weights
JordanLaserGit committed Feb 7, 2024
1 parent bc71ec8 commit 47f7cb8
Showing 2 changed files with 83 additions and 7 deletions.
21 changes: 14 additions & 7 deletions forcingprocessor/src/forcingprocessor/forcingprocessor.py
@@ -180,6 +180,7 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, fs=None):
     data_list = []
     t_list = []
     nfiles = len(nwm_files)
+    nvar = len(nwm_variables)
     if fs_type == 'google' : fs = gcsfs.GCSFileSystem()
     id = os.getpid()
     if ii_verbose: print(f'Process #{id} extracting data from {nfiles} files',end=None,flush=True)
@@ -203,7 +204,7 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, fs=None):
         txrds += time.perf_counter() - t0
         t0 = time.perf_counter()
         shp = nwm_data["U2D"].shape
-        data_allvars = np.zeros(shape=(len(nwm_variables), shp[1], shp[2]), dtype=np.float32)
+        data_allvars = np.zeros(shape=(nvar, shp[1], shp[2]), dtype=np.float32)
         for var_dx, jvar in enumerate(nwm_variables):
             if jvar == 'RAINRATE': # HACK CONVERSION
                 data_allvars[var_dx, :, :] = 3600 * np.squeeze(nwm_data[jvar].values)
@@ -214,15 +215,21 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, fs=None):
         t = time_splt[0] + " " + time_splt[1]
         t_list.append(t)
         del nwm_data
-        tfill += time.perf_counter() - t0
+        tfill += time.perf_counter() - t0
+
         t0 = time.perf_counter()
         data_allvars = data_allvars.reshape(nvar, shp[1] * shp[2])
         ncatch = len(crosswalk_dict)
-        nvar = len(nwm_variables)
         data_array = np.zeros((nvar,ncatch), dtype=np.float32)
         jcatch = 0
         for key, value in crosswalk_dict.items():
-            data_array[:,jcatch] = np.nanmean(data_allvars[:, value[0], value[1]], axis=1)
+            weights = value[0]
+            coverage = np.array(value[1])
+            coverage_mat = np.repeat(coverage[None,:],nvar,axis=0)
+            jcatch_data_mask = data_allvars[:,weights]
+            weight_sum = np.sum(coverage)
+            data_array[:,jcatch] = np.sum(coverage_mat * jcatch_data_mask ,axis=1) / weight_sum
+
             jcatch += 1
         data_list.append(data_array)
         tdata += time.perf_counter() - t0
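
The removed line averaged every intersecting grid cell equally with np.nanmean; the new block weights each cell by its coverage fraction from the lynker-spatial weights file. A minimal, self-contained sketch of the same arithmetic on toy data (all names and values here are illustrative, not from the commit):

import numpy as np

nvar = 3                                          # number of forcing variables
grid = np.arange(nvar * 10, dtype=np.float32).reshape(nvar, 10)  # flattened nvar x ncell grid

# One catchment: flat grid-cell indices and the fraction of each cell it covers.
cells = [2, 3, 7]
coverage = np.array([1.0, 0.5, 0.25], dtype=np.float32)

# Coverage-weighted mean per variable. Broadcasting makes the commit's
# explicit np.repeat unnecessary, but the result is identical.
weighted = np.sum(coverage * grid[:, cells], axis=1) / coverage.sum()

# Equal-weight mean (the old np.nanmean behavior) for comparison.
unweighted = grid[:, cells].mean(axis=1)
print(weighted, unweighted)
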
@@ -520,7 +527,7 @@ def prep_ngen_data(conf):
             Bucket=output_bucket,
             Key=f"{output_path}/metadata/forcings_metadata/conf.json"
         )
-
+    if ii_verbose: print(f'Opening weight file...\n')
     ii_weights_in_bucket = weight_file.find('//') >= 0
     if ii_weights_in_bucket:
@@ -587,8 +594,8 @@ def prep_ngen_data(conf):
         jnwm_files = nwm_forcing_files[start:end]
         t0 = time.perf_counter()
         if ii_verbose: print(f'Entering data extraction...\n')
-        # [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, fs)
-        data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_process,crosswalk_dict,fs)
+        [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, fs)
+        # data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_process,crosswalk_dict,fs)
         t_extract = time.perf_counter() - t0
         complexity = (nfiles_tot * ncatchments) / 10000
         score = complexity / t_extract
69 changes: 69 additions & 0 deletions forcingprocessor/src/forcingprocessor/weights_parq2json.py
@@ -0,0 +1,69 @@
import pandas as pd
import concurrent.futures as cf
import json
import os, time

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset

def get_weight_json(catchments, jproc):
    """Build {divide_id: [cell indices, coverage fractions]} for one chunk of catchments."""

    weight_data = {}
    ncatch = len(catchments)

    # Stream only this process's catchments out of the parquet on S3.
    w = pa.dataset.dataset(
        's3://lynker-spatial/v20.1/forcing_weights.parquet', format='parquet'
    ).filter(
        pc.field('divide_id').isin(catchments)
    ).to_batches()
    batch: pa.RecordBatch
    for batch in w:
        tbl = batch.to_pandas()
        if tbl.empty:
            continue

        for j, jcatch in enumerate(catchments):
            t0 = time.perf_counter()
            df_jcatch = tbl.loc[tbl['divide_id'] == jcatch]
            if df_jcatch.empty:
                continue
            cells = [int(x) for x in df_jcatch['cell']]
            fracs = list(df_jcatch['coverage_fraction'])
            if jcatch in weight_data:
                # A catchment's rows can straddle record batches; accumulate
                # instead of overwriting the earlier batch's entry.
                weight_data[jcatch][0].extend(cells)
                weight_data[jcatch][1].extend(fracs)
            else:
                weight_data[jcatch] = [cells, fracs]
            t1 = time.perf_counter() - t0
            # Rough progress and ETA, reported from process 0 only.
            if j % 100 == 0 and jproc == 0:
                print(f'{j} {jcatch} {100*j/ncatch:.2f}% {t1*ncatch/3600:.2f}')

    return weight_data

# Guard the driver so ProcessPoolExecutor workers don't re-execute it on import.
if __name__ == '__main__':
    # Pull the full catchment list from the weights parquet, then free it.
    uri = "s3://lynker-spatial/v20.1/forcing_weights.parquet"
    weights_df = pd.read_parquet(uri)
    catchment_list = list(weights_df.divide_id.unique())
    del weights_df

    nprocs = os.cpu_count() - 2
    print(nprocs)

    # One chunk per process; spread the remainder over the first chunks
    # so chunk sizes differ by at most one.
    catchment_list_list = []
    ncatchments = len(catchment_list)
    nper = ncatchments // nprocs
    nleft = ncatchments % nprocs
    i = 0
    for jproc in range(nprocs):
        k = i + nper + (1 if jproc < nleft else 0)
        catchment_list_list.append(catchment_list[i:k])
        i = k

    with cf.ProcessPoolExecutor(max_workers=nprocs) as pool:
        results = pool.map(get_weight_json, catchment_list_list, range(nprocs))

    # Aggregate results
    weights = {}
    for jweights in results:
        weights = weights | jweights

    print(len(weights))

    data = json.dumps(weights)
    with open('./weights_conus_v21.json', 'w') as fp:
        fp.write(data)
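
The script writes the weights as a single JSON keyed by divide_id. A minimal sketch of reading it back in the shape forcing_grid2catchment expects for crosswalk_dict (the file name matches the one written above; the printed sample entry is illustrative):

import json

with open('./weights_conus_v21.json') as fp:
    crosswalk_dict = json.load(fp)

# Each value is [flat grid-cell indices, coverage fractions].
divide_id, (cells, fracs) = next(iter(crosswalk_dict.items()))
print(f'{len(crosswalk_dict)} catchments; e.g. {divide_id}: '
      f'{len(cells)} cells, coverage sum {sum(fracs):.3f}')
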
