-
Notifications
You must be signed in to change notification settings - Fork 4
/
spectra.py
275 lines (214 loc) · 10.3 KB
/
spectra.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
from signal_extraction import SignalExtraction
import ROOT
import numpy as np
import sys
sys.path.append('utils')
import utils as utils
class SpectraMaker:
def __init__(self):
# data related members
self.data_hdl = None
self.mc_hdl = None
self.mc_hdl_sign_extr = None
self.mc_reco_hdl = None
self.n_ev = 0
self.branching_ratio = 0.25
self.delta_rap = 2.0
self.h_absorption = None
self.event_loss = None
self.signal_loss = None
# variable related members
self.var = ''
self.bins = []
self.selection_string = '' # could be either a string or a list of strings
self.is_matter = ''
self.n_bins_mass_data = 30
self.n_bins_mass_mc = 80
self.raw_counts = []
self.raw_counts_err = []
self.chi2 = []
self.efficiency = []
self.corrected_counts = []
self.corrected_counts_err = []
self.inv_mass_signal_func = "dscb"
self.inv_mass_bkg_func = "pol1" # could be either a string or a list of strings
self.fit_func = None
self.fit_options = None
self.fit_range = []
self.sigma_range_mc_to_data = [1., 1.5]
self.output_dir = None
self.h_signal_extractions_data = []
self.h_signal_extractions_mc = []
self.h_raw_counts = None
self.h_efficiency = None
self.h_corrected_counts = None
# cuts for systematic uncertainties
self.chi2_cut = 1.4
self.relative_error_cut = 1.
self.outlier_cut = 3
def _check_members(self):
var_options = ['fCt', 'fPt']
if self.var not in var_options:
raise ValueError(
f'Invalid var option: {self.var}. Expected one of: {var_options}')
if not self.data_hdl:
raise ValueError(f'data_hdl not correctly set')
if not self.mc_hdl:
raise ValueError(f'mc_hdl not correctly set')
if not self.mc_reco_hdl:
raise ValueError(f'mc_reco_hdl not correctly set')
def make_spectra(self):
self._check_members()
for ibin in range(0, len(self.bins) - 1):
bin = [self.bins[ibin], self.bins[ibin + 1]]
bin_sel = f'{self.var} > {bin[0]} & {self.var} < {bin[1]}'
if self.var == 'fCt':
mc_bin_sel = f'fGenCt > {bin[0]} & fGenCt < {bin[1]}'
else:
mc_bin_sel = f'abs(fGenPt) > {bin[0]} & abs(fGenPt) < {bin[1]}'
# count generated per ct bin
bin_mc_hdl = self.mc_hdl.apply_preselections(mc_bin_sel, inplace=False)
if self.mc_hdl_sign_extr:
bin_mc_hdl_sign_extr = self.mc_hdl_sign_extr.apply_preselections(mc_bin_sel, inplace=False)
else:
bin_mc_hdl_sign_extr = bin_mc_hdl
if isinstance(self.selection_string, list):
bin_sel = f'{bin_sel} and {self.selection_string[ibin]}'
mc_bin_sel = f'{mc_bin_sel} and {self.selection_string[ibin]}'
else:
bin_sel = f'{bin_sel} and {self.selection_string}'
mc_bin_sel = f'{mc_bin_sel} and {self.selection_string}'
# select reconstructed in data and mc
bin_data_hdl = self.data_hdl.apply_preselections(bin_sel, inplace=False)
bin_mc_reco_hdl = self.mc_reco_hdl.apply_preselections(mc_bin_sel, inplace=False)
# compute efficiency
eff = len(bin_mc_reco_hdl) / len(bin_mc_hdl)
print(mc_bin_sel)
print("bin low", bin[0], "bin high", bin[1], "efficiency", eff)
self.efficiency.append(eff)
signal_extraction = SignalExtraction(bin_data_hdl, bin_mc_hdl_sign_extr)
bkg_mass_fit_func = None
if isinstance(self.inv_mass_bkg_func, list):
bkg_mass_fit_func = self.inv_mass_bkg_func[ibin]
else:
bkg_mass_fit_func = self.inv_mass_bkg_func
sgn_mass_fit_func = None
if isinstance(self.inv_mass_signal_func, list):
sgn_mass_fit_func = self.inv_mass_signal_func[ibin]
else:
sgn_mass_fit_func = self.inv_mass_signal_func
signal_extraction.bkg_fit_func = bkg_mass_fit_func
signal_extraction.signal_fit_func = sgn_mass_fit_func
signal_extraction.n_bins_data = self.n_bins_mass_data
signal_extraction.n_bins_mc = self.n_bins_mass_mc
signal_extraction.n_evts = self.n_ev
signal_extraction.matter_type = self.is_matter
signal_extraction.performance = False
signal_extraction.is_3lh = True
signal_extraction.out_file = self.output_dir
signal_extraction.data_frame_fit_name = f'data_fit_{ibin}'
signal_extraction.mc_frame_fit_name = f'mc_fit_{ibin}'
if self.var == 'fPt':
bin_label = f'{bin[0]} #leq #it{{p}}_{{T}} < {bin[1]} GeV/#it{{c}}'
else:
bin_label = f'{bin[0]} #leq #it{{ct}} < {bin[1]} cm'
signal_extraction.additional_pave_text = bin_label
if isinstance(self.sigma_range_mc_to_data[0], list):
signal_extraction.sigma_range_mc_to_data = self.sigma_range_mc_to_data[ibin]
else:
signal_extraction.sigma_range_mc_to_data = self.sigma_range_mc_to_data
fit_stats = signal_extraction.process_fit()
self.raw_counts.append(fit_stats['signal'][0])
self.raw_counts_err.append(fit_stats['signal'][1])
self.chi2.append(fit_stats['chi2'])
def make_histos(self):
self._check_members()
if not self.raw_counts:
raise RuntimeError(
'raw_counts is empty. You must run make_spectra first.')
if self.var == 'fCt':
x_label = r'#it{ct} (cm)'
y_raw_label = r'#it{N}_{raw}'
y_eff_label = r'#epsilon #times acc.'
y_corr_label = r'#frac{d#it{N}}{d(#it{ct})} (cm^{-1})'
else:
x_label = r'#it{p}_{T} (GeV/#it{c})'
y_raw_label = r'#it{N}_{raw}'
y_eff_label = r'#epsilon #times acc.'
y_corr_label = r'#frac{1}{N_{ev}}#frac{#it{d}N}{#it{d}y#it{d}#it{p}_{T}} (GeV/#it{c})^{-1}'
self.h_raw_counts = ROOT.TH1D('h_raw_counts', f';{x_label};{y_raw_label}', len(
self.bins) - 1, np.array(self.bins, dtype=np.float64))
self.h_efficiency = ROOT.TH1D('h_efficiency', f';{x_label};{y_eff_label}', len(
self.bins) - 1, np.array(self.bins, dtype=np.float64))
self.h_corrected_counts = ROOT.TH1D('h_corrected_counts', f';{x_label};{y_corr_label}', len(
self.bins) - 1, np.array(self.bins, dtype=np.float64))
self.h_corrected_counts.GetXaxis().SetTitleSize(0.05)
self.h_corrected_counts.GetYaxis().SetTitleSize(0.05)
for ibin in range(0, len(self.bins) - 1):
bin_width = self.bins[ibin + 1] - self.bins[ibin]
self.h_raw_counts.SetBinContent(ibin + 1, self.raw_counts[ibin]/bin_width)
self.h_raw_counts.SetBinError(ibin + 1, self.raw_counts_err[ibin]/bin_width)
self.h_efficiency.SetBinContent(ibin + 1, self.efficiency[ibin])
absorption_corr = 1
if self.h_absorption is not None:
absorption_corr = self.h_absorption.GetBinContent(ibin + 1)
event_loss_corr = 1
if self.event_loss is not None:
event_loss_corr = self.event_loss
signal_loss_corr = 1
if self.signal_loss is not None:
signal_loss_corr = self.signal_loss
local_corrected_counts = self.raw_counts[ibin] / self.efficiency[ibin] / absorption_corr / bin_width
local_corrected_counts_err = self.raw_counts_err[ibin] / self.efficiency[ibin] / absorption_corr / bin_width
if self.var == 'fPt':
local_corrected_counts = local_corrected_counts / self.n_ev / self.branching_ratio / self.delta_rap / signal_loss_corr * event_loss_corr
local_corrected_counts_err = local_corrected_counts_err / self.n_ev / self.branching_ratio / self.delta_rap / signal_loss_corr * event_loss_corr
self.h_corrected_counts.SetBinContent(ibin + 1, local_corrected_counts)
self.h_corrected_counts.SetBinError(ibin + 1, local_corrected_counts_err)
self.corrected_counts.append(local_corrected_counts)
self.corrected_counts_err.append(local_corrected_counts_err)
def fit(self):
if not self.h_corrected_counts:
raise ValueError(
'h_corrected_counts not set. Use make_histos first.')
if not self.fit_func:
raise ValueError('Fit function not set.')
if self.fit_range:
self.h_corrected_counts.Fit(self.fit_func, self.fit_options, '', self.fit_range[0], self.fit_range[1])
else:
self.h_corrected_counts.Fit(self.fit_func, 'RMI+')
def del_dyn_members(self):
self.raw_counts = []
self.raw_counts_err = []
self.efficiency = []
self.corrected_counts = []
self.corrected_counts_err = []
self.h_raw_counts = None
self.h_efficiency = None
self.h_corrected_counts = None
def dump_to_output_dir(self):
self.output_dir.cd()
self.h_raw_counts.Write()
self.h_efficiency.Write()
if self.h_absorption is not None:
self.h_absorption.Write()
self.h_corrected_counts.Write()
def chi2_selection(self):
for el in self.chi2:
if el > self.chi2_cut:
return False
return True
def relative_error_selection(self):
relative_errors = [
err/val for val, err in zip(self.corrected_counts, self.corrected_counts_err)]
for el in relative_errors:
if el > self.relative_error_cut:
return False
return True
def outlier_selection(self, std_corrected_counts, std_corrected_counts_err):
for i, counts in enumerate(self.corrected_counts):
distance = abs(std_corrected_counts[i] - counts) / np.hypot(
self.corrected_counts_err[i], std_corrected_counts_err[i])
if distance > self.outlier_cut:
return False
return True