Skip to content

Commit

Permalink
set to 2
Browse files Browse the repository at this point in the history
  • Loading branch information
JordanLaserGit committed Feb 21, 2024
1 parent 294452a commit 14e9cb3
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions forcingprocessor/src/forcingprocessor/weights_parq2json.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
import pandas as pd
import concurrent.futures as cf
import json
import os, argparse
import os, argparse, time

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset

def get_weight_json(catchments,jproc,version=None):
def get_weight_json(catchments,version=None):
if version is None: version = "v20.1"


weight_data = {}
ncatch = len(catchments)

print(f'Beginning weights query')
w = pa.dataset.dataset(
f's3://lynker-spatial/{version}/forcing_weights.parquet', format='parquet'
).filter(
pc.field('divide_id').isin(catchments)
).to_batches()
batch: pa.RecordBatch
ncatch_found = 0
t_weights = time.perf_counter()
for batch in w:
tbl = batch.to_pandas()
if tbl.empty:
Expand All @@ -29,8 +28,9 @@ def get_weight_json(catchments,jproc,version=None):
df_jcatch = tbl.loc[tbl['divide_id'] == jcatch]
if df_jcatch.empty:
continue
ncatch_found+=1
weight_data[jcatch] = [[int(x) for x in list(df_jcatch['cell'])],list(df_jcatch['coverage_fraction'])]
if j % 100 == 0 and jproc == 0: print(f'{j} {jcatch} {100*j/ncatch:.2f}%')
print(f'Weights calculated for {ncatch_found} catchments in {time.perf_counter() - t_weights:.1f} seconds')

return (weight_data)

Expand All @@ -39,7 +39,7 @@ def get_weight_json(catchments,jproc,version=None):
parser = argparse.ArgumentParser()
parser.add_argument('--gpkg', dest="geopackage", type=str, help="Path to geopackage file",default = None)
parser.add_argument('--outname', dest="weights_filename", type=str, help="Filename for the weight file")
parser.add_argument('--version', dest="version", type=str, help="Hydrofabric version e.g. \"v21\"")
parser.add_argument('--version', dest="version", type=str, help="Hydrofabric version e.g. \"v21\"",default = None)
args = parser.parse_args()

version = args.version
Expand All @@ -49,6 +49,7 @@ def get_weight_json(catchments,jproc,version=None):

if args.geopackage is None:
# go for conus
import pandas as pd
uri = f"s3://lynker-spatial/{version}/forcing_weights.parquet"
weights_df = pd.read_parquet(uri)
catchment_list = list(weights_df.divide_id.unique())
Expand All @@ -59,7 +60,7 @@ def get_weight_json(catchments,jproc,version=None):
catchments = gpd.read_file(args.geopackage, layer='divides')
catchment_list = sorted(list(catchments['divide_id']))

nprocs = os.cpu_count() // 5
nprocs = 2
catchment_list_list = []
ncatchments = len(catchment_list)
nper = ncatchments // nprocs
Expand All @@ -70,9 +71,9 @@ def get_weight_json(catchments,jproc,version=None):
k = nper + i + nleft
catchment_list_list.append(catchment_list[i:k])
i = k

with cf.ProcessPoolExecutor(max_workers=nprocs) as pool:
results = pool.map(get_weight_json,catchment_list_list,[x for x in range(nprocs)])
results = pool.map(get_weight_json,catchment_list_list)

weights = {}
for jweights in results:
Expand Down

0 comments on commit 14e9cb3

Please sign in to comment.