Skip to content

Commit

Permalink
Return Exposure object in SpectrographSet.expose() and support async …
Browse files Browse the repository at this point in the history
…readout
  • Loading branch information
albireox committed Jul 13, 2023
1 parent 0ba0d8a commit fa6591e
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 48 deletions.
303 changes: 256 additions & 47 deletions src/gort/devices/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,218 @@

import asyncio
import json
import pathlib
import warnings
from contextlib import suppress

from typing import TYPE_CHECKING

from sdsstools.time import get_sjd

from gort import config
from gort.exceptions import GortSpecError
from gort.gort import GortDevice, GortDeviceSet
from gort.tools import move_mask_interval, tqdm_timer
from gort.tools import is_notebook, move_mask_interval


if TYPE_CHECKING:
from tqdm import tqdm as tqdm_type

from gort.core import ActorReply
from gort.gort import GortClient


__all__ = ["Spectrograph", "SpectrographSet", "READOUT_TIME"]
__all__ = ["Spectrograph", "SpectrographSet", "Exposure", "READOUT_TIME"]


READOUT_TIME = 51


class Exposure(asyncio.Future["Exposure"]):
"""A class representing an exposure taken by a `.SpectrographSet`.
Parameters
----------
exp_no
The exposure sequence number.
spec_set
The `.SpectrographSet` commanding this exposure.
"""

def __init__(self, exp_no: int, spec_set: SpectrographSet):
self.spec_set = spec_set
self.exp_no = exp_no

self.error: bool = False
self.reading: bool = False

self._timer_task: asyncio.Task | None = None
self._tqdm: tqdm_type | None = None

super().__init__()

def __repr__(self):
return (
f"<Exposure (exp_no={self.exp_no}, error={self.error}, "
f"reading={self.reading}, done={self.done()})>"
)

async def expose(
self,
exposure_time: float | None = None,
header: str | None = None,
async_readout: bool = False,
show_progress: bool = False,
**kwargs,
):
"""Exposes the spectrograph.
Parameters
----------
exposure_time
The exposure time.
header
The header JSON string to pass to the ``lvmscp expose`` command.
async_readout
Returns after integration completes. Readout is initiated
but handled asynchronously and can be await by awaiting
the returned `.Exposure` object.
show_progress
Displays a progress bar with the elapsed exposure time.
kwargs
Keyword arguments to pass to ``lvmscp expose``.
"""

if show_progress and exposure_time:
await self.start_timer(exposure_time)

if (
exposure_time is None
and kwargs.get("flavour", "object") != "bias"
and not kwargs.get("bias", False)
):
raise GortSpecError(
"Exposure time required for all flavours except bias.",
error_code=3,
)

warnings.filterwarnings("ignore", message=".*cannot modify a done command.*")

self.spec_set.last_exposure = self

try:
await self.spec_set._send_command_all(
"expose",
exposure_time=exposure_time,
seqno=self.exp_no,
header=(header or "{}"),
async_readout=async_readout,
**kwargs,
)

self.reading = True

# Now launch the task that marks the Future done when the spec
# is IDLE. If async_readout=False then that will return immediately
# because the spec is already idle. If async_readout=True, this method
# will return now and the task will mark the Future done when readout
# complete (readout is ongoing and does not need to be launched).
monitor_task = asyncio.create_task(self._done_monitor())
if not async_readout:
await monitor_task
else:
self.spec_set.write_to_log("Returning with async readout ongoing.")

except Exception as err:
await self.stop_timer()
self.error = True
raise GortSpecError(f"Exposure failed with error {err}", error_code=301)

return self

async def start_timer(
self,
exposure_time: float,
readout_time: float | None = READOUT_TIME,
):
"""Starts the tqdm timer."""

if is_notebook():
from tqdm.notebook import tqdm
else:
from tqdm import tqdm

async def update_timer(max_time: int):
while True:
if self._tqdm is None:
return
if self._tqdm.n >= max_time:
return
await asyncio.sleep(1)
self._tqdm.update()

def done_timer(*_):
if self._tqdm:
self._tqdm.close()
self._tqdm = None
self._timer_task = None

total_time = int(exposure_time + (readout_time or 0.0))
bar_format = "{l_bar}{bar}| {n_fmt}/{total_fmt}s"

self._tqdm = tqdm(total=total_time, bar_format=bar_format)
self._tqdm.refresh()

self._timer_task = asyncio.create_task(update_timer(total_time))
self._timer_task.add_done_callback(done_timer)

return self._timer_task

async def stop_timer(self):
"""Cancels the timer."""

if self._tqdm:
# sys.stdout = sys.__stdout__
# sys.stderr = sys.__stderr__
self._tqdm.disable = True
self._tqdm.close()
self._tqdm = None

if self._timer_task and not self._timer_task.done():
self._timer_task.cancel()
with suppress(asyncio.CancelledError):
await self._timer_task
self._timer_task = None

def get_files(self):
"""Returns the files written by the exposure."""

sjd = get_sjd()
data_path = pathlib.Path(config["specs"]["data_path"].format(SJD=sjd))

