Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loop changes #10

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions frigate/__main__.py → frigate/frigate_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ def str_to_bool(value):
raise ValueError(f"{value} is not a valid boolean value")


if __name__ == "__main__":
# PARSE COMMAND LINE ARGUMENTS
args = main_parser_args()

def process_candidates(args):
# GET CANDIDATES FROM KOWALSKI
candidates, err = get_candidates_from_kowalski(
args.start,
Expand All @@ -31,7 +28,7 @@ def str_to_bool(value):
)
if err or candidates is None:
print(err)
exit(1)
return

# GET CANDIDATES FROM SKYPORTAL
print("Getting candidates from SkyPortal using the following filters:")
Expand All @@ -43,20 +40,13 @@ def str_to_bool(value):
)
if err or candids_per_filter is None:
print(err)
exit(1)

# candids_per_filter is a dictionary with keys being filterIDs and values being the corresponding candidates
# candid value, that we find in the candidates dataframe.
# add a "passed_filters" column to the candidates dataframe, which is a list of filterIDs that the candidate passed
# through.
return

# ADD PASSED FILTERS TO CANDIDATES
candidates["passed_filters"] = [[] for _ in range(len(candidates))]
for filterID, candids in candids_per_filter.items():
try:
# find the index of the row that has this candid in the candidates dataframe
idx = candidates[candidates["candid"].isin(candids)].index
# add the filterID to the "passed_filters" column of the candidates dataframe
candidates.loc[idx, "passed_filters"] = candidates.loc[
idx, "passed_filters"
].apply(lambda x: x + [filterID])
Expand All @@ -65,7 +55,6 @@ def str_to_bool(value):
continue

# SAVE CANDIDATES TO DISK
# filename: <start>_<end>_<programids>.<output_format> (ext added by save_dataframe function)
filename = f"{args.start}_{args.end}_{'_'.join(map(str, args.programids))}"
filepath = save_dataframe(
df=candidates,
Expand All @@ -77,3 +66,12 @@ def str_to_bool(value):
)

print(f"Saved candidates to {filepath}")


def main():
args = main_parser_args()
process_candidates(args)


if __name__ == "__main__":
main()
116 changes: 116 additions & 0 deletions frigate/utils/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,119 @@ def stats_parser_args():
raise ValueError("No columns provided")

return args




def loop_parser():
parser = argparse.ArgumentParser(description='Run frigate with specified parameters.')
parser.add_argument("--programids", type=str, default="1,2,3", help="Program IDs to query")
parser.add_argument('--start', nargs='+', required=True, help='List of start values')
parser.add_argument("--nb_days", type=float, default=1.0, help="Number of days to query")
parser.add_argument("--sp_token", type=str, default=None, help="Skyportal token")
parser.add_argument("--k_token", type=str, default=None, help="Kowalski token")
parser.add_argument("--groupids", type=str, default="*", help="SkyPortal Group IDs to query")
parser.add_argument("--filterids", type=str, help="SkyPortal/Kowalski Filter IDs to query")
parser.add_argument("--output_format", type=str, default="parquet", help="Output format for the results")
parser.add_argument(
"--n_threads",
type=str,
default=None,
help="Number of threads to use when parallelizing queries",
)
parser.add_argument(
"--output_compression",
type=str,
default=None,
help="Output compression for the results",
)
parser.add_argument(
"--output_compression_level",
type=int,
default=None,
help="Output compression level for the results",
)
parser.add_argument(
"--output_directory",
type=str,
default="./data",
help="Output directory for the results",
)
parser.add_argument(
"--low_memory",
type=str_to_bool,
default=False,
help="Use low memory mode, to reduce RAM usage",
)
return parser


def loop_parser_args():
args = loop_parser().parse_args()
if not args.k_token:
# we try to get the token from the environment if it is not provided here
k_token_env = os.environ.get("KOWALSKI_TOKEN")
if k_token_env:
args.k_token = k_token_env
else:
# if provided, we add the token in the environment instead
os.environ["KOWALSKI_TOKEN"] = args.k_token

if not args.sp_token:
# we try to get the token from the environment if it is not provided here
sp_token_env = os.environ.get("SKYPORTAL_TOKEN")
if sp_token_env:
args.sp_token = sp_token_env
else:
# if provided, we add the token in the environment instead
os.environ["SKYPORTAL_TOKEN"] = args.sp_token

# validate the output options
try:
validate_output_options(
args.output_format,
args.output_compression,
args.output_compression_level,
args.output_directory,
)
except ValueError as e:
raise ValueError(f"Invalid output options: {e}")

