Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

color and slope metrics #406

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions rubin_sim/maf/batches/science_radar_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,41 @@ def science_radar_batch(
)
)

# color plus slope metrics
displayDict["group"] = "Variables/Transients"
displayDict["subgroup"] = "Color and slope"
displayDict["caption"] = "Number of times a color and slope are measured in a night"
sql = "visitExposureTime > 19"
metric = maf.ColorSlopeMetric()
summaryMetrics_cs = [maf.SumMetric()]
bundleList.append(
maf.MetricBundle(
metric,
healpixslicer,
sql,
run_name=runName,
display_dict=displayDict,
summary_metrics=summaryMetrics_cs,
)
)

displayDict["group"] = "Variables/Transients"
displayDict["subgroup"] = "Color and slope"
displayDict["caption"] = "Number of times a color and slope are measured over 2 nights."
sql = "visitExposureTime > 19"
metric = maf.ColorSlope2NightMetric()
summaryMetrics_cs = [maf.SumMetric()]
bundleList.append(
maf.MetricBundle(
metric,
healpixslicer,
sql,
run_name=runName,
display_dict=displayDict,
summary_metrics=summaryMetrics_cs,
)
)

# XRB metric
displayDict["subgroup"] = "XRB"
displayDict["order"] = 0
Expand Down
1 change: 1 addition & 0 deletions rubin_sim/maf/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .cadence_metrics import *
from .calibration_metrics import *
from .chip_vendor_metric import *
from .color_slope_metrics import *
from .coverage_metric import *
from .crowding_metric import *
from .cumulative_metric import *
Expand Down
188 changes: 188 additions & 0 deletions rubin_sim/maf/metrics/color_slope_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
__all__ = ["CheckColorSlope", "ColorSlopeMetric", "ColorSlope2NightMetric"]

import numpy as np
from rubin_scheduler.utils import int_binned_stat

from .base_metric import BaseMetric


class CheckColorSlope(object):
"""Check if the data has a color and a slope

Parameters
----------
color_length : `float`
The maximum length of time different filters be observed
to still count as a color (hours). Default 1 hour.
slope_length : `float`
The length of time to demand observations in the
same filter be greater than (hours). Default 3 hours.
"""

def __init__(
self, color_length=1.0, slope_length=3.0, filter_col="filter", mjd_col="observationStartMJD"
):
self.color_length = color_length / 24.0
self.slope_length = slope_length / 24.0

self.filter_col = filter_col
self.mjd_col = mjd_col

def __call__(self, data_slice):
has_color = False
has_slope = False

if np.size(data_slice) < 3:
return 0
filters = data_slice[self.filter_col]

u_filters = np.unique(filters)

for filtername in u_filters:
in_filt = np.where(data_slice[self.filter_col] == filtername)[0]
time_gap = (
data_slice[self.mjd_col][in_filt].max() - data_slice[self.mjd_col][in_filt][np.newaxis].min()
)
if time_gap >= self.slope_length:
has_slope = True
break
for filtername1 in u_filters:
for filtername2 in u_filters:
if filtername1 != filtername2:
in_filt1 = np.where(filters == filtername1)[0]
in_filt2 = np.where(filters == filtername2)[0]
time_gaps = (
data_slice[self.mjd_col][in_filt1] - data_slice[self.mjd_col][in_filt2][np.newaxis].T
)
time_gaps = time_gaps[np.where(time_gaps > 0)]
if time_gaps.size > 0:
if np.min(time_gaps[np.where(time_gaps > 0)]) <= self.color_length:
has_color = True
break
if has_color & has_slope:
return 1
else:
return 0


class ColorSlopeMetric(BaseMetric):
"""How many times do we get a color and slope in a night
A proxie metric for seeing how many times
there would be the possibility of identifying and
classifying a transient.

Parameters
----------
mag : `dict`
Dictionary with filternames as keys and minimum depth m5
magnitudes as values. If None, defaults to mag 20 in ugrizy.
color_length : `float`
The maximum length of time different filters be observed
to still count as a color (hours). Default 1 hour.
slope_length : `float`
The length of time to demand observations in the
same filter be greater than (hours). Default 3 hours."""

def __init__(
self,
mag=None,
night_col="night",
filter_col="filter",
m5_col="fiveSigmaDepth",
color_length=1.0,
slope_length=3.0,
time_col="observationStartMJD",
units="#",
metric_name="ColorSlope",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious because I see this pattern a lot -- but why specify the metric_name here when it's the same as what will be constructed by default?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I bet I was probably planning on having one metric with a kwarg rather then two, so I'd be setting the metric_name to run it different ways.

**kwargs,
):
cols = [filter_col, night_col, m5_col, time_col]

if mag is None:
mag = {"u": 20, "g": 20, "r": 20, "i": 20, "z": 20, "y": 20}

self.night_col = night_col
self.filter_col = filter_col
self.m5_col = m5_col
self.mag = mag
self.time_col = time_col

super().__init__(col=cols, units=units, metric_name=metric_name, **kwargs)

