
Spawn many small reprojection tasks #41

Draft: wants to merge 12 commits into base: main
WIP - Expect several bugs, but need to test on Klone now.
drewoldag committed Sep 5, 2024
commit fb2b9a4b7a7ad89cffe7419aa94c1676d1639104
4 changes: 2 additions & 2 deletions example_runtime_config.toml
@@ -11,8 +11,8 @@ checkpoint_mode = 'task_exit'
[apps.create_manifest]
# The path to the staging directory
# e.g. "/gscratch/dirac/kbmod/workflow/staging"
staging_directory = "/home/drew/code/kbmod-wf/dev_staging"
output_directory = "/home/drew/code/kbmod-wf/dev_staging/processing"
staging_directory = "/Users/drew/code/kbmod-wf/dev_staging"
output_directory = "/Users/drew/code/kbmod-wf/dev_staging/processing"
file_pattern = "*.collection"
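
For reference, the workflow consumes this file at startup. A minimal sketch of how the `[apps.create_manifest]` table is read, mirroring the `toml.load` and `.get(...)` calls visible in the workflow diff below (the top-level `apps` key lookup is an assumption; it is not shown in this commit):

```python
import os
import toml

# Illustrative path; the real path comes from the runtime-config argument.
with open("example_runtime_config.toml", "r") as toml_runtime_config:
    runtime_config = toml.load(toml_runtime_config)

# Assumption: the per-app tables live under the top-level [apps] key.
app_configs = runtime_config.get("apps", {})
create_manifest_config = app_configs.get("create_manifest", {})

# The manifest lands in the configured output directory (cwd as a fallback).
manifest_path = os.path.join(
    create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt"
)
```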


79 changes: 26 additions & 53 deletions src/kbmod_wf/parallel_repro_single_chip_wf.py
@@ -4,7 +4,7 @@

import toml
import parsl
from parsl import join_app, python_app, File
from parsl import python_app, File
import parsl.executors

from kbmod_wf.utilities import (
@@ -14,50 +14,26 @@
get_configured_logger,
)

from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search


# There's still a ton of duplicated code here and in kbmod_wf.workflow_tasks.reproject_wu
# that should be refactored.
# The only difference is the import of reproject_single_chip_single_night_wu here.
@join_app(
cache=True,
executors=get_executors(["local_dev_testing", "sharded_reproject"]),
ignore_for_cache=["logging_file"],
)
def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

logger = get_configured_logger("task.reproject_wu", logging_file.filepath)

logger.info("Starting reproject_ic")
with ErrorLogger(logger):
future = sharded_reproject(
original_wu_filepath=inputs[0].filepath,
reprojected_wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
)
logger.info("Completed reproject_ic")
return future
from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu_return_shards, kbmod_search


@python_app(
cache=True,
executors=get_executors(["local_dev_testing", "sharded_reproject"]),
executors=get_executors(["local_dev_testing", "reproject_single_shard"]),
ignore_for_cache=["logging_file"],
)
def sharded_reproject(inputs=(), outputs=(), runtime_config={}, logging_file=None):
def reproject_shard(inputs=(), outputs=(), runtime_config={}, logging_file=None):
from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

logger = get_configured_logger("task.sharded_reproject", logging_file.filepath)

from kbmod_wf.task_impls.reproject_single_chip_single_night_wu_shard import reproject_wu_shard
from kbmod_wf.task_impls.reproject_single_chip_single_night_wu_shard import reproject_shard

logger.info("Starting reproject_ic")
with ErrorLogger(logger):
reproject_wu_shard(
reproject_shard(
original_wu_filepath=inputs[0].filepath,
original_wcs=inputs[1],
reprojected_wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
@@ -94,64 +70,61 @@ def workflow_runner(env=None, runtime_config={}):

# gather all the *.collection files that are staged for processing
create_manifest_config = app_configs.get("create_manifest", {})
manifest_file = File(
os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt")
)
manifest_file_path = Path(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt")

create_manifest_future = create_manifest(
inputs=[],
outputs=[manifest_file],
outputs=[File(manifest_file_path)],
runtime_config=app_configs.get("create_manifest", {}),
logging_file=logging_file,
)

with open(create_manifest_future.result(), "r") as f:
# process each .collection file in the manifest into a .wu file
with open(create_manifest_future.result(), "r") as manifest:
# process each .collection file in the manifest
original_work_unit_futures = []
for line in f:
for line in manifest:
# Create path object for the line in the manifest
input_file = Path(line.strip())

# Create a directory for the sharded work unit files
# Create a directory to contain each work unit's shards
sharded_directory = Path(input_file.parent, input_file.stem)
sharded_directory.mkdir(exist_ok=True)

# Create the work unit filepath
# Construct the work unit filepath
output_workunit_filepath = Path(sharded_directory, input_file.stem + ".wu")

# Create the work unit future
original_work_unit_futures.append(
ic_to_wu(
ic_to_wu_return_shards(
inputs=[input_file],
outputs=[File(output_workunit_filepath)],
runtime_config=app_configs.get("ic_to_wu", {}),
logging_file=logging_file,
)
)

# reproject each WorkUnit
# reproject each WorkUnit shard individually
# For chip-by-chip, this isn't really necessary, so hardcoding to 0.
reproject_futures = []
for f in original_work_unit_futures:
distance = 0

unique_obstimes, unique_obstimes_indices = work_unit.get_unique_obstimes_and_indices()

reproject_futures.append(
reproject_wu(
inputs=[f.result()],
outputs=[File(f.result().filepath + f".{distance}.repro")],
shard_futures = []
for i in f.result():
shard_future = reproject_shard(
inputs=[i],
outputs=[File(i.parent / (i.stem + ".repro"))],
runtime_config=app_configs.get("reproject_wu", {}),
logging_file=logging_file,
)
)
shard_futures.append(shard_future)
reproject_futures.append(shard_futures)

# run kbmod search on each reprojected WorkUnit
search_futures = []
for f in reproject_futures:
search_futures.append(
kbmod_search(
inputs=[f.result()],
outputs=[File(f.result().filepath + ".search.ecsv")],
inputs=[i.result() for i in f],
outputs=[],
runtime_config=app_configs.get("kbmod_search", {}),
logging_file=logging_file,
)
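
Taken together, the new control flow is a fan-out/fan-in over Parsl futures: one `ic_to_wu_return_shards` call per collection, one `reproject_shard` call per shard, then one `kbmod_search` over each shard group. A minimal, self-contained sketch of that shape with stand-in task bodies (only the future handling mirrors the workflow; names like `make_shards` are hypothetical):

```python
import parsl
from parsl import python_app
from parsl.configs.local_threads import config

@python_app
def make_shards(n):
    # Stand-in for ic_to_wu_return_shards: produce a list of shard ids.
    return [f"shard_{i}" for i in range(n)]

@python_app
def reproject(shard):
    # Stand-in for reproject_shard: one small task per shard.
    return f"{shard}.repro"

@python_app
def search(shards):
    # Stand-in for kbmod_search: runs over a whole shard group.
    return f"searched {len(shards)} shards"

parsl.load(config)

shard_ids = make_shards(4).result()                 # block until shards exist
shard_futures = [reproject(s) for s in shard_ids]   # fan out: one task per shard
result = search([f.result() for f in shard_futures]).result()  # fan back in
print(result)
```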
10 changes: 5 additions & 5 deletions src/kbmod_wf/resource_configs/klone_configuration.py
@@ -9,7 +9,7 @@
"compute_bigmem": "01:00:00",
"large_mem": "04:00:00",
"sharded_reproject": "04:00:00",
"parallel_reproject": "00:30:00",
"reproject_single_shard": "00:30:00",
"gpu_max": "08:00:00",
}

@@ -82,20 +82,20 @@ def klone_resource_config():
),
),
HighThroughputExecutor(
label="parallel_reproject",
label="reproject_single_shard",
max_workers=1,
provider=SlurmProvider(
partition="ckpt-g2",
account="astro",
min_blocks=0,
max_blocks=2,
max_blocks=256,
init_blocks=0,
parallelism=1,
nodes_per_block=1,
cores_per_node=1,
mem_per_node=2, # ~2-4 GB per core
mem_per_node=1, # only working on 1 image, so <1 GB should be required
exclusive=False,
walltime=walltimes["parallel_reproject"],
walltime=walltimes["reproject_single_shard"],
# Command to run before starting worker - i.e. conda activate <special_env>
worker_init="",
),
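
The net effect of this change: instead of at most 2 multi-core blocks, the scheduler can now spin up as many as 256 single-core, 1 GB Slurm jobs, one per shard. A sketch of the new executor in isolation, with the surrounding `Config` wiring assumed (field values are copied from the diff):

```python
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider

reproject_single_shard = HighThroughputExecutor(
    label="reproject_single_shard",
    max_workers=1,  # one shard per worker
    provider=SlurmProvider(
        partition="ckpt-g2",
        account="astro",
        min_blocks=0,
        max_blocks=256,  # scale out to many small, cheap Slurm jobs
        init_blocks=0,
        parallelism=1,
        nodes_per_block=1,
        cores_per_node=1,
        mem_per_node=1,  # GB; a single image should fit well under this
        exclusive=False,
        walltime="00:30:00",
        worker_init="",
    ),
)

config = Config(executors=[reproject_single_shard])
```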
4 changes: 3 additions & 1 deletion src/kbmod_wf/task_impls/ic_to_wu.py
@@ -83,4 +83,6 @@ def create_work_unit(self):
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}")

return self.wu_filepath
wcs = list(orig_wu._per_image_wcs)

return self.wu_filepath, wcs
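
With this change `create_work_unit` returns the per-image WCS list alongside the WorkUnit path, so downstream shard tasks can derive a common WCS without re-reading the file from disk. A hedged sketch of a consumer (the function name is illustrative):

```python
from reproject.mosaicking import find_optimal_celestial_wcs

def use_work_unit(create_work_unit_result):
    # The task now returns a (filepath, per-image WCS list) pair.
    wu_filepath, per_image_wcs = create_work_unit_result

    # Downstream, the shard task derives one common WCS for the whole set,
    # exactly as reproject_shard does in the file below.
    opt_wcs, shape = find_optimal_celestial_wcs(per_image_wcs)
    opt_wcs.array_shape = shape
    return wu_filepath, opt_wcs
```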
140 changes: 78 additions & 62 deletions src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py
@@ -1,16 +1,21 @@
import os
import time
from logging import Logger

import numpy as np
import astropy.io.fits as fitsio

import kbmod
from kbmod.work_unit import WorkUnit

import kbmod.reprojection as reprojection

from reproject import reproject_adaptive
from reproject.mosaicking import find_optimal_celestial_wcs
import os
import time
from logging import Logger


def reproject_wu_shard(
def reproject_shard(
original_wu_shard_filepath: str = None,
original_wcs=None,
reprojected_wu_shard_filepath: str = None,
runtime_config: dict = {},
logger: Logger = None,
@@ -35,60 +40,71 @@ def reproject_wu_shard(
The fully resolved filepath of the resulting WorkUnit file after reflex
and reprojection.
"""
wu_shard_reprojector = WUShardReprojector(
original_wu_filepath=original_wu_shard_filepath,
reprojected_wu_filepath=reprojected_wu_shard_filepath,
runtime_config=runtime_config,
logger=logger,
)

return wu_shard_reprojector.reproject_workunit()


class WUShardReprojector:
def __init__(
self,
original_wu_filepath: str = None,
reprojected_wu_filepath: str = None,
runtime_config: dict = {},
logger: Logger = None,
):
self.original_wu_filepath = original_wu_filepath
self.reprojected_wu_filepath = reprojected_wu_filepath
self.runtime_config = runtime_config
self.logger = logger

# Default to 8 workers if not in the config. Value must be 0<num workers<65.
self.n_workers = max(1, min(self.runtime_config.get("n_workers", 8), 64))

def reproject_workunit_shard(self):
last_time = time.time()
self.logger.info(f"Lazy reading existing WorkUnit from disk: {self.original_wu_filepath}")
directory_containing_shards, wu_filename = os.path.split(self.original_wu_filepath)
wu = WorkUnit.from_sharded_fits(wu_filename, directory_containing_shards, lazy=True)
elapsed = round(time.time() - last_time, 1)
self.logger.info(f"Required {elapsed}[s] to lazy read original WorkUnit {self.original_wu_filepath}.")

directory_containing_reprojected_shards, reprojected_wu_filename = os.path.split(
self.reprojected_wu_filepath
)

# Reproject to a common WCS using the WCS for our patch
self.logger.info(f"Reprojecting WorkUnit with {self.n_workers} workers...")
last_time = time.time()

opt_wcs, shape = find_optimal_celestial_wcs(list(wu._per_image_wcs))
opt_wcs.array_shape = shape
reprojection.reproject_work_unit(
wu,
opt_wcs,
max_parallel_processes=self.n_workers,
write_output=True,
directory=directory_containing_reprojected_shards,
filename=reprojected_wu_filename,
)

elapsed = round(time.time() - last_time, 1)
self.logger.info(f"Required {elapsed}[s] to create the sharded reprojected WorkUnit.")

return self.reprojected_wu_filepath

opt_wcs, shape = find_optimal_celestial_wcs(original_wcs)
opt_wcs.array_shape = shape

# reproject_adaptive returns a (reprojected_array, footprint) tuple; keep only the array
shard = fitsio.open(original_wu_shard_filepath)
sci, _ = reproject_adaptive(shard, opt_wcs, hdu_in=0)
var, _ = reproject_adaptive(shard, opt_wcs, hdu_in=1)
mask, _ = reproject_adaptive(shard, opt_wcs, hdu_in=2)

shard[0].data = sci.astype(np.float32)
shard[1].data = var.astype(np.float32)
shard[2].data = mask.astype(np.float32)

# HDUList has no .write(); use .writeto() and allow overwriting the shard in place
shard.writeto(original_wu_shard_filepath, overwrite=True)

with open(reprojected_wu_shard_filepath, "w") as f:
f.write(f"Reprojected: {original_wu_shard_filepath}")

return original_wu_shard_filepath
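
Design note (inferred from the diff): reprojecting the three HDUs one at a time and overwriting the shard in place keeps each task's footprint near a single image, which is what allows the `reproject_single_shard` executor above to request only ~1 GB per node. The small text file written to `reprojected_wu_shard_filepath` appears to serve only as a distinct output File for Parsl to track per shard, while the function returns the original shard path, now containing reprojected data.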


# class WUShardReprojector:
# def __init__(
# self,
# original_wu_filepath: str = None,
# reprojected_wu_filepath: str = None,
# runtime_config: dict = {},
# logger: Logger = None,
# ):
# self.original_wu_filepath = original_wu_filepath
# self.reprojected_wu_filepath = reprojected_wu_filepath
# self.runtime_config = runtime_config
# self.logger = logger

# # Default to 8 workers if not in the config. Value must be 0<num workers<65.
# self.n_workers = max(1, min(self.runtime_config.get("n_workers", 8), 64))

# def reproject_workunit_shard(self):
# last_time = time.time()
# self.logger.info(f"Lazy reading existing WorkUnit from disk: {self.original_wu_filepath}")
# directory_containing_shards, wu_filename = os.path.split(self.original_wu_filepath)
# wu = WorkUnit.from_sharded_fits(wu_filename, directory_containing_shards, lazy=True)
# elapsed = round(time.time() - last_time, 1)
# self.logger.info(f"Required {elapsed}[s] to lazy read original WorkUnit {self.original_wu_filepath}.")

# directory_containing_reprojected_shards, reprojected_wu_filename = os.path.split(
# self.reprojected_wu_filepath
# )

# # Reproject to a common WCS using the WCS for our patch
# self.logger.info(f"Reprojecting WorkUnit with {self.n_workers} workers...")
# last_time = time.time()

# opt_wcs, shape = find_optimal_celestial_wcs(list(wu._per_image_wcs))
# opt_wcs.array_shape = shape
# reprojection.reproject_work_unit(
# wu,
# opt_wcs,
# max_parallel_processes=self.n_workers,
# write_output=True,
# directory=directory_containing_reprojected_shards,
# filename=reprojected_wu_filename,
# )

# elapsed = round(time.time() - last_time, 1)
# self.logger.info(f"Required {elapsed}[s] to create the sharded reprojected WorkUnit.")

# return self.reprojected_wu_filepath
2 changes: 1 addition & 1 deletion src/kbmod_wf/test_workflow.py

Comment from the PR author:

If this PR is ultimately merged to main, we should delete this file. I only created it initially to ensure that using Futures in a specific way would not block the workflow. Specifically on line 217 of this file, the input for the next task is defined as [i.result() for i in list_of_futures].
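
For context, the two dependency styles being compared look roughly like this (arguments trimmed; a sketch, not the file's exact code):

```python
# Style under test: block the driver until every shard future resolves,
# then submit the search with concrete results.
search_future = kbmod_search(inputs=[i.result() for i in list_of_futures])

# Stock Parsl alternative: pass the futures themselves; Parsl defers the
# app until all of its input futures have resolved, without blocking here.
search_future = kbmod_search(inputs=list_of_futures)
```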

@@ -249,7 +249,7 @@ def workflow_runner(env=None, runtime_config={}):
runtime_config = {}

#! Don't forget to remove this hardcoded path!!!
args.runtime_config = "/home/drew/code/kbmod-wf/example_runtime_config.toml"
args.runtime_config = "/Users/drew/code/kbmod-wf/example_runtime_config.toml"
if args.runtime_config is not None and os.path.exists(args.runtime_config):
with open(args.runtime_config, "r") as toml_runtime_config:
runtime_config = toml.load(toml_runtime_config)
1 change: 1 addition & 0 deletions src/kbmod_wf/workflow_tasks/__init__.py
@@ -3,3 +3,4 @@
from .kbmod_search import kbmod_search
from .reproject_wu import reproject_wu
from .uri_to_ic import uri_to_ic
from .ic_to_wu_return_shards import ic_to_wu_return_shards