Skip to content

Commit

Permalink
workflow-state: order status query by submit_num.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Oct 13, 2022
1 parent 75a8197 commit 63a7fa6
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 32 deletions.
62 changes: 35 additions & 27 deletions cylc/flow/dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os
import sqlite3
import sys
from textwrap import dedent

from cylc.flow.pathutil import expand_path
from cylc.flow.rundb import CylcWorkflowDAO
Expand All @@ -31,7 +32,7 @@


class CylcWorkflowDBChecker:
"""Object for querying a workflow database"""
"""Object for querying a workflow database."""
STATE_ALIASES = {
'finish': [
TASK_STATUS_FAILED,
Expand Down Expand Up @@ -70,33 +71,33 @@ def display_maps(res):
sys.stderr.write("INFO: No results to display.\n")
else:
for row in res:
sys.stdout.write((", ").join(row) + "\n")
sys.stdout.write((", ").join([str(s) for s in row]) + "\n")

def get_remote_point_format(self):
"""Query a remote workflow database for a 'cycle point format' entry"""
for row in self.conn.execute(
"""Query a remote workflow database for a 'cycle point format' entry."""
for row in self.conn.execute(dedent(
rf'''
SELECT
value
FROM
{CylcWorkflowDAO.TABLE_WORKFLOW_PARAMS}
WHERE
key==?
''', # nosec (table name is code constant)
'''), # nosec (table name is code constant)
['cycle_point_format']
):
return row[0]

def state_lookup(self, state):
"""allows for multiple states to be searched via a status alias"""
"""Allows for multiple states to be searched via a status alias."""
if state in self.STATE_ALIASES:
return self.STATE_ALIASES[state]
else:
return [state]

def workflow_state_query(
self, task, cycle, status=None, message=None, mask=None):
"""run a query on the workflow database"""
"""Run a query on the workflow database."""
stmt_args = []
stmt_wheres = []

Expand All @@ -109,12 +110,12 @@ def workflow_state_query(
else:
target_table = CylcWorkflowDAO.TABLE_TASK_STATES

stmt = rf'''
stmt = dedent(rf'''
SELECT
{mask}
FROM
{target_table}
''' # nosec
''') # nosec
# * mask is hardcoded
# * target_table is a code constant
if task is not None:
Expand All @@ -131,30 +132,37 @@ def workflow_state_query(
stmt_frags.append("status==?")
stmt_wheres.append("(" + (" OR ").join(stmt_frags) + ")")
if stmt_wheres:
stmt += " where " + (" AND ").join(stmt_wheres)

stmt += "WHERE\n " + (" AND ").join(stmt_wheres)
if status:
stmt += dedent("""
ORDER BY
submit_num
""")
res = []
for row in self.conn.execute(stmt, stmt_args):
if not all(v is None for v in row):
res.append(list(row))

return res

def task_state_getter(self, task, cycle):
"""used to get the state of a particular task at a particular cycle"""
return self.workflow_state_query(task, cycle, mask="status")[0]

def task_state_met(self, task, cycle, status=None, message=None):
"""used to check if a task is in a particular state"""
res = self.workflow_state_query(task, cycle, status, message)
if status:
return bool(res)
elif message:
return any(
message == value
for outputs_str, in res
for value in json.loads(outputs_str)
)
def task_state_met(self, task, cycle, status):
"""Check if the latest flow instance of a task is in a given state."""
# retrieve all flow-instances of cycle/task
res = self.workflow_state_query(task, cycle)
if res:
# only consider the latest isntance
return (res[-1])[2] == status
return False

def task_output_met(self, task, cycle, message):
"""Check if latest flow instance of a task has emitted a message."""
# TODO - NEED TO ADD submit_num TO THE task_outputs TABLE SO WE CAN
# TELL WHICH IS THE LATEST FLOW-INSTANCE (like state_met above).
res = self.workflow_state_query(task, cycle, message=message)
return any(
message == value
for outputs_str, in res
for value in json.loads(outputs_str)
)

@staticmethod
def validate_mask(mask):
Expand Down
9 changes: 6 additions & 3 deletions cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,12 @@ def connect(self):

async def check(self):
"""Return True if desired workflow state achieved, else False"""
return self.checker.task_state_met(
self.args['task'], self.args['cycle'],
self.args['status'], self.args['message'])
if self.args['status']:
return self.checker.task_state_met(
self.args['task'], self.args['cycle'], self.args['status'])
elif self.args['message']:
return self.checker.task_output_met(
self.args['task'], self.args['cycle'], self.args['message'])


def get_option_parser() -> COP:
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ def workflow_state(
my_parser = TimePointParser()
point = str(my_parser.parse(point, dump_format=fmt))
if message is not None:
satisfied = checker.task_state_met(task, point, message=message)
satisfied = checker.task_output_met(task, point, message)
else:
satisfied = checker.task_state_met(task, point, status=status)
satisfied = checker.task_state_met(task, point, status)
results = {
'workflow': workflow,
'task': task,
Expand Down

0 comments on commit 63a7fa6

Please sign in to comment.