self.sequence_checker = CheckColorSlope(color_length=color_length, slope_length=slope_length)

def run(self, data_slice, slice_point=None):
result = 0
deep_enough = np.zeros(data_slice.size, dtype=bool)
for filtername in np.unique(data_slice[self.filter_col]):
in_filt = np.where(data_slice[self.filter_col] == filtername)[0]
indx = np.where(data_slice[self.m5_col][in_filt] > self.mag[filtername])[0]
deep_enough[in_filt[indx]] = True
data = data_slice[deep_enough]
if data.size > 0:
_night, result = int_binned_stat(data[self.night_col], data, statistic=self.sequence_checker)

return np.sum(result)


class ColorSlope2NightMetric(ColorSlopeMetric):
"""Like ColorSlopeMetric, but span over 2 nights

Parameters
----------
mag : `dict`
Dictionary with filternames as keys and minimum depth m5
magnitudes as values. If None, defaults to mag 20 in ugrizy.
color_length : `float`
The maximum length of time different filters be observed
to still count as a color (hours). Default 1 hour.
slope_length : `float`
The length of time to demand observations in the
same filter be greater than (hours). Default 15 hours."""

def __init__(
self,
mag=None,
night_col="night",
filter_col="filter",
m5_col="fiveSigmaDepth",
color_length=1.0,
slope_length=15.0,
time_col="observationStartMJD",
units="#",
metric_name="ColorSlope2Night",
**kwargs,
):
super().__init__(
mag=mag,
night_col=night_col,
filter_col=filter_col,
m5_col=m5_col,
color_length=color_length,
slope_length=slope_length,
time_col=time_col,
units=units,
metric_name=metric_name,
**kwargs,
)

def run(self, data_slice, slice_point=None):
result = 0
deep_enough = np.zeros(data_slice.size, dtype=bool)
for filtername in np.unique(data_slice[self.filter_col]):
in_filt = np.where(data_slice[self.filter_col] == filtername)[0]
indx = np.where(data_slice[self.m5_col][in_filt] > self.mag[filtername])[0]
deep_enough[in_filt[indx]] = True
data = data_slice[deep_enough]
if data.size > 0:
# Send in nights as pairs, (0,1) (2,3), (4,5), etc
night_id = np.floor(data[self.night_col] / 2).astype(int)
_night, result1 = int_binned_stat(night_id, data, statistic=self.sequence_checker)

# Now to do pairs (1,2), (3,4)
night_id = np.ceil(data[self.night_col] / 2).astype(int)
_night, result2 = int_binned_stat(night_id, data, statistic=self.sequence_checker)

result = np.sum(result1) + np.sum(result2)

return result
71 changes: 71 additions & 0 deletions tests/maf/test_color_slopes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import unittest

import numpy as np

import rubin_sim.maf.metrics as metrics


class TestSimpleMetrics(unittest.TestCase):
def test_color_slope(self):
names = ["night", "observationStartMJD", "filter", "fiveSigmaDepth"]
types = [int, float, "<U1", float]

data = np.zeros(4, dtype=list(zip(names, types)))

# same filter, same night
data["observationStartMJD"] = np.array([0, 0.25, 0.5, 0.55]) / 24
data["filter"] = ["r", "r", "r", "r"]
data["fiveSigmaDepth"] = 25.0

csm = metrics.ColorSlopeMetric(color_length=1.0, slope_length=3.0)

cs2n = metrics.ColorSlope2NightMetric(color_length=1.0, slope_length=15.0)
assert csm.run(data) == 0
assert cs2n.run(data) == 0

# diff filter, same night
# has color, but no slope
data["observationStartMJD"] = np.array([0, 0.25, 0.5, 0.55]) / 24
data["filter"] = ["r", "g", "r", "r"]

assert csm.run(data) == 0
assert cs2n.run(data) == 0

# diff filter, same night
# slope on 1st night, not second
data["observationStartMJD"] = np.array([0, 0.25, 0.5, 3.55]) / 24
data["filter"] = ["r", "g", "r", "r"]

assert csm.run(data) == 1
assert cs2n.run(data) == 0

# diff filter, diff night
# slope on 2nd night, not first
data["night"] = [0, 0, 0, 1]
data["observationStartMJD"] = np.array([0, 0.25, 0.5, 25]) / 24
data["filter"] = ["r", "g", "r", "r"]

assert csm.run(data) == 0
assert cs2n.run(data) == 1

# diff filter, diff night
# slope on both nights
data["night"] = [0, 0, 0, 1]
data["observationStartMJD"] = np.array([0, 0.25, 3.5, 25]) / 24
data["filter"] = ["r", "g", "r", "r"]

assert csm.run(data) == 1
assert cs2n.run(data) == 1

# diff filter, diff night
# slope on both nights, but no color
data["night"] = [0, 0, 0, 1]
data["observationStartMJD"] = np.array([0, 5.25, 3.5, 25]) / 24
data["filter"] = ["r", "g", "r", "r"]

assert csm.run(data) == 0
assert cs2n.run(data) == 0


if __name__ == "__main__":
unittest.main()
Loading