Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #7153: raydata enable/disable auto processing of scans from the GUI #7199

Merged
merged 18 commits into from
Aug 7, 2024
35 changes: 35 additions & 0 deletions sirepo/package_data/static/js/raydata.js
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ SIREPO.app.directive('scansTable', function() {
}

function init() {
getAutomaticAnalysis();
setColumnHeaders();
if (scanService.cachedScans($scope.analysisStatus)) {
loadScans(scanService.cachedScans($scope.analysisStatus));
Expand Down Expand Up @@ -641,6 +642,39 @@ SIREPO.app.directive('scansTable', function() {
scanRequestInterval = $interval(doRequest, 5000);
}

function getAutomaticAnalysis() {
requestSender.sendStatelessCompute(
appState,
json => {
appState.models.runAnalysis.automaticAnalysis = json.data.automaticAnalysis ? 1 : 0;
appState.saveChanges('runAnalysis');
},
{
method: 'get_automatic_analysis',
args: {
catalogName: appState.applicationState().catalog.catalogName
}
},
errorOptions
);
}


function setAutomaticAnalysis() {
requestSender.sendStatelessCompute(
appState,
json => {
},
{
method: 'set_automatic_analysis',
args: {
automaticAnalysis: appState.models.runAnalysis.automaticAnalysis,
catalogName: appState.applicationState().catalog.catalogName }
},
errorOptions
);
}

function setColumnHeaders() {
$scope.columnHeaders = [
...columnsService.defaultColumns($scope.analysisStatus, appState),
Expand Down Expand Up @@ -842,6 +876,7 @@ SIREPO.app.directive('scansTable', function() {
};

$scope.$on(`${$scope.modelName}.changed`, () => sendScanRequest(true, true));
$scope.$on('runAnalysis.changed', setAutomaticAnalysis);
$scope.$on('catalog.changed', () => sendScanRequest(true, true));
$scope.$on('metadataColumns.changed', () => {
sendScanRequest(true);
Expand Down
4 changes: 3 additions & 1 deletion sirepo/package_data/static/json/raydata-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"searchStopTime":["Stop", "DateTimePicker", null],
"searchText": ["Search Text", "OptionalString", "", "Search across all text columns by term.<br /><br />Phrases can be quoted, ex. \"canted mode\".<br /><br />Negations can be added to the search text by adding a minus (-) sign to the word, however negations must be included with another search term, ex. csx -flatfield.<br /><br />ex. csx \"canted mode\" pinhole -flatfield<br /><br />Use the individual search fields below to search for a specific field value. Wildcards (*) may be used with individual field searches, ex. <b>owner</b> xf*"],
"pageSize": ["Page Size", "Integer", 15],
"automaticAnalysis": ["Automatically Run Analysis", "Boolean", "0"],
"scans": ["", "RunAnalysisTable", ""],
"confirmRunAnalysis": ["Hide this message for this session", "Boolean", "0"],
"searchTerms": ["", "SearchTerms", []]
Expand Down Expand Up @@ -118,7 +119,8 @@
"searchTerms"
],
"advanced": [
"pageSize"
"pageSize",
"automaticAnalysis"
]
},
"simulation": {
Expand Down
61 changes: 49 additions & 12 deletions sirepo/raydata/scan_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,19 @@
#: task(s) monitoring the execution of the analysis process
_ANALYSIS_PROCESSOR_TASKS = None

#: task(s) monitoring _CHANGE_AUTOMATIC_ANALYSIS
_AUTOMATIC_ANALYSIS_HANDLER_TASKS = []

#: task(s) monitoring catalogs for new scans
_CATALOG_MONITOR_TASKS = PKDict()

#: sentinels for whether automatic analysis needs to change
_CHANGE_AUTOMATIC_ANALYSIS = {}

# TODO(e-carlin): tune this number
_MAX_NUM_SCANS = 1000


# Fields that come from the top-level of metadata (as opposed to start document).
# Must match key name from _default_columns()
_METADATA_COLUMNS = {"start", "stop", "suid"}
Expand Down Expand Up @@ -434,6 +441,13 @@ def _all_pdfs(rduids):
).raise_for_status()
return PKDict()

def _request_get_automatic_analysis(self, req_data):
return PKDict(
data=PKDict(
automaticAnalysis=req_data.catalogName in _CATALOG_MONITOR_TASKS
)
)

def _request_get_scans(self, req_data):
s = 1
if req_data.analysisStatus == "allStatuses":
Expand Down Expand Up @@ -483,7 +497,7 @@ def _request_run_analysis(self, req_data):
def _request_run_engine_event_callback(self, req_data):
# Start as a task. No need to hold request until task is
# completed because the caller does nothing with the response.
asyncio.create_task(
pkasyncio.create_task(
sirepo.raydata.adaptive_workflow.run_engine_event_callback(req_data)
)
return PKDict()
Expand All @@ -497,6 +511,13 @@ def _request_scan_fields(self, req_data):
)
)

def _request_set_automatic_analysis(self, req_data):
if bool(int(req_data.automaticAnalysis)) != (
req_data.catalogName in _CATALOG_MONITOR_TASKS
):
_CHANGE_AUTOMATIC_ANALYSIS[req_data.catalogName] = True
return PKDict(data="ok")

def _sr_authenticate(self, token):
if (
token
Expand Down Expand Up @@ -558,7 +579,7 @@ async def _process_analysis_queue():

assert not _ANALYSIS_PROCESSOR_TASKS
_ANALYSIS_PROCESSOR_TASKS = [
asyncio.create_task(_process_analysis_queue())
pkasyncio.create_task(_process_analysis_queue())
] * cfg.concurrent_analyses
await asyncio.gather(*_ANALYSIS_PROCESSOR_TASKS)

Expand Down Expand Up @@ -592,15 +613,32 @@ def _get_detailed_status(catalog_name, rduid):


async def _init_catalog_monitors():
def _monitor_catalog(catalog_name):
assert catalog_name not in _CATALOG_MONITOR_TASKS
return asyncio.create_task(_poll_catalog_for_scans(catalog_name))

if not cfg.automatic_analysis:
return
for c in cfg.catalog_names:
_CATALOG_MONITOR_TASKS[c] = _monitor_catalog(c)
await asyncio.gather(*_CATALOG_MONITOR_TASKS.values())
_AUTOMATIC_ANALYSIS_HANDLER_TASKS.append(
pkasyncio.create_task(_handle_automatic_analysis(c))
)
rorour marked this conversation as resolved.
Show resolved Hide resolved
if cfg.automatic_analysis:
for c in cfg.catalog_names:
_CHANGE_AUTOMATIC_ANALYSIS[c] = True


async def _monitor_catalog(catalog_name):
assert catalog_name not in _CATALOG_MONITOR_TASKS
t = pkasyncio.create_task(_poll_catalog_for_scans(catalog_name))
_CATALOG_MONITOR_TASKS[catalog_name] = t
await t


async def _handle_automatic_analysis(catalog_name):
while True:
if _CHANGE_AUTOMATIC_ANALYSIS.get(catalog_name):
_CHANGE_AUTOMATIC_ANALYSIS[catalog_name] = False
if catalog_name in _CATALOG_MONITOR_TASKS:
_CATALOG_MONITOR_TASKS.pkdel(catalog_name)
else:
await _monitor_catalog(catalog_name)
else:
await pkasyncio.sleep(2)


# TODO(e-carlin): Rather than polling for scans we should explore using RunEngine.subscribe
Expand Down Expand Up @@ -632,7 +670,7 @@ def _collect_new_scans_and_queue(last_known_scan_metadata):

async def _poll_for_new_scans(most_recent_scan_metadata):
m = most_recent_scan_metadata
while True:
while not _CHANGE_AUTOMATIC_ANALYSIS.get(catalog_name):
m = _collect_new_scans_and_queue(m)
await pkasyncio.sleep(2)

Expand All @@ -648,7 +686,6 @@ async def _poll_for_new_scans(most_recent_scan_metadata):
if not _Analysis.have_analyzed_scan(s):
_queue_for_analysis(s)
await _poll_for_new_scans(s)
raise AssertionError("should never get here")


def _queue_for_analysis(scan_metadata):
Expand Down
8 changes: 8 additions & 0 deletions sirepo/template/raydata.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def stateless_compute_download_analysis_pdfs(data, data_file_uri=None, **kwargs)
return _request_scan_monitor(PKDict(method="download_analysis_pdfs", data=data))


def stateless_compute_get_automatic_analysis(data, **kwargs):
return _request_scan_monitor(PKDict(method="get_automatic_analysis", data=data))


def stateless_compute_reorder_scan(data, **kwargs):
return _request_scan_monitor(PKDict(method="reorder_scan", data=data))

Expand All @@ -78,6 +82,10 @@ def stateless_compute_scan_fields(data, **kwargs):
return _request_scan_monitor(PKDict(method="scan_fields", data=data))


def stateless_compute_set_automatic_analysis(data, **kwargs):
return _request_scan_monitor(PKDict(method="set_automatic_analysis", data=data))


def _request_scan_monitor(data):
c = sirepo.feature_config.for_sim_type(SIM_TYPE)
try:
Expand Down