Low memory mode (#4)
* shorten string fields and concatenate at the end, to reduce the memory footprint both in RAM and on disk

* low memory mode: save each subset to disk and join them after the thread pool is closed; add a troubleshooting section to README.md
Theodlz authored Mar 1, 2024
1 parent 81e041d commit 7876868
Showing 5 changed files with 146 additions and 28 deletions.
37 changes: 18 additions & 19 deletions README.md
@@ -1,33 +1,32 @@
### What's Frigate?

Frigate is a tool to retrieve and analyze alert data from the ZTF alert stream: the full set of alerts, the subset of alerts that passed filters in Fritz, and the subset that was saved as proper astronomical sources.
The current `frigate/__main__.py` only addresses step 1 (getting the full set of alerts for a specified time period, usually one night). The script found in `scripts/alert-stats.py` is an earlier attempt at steps 1, 2, and 3, along with visualization. Next, we plan to integrate better versions of steps 2 and 3 into the `frigate/__main__.py` script and add more visualization tools.

You can run frigate with:

```bash
PYTHONPATH=. python frigate --k_token=<your_token>
```

To get the list of all possible arguments, you can run:

```bash
PYTHONPATH=. python frigate --help
```

To run the deprecated `scripts/alert-stats.py` script, you can run:

```bash
PYTHONPATH=. python scripts/alert-stats.py --feature='candidate.magpsf,candidate.sigmapsf' --programids=1,2 --plot=True --start=2460355.5 --nb_days=1 --sp_token=<your_sp_token> --sp_groupIDs=41 --sp_filterIDs=1 --nb_bins=1000 --k_token=<your_kowalski_token>
```

#### Troubleshooting

On a system with low memory, you can call frigate with the `--low_memory=True` flag to reduce memory usage. This saves each subset of alerts to disk and concatenates them at the end, instead of concatenating as the batched queries return; that way, the memory of the main process does not grow while the individual threads are running. In the future, we want to expand this mode to fetch fewer alerts per batch query, reducing memory usage even more.
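
For example, a low-memory run could look like this (assuming the CLI flags mirror the parser attribute names, as `--low_memory` does; `--output_format` and `--output_directory` also set the format and location of the temporary per-batch files):

```bash
PYTHONPATH=. python frigate --k_token=<your_token> --low_memory=True --output_format=parquet --output_directory=./data
```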
8 changes: 7 additions & 1 deletion frigate/__main__.py
@@ -19,7 +19,13 @@ def str_to_bool(value):

# GET CANDIDATES FROM KOWALSKI
candidates, err = get_candidates_from_kowalski(
    args.start,
    args.end,
    args.programids,
    n_threads=args.n_threads,
    low_memory=args.low_memory,
    low_memory_format=args.output_format,
    low_memory_dir=args.output_directory,
)
if err:
    print(err)
31 changes: 31 additions & 0 deletions frigate/utils/datasets.py
@@ -72,3 +72,34 @@ def save_dataframe(df, filename, output_format, output_compression, output_compr

    # return the filename that includes the output dir and the extension
    return filename


def load_dataframe(filename, format=None, directory=None):
    # accept either a bare filename or one already prefixed with the directory
    if directory is not None and not filename.startswith(directory):
        filename = os.path.join(directory, filename)

    if format is None:
        # try to infer the format from the filename extension
        if filename.endswith(".parquet"):
            format = "parquet"
        elif filename.endswith(".feather"):
            format = "feather"
        elif filename.endswith(".csv"):
            format = "csv"
        else:
            raise ValueError(f"Could not infer output format from filename: {filename}")

    if format == "parquet":
        return pd.read_parquet(filename)
    elif format == "feather":
        return pd.read_feather(filename)
    elif format == "csv":
        return pd.read_csv(filename)
    else:
        raise ValueError(f"Invalid output format: {format}, must be one of ['parquet', 'feather', 'csv']")


def remove_file(filename, directory=None):
    if directory is not None and not filename.startswith(directory):
        filename = os.path.join(directory, filename)
    try:
        os.remove(filename)
    except Exception as e:
        raise ValueError(f"Failed to remove file: {e}")
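Taken together, these helpers implement the temporary-file round trip used by the low memory mode. A minimal sketch of how they compose (the filename and data here are hypothetical; the `save_dataframe` keyword arguments follow the call in `frigate/utils/kowalski.py` below, and `./data` is assumed to exist):

```python
import pandas as pd

from frigate.utils.datasets import save_dataframe, load_dataframe, remove_file

df = pd.DataFrame({"candid": [1, 2], "candidate.jd": [2460355.6, 2460355.7]})

# write a temporary file into ./data, read it back, then clean up
save_dataframe(
    df=df,
    filename="tmp_example.parquet",
    output_format="parquet",
    output_directory="./data",
    output_compression=None,
    output_compression_level=None,
)
df_back = load_dataframe("tmp_example.parquet", format="parquet", directory="./data")
remove_file("tmp_example.parquet", directory="./data")
```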
92 changes: 84 additions & 8 deletions frigate/utils/kowalski.py
@@ -1,14 +1,39 @@
import multiprocessing
import os
import uuid

from contextlib import closing

import numpy as np
import pandas as pd

from penquins import Kowalski
from tqdm import tqdm

from frigate.utils.datasets import save_dataframe, load_dataframe, remove_file

ZTF_ALERTS_CATALOG = "ZTF_alerts"

STRING_FIELDS = [
    "rbversion",
    "drbversion",
    "braai_version",
    "acai_b_version",
    "acai_h_version",
    "acai_n_version",
    "acai_o_version",
    "acai_v_version",
    "bts_version",
]


def shorten_string_fields(data: pd.DataFrame) -> pd.DataFrame:
    # strip underscores from known version-string columns,
    # so they take less space in RAM and on disk
    for field in STRING_FIELDS:
        if field in data.columns:
            data[field] = data[field].str.replace("_", "")
    return data


def connect_to_kowalski() -> Kowalski:
    try:
@@ -71,17 +96,24 @@ def get_candidates_from_kowalski(
    programids: list,
    objectIds=None,
    n_threads=multiprocessing.cpu_count(),
    low_memory=False,
    low_memory_format="parquet",
    low_memory_dir=None,
):
    if low_memory is True and low_memory_format not in ["parquet", "csv", "feather"]:
        return None, f"Invalid low_memory_format: {low_memory_format}"
    if low_memory is True and low_memory_dir is None:
        return None, "low_memory_dir is required when low_memory is True"

    total, err = candidates_count_from_kowalski(t_i, t_f, programids, objectIds)
    if err:
        return None, err

    print(
        f"Expecting {total} candidates between {t_i} and {t_f} for programids {programids} (n_threads: {n_threads}, low_memory: {low_memory})"
    )

    numPerPage = 10000
    batches = int(np.ceil(total / numPerPage))
    if objectIds is not None:
        batches = int(np.ceil(len(objectIds) / numPerPage))
@@ -97,11 +129,17 @@
"candidate.programid": {"$in": programids},
},
"projection": {
# we include everything except the following fields
"_id": 0,
"candid": 1,
"objectId": 1,
"candidate": 1,
"classifications": 1,
"schemavsn": 0,
"publisher": 0,
"candidate.pdiffimfilename": 0,
"candidate.programpi": 0,
"candidate.candid": 0,
"cutoutScience": 0,
"cutoutTemplate": 0,
"cutoutDifference": 0,
"coordinates": 0,
},
},
"kwargs": {"limit": numPerPage, "skip": i * numPerPage},
@@ -110,7 +148,12 @@
query["query"]["filter"]["objectId"] = {"$in": objectIds}
queries.append(query)

    candidates = []  # list of dataframes to concatenate later
    low_memory_pointers = []  # temporary filenames, used when low_memory=True

    # contextlib.closing should help close opened files or other resources;
    # it's added safety that may not be necessary now, but could be in the future
    with closing(multiprocessing.Pool(processes=n_threads)) as pool:
        with tqdm(total=total) as pbar:
            for response in pool.imap_unordered(_run_query, queries):
                if not isinstance(response, dict):
@@ -120,8 +163,41 @@
                data = response.get("data", [])
                # we want to flatten the candidate object
                data = pd.json_normalize(data)
                # remove unnecessary characters from string fields to save space
                data = shorten_string_fields(data)

                if low_memory:
                    # in low memory mode, directly store the partial dataframe
                    # on disk under a random filename, and concatenate later
                    filename = f"tmp_{uuid.uuid4()}.{low_memory_format}"
                    save_dataframe(
                        df=data,
                        filename=filename,
                        output_format=low_memory_format,
                        output_directory=low_memory_dir,
                        output_compression=None,
                        output_compression_level=None,
                    )
                    low_memory_pointers.append(filename)
                else:
                    # append to the list of dataframes
                    candidates.append(data)
                pbar.update(len(data))
                del data

    # concatenate all dataframes
    if low_memory:
        candidates = []
        for filename in low_memory_pointers:
            data = load_dataframe(
                filename, format=low_memory_format, directory=low_memory_dir
            )
            candidates.append(data)
            remove_file(filename, directory=low_memory_dir)

    candidates = pd.concat(candidates, ignore_index=True)

    # sort by jd from oldest to newest (lowest to highest)
    candidates = candidates.sort_values(by="candidate.jd", ascending=True)

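As a quick, hypothetical illustration of the string-shortening step above (the version strings are made up, and only the columns listed in `STRING_FIELDS` are touched):

```python
import pandas as pd

from frigate.utils.kowalski import shorten_string_fields

df = pd.DataFrame({"rbversion": ["t17_f5_c3"], "objectId": ["ZTF24aaaaaaa"]})
df = shorten_string_fields(df)
print(df["rbversion"].iloc[0])  # underscores removed -> "t17f5c3"
print(df["objectId"].iloc[0])   # not a version field, unchanged -> "ZTF24aaaaaaa"
```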
6 changes: 6 additions & 0 deletions frigate/utils/parsers.py
@@ -64,6 +64,12 @@ def main_parser():
default="./data",
help="Output directory for the results",
)
parser.add_argument(
"--low_memory",
type=str_to_bool,
default=False,
help="Use low memory mode, to reduce RAM usage",
)
return parser


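The `--low_memory` flag is parsed with the `str_to_bool` helper referenced in `frigate/__main__.py`. Its body is not part of this diff; a minimal sketch consistent with how it is used might look like:

```python
import argparse


def str_to_bool(value):
    # accept the usual truthy/falsy spellings, e.g. --low_memory=True
    if isinstance(value, bool):
        return value
    if value.lower() in ("yes", "true", "t", "y", "1"):
        return True
    if value.lower() in ("no", "false", "f", "n", "0"):
        return False
    raise argparse.ArgumentTypeError(f"Boolean value expected, got: {value}")
```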
