diff --git a/frigate/utils/frigatemain.py b/frigate/utils/frigatemain.py new file mode 100644 index 0000000..4c36024 --- /dev/null +++ b/frigate/utils/frigatemain.py @@ -0,0 +1,67 @@ +from frigate.utils.datasets import save_dataframe +from frigate.utils.kowalski import get_candidates_from_kowalski +from frigate.utils.skyportal import get_candids_per_filter_from_skyportal + + +def str_to_bool(value): + if isinstance(value, bool): + return value + if value.lower() in {"false", "f", "0", "no", "n"}: + return False + elif value.lower() in {"true", "t", "1", "yes", "y"}: + return True + raise ValueError(f"{value} is not a valid boolean value") + + +def process_candidates(args): + # GET CANDIDATES FROM KOWALSKI + candidates, err = get_candidates_from_kowalski( + args.start, + args.end, + args.programids, + n_threads=args.n_threads, + low_memory=args.low_memory, + low_memory_format=args.output_format, + low_memory_dir=args.output_directory, + format=args.output_format, + ) + if err or candidates is None: + print(err) + return + + # GET CANDIDATES FROM SKYPORTAL + print("Getting candidates from SkyPortal using the following filters:") + print(args.filterids) + print("Getting candidates from SkyPortal using the following groupIDs:") + print(args.groupids) + candids_per_filter, err = get_candids_per_filter_from_skyportal( + args.start, args.end, args.groupids, args.filterids, saved=False + ) + if err or candids_per_filter is None: + print(err) + return + + # ADD PASSED FILTERS TO CANDIDATES + candidates["passed_filters"] = [[] for _ in range(len(candidates))] + for filterID, candids in candids_per_filter.items(): + try: + idx = candidates[candidates["candid"].isin(candids)].index + candidates.loc[idx, "passed_filters"] = candidates.loc[ + idx, "passed_filters" + ].apply(lambda x: x + [filterID]) + except KeyError: + print(f"Candid {candids} not found in candidates dataframe, skipping...") + continue + + # SAVE CANDIDATES TO DISK + filename = f"{args.start}_{args.end}_{'_'.join(map(str, args.programids))}" + filepath = save_dataframe( + df=candidates, + filename=filename, + output_format=args.output_format, + output_compression=args.output_compression, + output_compression_level=args.output_compression_level, + output_directory=args.output_directory, + ) + + print(f"Saved candidates to {filepath}") \ No newline at end of file diff --git a/frigate/utils/parsers.py b/frigate/utils/parsers.py index a261015..a68ee5a 100644 --- a/frigate/utils/parsers.py +++ b/frigate/utils/parsers.py @@ -209,3 +209,116 @@ def stats_parser_args(): raise ValueError("No columns provided") return args + +def loop_parser(): + parser = argparse.ArgumentParser(description='Run frigate with specified parameters.') + parser.add_argument("--programids", type=str, default="1,2,3", help="Program IDs to query") + parser.add_argument('--start', nargs='+', required=True, help='List of start values') + parser.add_argument("--nb_days", type=float, default=1.0, help="Number of days to query") + parser.add_argument("--sp_token", type=str, default=None, help="Skyportal token") + parser.add_argument("--k_token", type=str, default=None, help="Kowalski token") + parser.add_argument("--groupids", type=str, default="*", help="SkyPortal Group IDs to query") + parser.add_argument("--filterids", type=str, help="SkyPortal/Kowalski Filter IDs to query") + parser.add_argument("--output_format", type=str, default="parquet", help="Output format for the results") + parser.add_argument( + "--n_threads", + type=str, + default=None, + help="Number of threads to use when parallelizing queries", + ) + parser.add_argument( + "--output_compression", + type=str, + default=None, + help="Output compression for the results", + ) + parser.add_argument( + "--output_compression_level", + type=int, + default=None, + help="Output compression level for the results", + ) + parser.add_argument( + "--output_directory", + type=str, + default="./data", + help="Output directory for the results", + ) + parser.add_argument( + "--low_memory", + type=str_to_bool, + default=False, + help="Use low memory mode, to reduce RAM usage", + ) + return parser + + +def loop_parser_args(): + args = loop_parser().parse_args() + if not args.k_token: + # we try to get the token from the environment if it is not provided here + k_token_env = os.environ.get("KOWALSKI_TOKEN") + if k_token_env: + args.k_token = k_token_env + else: + # if provided, we add the token in the environment instead + os.environ["KOWALSKI_TOKEN"] = args.k_token + + if not args.sp_token: + # we try to get the token from the environment if it is not provided here + sp_token_env = os.environ.get("SKYPORTAL_TOKEN") + if sp_token_env: + args.sp_token = sp_token_env + else: + # if provided, we add the token in the environment instead + os.environ["SKYPORTAL_TOKEN"] = args.sp_token + + # validate the output options + try: + validate_output_options( + args.output_format, + args.output_compression, + args.output_compression_level, + args.output_directory, + ) + except ValueError as e: + raise ValueError(f"Invalid output options: {e}") + + # validate the number of threads + n_threads = args.n_threads + if n_threads is None: + n_threads = multiprocessing.cpu_count() + else: + n_threads = int(n_threads) + n_threads = min(n_threads, multiprocessing.cpu_count()) + args.n_threads = n_threads + + # validate the programids + try: + programids = list(map(int, args.programids.split(","))) + except ValueError: + raise ValueError(f"Invalid programids: {args.programids}") + args.programids = programids + + # validate the groupids + if args.groupids: + if args.groupids[0] not in ["*", "all"]: + try: + groupids = list(map(int, args.groupids.split(","))) + except ValueError: + raise ValueError(f"Invalid groupids: {args.groupids}") + args.groupids = groupids + if args.groupids in [[], None, ""]: + args.groupids = [] + + # validate the filterids + if args.filterids: + try: + filterids = list(map(int, args.filterids.split(","))) + except ValueError: + raise ValueError(f"Invalid filterids: {args.filterids}") + args.filterids = filterids + if args.filterids in [[], None, ""]: + args.filterids = [] + + return args diff --git a/scripts/loop-frigate.py b/scripts/loop-frigate.py new file mode 100644 index 0000000..2a0ec57 --- /dev/null +++ b/scripts/loop-frigate.py @@ -0,0 +1,13 @@ +from frigate.utils.parsers import loop_parser_args +from frigate.utils.frigatemain import process_candidates + +args = loop_parser_args() +start_values = args.start + +for start in start_values: + try: + args.start = float(start) + args.end = args.start + args.nb_days + process_candidates(args) + except Exception as e: + print(f"Error occurred while running the command for start value {start}: {e}") \ No newline at end of file