return list(data_path.glob(f"*-[0]*{self.exp_no}.fits.gz"))

async def _done_monitor(self):
"""Waits until the spectrographs are idle, and marks the Future done."""

await self.spec_set._send_command_all("wait_until_idle", allow_errored=True)

for spec in self.spec_set.values():
reply = await spec.status()
if "ERROR" in reply["status_names"]:
self.error = True

self.reading = False

# Set the Future.
self.set_result(self)

# Reset this warning.
warnings.filterwarnings("default", message=".*cannot modify a done command.*")


class Spectrograph(GortDevice):
"""Class representing an LVM spectrograph functionality."""

Expand All @@ -41,19 +232,28 @@ async def status(self):
"""Retrieves the status of the telescope."""

reply: ActorReply = await self.actor.commands.status()
return reply.flatten()
flatten_reply = reply.flatten()

return flatten_reply.get("status", {})

async def is_idle(self):
"""Returns `True` if the spectrograph is idle and ready to expose."""

status = await self.status()
try:
if "IDLE" in status["status"]["status_names"]:
return True
except Exception:
pass
names = status["status_names"]
return "IDLE" in names and "READOUT_PENDING" not in names

async def is_exposing(self):
"""Returns `True` if the spectrograph is exposing."""

status = await self.status()
return "EXPOSING" in status["status_names"]

return False
async def is_reading(self):
"""Returns `True` if the spectrograph is idle and ready to expose."""

status = await self.status()
return "READING" in status["status_names"]

async def expose(self, **kwargs):
"""Exposes the spectrograph."""
Expand All @@ -74,6 +274,19 @@ class SpectrographSet(GortDeviceSet[Spectrograph]):

__DEVICE_CLASS__ = Spectrograph

def __init__(self, gort: GortClient, data: dict[str, dict], **kwargs):
super().__init__(gort, data, **kwargs)

self.last_exposure: Exposure | None = None

async def status(self) -> dict[str, dict]:
"""Collects the status of each spectrograph."""

names = list(self)
statuses = await self.call_device_method(Spectrograph.status)

return dict(zip(names, statuses))

def get_seqno(self):
"""Returns the next exposure sequence number."""

Expand All @@ -87,24 +300,30 @@ def get_seqno(self):
async def are_idle(self):
"""Returns `True` if all the spectrographs are idle and ready to expose."""

result = await asyncio.gather(*[spec.is_idle() for spec in self.values()])

return all(result)
return all(await self.call_device_method(Spectrograph.is_idle))

async def expose(
self,
exposure_time: float | None = None,
tile_data: dict | None = None,
show_progress: bool = False,
async_readout: bool = False,
**kwargs,
):
"""Exposes the spectrographs.
Parameters
----------
exposure_time
The exposure time.
tile_data
Tile data to add to the headers.
show_progress
Displays a progress bar with the elapsed exposure time.
async_readout
Returns after integration completes. Readout is initiated
but handled asynchronously and can be await by awaiting
the returned `.Exposure` object.
kwargs
Keyword arguments to pass to ``lvmscp expose``.
Expand All @@ -115,56 +334,46 @@ async def expose(
"""

if self.last_exposure is not None and not self.last_exposure.done():
self.write_to_log("Waiting for previous exposure to read out.", "warning")
await self.last_exposure

if not (await self.are_idle()):
raise GortSpecError(
"Spectrographs are not idle. Cannot expose.",
error_code=302,
)

count: int = kwargs.pop("count", 1)
if "count" in kwargs:
raise GortSpecError(
"Count cannot be used here. Use a loop instead.",
error_code=3,
)

exposure_time = kwargs.pop("exposure_time", 10)
if kwargs.get("bias", False) or kwargs.get("flavour", "object") == "bias":
exposure_time = 0.0

exp_nos: list[int] = []
seqno = self.get_seqno()
self.write_to_log(f"Taking spectrograph exposure {seqno}.", level="info")

for _ in range(count):
seqno = self.get_seqno()
self.write_to_log(f"Taking spectrograph exposure {seqno}.", level="info")
await self.reset()

await self.reset()
if tile_data is not None:
header = json.dumps(tile_data)
else:
header = None

if tile_data is not None:
header = json.dumps(tile_data)
else:
header = None
exposure = Exposure(seqno, self)
await exposure.expose(
exposure_time=exposure_time,
header=header,
async_readout=async_readout,
show_progress=show_progress,
**kwargs,
)

if show_progress:
timer = tqdm_timer(float(exposure_time) + READOUT_TIME)
else:
timer = None

try:
await self._send_command_all(
"expose",
exposure_time=exposure_time,
seqno=seqno,
header=header,
**kwargs,
)
exp_nos.append(seqno)
except Exception as err:
with suppress(asyncio.CancelledError):
if timer:
await timer

raise GortSpecError(
f"Exposure failed with error {err}",
error_code=301,
)

return exp_nos
return exposure

async def reset(self):
"""Reset the spectrographs."""
Expand Down
Loading

0 comments on commit fa6591e

Please sign in to comment.