
Commit

add options
JordanLaserGit committed Feb 7, 2024
1 parent 47f7cb8 commit 1f9942b
Showing 2 changed files with 40 additions and 34 deletions.
forcingprocessor/src/forcingprocessor/forcingprocessor.py (4 changes: 2 additions & 2 deletions)
@@ -594,8 +594,8 @@ def prep_ngen_data(conf):
         jnwm_files = nwm_forcing_files[start:end]
         t0 = time.perf_counter()
         if ii_verbose: print(f'Entering data extraction...\n')
-        [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, fs)
-        # data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_process,crosswalk_dict,fs)
+        # [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, fs)
+        data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_process,crosswalk_dict,fs)
         t_extract = time.perf_counter() - t0
         complexity = (nfiles_tot * ncatchments) / 10000
         score = complexity / t_extract
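The swap above routes data extraction through multiprocess_data_extract instead of the serial forcing_grid2catchment call. The sketch below is only a rough illustration of how such a wrapper could fan the NWM files out across worker processes; it is inferred from the two call signatures visible in this hunk, not copied from forcingprocessor.py.

import concurrent.futures as cf
import numpy as np

def multiprocess_data_extract(nwm_files, proc_process, crosswalk_dict, fs):
    # Hypothetical parallel wrapper: split the NWM forcing files into one
    # chunk per worker and run the serial extractor on each chunk.
    # forcing_grid2catchment(crosswalk_dict, files, fs) is the serial routine
    # already defined in forcingprocessor.py.
    chunks = [list(c) for c in np.array_split(nwm_files, proc_process) if len(c) > 0]
    data_parts, t_parts = [], []
    with cf.ProcessPoolExecutor(max_workers=proc_process) as pool:
        for data, t_ax in pool.map(
            forcing_grid2catchment,
            [crosswalk_dict] * len(chunks),
            chunks,
            [fs] * len(chunks),
        ):
            data_parts.append(data)
            t_parts.extend(t_ax)
    # Stitch the per-process pieces back into a single array and time axis.
    return np.concatenate(data_parts), t_parts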
forcingprocessor/src/forcingprocessor/weights_parq2json.py (70 changes: 38 additions & 32 deletions)
@@ -1,7 +1,7 @@
 import pandas as pd
 import concurrent.futures as cf
 import json
-import os, time
+import os, time, argparse

 import pyarrow as pa
 import pyarrow.compute as pc
@@ -24,46 +24,52 @@ def get_weight_json(catchments,jproc):
             continue

     for j, jcatch in enumerate(catchments):
         t0 = time.perf_counter()
         df_jcatch = tbl.loc[tbl['divide_id'] == jcatch]
         if df_jcatch.empty:
             continue
         weight_data[jcatch] = [[int(x) for x in list(df_jcatch['cell'])],list(df_jcatch['coverage_fraction'])]
         if len(list(df_jcatch['cell'])) == 0: print(f'{j} {jcatch} {df_jcatch} {tbl.empty}')
-        t1 = time.perf_counter() - t0
-        if j % 100 == 0 and jproc == 0: print(f'{j} {jcatch} {100*j/ncatch:.2f}% {t1*ncatch/3600:.2f}')
+        if j % 100 == 0 and jproc == 0: print(f'{j} {jcatch} {100*j/ncatch:.2f}%')

     return (weight_data)

-uri = "s3://lynker-spatial/v20.1/forcing_weights.parquet"
-weights_df = pd.read_parquet(uri)
-catchment_list = list(weights_df.divide_id.unique())
-del weights_df
+if __name__ == "__main__":

-nprocs = os.cpu_count() - 2
-print(nprocs)
-catchment_list_list = []
-ncatchments = len(catchment_list)
-nper = ncatchments // nprocs
-nleft = ncatchments - (nper * nprocs)
-i = 0
-k = 0
-for _ in range(nprocs):
-    k = nper + i + nleft
-    catchment_list_list.append(catchment_list[i:k])
-    i = k
-
-with cf.ProcessPoolExecutor(max_workers=nprocs) as pool:
-    results = pool.map(get_weight_json, catchment_list_list,[x for x in range(nprocs)])
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--gpkg', dest="geopackage", type=str, help="Path to geopackage file",default = None)
+    parser.add_argument('--outname', dest="weights_filename", type=str, help="Filename for the weight file")
+    args = parser.parse_args()

+    if args.geopackage is None:
+        # go for conus
+        uri = "s3://lynker-spatial/v20.1/forcing_weights.parquet"
+        weights_df = pd.read_parquet(uri)
+        catchment_list = list(weights_df.divide_id.unique())
+        del weights_df
+    else:
+        import geopandas as gpd
+        catchments = gpd.read_file(args.geopackage, layer='divides')
+        catchment_list = sorted(list(catchments['divide_id']))

-# Aggregate results
-weights = {}
-for jweights in results:
-    weights = weights | jweights
+    nprocs = os.cpu_count() - 2
+    catchment_list_list = []
+    ncatchments = len(catchment_list)
+    nper = ncatchments // nprocs
+    nleft = ncatchments - (nper * nprocs)
+    i = 0
+    k = 0
+    for _ in range(nprocs):
+        k = nper + i + nleft
+        catchment_list_list.append(catchment_list[i:k])
+        i = k

+    with cf.ProcessPoolExecutor(max_workers=nprocs) as pool:
+        results = pool.map(get_weight_json, catchment_list_list,[x for x in range(nprocs)])

-print(len(weights))
+    weights = {}
+    for jweights in results:
+        weights = weights | jweights

-data = json.dumps(weights)
-with open('./weights_conus_v21.json','w') as fp:
-    fp.write(data)
+    data = json.dumps(weights)
+    with open(args.weights_filename,'w') as fp:
+        fp.write(data)
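With the new flags, the script either pulls the CONUS-wide forcing_weights.parquet from lynker-spatial (the default) or derives the catchment list from a local hydrofabric geopackage. The usage sketch below is illustrative: the geopackage name, output filename, and divide_id are placeholders, and the JSON layout is read off the weight_data assignment in get_weight_json above.

# Illustrative invocations (run from forcingprocessor/src/forcingprocessor):
#   python weights_parq2json.py --outname weights_conus.json
#   python weights_parq2json.py --gpkg nextgen_09.gpkg --outname weights_09.json
#
# Each entry in the output maps a divide_id to [grid cell indices, coverage fractions]:
import json

with open("weights_09.json") as fp:          # placeholder output name from above
    weights = json.load(fp)

cells, fractions = weights["cat-12345"]      # hypothetical divide_id
print(len(cells), sum(fractions))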
