Commit

Merge branch 'issue/355/radec' of http://github.com/astronomy-commons/hipscat-import into issue/355/radec
delucchi-cmu committed Aug 12, 2024
2 parents 8418882 + 4310774 commit a6abcb6
Showing 6 changed files with 206 additions and 164 deletions.
16 changes: 6 additions & 10 deletions src/hipscat_import/catalog/arguments.py
@@ -10,7 +10,6 @@
from hipscat.pixel_math import hipscat_id

from hipscat_import.catalog.file_readers import InputReader, get_file_reader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths

# pylint: disable=too-many-locals,too-many-arguments,too-many-instance-attributes,too-many-branches,too-few-public-methods
@@ -83,14 +82,18 @@ class ImportArguments(RuntimeArguments):
"""should we delete task-level done files once each stage is complete?
if False, we will keep all sub-histograms from the mapping stage, and all
done marker files at the end of the pipeline."""
run_stages: List[str] = field(default_factory=list)
"""list of parallel stages to run. options are ['mapping', 'splitting', 'reducing',
'finishing']. ['planning', 'binning'] stages are not optional.
this can be used to force the pipeline to stop after an early stage, to allow the
user to reset the dask client with different resources for different stages of
the workflow. if not specified, we will run all pipeline stages."""
debug_stats_only: bool = False
"""do not perform a map reduce and don't create a new
catalog. generate the partition info"""
file_reader: InputReader | str | None = None
"""instance of input reader that specifies arguments necessary for reading
from your input files"""
resume_plan: ResumePlan | None = None
"""container that handles read/write of log files for this pipeline"""

def __post_init__(self):
self._check_arguments()
@@ -133,13 +136,6 @@ def _check_arguments(self):
self.input_file_list,
storage_options=self.input_storage_options,
)
self.resume_plan = ResumePlan(
resume=self.resume,
progress_bar=self.progress_bar,
input_paths=self.input_paths,
tmp_path=self.resume_tmp,
delete_resume_log_files=self.delete_resume_log_files,
)

def to_catalog_info(self, total_rows) -> CatalogInfo:
"""Catalog-type-specific dataset info."""
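The new run_stages argument above makes it possible to split one import into separate early and late runs, for example to restart the dask client with different resources between stages. A minimal usage sketch, assuming typical ImportArguments fields not shown in this diff (output_artifact_name, output_path) and placeholder paths:

    from hipscat_import.catalog.arguments import ImportArguments

    # First run: only the early parallel stages.
    early_args = ImportArguments(
        output_artifact_name="my_catalog",       # assumed field name
        input_file_list=["/data/part_000.csv"],  # placeholder input
        file_reader="csv",
        output_path="/catalogs",                 # assumed field name
        run_stages=["mapping", "splitting"],
    )

    # Second run: resume the same import with a reconfigured dask client
    # and execute the remaining stages.
    late_args = ImportArguments(
        output_artifact_name="my_catalog",
        input_file_list=["/data/part_000.csv"],
        file_reader="csv",
        output_path="/catalogs",
        resume=True,
        run_stages=["reducing", "finishing"],
    )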
114 changes: 76 additions & 38 deletions src/hipscat_import/catalog/resume_plan.py
@@ -28,6 +28,10 @@ class ResumePlan(PipelineResumePlan):
"""set of files (and job keys) that have yet to be split"""
destination_pixel_map: Optional[List[Tuple[int, int, int]]] = None
"""Destination pixels and their expected final count"""
should_run_mapping: bool = True
should_run_splitting: bool = True
should_run_reducing: bool = True
should_run_finishing: bool = True

MAPPING_STAGE = "mapping"
SPLITTING_STAGE = "splitting"
@@ -37,54 +41,100 @@ class ResumePlan(PipelineResumePlan):
HISTOGRAMS_DIR = "histograms"
ALIGNMENT_FILE = "alignment.pickle"

def __post_init__(self):
"""Initialize the plan."""
self.gather_plan()
def __init__(
self,
resume: bool = True,
progress_bar: bool = True,
simple_progress_bar: bool = False,
input_paths=None,
tmp_path=None,
delete_resume_log_files: bool = True,
run_stages: List[str] | None = None,
import_args=None,
):
if import_args:
super().__init__(
resume=import_args.resume,
progress_bar=import_args.progress_bar,
simple_progress_bar=import_args.simple_progress_bar,
tmp_path=import_args.resume_tmp,
delete_resume_log_files=import_args.delete_resume_log_files,
)
if import_args.debug_stats_only:
run_stages = ["mapping", "finishing"]
self.input_paths = import_args.input_paths
else:
super().__init__(
resume=resume,
progress_bar=progress_bar,
simple_progress_bar=simple_progress_bar,
tmp_path=tmp_path,
delete_resume_log_files=delete_resume_log_files,
)
self.input_paths = input_paths
self.gather_plan(run_stages)

def gather_plan(self):
def gather_plan(self, run_stages: List[str] | None = None):
"""Initialize the plan."""
with self.print_progress(total=5, stage_name="Planning") as step_progress:
with self.print_progress(total=4, stage_name="Planning") as step_progress:
## Make sure it's safe to use existing resume state.
super().safe_to_resume()
step_progress.update(1)

## Validate existing resume state.
## - if a later stage is complete, the earlier stages should be complete too.
mapping_done = self.is_mapping_done()
splitting_done = self.is_splitting_done()
reducing_done = self.is_reducing_done()
mapping_done = self.done_file_exists(self.MAPPING_STAGE)
splitting_done = self.done_file_exists(self.SPLITTING_STAGE)
reducing_done = self.done_file_exists(self.REDUCING_STAGE)

if reducing_done and (not mapping_done or not splitting_done):
raise ValueError("mapping and splitting must be complete before reducing")
if splitting_done and not mapping_done:
raise ValueError("mapping must be complete before splitting")
step_progress.update(1)

## Figure out which stages we should run, based on requested `run_stages`
self.should_run_mapping = not mapping_done
self.should_run_splitting = not splitting_done
self.should_run_reducing = not reducing_done
self.should_run_finishing = True

if run_stages:
self.should_run_mapping &= self.MAPPING_STAGE in run_stages
self.should_run_splitting &= self.SPLITTING_STAGE in run_stages
self.should_run_reducing &= self.REDUCING_STAGE in run_stages
self.should_run_finishing = "finishing" in run_stages

## Validate that we're operating on the same file set as the previous instance.
self.input_paths = self.check_original_input_paths(self.input_paths)
step_progress.update(1)

## Gather keys for execution.
if not mapping_done:
if self.should_run_mapping:
self.map_files = self.get_remaining_map_keys()
if not splitting_done:
file_io.make_directory(
file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
exist_ok=True,
)
if self.should_run_splitting:
if not (mapping_done or self.should_run_mapping):
raise ValueError("mapping must be complete before splitting")

self.split_keys = self.get_remaining_split_keys()
## We don't pre-gather the plan for the reducing keys.
## It requires the full destination pixel map.
step_progress.update(1)
## Go ahead and create our directories for storing resume files.
file_io.make_directory(
file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
exist_ok=True,
)
file_io.make_directory(
file_io.append_paths_to_pointer(self.tmp_path, self.SPLITTING_STAGE),
exist_ok=True,
)
file_io.make_directory(
file_io.append_paths_to_pointer(self.tmp_path, self.REDUCING_STAGE),
exist_ok=True,
)
file_io.make_directory(
file_io.append_paths_to_pointer(self.tmp_path, self.SPLITTING_STAGE),
exist_ok=True,
)
if self.should_run_reducing:
## We don't pre-gather the plan for the reducing keys.
## It requires the full destination pixel map.
if not (splitting_done or self.should_run_splitting):
raise ValueError("splitting must be complete before reducing")

file_io.make_directory(
file_io.append_paths_to_pointer(self.tmp_path, self.REDUCING_STAGE),
exist_ok=True,
)
step_progress.update(1)
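
With this stage gating, a ResumePlan built with a restricted run_stages list skips any stage that is already done or was not requested. A rough sketch of the resulting flags, constructing the plan directly (paths are placeholders, and a fresh, writable tmp_path is assumed):

    from hipscat_import.catalog.resume_plan import ResumePlan

    plan = ResumePlan(
        input_paths=["/data/part_000.csv"],  # placeholder input
        tmp_path="/tmp/my_catalog/resume",   # placeholder scratch directory
        run_stages=["mapping"],
    )

    # Mapping is requested and not yet done, so it will run; splitting and
    # reducing are gated off by run_stages, and "finishing" is skipped
    # because it was not requested.
    assert plan.should_run_mapping
    assert not plan.should_run_splitting
    assert not plan.should_run_reducing
    assert not plan.should_run_finishing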

def get_remaining_map_keys(self):
@@ -193,10 +243,6 @@ def wait_for_mapping(self, futures):
raise RuntimeError("some map stages did not complete successfully.")
self.touch_stage_done_file(self.MAPPING_STAGE)

def is_mapping_done(self) -> bool:
"""Are there files left to map?"""
return self.done_file_exists(self.MAPPING_STAGE)

def get_alignment_file(
self,
raw_histogram,
@@ -270,10 +316,6 @@ def wait_for_splitting(self, futures):
raise RuntimeError(f"{len(remaining_split_items)} split stages did not complete successfully.")
self.touch_stage_done_file(self.SPLITTING_STAGE)

def is_splitting_done(self) -> bool:
"""Are there files left to split?"""
return self.done_file_exists(self.SPLITTING_STAGE)

def get_reduce_items(self):
"""Fetch a triple for each partition to reduce.
@@ -299,10 +341,6 @@ def get_destination_pixels(self):
raise RuntimeError("destination pixel map not known.")
return [HealpixPixel(hp_order, hp_pixel) for hp_order, hp_pixel, _ in self.destination_pixel_map]

def is_reducing_done(self) -> bool:
"""Are there partitions left to reduce?"""
return self.done_file_exists(self.REDUCING_STAGE)

def wait_for_reducing(self, futures):
"""Wait for reducing futures to complete."""
self.wait_for_futures(futures, self.REDUCING_STAGE, fail_fast=True)
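
With the is_mapping_done / is_splitting_done / is_reducing_done helpers removed, a pipeline driver can consult the should_run_* flags gathered in gather_plan instead. A hypothetical driver sketch; client, map_one_file, split_one_file, reduce_one_pixel, and the exact shape of map_files and split_keys are assumptions, not part of this diff:

    # Hypothetical driver loop built on the should_run_* flags.
    if plan.should_run_mapping:
        futures = [client.submit(map_one_file, key, path) for key, path in plan.map_files]
        plan.wait_for_mapping(futures)

    if plan.should_run_splitting:
        futures = [client.submit(split_one_file, key, path) for key, path in plan.split_keys]
        plan.wait_for_splitting(futures)

    if plan.should_run_reducing:
        futures = [client.submit(reduce_one_pixel, pixel) for pixel in plan.get_destination_pixels()]
        plan.wait_for_reducing(futures)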