# validate the number of threads
n_threads = args.n_threads
if n_threads is None:
n_threads = multiprocessing.cpu_count()
else:
n_threads = int(n_threads)
n_threads = min(n_threads, multiprocessing.cpu_count())
args.n_threads = n_threads

# validate the programids
try:
programids = list(map(int, args.programids.split(",")))
except ValueError:
raise ValueError(f"Invalid programids: {args.programids}")
args.programids = programids

# validate the groupids
if args.groupids:
if args.groupids[0] not in ["*", "all"]:
try:
groupids = list(map(int, args.groupids.split(",")))
except ValueError:
raise ValueError(f"Invalid groupids: {args.groupids}")
args.groupids = groupids
if args.groupids in [[], None, ""]:
args.groupids = []

# validate the filterids
if args.filterids:
try:
filterids = list(map(int, args.filterids.split(",")))
except ValueError:
raise ValueError(f"Invalid filterids: {args.filterids}")
args.filterids = filterids
if args.filterids in [[], None, ""]:
args.filterids = []

return args
9 changes: 7 additions & 2 deletions frigate/utils/skyportal.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def get_skyportal_token():
return token
except Exception as e:
raise ValueError(f"Failed to get SkyPortal token: {e}")


def get_candids_per_filter_from_skyportal(t_i, t_f, groupIDs, filterIDs, saved=False):
host = "https://fritz.science/api/candidates_filter"
Expand Down Expand Up @@ -49,9 +49,14 @@ def get_candids_per_filter_from_skyportal(t_i, t_f, groupIDs, filterIDs, saved=F
params["filterIDs"] = filterIDs
if total:
params["totalMatches"] = total

print(f"host: {host}")
print(f"headers: {headers}")
print(f"pararams: {params}")

response = requests.get(host, headers=headers, params=params)
if response.status_code != 200:
return None, f"Failed to get candidates from SkyPortal: {response.text}"
return None, f"Failed to get candidates from SkyPortal (in skyportal.py): {response.text}"
data = response.json().get("data", {})
for candidate in data.get("candidates", []):
# each candidate has a filter_id and a passing_alert_id which is the candid
Expand Down
13 changes: 13 additions & 0 deletions scripts/loop-frigate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from frigate.utils.parsers import loop_parser_args
from frigate.frigate_main import process_candidates

args = loop_parser_args()
start_values = args.start

for start in start_values:
try:
args.start = float(start)
args.end = args.start + args.nb_days
process_candidates(args)
except Exception as e:
print(f"Error occurred while running the command for start value {start}: {e}")
58 changes: 58 additions & 0 deletions scripts/subprocess-frigate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import argparse
import sys
import os
import subprocess

def loop_parser():
parser = argparse.ArgumentParser(description='Run frigate with specified parameters.')
parser.add_argument('--start', nargs='+', required=True, help='List of start values')
parser.add_argument("--nb_days", type=float, default=1.0, help="Number of days to query")
parser.add_argument("--sp_token", type=str, default=None, help="Skyportal token")
parser.add_argument("--k_token", type=str, default=None, help="Kowalski token")
parser.add_argument("--output_directory", type=str, default="./data", help="Output directory for the results")
return parser

def loop_parser_args():
args = loop_parser().parse_args()
if not args.k_token:
# we try to get the token from the environment if it is not provided here
k_token_env = os.environ.get("KOWALSKI_TOKEN")
if k_token_env:
args.k_token = k_token_env
else:
# if provided, we add the token in the environment instead
os.environ["KOWALSKI_TOKEN"] = args.k_token

if not args.sp_token:
# we try to get the token from the environment if it is not provided here
sp_token_env = os.environ.get("SKYPORTAL_TOKEN")
if sp_token_env:
args.sp_token = sp_token_env
else:
# if provided, we add the token in the environment instead
os.environ["SKYPORTAL_TOKEN"] = args.sp_token
return args

args = loop_parser_args()
start_values = args.start
nb_days = args.nb_days
sp_token = args.sp_token
k_token = args.k_token
output_directory = args.output_directory


for start in start_values:
try:
result = subprocess.run([
sys.executable, '-m', 'frigate',
f'--start={start}',
f'--nb_days={nb_days}',
f'--sp_token={sp_token}',
f'--k_token={k_token}',
f'--output_directory={output_directory}'
])
result.check_returncode()
except subprocess.CalledProcessError as e:
print(f"Error occurred while running the command for start value {start}: {e}")


Loading