Skip to content

Commit ce2cf26

Browse files
committed
feat: even more stats 🙂
Signed-off-by: Yves Bastide <[email protected]>
1 parent d98f368 commit ce2cf26

File tree

4 files changed

+188
-6
lines changed

4 files changed

+188
-6
lines changed

‎simpleflow/command.py

+21-4
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,9 @@ def terminate_workflow(
220220
run_id: str | None,
221221
):
222222
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
223+
if not ex:
224+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
225+
sys.exit(1)
223226
ex.terminate()
224227

225228

@@ -235,6 +238,9 @@ def terminate_workflow(
235238
)
236239
def restart_workflow(domain: str, workflow_id: str, run_id: str | None):
237240
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
241+
if not ex:
242+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
243+
sys.exit(1)
238244
history = ex.history()
239245
ex.terminate(reason="workflow.restart")
240246
new_ex = ex.workflow_type.start_execution(
@@ -299,6 +305,7 @@ def profile(ctx, domain, workflow_id, run_id, nb_tasks):
299305
)
300306

301307

308+
# FIXME superseded by history
302309
@click.option(
303310
"--nb-tasks",
304311
"-n",
@@ -331,6 +338,7 @@ def workflow_tasks(
331338
)
332339

333340

341+
# FIXME superseded by filter
334342
@click.argument(
335343
"domain",
336344
envvar="SWF_DOMAIN",
@@ -357,16 +365,16 @@ def list_workflows(ctx, domain: str, status: str, started_since: int):
357365
_NOTSET = object()
358366

359367

360-
@click.argument(
361-
"domain",
362-
envvar="SWF_DOMAIN",
363-
)
364368
@cli.command(
365369
"workflow.history",
366370
help="Workflow history from workflow WORKFLOW_ID [RUN_ID].",
367371
)
368372
@click.argument("workflow_id")
369373
@click.argument("run_id", required=False)
374+
@click.option(
375+
"--domain",
376+
envvar="SWF_DOMAIN",
377+
)
370378
@click.option(
371379
"--mode", required=False, type=click.Choice(["rawest", "raw", "cooked"]), default="raw", help="Output format."
372380
)
@@ -386,6 +394,9 @@ def workflow_history(
386394
from simpleflow.swf.mapper.models.history.base import History as BaseHistory
387395

388396
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
397+
if not ex:
398+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
399+
sys.exit(1)
389400
events = ex.history_events(
390401
callback=get_progression_callback("events"),
391402
reverse_order=reverse_order,
@@ -410,6 +421,7 @@ def workflow_history(
410421
elif mode == "cooked":
411422
history.parse()
412423
events = {
424+
"workflow": history.workflow,
413425
"activities": history.activities,
414426
"child_workflows": history.child_workflows,
415427
"markers": history.markers,
@@ -821,6 +833,11 @@ def standalone(
821833
ex.workflow_id,
822834
ex.run_id,
823835
)
836+
if not ex:
837+
print(
838+
f"Execution {workflow_id} {ex.run_id} not found" if ex.run_id else f"Workflow {workflow_id} not found"
839+
)
840+
sys.exit(1)
824841
if display_status:
825842
print(f"status: {ex.status}", file=sys.stderr)
826843
if ex.status == ex.STATUS_CLOSED:

‎simpleflow/history.py

+117
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,16 @@ def __init__(self, history: simpleflow.swf.mapper.models.history.History) -> Non
4343
self.started_decision_id: int | None = None
4444
self.completed_decision_id: int | None = None
4545
self.last_event_id: int | None = None
46+
self._workflow: dict[str, Any] = {}
4647

4748
@property
4849
def swf_history(self) -> simpleflow.swf.mapper.models.history.History:
4950
return self._history
5051

52+
@property
53+
def workflow(self):
54+
return self._workflow
55+
5156
@property
5257
def activities(self) -> dict[str, ActivityTaskEventDict]:
5358
"""
@@ -432,6 +437,118 @@ def parse_workflow_event(self, events: list[Event], event: WorkflowExecutionEven
432437
"""
433438
Parse a workflow event.
434439
"""
440+
if event.state == "started":
441+
self._workflow.update(
442+
{
443+
"state": event.state,
444+
f"{event.state}_id": event.id,
445+
f"{event.state}_timestamp": event.timestamp,
446+
"child_policy": getattr(event, "child_policy", None),
447+
"task_list": event.task_list["name"],
448+
"workflow_type": event.workflow_type,
449+
"continued_execution_run_id": getattr(event, "continued_execution_run_id", None),
450+
"execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None),
451+
"input": getattr(event, "input", None),
452+
"lambda_role": getattr(event, "lambda_role", None),
453+
"parent_initiated_event_id": getattr(event, "parent_initiated_event_id", None),
454+
"parent_workflow_execution": getattr(event, "parent_workflow_execution", None),
455+
"tag_list": getattr(event, "tag_list", None),
456+
"task_priority": getattr(event, "task_priority", None),
457+
"task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None),
458+
}
459+
)
460+
elif event.state == "continued_as_new":
461+
self._workflow.update(
462+
{
463+
"state": event.state,
464+
f"{event.state}_id": event.id,
465+
f"{event.state}_timestamp": event.timestamp,
466+
f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id,
467+
"new_execution_run_id": event.new_execution_run_id,
468+
"task_list": event.task_list["name"],
469+
"workflow_type": event.workflow_type,
470+
"execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None),
471+
"input": getattr(event, "input", None),
472+
"lambda_role": getattr(event, "lambda_role", None),
473+
"tag_list": getattr(event, "tag_list", None),
474+
"task_priority": getattr(event, "task_priority", None),
475+
"task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None),
476+
}
477+
)
478+
elif event.state == "completed":
479+
self._workflow.update(
480+
{
481+
"state": event.state,
482+
f"{event.state}_id": event.id,
483+
f"{event.state}_timestamp": event.timestamp,
484+
"initiated_event_id": getattr(event, "initiated_event_id", None),
485+
"result": getattr(event, "result", None),
486+
}
487+
)
488+
elif event.state == "cancelled":
489+
self._workflow.update(
490+
{
491+
"state": event.state,
492+
f"{event.state}_id": event.id,
493+
f"{event.state}_timestamp": event.timestamp,
494+
"initiated_event_id": getattr(event, "initiated_event_id", None),
495+
"decision_task_completed_event_id": event.decision_task_completed_event_id,
496+
"details": getattr(event, "details", None),
497+
}
498+
)
499+
elif event.state == "failed":
500+
self._workflow.update(
501+
{
502+
"state": event.state,
503+
f"{event.state}_id": event.id,
504+
f"{event.state}_timestamp": event.timestamp,
505+
"initiated_event_id": getattr(event, "initiated_event_id", None),
506+
"decision_task_completed_event_id": event.decision_task_completed_event_id,
507+
"reason": getattr(event, "reason", None),
508+
"details": getattr(event, "details", None),
509+
}
510+
)
511+
elif event.state == "terminated":
512+
self._workflow.update(
513+
{
514+
"state": event.state,
515+
f"{event.state}_id": event.id,
516+
f"{event.state}_timestamp": event.timestamp,
517+
"initiated_event_id": getattr(event, "initiated_event_id", None),
518+
"cause": getattr(event, "cause", None),
519+
"details": getattr(event, "details", None),
520+
}
521+
)
522+
elif event.state == "timed_out":
523+
self._workflow.update(
524+
{
525+
"state": event.state,
526+
f"{event.state}_id": event.id,
527+
f"{event.state}_timestamp": event.timestamp,
528+
"initiated_event_id": getattr(event, "initiated_event_id", None),
529+
"timeout_type": event.timeout_type,
530+
}
531+
)
532+
# elif event.state in (
533+
# "cancel_failed",
534+
# "complete_failed",
535+
# "continue_as_new",
536+
# "fail_failed",
537+
# "start_child_failed",
538+
# "start_failed",
539+
# "terminate_failed",
540+
# ):
541+
# self._workflow.update(
542+
# {
543+
# "state": event.state,
544+
# f"{event.state}_id": event.id,
545+
# f"{event.state}_cause": getattr(event, "cause", None),
546+
# f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id,
547+
# }
548+
# )
549+
550+
if event.state == "cancel_requested":
551+
self._workflow.update()
435552
if event.state == "signaled":
436553
signal = {
437554
"type": "signal",

‎simpleflow/swf/helpers.py

+16-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import json
66
import os
77
import socket
8+
import sys
89
from typing import TYPE_CHECKING
910

1011
import psutil
@@ -29,11 +30,12 @@
2930
]
3031

3132

32-
def get_workflow_execution(domain_name: str, workflow_id: str, run_id: str | None = None) -> WorkflowExecution:
33+
def get_workflow_execution(domain_name: str, workflow_id: str, run_id: str | None = None) -> WorkflowExecution | None:
3334
def filter_execution(*args, **kwargs):
3435
if "workflow_status" in kwargs:
3536
kwargs["status"] = kwargs.pop("workflow_status")
36-
return query.filter(*args, **kwargs)[0]
37+
filtered_executions = query.filter(*args, **kwargs)
38+
return filtered_executions[0] if filtered_executions else None
3739

3840
domain = simpleflow.swf.mapper.models.Domain(domain_name)
3941
query = simpleflow.swf.mapper.querysets.WorkflowExecutionQuerySet(domain)
@@ -61,6 +63,9 @@ def show_workflow_info(domain_name, workflow_id, run_id=None):
6163
workflow_id,
6264
run_id,
6365
)
66+
if not workflow_execution:
67+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
68+
sys.exit(1)
6469
return pretty.info(workflow_execution)
6570

6671

@@ -70,6 +75,9 @@ def show_workflow_profile(domain_name, workflow_id, run_id=None, nb_tasks=None):
7075
workflow_id,
7176
run_id,
7277
)
78+
if not workflow_execution:
79+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
80+
sys.exit(1)
7381
return pretty.profile(workflow_execution, nb_tasks)
7482

7583

@@ -79,6 +87,9 @@ def show_workflow_status(domain_name: str, workflow_id: str, run_id: str | None
7987
workflow_id,
8088
run_id,
8189
)
90+
if not workflow_execution:
91+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
92+
sys.exit(1)
8293
return pretty.status(workflow_execution, nb_tasks)
8394

8495

@@ -171,6 +182,9 @@ def get_task(
171182
domain_name,
172183
workflow_id,
173184
)
185+
if not workflow_execution:
186+
print(f"Workflow {workflow_id} not found")
187+
sys.exit(1)
174188
return pretty.get_task(workflow_execution, task_id, details)
175189

176190

‎simpleflow/swf/mapper/models/event/workflow.py

+34
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,44 @@ class WorkflowExecution(TypedDict):
2424
class WorkflowExecutionEvent(Event):
2525
_type = "WorkflowExecution"
2626

27+
# start
2728
initiated_event_id: int
29+
child_policy: str
30+
task_list: TaskList
31+
workflow_type: WorkflowType
32+
continued_execution_run_id: str | None
33+
execution_start_to_close_timeout: str | None
34+
input: str | None
35+
lambda_role: str | None
36+
parent_initiated_event_id: int | None
37+
parent_workflow_execution: WorkflowExecution | None
38+
tag_list: list[str] | None
39+
task_priority: str | None
40+
task_start_to_close_timeout: str | None
41+
42+
# continued_as_new
43+
new_execution_run_id: str
44+
2845
signal_name: str
46+
2947
decision_task_completed_event_id: int
3048

49+
# completed
50+
result: str | None
51+
52+
# terminated
53+
# child_policy:str
54+
cause: str | None
55+
details: str | None
56+
reason: str | None
57+
58+
# timed out
59+
# child_policy:str
60+
timeout_type: str | None
61+
62+
# workflow_execution: WorkflowExecution
63+
# close_status: str
64+
3165

3266
class CompiledWorkflowExecutionEvent(CompiledEvent):
3367
_type = "WorkflowExecution"

0 commit comments

Comments
 (0)