Fix radiasoft/raydata#164: handle chx scans without cycle in start doc (#7260)

Create the concept of "elegible scan" that tests whether the fields
we need are present.

Also clean up some cruft.
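
The core of the change, as a minimal sketch (the class names and the metadata access below are simplified stand-ins for the real driver and databroker code; only the shape of the check mirrors the commit):

    class AnalysisDriverBase:
        # Default: every scan is eligible unless a beamline driver says otherwise.
        def is_scan_elegible_for_analysis(self):
            return True


    class CHX(AnalysisDriverBase):
        def __init__(self, start_doc):
            # start_doc stands in for the scan's start-document metadata.
            self._start_doc = start_doc

        def is_scan_elegible_for_analysis(self):
            # CHX notebooks need "cycle"; a scan without it is skipped early
            # instead of failing later in analysis.
            return bool(self._start_doc.get("cycle"))


    assert CHX({"cycle": "2024_2"}).is_scan_elegible_for_analysis()
    assert not CHX({}).is_scan_elegible_for_analysis()
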
e-carlin authored Sep 18, 2024
1 parent 2ae80ce commit 94851a3
Showing 4 changed files with 64 additions and 62 deletions.
12 changes: 6 additions & 6 deletions sirepo/package_data/static/js/raydata.js
@@ -1123,7 +1123,7 @@ SIREPO.app.directive('scanDetail', function() {
<div data-ng-if="analysisElapsedTime()"><strong>Analysis Elapsed Time:</strong> {{ analysisElapsedTime() }} seconds</div>
<div>
<div><strong>Current Status: </strong>{{ scan.status }}</div>
<div data-ng-if="! isEmptyObject(latestDetailedStatus)">
<div data-ng-if="latestDetailedStatus">
<strong>Detailed Status:</strong>
<ul>
<li data-ng-repeat="(stepName, stepInfo) in latestDetailedStatus">
@@ -1146,17 +1146,17 @@ SIREPO.app.directive('scanDetail', function() {
$scope.latestDetailedStatus = null;

function setLatestDetailedStatus() {
$scope.latestDetailedStatus = $scope.scan.detailed_status[Math.max(Object.keys($scope.scan.detailed_status))];
$scope.latestDetailedStatus = null;
if ($scope.scan.detailed_status) {
$scope.latestDetailedStatus = $scope.scan.detailed_status[Math.max(Object.keys($scope.scan.detailed_status))];
}

}

$scope.analysisElapsedTime = () => {
return $scope.scan && $scope.scan.analysis_elapsed_time ? $scope.scan.analysis_elapsed_time : null;
};

$scope.isEmptyObject = (obj) => {
return $.isEmptyObject(obj);
};

$scope.parseTime = (unixTime) => {
return (new Date(unixTime * 1000)).toString();
};
9 changes: 9 additions & 0 deletions sirepo/raydata/analysis_driver/__init__.py
@@ -35,6 +35,9 @@ def get_analysis_pdf_paths(self):
def get_conda_env(self):
raise NotImplementedError("children must implement this method")

def get_detailed_status_file(*args, **kwargs):
return None

def get_notebooks(self, *args, **kwargs):
raise NotImplementedError("children must implement this method")

@@ -95,6 +98,11 @@ def get_run_log(self):
def has_analysis_pdfs(self):
return len(self.get_analysis_pdf_paths()) > 0

# TODO(e-carlin): There should be a databroker class for each
# beamline and this question should be answered by it.
def is_scan_elegible_for_analysis(self):
return True

def render_papermill_script(self, input_f, output_f):
p = self.get_output_dir().join(_PAPERMILL_SCRIPT)
pkjinja.render_resource(
@@ -116,6 +124,7 @@ def _get_papermill_args(self, *args, **kwargs):
return []


# TODO(e-carlin): support just passing catalog_name and rduid outsidef of PKDict
def get(incoming):
def _verify_rduid(rduid):
# rduid will be combined with paths throughout the application.
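
One effect of the new base-class default for get_detailed_status_file is that callers no longer have to probe drivers with hasattr (see the scan_monitor.py change below). A small self-contained sketch of the idea, using toy driver classes rather than the real ones:

    class AnalysisDriverBase:
        def get_detailed_status_file(self, *args, **kwargs):
            # Drivers without a detailed-status file fall back to None.
            return None


    class DriverWithStatus(AnalysisDriverBase):
        def get_detailed_status_file(self, rduid, *args, **kwargs):
            return {"step_1": "complete"}


    def detailed_status(driver, rduid):
        # No hasattr() guard needed; the base class handles the "no file" case.
        return driver.get_detailed_status_file(rduid)


    print(detailed_status(AnalysisDriverBase(), "abc123"))  # None
    print(detailed_status(DriverWithStatus(), "abc123"))    # {'step_1': 'complete'}
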
7 changes: 5 additions & 2 deletions sirepo/raydata/analysis_driver/chx.py
@@ -19,10 +19,10 @@ class CHX(sirepo.raydata.analysis_driver.AnalysisDriverBase):
def get_conda_env(self):
return _cfg.conda_env

def get_detailed_status_file(self, rduid):
def get_detailed_status_file(self, rduid, *args, **kwargs):
p = self.get_output_dir().join(f"progress_dict_{rduid}.json")
if not p.check():
return PKDict()
return None
d = pkjson.load_any(p)
# The notebooks do json.dump(json.dumps(progress_dict), outfile)
# which double encodes the json object. So, we may
@@ -56,6 +56,9 @@ def get_output_dir(self):
self.rduid,
)

def is_scan_elegible_for_analysis(self):
return bool(self._scan_metadata.get_start_field("cycle", unchecked=True))

def _get_papermill_args(self, *args, **kwargs):
return [
# Cycle can look like 2024_2 which is converted to int by papermill unless raw_param=True
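
The comment about double encoding refers to the notebooks writing json.dump(json.dumps(progress_dict), outfile), so the progress file holds a JSON string whose contents are themselves JSON and decoding takes two passes. A small illustration (the data here is made up):

    import json

    progress = {"step_1": "complete", "step_2": "running"}

    # Equivalent to json.dump(json.dumps(progress), outfile): the dict is
    # serialized, then the resulting string is serialized again.
    file_contents = json.dumps(json.dumps(progress))

    once = json.loads(file_contents)   # a str: '{"step_1": ...}'
    twice = json.loads(once)           # the original dict
    assert isinstance(once, str)
    assert twice == progress

The new is_scan_elegible_for_analysis override assumes that get_start_field("cycle", unchecked=True) returns a falsy value (for example None) when the field is absent from the start document, so scans without a cycle read as ineligible instead of raising.
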
98 changes: 44 additions & 54 deletions sirepo/raydata/scan_monitor.py
@@ -186,20 +186,15 @@ def set_scan_status(cls, analysis_driver, status, analysis_elapsed_time=None):
r.save()

@classmethod
def statuses_for_scans(cls, catalog_name, rduids):
return (
cls.session.query(cls.rduid, cls.status)
.filter(cls.catalog_name == catalog_name, cls.rduid.in_(rduids))
.all()
)

@classmethod
def analysis_elapsed_time_for_scans(cls, catalog_name, rduids):
return (
cls.session.query(cls.rduid, cls.analysis_elapsed_time)
.filter(cls.catalog_name == catalog_name, cls.rduid.in_(rduids))
.all()
def status_and_elapsed_time(cls, catalog_name, rduid):
r = (
cls.session.query(cls.status, cls.analysis_elapsed_time)
.filter(cls.rduid == rduid)
.one_or_none()
)
if r:
return PKDict(status=r[0], analysis_elapsed_time=r[1])
return PKDict(status=None, analysis_elapsed_time=None)

@classmethod
def _db_upgrade(cls):
@@ -317,7 +312,7 @@ def _build_search_text(self, text):
},
],
}
elif len(nums):
if len(nums):
return {
"scan_id": {"$in": nums},
}
@@ -358,32 +353,27 @@ def _search_params(req_data):
)
/ req_data.pageSize
)
l = [
PKDict(rduid=u)
for u in c.search(
_search_params(req_data),
sort=_sort_params(req_data),
limit=req_data.pageSize,
skip=req_data.pageNumber * req_data.pageSize,
)
]
d = PKDict(
_Analysis.statuses_for_scans(
catalog_name=req_data.catalogName, rduids=[s.rduid for s in l]
)
)

e = PKDict(
_Analysis.analysis_elapsed_time_for_scans(
catalog_name=req_data.catalogName, rduids=[s.rduid for s in l]
)
)

for s in l:
s.status = d.get(s.rduid, _AnalysisStatus.NONE)
s.analysis_elapsed_time = e.get(s.rduid, None)
s.detailed_status = _get_detailed_status(req_data.catalogName, s.rduid)
return l, pc
res = []
for u in c.search(
_search_params(req_data),
sort=_sort_params(req_data),
limit=req_data.pageSize,
skip=req_data.pageNumber * req_data.pageSize,
):
# Code after this (ex detailed_status) expects that the
# scan is valid (ex 'cycle' exists in start doc for chx).
# So, don't even show scans to users that aren't elegible.
if sirepo.raydata.analysis_driver.get(
PKDict(catalog_name=req_data.catalogName, rduid=u)
).is_scan_elegible_for_analysis():
res.append(
PKDict(
rduid=u,
detailed_status=_get_detailed_status(req_data.catalogName, u),
**_Analysis.status_and_elapsed_time(req_data.catalogName, u),
)
)
return res, pc

def _request_analysis_output(self, req_data):
return sirepo.raydata.analysis_driver.get(req_data).get_output()
@@ -602,13 +592,9 @@ def _default_columns(catalog_name):


def _get_detailed_status(catalog_name, rduid):
d = sirepo.raydata.analysis_driver.get(
return sirepo.raydata.analysis_driver.get(
PKDict(catalog_name=catalog_name, rduid=rduid)
)
if hasattr(d, "get_detailed_status_file"):
return d.get_detailed_status_file(rduid)
else:
return None
).get_detailed_status_file(rduid)


async def _init_catalog_monitors():
@@ -629,7 +615,6 @@ def _monitor_catalog(catalog_name):
# new documents are available.
# But, for now it is easiest to just poll
async def _poll_catalog_for_scans(catalog_name):
# TODO(e-carlin): need to test polling feature
def _collect_new_scans_and_queue(last_known_scan_metadata):
r = [
sirepo.raydata.databroker.get_metadata(s, catalog_name)
@@ -663,14 +648,17 @@ async def _poll_for_new_scans():
s = _collect_new_scans_and_queue(s)
await pkasyncio.sleep(2)

async def _wait_for_catalog():
while True:
try:
sirepo.raydata.databroker.catalog(catalog_name)
return
except KeyError:
pkdlog(f"no catalog_name={catalog_name}. Retrying...")
await pkasyncio.sleep(15)

pkdlog("catalog_name={}", catalog_name)
c = None
while not c:
try:
c = sirepo.raydata.databroker.catalog(catalog_name)
except KeyError:
pkdlog(f"no catalog_name={catalog_name}. Retrying...")
await pkasyncio.sleep(15)
await _wait_for_catalog()
await _poll_for_new_scans()
raise AssertionError("should never get here")

@@ -680,6 +668,8 @@ def _queue_for_analysis(scan_metadata):
rduid=scan_metadata.rduid,
catalog_name=scan_metadata.catalog_name,
)
if not sirepo.raydata.analysis_driver.get(s).is_scan_elegible_for_analysis():
return
pkdlog("scan={}", s)
if s not in _SCANS_AWAITING_ANALYSIS:
pkio.unchecked_remove(sirepo.raydata.analysis_driver.get(s).get_output_dir())
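
The catalog-monitor refactor also factors the startup retry into a _wait_for_catalog coroutine that simply returns once the catalog can be looked up, instead of binding the catalog object to an unused local. A runnable sketch of the pattern, using asyncio and a fake registry in place of pkasyncio and databroker:

    import asyncio

    _CATALOGS = {}  # stand-in for the databroker catalog registry


    def catalog(name):
        # Like databroker, raise KeyError until the catalog exists.
        return _CATALOGS[name]


    async def _wait_for_catalog(name):
        while True:
            try:
                catalog(name)
                return
            except KeyError:
                print(f"no catalog_name={name}. Retrying...")
                await asyncio.sleep(0.1)


    async def main():
        async def create_catalog_later():
            await asyncio.sleep(0.3)
            _CATALOGS["chx"] = object()

        await asyncio.gather(create_catalog_later(), _wait_for_catalog("chx"))
        print("catalog ready; begin polling for new scans")


    asyncio.run(main())

With the same eligibility check applied in _queue_for_analysis, scans that the listing already hides are also never queued for analysis.
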
