Removed all timing
wbenoit26 committed Feb 26, 2025
1 parent 3270da6 commit f888c4b
Showing 5 changed files with 0 additions and 247 deletions.
100 changes: 0 additions & 100 deletions projects/online/online/main.py
@@ -1,6 +1,4 @@
-import csv
 import logging
 import os
-import time
 from pathlib import Path
 from queue import Empty
@@ -171,43 +169,34 @@ def search(
 
         # we have a frame that is analysis ready,
         # so lets analyze it:
-        start = time.time()
         X = X.to(device)
-        to_device = time.time()
 
         # update the snapshotter state and return
         # unfolded batch of overlapping windows
         batch, state = snapshotter(X[None], state)
-        snapshotter_time = time.time()
 
         # whiten the batch, and analyze with aframe
         whitened = whitener(batch)
         y = aframe(whitened)[:, 0]
-        aframe_time = time.time()
 
         # update our input buffer with latest strain data,
         X_cpu = X.cpu()
-        to_cpu = time.time()
         input_buffer.update(X_cpu, t0)
-        input_buffer_time = time.time()
         # update our output buffer with the latest aframe output,
         # which will also automatically integrate the output
         integrated = output_buffer.update(y.cpu(), t0)
-        output_buffer_time = time.time()
 
         # if this frame was analysis ready,
         # and we had enough previous to build whitening filter
         # search for events in the integrated output
         event = None
         if snapshotter.full_psd_present and ready:
             event = searcher.search(integrated, t0 + time_offset)
-        search_time = time.time()
 
         # if we found an event, process it!
         if event is not None:
             logging.info("Putting event in event queue")
             event_queue.put(event)
-            event_queue_time = time.time()
             logging.info("Running AMPLFI")
             descaled_samples = run_amplfi(
                 event_time=event.gpstime,
@@ -219,53 +208,9 @@ def search(
                 std_scaler=scaler,
                 device=device,
             )
-            amplfi_time = time.time()
             shared_samples = descaled_samples.flatten()
             amplfi_queue.put(event.gpstime)
-            amplfi_queue_time = time.time()
             searcher.detecting = False
-            with open("event_detection_times.csv", "a", newline="") as f:
-                writer = csv.writer(f)
-                if os.stat("event_detection_times.csv").st_size == 0:
-                    writer.writerow(
-                        [
-                            "search_time",
-                            "event_queue_time",
-                            "amplfi_time",
-                            "amplfi_queue_time",
-                        ]
-                    )
-                writer.writerow(
-                    [
-                        search_time - output_buffer_time,
-                        event_queue_time - search_time,
-                        amplfi_time - event_queue_time,
-                        amplfi_queue_time - amplfi_time,
-                    ]
-                )
-        with open("main_loop_times.csv", "a", newline="") as f:
-            writer = csv.writer(f)
-            if os.stat("main_loop_times.csv").st_size == 0:
-                writer.writerow(
-                    [
-                        "to_device",
-                        "snapshotter_time",
-                        "aframe_time",
-                        "to_cpu",
-                        "input_buffer_time",
-                        "output_buffer_time",
-                    ]
-                )
-            writer.writerow(
-                [
-                    to_device - start,
-                    snapshotter_time - to_device,
-                    aframe_time - snapshotter_time,
-                    to_cpu - aframe_time,
-                    input_buffer_time - to_cpu,
-                    output_buffer_time - input_buffer_time,
-                ]
-            )
         # TODO write buffers to disk:
 
 
@@ -293,21 +238,11 @@ def pastro_subprocess(
     while True:
         event = pastro_queue.get()
         logging.info("Calculating p_astro")
-        start = time.time()
         pastro = pastro_model(event.detection_statistic)
-        p_astro_time = time.time() - start
         graceid = pastro_queue.get()
         logging.info(f"Submitting p_astro: {pastro}")
-        start = time.time()
         gdb.submit_pastro(float(pastro), graceid, event.gpstime)
-        submission_time = time.time() - start
         logging.info("Submitted p_astro")
-        fname = "pastro_submission_times.csv"
-        with open(fname, "a", newline="") as f:
-            writer = csv.writer(f)
-            if os.stat(fname).st_size == 0:
-                writer.writerow(["p_astro_time", "submission_time"])
-            writer.writerow([p_astro_time, submission_time])
 
 
 def amplfi_subprocess(
@@ -324,51 +259,30 @@ def amplfi_subprocess(
         arg = amplfi_queue.get()
         if isinstance(arg, float):
             event_time = arg
-            start = time.time()
             descaled_samples = torch.reshape(
                 torch.Tensor(shared_samples), (-1, len(inference_params))
             )
-            from_shared = time.time()
             logging.info("Creating skymap")
             posterior, mollview_map, skymap = skymap_from_samples(
                 descaled_samples, event_time, inference_params, nside
             )
-            skymap_time = time.time() - from_shared
-            from_shared -= start
             graceid = amplfi_queue.get()
             logging.info("Submitting PE")
-            start = time.time()
             gdb.submit_pe(posterior, mollview_map, skymap, graceid, event_time)
-            submission_time = time.time() - start
             logging.info("Submitted all PE")
         else:
             graceid = arg
             event_time = amplfi_queue.get()
-            start = time.time()
             descaled_samples = torch.reshape(
                 torch.Tensor(shared_samples), (-1, len(inference_params))
             )
-            from_shared = time.time()
             logging.info("Creating skymap")
             posterior, mollview_map, skymap = skymap_from_samples(
                 descaled_samples, event_time, inference_params, nside
             )
-            skymap_time = time.time()
             logging.info("Submitting PE")
             gdb.submit_pe(posterior, mollview_map, skymap, graceid, event_time)
-            submission_time = time.time()
             logging.info("Submitted all PE")
-            submission_time -= skymap_time
-            skymap_time -= from_shared
-            from_shared -= start
-        fname = "amplfi_queue_times.csv"
-        with open(fname, "a", newline="") as f:
-            writer = csv.writer(f)
-            if os.stat(fname).st_size == 0:
-                writer.writerow(
-                    ["from_shared", "skymap_time", "submission_time"]
-                )
-            writer.writerow([from_shared, skymap_time, submission_time])
 
 
 def event_creation_subprocess(
@@ -388,22 +302,8 @@ def event_creation_subprocess(
 
         # write event information to disk
         # and submit it to gracedb
-        start = time.time()
         event.write(outdir)
-        event_write_time = time.time()
         response = gdb.submit(event)
-        submission_time = time.time()
-        fname = "event_queue_times.csv"
-        with open(fname, "a", newline="") as f:
-            writer = csv.writer(f)
-            if os.stat(fname).st_size == 0:
-                writer.writerow(["event_write_time", "submission_time"])
-            writer.writerow(
-                [
-                    event_write_time - start,
-                    submission_time - event_write_time,
-                ]
-            )
         # Get the event's graceid for submitting
         # further data products
         if isinstance(response, str):
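Every removed CSV block above repeats the same append-with-header idiom: open the file in append mode and write the header only when `os.stat(fname).st_size == 0`. If per-stage timing is ever reinstated, one helper could replace each block. A minimal sketch, not code from this repository; the name `append_row` is hypothetical:

```python
import csv
import os


def append_row(fname: str, header: list[str], row: list[float]) -> None:
    """Append one row to a CSV file, writing the header first if the file is new or empty."""
    needs_header = not os.path.exists(fname) or os.stat(fname).st_size == 0
    with open(fname, "a", newline="") as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(header)
        writer.writerow(row)
```

Each deleted `with open(...)` block would then collapse to a single call, e.g. `append_row("pastro_submission_times.csv", ["p_astro_time", "submission_time"], [p_astro_time, submission_time])`.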
22 changes: 0 additions & 22 deletions projects/online/online/utils/buffer.py
@@ -1,7 +1,3 @@
-import csv
-import os
-import time
-
 import h5py
 import numpy as np
 import torch
@@ -152,7 +148,6 @@ def integrate(self, x: torch.Tensor):
     def update(self, update: torch.Tensor, t0: float):
         # first append update to the output buffer
         # and remove buffer_size samples from front
-        start = time.time()
         self.output_buffer = torch.cat([self.output_buffer, update])
         self.output_buffer = self.output_buffer[-self.buffer_size :]
 
@@ -163,27 +158,10 @@ def update(self, update: torch.Tensor, t0: float):
 
         integration_size = self.integrator_size + len(update)
         y = self.output_buffer[-integration_size:]
-        pre_int = time.time()
         integrated = self.integrate(y)
-        integrate = time.time()
         self.integrated_buffer = torch.cat(
             [self.integrated_buffer, integrated]
         )
         self.integrated_buffer = self.integrated_buffer[-self.buffer_size :]
-        cat_crop = time.time()
         integrated_cpu = integrated.detach().cpu().numpy()
-        cpu = time.time()
-        fname = "output_buffer_timing.csv"
-        with open(fname, "a", newline="") as f:
-            writer = csv.writer(f)
-            if os.stat(fname).st_size == 0:
-                writer.writerow(["pre_int", "integrate", "cat_crop", "cpu"])
-            writer.writerow(
-                [
-                    pre_int - start,
-                    integrate - pre_int,
-                    cat_crop - integrate,
-                    cpu - cat_crop,
-                ]
-            )
         return integrated_cpu
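For context on what `update` passes to `self.integrate`: it integrates the trailing `integrator_size + len(update)` samples of the output buffer, then crops both buffers back to `buffer_size`. Assuming the integration is a simple moving average over the network output (the method body is not shown in this diff), the core operation looks roughly like the following sketch:

```python
import torch
import torch.nn.functional as F


def boxcar_integrate(y: torch.Tensor, window: int) -> torch.Tensor:
    """Moving average of a 1D series over `window` samples (valid convolution)."""
    kernel = torch.full((1, 1, window), 1.0 / window)
    return F.conv1d(y[None, None, :], kernel)[0, 0]


y = torch.randn(256)                         # stand-in for the output buffer tail
integrated = boxcar_integrate(y, window=32)  # one value per fully covered window
```

Keeping `integrator_size` extra samples in front of the new update is what lets every new output sample get a fully covered integration window.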
47 changes: 0 additions & 47 deletions projects/online/online/utils/gdb.py
@@ -1,9 +1,7 @@
-import csv
 import json
 import logging
 import os
 import subprocess
-import time
 from datetime import datetime, timezone
 from pathlib import Path
 from typing import TYPE_CHECKING, Literal
@@ -43,14 +41,12 @@ def submit(self, event: Event):
         event_dir = self.write_dir / f"event_{int(event.gpstime)}"
         filename = event_dir / event.filename
         logging.info("Creating event in GraceDB")
-        start = time.time()
         response = self.create_event(
             group="CBC",
             pipeline="aframe",
             filename=str(filename),
             search="AllSky",
         )
-        event_creation = time.time()
         logging.info("Event created")
 
         # record latencies for this event
@@ -67,15 +63,6 @@ def submit(self, event: Event):
         latency += f"{total_latency},{write_latency},{aframe_latency}"
         with open(latency_fname, "w") as f:
             f.write(latency)
-        latency_logging = time.time()
-        fname = "event_submission.csv"
-        with open(fname, "a", newline="") as f:
-            writer = csv.writer(f)
-            if os.stat(fname).st_size == 0:
-                writer.writerow(["event_creation", "latency_logging"])
-            writer.writerow(
-                [event_creation - start, latency_logging - event_creation]
-            )
 
         return response
 
@@ -89,67 +76,33 @@ def submit_pe(
     ):
         event_dir = self.write_dir / f"event_{int(event_time)}"
         skymap_fname = event_dir / "amplfi.fits"
-        start = time.time()
         skymap.writeto(skymap_fname)
-        fits_write = time.time()
         logging.info("Submitting skymap to GraceDB")
         self.write_log(graceid, "skymap", filename=skymap_fname, tag_name="pe")
-        fits_submit = time.time()
         logging.info("Skymap submitted")
 
         corner_fname = event_dir / "corner_plot.png"
         result.plot_corner(filename=corner_fname)
-        plot_corner = time.time()
         logging.info("Submitting corner plot to GraceDB")
         self.write_log(
             graceid, "Corner plot", filename=corner_fname, tag_name="pe"
         )
-        corner_submit = time.time()
         logging.info("Corner plot submitted")
 
         mollview_fname = event_dir / "mollview_plot.png"
         fig = plt.figure()
         title = (f"{event_time:.3} sky map",)
         hp.mollview(mollview_map, fig=fig, title=title, hold=True)
         plt.close()
-        create_mollview = time.time()
         fig.savefig(mollview_fname, dpi=300)
-        save_mollview = time.time()
         logging.info("Submitting Mollview plot to GraceDB")
         self.write_log(
             graceid,
             "Mollview projection",
             filename=mollview_fname,
             tag_name="sky_loc",
         )
-        mollview_submit = time.time()
         logging.info("Mollview plot submitted")
-        fname = "pe_submission.csv"
-        with open(fname, "a", newline="") as f:
-            writer = csv.writer(f)
-            if os.stat(fname).st_size == 0:
-                writer.writerow(
-                    [
-                        "fits_write",
-                        "fits_submit",
-                        "plot_corner",
-                        "corner_submit",
-                        "create_mollview",
-                        "save_mollview",
-                        "mollview_submit",
-                    ]
-                )
-            writer.writerow(
-                [
-                    fits_write - start,
-                    fits_submit - fits_write,
-                    plot_corner - fits_submit,
-                    corner_submit - plot_corner,
-                    create_mollview - corner_submit,
-                    save_mollview - create_mollview,
-                    mollview_submit - save_mollview,
-                ]
-            )
 
     def submit_pastro(self, pastro: float, graceid: int, event_time: float):
         event_dir = self.write_dir / f"event_{int(event_time)}"
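The paired `start = time.time()` / `stage = time.time()` deltas deleted from `submit` and `submit_pe` could, if timing is ever wanted again, be captured with a context manager instead of hand-threaded variables. A sketch under that assumption, not the project's API; `timed` and `durations` are hypothetical names:

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str, sink: dict):
    """Store the wall-clock duration of the enclosed block in sink[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        sink[label] = time.perf_counter() - start


durations: dict = {}
with timed("event_creation", durations):
    ...  # e.g. response = self.create_event(...)
```

Using `time.perf_counter()` rather than `time.time()` also avoids measurement drift if the system clock is adjusted mid-run.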
(Diffs for the remaining two changed files are not shown.)
