diff --git a/changes.d/5660.fix.md b/changes.d/5660.fix.md
new file mode 100644
index 00000000000..65cdea538a9
--- /dev/null
+++ b/changes.d/5660.fix.md
@@ -0,0 +1 @@
+Re-worked graph n-window algorithm for better efficiency.
diff --git a/cylc/flow/data_messages.proto b/cylc/flow/data_messages.proto
index cc56fba2e62..6068bb1c5df 100644
--- a/cylc/flow/data_messages.proto
+++ b/cylc/flow/data_messages.proto
@@ -105,6 +105,7 @@ message PbWorkflow {
optional bool pruned = 37;
optional int32 is_runahead_total = 38;
optional bool states_updated = 39;
+ optional int32 n_edge_distance = 40;
}
// Selected runtime fields
@@ -227,6 +228,7 @@ message PbTaskProxy {
optional bool is_runahead = 26;
optional bool flow_wait = 27;
optional PbRuntime runtime = 28;
+ optional int32 graph_depth = 29;
}
message PbFamily {
@@ -264,6 +266,7 @@ message PbFamilyProxy {
optional bool is_runahead = 19;
optional int32 is_runahead_total = 20;
optional PbRuntime runtime = 21;
+ optional int32 graph_depth = 22;
}
message PbEdge {
diff --git a/cylc/flow/data_messages_pb2.py b/cylc/flow/data_messages_pb2.py
index 1bf44b36dae..9894ea55ebc 100644
--- a/cylc/flow/data_messages_pb2.py
+++ b/cylc/flow/data_messages_pb2.py
@@ -1,4 +1,3 @@
-# type: ignore
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: data_messages.proto
@@ -14,25 +13,25 @@
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13\x64\x61ta_messages.proto\"\x96\x01\n\x06PbMeta\x12\x12\n\x05title\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x18\n\x0b\x64\x65scription\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x10\n\x03URL\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x19\n\x0cuser_defined\x18\x04 \x01(\tH\x03\x88\x01\x01\x42\x08\n\x06_titleB\x0e\n\x0c_descriptionB\x06\n\x04_URLB\x0f\n\r_user_defined\"\xaa\x01\n\nPbTimeZone\x12\x12\n\x05hours\x18\x01 \x01(\x05H\x00\x88\x01\x01\x12\x14\n\x07minutes\x18\x02 \x01(\x05H\x01\x88\x01\x01\x12\x19\n\x0cstring_basic\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x1c\n\x0fstring_extended\x18\x04 \x01(\tH\x03\x88\x01\x01\x42\x08\n\x06_hoursB\n\n\x08_minutesB\x0f\n\r_string_basicB\x12\n\x10_string_extended\"\'\n\x0fPbTaskProxyRefs\x12\x14\n\x0ctask_proxies\x18\x01 \x03(\t\"\xa2\x0c\n\nPbWorkflow\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x11\n\x04name\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x13\n\x06status\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x11\n\x04host\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x11\n\x04port\x18\x06 \x01(\x05H\x05\x88\x01\x01\x12\x12\n\x05owner\x18\x07 \x01(\tH\x06\x88\x01\x01\x12\r\n\x05tasks\x18\x08 \x03(\t\x12\x10\n\x08\x66\x61milies\x18\t \x03(\t\x12\x1c\n\x05\x65\x64ges\x18\n \x01(\x0b\x32\x08.PbEdgesH\x07\x88\x01\x01\x12\x18\n\x0b\x61pi_version\x18\x0b \x01(\x05H\x08\x88\x01\x01\x12\x19\n\x0c\x63ylc_version\x18\x0c \x01(\tH\t\x88\x01\x01\x12\x19\n\x0clast_updated\x18\r \x01(\x01H\n\x88\x01\x01\x12\x1a\n\x04meta\x18\x0e \x01(\x0b\x32\x07.PbMetaH\x0b\x88\x01\x01\x12&\n\x19newest_active_cycle_point\x18\x10 \x01(\tH\x0c\x88\x01\x01\x12&\n\x19oldest_active_cycle_point\x18\x11 \x01(\tH\r\x88\x01\x01\x12\x15\n\x08reloaded\x18\x12 \x01(\x08H\x0e\x88\x01\x01\x12\x15\n\x08run_mode\x18\x13 \x01(\tH\x0f\x88\x01\x01\x12\x19\n\x0c\x63ycling_mode\x18\x14 \x01(\tH\x10\x88\x01\x01\x12\x32\n\x0cstate_totals\x18\x15 
\x03(\x0b\x32\x1c.PbWorkflow.StateTotalsEntry\x12\x1d\n\x10workflow_log_dir\x18\x16 \x01(\tH\x11\x88\x01\x01\x12(\n\x0etime_zone_info\x18\x17 \x01(\x0b\x32\x0b.PbTimeZoneH\x12\x88\x01\x01\x12\x17\n\ntree_depth\x18\x18 \x01(\x05H\x13\x88\x01\x01\x12\x15\n\rjob_log_names\x18\x19 \x03(\t\x12\x14\n\x0cns_def_order\x18\x1a \x03(\t\x12\x0e\n\x06states\x18\x1b \x03(\t\x12\x14\n\x0ctask_proxies\x18\x1c \x03(\t\x12\x16\n\x0e\x66\x61mily_proxies\x18\x1d \x03(\t\x12\x17\n\nstatus_msg\x18\x1e \x01(\tH\x14\x88\x01\x01\x12\x1a\n\ris_held_total\x18\x1f \x01(\x05H\x15\x88\x01\x01\x12\x0c\n\x04jobs\x18 \x03(\t\x12\x15\n\x08pub_port\x18! \x01(\x05H\x16\x88\x01\x01\x12\x17\n\nbroadcasts\x18\" \x01(\tH\x17\x88\x01\x01\x12\x1c\n\x0fis_queued_total\x18# \x01(\x05H\x18\x88\x01\x01\x12=\n\x12latest_state_tasks\x18$ \x03(\x0b\x32!.PbWorkflow.LatestStateTasksEntry\x12\x13\n\x06pruned\x18% \x01(\x08H\x19\x88\x01\x01\x12\x1e\n\x11is_runahead_total\x18& \x01(\x05H\x1a\x88\x01\x01\x12\x1b\n\x0estates_updated\x18\' \x01(\x08H\x1b\x88\x01\x01\x1a\x32\n\x10StateTotalsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\x1aI\n\x15LatestStateTasksEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1f\n\x05value\x18\x02 \x01(\x0b\x32\x10.PbTaskProxyRefs:\x02\x38\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\x07\n\x05_nameB\t\n\x07_statusB\x07\n\x05_hostB\x07\n\x05_portB\x08\n\x06_ownerB\x08\n\x06_edgesB\x0e\n\x0c_api_versionB\x0f\n\r_cylc_versionB\x0f\n\r_last_updatedB\x07\n\x05_metaB\x1c\n\x1a_newest_active_cycle_pointB\x1c\n\x1a_oldest_active_cycle_pointB\x0b\n\t_reloadedB\x0b\n\t_run_modeB\x0f\n\r_cycling_modeB\x13\n\x11_workflow_log_dirB\x11\n\x0f_time_zone_infoB\r\n\x0b_tree_depthB\r\n\x0b_status_msgB\x10\n\x0e_is_held_totalB\x0b\n\t_pub_portB\r\n\x0b_broadcastsB\x12\n\x10_is_queued_totalB\t\n\x07_prunedB\x14\n\x12_is_runahead_totalB\x11\n\x0f_states_updated\"\xb9\x06\n\tPbRuntime\x12\x15\n\x08platform\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x13\n\x06script\x18\x02 
\x01(\tH\x01\x88\x01\x01\x12\x18\n\x0binit_script\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x17\n\nenv_script\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x17\n\nerr_script\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x18\n\x0b\x65xit_script\x18\x06 \x01(\tH\x05\x88\x01\x01\x12\x17\n\npre_script\x18\x07 \x01(\tH\x06\x88\x01\x01\x12\x18\n\x0bpost_script\x18\x08 \x01(\tH\x07\x88\x01\x01\x12\x19\n\x0cwork_sub_dir\x18\t \x01(\tH\x08\x88\x01\x01\x12(\n\x1b\x65xecution_polling_intervals\x18\n \x01(\tH\t\x88\x01\x01\x12#\n\x16\x65xecution_retry_delays\x18\x0b \x01(\tH\n\x88\x01\x01\x12!\n\x14\x65xecution_time_limit\x18\x0c \x01(\tH\x0b\x88\x01\x01\x12)\n\x1csubmission_polling_intervals\x18\r \x01(\tH\x0c\x88\x01\x01\x12$\n\x17submission_retry_delays\x18\x0e \x01(\tH\r\x88\x01\x01\x12\x17\n\ndirectives\x18\x0f \x01(\tH\x0e\x88\x01\x01\x12\x18\n\x0b\x65nvironment\x18\x10 \x01(\tH\x0f\x88\x01\x01\x12\x14\n\x07outputs\x18\x11 \x01(\tH\x10\x88\x01\x01\x42\x0b\n\t_platformB\t\n\x07_scriptB\x0e\n\x0c_init_scriptB\r\n\x0b_env_scriptB\r\n\x0b_err_scriptB\x0e\n\x0c_exit_scriptB\r\n\x0b_pre_scriptB\x0e\n\x0c_post_scriptB\x0f\n\r_work_sub_dirB\x1e\n\x1c_execution_polling_intervalsB\x19\n\x17_execution_retry_delaysB\x17\n\x15_execution_time_limitB\x1f\n\x1d_submission_polling_intervalsB\x1a\n\x18_submission_retry_delaysB\r\n\x0b_directivesB\x0e\n\x0c_environmentB\n\n\x08_outputs\"\x9d\x05\n\x05PbJob\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x17\n\nsubmit_num\x18\x03 \x01(\x05H\x02\x88\x01\x01\x12\x12\n\x05state\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x17\n\ntask_proxy\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x1b\n\x0esubmitted_time\x18\x06 \x01(\tH\x05\x88\x01\x01\x12\x19\n\x0cstarted_time\x18\x07 \x01(\tH\x06\x88\x01\x01\x12\x1a\n\rfinished_time\x18\x08 \x01(\tH\x07\x88\x01\x01\x12\x13\n\x06job_id\x18\t \x01(\tH\x08\x88\x01\x01\x12\x1c\n\x0fjob_runner_name\x18\n \x01(\tH\t\x88\x01\x01\x12!\n\x14\x65xecution_time_limit\x18\x0e 
\x01(\x02H\n\x88\x01\x01\x12\x15\n\x08platform\x18\x0f \x01(\tH\x0b\x88\x01\x01\x12\x18\n\x0bjob_log_dir\x18\x11 \x01(\tH\x0c\x88\x01\x01\x12\x11\n\x04name\x18\x1e \x01(\tH\r\x88\x01\x01\x12\x18\n\x0b\x63ycle_point\x18\x1f \x01(\tH\x0e\x88\x01\x01\x12\x10\n\x08messages\x18 \x03(\t\x12 \n\x07runtime\x18! \x01(\x0b\x32\n.PbRuntimeH\x0f\x88\x01\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\r\n\x0b_submit_numB\x08\n\x06_stateB\r\n\x0b_task_proxyB\x11\n\x0f_submitted_timeB\x0f\n\r_started_timeB\x10\n\x0e_finished_timeB\t\n\x07_job_idB\x12\n\x10_job_runner_nameB\x17\n\x15_execution_time_limitB\x0b\n\t_platformB\x0e\n\x0c_job_log_dirB\x07\n\x05_nameB\x0e\n\x0c_cycle_pointB\n\n\x08_runtimeJ\x04\x08\x1d\x10\x1e\"\xe2\x02\n\x06PbTask\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x11\n\x04name\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x1a\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMetaH\x03\x88\x01\x01\x12\x1e\n\x11mean_elapsed_time\x18\x05 \x01(\x02H\x04\x88\x01\x01\x12\x12\n\x05\x64\x65pth\x18\x06 \x01(\x05H\x05\x88\x01\x01\x12\x0f\n\x07proxies\x18\x07 \x03(\t\x12\x11\n\tnamespace\x18\x08 \x03(\t\x12\x0f\n\x07parents\x18\t \x03(\t\x12\x19\n\x0c\x66irst_parent\x18\n \x01(\tH\x06\x88\x01\x01\x12 \n\x07runtime\x18\x0b \x01(\x0b\x32\n.PbRuntimeH\x07\x88\x01\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\x07\n\x05_nameB\x07\n\x05_metaB\x14\n\x12_mean_elapsed_timeB\x08\n\x06_depthB\x0f\n\r_first_parentB\n\n\x08_runtime\"\xd8\x01\n\nPbPollTask\x12\x18\n\x0blocal_proxy\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x15\n\x08workflow\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x19\n\x0cremote_proxy\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x16\n\treq_state\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x19\n\x0cgraph_string\x18\x05 \x01(\tH\x04\x88\x01\x01\x42\x0e\n\x0c_local_proxyB\x0b\n\t_workflowB\x0f\n\r_remote_proxyB\x0c\n\n_req_stateB\x0f\n\r_graph_string\"\xcb\x01\n\x0bPbCondition\x12\x17\n\ntask_proxy\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x17\n\nexpr_alias\x18\x02 
\x01(\tH\x01\x88\x01\x01\x12\x16\n\treq_state\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x16\n\tsatisfied\x18\x04 \x01(\x08H\x03\x88\x01\x01\x12\x14\n\x07message\x18\x05 \x01(\tH\x04\x88\x01\x01\x42\r\n\x0b_task_proxyB\r\n\x0b_expr_aliasB\x0c\n\n_req_stateB\x0c\n\n_satisfiedB\n\n\x08_message\"\x96\x01\n\x0ePbPrerequisite\x12\x17\n\nexpression\x18\x01 \x01(\tH\x00\x88\x01\x01\x12 \n\nconditions\x18\x02 \x03(\x0b\x32\x0c.PbCondition\x12\x14\n\x0c\x63ycle_points\x18\x03 \x03(\t\x12\x16\n\tsatisfied\x18\x04 \x01(\x08H\x01\x88\x01\x01\x42\r\n\x0b_expressionB\x0c\n\n_satisfied\"\x8c\x01\n\x08PbOutput\x12\x12\n\x05label\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x14\n\x07message\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x16\n\tsatisfied\x18\x03 \x01(\x08H\x02\x88\x01\x01\x12\x11\n\x04time\x18\x04 \x01(\x01H\x03\x88\x01\x01\x42\x08\n\x06_labelB\n\n\x08_messageB\x0c\n\n_satisfiedB\x07\n\x05_time\"\xa5\x01\n\tPbTrigger\x12\x0f\n\x02id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x12\n\x05label\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x07message\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x16\n\tsatisfied\x18\x04 \x01(\x08H\x03\x88\x01\x01\x12\x11\n\x04time\x18\x05 \x01(\x01H\x04\x88\x01\x01\x42\x05\n\x03_idB\x08\n\x06_labelB\n\n\x08_messageB\x0c\n\n_satisfiedB\x07\n\x05_time\"\xe7\x07\n\x0bPbTaskProxy\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x11\n\x04task\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x12\n\x05state\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x18\n\x0b\x63ycle_point\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x12\n\x05\x64\x65pth\x18\x06 \x01(\x05H\x05\x88\x01\x01\x12\x18\n\x0bjob_submits\x18\x07 \x01(\x05H\x06\x88\x01\x01\x12*\n\x07outputs\x18\t \x03(\x0b\x32\x19.PbTaskProxy.OutputsEntry\x12\x11\n\tnamespace\x18\x0b \x03(\t\x12&\n\rprerequisites\x18\x0c \x03(\x0b\x32\x0f.PbPrerequisite\x12\x0c\n\x04jobs\x18\r \x03(\t\x12\x19\n\x0c\x66irst_parent\x18\x0f \x01(\tH\x07\x88\x01\x01\x12\x11\n\x04name\x18\x10 
\x01(\tH\x08\x88\x01\x01\x12\x14\n\x07is_held\x18\x11 \x01(\x08H\t\x88\x01\x01\x12\r\n\x05\x65\x64ges\x18\x12 \x03(\t\x12\x11\n\tancestors\x18\x13 \x03(\t\x12\x16\n\tflow_nums\x18\x14 \x01(\tH\n\x88\x01\x01\x12=\n\x11\x65xternal_triggers\x18\x17 \x03(\x0b\x32\".PbTaskProxy.ExternalTriggersEntry\x12.\n\txtriggers\x18\x18 \x03(\x0b\x32\x1b.PbTaskProxy.XtriggersEntry\x12\x16\n\tis_queued\x18\x19 \x01(\x08H\x0b\x88\x01\x01\x12\x18\n\x0bis_runahead\x18\x1a \x01(\x08H\x0c\x88\x01\x01\x12\x16\n\tflow_wait\x18\x1b \x01(\x08H\r\x88\x01\x01\x12 \n\x07runtime\x18\x1c \x01(\x0b\x32\n.PbRuntimeH\x0e\x88\x01\x01\x1a\x39\n\x0cOutputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x18\n\x05value\x18\x02 \x01(\x0b\x32\t.PbOutput:\x02\x38\x01\x1a\x43\n\x15\x45xternalTriggersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x19\n\x05value\x18\x02 \x01(\x0b\x32\n.PbTrigger:\x02\x38\x01\x1a<\n\x0eXtriggersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x19\n\x05value\x18\x02 \x01(\x0b\x32\n.PbTrigger:\x02\x38\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\x07\n\x05_taskB\x08\n\x06_stateB\x0e\n\x0c_cycle_pointB\x08\n\x06_depthB\x0e\n\x0c_job_submitsB\x0f\n\r_first_parentB\x07\n\x05_nameB\n\n\x08_is_heldB\x0c\n\n_flow_numsB\x0c\n\n_is_queuedB\x0e\n\x0c_is_runaheadB\x0c\n\n_flow_waitB\n\n\x08_runtime\"\xc8\x02\n\x08PbFamily\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x11\n\x04name\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x1a\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMetaH\x03\x88\x01\x01\x12\x12\n\x05\x64\x65pth\x18\x05 \x01(\x05H\x04\x88\x01\x01\x12\x0f\n\x07proxies\x18\x06 \x03(\t\x12\x0f\n\x07parents\x18\x07 \x03(\t\x12\x13\n\x0b\x63hild_tasks\x18\x08 \x03(\t\x12\x16\n\x0e\x63hild_families\x18\t \x03(\t\x12\x19\n\x0c\x66irst_parent\x18\n \x01(\tH\x05\x88\x01\x01\x12 \n\x07runtime\x18\x0b 
\x01(\x0b\x32\n.PbRuntimeH\x06\x88\x01\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\x07\n\x05_nameB\x07\n\x05_metaB\x08\n\x06_depthB\x0f\n\r_first_parentB\n\n\x08_runtime\"\x84\x06\n\rPbFamilyProxy\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x18\n\x0b\x63ycle_point\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x11\n\x04name\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x13\n\x06\x66\x61mily\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x12\n\x05state\x18\x06 \x01(\tH\x05\x88\x01\x01\x12\x12\n\x05\x64\x65pth\x18\x07 \x01(\x05H\x06\x88\x01\x01\x12\x19\n\x0c\x66irst_parent\x18\x08 \x01(\tH\x07\x88\x01\x01\x12\x13\n\x0b\x63hild_tasks\x18\n \x03(\t\x12\x16\n\x0e\x63hild_families\x18\x0b \x03(\t\x12\x14\n\x07is_held\x18\x0c \x01(\x08H\x08\x88\x01\x01\x12\x11\n\tancestors\x18\r \x03(\t\x12\x0e\n\x06states\x18\x0e \x03(\t\x12\x35\n\x0cstate_totals\x18\x0f \x03(\x0b\x32\x1f.PbFamilyProxy.StateTotalsEntry\x12\x1a\n\ris_held_total\x18\x10 \x01(\x05H\t\x88\x01\x01\x12\x16\n\tis_queued\x18\x11 \x01(\x08H\n\x88\x01\x01\x12\x1c\n\x0fis_queued_total\x18\x12 \x01(\x05H\x0b\x88\x01\x01\x12\x18\n\x0bis_runahead\x18\x13 \x01(\x08H\x0c\x88\x01\x01\x12\x1e\n\x11is_runahead_total\x18\x14 \x01(\x05H\r\x88\x01\x01\x12 \n\x07runtime\x18\x15 \x01(\x0b\x32\n.PbRuntimeH\x0e\x88\x01\x01\x1a\x32\n\x10StateTotalsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\x0e\n\x0c_cycle_pointB\x07\n\x05_nameB\t\n\x07_familyB\x08\n\x06_stateB\x08\n\x06_depthB\x0f\n\r_first_parentB\n\n\x08_is_heldB\x10\n\x0e_is_held_totalB\x0c\n\n_is_queuedB\x12\n\x10_is_queued_totalB\x0e\n\x0c_is_runaheadB\x14\n\x12_is_runahead_totalB\n\n\x08_runtime\"\xbc\x01\n\x06PbEdge\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x13\n\x06source\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x13\n\x06target\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x14\n\x07suicide\x18\x05 
\x01(\x08H\x04\x88\x01\x01\x12\x11\n\x04\x63ond\x18\x06 \x01(\x08H\x05\x88\x01\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\t\n\x07_sourceB\t\n\x07_targetB\n\n\x08_suicideB\x07\n\x05_cond\"{\n\x07PbEdges\x12\x0f\n\x02id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\r\n\x05\x65\x64ges\x18\x02 \x03(\t\x12+\n\x16workflow_polling_tasks\x18\x03 \x03(\x0b\x32\x0b.PbPollTask\x12\x0e\n\x06leaves\x18\x04 \x03(\t\x12\x0c\n\x04\x66\x65\x65t\x18\x05 \x03(\tB\x05\n\x03_id\"\xf2\x01\n\x10PbEntireWorkflow\x12\"\n\x08workflow\x18\x01 \x01(\x0b\x32\x0b.PbWorkflowH\x00\x88\x01\x01\x12\x16\n\x05tasks\x18\x02 \x03(\x0b\x32\x07.PbTask\x12\"\n\x0ctask_proxies\x18\x03 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x14\n\x04jobs\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x1b\n\x08\x66\x61milies\x18\x05 \x03(\x0b\x32\t.PbFamily\x12&\n\x0e\x66\x61mily_proxies\x18\x06 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x16\n\x05\x65\x64ges\x18\x07 \x03(\x0b\x32\x07.PbEdgeB\x0b\n\t_workflow\"\xaf\x01\n\x07\x45\x44\x65ltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x16\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x07.PbEdge\x12\x18\n\x07updated\x18\x04 \x03(\x0b\x32\x07.PbEdge\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xb3\x01\n\x07\x46\x44\x65ltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x18\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\t.PbFamily\x12\x1a\n\x07updated\x18\x04 \x03(\x0b\x32\t.PbFamily\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xbe\x01\n\x08\x46PDeltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x1d\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x1f\n\x07updated\x18\x04 
\x03(\x0b\x32\x0e.PbFamilyProxy\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xad\x01\n\x07JDeltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x15\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x06.PbJob\x12\x17\n\x07updated\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xaf\x01\n\x07TDeltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x16\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x07.PbTask\x12\x18\n\x07updated\x18\x04 \x03(\x0b\x32\x07.PbTask\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xba\x01\n\x08TPDeltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x1b\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x1d\n\x07updated\x18\x04 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xc3\x01\n\x07WDeltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x1f\n\x05\x61\x64\x64\x65\x64\x18\x02 \x01(\x0b\x32\x0b.PbWorkflowH\x01\x88\x01\x01\x12!\n\x07updated\x18\x03 \x01(\x0b\x32\x0b.PbWorkflowH\x02\x88\x01\x01\x12\x15\n\x08reloaded\x18\x04 \x01(\x08H\x03\x88\x01\x01\x12\x13\n\x06pruned\x18\x05 \x01(\tH\x04\x88\x01\x01\x42\x07\n\x05_timeB\x08\n\x06_addedB\n\n\x08_updatedB\x0b\n\t_reloadedB\t\n\x07_pruned\"\xd1\x01\n\tAllDeltas\x12\x1a\n\x08\x66\x61milies\x18\x01 \x01(\x0b\x32\x08.FDeltas\x12!\n\x0e\x66\x61mily_proxies\x18\x02 
\x01(\x0b\x32\t.FPDeltas\x12\x16\n\x04jobs\x18\x03 \x01(\x0b\x32\x08.JDeltas\x12\x17\n\x05tasks\x18\x04 \x01(\x0b\x32\x08.TDeltas\x12\x1f\n\x0ctask_proxies\x18\x05 \x01(\x0b\x32\t.TPDeltas\x12\x17\n\x05\x65\x64ges\x18\x06 \x01(\x0b\x32\x08.EDeltas\x12\x1a\n\x08workflow\x18\x07 \x01(\x0b\x32\x08.WDeltasb\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13\x64\x61ta_messages.proto\"\x96\x01\n\x06PbMeta\x12\x12\n\x05title\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x18\n\x0b\x64\x65scription\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x10\n\x03URL\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x19\n\x0cuser_defined\x18\x04 \x01(\tH\x03\x88\x01\x01\x42\x08\n\x06_titleB\x0e\n\x0c_descriptionB\x06\n\x04_URLB\x0f\n\r_user_defined\"\xaa\x01\n\nPbTimeZone\x12\x12\n\x05hours\x18\x01 \x01(\x05H\x00\x88\x01\x01\x12\x14\n\x07minutes\x18\x02 \x01(\x05H\x01\x88\x01\x01\x12\x19\n\x0cstring_basic\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x1c\n\x0fstring_extended\x18\x04 \x01(\tH\x03\x88\x01\x01\x42\x08\n\x06_hoursB\n\n\x08_minutesB\x0f\n\r_string_basicB\x12\n\x10_string_extended\"\'\n\x0fPbTaskProxyRefs\x12\x14\n\x0ctask_proxies\x18\x01 \x03(\t\"\xd4\x0c\n\nPbWorkflow\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x11\n\x04name\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x13\n\x06status\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x11\n\x04host\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x11\n\x04port\x18\x06 \x01(\x05H\x05\x88\x01\x01\x12\x12\n\x05owner\x18\x07 \x01(\tH\x06\x88\x01\x01\x12\r\n\x05tasks\x18\x08 \x03(\t\x12\x10\n\x08\x66\x61milies\x18\t \x03(\t\x12\x1c\n\x05\x65\x64ges\x18\n \x01(\x0b\x32\x08.PbEdgesH\x07\x88\x01\x01\x12\x18\n\x0b\x61pi_version\x18\x0b \x01(\x05H\x08\x88\x01\x01\x12\x19\n\x0c\x63ylc_version\x18\x0c \x01(\tH\t\x88\x01\x01\x12\x19\n\x0clast_updated\x18\r \x01(\x01H\n\x88\x01\x01\x12\x1a\n\x04meta\x18\x0e \x01(\x0b\x32\x07.PbMetaH\x0b\x88\x01\x01\x12&\n\x19newest_active_cycle_point\x18\x10 \x01(\tH\x0c\x88\x01\x01\x12&\n\x19oldest_active_cycle_point\x18\x11 \x01(\tH\r\x88\x01\x01\x12\x15\n\x08reloaded\x18\x12 \x01(\x08H\x0e\x88\x01\x01\x12\x15\n\x08run_mode\x18\x13 \x01(\tH\x0f\x88\x01\x01\x12\x19\n\x0c\x63ycling_mode\x18\x14 \x01(\tH\x10\x88\x01\x01\x12\x32\n\x0cstate_totals\x18\x15 
\x03(\x0b\x32\x1c.PbWorkflow.StateTotalsEntry\x12\x1d\n\x10workflow_log_dir\x18\x16 \x01(\tH\x11\x88\x01\x01\x12(\n\x0etime_zone_info\x18\x17 \x01(\x0b\x32\x0b.PbTimeZoneH\x12\x88\x01\x01\x12\x17\n\ntree_depth\x18\x18 \x01(\x05H\x13\x88\x01\x01\x12\x15\n\rjob_log_names\x18\x19 \x03(\t\x12\x14\n\x0cns_def_order\x18\x1a \x03(\t\x12\x0e\n\x06states\x18\x1b \x03(\t\x12\x14\n\x0ctask_proxies\x18\x1c \x03(\t\x12\x16\n\x0e\x66\x61mily_proxies\x18\x1d \x03(\t\x12\x17\n\nstatus_msg\x18\x1e \x01(\tH\x14\x88\x01\x01\x12\x1a\n\ris_held_total\x18\x1f \x01(\x05H\x15\x88\x01\x01\x12\x0c\n\x04jobs\x18 \x03(\t\x12\x15\n\x08pub_port\x18! \x01(\x05H\x16\x88\x01\x01\x12\x17\n\nbroadcasts\x18\" \x01(\tH\x17\x88\x01\x01\x12\x1c\n\x0fis_queued_total\x18# \x01(\x05H\x18\x88\x01\x01\x12=\n\x12latest_state_tasks\x18$ \x03(\x0b\x32!.PbWorkflow.LatestStateTasksEntry\x12\x13\n\x06pruned\x18% \x01(\x08H\x19\x88\x01\x01\x12\x1e\n\x11is_runahead_total\x18& \x01(\x05H\x1a\x88\x01\x01\x12\x1b\n\x0estates_updated\x18\' \x01(\x08H\x1b\x88\x01\x01\x12\x1c\n\x0fn_edge_distance\x18( \x01(\x05H\x1c\x88\x01\x01\x1a\x32\n\x10StateTotalsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\x1aI\n\x15LatestStateTasksEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1f\n\x05value\x18\x02 
\x01(\x0b\x32\x10.PbTaskProxyRefs:\x02\x38\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\x07\n\x05_nameB\t\n\x07_statusB\x07\n\x05_hostB\x07\n\x05_portB\x08\n\x06_ownerB\x08\n\x06_edgesB\x0e\n\x0c_api_versionB\x0f\n\r_cylc_versionB\x0f\n\r_last_updatedB\x07\n\x05_metaB\x1c\n\x1a_newest_active_cycle_pointB\x1c\n\x1a_oldest_active_cycle_pointB\x0b\n\t_reloadedB\x0b\n\t_run_modeB\x0f\n\r_cycling_modeB\x13\n\x11_workflow_log_dirB\x11\n\x0f_time_zone_infoB\r\n\x0b_tree_depthB\r\n\x0b_status_msgB\x10\n\x0e_is_held_totalB\x0b\n\t_pub_portB\r\n\x0b_broadcastsB\x12\n\x10_is_queued_totalB\t\n\x07_prunedB\x14\n\x12_is_runahead_totalB\x11\n\x0f_states_updatedB\x12\n\x10_n_edge_distance\"\xb9\x06\n\tPbRuntime\x12\x15\n\x08platform\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x13\n\x06script\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x18\n\x0binit_script\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x17\n\nenv_script\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x17\n\nerr_script\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x18\n\x0b\x65xit_script\x18\x06 \x01(\tH\x05\x88\x01\x01\x12\x17\n\npre_script\x18\x07 \x01(\tH\x06\x88\x01\x01\x12\x18\n\x0bpost_script\x18\x08 \x01(\tH\x07\x88\x01\x01\x12\x19\n\x0cwork_sub_dir\x18\t \x01(\tH\x08\x88\x01\x01\x12(\n\x1b\x65xecution_polling_intervals\x18\n \x01(\tH\t\x88\x01\x01\x12#\n\x16\x65xecution_retry_delays\x18\x0b \x01(\tH\n\x88\x01\x01\x12!\n\x14\x65xecution_time_limit\x18\x0c \x01(\tH\x0b\x88\x01\x01\x12)\n\x1csubmission_polling_intervals\x18\r \x01(\tH\x0c\x88\x01\x01\x12$\n\x17submission_retry_delays\x18\x0e \x01(\tH\r\x88\x01\x01\x12\x17\n\ndirectives\x18\x0f \x01(\tH\x0e\x88\x01\x01\x12\x18\n\x0b\x65nvironment\x18\x10 \x01(\tH\x0f\x88\x01\x01\x12\x14\n\x07outputs\x18\x11 
\x01(\tH\x10\x88\x01\x01\x42\x0b\n\t_platformB\t\n\x07_scriptB\x0e\n\x0c_init_scriptB\r\n\x0b_env_scriptB\r\n\x0b_err_scriptB\x0e\n\x0c_exit_scriptB\r\n\x0b_pre_scriptB\x0e\n\x0c_post_scriptB\x0f\n\r_work_sub_dirB\x1e\n\x1c_execution_polling_intervalsB\x19\n\x17_execution_retry_delaysB\x17\n\x15_execution_time_limitB\x1f\n\x1d_submission_polling_intervalsB\x1a\n\x18_submission_retry_delaysB\r\n\x0b_directivesB\x0e\n\x0c_environmentB\n\n\x08_outputs\"\x9d\x05\n\x05PbJob\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x17\n\nsubmit_num\x18\x03 \x01(\x05H\x02\x88\x01\x01\x12\x12\n\x05state\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x17\n\ntask_proxy\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x1b\n\x0esubmitted_time\x18\x06 \x01(\tH\x05\x88\x01\x01\x12\x19\n\x0cstarted_time\x18\x07 \x01(\tH\x06\x88\x01\x01\x12\x1a\n\rfinished_time\x18\x08 \x01(\tH\x07\x88\x01\x01\x12\x13\n\x06job_id\x18\t \x01(\tH\x08\x88\x01\x01\x12\x1c\n\x0fjob_runner_name\x18\n \x01(\tH\t\x88\x01\x01\x12!\n\x14\x65xecution_time_limit\x18\x0e \x01(\x02H\n\x88\x01\x01\x12\x15\n\x08platform\x18\x0f \x01(\tH\x0b\x88\x01\x01\x12\x18\n\x0bjob_log_dir\x18\x11 \x01(\tH\x0c\x88\x01\x01\x12\x11\n\x04name\x18\x1e \x01(\tH\r\x88\x01\x01\x12\x18\n\x0b\x63ycle_point\x18\x1f \x01(\tH\x0e\x88\x01\x01\x12\x10\n\x08messages\x18 \x03(\t\x12 \n\x07runtime\x18! 
\x01(\x0b\x32\n.PbRuntimeH\x0f\x88\x01\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\r\n\x0b_submit_numB\x08\n\x06_stateB\r\n\x0b_task_proxyB\x11\n\x0f_submitted_timeB\x0f\n\r_started_timeB\x10\n\x0e_finished_timeB\t\n\x07_job_idB\x12\n\x10_job_runner_nameB\x17\n\x15_execution_time_limitB\x0b\n\t_platformB\x0e\n\x0c_job_log_dirB\x07\n\x05_nameB\x0e\n\x0c_cycle_pointB\n\n\x08_runtimeJ\x04\x08\x1d\x10\x1e\"\xe2\x02\n\x06PbTask\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x11\n\x04name\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x1a\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMetaH\x03\x88\x01\x01\x12\x1e\n\x11mean_elapsed_time\x18\x05 \x01(\x02H\x04\x88\x01\x01\x12\x12\n\x05\x64\x65pth\x18\x06 \x01(\x05H\x05\x88\x01\x01\x12\x0f\n\x07proxies\x18\x07 \x03(\t\x12\x11\n\tnamespace\x18\x08 \x03(\t\x12\x0f\n\x07parents\x18\t \x03(\t\x12\x19\n\x0c\x66irst_parent\x18\n \x01(\tH\x06\x88\x01\x01\x12 \n\x07runtime\x18\x0b \x01(\x0b\x32\n.PbRuntimeH\x07\x88\x01\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\x07\n\x05_nameB\x07\n\x05_metaB\x14\n\x12_mean_elapsed_timeB\x08\n\x06_depthB\x0f\n\r_first_parentB\n\n\x08_runtime\"\xd8\x01\n\nPbPollTask\x12\x18\n\x0blocal_proxy\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x15\n\x08workflow\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x19\n\x0cremote_proxy\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x16\n\treq_state\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x19\n\x0cgraph_string\x18\x05 \x01(\tH\x04\x88\x01\x01\x42\x0e\n\x0c_local_proxyB\x0b\n\t_workflowB\x0f\n\r_remote_proxyB\x0c\n\n_req_stateB\x0f\n\r_graph_string\"\xcb\x01\n\x0bPbCondition\x12\x17\n\ntask_proxy\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x17\n\nexpr_alias\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x16\n\treq_state\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x16\n\tsatisfied\x18\x04 \x01(\x08H\x03\x88\x01\x01\x12\x14\n\x07message\x18\x05 
\x01(\tH\x04\x88\x01\x01\x42\r\n\x0b_task_proxyB\r\n\x0b_expr_aliasB\x0c\n\n_req_stateB\x0c\n\n_satisfiedB\n\n\x08_message\"\x96\x01\n\x0ePbPrerequisite\x12\x17\n\nexpression\x18\x01 \x01(\tH\x00\x88\x01\x01\x12 \n\nconditions\x18\x02 \x03(\x0b\x32\x0c.PbCondition\x12\x14\n\x0c\x63ycle_points\x18\x03 \x03(\t\x12\x16\n\tsatisfied\x18\x04 \x01(\x08H\x01\x88\x01\x01\x42\r\n\x0b_expressionB\x0c\n\n_satisfied\"\x8c\x01\n\x08PbOutput\x12\x12\n\x05label\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x14\n\x07message\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x16\n\tsatisfied\x18\x03 \x01(\x08H\x02\x88\x01\x01\x12\x11\n\x04time\x18\x04 \x01(\x01H\x03\x88\x01\x01\x42\x08\n\x06_labelB\n\n\x08_messageB\x0c\n\n_satisfiedB\x07\n\x05_time\"\xa5\x01\n\tPbTrigger\x12\x0f\n\x02id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x12\n\x05label\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x07message\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x16\n\tsatisfied\x18\x04 \x01(\x08H\x03\x88\x01\x01\x12\x11\n\x04time\x18\x05 \x01(\x01H\x04\x88\x01\x01\x42\x05\n\x03_idB\x08\n\x06_labelB\n\n\x08_messageB\x0c\n\n_satisfiedB\x07\n\x05_time\"\x91\x08\n\x0bPbTaskProxy\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x11\n\x04task\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x12\n\x05state\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x18\n\x0b\x63ycle_point\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x12\n\x05\x64\x65pth\x18\x06 \x01(\x05H\x05\x88\x01\x01\x12\x18\n\x0bjob_submits\x18\x07 \x01(\x05H\x06\x88\x01\x01\x12*\n\x07outputs\x18\t \x03(\x0b\x32\x19.PbTaskProxy.OutputsEntry\x12\x11\n\tnamespace\x18\x0b \x03(\t\x12&\n\rprerequisites\x18\x0c \x03(\x0b\x32\x0f.PbPrerequisite\x12\x0c\n\x04jobs\x18\r \x03(\t\x12\x19\n\x0c\x66irst_parent\x18\x0f \x01(\tH\x07\x88\x01\x01\x12\x11\n\x04name\x18\x10 \x01(\tH\x08\x88\x01\x01\x12\x14\n\x07is_held\x18\x11 \x01(\x08H\t\x88\x01\x01\x12\r\n\x05\x65\x64ges\x18\x12 \x03(\t\x12\x11\n\tancestors\x18\x13 \x03(\t\x12\x16\n\tflow_nums\x18\x14 
\x01(\tH\n\x88\x01\x01\x12=\n\x11\x65xternal_triggers\x18\x17 \x03(\x0b\x32\".PbTaskProxy.ExternalTriggersEntry\x12.\n\txtriggers\x18\x18 \x03(\x0b\x32\x1b.PbTaskProxy.XtriggersEntry\x12\x16\n\tis_queued\x18\x19 \x01(\x08H\x0b\x88\x01\x01\x12\x18\n\x0bis_runahead\x18\x1a \x01(\x08H\x0c\x88\x01\x01\x12\x16\n\tflow_wait\x18\x1b \x01(\x08H\r\x88\x01\x01\x12 \n\x07runtime\x18\x1c \x01(\x0b\x32\n.PbRuntimeH\x0e\x88\x01\x01\x12\x18\n\x0bgraph_depth\x18\x1d \x01(\x05H\x0f\x88\x01\x01\x1a\x39\n\x0cOutputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x18\n\x05value\x18\x02 \x01(\x0b\x32\t.PbOutput:\x02\x38\x01\x1a\x43\n\x15\x45xternalTriggersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x19\n\x05value\x18\x02 \x01(\x0b\x32\n.PbTrigger:\x02\x38\x01\x1a<\n\x0eXtriggersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x19\n\x05value\x18\x02 \x01(\x0b\x32\n.PbTrigger:\x02\x38\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\x07\n\x05_taskB\x08\n\x06_stateB\x0e\n\x0c_cycle_pointB\x08\n\x06_depthB\x0e\n\x0c_job_submitsB\x0f\n\r_first_parentB\x07\n\x05_nameB\n\n\x08_is_heldB\x0c\n\n_flow_numsB\x0c\n\n_is_queuedB\x0e\n\x0c_is_runaheadB\x0c\n\n_flow_waitB\n\n\x08_runtimeB\x0e\n\x0c_graph_depth\"\xc8\x02\n\x08PbFamily\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x11\n\x04name\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x1a\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMetaH\x03\x88\x01\x01\x12\x12\n\x05\x64\x65pth\x18\x05 \x01(\x05H\x04\x88\x01\x01\x12\x0f\n\x07proxies\x18\x06 \x03(\t\x12\x0f\n\x07parents\x18\x07 \x03(\t\x12\x13\n\x0b\x63hild_tasks\x18\x08 \x03(\t\x12\x16\n\x0e\x63hild_families\x18\t \x03(\t\x12\x19\n\x0c\x66irst_parent\x18\n \x01(\tH\x05\x88\x01\x01\x12 \n\x07runtime\x18\x0b \x01(\x0b\x32\n.PbRuntimeH\x06\x88\x01\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\x07\n\x05_nameB\x07\n\x05_metaB\x08\n\x06_depthB\x0f\n\r_first_parentB\n\n\x08_runtime\"\xae\x06\n\rPbFamilyProxy\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 
\x01(\tH\x01\x88\x01\x01\x12\x18\n\x0b\x63ycle_point\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x11\n\x04name\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x13\n\x06\x66\x61mily\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x12\n\x05state\x18\x06 \x01(\tH\x05\x88\x01\x01\x12\x12\n\x05\x64\x65pth\x18\x07 \x01(\x05H\x06\x88\x01\x01\x12\x19\n\x0c\x66irst_parent\x18\x08 \x01(\tH\x07\x88\x01\x01\x12\x13\n\x0b\x63hild_tasks\x18\n \x03(\t\x12\x16\n\x0e\x63hild_families\x18\x0b \x03(\t\x12\x14\n\x07is_held\x18\x0c \x01(\x08H\x08\x88\x01\x01\x12\x11\n\tancestors\x18\r \x03(\t\x12\x0e\n\x06states\x18\x0e \x03(\t\x12\x35\n\x0cstate_totals\x18\x0f \x03(\x0b\x32\x1f.PbFamilyProxy.StateTotalsEntry\x12\x1a\n\ris_held_total\x18\x10 \x01(\x05H\t\x88\x01\x01\x12\x16\n\tis_queued\x18\x11 \x01(\x08H\n\x88\x01\x01\x12\x1c\n\x0fis_queued_total\x18\x12 \x01(\x05H\x0b\x88\x01\x01\x12\x18\n\x0bis_runahead\x18\x13 \x01(\x08H\x0c\x88\x01\x01\x12\x1e\n\x11is_runahead_total\x18\x14 \x01(\x05H\r\x88\x01\x01\x12 \n\x07runtime\x18\x15 \x01(\x0b\x32\n.PbRuntimeH\x0e\x88\x01\x01\x12\x18\n\x0bgraph_depth\x18\x16 \x01(\x05H\x0f\x88\x01\x01\x1a\x32\n\x10StateTotalsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\x0e\n\x0c_cycle_pointB\x07\n\x05_nameB\t\n\x07_familyB\x08\n\x06_stateB\x08\n\x06_depthB\x0f\n\r_first_parentB\n\n\x08_is_heldB\x10\n\x0e_is_held_totalB\x0c\n\n_is_queuedB\x12\n\x10_is_queued_totalB\x0e\n\x0c_is_runaheadB\x14\n\x12_is_runahead_totalB\n\n\x08_runtimeB\x0e\n\x0c_graph_depth\"\xbc\x01\n\x06PbEdge\x12\x12\n\x05stamp\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x0f\n\x02id\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x13\n\x06source\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x13\n\x06target\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x14\n\x07suicide\x18\x05 \x01(\x08H\x04\x88\x01\x01\x12\x11\n\x04\x63ond\x18\x06 
\x01(\x08H\x05\x88\x01\x01\x42\x08\n\x06_stampB\x05\n\x03_idB\t\n\x07_sourceB\t\n\x07_targetB\n\n\x08_suicideB\x07\n\x05_cond\"{\n\x07PbEdges\x12\x0f\n\x02id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\r\n\x05\x65\x64ges\x18\x02 \x03(\t\x12+\n\x16workflow_polling_tasks\x18\x03 \x03(\x0b\x32\x0b.PbPollTask\x12\x0e\n\x06leaves\x18\x04 \x03(\t\x12\x0c\n\x04\x66\x65\x65t\x18\x05 \x03(\tB\x05\n\x03_id\"\xf2\x01\n\x10PbEntireWorkflow\x12\"\n\x08workflow\x18\x01 \x01(\x0b\x32\x0b.PbWorkflowH\x00\x88\x01\x01\x12\x16\n\x05tasks\x18\x02 \x03(\x0b\x32\x07.PbTask\x12\"\n\x0ctask_proxies\x18\x03 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x14\n\x04jobs\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x1b\n\x08\x66\x61milies\x18\x05 \x03(\x0b\x32\t.PbFamily\x12&\n\x0e\x66\x61mily_proxies\x18\x06 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x16\n\x05\x65\x64ges\x18\x07 \x03(\x0b\x32\x07.PbEdgeB\x0b\n\t_workflow\"\xaf\x01\n\x07\x45\x44\x65ltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x16\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x07.PbEdge\x12\x18\n\x07updated\x18\x04 \x03(\x0b\x32\x07.PbEdge\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xb3\x01\n\x07\x46\x44\x65ltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x18\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\t.PbFamily\x12\x1a\n\x07updated\x18\x04 \x03(\x0b\x32\t.PbFamily\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xbe\x01\n\x08\x46PDeltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x1d\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x1f\n\x07updated\x18\x04 
\x03(\x0b\x32\x0e.PbFamilyProxy\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xad\x01\n\x07JDeltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x15\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x06.PbJob\x12\x17\n\x07updated\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xaf\x01\n\x07TDeltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x16\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x07.PbTask\x12\x18\n\x07updated\x18\x04 \x03(\x0b\x32\x07.PbTask\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xba\x01\n\x08TPDeltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x15\n\x08\x63hecksum\x18\x02 \x01(\x03H\x01\x88\x01\x01\x12\x1b\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x1d\n\x07updated\x18\x04 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x15\n\x08reloaded\x18\x06 \x01(\x08H\x02\x88\x01\x01\x42\x07\n\x05_timeB\x0b\n\t_checksumB\x0b\n\t_reloaded\"\xc3\x01\n\x07WDeltas\x12\x11\n\x04time\x18\x01 \x01(\x01H\x00\x88\x01\x01\x12\x1f\n\x05\x61\x64\x64\x65\x64\x18\x02 \x01(\x0b\x32\x0b.PbWorkflowH\x01\x88\x01\x01\x12!\n\x07updated\x18\x03 \x01(\x0b\x32\x0b.PbWorkflowH\x02\x88\x01\x01\x12\x15\n\x08reloaded\x18\x04 \x01(\x08H\x03\x88\x01\x01\x12\x13\n\x06pruned\x18\x05 \x01(\tH\x04\x88\x01\x01\x42\x07\n\x05_timeB\x08\n\x06_addedB\n\n\x08_updatedB\x0b\n\t_reloadedB\t\n\x07_pruned\"\xd1\x01\n\tAllDeltas\x12\x1a\n\x08\x66\x61milies\x18\x01 \x01(\x0b\x32\x08.FDeltas\x12!\n\x0e\x66\x61mily_proxies\x18\x02 
\x01(\x0b\x32\t.FPDeltas\x12\x16\n\x04jobs\x18\x03 \x01(\x0b\x32\x08.JDeltas\x12\x17\n\x05tasks\x18\x04 \x01(\x0b\x32\x08.TDeltas\x12\x1f\n\x0ctask_proxies\x18\x05 \x01(\x0b\x32\t.TPDeltas\x12\x17\n\x05\x65\x64ges\x18\x06 \x01(\x0b\x32\x08.EDeltas\x12\x1a\n\x08workflow\x18\x07 \x01(\x0b\x32\x08.WDeltasb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'data_messages_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
- _globals['_PBWORKFLOW_STATETOTALSENTRY']._options = None
- _globals['_PBWORKFLOW_STATETOTALSENTRY']._serialized_options = b'8\001'
- _globals['_PBWORKFLOW_LATESTSTATETASKSENTRY']._options = None
- _globals['_PBWORKFLOW_LATESTSTATETASKSENTRY']._serialized_options = b'8\001'
- _globals['_PBTASKPROXY_OUTPUTSENTRY']._options = None
- _globals['_PBTASKPROXY_OUTPUTSENTRY']._serialized_options = b'8\001'
- _globals['_PBTASKPROXY_EXTERNALTRIGGERSENTRY']._options = None
- _globals['_PBTASKPROXY_EXTERNALTRIGGERSENTRY']._serialized_options = b'8\001'
- _globals['_PBTASKPROXY_XTRIGGERSENTRY']._options = None
- _globals['_PBTASKPROXY_XTRIGGERSENTRY']._serialized_options = b'8\001'
- _globals['_PBFAMILYPROXY_STATETOTALSENTRY']._options = None
- _globals['_PBFAMILYPROXY_STATETOTALSENTRY']._serialized_options = b'8\001'
+ _PBWORKFLOW_STATETOTALSENTRY._options = None
+ _PBWORKFLOW_STATETOTALSENTRY._serialized_options = b'8\001'
+ _PBWORKFLOW_LATESTSTATETASKSENTRY._options = None
+ _PBWORKFLOW_LATESTSTATETASKSENTRY._serialized_options = b'8\001'
+ _PBTASKPROXY_OUTPUTSENTRY._options = None
+ _PBTASKPROXY_OUTPUTSENTRY._serialized_options = b'8\001'
+ _PBTASKPROXY_EXTERNALTRIGGERSENTRY._options = None
+ _PBTASKPROXY_EXTERNALTRIGGERSENTRY._serialized_options = b'8\001'
+ _PBTASKPROXY_XTRIGGERSENTRY._options = None
+ _PBTASKPROXY_XTRIGGERSENTRY._serialized_options = b'8\001'
+ _PBFAMILYPROXY_STATETOTALSENTRY._options = None
+ _PBFAMILYPROXY_STATETOTALSENTRY._serialized_options = b'8\001'
_globals['_PBMETA']._serialized_start=24
_globals['_PBMETA']._serialized_end=174
_globals['_PBTIMEZONE']._serialized_start=177
@@ -40,61 +39,61 @@
_globals['_PBTASKPROXYREFS']._serialized_start=349
_globals['_PBTASKPROXYREFS']._serialized_end=388
_globals['_PBWORKFLOW']._serialized_start=391
- _globals['_PBWORKFLOW']._serialized_end=1961
- _globals['_PBWORKFLOW_STATETOTALSENTRY']._serialized_start=1411
- _globals['_PBWORKFLOW_STATETOTALSENTRY']._serialized_end=1461
- _globals['_PBWORKFLOW_LATESTSTATETASKSENTRY']._serialized_start=1463
- _globals['_PBWORKFLOW_LATESTSTATETASKSENTRY']._serialized_end=1536
- _globals['_PBRUNTIME']._serialized_start=1964
- _globals['_PBRUNTIME']._serialized_end=2789
- _globals['_PBJOB']._serialized_start=2792
- _globals['_PBJOB']._serialized_end=3461
- _globals['_PBTASK']._serialized_start=3464
- _globals['_PBTASK']._serialized_end=3818
- _globals['_PBPOLLTASK']._serialized_start=3821
- _globals['_PBPOLLTASK']._serialized_end=4037
- _globals['_PBCONDITION']._serialized_start=4040
- _globals['_PBCONDITION']._serialized_end=4243
- _globals['_PBPREREQUISITE']._serialized_start=4246
- _globals['_PBPREREQUISITE']._serialized_end=4396
- _globals['_PBOUTPUT']._serialized_start=4399
- _globals['_PBOUTPUT']._serialized_end=4539
- _globals['_PBTRIGGER']._serialized_start=4542
- _globals['_PBTRIGGER']._serialized_end=4707
- _globals['_PBTASKPROXY']._serialized_start=4710
- _globals['_PBTASKPROXY']._serialized_end=5709
- _globals['_PBTASKPROXY_OUTPUTSENTRY']._serialized_start=5335
- _globals['_PBTASKPROXY_OUTPUTSENTRY']._serialized_end=5392
- _globals['_PBTASKPROXY_EXTERNALTRIGGERSENTRY']._serialized_start=5394
- _globals['_PBTASKPROXY_EXTERNALTRIGGERSENTRY']._serialized_end=5461
- _globals['_PBTASKPROXY_XTRIGGERSENTRY']._serialized_start=5463
- _globals['_PBTASKPROXY_XTRIGGERSENTRY']._serialized_end=5523
- _globals['_PBFAMILY']._serialized_start=5712
- _globals['_PBFAMILY']._serialized_end=6040
- _globals['_PBFAMILYPROXY']._serialized_start=6043
- _globals['_PBFAMILYPROXY']._serialized_end=6815
- _globals['_PBFAMILYPROXY_STATETOTALSENTRY']._serialized_start=1411
- _globals['_PBFAMILYPROXY_STATETOTALSENTRY']._serialized_end=1461
- _globals['_PBEDGE']._serialized_start=6818
- _globals['_PBEDGE']._serialized_end=7006
- _globals['_PBEDGES']._serialized_start=7008
- _globals['_PBEDGES']._serialized_end=7131
- _globals['_PBENTIREWORKFLOW']._serialized_start=7134
- _globals['_PBENTIREWORKFLOW']._serialized_end=7376
- _globals['_EDELTAS']._serialized_start=7379
- _globals['_EDELTAS']._serialized_end=7554
- _globals['_FDELTAS']._serialized_start=7557
- _globals['_FDELTAS']._serialized_end=7736
- _globals['_FPDELTAS']._serialized_start=7739
- _globals['_FPDELTAS']._serialized_end=7929
- _globals['_JDELTAS']._serialized_start=7932
- _globals['_JDELTAS']._serialized_end=8105
- _globals['_TDELTAS']._serialized_start=8108
- _globals['_TDELTAS']._serialized_end=8283
- _globals['_TPDELTAS']._serialized_start=8286
- _globals['_TPDELTAS']._serialized_end=8472
- _globals['_WDELTAS']._serialized_start=8475
- _globals['_WDELTAS']._serialized_end=8670
- _globals['_ALLDELTAS']._serialized_start=8673
- _globals['_ALLDELTAS']._serialized_end=8882
+ _globals['_PBWORKFLOW']._serialized_end=2011
+ _globals['_PBWORKFLOW_STATETOTALSENTRY']._serialized_start=1441
+ _globals['_PBWORKFLOW_STATETOTALSENTRY']._serialized_end=1491
+ _globals['_PBWORKFLOW_LATESTSTATETASKSENTRY']._serialized_start=1493
+ _globals['_PBWORKFLOW_LATESTSTATETASKSENTRY']._serialized_end=1566
+ _globals['_PBRUNTIME']._serialized_start=2014
+ _globals['_PBRUNTIME']._serialized_end=2839
+ _globals['_PBJOB']._serialized_start=2842
+ _globals['_PBJOB']._serialized_end=3511
+ _globals['_PBTASK']._serialized_start=3514
+ _globals['_PBTASK']._serialized_end=3868
+ _globals['_PBPOLLTASK']._serialized_start=3871
+ _globals['_PBPOLLTASK']._serialized_end=4087
+ _globals['_PBCONDITION']._serialized_start=4090
+ _globals['_PBCONDITION']._serialized_end=4293
+ _globals['_PBPREREQUISITE']._serialized_start=4296
+ _globals['_PBPREREQUISITE']._serialized_end=4446
+ _globals['_PBOUTPUT']._serialized_start=4449
+ _globals['_PBOUTPUT']._serialized_end=4589
+ _globals['_PBTRIGGER']._serialized_start=4592
+ _globals['_PBTRIGGER']._serialized_end=4757
+ _globals['_PBTASKPROXY']._serialized_start=4760
+ _globals['_PBTASKPROXY']._serialized_end=5801
+ _globals['_PBTASKPROXY_OUTPUTSENTRY']._serialized_start=5411
+ _globals['_PBTASKPROXY_OUTPUTSENTRY']._serialized_end=5468
+ _globals['_PBTASKPROXY_EXTERNALTRIGGERSENTRY']._serialized_start=5470
+ _globals['_PBTASKPROXY_EXTERNALTRIGGERSENTRY']._serialized_end=5537
+ _globals['_PBTASKPROXY_XTRIGGERSENTRY']._serialized_start=5539
+ _globals['_PBTASKPROXY_XTRIGGERSENTRY']._serialized_end=5599
+ _globals['_PBFAMILY']._serialized_start=5804
+ _globals['_PBFAMILY']._serialized_end=6132
+ _globals['_PBFAMILYPROXY']._serialized_start=6135
+ _globals['_PBFAMILYPROXY']._serialized_end=6949
+ _globals['_PBFAMILYPROXY_STATETOTALSENTRY']._serialized_start=1441
+ _globals['_PBFAMILYPROXY_STATETOTALSENTRY']._serialized_end=1491
+ _globals['_PBEDGE']._serialized_start=6952
+ _globals['_PBEDGE']._serialized_end=7140
+ _globals['_PBEDGES']._serialized_start=7142
+ _globals['_PBEDGES']._serialized_end=7265
+ _globals['_PBENTIREWORKFLOW']._serialized_start=7268
+ _globals['_PBENTIREWORKFLOW']._serialized_end=7510
+ _globals['_EDELTAS']._serialized_start=7513
+ _globals['_EDELTAS']._serialized_end=7688
+ _globals['_FDELTAS']._serialized_start=7691
+ _globals['_FDELTAS']._serialized_end=7870
+ _globals['_FPDELTAS']._serialized_start=7873
+ _globals['_FPDELTAS']._serialized_end=8063
+ _globals['_JDELTAS']._serialized_start=8066
+ _globals['_JDELTAS']._serialized_end=8239
+ _globals['_TDELTAS']._serialized_start=8242
+ _globals['_TDELTAS']._serialized_end=8417
+ _globals['_TPDELTAS']._serialized_start=8420
+ _globals['_TPDELTAS']._serialized_end=8606
+ _globals['_WDELTAS']._serialized_start=8609
+ _globals['_WDELTAS']._serialized_end=8804
+ _globals['_ALLDELTAS']._serialized_start=8807
+ _globals['_ALLDELTAS']._serialized_end=9016
# @@protoc_insertion_point(module_scope)
diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index e6761ad4c09..ab9b3124352 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -33,19 +33,17 @@
includes workflow, task, and family definition objects.
The cycle point nodes/edges (i.e. task/family proxies) generation is triggered
-individually on transition from staging to active task pool. Each active task
-is generated along with any children and parents recursively out to a
-specified maximum graph distance (n_edge_distance), that can be externally
-altered (via API). Collectively this forms the N-Distance-Window on the
-workflow graph.
-
-Pruning of data-store elements is done using both the collection/set of nodes
-generated through the associated graph paths of the active nodes and the
-tracking of the boundary nodes (n_edge_distance+1) of those active nodes.
-Once active, these boundary nodes act as the prune trigger for their
-original/generator node(s). Set operations are used to do a diff between the
-nodes of active paths (paths whose node is in the active task pool) and the
-nodes of flagged paths (whose boundary node(s) have become active).
+individually on transition to active task pool. Each active task is generated
+along with any children and parents via a graph walk out to a specified maximum
+graph distance (n_edge_distance), that can be externally altered (via API).
+Collectively this forms the N-Distance-Window on the workflow graph.
+
+Pruning of data-store elements is done using the collection/set of nodes
+generated at the boundary of an active node's graph walk and registering active
+node's parents against them. Once active, these boundary nodes act as the prune
+triggers for the associated parent nodes. Set operations are used to do a diff
+between the nodes of active paths (paths whose node is in the active task pool)
+and the nodes of flagged paths (whose boundary node(s) have become active).
Updates are created by the event/task/job managers.
@@ -65,6 +63,7 @@
Any,
Dict,
Optional,
+ List,
Set,
TYPE_CHECKING,
Tuple,
@@ -73,6 +72,7 @@
import zlib
from cylc.flow import __version__ as CYLC_VERSION, LOG
+from cylc.flow.cycling.loader import get_point
from cylc.flow.data_messages_pb2 import ( # type: ignore
PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy, PbJob, PbTask,
PbTaskProxy, PbWorkflow, PbRuntime, AllDeltas, EDeltas, FDeltas,
@@ -506,8 +506,11 @@ def __init__(self, schd):
self.all_task_pool = set()
self.all_n_window_nodes = set()
self.n_window_nodes = {}
- self.n_window_edges = {}
- self.n_window_boundary_nodes = {}
+ self.n_window_edges = set()
+ self.n_window_node_walks = {}
+ self.n_window_completed_walks = set()
+ self.n_window_depths = {}
+ self.update_window_depths = False
self.db_load_task_proxies = {}
self.family_pruned_ids = set()
self.prune_trigger_nodes = {}
@@ -565,6 +568,7 @@ def generate_definition_elements(self):
families = self.added[FAMILIES]
workflow = self.added[WORKFLOW]
workflow.id = self.workflow_id
+ workflow.n_edge_distance = self.n_edge_distance
workflow.last_updated = update_time
workflow.stamp = f'{workflow.id}@{workflow.last_updated}'
# Treat play/restart as hard reload of definition.
@@ -710,32 +714,25 @@ def increment_graph_window(
source_tokens: Tokens,
point,
flow_nums,
- edge_distance=0,
- active_id: Optional[str] = None,
- descendant=False,
- is_parent=False,
is_manual_submit=False,
itask=None
) -> None:
"""Generate graph window about active task proxy to n-edge-distance.
- A recursive function, that creates a node then moves to children and
- parents repeating this process out to one edge beyond the max window
- size (in edges). Going out one edge further, we can trigger
- pruning as new active tasks appear beyond this boundary.
-
+ Fills in graph walk from existing walks if possible, otherwise expands
+ the graph front from wherever hasn't been walked.
+ Walk nodes are grouped into locations which are tagged according to
+ parent child path, i.e. 'cpc' would be children-parents-children away
+ from active/start task. This not only provides a way to cheaply rewalk,
+ but also the edge distance from origin.
+ The furthest child boundary nodes are registered as prune triggers for
+ the origin's parents, so when they become active the parents are
+ assessed for pruning eligibility.
Args:
source_tokens (cylc.flow.id.Tokens)
point (PointBase)
flow_nums (set)
- edge_distance (int):
- Graph distance from active/origin node.
- active_id (str):
- Active/origin node id.
- descendant (bool):
- Is the current node a direct descendent of the active/origin.
- is_parent (bool)
is_manual_submit (bool)
itask (cylc.flow.task_proxy.TaskProxy):
Active/Other task proxy, passed in with pool invocation.
@@ -744,146 +741,343 @@ def increment_graph_window(
None
"""
- is_active = not (descendant or is_parent)
- # ID passed through recursion as reference to original/active node.
- if active_id is None:
- source_tokens = self.id_.duplicate(source_tokens)
- active_id = source_tokens.id
-
- # flag manual triggers for pruning on deletion.
- if is_manual_submit:
- self.prune_trigger_nodes.setdefault(active_id, set()).add(
- source_tokens.id
- )
-
- # Setup and check if active node is another's boundary node
- # to flag its paths for pruning.
- if is_active:
- self.n_window_edges[active_id] = set()
- self.n_window_boundary_nodes[active_id] = {}
- self.n_window_nodes[active_id] = set()
- if active_id in self.prune_trigger_nodes:
- self.prune_flagged_nodes.update(
- self.prune_trigger_nodes[active_id])
- del self.prune_trigger_nodes[active_id]
- # This part is vital to constructing a set of boundary nodes
- # associated with the current Active node.
- if edge_distance > self.n_edge_distance:
- if descendant and self.n_edge_distance > 0:
- self.n_window_boundary_nodes[
- active_id
- ].setdefault(edge_distance, set()).add(source_tokens.id)
- return
+ # common references
+ active_id = source_tokens.id
+ all_walks = self.n_window_node_walks
+ taskdefs = self.schd.config.taskdefs
+ final_point = self.schd.config.final_point
+
+ # walk keys/tags
+ # Children location tag
+ c_tag = 'c'
+ # Parents location tag
+ p_tag = 'p'
+
+ # Setup walk fields:
+ # - locations (locs): i.e. 'cpc' children-parents-children from origin,
+ # with their respective node ids.
+ # - orphans: task no longer exists in workflow.
+ # - done_locs: set of locations that have been walked over.
+ # - done_ids: set of node ids that have been walked (from initial
+ # walk filling, that may not have been the entire walk).
+ # If walk already completed, must have gone from non-active to active
+ # again, so redo the walk (as walk nodes may be pruned).
+ if (
+ active_id not in all_walks
+ or active_id in self.n_window_completed_walks
+ ):
+ all_walks[active_id] = {
+ 'locations': {},
+ 'orphans': set(),
+ 'done_locs': set(),
+ 'done_ids': set(),
+ 'walk_ids': {active_id},
+ 'depths': {
+ depth: set()
+ for depth in range(1, self.n_edge_distance + 1)
+ }
+ }
+ if active_id in self.n_window_completed_walks:
+ self.n_window_completed_walks.remove(active_id)
+ active_walk = all_walks[active_id]
+ active_locs = active_walk['locations']
+ if source_tokens['task'] not in taskdefs:
+ active_walk['orphans'].add(active_id)
# Generate task proxy node
- is_orphan, graph_children = self.generate_ghost_task(
+ self.n_window_nodes[active_id] = set()
+
+ self.generate_ghost_task(
source_tokens,
point,
flow_nums,
- is_parent,
+ False,
itask
)
- self.n_window_nodes[active_id].add(source_tokens.id)
-
- edge_distance += 1
+ # Pre-populate from previous walks
+ # Will check all location permutations.
+ # There may be short cuts for parent locs, however children will more
+ # likely be incomplete walks with no 'done_locs' and using parent's
+ # children will require sifting out cousin branches.
+ working_locs: List[str] = []
+ if self.n_edge_distance > 1:
+ if c_tag in active_locs:
+ working_locs.extend(('cc', 'cp'))
+ if p_tag in active_locs:
+ working_locs.extend(('pp', 'pc'))
+ n_depth = 2
+ while working_locs:
+ for w_loc in working_locs:
+ loc_done = True
+ # Most will be incomplete walks, however, we can check.
+ # i.e. parents of children may all exist.
+ if w_loc[:-1] in active_locs:
+ for loc_id in active_locs[w_loc[:-1]]:
+ if loc_id not in all_walks:
+ loc_done = False
+ break
+ else:
+ continue
+ # find child nodes of parent location,
+ # i.e. 'cpcc' = 'cpc' + 'c'
+ w_set = set().union(*(
+ all_walks[loc_id]['locations'][w_loc[-1]]
+ for loc_id in active_locs[w_loc[:-1]]
+ if (
+ loc_id in all_walks
+ and w_loc[-1] in all_walks[loc_id]['locations']
+ )
+ ))
+ w_set.difference_update(active_walk['walk_ids'])
+ if w_set:
+ active_locs[w_loc] = w_set
+ active_walk['walk_ids'].update(w_set)
+ active_walk['depths'][n_depth].update(w_set)
+ # If child/parent nodes have been pruned we will need
+ # to regenerate them.
+ if (
+ loc_done
+ and not w_set.difference(self.all_n_window_nodes)
+ ):
+ active_walk['done_locs'].add(w_loc[:-1])
+ active_walk['done_ids'].update(
+ active_locs[w_loc[:-1]]
+ )
+ working_locs = [
+ new_loc
+ for loc in working_locs
+ if loc in active_locs and len(loc) < self.n_edge_distance
+ for new_loc in (loc + c_tag, loc + p_tag)
+ ]
+ n_depth += 1
- # Don't expand window about orphan task.
+ # Graph walk
+ node_tokens: Tokens
child_tokens: Tokens
parent_tokens: Tokens
- if not is_orphan:
- tdef = self.schd.config.taskdefs[source_tokens['task']]
- # TODO: xtrigger is workflow_state edges too
- # Reference set for workflow relations
- final_point = self.schd.config.final_point
- if descendant or is_active:
- if graph_children is None:
- graph_children = generate_graph_children(tdef, point)
- if not any(graph_children.values()):
- self.n_window_boundary_nodes[active_id].setdefault(
- edge_distance - 1,
- set()
- ).add(source_tokens.id)
-
- # Children/downstream nodes
- for items in graph_children.values():
- for child_name, child_point, _ in items:
- if child_point > final_point:
- continue
- child_tokens = self.id_.duplicate(
- cycle=str(child_point),
- task=child_name,
- )
- # We still increment the graph one further to find
- # boundary nodes, but don't create elements.
- if edge_distance <= self.n_edge_distance:
- self.generate_edge(
- source_tokens,
- child_tokens,
- active_id
- )
- if child_tokens.id in self.n_window_nodes[active_id]:
- continue
- self.increment_graph_window(
- child_tokens,
- child_point,
- flow_nums,
- edge_distance,
- active_id,
- True,
- False
- )
+ walk_incomplete = True
+ while walk_incomplete:
+ walk_incomplete = False
+ # Only walk locations not fully explored
+ locations = [
+ loc
+ for loc in active_locs
+ if (
- # Parents/upstream nodes
- if is_parent or is_active:
- for items in generate_graph_parents(
- tdef,
- point,
- self.schd.config.taskdefs
- ).values():
- for parent_name, parent_point, _ in items:
- if parent_point > final_point:
+ len(loc) < self.n_edge_distance
+ and loc not in active_walk['done_locs']
+ )
+ ]
+ # Origin/Active usually first or isolate nodes
+ if (
+ not active_walk['done_ids']
+ and not locations
+ and active_id not in active_walk['orphans']
+ and self.n_edge_distance != 0
+ ):
+ locations = ['']
+ # Explore/walk locations
+ for location in locations:
+ walk_incomplete = True
+ if not location:
+ loc_nodes = {active_id}
+ else:
+ loc_nodes = active_locs[location]
+ active_walk['done_locs'].add(location)
+ c_loc = location + c_tag
+ p_loc = location + p_tag
+ c_ids = set()
+ p_ids = set()
+ n_depth = len(location) + 1
+ # Exclude walked nodes at this location.
+ # This also helps avoid walking in a circle.
+ for node_id in loc_nodes.difference(active_walk['done_ids']):
+ active_walk['done_ids'].add(node_id)
+ node_tokens = Tokens(node_id)
+ # Don't expand window about orphan task.
+ try:
+ tdef = taskdefs[node_tokens['task']]
+ except KeyError:
+ active_walk['orphans'].add(node_id)
+ continue
+ # Use existing children/parents from other walks.
+ # (note: nodes/edges should already be generated)
+ c_done = False
+ p_done = False
+ if node_id in all_walks and node_id is not active_id:
+ with suppress(KeyError):
+ # If children have been pruned, don't skip,
+ # re-generate them (uncommon or impossible?).
+ if not all_walks[node_id]['locations'][
+ c_tag
+ ].difference(self.all_n_window_nodes):
+ c_ids.update(
+ all_walks[node_id]['locations'][c_tag]
+ )
+ c_done = True
+ with suppress(KeyError):
+ # If parent have been pruned, don't skip,
+ # re-generate them (more common case).
+ if not all_walks[node_id]['locations'][
+ p_tag
+ ].difference(self.all_n_window_nodes):
+ p_ids.update(
+ all_walks[node_id]['locations'][p_tag]
+ )
+ p_done = True
+ if p_done and c_done:
continue
- parent_tokens = self.id_.duplicate(
- cycle=str(parent_point),
- task=parent_name,
- )
- if edge_distance <= self.n_edge_distance:
- # reverse for parent
- self.generate_edge(
- parent_tokens,
- source_tokens,
- active_id
+
+ # Children/downstream nodes
+ # TODO: xtrigger is workflow_state edges too
+ # see: https://github.com/cylc/cylc-flow/issues/4582
+ # Reference set for workflow relations
+ nc_ids = set()
+ if not c_done:
+ if itask is not None and n_depth == 1:
+ graph_children = itask.graph_children
+ else:
+ graph_children = generate_graph_children(
+ tdef,
+ get_point(node_tokens['cycle'])
)
- if parent_tokens.id in self.n_window_nodes[active_id]:
- continue
- self.increment_graph_window(
- parent_tokens,
- parent_point,
- flow_nums,
- edge_distance,
- active_id,
- False,
- True
- )
+ for items in graph_children.values():
+ for child_name, child_point, _ in items:
+ if child_point > final_point:
+ continue
+ child_tokens = self.id_.duplicate(
+ cycle=str(child_point),
+ task=child_name,
+ )
+ self.generate_ghost_task(
+ child_tokens,
+ child_point,
+ flow_nums,
+ False,
+ None,
+ n_depth
+ )
+ self.generate_edge(
+ node_tokens,
+ child_tokens,
+ active_id
+ )
+ nc_ids.add(child_tokens.id)
+
+ # Parents/upstream nodes
+ np_ids = set()
+ if not p_done:
+ for items in generate_graph_parents(
+ tdef,
+ get_point(node_tokens['cycle']),
+ taskdefs
+ ).values():
+ for parent_name, parent_point, _ in items:
+ if parent_point > final_point:
+ continue
+ parent_tokens = self.id_.duplicate(
+ cycle=str(parent_point),
+ task=parent_name,
+ )
+ self.generate_ghost_task(
+ parent_tokens,
+ parent_point,
+ flow_nums,
+ True,
+ None,
+ n_depth
+ )
+ # reverse for parent
+ self.generate_edge(
+ parent_tokens,
+ node_tokens,
+ active_id
+ )
+ np_ids.add(parent_tokens.id)
+
+ # Register new walk
+ if node_id not in all_walks:
+ all_walks[node_id] = {
+ 'locations': {},
+ 'done_ids': set(),
+ 'done_locs': set(),
+ 'orphans': set(),
+ 'walk_ids': {node_id} | nc_ids | np_ids,
+ 'depths': {
+ depth: set()
+ for depth in range(1, self.n_edge_distance + 1)
+ }
+ }
+ if nc_ids:
+ all_walks[node_id]['locations'][c_tag] = nc_ids
+ all_walks[node_id]['depths'][1].update(nc_ids)
+ c_ids.update(nc_ids)
+ if np_ids:
+ all_walks[node_id]['locations'][p_tag] = np_ids
+ all_walks[node_id]['depths'][1].update(np_ids)
+ p_ids.update(np_ids)
+
+ # Create location association
+ c_ids.difference_update(active_walk['walk_ids'])
+ if c_ids:
+ active_locs.setdefault(c_loc, set()).update(c_ids)
+ p_ids.difference_update(active_walk['walk_ids'])
+ if p_ids:
+ active_locs.setdefault(p_loc, set()).update(p_ids)
+ active_walk['walk_ids'].update(c_ids, p_ids)
+ active_walk['depths'][n_depth].update(c_ids, p_ids)
+
+ self.n_window_completed_walks.add(active_id)
+ self.n_window_nodes[active_id].update(active_walk['walk_ids'])
- # If this is the active task (edge_distance has been incremented),
- # then add the most distant child as a trigger to prune it.
- if is_active:
- levels = self.n_window_boundary_nodes[active_id].keys()
+ # This part is vital to constructing a set of boundary nodes
+ # associated with the n=0 window of current active node.
+ # Only trigger pruning for furthest set of boundary nodes
+ boundary_nodes: Set[str] = set()
+ max_level: int = 0
+ with suppress(ValueError):
+ max_level = max(
+ len(loc)
+ for loc in active_locs
+ if p_tag not in loc
+ )
+ # add the most distant child as a trigger to prune it.
+ boundary_nodes.update(*(
+ active_locs[loc]
+ for loc in active_locs
+ if p_tag not in loc and len(loc) >= max_level
+ ))
+ if not boundary_nodes and not max_level:
# Could be self-reference node foo:failed => foo
- if not levels:
- self.n_window_boundary_nodes[active_id][0] = {active_id}
- levels = (0,)
- # Only trigger pruning for furthest set of boundary nodes
- for tp_id in self.n_window_boundary_nodes[active_id][max(levels)]:
- self.prune_trigger_nodes.setdefault(
- tp_id, set()).add(active_id)
- del self.n_window_boundary_nodes[active_id]
- if self.n_window_edges[active_id]:
- getattr(self.updated[WORKFLOW], EDGES).edges.extend(
- self.n_window_edges[active_id])
+ boundary_nodes = {active_id}
+ # associate
+ for tp_id in boundary_nodes:
+ try:
+ self.prune_trigger_nodes.setdefault(tp_id, set()).update(
+ active_walk['walk_ids']
+ )
+ self.prune_trigger_nodes[tp_id].discard(tp_id)
+ except KeyError:
+ self.prune_trigger_nodes.setdefault(tp_id, set()).add(
+ active_id
+ )
+ # flag manual triggers for pruning on deletion.
+ if is_manual_submit:
+ self.prune_trigger_nodes.setdefault(active_id, set()).add(
+ active_id
+ )
+ if active_walk['orphans']:
+ self.prune_trigger_nodes.setdefault(active_id, set()).union(
+ active_walk['orphans']
+ )
+ # Check if active node is another's boundary node
+ # to flag its paths for pruning.
+ if active_id in self.prune_trigger_nodes:
+ self.prune_flagged_nodes.update(
+ self.prune_trigger_nodes[active_id])
+ del self.prune_trigger_nodes[active_id]
def generate_edge(
self,
@@ -894,7 +1088,7 @@ def generate_edge(
"""Construct edge of child and parent task proxy node."""
# Initiate edge element.
e_id = self.edge_id(parent_tokens, child_tokens)
- if e_id in self.n_window_edges[active_id]:
+ if e_id in self.n_window_edges:
return
if (
e_id not in self.data[self.workflow_id][EDGES]
@@ -912,7 +1106,8 @@ def generate_edge(
self.updated[TASK_PROXIES].setdefault(
parent_tokens.id,
PbTaskProxy(id=parent_tokens.id)).edges.append(e_id)
- self.n_window_edges[active_id].add(e_id)
+ getattr(self.updated[WORKFLOW], EDGES).edges.append(e_id)
+ self.n_window_edges.add(e_id)
def remove_pool_node(self, name, point):
"""Remove ID reference and flag isolate node/branch for pruning."""
@@ -930,13 +1125,16 @@ def remove_pool_node(self, name, point):
):
self.prune_flagged_nodes.update(self.prune_trigger_nodes[tp_id])
del self.prune_trigger_nodes[tp_id]
- self.updates_pending = True
elif (
tp_id in self.n_window_nodes and
self.n_window_nodes[tp_id].isdisjoint(self.all_task_pool)
):
self.prune_flagged_nodes.add(tp_id)
- self.updates_pending = True
+ elif tp_id in self.n_window_node_walks:
+ self.prune_flagged_nodes.update(
+ self.n_window_node_walks[tp_id]['walk_ids']
+ )
+ self.updates_pending = True
def add_pool_node(self, name, point):
"""Add external ID reference for internal task pool node."""
@@ -945,6 +1143,7 @@ def add_pool_node(self, name, point):
task=name,
).id
self.all_task_pool.add(tp_id)
+ self.update_window_depths = True
def generate_ghost_task(
self,
@@ -952,8 +1151,9 @@ def generate_ghost_task(
point,
flow_nums,
is_parent=False,
- itask=None
- ) -> Tuple[bool, Optional[dict]]:
+ itask=None,
+ n_depth=0,
+ ):
"""Create task-point element populated with static data.
Args:
@@ -964,29 +1164,26 @@ def generate_ghost_task(
Used to determine whether to load DB state.
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy object.
+ n_depth (int): n-window graph edge distance.
Returns:
- (is_orphan, graph_children)
- Orphan tasks with no children return (True, None) respectively.
+ None
"""
+ tp_id = tokens.id
+ if (
+ tp_id in self.data[self.workflow_id][TASK_PROXIES]
+ or tp_id in self.added[TASK_PROXIES]
+ ):
+ return
+
name = tokens['task']
point_string = tokens['cycle']
t_id = self.definition_id(name)
- tp_id = tokens.id
- task_proxies = self.data[self.workflow_id][TASK_PROXIES]
-
- is_orphan = False
- if name not in self.schd.config.taskdefs:
- is_orphan = True
if itask is None:
itask = self.schd.pool.get_task(point_string, name)
- if tp_id in task_proxies or tp_id in self.added[TASK_PROXIES]:
- if itask is None:
- return is_orphan, None
- return is_orphan, itask.graph_children
if itask is None:
itask = TaskProxy(
@@ -998,7 +1195,9 @@ def generate_ghost_task(
data_mode=True
)
- if is_orphan:
+ is_orphan = False
+ if name not in self.schd.config.taskdefs:
+ is_orphan = True
self.generate_orphan_task(itask)
# Most of the time the definition node will be in the store.
@@ -1009,7 +1208,7 @@ def generate_ghost_task(
task_def = self.added[TASKS][t_id]
except KeyError:
# Task removed from workflow definition.
- return False, itask.graph_children
+ return
update_time = time()
tp_stamp = f'{tp_id}@{update_time}'
@@ -1023,8 +1222,11 @@ def generate_ghost_task(
in self.schd.pool.tasks_to_hold
),
depth=task_def.depth,
+ graph_depth=n_depth,
name=name,
)
+ self.all_n_window_nodes.add(tp_id)
+ self.n_window_depths.setdefault(n_depth, set()).add(tp_id)
tproxy.namespace[:] = task_def.namespace
if is_orphan:
@@ -1077,7 +1279,7 @@ def generate_ghost_task(
self.updates_pending = True
- return is_orphan, itask.graph_children
+ return
def generate_orphan_task(self, itask):
"""Generate orphan task definition."""
@@ -1196,7 +1398,6 @@ def generate_ghost_family(self, fp_id, child_fam=None, child_task=None):
def apply_task_proxy_db_history(self):
"""Extract and apply DB history on given task proxies."""
-
if not self.db_load_task_proxies:
return
@@ -1501,16 +1702,23 @@ def insert_db_job(self, row_idx, row):
def update_data_structure(self):
"""Workflow batch updates in the data structure."""
- # load database history for flagged nodes
- self.apply_task_proxy_db_history()
# Avoids changing window edge distance during edge/node creation
if self.next_n_edge_distance is not None:
self.n_edge_distance = self.next_n_edge_distance
+ self.window_resize_rewalk()
self.next_n_edge_distance = None
+ # load database history for flagged nodes
+ self.apply_task_proxy_db_history()
+
self.updates_pending_follow_on = False
self.prune_data_store()
+
+ # Find depth changes and create deltas
+ if self.update_window_depths:
+ self.window_depth_finder()
+
if self.updates_pending:
# update
self.update_family_proxies()
@@ -1549,6 +1757,83 @@ def update_workflow_states(self):
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()
+ def window_resize_rewalk(self):
+ """Re-create data-store n-window on resize."""
+ tokens: Tokens
+ # Gather pre-resize window nodes
+ if not self.all_n_window_nodes:
+ self.all_n_window_nodes = set().union(*(
+ v
+ for k, v in self.n_window_nodes.items()
+ if k in self.all_task_pool
+ ))
+
+ # Clear window walks, and walk from scratch.
+ self.prune_flagged_nodes.clear()
+ self.n_window_node_walks.clear()
+ for tp_id in self.all_task_pool:
+ tokens = Tokens(tp_id)
+ tp_id, tproxy = self.store_node_fetcher(tokens)
+ self.increment_graph_window(
+ tokens,
+ get_point(tokens['cycle']),
+ tproxy.flow_nums
+ )
+ # Flag difference between old and new window for pruning.
+ self.prune_flagged_nodes.update(
+ self.all_n_window_nodes.difference(*(
+ v
+ for k, v in self.n_window_nodes.items()
+ if k in self.all_task_pool
+ ))
+ )
+ self.update_window_depths = True
+
+ def window_depth_finder(self):
+ """Recalculate window depths, creating depth deltas."""
+ # Setup new window depths
+ n_window_depths: Dict[int, Set[str]] = {
+ 0: self.all_task_pool.copy()
+ }
+
+ depth = 1
+ # Since starting from smaller depth, exclude those whose depth has
+ # already been found.
+ depth_found_tasks: Set[str] = self.all_task_pool.copy()
+ while depth <= self.n_edge_distance:
+ n_window_depths[depth] = set().union(*(
+ self.n_window_node_walks[n_id]['depths'][depth]
+ for n_id in self.all_task_pool
+ if (
+ n_id in self.n_window_node_walks
+ and depth in self.n_window_node_walks[n_id]['depths']
+ )
+ )).difference(depth_found_tasks)
+ depth_found_tasks.update(n_window_depths[depth])
+ # Calculate next depth parameters.
+ depth += 1
+
+ # Create deltas of those whose depth has changed, a node should only
+ # appear once across all depths.
+ # So the diff will only contain it at a single depth and if it didn't
+ # appear at the same depth previously.
+ update_time = time()
+ for depth, node_set in n_window_depths.items():
+ node_set_diff = node_set.difference(
+ self.n_window_depths.setdefault(depth, set())
+ )
+ if not self.updates_pending and node_set_diff:
+ self.updates_pending = True
+ for tp_id in node_set_diff:
+ tp_delta = self.updated[TASK_PROXIES].setdefault(
+ tp_id, PbTaskProxy(id=tp_id)
+ )
+ tp_delta.stamp = f'{tp_id}@{update_time}'
+ tp_delta.graph_depth = depth
+ # Set old to new.
+ self.n_window_depths = n_window_depths
+ self.update_window_depths = False
+
def prune_data_store(self):
"""Remove flagged nodes and edges not in the set of active paths."""
@@ -1583,8 +1868,6 @@ def prune_data_store(self):
for tp_id in list(node_ids):
if tp_id in self.n_window_nodes:
del self.n_window_nodes[tp_id]
- if tp_id in self.n_window_edges:
- del self.n_window_edges[tp_id]
if tp_id in tp_data:
node = tp_data[tp_id]
elif tp_id in tp_added:
@@ -1592,6 +1875,11 @@ def prune_data_store(self):
else:
node_ids.remove(tp_id)
continue
+ self.n_window_edges.difference_update(node.edges)
+ if tp_id in self.n_window_node_walks:
+ del self.n_window_node_walks[tp_id]
+ if tp_id in self.n_window_completed_walks:
+ self.n_window_completed_walks.remove(tp_id)
for sig in node.xtriggers:
self.xtrigger_tasks[sig].remove(
(tp_id, node.xtriggers[sig].label)
@@ -1746,6 +2034,7 @@ def _family_ascent_point_update(self, fp_id):
is_held_total = 0
is_queued_total = 0
is_runahead_total = 0
+ graph_depth = self.n_edge_distance
for child_id in fam_node.child_families:
child_node = fp_updated.get(child_id, fp_data.get(child_id))
if child_node is not None:
@@ -1753,6 +2042,8 @@ def _family_ascent_point_update(self, fp_id):
is_queued_total += child_node.is_queued_total
is_runahead_total += child_node.is_runahead_total
state_counter += Counter(dict(child_node.state_totals))
+ if child_node.graph_depth < graph_depth:
+ graph_depth = child_node.graph_depth
# Gather all child task states
task_states = []
for tp_id in fam_node.child_tasks:
@@ -1787,6 +2078,12 @@ def _family_ascent_point_update(self, fp_id):
if tp_runahead.is_runahead:
is_runahead_total += 1
+ tp_depth = tp_delta
+ if tp_depth is None or not tp_depth.HasField('graph_depth'):
+ tp_depth = tp_node
+ if tp_depth.graph_depth < graph_depth:
+ graph_depth = tp_depth.graph_depth
+
state_counter += Counter(task_states)
# created delta data element
fp_delta = PbFamilyProxy(
@@ -1798,7 +2095,8 @@ def _family_ascent_point_update(self, fp_id):
is_queued=(is_queued_total > 0),
is_queued_total=is_queued_total,
is_runahead=(is_runahead_total > 0),
- is_runahead_total=is_runahead_total
+ is_runahead_total=is_runahead_total,
+ graph_depth=graph_depth
)
fp_delta.states[:] = state_counter.keys()
# Use all states to clean up pruned counts
@@ -1820,8 +2118,9 @@ def set_graph_window_extent(self, n_edge_distance):
Maximum edge distance from active node.
"""
- self.next_n_edge_distance = n_edge_distance
- self.updates_pending = True
+ if n_edge_distance != self.n_edge_distance:
+ self.next_n_edge_distance = n_edge_distance
+ self.updates_pending = True
def update_workflow(self, reloaded=False):
"""Update workflow element status and state totals."""
@@ -1882,6 +2181,10 @@ def update_workflow(self, reloaded=False):
if reloaded is not w_data.reloaded:
w_delta.reloaded = reloaded
+ if w_data.n_edge_distance != self.n_edge_distance:
+ w_delta.n_edge_distance = self.n_edge_distance
+ delta_set = True
+
if self.schd.pool.main_pool:
pool_points = set(self.schd.pool.main_pool)
oldest_point = str(min(pool_points))
diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py
index 55016776bfb..c51946ff188 100644
--- a/cylc/flow/network/resolvers.py
+++ b/cylc/flow/network/resolvers.py
@@ -267,6 +267,10 @@ def node_filter(node, node_type, args, state):
args.get('maxdepth', -1) < 0
or node.depth <= args['maxdepth']
)
+ and (
+ args.get('graph_depth', -1) < 0
+ or node.graph_depth <= args['graph_depth']
+ )
# Now filter node against id arg lists
and (
not args.get('ids')
diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py
index 63d9f236d7a..f9ed95c7158 100644
--- a/cylc/flow/network/schema.py
+++ b/cylc/flow/network/schema.py
@@ -203,6 +203,7 @@ class SortArgs(InputObjectType):
'is_runahead': Boolean(),
'mindepth': Int(default_value=-1),
'maxdepth': Int(default_value=-1),
+ 'graph_depth': Int(default_value=-1),
'sort': SortArgs(default_value=None),
}
@@ -218,6 +219,7 @@ class SortArgs(InputObjectType):
'is_runahead': Boolean(),
'mindepth': Int(default_value=-1),
'maxdepth': Int(default_value=-1),
+ 'graph_depth': Int(default_value=-1),
'sort': SortArgs(default_value=None),
}
@@ -226,8 +228,6 @@ class SortArgs(InputObjectType):
'exids': graphene.List(ID, default_value=[]),
'states': graphene.List(String, default_value=[]),
'exstates': graphene.List(String, default_value=[]),
- 'mindepth': Int(default_value=-1),
- 'maxdepth': Int(default_value=-1),
'sort': SortArgs(default_value=None),
}
@@ -785,6 +785,12 @@ class Meta:
description='Any active workflow broadcasts.'
)
pruned = Boolean() # TODO: what is this? write description
+ n_edge_distance = Int(
+ description=sstrip('''
+ The maximum graph distance (n) from an active node
+ of the data-store graph window.
+ '''),
+ )
class RuntimeSetting(ObjectType):
@@ -1067,6 +1073,11 @@ class Meta:
depth = Int(
description='The family inheritance depth',
)
+ graph_depth = Int(
+ description=sstrip('''
+ The n-window graph edge depth from closest active task(s).
+ '''),
+ )
job_submits = Int(
description='The number of job submissions for this task instance.',
)
@@ -1217,6 +1228,12 @@ class Meta:
is_runahead = Boolean()
is_runahead_total = Int()
depth = Int()
+ graph_depth = Int(
+ description=sstrip('''
+ The n-window graph edge smallest child task/family depth
+ from closest active task(s).
+ '''),
+ )
child_tasks = graphene.List(
TaskProxy,
description="""Descendant task proxies.""",
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index f9cc94deaaf..ac0e9f44e31 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -840,13 +840,14 @@ def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]':
return point_itasks
- def get_task(self, point, name):
+ def get_task(self, point, name) -> Optional[TaskProxy]:
"""Retrieve a task from the pool."""
rel_id = f'{point}/{name}'
for pool in (self.main_pool, self.hidden_pool):
tasks = pool.get(point)
if tasks and rel_id in tasks:
return tasks[rel_id]
+ return None
def _get_hidden_task_by_id(self, id_: str) -> Optional[TaskProxy]:
"""Return runahead pool task by ID if it exists, or None."""
diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py
index 9d51794ec6c..9b6f0d84cb2 100644
--- a/cylc/flow/task_proxy.py
+++ b/cylc/flow/task_proxy.py
@@ -253,7 +253,10 @@ def __init__(
self.state = TaskState(tdef, self.point, status, is_held)
# Determine graph children of this task (for spawning).
- self.graph_children = generate_graph_children(tdef, self.point)
+ if data_mode:
+ self.graph_children = {}
+ else:
+ self.graph_children = generate_graph_children(tdef, self.point)
def __repr__(self) -> str:
return f"<{self.__class__.__name__} '{self.tokens}'>"
diff --git a/mypy.ini b/mypy.ini
index 4c35cf1d53e..fe03c3bbe12 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -19,3 +19,7 @@ exclude= cylc/flow/etc/tutorial/.*
# Suppress the following messages:
# By default the bodies of untyped functions are not checked, consider using --check-untyped-defs
disable_error_code = annotation-unchecked
+
+# For some reason, couldn't exclude this with the exclude directive above
+[mypy-cylc.flow.data_messages_pb2]
+ignore_errors = True
diff --git a/tests/functional/cylc-show/06-past-present-future/flow.cylc b/tests/functional/cylc-show/06-past-present-future/flow.cylc
index 383ff86e0fb..5c8497d6a0a 100644
--- a/tests/functional/cylc-show/06-past-present-future/flow.cylc
+++ b/tests/functional/cylc-show/06-past-present-future/flow.cylc
@@ -20,6 +20,8 @@
cylc stop --now --max-polls=10 --interval=1 $CYLC_WORKFLOW_ID
false
else
+ # Allow time for c submission => running
+ sleep 2
cylc show "$CYLC_WORKFLOW_ID//1/b" >> $CYLC_WORKFLOW_RUN_DIR/show-b.txt
cylc show "$CYLC_WORKFLOW_ID//1/c" >> $CYLC_WORKFLOW_RUN_DIR/show-c.txt
cylc show "$CYLC_WORKFLOW_ID//1/d" >> $CYLC_WORKFLOW_RUN_DIR/show-d.txt
diff --git a/tests/functional/graphql/01-workflow.t b/tests/functional/graphql/01-workflow.t
index b0f82997a0a..ae9044fe2c6 100755
--- a/tests/functional/graphql/01-workflow.t
+++ b/tests/functional/graphql/01-workflow.t
@@ -47,6 +47,7 @@ query {
oldestActiveCyclePoint
reloaded
runMode
+ nEdgeDistance
stateTotals
workflowLogDir
timeZoneInfo {
@@ -96,6 +97,7 @@ cmp_json "${TEST_NAME}-out" "${TEST_NAME_BASE}-workflows.stdout" << __HERE__
"oldestActiveCyclePoint": "20210101T0000Z",
"reloaded": false,
"runMode": "live",
+ "nEdgeDistance": 1,
"stateTotals": {
"waiting": 1,
"expired": 0,
diff --git a/tests/functional/graphql/03-is-held-arg.t b/tests/functional/graphql/03-is-held-arg.t
index 414f2842526..6603e1c5347 100755
--- a/tests/functional/graphql/03-is-held-arg.t
+++ b/tests/functional/graphql/03-is-held-arg.t
@@ -49,14 +49,14 @@ query {
workflows {
name
isHeldTotal
- taskProxies(isHeld: true) {
+ taskProxies(isHeld: true, graphDepth: 1) {
id
jobs {
submittedTime
startedTime
}
}
- familyProxies(exids: [\"*/root\"], isHeld: true) {
+ familyProxies(exids: [\"*/root\"], isHeld: true, graphDepth: 1) {
id
}
}
diff --git a/tests/functional/n-window/01-past-present-future.t b/tests/functional/n-window/01-past-present-future.t
new file mode 100644
index 00000000000..d5ed27a8085
--- /dev/null
+++ b/tests/functional/n-window/01-past-present-future.t
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#-------------------------------------------------------------------------------
+
+# Test window size using graphql and cylc-show for all tasks.
+
+. "$(dirname "$0")/test_header"
+
+set_test_number 7
+
+install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
+
+TEST_NAME="${TEST_NAME_BASE}-validate"
+run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"
+
+TEST_NAME="${TEST_NAME_BASE}-run"
+# 'a => b => c => d => e', 'a' sets window size to 2, 'c' uses cylc show on all.
+workflow_run_ok "${TEST_NAME}" cylc play --no-detach --debug "${WORKFLOW_NAME}"
+
+TEST_NAME="${TEST_NAME_BASE}-show-a.past"
+contains_ok "$WORKFLOW_RUN_DIR/show-a.txt" <<__END__
+state: succeeded
+prerequisites: (None)
+__END__
+
+TEST_NAME="${TEST_NAME_BASE}-show-b.past"
+contains_ok "$WORKFLOW_RUN_DIR/show-b.txt" <<__END__
+state: succeeded
+prerequisites: (n/a for past tasks)
+__END__
+
+TEST_NAME="${TEST_NAME_BASE}-show-c.present"
+contains_ok "${WORKFLOW_RUN_DIR}/show-c.txt" <<__END__
+prerequisites: ('-': not satisfied)
+ + 1/b succeeded
+__END__
+
+TEST_NAME="${TEST_NAME_BASE}-show-d.future"
+contains_ok "${WORKFLOW_RUN_DIR}/show-d.txt" <<__END__
+state: waiting
+prerequisites: ('-': not satisfied)
+ - 1/c succeeded
+__END__
+
+TEST_NAME="${TEST_NAME_BASE}-show-e.future"
+contains_ok "${WORKFLOW_RUN_DIR}/show-e.txt" <<__END__
+state: waiting
+prerequisites: ('-': not satisfied)
+ - 1/d succeeded
+__END__
+
+purge
diff --git a/tests/functional/n-window/01-past-present-future/flow.cylc b/tests/functional/n-window/01-past-present-future/flow.cylc
new file mode 100644
index 00000000000..a032274a41e
--- /dev/null
+++ b/tests/functional/n-window/01-past-present-future/flow.cylc
@@ -0,0 +1,41 @@
+[scheduler]
+ allow implicit tasks = True
+ [[events]]
+ inactivity timeout = PT1M
+ abort on inactivity timeout = True
+[scheduling]
+ [[graph]]
+ R1 = """
+ a => b => c => d => e
+ """
+[runtime]
+ [[a]]
+ script = """
+set +e
+
+read -r -d '' gqlDoc <<_DOC_
+{"request_string": "
+mutation {
+ setGraphWindowExtent (
+ workflows: [\"${CYLC_WORKFLOW_ID}\"],
+ nEdgeDistance: 2) {
+ result
+ }
+}",
+"variables": null}
+_DOC_
+
+echo "${gqlDoc}"
+
+cylc client "$CYLC_WORKFLOW_ID" graphql < <(echo ${gqlDoc}) 2>/dev/null
+
+set -e
+"""
+ [[c]]
+ script = """
+cylc show "$CYLC_WORKFLOW_ID//1/a" >> $CYLC_WORKFLOW_RUN_DIR/show-a.txt
+cylc show "$CYLC_WORKFLOW_ID//1/b" >> $CYLC_WORKFLOW_RUN_DIR/show-b.txt
+cylc show "$CYLC_WORKFLOW_ID//1/c" >> $CYLC_WORKFLOW_RUN_DIR/show-c.txt
+cylc show "$CYLC_WORKFLOW_ID//1/d" >> $CYLC_WORKFLOW_RUN_DIR/show-d.txt
+cylc show "$CYLC_WORKFLOW_ID//1/e" >> $CYLC_WORKFLOW_RUN_DIR/show-e.txt
+"""
diff --git a/tests/functional/n-window/02-big-window.t b/tests/functional/n-window/02-big-window.t
new file mode 100644
index 00000000000..e6aa45fae24
--- /dev/null
+++ b/tests/functional/n-window/02-big-window.t
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#-------------------------------------------------------------------------------
+
+# Test large window size using graphql and find tasks in window.
+# This is helpful with coverage by using most of the no-rewalk mechanics.
+
+. "$(dirname "$0")/test_header"
+
+set_test_number 5
+
+install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
+
+TEST_NAME="${TEST_NAME_BASE}-validate"
+run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"
+
+TEST_NAME="${TEST_NAME_BASE}-run"
+# 'a => b => . . . f => g => h', 'a' sets window size to 5,
+# 'b => i => j => f', 'c' finds 'a', 'j', 'h'
+workflow_run_ok "${TEST_NAME}" cylc play --no-detach --debug "${WORKFLOW_NAME}"
+
+TEST_NAME="${TEST_NAME_BASE}-show-a.past"
+contains_ok "$WORKFLOW_RUN_DIR/show-a.txt" <<__END__
+state: succeeded
+prerequisites: (None)
+__END__
+
+TEST_NAME="${TEST_NAME_BASE}-show-j.parallel"
+contains_ok "${WORKFLOW_RUN_DIR}/show-j.txt" <<__END__
+state: waiting
+prerequisites: ('-': not satisfied)
+ - 1/i succeeded
+__END__
+
+TEST_NAME="${TEST_NAME_BASE}-show-h.future"
+contains_ok "${WORKFLOW_RUN_DIR}/show-h.txt" <<__END__
+state: waiting
+prerequisites: ('-': not satisfied)
+ - 1/g succeeded
+__END__
+
+purge
diff --git a/tests/functional/n-window/02-big-window/flow.cylc b/tests/functional/n-window/02-big-window/flow.cylc
new file mode 100644
index 00000000000..09e4d8181fc
--- /dev/null
+++ b/tests/functional/n-window/02-big-window/flow.cylc
@@ -0,0 +1,52 @@
+[scheduler]
+ allow implicit tasks = True
+ [[events]]
+ inactivity timeout = PT1M
+ abort on inactivity timeout = True
+[scheduling]
+ [[graph]]
+ R1 = """
+ a => b => c => d => e => f => g => h
+ b => i => j => f
+ """
+[runtime]
+ [[a]]
+ script = """
+set +e
+
+read -r -d '' gqlDoc <<_DOC_
+{"request_string": "
+mutation {
+ setGraphWindowExtent (
+ workflows: [\"${CYLC_WORKFLOW_ID}\"],
+ nEdgeDistance: 5) {
+ result
+ }
+}",
+"variables": null}
+_DOC_
+
+echo "${gqlDoc}"
+
+cylc client "$CYLC_WORKFLOW_ID" graphql < <(echo ${gqlDoc}) 2>/dev/null
+
+set -e
+"""
+ [[c]]
+ script = """
+cylc show "$CYLC_WORKFLOW_ID//1/a" >> $CYLC_WORKFLOW_RUN_DIR/show-a.txt
+cylc show "$CYLC_WORKFLOW_ID//1/j" >> $CYLC_WORKFLOW_RUN_DIR/show-j.txt
+cylc show "$CYLC_WORKFLOW_ID//1/h" >> $CYLC_WORKFLOW_RUN_DIR/show-h.txt
+"""
+
+ [[i]]
+ script = """
+# Slow 2nd branch down
+sleep 5
+"""
+
+ [[f]]
+ script = """
+# test re-trigger of old point
+cylc trigger "$CYLC_WORKFLOW_ID//1/b"
+"""
diff --git a/tests/functional/n-window/test_header b/tests/functional/n-window/test_header
new file mode 120000
index 00000000000..90bd5a36f92
--- /dev/null
+++ b/tests/functional/n-window/test_header
@@ -0,0 +1 @@
+../lib/bash/test_header
\ No newline at end of file
diff --git a/tests/integration/test_increment_graph_window.py b/tests/integration/test_increment_graph_window.py
new file mode 100644
index 00000000000..bd418a8bad5
--- /dev/null
+++ b/tests/integration/test_increment_graph_window.py
@@ -0,0 +1,410 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from contextlib import suppress
+
+from cylc.flow.cycling.integer import IntegerPoint
+from cylc.flow.data_store_mgr import (
+ TASK_PROXIES,
+)
+from cylc.flow.id import Tokens
+
+
+def increment_graph_window(schd, task):
+ """Increment the graph window about the active task."""
+ tokens = schd.tokens.duplicate(cycle='1', task=task)
+ schd.data_store_mgr.increment_graph_window(
+ tokens,
+ IntegerPoint('1'),
+ {1},
+ is_manual_submit=False,
+ )
+
+
+def get_deltas(schd):
+ """Return the ids and graph-window values in the delta store.
+
+ Note, call before get_n_window as this clears the delta store.
+
+ Returns:
+ (added, updated, pruned)
+
+ """
+ # populate added deltas
+ schd.data_store_mgr.gather_delta_elements(
+ schd.data_store_mgr.added,
+ 'added',
+ )
+ # populate pruned deltas
+ schd.data_store_mgr.prune_data_store()
+ # Run depth finder
+ schd.data_store_mgr.window_depth_finder()
+ # populate updated deltas
+ schd.data_store_mgr.gather_delta_elements(
+ schd.data_store_mgr.updated,
+ 'updated',
+ )
+ return (
+ {
+ # added
+ Tokens(tb_task_proxy.id)['task']: tb_task_proxy.graph_depth
+ for tb_task_proxy in schd.data_store_mgr.deltas[TASK_PROXIES].added
+ },
+ {
+ # updated
+ Tokens(tb_task_proxy.id)['task']: tb_task_proxy.graph_depth
+ for tb_task_proxy in schd.data_store_mgr.deltas[TASK_PROXIES].updated
+ # only include those updated nodes whose depths have been set
+ if 'graph_depth' in {
+ sub_field.name
+ for sub_field, _ in tb_task_proxy.ListFields()
+ }
+ },
+ {
+ # pruned
+ Tokens(id_)['task']
+ for id_ in schd.data_store_mgr.deltas[TASK_PROXIES].pruned
+ },
+ )
+
+
+async def get_n_window(schd):
+ """Read out the graph window of the workflow."""
+ await schd.update_data_structure()
+ data = schd.data_store_mgr.data[schd.data_store_mgr.workflow_id]
+ return {
+ t.name: t.graph_depth
+ for t in data[TASK_PROXIES].values()
+ }
+
+
+async def complete_task(schd, task):
+ """Mark a task as completed."""
+ schd.data_store_mgr.remove_pool_node(task, IntegerPoint('1'))
+
+
+def add_task(schd, task):
+ """Add a waiting task to the pool."""
+ schd.data_store_mgr.add_pool_node(task, IntegerPoint('1'))
+
+
+def get_graph_walk_cache(schd):
+ """Return the head task names of cached graph walks."""
+ # prune graph walk cache
+ schd.data_store_mgr.prune_data_store()
+ # fetch the cached walks
+ n_window_node_walks = sorted(
+ Tokens(task_id)['task']
+ for task_id in schd.data_store_mgr.n_window_node_walks
+ )
+ n_window_completed_walks = sorted(
+ Tokens(task_id)['task']
+ for task_id in schd.data_store_mgr.n_window_completed_walks
+ )
+ # the IDs in set and keys of dict are only the same at n<2 window.
+ assert n_window_node_walks == n_window_completed_walks
+ return n_window_completed_walks
+
+
+async def test_increment_graph_window_blink(flow, scheduler, start):
+ """Test with a task which drifts in and out of the n-window.
+
+ This workflow presents a fiendish challenge for the graph window algorithm.
+
+ The test runs in the n=3 window and simulates running each task in the
+ chain a - s one by one. The task "blink" is dependent on multiple tasks
+ in the chain awkwardly spaced so that the "blink" task routinely
+ disappears from the n-window, only to re-appear again later.
+
+ The expansion of the window around the "blink" task is difficult to get
+ right as it can be influenced by caches from previous graph walks.
+ """
+ id_ = flow({
+ 'scheduler': {
+ 'allow implicit tasks': 'True',
+ },
+ 'scheduling': {
+ 'cycling mode': 'integer',
+ 'initial cycle point': '1',
+ 'graph': {
+ 'R1': '''
+ # the "abdef" chain of tasks which run one after another
+ a => b => c => d => e => f => g => h => i => j => k => l =>
+ m => n => o => p => q => r => s
+
+ # these dependencies cause "blink" to disappear and
+ # reappear at set intervals
+ a => blink
+ g => blink
+ m => blink
+ s => blink
+ ''',
+ }
+ }
+ })
+ schd = scheduler(id_)
+
+ # the tasks traversed via the "blink" task when...
+ blink = {
+ 1: {
+ # the blink task is n=1
+ 'blink': 1,
+ 'a': 2,
+ 'g': 2,
+ 'm': 2,
+ 's': 2,
+ 'b': 3,
+ 'f': 3,
+ 'h': 3,
+ 'l': 3,
+ 'n': 3,
+ 'r': 3,
+ },
+ 2: {
+ # the blink task is n=2
+ 'blink': 2,
+ 'a': 3,
+ 'g': 3,
+ 'm': 3,
+ 's': 3,
+ },
+ 3: {
+ # the blink task is n=3
+ 'blink': 3,
+ },
+ 4: {
+ # the blink task is n=4
+ },
+ }
+
+ def advance():
+ """Advance to the next task in the workflow.
+
+ This works its way down the chain of tasks between "a" and "s"
+ inclusive, yielding what the n-window should look like for this
+ workflow at each step.
+
+ Yields:
+ tuple - (previous_task, active_task, n_window)
+
+ previous_task:
+ The task which has just "succeeded".
+ active_task:
+ The task which is about to run.
+ n_window:
+ Dictionary of {task_name: graph_depth} for the n=3 window.
+
+ """
+ # the initial window on startup (minus the nodes traversed via "blink")
+ window = {
+ 'a': 0,
+ 'b': 1,
+ 'c': 2,
+ 'd': 3,
+ }
+ # the tasks we will run in order
+ letters = 'abcdefghijklmnopqrs'
+ # the graph-depth of the "blink" task at each stage of the workflow
+ blink_distances = [1] + [*range(2, 5), *range(3, 0, -1)] * 3
+
+ for ind, blink_distance in zip(range(len(letters)), blink_distances):
+ previous_task = letters[ind - 1] if ind > 0 else None
+ active_task = letters[ind]
+ yield (
+ previous_task,
+ active_task,
+ {
+ # the tasks traversed via the "blink" task
+ **blink[blink_distance],
+ # the tasks in the main "abcdefg" chain
+ **{key: abs(value) for key, value in window.items()},
+ }
+ )
+
+ # move each task in the "abcdef" chain down one
+ window = {key: value - 1 for key, value in window.items()}
+ # add the n=3 task in the "abcdef" chain into the window
+ with suppress(IndexError):
+ window[letters[ind + 4]] = 3
+ # pull out anything which is not supposed to be in the n=3 window
+ window = {
+ key: value
+ for key, value in window.items()
+ if abs(value) < 4
+ }
+
+ async with start(schd):
+ schd.data_store_mgr.set_graph_window_extent(3)
+ await schd.update_data_structure()
+
+ previous_n_window = {}
+ for previous_task, active_task, expected_n_window in advance():
+ # mark the previous task as completed
+ await complete_task(schd, previous_task)
+ # add the next task to the pool
+ add_task(schd, active_task)
+ # run the graph window algorithm
+ increment_graph_window(schd, active_task)
+ # get the deltas which increment_graph_window created
+ added, updated, pruned = get_deltas(schd)
+
+ # compare the n-window in the store to what we were expecting
+ n_window = await get_n_window(schd)
+ assert n_window == expected_n_window
+
+ # compare the deltas to what we were expecting
+ if active_task != 'a':
+ # skip the first task as this is complicated by startup logic
+ assert added == {
+ key: value
+ for key, value in expected_n_window.items()
+ if key not in previous_n_window
+ }
+ # Skip added as depth isn't updated
+ # (the manager only updates those that need it)
+ assert updated == {
+ key: value
+ for key, value in expected_n_window.items()
+ if key not in added
+ }
+ assert pruned == {
+ key
+ for key in previous_n_window
+ if key not in expected_n_window
+ }
+
+ previous_n_window = n_window
+
+
+async def test_window_resize_rewalk(flow, scheduler, start):
+ """The window resize method should wipe and rebuild the n-window."""
+ id_ = flow({
+ 'scheduler': {
+ 'allow implicit tasks': 'true',
+ },
+ 'scheduling': {
+ 'graph': {
+ 'R1': 'a => b => c => d => e => f => g'
+ }
+ },
+ })
+ schd = scheduler(id_)
+ async with start(schd):
+ # start with an empty pool
+ schd.pool.remove(schd.pool.get_tasks()[0])
+
+ # the n-window should be empty
+ assert await get_n_window(schd) == {}
+
+ # expand the window around 1/d
+ add_task(schd, 'd')
+ increment_graph_window(schd, 'd')
+
+ # set the graph window to n=3
+ schd.data_store_mgr.set_graph_window_extent(3)
+ assert set(await get_n_window(schd)) == {
+ 'a', 'b', 'c', 'd', 'e', 'f', 'g'
+ }
+
+ # set the graph window to n=1
+ schd.data_store_mgr.set_graph_window_extent(1)
+ schd.data_store_mgr.window_resize_rewalk()
+ assert set(await get_n_window(schd)) == {
+ 'c', 'd', 'e'
+ }
+
+ # set the graph window to n=2
+ schd.data_store_mgr.set_graph_window_extent(2)
+ schd.data_store_mgr.window_resize_rewalk()
+ assert set(await get_n_window(schd)) == {
+ 'b', 'c', 'd', 'e', 'f'
+ }
+
+
+async def test_cache_pruning(flow, scheduler, start):
+ """It should remove graph walks from the cache when no longer needed.
+
+ The algorithm caches graph walks for efficiency. This test is designed to
+ ensure we don't introduce a memory leak by failing to clear cached walks
+ at the correct point.
+ """
+ id_ = flow({
+ 'scheduler': {
+ 'allow implicit tasks': 'True',
+ },
+ 'scheduling': {
+ 'graph': {
+ 'R1': '''
+ # a chain of tasks
+ a => b1 & b2 => c => d1 & d2 => e => f
+ # force "a" to drift into an out of the window
+ a => c
+ a => e
+ '''
+ }
+ },
+ })
+ schd = scheduler(id_)
+ async with start(schd):
+ schd.data_store_mgr.set_graph_window_extent(1)
+
+ # work through this workflow, step by step checking the cached items...
+
+ # active: a
+ add_task(schd, 'a')
+ increment_graph_window(schd, 'a')
+ assert get_graph_walk_cache(schd) == ['a']
+
+ # active: b1, b2
+ await complete_task(schd, 'a')
+ add_task(schd, 'b1')
+ add_task(schd, 'b2')
+ increment_graph_window(schd, 'b1')
+ increment_graph_window(schd, 'b2')
+ assert get_graph_walk_cache(schd) == ['a', 'b1', 'b2']
+
+ # active: c
+ await complete_task(schd, 'b1')
+ await complete_task(schd, 'b2')
+ add_task(schd, 'c')
+ increment_graph_window(schd, 'c')
+ assert get_graph_walk_cache(schd) == ['a', 'b1', 'b2', 'c']
+
+ # active: d1, d2
+ await complete_task(schd, 'c')
+ add_task(schd, 'd1')
+ add_task(schd, 'd2')
+ increment_graph_window(schd, 'd1')
+ increment_graph_window(schd, 'd2')
+ assert get_graph_walk_cache(schd) == ['c', 'd1', 'd2']
+
+ # active: e
+ await complete_task(schd, 'd1')
+ await complete_task(schd, 'd2')
+ add_task(schd, 'e')
+ increment_graph_window(schd, 'e')
+ assert get_graph_walk_cache(schd) == ['d1', 'd2', 'e']
+
+ # active: f
+ await complete_task(schd, 'e')
+ add_task(schd, 'f')
+ increment_graph_window(schd, 'f')
+ assert get_graph_walk_cache(schd) == ['e', 'f']
+
+ # active: None
+ await complete_task(schd, 'f')
+ increment_graph_window(schd, 'f')
+ assert get_graph_walk_cache(schd) == []