try dask again
davedavemckay committed Jan 15, 2025
1 parent ba473c9 commit 4477649
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions csd3-side/scripts/lsst-backup.py
@@ -123,7 +123,8 @@ def compare_zip_contents_bool(to_collate, id: int, current_objects: pd.DataFrame
Returns:
return_bool: A bool == True if the zip should be uploaded.
"""
- collate_object_names = to_collate[[to_collate.id == id]]['object_names'].values[0]
+ ids = to_collate['id'].compute()
+ collate_object_names = to_collate[[ids == id]]['object_names'].compute().values[0]
return_bool = True
# dprint(f'collate_object_names: {collate_object_names}', flush=True)
# dprint(f'type: {type(collate_object_names)}', flush=True)
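
The first hunk replaces an in-memory pandas lookup with its Dask equivalent: the `id` column is materialised once with `.compute()`, and the filtered `object_names` cell is only pulled back to the client at the point of use. A minimal sketch of the idiom on a made-up frame (note: standard boolean indexing uses single brackets; the double brackets above are carried over from the diff):

```python
# Sketch only: the column names mirror the diff, the data is invented.
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({
    'id': [0, 1, 2],
    'object_names': [['a.fits'], ['b.fits', 'c.fits'], ['d.fits']],
})
to_collate = dd.from_pandas(pdf, npartitions=2)

# Build a lazy boolean mask, then compute only the filtered result.
collate_object_names = (
    to_collate[to_collate['id'] == 1]['object_names'].compute().values[0]
)
print(collate_object_names)  # ['b.fits', 'c.fits']
```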
@@ -1225,22 +1226,23 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
print(f'Loaded collate list from {collate_list_file}.', flush=True)
# now using pandas for both current_objects and to_collate - this could be re-written to use vectorised operations
# client.scatter([current_objects,to_collate])
- # to_collate = dd.from_pandas(to_collate, npartitions=len(client.scheduler_info()['workers'])*10)
+ to_collate = dd.from_pandas(to_collate, npartitions=len(client.scheduler_info()['workers'])*10)
- # print('Created Dask dataframe for to_collate.', flush=True)
+ print('Created Pandas dataframe for to_collate.', flush=True)
# to_collate = to_collate.drop(columns='upload')

# print(to_collate.index)
# print(to_collate.columns)
# print(to_collate.dtypes)
- client.scatter(to_collate)
+ # client.scatter(to_collate)
print('Scattered dataframe to distributed memory.', flush=True)
# to_collate = to_collate.compute()

comp_futures = []
# dprint(to_collate[to_collate.id == 0]['object_names'].values[0])
# for i, on in enumerate(to_collate['object_names']):
dprint('Comparing existing zips to collate list.', flush=True)
- for id in to_collate['id']:
+ ids = to_collate['id'].compute()
+ for id in ids:
# for i,args in enumerate(zip(
# to_collate['object_names'],
# [i for i in range(len(to_collate))],
@@ -1256,6 +1258,7 @@ def process_files(s3, bucket_name, api, current_objects, exclude, local_dir, des
destination_dir,
))
wait(comp_futures)
+ to_collate = to_collate.compute()
to_collate['upload'] = [f.result() for f in comp_futures]
# print(to_collate)
# exit()
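
The second hunk reactivates the Dask dataframe: `dd.from_pandas` splits the collate list into ten partitions per scheduler worker, and the explicit `client.scatter` call is commented out, leaving Dask to move partitions to workers on demand. A minimal sketch of the partitioning step, assuming a running `dask.distributed` cluster (the frame contents are invented):

```python
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

if __name__ == '__main__':
    client = Client()  # local cluster, for illustration only

    pdf = pd.DataFrame({'id': range(100)})

    # Ten partitions per worker, as in the diff, so work can be spread
    # evenly across the cluster.
    n_workers = len(client.scheduler_info()['workers'])
    to_collate = dd.from_pandas(pdf, npartitions=n_workers * 10)
    print(to_collate.npartitions)
```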
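The remaining changes follow a submit/wait/collect pattern: the ids are computed once rather than iterated lazily, one future is submitted per id, and after `wait` returns the Dask frame is brought back to pandas so the boolean results can be attached as an `upload` column. A sketch of that pattern, with a made-up `check_upload` standing in for `compare_zip_contents_bool`:

```python
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, wait

def check_upload(id):
    # Made-up stand-in for compare_zip_contents_bool.
    return id % 2 == 0

if __name__ == '__main__':
    client = Client()  # local cluster, for illustration only

    to_collate = dd.from_pandas(pd.DataFrame({'id': range(8)}), npartitions=4)

    ids = to_collate['id'].compute()   # materialise the ids once
    comp_futures = [client.submit(check_upload, id) for id in ids]
    wait(comp_futures)                 # block until every comparison finishes

    to_collate = to_collate.compute()  # back to pandas before assignment
    to_collate['upload'] = [f.result() for f in comp_futures]
    print(to_collate)
```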
