Skip to content

Commit

Permalink
JP-3290 Isolate candidate processing into their own pools (#8227)
Browse files Browse the repository at this point in the history
Co-authored-by: Howard Bushouse <[email protected]>
  • Loading branch information
stscieisenhamer and hbushouse committed Feb 21, 2024
1 parent 4cc0ac1 commit 613383f
Show file tree
Hide file tree
Showing 19 changed files with 666 additions and 203 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ associations
sub-pixel dithers, so that only exposures from other nod positions
are used as background members in "spec2" associations. [#8184]

- JP-3290 Isolate candidate processing into their own pools [#8227]

cube_build
----------

Expand Down
2 changes: 1 addition & 1 deletion jwst/associations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def libpath(filepath):
from .association import *
from .association_io import *
from .exceptions import *
from .generate import *
from .generator import *
from .lib.process_list import *
from .pool import *
from .registry import *
Expand Down
8 changes: 4 additions & 4 deletions jwst/associations/association.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,10 @@ def check_and_set_constraints(self, item):
- [ProcessItem[, ...]]: List of items to process again.
"""
cached_constraints = deepcopy(self.constraints)
match, reprocess = cached_constraints.check_and_set(item)
if match:
self.constraints = cached_constraints
self.constraints.preserve()
match, reprocess = self.constraints.check_and_set(item)
if not match:
self.constraints.restore()

# Set the association type for all reprocessed items.
for process_list in reprocess:
Expand Down
1 change: 1 addition & 0 deletions jwst/associations/generator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .generate import generate
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import logging
from timeit import default_timer as timer

from .association import make_timestamp
from .lib.process_list import (
from ..association import make_timestamp
from ..lib.process_list import (
ListCategory,
ProcessList,
ProcessQueueSorted,
workover_filter
)
from .pool import PoolRow
from ..lib.progress import Bar
from ..pool import PoolRow
from ...lib.progress import Bar

# Configure logging
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -93,12 +93,12 @@ def generate(pool, rules, version_id=None, finalize=True):
total_reprocess += len(to_process_modified)
bar.next()

logger.info('Seconds to process: %.2f', timer() - time_start)
logger.debug('Existing associations modified: %d New associations created: %d', total_mod_existing, total_new)
logger.debug('New process lists: %d', total_reprocess)
logger.debug('Updated process queue: %s', process_queue)
logger.debug('# associations: %d', len(associations))
logger.debug('Associations: %s', [type(_association) for _association in associations])
logger.debug('Seconds to process: %.2f\n', timer() - time_start)

# Finalize found associations
logger.debug('# associations before finalization: %d', len(associations))
Expand All @@ -110,9 +110,9 @@ def generate(pool, rules, version_id=None, finalize=True):
except KeyError as exception:
logger.debug('Finalization failed for reason: %s', exception)

logger.info('Associations generated: %s', len(finalized_asns))
return finalized_asns


def generate_from_item(
item,
version_id,
Expand Down
228 changes: 228 additions & 0 deletions jwst/associations/generator/generate_per_candidate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import collections
import logging
from timeit import default_timer as timer

from .generate import generate
from .generate_per_pool import CANDIDATE_RULESET, DISCOVER_RULESET, constrain_on_candidates
from ..lib.utilities import evaluate, filter_discovered_only
from ..registry import AssociationRegistry

# Configure logging
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


def generate_per_candidate(pool, rule_defs, candidate_ids=None, all_candidates=True, discover=False,
version_id=None, finalize=True, merge=False, ignore_default=False):
"""Generate associations in the pool according to the rules.
Parameters
----------
pool : AssociationPool
The pool to generate from.
rule_defs : [File-like[,...]] or None
The rule definitions to use. None to use the defaults if `ignore_default` is False.
candidate_ids : [str,[...]] or None
List of candidates to produce for. If None, do all possible candidates
all_candidates : bool
Keep associations generated from candidates when in discovery mode.
discover : bool
Find associations that are not candidate-based.
version_id : None, True, or str
The string to use to tag associations and products.
If None, no tagging occurs.
If True, use a timestamp
If a string, the string.
finalize : bool
Run all rule methods marked as 'finalized'.
merge : bool
Merge single associations into a common association with separate products.
ignore_default : bool
Ignore the default rules. Use only the user-specified ones.
Returns
-------
associations : [Association[,...]]
List of associations
Notes
-----
Refer to the :ref:`Association Generator <design-generator>`
documentation for a full description.
"""
logger.info('Generating based on the per-candidate algorithm.')

# Get the candidates
cids_by_type = ids_by_ctype(pool)
if candidate_ids is None:
cids_ctypes = [(cid, ctype)
for ctype in cids_by_type
for cid in cids_by_type[ctype]
]
else:
cids_ctypes = []
for cid in candidate_ids:
for ctype in cids_by_type:
if cid in cids_by_type[ctype]:
cids_ctypes.append((cid, ctype))
break
else:
logger.warning('Candidate id %s not found in pool', cid)

associations = []
for cid_ctype in cids_ctypes:
time_start = timer()
# Generate the association for the given candidate
associations_cid = generate_on_candidate(cid_ctype, pool, rule_defs, version_id=version_id, ignore_default=ignore_default)

# Add to the list
associations.extend(associations_cid)

logger.info('Time to process candidate %s: %.2f', cid_ctype[0], timer() - time_start)

# The ruleset has been generated on a per-candidate case.
# Here, need to do a final rebuild of the ruleset to get the finalization
# functions. This ruleset does not need any of the candidate specifications.
# This ruleset is also used if discovery is in play.
rules = AssociationRegistry(rule_defs, include_default=not ignore_default, name=DISCOVER_RULESET)
if discover:
logger.info('Discovering associations...')
associations_discover = generate(pool, rules, version_id=version_id, finalize=False)
associations.extend(associations_discover)
logger.info('# associations found before discover filter: %d', len(associations_discover))
associations = filter_discovered_only(
associations,
DISCOVER_RULESET,
CANDIDATE_RULESET,
keep_candidates=all_candidates,
)
rules.Utility.resequence(associations)

# Finalize found associations
logger.debug('# associations before finalization: %d', len(associations))
finalized_asns = associations
if finalize and len(associations):
logger.debug('Performing association finalization.')

try:
finalized_asns = rules.callback.reduce('finalize', associations)
except KeyError as exception:
logger.debug('Finalization failed for reason: %s', exception)


# Do a grand merging. This is done particularly for
# Level2 associations.
if merge:
try:
finalized_asns = rules.Utility.merge_asns(finalized_asns)
except AttributeError:
pass

logger.info('Total associations generated: %s', len(finalized_asns))
return finalized_asns


def generate_on_candidate(cid_ctype, pool, rule_defs, version_id=None, ignore_default=False):
"""Generate associations based on a candidate ID
Parameters
----------
cid_ctype : (str, str)
2-tuple of candidate ID and the candidate type
pool : AssociationPool
The pool to generate from.
rule_defs : [File-like[,...]] or None
The rule definitions to use. None to use the defaults if `ignore_default` is False.
version_id : None, True, or str
The string to use to tag associations and products.
If None, no tagging occurs.
If True, use a timestamp
If a string, the string.
ignore_default : bool
Ignore the default rules. Use only the user-specified ones.
Returns
-------
associations : [Association[,...]]
List of associations
"""
cid, ctype = cid_ctype
logger.info(f'Generating associations on candidate {cid_ctype}')

# Get the pool
pool_cid = pool_from_candidate(pool, cid)
pool_cid['asn_candidate'] = [f"[('{cid}', '{ctype}')]"] * len(pool_cid)
logger.info(f'Length of pool for {cid}: {len(pool_cid)}')

# Create the rules with the simplified asn_candidate constraint
asn_constraint = constrain_on_candidates([cid])
rules = AssociationRegistry(rule_defs, include_default=not ignore_default, global_constraints=asn_constraint, name=CANDIDATE_RULESET)

# Get the associations
associations = generate(pool_cid, rules, version_id=version_id, finalize=False)

return associations


def ids_by_ctype(pool):
"""Groups candidate ids by the candidate type
Parameters
----------
pool : AssociationPool
The association pool
Returns
-------
ids_by_ctype : {ctype: counter}
Dict with the key of the candidate type. Value is a
`collections.Counter` object of the ids and their counts.
"""
ids_by_ctype = collections.defaultdict(list)
for exp_candidates in pool['asn_candidate']:
candidates = evaluate(exp_candidates)
if isinstance(candidates, int):
ids_by_ctype['unknown'].append(str(candidates))
continue
try:
for id, ctype in candidates:
ids_by_ctype[ctype].append(id)
except ValueError:
logger.debug('Cannot parse asn_candidate field: %s', candidates)

for ctype in ids_by_ctype:
ids_by_ctype[ctype] = collections.Counter(ids_by_ctype[ctype])

return ids_by_ctype


def pool_from_candidate(pool, candidate):
"""Create a pool containing only the candidate
Parameters
----------
pool : AssociationPool
The pool to filter from.
candidate : str
The candidate id to filter.
Returns
-------
candidate_pool : AssociationPool
Pool containing only the candidate
"""
candidate_pool = pool[[candidate in row['asn_candidate'] for row in pool]]
return candidate_pool
Loading

0 comments on commit 613383f

Please sign in to comment.