From 613383f5968b90cc52274682dfd1ead93bddec54 Mon Sep 17 00:00:00 2001 From: Jonathan Eisenhamer Date: Wed, 21 Feb 2024 10:41:57 -0500 Subject: [PATCH] JP-3290 Isolate candidate processing into their own pools (#8227) Co-authored-by: Howard Bushouse --- CHANGES.rst | 2 + jwst/associations/__init__.py | 2 +- jwst/associations/association.py | 8 +- jwst/associations/generator/__init__.py | 1 + jwst/associations/{ => generator}/generate.py | 12 +- .../generator/generate_per_candidate.py | 228 ++++++++++++++++++ .../generator/generate_per_pool.py | 123 ++++++++++ jwst/associations/lib/constraint.py | 60 ++++- jwst/associations/lib/process_list.py | 81 ++++++- jwst/associations/lib/utilities.py | 139 +++++++++-- jwst/associations/main.py | 169 ++----------- jwst/associations/tests/test_asn_names.py | 2 +- jwst/associations/tests/test_constraints.py | 21 ++ jwst/associations/tests/test_io.py | 2 +- .../tests/test_level2_background.py | 2 +- .../tests/test_level3_duplicate.py | 3 +- .../tests/test_level3_spectrographic.py | 2 +- jwst/associations/tests/test_level3_wfs.py | 2 +- jwst/regtest/test_associations_sdp_pools.py | 10 + 19 files changed, 666 insertions(+), 203 deletions(-) create mode 100644 jwst/associations/generator/__init__.py rename jwst/associations/{ => generator}/generate.py (96%) create mode 100644 jwst/associations/generator/generate_per_candidate.py create mode 100644 jwst/associations/generator/generate_per_pool.py diff --git a/CHANGES.rst b/CHANGES.rst index 5e3559690d..c865170ed8 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -8,6 +8,8 @@ associations sub-pixel dithers, so that only exposures from other nod positions are used as background members in "spec2" associations. [#8184] +- JP-3290 Isolate candidate processing into their own pools [#8227] + cube_build ---------- diff --git a/jwst/associations/__init__.py b/jwst/associations/__init__.py index fe27b492f9..0697dca651 100644 --- a/jwst/associations/__init__.py +++ b/jwst/associations/__init__.py @@ -27,7 +27,7 @@ def libpath(filepath): from .association import * from .association_io import * from .exceptions import * -from .generate import * +from .generator import * from .lib.process_list import * from .pool import * from .registry import * diff --git a/jwst/associations/association.py b/jwst/associations/association.py index 92de32e0ac..17a9f7ac4f 100644 --- a/jwst/associations/association.py +++ b/jwst/associations/association.py @@ -389,10 +389,10 @@ def check_and_set_constraints(self, item): - [ProcessItem[, ...]]: List of items to process again. """ - cached_constraints = deepcopy(self.constraints) - match, reprocess = cached_constraints.check_and_set(item) - if match: - self.constraints = cached_constraints + self.constraints.preserve() + match, reprocess = self.constraints.check_and_set(item) + if not match: + self.constraints.restore() # Set the association type for all reprocessed items. 
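        # A minimal sketch of the snapshot pattern behind preserve()/restore(),
        # assuming a hypothetical attribute dict and history stack (names are
        # illustrative, not the actual Constraint internals):
        #
        #     history = collections.deque()      # per-constraint snapshot stack
        #     def preserve(attrs):
        #         history.append(attrs.copy())   # push a shallow snapshot
        #     def restore(attrs):
        #         attrs.clear()                  # roll back a failed match
        #         attrs.update(history.pop())
        #
        # Only the mutable attribute dict is copied, so rejecting an item is far
        # cheaper than the deepcopy of the whole constraint tree used previously.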
for process_list in reprocess: diff --git a/jwst/associations/generator/__init__.py b/jwst/associations/generator/__init__.py new file mode 100644 index 0000000000..7bae595061 --- /dev/null +++ b/jwst/associations/generator/__init__.py @@ -0,0 +1 @@ +from .generate import generate diff --git a/jwst/associations/generate.py b/jwst/associations/generator/generate.py similarity index 96% rename from jwst/associations/generate.py rename to jwst/associations/generator/generate.py index fea0d4bdd2..333bf17eef 100644 --- a/jwst/associations/generate.py +++ b/jwst/associations/generator/generate.py @@ -1,15 +1,15 @@ import logging from timeit import default_timer as timer -from .association import make_timestamp -from .lib.process_list import ( +from ..association import make_timestamp +from ..lib.process_list import ( ListCategory, ProcessList, ProcessQueueSorted, workover_filter ) -from .pool import PoolRow -from ..lib.progress import Bar +from ..pool import PoolRow +from ...lib.progress import Bar # Configure logging logger = logging.getLogger(__name__) @@ -93,12 +93,12 @@ def generate(pool, rules, version_id=None, finalize=True): total_reprocess += len(to_process_modified) bar.next() + logger.info('Seconds to process: %.2f', timer() - time_start) logger.debug('Existing associations modified: %d New associations created: %d', total_mod_existing, total_new) logger.debug('New process lists: %d', total_reprocess) logger.debug('Updated process queue: %s', process_queue) logger.debug('# associations: %d', len(associations)) logger.debug('Associations: %s', [type(_association) for _association in associations]) - logger.debug('Seconds to process: %.2f\n', timer() - time_start) # Finalize found associations logger.debug('# associations before finalization: %d', len(associations)) @@ -110,9 +110,9 @@ def generate(pool, rules, version_id=None, finalize=True): except KeyError as exception: logger.debug('Finalization failed for reason: %s', exception) + logger.info('Associations generated: %s', len(finalized_asns)) return finalized_asns - def generate_from_item( item, version_id, diff --git a/jwst/associations/generator/generate_per_candidate.py b/jwst/associations/generator/generate_per_candidate.py new file mode 100644 index 0000000000..c7962da44e --- /dev/null +++ b/jwst/associations/generator/generate_per_candidate.py @@ -0,0 +1,228 @@ +import collections +import logging +from timeit import default_timer as timer + +from .generate import generate +from .generate_per_pool import CANDIDATE_RULESET, DISCOVER_RULESET, constrain_on_candidates +from ..lib.utilities import evaluate, filter_discovered_only +from ..registry import AssociationRegistry + +# Configure logging +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + + +def generate_per_candidate(pool, rule_defs, candidate_ids=None, all_candidates=True, discover=False, + version_id=None, finalize=True, merge=False, ignore_default=False): + """Generate associations in the pool according to the rules. + + Parameters + ---------- + pool : AssociationPool + The pool to generate from. + + rule_defs : [File-like[,...]] or None + The rule definitions to use. None to use the defaults if `ignore_default` is False. + + candidate_ids : [str,[...]] or None + List of candidates to produce for. If None, do all possible candidates + + all_candidates : bool + Keep associations generated from candidates when in discovery mode. + + discover : bool + Find associations that are not candidate-based. 
+ + version_id : None, True, or str + The string to use to tag associations and products. + If None, no tagging occurs. + If True, use a timestamp + If a string, the string. + + finalize : bool + Run all rule methods marked as 'finalized'. + + merge : bool + Merge single associations into a common association with separate products. + + ignore_default : bool + Ignore the default rules. Use only the user-specified ones. + + Returns + ------- + associations : [Association[,...]] + List of associations + + Notes + ----- + Refer to the :ref:`Association Generator ` + documentation for a full description. + """ + logger.info('Generating based on the per-candidate algorithm.') + + # Get the candidates + cids_by_type = ids_by_ctype(pool) + if candidate_ids is None: + cids_ctypes = [(cid, ctype) + for ctype in cids_by_type + for cid in cids_by_type[ctype] + ] + else: + cids_ctypes = [] + for cid in candidate_ids: + for ctype in cids_by_type: + if cid in cids_by_type[ctype]: + cids_ctypes.append((cid, ctype)) + break + else: + logger.warning('Candidate id %s not found in pool', cid) + + associations = [] + for cid_ctype in cids_ctypes: + time_start = timer() + # Generate the association for the given candidate + associations_cid = generate_on_candidate(cid_ctype, pool, rule_defs, version_id=version_id, ignore_default=ignore_default) + + # Add to the list + associations.extend(associations_cid) + + logger.info('Time to process candidate %s: %.2f', cid_ctype[0], timer() - time_start) + + # The ruleset has been generated on a per-candidate case. + # Here, need to do a final rebuild of the ruleset to get the finalization + # functions. This ruleset does not need any of the candidate specifications. + # This ruleset is also used if discovery is in play. + rules = AssociationRegistry(rule_defs, include_default=not ignore_default, name=DISCOVER_RULESET) + if discover: + logger.info('Discovering associations...') + associations_discover = generate(pool, rules, version_id=version_id, finalize=False) + associations.extend(associations_discover) + logger.info('# associations found before discover filter: %d', len(associations_discover)) + associations = filter_discovered_only( + associations, + DISCOVER_RULESET, + CANDIDATE_RULESET, + keep_candidates=all_candidates, + ) + rules.Utility.resequence(associations) + + # Finalize found associations + logger.debug('# associations before finalization: %d', len(associations)) + finalized_asns = associations + if finalize and len(associations): + logger.debug('Performing association finalization.') + + try: + finalized_asns = rules.callback.reduce('finalize', associations) + except KeyError as exception: + logger.debug('Finalization failed for reason: %s', exception) + + + # Do a grand merging. This is done particularly for + # Level2 associations. + if merge: + try: + finalized_asns = rules.Utility.merge_asns(finalized_asns) + except AttributeError: + pass + + logger.info('Total associations generated: %s', len(finalized_asns)) + return finalized_asns + + +def generate_on_candidate(cid_ctype, pool, rule_defs, version_id=None, ignore_default=False): + """Generate associations based on a candidate ID + + Parameters + ---------- + cid_ctype : (str, str) + 2-tuple of candidate ID and the candidate type + + pool : AssociationPool + The pool to generate from. + + rule_defs : [File-like[,...]] or None + The rule definitions to use. None to use the defaults if `ignore_default` is False. + + version_id : None, True, or str + The string to use to tag associations and products. 
+ If None, no tagging occurs. + If True, use a timestamp + If a string, the string. + + ignore_default : bool + Ignore the default rules. Use only the user-specified ones. + + Returns + ------- + associations : [Association[,...]] + List of associations + """ + cid, ctype = cid_ctype + logger.info(f'Generating associations on candidate {cid_ctype}') + + # Get the pool + pool_cid = pool_from_candidate(pool, cid) + pool_cid['asn_candidate'] = [f"[('{cid}', '{ctype}')]"] * len(pool_cid) + logger.info(f'Length of pool for {cid}: {len(pool_cid)}') + + # Create the rules with the simplified asn_candidate constraint + asn_constraint = constrain_on_candidates([cid]) + rules = AssociationRegistry(rule_defs, include_default=not ignore_default, global_constraints=asn_constraint, name=CANDIDATE_RULESET) + + # Get the associations + associations = generate(pool_cid, rules, version_id=version_id, finalize=False) + + return associations + + +def ids_by_ctype(pool): + """Groups candidate ids by the candidate type + + Parameters + ---------- + pool : AssociationPool + The association pool + + Returns + ------- + ids_by_ctype : {ctype: counter} + Dict with the key of the candidate type. Value is a + `collections.Counter` object of the ids and their counts. + """ + ids_by_ctype = collections.defaultdict(list) + for exp_candidates in pool['asn_candidate']: + candidates = evaluate(exp_candidates) + if isinstance(candidates, int): + ids_by_ctype['unknown'].append(str(candidates)) + continue + try: + for id, ctype in candidates: + ids_by_ctype[ctype].append(id) + except ValueError: + logger.debug('Cannot parse asn_candidate field: %s', candidates) + + for ctype in ids_by_ctype: + ids_by_ctype[ctype] = collections.Counter(ids_by_ctype[ctype]) + + return ids_by_ctype + + +def pool_from_candidate(pool, candidate): + """Create a pool containing only the candidate + + Parameters + ---------- + pool : AssociationPool + The pool to filter from. + + candidate : str + The candidate id to filter. + + Returns + ------- + candidate_pool : AssociationPool + Pool containing only the candidate + """ + candidate_pool = pool[[candidate in row['asn_candidate'] for row in pool]] + return candidate_pool diff --git a/jwst/associations/generator/generate_per_pool.py b/jwst/associations/generator/generate_per_pool.py new file mode 100644 index 0000000000..45694419a9 --- /dev/null +++ b/jwst/associations/generator/generate_per_pool.py @@ -0,0 +1,123 @@ +# Generate associations per-pool +import logging + +from .generate import generate +from ..lib.utilities import constrain_on_candidates, filter_discovered_only +from ..registry import AssociationRegistry + +__all__ = ['generate_per_pool'] + +# Configure logging +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + +# Ruleset names +DISCOVER_RULESET = 'discover' +CANDIDATE_RULESET = 'candidate' + + +def generate_per_pool(pool, rule_defs=None, candidate_ids=None, all_candidates=True, discover=False, + version_id=None, finalize=True, merge=False, ignore_default=False): + """Generate associations on a specified pool + + Association candidates are filtered based on global constraints added to the rules. + + Parameters + ---------- + pool : AssociationPool + The pool to generate from. + + rule_defs : [File-like[,...] or None + List of rule definitions to use. None to use the defaults if `ignore_default` is False. + + candidate_ids : [str,[...]] or None + List of candidates to produce for. 
+ + all_candidates : bool + Find associations for all possible candidates + + discover : bool + Find associations that are not candidate-based. + + version_id : None, True, or str + The string to use to tag associations and products. + If None, no tagging occurs. + If True, use a timestamp + If a string, the string. + + finalize : bool + Run all rule methods marked as 'finalized'. + + merge : bool + Merge single associations into a common association with separate products. + + ignore_default : bool + Ignore the default rules. Use only the user-specified ones. + + Returns + ------- + associations : [Association[,...]] + List of associations + + Notes + ----- + Refer to the :ref:`Association Generator ` + documentation for a full description. + """ + logger.info('Generating based on the per-pool algorithm') + + # Setup the rule registry + global_constraints = None + if discover or all_candidates: + global_constraints = constrain_on_candidates( + None + ) + elif candidate_ids is not None: + global_constraints = constrain_on_candidates( + candidate_ids + ) + + rules = AssociationRegistry( + rule_defs, + include_default=not ignore_default, + global_constraints=global_constraints, + name=CANDIDATE_RULESET + ) + + if discover: + rules.update( + AssociationRegistry( + rule_defs, + include_default=not ignore_default, + name=DISCOVER_RULESET + ) + ) + + # Generate the associations + associations = generate(pool, rules, version_id=version_id, finalize=finalize) + + # If doing discover, filter out all specified candidates + if discover: + logger.debug( + '# asns found before discover filtering={}'.format( + len(associations) + ) + ) + associations = filter_discovered_only( + associations, + DISCOVER_RULESET, + CANDIDATE_RULESET, + keep_candidates=all_candidates, + ) + rules.Utility.resequence(associations) + + # Do a grand merging. This is done particularly for + # Level2 associations. + if merge: + try: + associations = rules.Utility.merge_asns(associations) + except AttributeError: + pass + + # That's all folks + return associations diff --git a/jwst/associations/lib/constraint.py b/jwst/associations/lib/constraint.py index 2612e815e7..34193033e0 100644 --- a/jwst/associations/lib/constraint.py +++ b/jwst/associations/lib/constraint.py @@ -3,6 +3,7 @@ import abc import collections from copy import deepcopy +from functools import wraps from itertools import chain import logging import re @@ -48,6 +49,10 @@ class SimpleConstraintABC(abc.ABC): Attributes ---------- + found_values : set(str[,...]) + Set of actual found values for this condition. True SimpleConstraints + do not normally set this; the value is not different than `value`. + matched : bool Last call to `check_and_set` """ @@ -55,17 +60,40 @@ class SimpleConstraintABC(abc.ABC): # Attributes to show in the string representation. 
_str_attrs = ('name', 'value')

+    def __new__(cls, *args, **kwargs):
+        """Force creation of the constraint attribute dict before anything else."""
+        obj = super().__new__(cls)
+        obj._ca_history = collections.deque()
+        obj._constraint_attributes = {}
+        return obj
+
     def __init__(self, init=None, value=None, name=None, **kwargs):

         # Defined attributes
         self.value = value
         self.name = name
         self.matched = False
+        self.found_values = set()

         if init is not None:
-            self.__dict__.update(init)
+            self._constraint_attributes.update(init)
         else:
-            self.__dict__.update(kwargs)
+            self._constraint_attributes.update(kwargs)
+
+    def __getattr__(self, name):
+        """Retrieve a user-defined attribute"""
+        if name.startswith('_'):
+            return super().__getattribute__(name)
+        if name in self._constraint_attributes:
+            return self._constraint_attributes[name]
+        raise AttributeError(f'No such attribute {name}')
+
+    def __setattr__(self, name, value):
+        """Store all attributes in the user dictionary"""
+        if not name.startswith('_'):
+            self._constraint_attributes[name] = value
+        else:
+            object.__setattr__(self, name, value)

     @abc.abstractmethod
     def check_and_set(self, item):
@@ -134,6 +162,19 @@ def get_all_attr(self, attribute: str):  # -> list[tuple[SimpleConstraint, typing
             return [(self, value)]
         return []

+    def restore(self):
+        """Restore constraint state"""
+        try:
+            self._constraint_attributes = self._ca_history.pop()
+        except IndexError:
+            logger.debug('No more attribute history to restore from. restore is a NOOP')
+
+    def preserve(self):
+        """Save the current state of the constraints"""
+        ca_copy = self._constraint_attributes.copy()
+        ca_copy['found_values'] = self._constraint_attributes['found_values'].copy()
+        self._ca_history.append(ca_copy)
+
     # Make iterable to work with `Constraint`.
     # Since this is a leaf, simple return ourselves.
     def __iter__(self):
@@ -142,7 +183,7 @@ def __iter__(self):
     def __repr__(self):
         result = '{}({})'.format(
             self.__class__.__name__,
-            str(self.__dict__)
+            str(self._constraint_attributes)
         )
         return result

@@ -274,7 +315,6 @@ def __init__(
             reprocess_rules=None,
             **kwargs
     ):
-
         # Defined attributes
         self.sources = sources
         self.force_unique = force_unique
@@ -413,7 +453,7 @@ def __init__(self,
         self.only_on_match = only_on_match
         self.onlyif = onlyif
         self.required = required
-        super(AttrConstraint, self).__init__(init=init, **kwargs)
+        super().__init__(init=init, **kwargs)

         # Give some defaults real meaning.
         if invalid_values is None:
@@ -760,6 +800,16 @@ def get_all_attr(self, attribute: str):  # -> list[tuple[typing.Union[SimpleConst

         return result

+    def preserve(self):
+        """Preserve all constraint states"""
+        for constraint in self.constraints:
+            constraint.preserve()
+
+    def restore(self):
+        """Restore all constraint states"""
+        for constraint in self.constraints:
+            constraint.restore()
+
     @staticmethod
     def all(item, constraints):
         """Return positive only if all results are positive."""
diff --git a/jwst/associations/lib/process_list.py b/jwst/associations/lib/process_list.py
index 96fdfe44f1..7a9aa1ec71 100644
--- a/jwst/associations/lib/process_list.py
+++ b/jwst/associations/lib/process_list.py
@@ -1,4 +1,23 @@
-"""Reprocessing List"""
+"""Reprocessing Lists and Queues
+
+This module defines process lists and the queues that hold them.
+
+A process list, `ProcessList`, is a list of (items, rules) plus metadata,
+most notably `work_over`. `work_over` is one of the values of `ListCategory`.
+A `ListCategory` defines which stage of association processing the list is
+relevant to.
In other words, the order, or priority, of when a list should be processed
+is defined by its `ListCategory`. Priorities are the values of the
+`ListCategory` members, starting with zero.
+
+ProcessLists are primarily put into queues for processing. There are two
+queues for handling ProcessLists. `ProcessListQueue` is a basic
+First-In-First-Out (FIFO) queue that can be used as a generator.
+
+The second queue is `ProcessQueueSorted`, which returns ProcessLists according to
+their priority as defined by each ProcessList's `work_over`. An important aspect of
+ProcessQueueSorted is that it is mutable: new ProcessLists can be added to the queue
+while iterating over the queue.
+"""
 from collections import deque
 from enum import Enum
 from functools import reduce
@@ -14,15 +33,19 @@


 class ListCategory(Enum):
-    RULES = 0
-    BOTH = 1
-    EXISTING = 2
-    NONSCIENCE = 3
+    """The work_over categories for ProcessLists"""
+    RULES = 0       # Operate over rules only
+    BOTH = 1        # Operate over both rules and existing associations
+    EXISTING = 2    # Operate over existing associations only
+    NONSCIENCE = 3  # Non-science items that should be applied only to
+                    # existing associations


 class ProcessItem:
     """Items to be processed

+    Create hashable objects from arbitrary objects.
+
     Parameters
     ----------
     obj : object
@@ -77,8 +100,8 @@ class ProcessList:
     work_over : int
         What the reprocessing should work on:

-        - `ProcessList.EXISTING`:   Only existing associations
         - `ProcessList.RULES`:      Only on the rules to create new associations
+        - `ProcessList.EXISTING`:   Only existing associations
         - `ProcessList.BOTH`:       Compare to both existing and rules
         - `ProcessList.NONSCIENCE`: Only on non-science items

@@ -161,10 +184,35 @@ def __iter__(self):
 class ProcessListQueue:
     """First-In-First-Out queue of ProcessLists

+    ProcessLists can be added individually using the `append` method, or
+    as a list through object initialization or the `extend` method.
+
+    Two generators are implemented. The first is the ProcessListQueue
+    object itself. When the object is used as a generator, it returns
+    the earliest ProcessList added to the queue (FIFO), popping that
+    ProcessList from the queue, hence draining the queue.
+
+    The second generator is returned by the `items` method. This method
+    returns all the items from all the ProcessLists in the queue,
+    non-destructively. The ProcessLists are accessed in their queue order,
+    and each item is retrieved in its list order within its ProcessList.
+
+    A final feature of ProcessListQueue is that it is mutable: new items
+    can be added to the queue while items are being popped from the queue.
+
     Parameters
     ----------
     init : [ProcessList[,...]] or None
         List of ProcessLists to put on the queue.
+
+    Notes
+    -----
+    The FIFO operation depends on the fact that `dict` inherently
+    preserves the order in which key/value pairs are added to the
+    dictionary.
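+
+    Examples
+    --------
+    A minimal sketch of the FIFO contract; ``plist_a``, ``plist_b``, and
+    ``handle`` are hypothetical stand-ins for real ProcessLists and a
+    processing function::
+
+        queue = ProcessListQueue([plist_a])
+        queue.append(plist_b)
+        for process_list in queue:   # pops plist_a first, then plist_b
+            handle(process_list)     # appending more lists here is safe:
+                                     # the queue is mutable during iteration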
+
     """
     def __init__(self, init=None):
         self._queue = dict()
@@ -172,7 +220,7 @@ def __init__(self, init=None):
             self.extend(init)

     def append(self, process_list):
-        """Add ProcessList to queue"""
+        """Add ProcessList to the queue, merging with an existing entry if already present."""
         plhash = process_list.hash
         if plhash not in self._queue:
             self._queue[plhash] = process_list
@@ -180,7 +228,7 @@ def append(self, process_list):
             self._queue[plhash].update(process_list)

     def extend(self, iterable):
-        """Add objects if not already in the queue"""
+        """Add ProcessLists from an iterable, skipping any already in the queue"""
         for process_list in iterable:
             self.append(process_list)

@@ -215,13 +263,26 @@ def __str__(self):

 class ProcessQueueSorted:
     """Sort ProcessItem based on work_over

-    `ProcessList`s are handled in order of `RULES`, `BOTH`,
-    `EXISTING`, and `NONSCIENCE`.
+    Create a generator that implements a First-In-First-Out (FIFO) queue, with the one
+    modification that the queues are handled in order of their `work_over` priority.
+    For example, even if a ProcessList with work_over of ListCategory.EXISTING had
+    been added to the queue before a ProcessList with work_over of ListCategory.RULES,
+    the second ProcessList will be returned before the first.
+
+    ProcessQueueSorted is also mutable: ProcessLists can be added to the queue
+    while the lists are being popped from the queue. When doing so, it is
+    important to remember that the order of return, as described above, still
+    applies. For example, if the queue only has ProcessLists of work_over
+    ListCategory.EXISTING, and a new ProcessList of work_over
+    ListCategory.RULES is added during iteration, the next list returned will
+    be the RULES one, because the RULES lists have priority over EXISTING
+    lists, regardless of when the list was added.

     Parameters
     ----------
     init : [ProcessList[,...]]
         List of `ProcessList` to start the queue with.
+
     """
     def __init__(self, init=None):
         self.queues = {
diff --git a/jwst/associations/lib/utilities.py b/jwst/associations/lib/utilities.py
index ec781072c9..ee3f37f5f7 100644
--- a/jwst/associations/lib/utilities.py
+++ b/jwst/associations/lib/utilities.py
@@ -5,35 +5,40 @@

 from numpy.ma import masked

+from .. import config
+
 # Configure logging
 logger = logging.getLogger(__name__)
 logger.addHandler(logging.NullHandler())


-def return_on_exception(exceptions=(Exception,), default=None):
-    """Decorator to force functions raising exceptions to return a value
+def constrain_on_candidates(candidates):
+    """Create a constraint based on a list of candidates

     Parameters
     ----------
-    exceptions: (Exception(,...))
-        Tuple of exceptions to catch
-
-    default: obj
-        The value to return when a specified exception occurs
+    candidates : (str, ...) or None
+        List of candidate ids.
+        If None, then all candidates are matched.
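+
+    Returns
+    -------
+    DMSAttrConstraint
+        Constraint on the ``asn_candidate`` field that matches any of the
+        given candidate ids.
+
+    Examples
+    --------
+    A sketch of the value pattern produced; the candidate ids are
+    illustrative::
+
+        constraint = constrain_on_candidates(['o001', 'c1000'])
+        constraint.value   # '.+(o001|c1000).+'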
""" - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except exceptions as err: - logger.debug( - 'Caught exception %s in function %s, forcing return value of %s', - err, func, default - ) - return default - return wrapper - return decorator + from .dms_base import DMSAttrConstraint + if candidates is not None and len(candidates): + c_list = '|'.join(candidates) + values = ''.join([ + '.+(', c_list, ').+' + ]) + else: + values = None + constraint = DMSAttrConstraint( + name='asn_candidate', + sources=['asn_candidate'], + value=values, + force_unique=True, + is_acid=True, + evaluate=True, + ) + + return constraint def evaluate(value): @@ -57,6 +62,74 @@ def evaluate(value): return evaled +def filter_discovered_only( + associations, + discover_ruleset, + candidate_ruleset, + keep_candidates=True, +): + """Return only those associations that have multiple candidates + + Parameters + ---------- + associations : iterable + The list of associations to check. The list + is that returned by the `generate` function. + + discover_ruleset : str + The name of the ruleset that has the discover rules + + candidate_ruleset : str + The name of the ruleset that finds just candidates + + keep_candidates : bool + Keep explicit candidate associations in the list. + + Returns + ------- + iterable + The new list of just cross candidate associations. + + Notes + ----- + This utility is only meant to run on associations that have + been constructed. Associations that have been Association.dump + and then Association.load will not return proper results. + """ + from .prune import identify_dups + + # Split the associations along discovered/not discovered lines + dups, valid = identify_dups(associations) + asn_by_ruleset = { + candidate_ruleset: [], + discover_ruleset: [] + } + for asn in valid: + asn_by_ruleset[asn.registry.name].append(asn) + candidate_list = asn_by_ruleset[candidate_ruleset] + discover_list = asn_by_ruleset[discover_ruleset] + + # Filter out the non-unique discovered. + for candidate in candidate_list: + if len(discover_list) == 0: + break + unique_list = [] + for discover in discover_list: + if discover != candidate: + unique_list.append(discover) + + # Reset the discovered list to the new unique list + # and try the next candidate. + discover_list = unique_list + + if keep_candidates: + discover_list.extend(candidate_list) + + if config.DEBUG: + discover_list += dups + return discover_list + + def getattr_from_list(adict, attributes, invalid_values=None): """Retrieve value from dict using a list of attributes @@ -102,6 +175,32 @@ def getattr_from_list(adict, attributes, invalid_values=None): raise KeyError('Object has no attributes in {}'.format(attributes)) +def return_on_exception(exceptions=(Exception,), default=None): + """Decorator to force functions raising exceptions to return a value + + Parameters + ---------- + exceptions: (Exception(,...)) + Tuple of exceptions to catch + + default: obj + The value to return when a specified exception occurs + """ + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except exceptions as err: + logger.debug( + 'Caught exception %s in function %s, forcing return value of %s', + err, func, default + ) + return default + return wrapper + return decorator + + @return_on_exception(exceptions=(KeyError,), default=None) def getattr_from_list_nofail(*args, **kwargs): """Call getattr_from_list without allows exceptions. 
diff --git a/jwst/associations/main.py b/jwst/associations/main.py index e9cd83ff0b..888c560bc6 100644 --- a/jwst/associations/main.py +++ b/jwst/associations/main.py @@ -9,27 +9,16 @@ from jwst.associations import ( __version__, AssociationPool, - AssociationRegistry, - generate, ) from jwst.associations import config from jwst.associations.exceptions import AssociationError -from jwst.associations.lib.constraint import ( - ConstraintTrue, -) -from jwst.associations.lib.dms_base import DMSAttrConstraint from jwst.associations.lib.log_config import (log_config, DMS_config) -from jwst.associations.lib.prune import identify_dups __all__ = ['Main', 'main'] # Configure logging logger = log_config(name=__package__) -# Ruleset names -DISCOVER_RULESET = 'discover' -CANDIDATE_RULESET = 'candidate' - class Main(): """ @@ -159,66 +148,33 @@ def configure(self, args=None, pool=None): # 2) Only discovered associations that do not match # candidate associations # 3) Both discovered and all candidate associations. - logger.info('Reading rules.') if not parsed.discover and\ not parsed.all_candidates and\ parsed.asn_candidate_ids is None: parsed.discover = True parsed.all_candidates = True - if parsed.discover or parsed.all_candidates: - global_constraints = constrain_on_candidates( - None - ) - elif parsed.asn_candidate_ids is not None: - global_constraints = constrain_on_candidates( - parsed.asn_candidate_ids - ) - - self.rules = AssociationRegistry( - parsed.rules, - include_default=not parsed.ignore_default, - global_constraints=global_constraints, - name=CANDIDATE_RULESET - ) - - if parsed.discover: - self.rules.update( - AssociationRegistry( - parsed.rules, - include_default=not parsed.ignore_default, - name=DISCOVER_RULESET - ) - ) def generate(self): """Generate the associations""" logger.info('Generating associations.') parsed = self.parsed - self.associations = generate( - self.pool, self.rules, version_id=parsed.version_id, finalize=not parsed.no_finalize - ) + if parsed.per_pool_algorithm: + from jwst.associations.generator.generate_per_pool import generate_per_pool - if parsed.discover: - logger.debug( - '# asns found before discover filtering={}'.format( - len(self.associations) - ) + self.associations = generate_per_pool( + self.pool, rule_defs=parsed.rules, + candidate_ids=parsed.asn_candidate_ids, all_candidates=parsed.all_candidates, discover=parsed.discover, + version_id=parsed.version_id, finalize=not parsed.no_finalize, merge=parsed.merge, ignore_default=parsed.ignore_default ) - self.associations = filter_discovered_only( - self.associations, - DISCOVER_RULESET, - CANDIDATE_RULESET, - keep_candidates=parsed.all_candidates, + else: + from jwst.associations.generator.generate_per_candidate import generate_per_candidate + + self.associations = generate_per_candidate( + self.pool, rule_defs=parsed.rules, + candidate_ids=parsed.asn_candidate_ids, all_candidates=parsed.all_candidates, discover=parsed.discover, + version_id=parsed.version_id, finalize=not parsed.no_finalize, merge=parsed.merge, ignore_default=parsed.ignore_default ) - self.rules.Utility.resequence(self.associations) - # Do a grand merging. This is done particularly for - # Level2 associations. - if parsed.merge: - try: - self.associations = self.rules.Utility.merge_asns(self.associations) - except AttributeError: - pass logger.debug(self.__str__()) @@ -356,6 +312,11 @@ def parse_args(self, args=None, has_pool=False): '--no-merge', action=DeprecateNoMerge, help='Deprecated: Default is to not merge. See "--merge".' 
) + parser.add_argument( + '--per-pool-algorithm', + action='store_true', + help='Use the original, per-pool, algorithm that does not segment pools based on candidates' + ) self.parsed = parser.parse_args(args=args) @@ -431,99 +392,5 @@ def __call__(self, parser, namespace, values, option_string=None): setattr(namespace, self.dest, values) -def constrain_on_candidates(candidates): - """Create a constraint based on a list of candidates - - Parameters - ---------- - candidates : (str, ...) or None - List of candidate id's. - If None, then all candidates are matched. - """ - if candidates is not None and len(candidates): - c_list = '|'.join(candidates) - values = ''.join([ - '.+(', c_list, ').+' - ]) - else: - values = None - constraint = DMSAttrConstraint( - name='asn_candidate', - sources=['asn_candidate'], - value=values, - force_unique=True, - is_acid=True, - evaluate=True, - ) - - return constraint - - -def filter_discovered_only( - associations, - discover_ruleset, - candidate_ruleset, - keep_candidates=True, -): - """Return only those associations that have multiple candidates - - Parameters - ---------- - associations : iterable - The list of associations to check. The list - is that returned by the `generate` function. - - discover_ruleset : str - The name of the ruleset that has the discover rules - - candidate_ruleset : str - The name of the ruleset that finds just candidates - - keep_candidates : bool - Keep explicit candidate associations in the list. - - Returns - ------- - iterable - The new list of just cross candidate associations. - - Notes - ----- - This utility is only meant to run on associations that have - been constructed. Associations that have been Association.dump - and then Association.load will not return proper results. - """ - # Split the associations along discovered/not discovered lines - dups, valid = identify_dups(associations) - asn_by_ruleset = { - candidate_ruleset: [], - discover_ruleset: [] - } - for asn in valid: - asn_by_ruleset[asn.registry.name].append(asn) - candidate_list = asn_by_ruleset[candidate_ruleset] - discover_list = asn_by_ruleset[discover_ruleset] - - # Filter out the non-unique discovered. - for candidate in candidate_list: - if len(discover_list) == 0: - break - unique_list = [] - for discover in discover_list: - if discover != candidate: - unique_list.append(discover) - - # Reset the discovered list to the new unique list - # and try the next candidate. 
- discover_list = unique_list - - if keep_candidates: - discover_list.extend(candidate_list) - - if config.DEBUG: - discover_list += dups - return discover_list - - if __name__ == '__main__': Main() diff --git a/jwst/associations/tests/test_asn_names.py b/jwst/associations/tests/test_asn_names.py index f4b4865464..204893c7df 100644 --- a/jwst/associations/tests/test_asn_names.py +++ b/jwst/associations/tests/test_asn_names.py @@ -8,7 +8,7 @@ from jwst import associations from jwst.associations import generate from jwst.associations import load_asn -from jwst.associations.main import constrain_on_candidates +from jwst.associations.lib.utilities import constrain_on_candidates LEVEL3_ASN_ACID_NAME_REGEX = ( r'jw' diff --git a/jwst/associations/tests/test_constraints.py b/jwst/associations/tests/test_constraints.py index 741b134e37..e38b27b35a 100644 --- a/jwst/associations/tests/test_constraints.py +++ b/jwst/associations/tests/test_constraints.py @@ -8,6 +8,27 @@ ) +def test_constraint_history(): + """Test the saving and restoring of constraint values + + A baseline case is also checked to ensure no cross-talk + between constraints and to check restore without previous preserve. + """ + sc1 = SimpleConstraint(name='sc1') + sc2 = SimpleConstraint(name='sc2') + assert sc1.name == 'sc1' + assert sc2.name == 'sc2' + sc1.preserve() + sc2.preserve() + sc1.name = 'sc1 modified' + assert sc1.name == 'sc1 modified' + assert sc2.name == 'sc2' + sc1.restore() + sc2.restore() + assert sc1.name == 'sc1' + assert sc2.name == 'sc2' + + def test_sc_dup_names(): """Test that SimpleConstraint returns an empty dict""" sc = SimpleConstraint(name='sc_name') diff --git a/jwst/associations/tests/test_io.py b/jwst/associations/tests/test_io.py index 3f01c23537..0968745c12 100644 --- a/jwst/associations/tests/test_io.py +++ b/jwst/associations/tests/test_io.py @@ -65,5 +65,5 @@ def test_load_asn_all(make_asns): for asn_file in asn_files: with open(asn_file, 'r') as asn_fp: - asns = load_asn(asn_fp, registry=generated.rules, first=False) + asns = load_asn(asn_fp, first=False) assert len(asns) > 1 diff --git a/jwst/associations/tests/test_level2_background.py b/jwst/associations/tests/test_level2_background.py index e77b2042fc..4af3eb8685 100644 --- a/jwst/associations/tests/test_level2_background.py +++ b/jwst/associations/tests/test_level2_background.py @@ -6,7 +6,7 @@ ) from jwst.associations import generate -from jwst.associations.main import constrain_on_candidates +from jwst.associations.lib.utilities import constrain_on_candidates DITHER_PATTERN_MULTIPLIER = { '0': 1, # No pattern, 1-to-1 exposure count diff --git a/jwst/associations/tests/test_level3_duplicate.py b/jwst/associations/tests/test_level3_duplicate.py index 29a738e89d..7fe76609ad 100644 --- a/jwst/associations/tests/test_level3_duplicate.py +++ b/jwst/associations/tests/test_level3_duplicate.py @@ -9,7 +9,8 @@ ) from jwst.associations import (AssociationPool, generate) -from jwst.associations.main import (Main, constrain_on_candidates) +from jwst.associations.main import Main +from jwst.associations.lib.utilities import constrain_on_candidates def test_duplicate_names(caplog): diff --git a/jwst/associations/tests/test_level3_spectrographic.py b/jwst/associations/tests/test_level3_spectrographic.py index 8d5aaa8f9a..387e639120 100644 --- a/jwst/associations/tests/test_level3_spectrographic.py +++ b/jwst/associations/tests/test_level3_spectrographic.py @@ -11,7 +11,7 @@ ) from jwst.associations import generate -from jwst.associations.main import 
constrain_on_candidates +from jwst.associations.lib.utilities import constrain_on_candidates class TestLevel3Spec(BasePoolRule): diff --git a/jwst/associations/tests/test_level3_wfs.py b/jwst/associations/tests/test_level3_wfs.py index 60f7a4d4f0..b83b82fd7b 100644 --- a/jwst/associations/tests/test_level3_wfs.py +++ b/jwst/associations/tests/test_level3_wfs.py @@ -3,7 +3,7 @@ from jwst.associations.tests import helpers from jwst.associations import generate -from jwst.associations.main import constrain_on_candidates +from jwst.associations.lib.utilities import constrain_on_candidates # Generate Level3 associations all_candidates = constrain_on_candidates(None) diff --git a/jwst/regtest/test_associations_sdp_pools.py b/jwst/regtest/test_associations_sdp_pools.py index 7f29fac43f..ce25e54432 100644 --- a/jwst/regtest/test_associations_sdp_pools.py +++ b/jwst/regtest/test_associations_sdp_pools.py @@ -32,6 +32,11 @@ 'slow': False, } SPECIAL_POOLS = { + 'jw00016_20230331t130733_pool': { + 'args': [], + 'xfail': 'See issue JP-3516', + 'slow': False, + }, 'jw00623_20190607t021101_pool': { 'args': [], 'xfail': None, @@ -77,6 +82,11 @@ 'xfail': None, 'slow': False, }, + 'jw01093_20230626t181341_pool': { + 'args': [], + 'xfail': None, + 'slow': True, + }, 'jw01194_20230115t113819_pool': { 'args': [], 'xfail': None,
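For reference, the new per-candidate generator added by this patch can be driven programmatically. A minimal sketch, assuming a local SDP-style pool file (the filename is illustrative):

from jwst.associations import AssociationPool
from jwst.associations.generator.generate_per_candidate import generate_per_candidate

# Read an association pool (illustrative filename) and generate associations
# for every candidate it contains, using the default rules.
pool = AssociationPool.read('jw00016_20230331t130733_pool.csv')
asns = generate_per_candidate(pool, rule_defs=None)
print(f'{len(asns)} associations generated')

The original whole-pool behavior remains reachable from the command line through the new --per-pool-algorithm flag.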