Low memory mode #4

Merged · 4 commits · Mar 1, 2024
37 changes: 18 additions & 19 deletions README.md
@@ -1,33 +1,32 @@
Example cmd to run the script:
### What's Frigate?

Frigate is a tool to retrieve and analyze alert data from the ZTF alert stream, at three levels: (1) the full set of alerts, (2) the subset of alerts that passed filters in Fritz, and (3) the subset that was saved as proper astronomical sources.
The current `frigate/__main__.py` only addresses step 1 (getting the full set of alerts for a specified time period, usually one night). The script found in `scripts/alert-stats.py` is an earlier attempt at steps 1, 2, and 3, along with visualization. Next, we plan to integrate better versions of steps 2 and 3 into the `frigate/__main__.py` script and add more visualization tools.

You can run frigate with:

```bash
PYTHONPATH=. python scripts/alert-stats.py --feature='candidate.magpsf,candidate.sigmapsf' --programids=1,2 --plot=True --start=2460355.5 --nb_days=1 --sp_token=<your_sp_token> --sp_groupIDs=41 --sp_filterIDs=1 --nb_bins=1000 --k_token=<your_kowalski_token>
PYTHONPATH=. python frigate --k_token=<your_token>
```

This would:
To get the list of all possible arguments, you can run:

- grab magpsf and sigmapsf values of all alerts from one night starting at jd=2460355.5
- get these values for the subset of alerts that passed the alert filter from group 41 and filter 1 on SkyPortal/Fritz
- get these values for the subset of alerts that passed the alert filter AND were saved as sources (at any time, not necessarily during that night).
- plot histograms for each feature and alert subset (red: all, blue: passed filter, green: saved as sources)
- plot corner plots for each feature and alert subset (same color palette)
```bash
PYTHONPATH=. python frigate --help
```

#### V2 (WIP)
To run the deprecated `scripts/alert-stats.py` script, you can run:

```bash
PYTHONPATH=. python scripts/alert-stats.py --feature='candidate.magpsf,candidate.sigmapsf' --programids=1,2 --plot=True --start=2460355.5 --nb_days=1 --sp_token=<your_sp_token> --sp_groupIDs=41 --sp_filterIDs=1 --nb_bins=1000 --k_token=<your_kowalski_token>
```

- [ ] Fetch all the features of all alert packets within a given time range and with given program ids, and store them as a pandas dataframe
- [ ] Fetch all of the candidates that passed filters in Fritz (with exact candid, not just objectIds). Relies on the new /api/candidates_filter endpoint.
- [ ] Looking at the subset of alerts that passed the filters, find the obj_id of the sources that were saved in Fritz.
- [ ] Update the dataframe with a column listing the filters passed by each alert, and a column listing the groupIDs of the groups (associated with the passed filters) to which each alert's object has been saved as a source.
- [ ] Figure out what visualization tools and plots we can use to represent the data in a meaningful way and extract insights from it.

You can run frigate with:

```bash
PYTHONPATH=. python frigate --k_token=<your_token>
```
#### Troubleshooting

To get the list of all possible arguments, you can run:

```bash
PYTHONPATH=. python frigate --help
```
On a system with low memory, you can call frigate with the `--low_memory=True` flag to reduce memory usage. In this mode, each subset of alerts is saved to disk and all subsets are concatenated at the end, instead of being concatenated as the batched queries return. That way the main process's memory does not grow while the individual threads are running. In the future, we want to expand on that mode by reducing the number of alerts fetched per batch query, to lower memory usage even more.
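
As a rough illustration of the pattern this flag enables (a minimal sketch with hypothetical helper and file names, not this PR's exact code), each batch is written to disk as it arrives and the concatenation happens once at the end:

```python
# Minimal sketch of the low-memory pattern: persist each batch to disk as it
# arrives, then concatenate from disk once at the end, so the main process
# never holds all batches in memory at the same time. Names are illustrative.
import os
import uuid

import pandas as pd


def collect_batches_low_memory(batches, directory="./data"):
    os.makedirs(directory, exist_ok=True)
    paths = []
    for batch in batches:  # each batch is a small pandas DataFrame
        path = os.path.join(directory, f"tmp_{uuid.uuid4()}.parquet")
        batch.to_parquet(path)  # persist the partial result and free the RAM
        paths.append(path)
        del batch

    # read the partial files back and concatenate only once
    result = pd.concat((pd.read_parquet(p) for p in paths), ignore_index=True)
    for p in paths:
        os.remove(p)  # clean up the temporary files
    return result
```

The trade-off is extra disk I/O in exchange for a flat memory profile in the main process while the worker threads run.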
8 changes: 7 additions & 1 deletion frigate/__main__.py
@@ -19,7 +19,13 @@ def str_to_bool(value):

# GET CANDIDATES FROM KOWALSKI
candidates, err = get_candidates_from_kowalski(
args.start, args.end, args.programids, n_threads=args.n_threads
args.start,
args.end,
args.programids,
n_threads=args.n_threads,
low_memory=args.low_memory,
low_memory_format=args.output_format,
low_memory_dir=args.output_directory,
)
if err:
print(err)
31 changes: 31 additions & 0 deletions frigate/utils/datasets.py
@@ -72,3 +72,34 @@ def save_dataframe(df, filename, output_format, output_compression, output_compr

# return the filename that includes the output dir and the extension
return filename
def load_dataframe(filename, format=None, directory=None):
if directory is not None and not filename.startswith(directory):
filename = os.path.join(directory, filename)


if format is None:
# try to infer the output format from the filename
if filename.endswith(".parquet"):
format = "parquet"
elif filename.endswith(".feather"):
format = "feather"
elif filename.endswith(".csv"):
format = "csv"
else:
raise ValueError(f"Could not infer output format from filename: {filename}")
if format == "parquet":
return pd.read_parquet(filename)
elif format == "feather":
return pd.read_feather(filename)
elif format == "csv":
return pd.read_csv(filename)
else:
raise ValueError(f"Invalid output format: {format}, must be one of ['parquet', 'feather', 'csv']")

def remove_file(filename, directory=None):
if directory is not None and not filename.startswith(directory):
filename = os.path.join(directory, filename)
try:
os.remove(filename)
except Exception as e:
raise ValueError(f"Failed to remove file: {e}")
92 changes: 84 additions & 8 deletions frigate/utils/kowalski.py
@@ -1,14 +1,39 @@
import multiprocessing
import os
import uuid

from contextlib import closing


import numpy as np
import pandas as pd

from penquins import Kowalski
from tqdm import tqdm

from frigate.utils.datasets import save_dataframe, load_dataframe, remove_file

ZTF_ALERTS_CATALOG = "ZTF_alerts"

STRING_FIELDS = [
"rbversion",
"drbversion",
"braai_version",
"acai_b_version",
"acai_h_version",
"acai_n_version",
"acai_o_version",
"acai_v_version",
"bts_version",
]


def shorten_string_fields(data: pd.DataFrame) -> pd.DataFrame:
for field in STRING_FIELDS:
if field in data.columns:
data[field] = data[field].str.replace("_", "")
return data


def connect_to_kowalski() -> Kowalski:
try:
@@ -71,17 +96,24 @@ def get_candidates_from_kowalski(
programids: list,
objectIds=None,
n_threads=multiprocessing.cpu_count(),
low_memory=False,
low_memory_format="parquet",
low_memory_dir=None,
):
if low_memory is True and low_memory_format not in ["parquet", "csv", "feather"]:
return None, f"Invalid low_memory_format: {low_memory_format}"
if low_memory is True and low_memory_dir is None:
return None, "low_memory_dir is required when low_memory is True"

total, err = candidates_count_from_kowalski(t_i, t_f, programids, objectIds)
if err:
return None, err

print(
f"Expecting {total} candidates between {t_i} and {t_f} for programids {programids} (n_threads: {n_threads})"
f"Expecting {total} candidates between {t_i} and {t_f} for programids {programids} (n_threads: {n_threads}, low_memory: {low_memory})"
)

numPerPage = 10000
candidates = pd.DataFrame()
batches = int(np.ceil(total / numPerPage))
if objectIds is not None:
batches = int(np.ceil(len(objectIds) / numPerPage))
@@ -97,11 +129,17 @@
"candidate.programid": {"$in": programids},
},
"projection": {
# we include everything except the following fields
"_id": 0,
"candid": 1,
"objectId": 1,
"candidate": 1,
"classifications": 1,
"schemavsn": 0,
"publisher": 0,
"candidate.pdiffimfilename": 0,
"candidate.programpi": 0,
"candidate.candid": 0,
"cutoutScience": 0,
"cutoutTemplate": 0,
"cutoutDifference": 0,
"coordinates": 0,
},
},
"kwargs": {"limit": numPerPage, "skip": i * numPerPage},
@@ -110,7 +148,12 @@
query["query"]["filter"]["objectId"] = {"$in": objectIds}
queries.append(query)

with multiprocessing.Pool(processes=n_threads) as pool:
candidates = [] # list of dataframes to concatenate later
low_memory_pointers = [] # to use with low_memory=True

# contextlib.closing makes sure the multiprocessing pool is closed when the block exits
# it's added safety that may not be strictly necessary now, but could be in the future
with closing(multiprocessing.Pool(processes=n_threads)) as pool:
with tqdm(total=total) as pbar:
for response in pool.imap_unordered(_run_query, queries):
if not isinstance(response, dict):
@@ -120,8 +163,41 @@
data = response.get("data", [])
# we want to flatten the candidate object
data = pd.json_normalize(data)
candidates = pd.concat([candidates, data], ignore_index=True)
# we want to remove unnecessary chars from string fields to save space
data = shorten_string_fields(data)

if low_memory:
# if running in low memory mode, we directly store the partial dataframe
# and concatenate them later
# so we generate a random filename
filename = f"tmp_{uuid.uuid4()}.{low_memory_format}"
save_dataframe(
df=data,
filename=filename,
output_format=low_memory_format,
output_directory=low_memory_dir,
output_compression=None,
output_compression_level=None,
)
low_memory_pointers.append(filename)
else:
# append to list of dataframes
candidates.append(data)
pbar.update(len(data))
del data

# concatenate all dataframes
if low_memory:
candidates = []
for filename in low_memory_pointers:
data = load_dataframe(
filename, format=low_memory_format, directory=low_memory_dir
)
candidates.append(data)
remove_file(filename, directory=low_memory_dir)

candidates = pd.concat(candidates, ignore_index=True)

# sort by jd from oldest to newest (lowest to highest)
candidates = candidates.sort_values(by="candidate.jd", ascending=True)

6 changes: 6 additions & 0 deletions frigate/utils/parsers.py
@@ -64,6 +64,12 @@ def main_parser():
default="./data",
help="Output directory for the results",
)
parser.add_argument(
"--low_memory",
type=str_to_bool,
default=False,
help="Use low memory mode, to reduce RAM usage",
)
return parser

