From 0aabbde93b8ca8bc4af6b2a65fe73b29030712c0 Mon Sep 17 00:00:00 2001 From: Kira Date: Wed, 28 Aug 2024 11:07:08 -0700 Subject: [PATCH 1/4] loop frigate on multiple nights --- frigate/utils/frigatemain.py | 67 +++++++++++++++++++++ frigate/utils/parsers.py | 113 +++++++++++++++++++++++++++++++++++ scripts/loop-frigate.py | 13 ++++ 3 files changed, 193 insertions(+) create mode 100644 frigate/utils/frigatemain.py create mode 100644 scripts/loop-frigate.py diff --git a/frigate/utils/frigatemain.py b/frigate/utils/frigatemain.py new file mode 100644 index 0000000..4c36024 --- /dev/null +++ b/frigate/utils/frigatemain.py @@ -0,0 +1,67 @@ +from frigate.utils.datasets import save_dataframe +from frigate.utils.kowalski import get_candidates_from_kowalski +from frigate.utils.skyportal import get_candids_per_filter_from_skyportal + + +def str_to_bool(value): + if isinstance(value, bool): + return value + if value.lower() in {"false", "f", "0", "no", "n"}: + return False + elif value.lower() in {"true", "t", "1", "yes", "y"}: + return True + raise ValueError(f"{value} is not a valid boolean value") + + +def process_candidates(args): + # GET CANDIDATES FROM KOWALSKI + candidates, err = get_candidates_from_kowalski( + args.start, + args.end, + args.programids, + n_threads=args.n_threads, + low_memory=args.low_memory, + low_memory_format=args.output_format, + low_memory_dir=args.output_directory, + format=args.output_format, + ) + if err or candidates is None: + print(err) + return + + # GET CANDIDATES FROM SKYPORTAL + print("Getting candidates from SkyPortal using the following filters:") + print(args.filterids) + print("Getting candidates from SkyPortal using the following groupIDs:") + print(args.groupids) + candids_per_filter, err = get_candids_per_filter_from_skyportal( + args.start, args.end, args.groupids, args.filterids, saved=False + ) + if err or candids_per_filter is None: + print(err) + return + + # ADD PASSED FILTERS TO CANDIDATES + candidates["passed_filters"] = [[] for _ in range(len(candidates))] + for filterID, candids in candids_per_filter.items(): + try: + idx = candidates[candidates["candid"].isin(candids)].index + candidates.loc[idx, "passed_filters"] = candidates.loc[ + idx, "passed_filters" + ].apply(lambda x: x + [filterID]) + except KeyError: + print(f"Candid {candids} not found in candidates dataframe, skipping...") + continue + + # SAVE CANDIDATES TO DISK + filename = f"{args.start}_{args.end}_{'_'.join(map(str, args.programids))}" + filepath = save_dataframe( + df=candidates, + filename=filename, + output_format=args.output_format, + output_compression=args.output_compression, + output_compression_level=args.output_compression_level, + output_directory=args.output_directory, + ) + + print(f"Saved candidates to {filepath}") \ No newline at end of file diff --git a/frigate/utils/parsers.py b/frigate/utils/parsers.py index a261015..a68ee5a 100644 --- a/frigate/utils/parsers.py +++ b/frigate/utils/parsers.py @@ -209,3 +209,116 @@ def stats_parser_args(): raise ValueError("No columns provided") return args + +def loop_parser(): + parser = argparse.ArgumentParser(description='Run frigate with specified parameters.') + parser.add_argument("--programids", type=str, default="1,2,3", help="Program IDs to query") + parser.add_argument('--start', nargs='+', required=True, help='List of start values') + parser.add_argument("--nb_days", type=float, default=1.0, help="Number of days to query") + parser.add_argument("--sp_token", type=str, default=None, help="Skyportal token") + parser.add_argument("--k_token", type=str, default=None, help="Kowalski token") + parser.add_argument("--groupids", type=str, default="*", help="SkyPortal Group IDs to query") + parser.add_argument("--filterids", type=str, help="SkyPortal/Kowalski Filter IDs to query") + parser.add_argument("--output_format", type=str, default="parquet", help="Output format for the results") + parser.add_argument( + "--n_threads", + type=str, + default=None, + help="Number of threads to use when parallelizing queries", + ) + parser.add_argument( + "--output_compression", + type=str, + default=None, + help="Output compression for the results", + ) + parser.add_argument( + "--output_compression_level", + type=int, + default=None, + help="Output compression level for the results", + ) + parser.add_argument( + "--output_directory", + type=str, + default="./data", + help="Output directory for the results", + ) + parser.add_argument( + "--low_memory", + type=str_to_bool, + default=False, + help="Use low memory mode, to reduce RAM usage", + ) + return parser + + +def loop_parser_args(): + args = loop_parser().parse_args() + if not args.k_token: + # we try to get the token from the environment if it is not provided here + k_token_env = os.environ.get("KOWALSKI_TOKEN") + if k_token_env: + args.k_token = k_token_env + else: + # if provided, we add the token in the environment instead + os.environ["KOWALSKI_TOKEN"] = args.k_token + + if not args.sp_token: + # we try to get the token from the environment if it is not provided here + sp_token_env = os.environ.get("SKYPORTAL_TOKEN") + if sp_token_env: + args.sp_token = sp_token_env + else: + # if provided, we add the token in the environment instead + os.environ["SKYPORTAL_TOKEN"] = args.sp_token + + # validate the output options + try: + validate_output_options( + args.output_format, + args.output_compression, + args.output_compression_level, + args.output_directory, + ) + except ValueError as e: + raise ValueError(f"Invalid output options: {e}") + + # validate the number of threads + n_threads = args.n_threads + if n_threads is None: + n_threads = multiprocessing.cpu_count() + else: + n_threads = int(n_threads) + n_threads = min(n_threads, multiprocessing.cpu_count()) + args.n_threads = n_threads + + # validate the programids + try: + programids = list(map(int, args.programids.split(","))) + except ValueError: + raise ValueError(f"Invalid programids: {args.programids}") + args.programids = programids + + # validate the groupids + if args.groupids: + if args.groupids[0] not in ["*", "all"]: + try: + groupids = list(map(int, args.groupids.split(","))) + except ValueError: + raise ValueError(f"Invalid groupids: {args.groupids}") + args.groupids = groupids + if args.groupids in [[], None, ""]: + args.groupids = [] + + # validate the filterids + if args.filterids: + try: + filterids = list(map(int, args.filterids.split(","))) + except ValueError: + raise ValueError(f"Invalid filterids: {args.filterids}") + args.filterids = filterids + if args.filterids in [[], None, ""]: + args.filterids = [] + + return args diff --git a/scripts/loop-frigate.py b/scripts/loop-frigate.py new file mode 100644 index 0000000..2a0ec57 --- /dev/null +++ b/scripts/loop-frigate.py @@ -0,0 +1,13 @@ +from frigate.utils.parsers import loop_parser_args +from frigate.utils.frigatemain import process_candidates + +args = loop_parser_args() +start_values = args.start + +for start in start_values: + try: + args.start = float(start) + args.end = args.start + args.nb_days + process_candidates(args) + except Exception as e: + print(f"Error occurred while running the command for start value {start}: {e}") \ No newline at end of file From 38fc145d939fd94019f299fc1a92e13028af4c6d Mon Sep 17 00:00:00 2001 From: Kira Date: Wed, 28 Aug 2024 11:23:53 -0700 Subject: [PATCH 2/4] address pre-commit failure --- .github/workflows/lint.yaml | 1 + README.md | 4 ++-- frigate/utils/frigatemain.py | 2 +- frigate/utils/parsers.py | 34 ++++++++++++++++++++++++++-------- scripts/loop-frigate.py | 2 +- 5 files changed, 31 insertions(+), 12 deletions(-) diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index f27c5c0..e6f4d8f 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -4,6 +4,7 @@ on: push: branches: - main + - loop pull_request: permissions: diff --git a/README.md b/README.md index 9cccb18..8f4c3fc 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,8 @@ To run the deprecated `scripts/alert-stats.py` script, you can run: PYTHONPATH=. python scripts/alert-stats.py --feature='candidate.magpsf,candidate.sigmapsf' --programids=1,2 --plot=True --start=2460355.5 --nb_days=1 --sp_token= --sp_groupIDs=41 --sp_filterIDs=1 --nb_bins=1000 --k_token= ``` -- [X] Fetch all the features of all alert packets within a given time range with given program ids and store it as a pandas dataframe -- [X] Fetch all of the candidates that passed filters in Fritz (with exact candid, not just objectIds). Relies on the new /api/candidates_filter endpoint. +- [x] Fetch all the features of all alert packets within a given time range with given program ids and store it as a pandas dataframe +- [x] Fetch all of the candidates that passed filters in Fritz (with exact candid, not just objectIds). Relies on the new /api/candidates_filter endpoint. - [ ] Looking at the subset of alerts that passed the filters, find the obj_id of the sources that were saved in Fritz. - [ ] Update the dataframe with a column containing the list of filters passed for each alert, and a column containing the groupIDs for each alert which obj has been saved as a source to the groups associated to the filters passed. - [ ] Figure out what visualizations tools and plots we can use to represent the data in a meaningful way and extract insights from it. diff --git a/frigate/utils/frigatemain.py b/frigate/utils/frigatemain.py index 4c36024..a1e0483 100644 --- a/frigate/utils/frigatemain.py +++ b/frigate/utils/frigatemain.py @@ -64,4 +64,4 @@ def process_candidates(args): output_directory=args.output_directory, ) - print(f"Saved candidates to {filepath}") \ No newline at end of file + print(f"Saved candidates to {filepath}") diff --git a/frigate/utils/parsers.py b/frigate/utils/parsers.py index a68ee5a..b22560d 100644 --- a/frigate/utils/parsers.py +++ b/frigate/utils/parsers.py @@ -210,16 +210,34 @@ def stats_parser_args(): return args + def loop_parser(): - parser = argparse.ArgumentParser(description='Run frigate with specified parameters.') - parser.add_argument("--programids", type=str, default="1,2,3", help="Program IDs to query") - parser.add_argument('--start', nargs='+', required=True, help='List of start values') - parser.add_argument("--nb_days", type=float, default=1.0, help="Number of days to query") + parser = argparse.ArgumentParser( + description="Run frigate with specified parameters." + ) + parser.add_argument( + "--programids", type=str, default="1,2,3", help="Program IDs to query" + ) + parser.add_argument( + "--start", nargs="+", required=True, help="List of start values" + ) + parser.add_argument( + "--nb_days", type=float, default=1.0, help="Number of days to query" + ) parser.add_argument("--sp_token", type=str, default=None, help="Skyportal token") parser.add_argument("--k_token", type=str, default=None, help="Kowalski token") - parser.add_argument("--groupids", type=str, default="*", help="SkyPortal Group IDs to query") - parser.add_argument("--filterids", type=str, help="SkyPortal/Kowalski Filter IDs to query") - parser.add_argument("--output_format", type=str, default="parquet", help="Output format for the results") + parser.add_argument( + "--groupids", type=str, default="*", help="SkyPortal Group IDs to query" + ) + parser.add_argument( + "--filterids", type=str, help="SkyPortal/Kowalski Filter IDs to query" + ) + parser.add_argument( + "--output_format", + type=str, + default="parquet", + help="Output format for the results", + ) parser.add_argument( "--n_threads", type=str, @@ -254,7 +272,7 @@ def loop_parser(): def loop_parser_args(): - args = loop_parser().parse_args() + args = loop_parser().parse_args() if not args.k_token: # we try to get the token from the environment if it is not provided here k_token_env = os.environ.get("KOWALSKI_TOKEN") diff --git a/scripts/loop-frigate.py b/scripts/loop-frigate.py index 2a0ec57..18a6609 100644 --- a/scripts/loop-frigate.py +++ b/scripts/loop-frigate.py @@ -10,4 +10,4 @@ args.end = args.start + args.nb_days process_candidates(args) except Exception as e: - print(f"Error occurred while running the command for start value {start}: {e}") \ No newline at end of file + print(f"Error occurred while running the command for start value {start}: {e}") From 9e7ea3af6555ad2712d06d1ed8de46a16c781972 Mon Sep 17 00:00:00 2001 From: Kira Date: Fri, 30 Aug 2024 11:45:18 -0700 Subject: [PATCH 3/4] address PR comments --- .github/workflows/lint.yaml | 1 - frigate/__main__.py | 12 ++- frigate/utils/frigatemain.py | 67 --------------- frigate/utils/parsers.py | 159 ++++------------------------------- scripts/loop-frigate.py | 8 +- 5 files changed, 30 insertions(+), 217 deletions(-) delete mode 100644 frigate/utils/frigatemain.py diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index e6f4d8f..f27c5c0 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -4,7 +4,6 @@ on: push: branches: - main - - loop pull_request: permissions: diff --git a/frigate/__main__.py b/frigate/__main__.py index ce15653..15a63ed 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -14,10 +14,7 @@ def str_to_bool(value): raise ValueError(f"{value} is not a valid boolean value") -if __name__ == "__main__": - # PARSE COMMAND LINE ARGUMENTS - args = main_parser_args() - +def process_candidates(args): # GET CANDIDATES FROM KOWALSKI candidates, err = get_candidates_from_kowalski( args.start, @@ -77,3 +74,10 @@ def str_to_bool(value): ) print(f"Saved candidates to {filepath}") + + +# PARSE COMMAND LINE ARGUMENTS +args = main_parser_args() + +if __name__ == "__main__": + process_candidates(args) diff --git a/frigate/utils/frigatemain.py b/frigate/utils/frigatemain.py deleted file mode 100644 index a1e0483..0000000 --- a/frigate/utils/frigatemain.py +++ /dev/null @@ -1,67 +0,0 @@ -from frigate.utils.datasets import save_dataframe -from frigate.utils.kowalski import get_candidates_from_kowalski -from frigate.utils.skyportal import get_candids_per_filter_from_skyportal - - -def str_to_bool(value): - if isinstance(value, bool): - return value - if value.lower() in {"false", "f", "0", "no", "n"}: - return False - elif value.lower() in {"true", "t", "1", "yes", "y"}: - return True - raise ValueError(f"{value} is not a valid boolean value") - - -def process_candidates(args): - # GET CANDIDATES FROM KOWALSKI - candidates, err = get_candidates_from_kowalski( - args.start, - args.end, - args.programids, - n_threads=args.n_threads, - low_memory=args.low_memory, - low_memory_format=args.output_format, - low_memory_dir=args.output_directory, - format=args.output_format, - ) - if err or candidates is None: - print(err) - return - - # GET CANDIDATES FROM SKYPORTAL - print("Getting candidates from SkyPortal using the following filters:") - print(args.filterids) - print("Getting candidates from SkyPortal using the following groupIDs:") - print(args.groupids) - candids_per_filter, err = get_candids_per_filter_from_skyportal( - args.start, args.end, args.groupids, args.filterids, saved=False - ) - if err or candids_per_filter is None: - print(err) - return - - # ADD PASSED FILTERS TO CANDIDATES - candidates["passed_filters"] = [[] for _ in range(len(candidates))] - for filterID, candids in candids_per_filter.items(): - try: - idx = candidates[candidates["candid"].isin(candids)].index - candidates.loc[idx, "passed_filters"] = candidates.loc[ - idx, "passed_filters" - ].apply(lambda x: x + [filterID]) - except KeyError: - print(f"Candid {candids} not found in candidates dataframe, skipping...") - continue - - # SAVE CANDIDATES TO DISK - filename = f"{args.start}_{args.end}_{'_'.join(map(str, args.programids))}" - filepath = save_dataframe( - df=candidates, - filename=filename, - output_format=args.output_format, - output_compression=args.output_compression, - output_compression_level=args.output_compression_level, - output_directory=args.output_directory, - ) - - print(f"Saved candidates to {filepath}") diff --git a/frigate/utils/parsers.py b/frigate/utils/parsers.py index b22560d..e3b3fd3 100644 --- a/frigate/utils/parsers.py +++ b/frigate/utils/parsers.py @@ -25,9 +25,10 @@ def main_parser(): ) parser.add_argument( "--start", + nargs="+", type=str, default=np.floor(Time.now().jd - 1) + 0.5, - help="Start time for the query, default to 1 day ago", + help="Start time(s) for the query, default to 1 day ago", ) parser.add_argument( "--nb_days", type=float, default=1.0, help="Number of days to query" @@ -129,14 +130,17 @@ def main_parser_args(): args.programids = programids # validate the start and end times - try: - # check if start is a string or a float as string + t_i = [] + for start in args.start: try: - t_i = float(args.start) + # check if start is a string or a float as string + try: + t_i.append(float(start)) + except ValueError: + t_i.append(Time(start).jd) except ValueError: - t_i = Time(args.start).jd - except ValueError: - raise ValueError(f"Invalid start time: {args.start}") + raise ValueError(f"Invalid start time: {start}") + if args.end: try: try: @@ -146,7 +150,7 @@ def main_parser_args(): except ValueError: raise ValueError(f"Invalid end time: {args.end}") else: - t_f = t_i + args.nb_days + t_f = [ti + args.nb_days for ti in t_i] # validate the groupids if args.groupids: @@ -169,8 +173,12 @@ def main_parser_args(): if args.filterids in [[], None, ""]: args.filterids = [] - args.start = float(t_i) - args.end = float(t_f) + if len(t_i) == 1: + args.start = float(t_i[0]) + args.end = float(t_f[0]) + else: + args.start = t_i + args.end = t_f return args @@ -209,134 +217,3 @@ def stats_parser_args(): raise ValueError("No columns provided") return args - - -def loop_parser(): - parser = argparse.ArgumentParser( - description="Run frigate with specified parameters." - ) - parser.add_argument( - "--programids", type=str, default="1,2,3", help="Program IDs to query" - ) - parser.add_argument( - "--start", nargs="+", required=True, help="List of start values" - ) - parser.add_argument( - "--nb_days", type=float, default=1.0, help="Number of days to query" - ) - parser.add_argument("--sp_token", type=str, default=None, help="Skyportal token") - parser.add_argument("--k_token", type=str, default=None, help="Kowalski token") - parser.add_argument( - "--groupids", type=str, default="*", help="SkyPortal Group IDs to query" - ) - parser.add_argument( - "--filterids", type=str, help="SkyPortal/Kowalski Filter IDs to query" - ) - parser.add_argument( - "--output_format", - type=str, - default="parquet", - help="Output format for the results", - ) - parser.add_argument( - "--n_threads", - type=str, - default=None, - help="Number of threads to use when parallelizing queries", - ) - parser.add_argument( - "--output_compression", - type=str, - default=None, - help="Output compression for the results", - ) - parser.add_argument( - "--output_compression_level", - type=int, - default=None, - help="Output compression level for the results", - ) - parser.add_argument( - "--output_directory", - type=str, - default="./data", - help="Output directory for the results", - ) - parser.add_argument( - "--low_memory", - type=str_to_bool, - default=False, - help="Use low memory mode, to reduce RAM usage", - ) - return parser - - -def loop_parser_args(): - args = loop_parser().parse_args() - if not args.k_token: - # we try to get the token from the environment if it is not provided here - k_token_env = os.environ.get("KOWALSKI_TOKEN") - if k_token_env: - args.k_token = k_token_env - else: - # if provided, we add the token in the environment instead - os.environ["KOWALSKI_TOKEN"] = args.k_token - - if not args.sp_token: - # we try to get the token from the environment if it is not provided here - sp_token_env = os.environ.get("SKYPORTAL_TOKEN") - if sp_token_env: - args.sp_token = sp_token_env - else: - # if provided, we add the token in the environment instead - os.environ["SKYPORTAL_TOKEN"] = args.sp_token - - # validate the output options - try: - validate_output_options( - args.output_format, - args.output_compression, - args.output_compression_level, - args.output_directory, - ) - except ValueError as e: - raise ValueError(f"Invalid output options: {e}") - - # validate the number of threads - n_threads = args.n_threads - if n_threads is None: - n_threads = multiprocessing.cpu_count() - else: - n_threads = int(n_threads) - n_threads = min(n_threads, multiprocessing.cpu_count()) - args.n_threads = n_threads - - # validate the programids - try: - programids = list(map(int, args.programids.split(","))) - except ValueError: - raise ValueError(f"Invalid programids: {args.programids}") - args.programids = programids - - # validate the groupids - if args.groupids: - if args.groupids[0] not in ["*", "all"]: - try: - groupids = list(map(int, args.groupids.split(","))) - except ValueError: - raise ValueError(f"Invalid groupids: {args.groupids}") - args.groupids = groupids - if args.groupids in [[], None, ""]: - args.groupids = [] - - # validate the filterids - if args.filterids: - try: - filterids = list(map(int, args.filterids.split(","))) - except ValueError: - raise ValueError(f"Invalid filterids: {args.filterids}") - args.filterids = filterids - if args.filterids in [[], None, ""]: - args.filterids = [] - - return args diff --git a/scripts/loop-frigate.py b/scripts/loop-frigate.py index 18a6609..9a6d105 100644 --- a/scripts/loop-frigate.py +++ b/scripts/loop-frigate.py @@ -1,8 +1,8 @@ -from frigate.utils.parsers import loop_parser_args -from frigate.utils.frigatemain import process_candidates +from frigate.__main__ import process_candidates +from frigate.utils.parsers import main_parser_args -args = loop_parser_args() -start_values = args.start +args = main_parser_args() +start_values = [args.start] for start in start_values: try: From 693f80904621808573c8af046c75a69b4916df88 Mon Sep 17 00:00:00 2001 From: Kira Date: Wed, 4 Sep 2024 15:50:14 -0700 Subject: [PATCH 4/4] fix start values parsing --- scripts/loop-frigate.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/loop-frigate.py b/scripts/loop-frigate.py index 9a6d105..df33489 100644 --- a/scripts/loop-frigate.py +++ b/scripts/loop-frigate.py @@ -2,7 +2,9 @@ from frigate.utils.parsers import main_parser_args args = main_parser_args() -start_values = [args.start] +start_values = args.start +if isinstance(start_values, (int, str, float)): + start_values = [start_values] for start in start_values: try: