From b4c51102d561d7df16774579688d7c912eedcc8a Mon Sep 17 00:00:00 2001
From: Theodlz
Date: Thu, 29 Feb 2024 12:39:03 -0800
Subject: [PATCH 1/4] improved README

---
 README.md | 35 +++++++++++++++--------------------
 1 file changed, 15 insertions(+), 20 deletions(-)

diff --git a/README.md b/README.md
index 5de9636..a9994ba 100644
--- a/README.md
+++ b/README.md
@@ -1,24 +1,7 @@
-Example cmd to run the script:
+### What's Frigate?
 
-```bash
-PYTHONPATH=. python scripts/alert-stats.py --feature='candidate.magpsf,candidate.sigmapsf' --programids=1,2 --plot=True --start=2460355.5 --nb_days=1 --sp_token= --sp_groupIDs=41 --sp_filterIDs=1 --nb_bins=1000 --k_token=
-```
-
-This would:
-
-- grab magpsf and sigmapsf values of all alerts from one night starting at jd=2460355.5
-- get these values for the subset of alerts that passed the alert filter from group 41 and filter 1 on SkyPortal/Fritz
-- get these values for the subset of alerts that passed the alert filter AND were saved as sources (at anytime, not necessarily during that night).
-- plot histograms for each feature and alert subset (red: all, blue: passed filter, green: saved as sources)
-- plot corner plots for each feature and alert subset (same color palette)
-
-#### V2 (WIP)
-
-- [] Fetch all the features of all alert packets within a given time range with given program ids and store it as a pandas dataframe
-- [] Fetch all of the candidates that passed filters in Fritz (with exact candid, not just objectIds). Relies on the new /api/candidates_filter endpoint.
-- [] Looking at the subset of alerts that passed the filters, find the obj_id of the sources that were saved in Fritz.
-- [] Update the dataframe with a column containing the list of filters passed for each alert, and a column containing the groupIDs for each alert which obj has been saved as a source to the groups associated to the filters passed.
-- [] Figure out what visualizations tools and plots we can use to represent the data in a meaningful way and extract insights from it.
+Frigate is a tool to retrieve and analyze alert data from the ZTF alert stream: the full set of alerts, the subset of alerts that passed filters in Fritz, and the subset that was saved as proper astronomical sources.
+The current `frigate/__main__.py` only addresses step 1 (getting the full set of alerts for a specified time period, usually one night). The script found in `scripts/alert-stats.py` is an earlier attempt at steps 1, 2, and 3, along with visualization. Next, we plan to integrate better versions of steps 2 and 3 into the `frigate/__main__.py` script and add more visualization tools.
 
 You can run frigate with:
 
@@ -31,3 +14,15 @@ To get the list of all possible arguments, you can run:
 ```bash
 PYTHONPATH=. python frigate --help
 ```
+
+To run the deprecated `scripts/alert-stats.py` script, you can run:
+
+```bash
+PYTHONPATH=. python scripts/alert-stats.py --feature='candidate.magpsf,candidate.sigmapsf' --programids=1,2 --plot=True --start=2460355.5 --nb_days=1 --sp_token= --sp_groupIDs=41 --sp_filterIDs=1 --nb_bins=1000 --k_token=
+```
+
+- [ ] Fetch all the features of all alert packets within a given time range and with the given program ids, and store them as a pandas dataframe
+- [ ] Fetch all of the candidates that passed filters in Fritz (with exact candid, not just objectIds). Relies on the new /api/candidates_filter endpoint.
+- [ ] Looking at the subset of alerts that passed the filters, find the obj_id of the sources that were saved in Fritz.
+- [ ] Update the dataframe with a column containing the list of filters passed by each alert, and a column containing the groupIDs of the groups (associated with those filters) to which each alert's object has been saved as a source.
+- [ ] Figure out what visualization tools and plots we can use to represent the data in a meaningful way and extract insights from it.

From 119c3524f41f35c802aeb63e17fae6fcef2d392e Mon Sep 17 00:00:00 2001
From: Theodlz
Date: Thu, 29 Feb 2024 13:22:12 -0800
Subject: [PATCH 2/4] shorten string fields, concatenate at the end to reduce
 memory footprint in both RAM and disk

---
 frigate/utils/kowalski.py | 44 +++++++++++++++++++++++++++++++++------
 1 file changed, 38 insertions(+), 6 deletions(-)

diff --git a/frigate/utils/kowalski.py b/frigate/utils/kowalski.py
index 94c031d..f394ffa 100644
--- a/frigate/utils/kowalski.py
+++ b/frigate/utils/kowalski.py
@@ -9,6 +9,25 @@
 ZTF_ALERTS_CATALOG = "ZTF_alerts"
 
+STRING_FIELDS = [
+    "rbversion",
+    "drbversion",
+    "braai_version",
+    "acai_b_version",
+    "acai_h_version",
+    "acai_n_version",
+    "acai_o_version",
+    "acai_v_version",
+    "bts_version",
+]
+
+
+def shorten_string_fields(data: pd.DataFrame) -> pd.DataFrame:
+    for field in STRING_FIELDS:
+        if field in data.columns:
+            data[field] = data[field].str.replace("_", "")
+    return data
+
 
 def connect_to_kowalski() -> Kowalski:
     try:
@@ -81,7 +100,6 @@
     )
 
     numPerPage = 10000
-    candidates = pd.DataFrame()
    batches = int(np.ceil(total / numPerPage))
     if objectIds is not None:
         batches = int(np.ceil(len(objectIds) / numPerPage))
@@ -97,11 +115,17 @@
                     "candidate.programid": {"$in": programids},
                 },
                 "projection": {
+                    # we include everything except the following fields
                     "_id": 0,
-                    "candid": 1,
-                    "objectId": 1,
-                    "candidate": 1,
-                    "classifications": 1,
+                    "schemavsn": 0,
+                    "publisher": 0,
+                    "candidate.pdiffimfilename": 0,
+                    "candidate.programpi": 0,
+                    "candidate.candid": 0,
+                    "cutoutScience": 0,
+                    "cutoutTemplate": 0,
+                    "cutoutDifference": 0,
+                    "coordinates": 0,
                 },
             },
             "kwargs": {"limit": numPerPage, "skip": i * numPerPage},
@@ -110,6 +134,7 @@
             query["query"]["filter"]["objectId"] = {"$in": objectIds}
         queries.append(query)
 
+    candidates = []  # list of dataframes to concatenate later
     with multiprocessing.Pool(processes=n_threads) as pool:
         with tqdm(total=total) as pbar:
             for response in pool.imap_unordered(_run_query, queries):
@@ -120,8 +145,15 @@
                 data = response.get("data", [])
                 # we want to flatten the candidate object
                 data = pd.json_normalize(data)
-                candidates = pd.concat([candidates, data], ignore_index=True)
+                # we want to remove unnecessary chars from string fields to save space
+                data = shorten_string_fields(data)
+
+                # append to list of dataframes
+                candidates.append(data)
                 pbar.update(len(data))
+
+    # concatenate all dataframes
+    candidates = pd.concat(candidates, ignore_index=True)
 
     # sort by jd from oldest to newest (lowest to highest)
     candidates = candidates.sort_values(by="candidate.jd", ascending=True)

From 401fff2c7f929a786a4f323409564f3e6f518dee Mon Sep 17 00:00:00 2001
From: Theodlz
Date: Thu, 29 Feb 2024 14:38:11 -0800
Subject: [PATCH 3/4] low memory mode: save subset to disk and join after
 thread pool is closed, add troubleshooting section to README.md

---
 README.md                 |  4 +++
 frigate/__main__.py       |  8 +++++-
 frigate/utils/datasets.py | 31 +++++++++++++++++++++++
 frigate/utils/kowalski.py | 52 ++++++++++++++++++++++++++++++++++++---
 frigate/utils/parsers.py  |  6 +++++
 5 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index a9994ba..279c36d 100644
--- a/README.md
+++ b/README.md
@@ -26,3 +26,7 @@ PYTHONPATH=. python scripts/alert-stats.py --feature='candidate.magpsf,candidate
 - [ ] Looking at the subset of alerts that passed the filters, find the obj_id of the sources that were saved in Fritz.
 - [ ] Update the dataframe with a column containing the list of filters passed by each alert, and a column containing the groupIDs of the groups (associated with those filters) to which each alert's object has been saved as a source.
 - [ ] Figure out what visualization tools and plots we can use to represent the data in a meaningful way and extract insights from it.
+
+#### Troubleshooting
+
+On a system with low memory, you can call frigate with the `--low_memory` flag to reduce memory usage. This will save each subset of alerts to disk and concatenate them at the end, instead of concatenating as the batched queries return, so the memory of the main process does not grow while the individual threads are running. In the future, we want to expand this mode to also reduce the number of alerts fetched per batch query, lowering memory usage even further.

diff --git a/frigate/__main__.py b/frigate/__main__.py
index 8754ff5..9169398 100644
--- a/frigate/__main__.py
+++ b/frigate/__main__.py
@@ -19,7 +19,13 @@ def str_to_bool(value):
 
     # GET CANDIDATES FROM KOWALSKI
     candidates, err = get_candidates_from_kowalski(
-        args.start, args.end, args.programids, n_threads=args.n_threads
+        args.start,
+        args.end,
+        args.programids,
+        n_threads=args.n_threads,
+        low_memory=args.low_memory,
+        low_memory_format=args.output_format,
+        low_memory_dir=args.output_directory,
     )
     if err:
         print(err)

diff --git a/frigate/utils/datasets.py b/frigate/utils/datasets.py
index f0ccaa7..6094bd4 100644
--- a/frigate/utils/datasets.py
+++ b/frigate/utils/datasets.py
@@ -72,3 +72,34 @@ def save_dataframe(df, filename, output_format, output_compression, output_compr
 
     # return the filename that includes the output dir and the extension
     return filename
+def load_dataframe(filename, format=None, directory=None):
+    if directory is not None and not filename.startswith(directory):
+        filename = os.path.join(directory, filename)
+
+
+    if format is None:
+        # try to infer the format from the filename
+        if filename.endswith(".parquet"):
+            format = "parquet"
+        elif filename.endswith(".feather"):
+            format = "feather"
+        elif filename.endswith(".csv"):
+            format = "csv"
+        else:
+            raise ValueError(f"Could not infer format from filename: {filename}")
+    if format == "parquet":
+        return pd.read_parquet(filename)
+    elif format == "feather":
+        return pd.read_feather(filename)
+    elif format == "csv":
+        return pd.read_csv(filename)
+    else:
+        raise ValueError(f"Invalid format: {format}, must be one of ['parquet', 'feather', 'csv']")
+
+def remove_file(filename, directory=None):
+    if directory is not None and not filename.startswith(directory):
+        filename = os.path.join(directory, filename)
+    try:
+        os.remove(filename)
+    except Exception as e:
+        raise ValueError(f"Failed to remove file: {e}")

diff --git a/frigate/utils/kowalski.py b/frigate/utils/kowalski.py
index f394ffa..f7f4635 100644
--- a/frigate/utils/kowalski.py
+++ b/frigate/utils/kowalski.py
@@ -1,5 +1,9 @@
 import multiprocessing
 import os
+import uuid
+
+from contextlib import closing
+
 
 import numpy as np
 import pandas as pd
@@ -7,6 +11,8 @@
 from penquins import Kowalski
 from tqdm import tqdm
+from frigate.utils.datasets import save_dataframe, load_dataframe, remove_file
+
 
 ZTF_ALERTS_CATALOG = "ZTF_alerts"
 
 STRING_FIELDS = [
@@ -90,13 +96,21 @@ def get_candidates_from_kowalski(
     programids: list,
     objectIds=None,
     n_threads=multiprocessing.cpu_count(),
+    low_memory=False,
+    low_memory_format="parquet",
+    low_memory_dir=None,
 ):
+    if low_memory is True and low_memory_format not in ["parquet", "csv", "feather"]:
+        return None, f"Invalid low_memory_format: {low_memory_format}"
+    if low_memory is True and low_memory_dir is None:
+        return None, "low_memory_dir is required when low_memory is True"
+
     total, err = candidates_count_from_kowalski(t_i, t_f, programids, objectIds)
     if err:
         return None, err
 
     print(
-        f"Expecting {total} candidates between {t_i} and {t_f} for programids {programids} (n_threads: {n_threads})"
+        f"Expecting {total} candidates between {t_i} and {t_f} for programids {programids} (n_threads: {n_threads}, low_memory: {low_memory})"
     )
 
     numPerPage = 10000
@@ -135,7 +149,11 @@
         queries.append(query)
 
     candidates = []  # list of dataframes to concatenate later
-    with multiprocessing.Pool(processes=n_threads) as pool:
+    low_memory_pointers = []  # to use with low_memory=True
+
+    # contextlib.closing should help close opened files or other things
+    # it's just added security, it might not be necessary but could be in the future
+    with closing(multiprocessing.Pool(processes=n_threads)) as pool:
         with tqdm(total=total) as pbar:
             for response in pool.imap_unordered(_run_query, queries):
                 if not isinstance(response, dict):
@@ -148,12 +166,38 @@
                 # we want to remove unnecessary chars from string fields to save space
                 data = shorten_string_fields(data)
 
-                # append to list of dataframes
-                candidates.append(data)
+                if low_memory:
+                    # if running in low memory mode, we directly store the partial dataframe
+                    # and concatenate them later
+                    # so we generate a random filename
+                    filename = f"tmp_{uuid.uuid4()}.{low_memory_format}"
+                    save_dataframe(
+                        df=data,
+                        filename=filename,
+                        output_format=low_memory_format,
+                        output_directory=low_memory_dir,
+                        output_compression=None,
+                        output_compression_level=None,
+                    )
+                    low_memory_pointers.append(filename)
+                else:
+                    # append to list of dataframes
+                    candidates.append(data)
                 pbar.update(len(data))
+                del data
 
     # concatenate all dataframes
+    if low_memory:
+        candidates = []
+        for filename in low_memory_pointers:
+            data = load_dataframe(
+                filename, format=low_memory_format, directory=low_memory_dir
+            )
+            candidates.append(data)
+            remove_file(filename, directory=low_memory_dir)
+
     candidates = pd.concat(candidates, ignore_index=True)
 
     # sort by jd from oldest to newest (lowest to highest)
     candidates = candidates.sort_values(by="candidate.jd", ascending=True)

diff --git a/frigate/utils/parsers.py b/frigate/utils/parsers.py
index 654fa89..359a836 100644
--- a/frigate/utils/parsers.py
+++ b/frigate/utils/parsers.py
@@ -64,6 +64,12 @@ def main_parser():
         default="./data",
         help="Output directory for the results",
     )
+    parser.add_argument(
+        "--low_memory",
+        type=str_to_bool,
+        default=False,
+        help="Use low memory mode, to reduce RAM usage",
+    )
 
     return parser

From f967e3ddb4142fb8d6304d76950c0ac55cfb9051 Mon Sep 17 00:00:00 2001
From: Theodlz
Date: Thu, 29 Feb 2024 14:46:52 -0800
Subject: [PATCH 4/4] --low_memory -> --low_memory=True

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 279c36d..01a3fe0 100644
--- a/README.md
+++ b/README.md
@@ -29,4 +29,4 @@ PYTHONPATH=. python scripts/alert-stats.py --feature='candidate.magpsf,candidate
 
 #### Troubleshooting
 
-On a system with low memory, you can call frigate with the `--low_memory` flag to reduce memory usage. This will save each subset of alerts to disk and concatenate them at the end, instead of concatenating as the batched queries return, so the memory of the main process does not grow while the individual threads are running. In the future, we want to expand this mode to also reduce the number of alerts fetched per batch query, lowering memory usage even further.
+On a system with low memory, you can call frigate with the `--low_memory=True` flag to reduce memory usage. This will save each subset of alerts to disk and concatenate them at the end, instead of concatenating as the batched queries return, so the memory of the main process does not grow while the individual threads are running. In the future, we want to expand this mode to also reduce the number of alerts fetched per batch query, lowering memory usage even further.