|
1 | 1 | from __future__ import annotations
|
2 | 2 |
|
3 | 3 | import collections
|
4 |
| -from typing import TYPE_CHECKING, Callable, ClassVar |
| 4 | +from typing import TYPE_CHECKING, Callable, ClassVar, cast |
5 | 5 |
|
6 | 6 | import simpleflow.swf.mapper.models.history
|
7 | 7 | from simpleflow import logger
|
@@ -143,6 +143,48 @@ def tasks(self):
|
143 | 143 | def events(self) -> list[Event]:
|
144 | 144 | return self._history.events
|
145 | 145 |
|
| 146 | + def get_activities_history(self) -> dict[str, dict[str, Any]]: |
| 147 | + activities: dict[str, dict[str, Any]] = {} |
| 148 | + scheduled_to_activity_id: dict[int, str] = {} |
| 149 | + event: ActivityTaskEvent | Any |
| 150 | + for event in self.events: |
| 151 | + if event.type != "ActivityTask": |
| 152 | + continue |
| 153 | + cast(ActivityTaskEvent, event) |
| 154 | + activity_id = getattr(event, "activity_id", None) |
| 155 | + if event.state == "scheduled" and activity_id not in activities: |
| 156 | + activities[activity_id] = { |
| 157 | + "id": activity_id, |
| 158 | + "name": event.activity_type["name"], |
| 159 | + "version": event.activity_type["version"], |
| 160 | + "states": [event.state], |
| 161 | + "scheduled_ids": [event.id], |
| 162 | + "scheduled_timestamps": [event.timestamp], |
| 163 | + "inputs": [event.input], |
| 164 | + "task_lists": [event.task_list["name"]], |
| 165 | + } |
| 166 | + scheduled_to_activity_id[event.id] = activity_id |
| 167 | + else: |
| 168 | + if event.state != "scheduled": |
| 169 | + scheduled_event = self.events[event.scheduled_event_id - 1] |
| 170 | + scheduled_id = scheduled_event.id |
| 171 | + activity = activities[scheduled_to_activity_id[scheduled_id]] |
| 172 | + activity["task_lists"].append(event.task_list["name"]) |
| 173 | + else: |
| 174 | + activity = activities[event.activity_id] |
| 175 | + activity.setdefault("states", []).append(event.state) |
| 176 | + activity.setdefault(f"{event.state}_ids", []).append(event.id) |
| 177 | + activity.setdefault(f"{event.state}_timestamp", []).append(event.timestamp) |
| 178 | + for attr in ("identity", "result", "reason", "details"): |
| 179 | + if hasattr(event, attr): |
| 180 | + activity.setdefault(attr, []).append(getattr(event, attr)) |
| 181 | + if event.state == "timed_out": |
| 182 | + activity.setdefault("timeout_types", []).append(event.timeout_type) |
| 183 | + activity.setdefault(f"{event.timeout_type}_timeouts", []).append( |
| 184 | + getattr(event, f"{event.timeout_type}_timeout") |
| 185 | + ) |
| 186 | + activity.setdefault(f"{event.timeout_values}", []).append() |
| 187 | + |
146 | 188 | def parse_activity_event(self, events: list[Event | ActivityTaskEvent], event: ActivityTaskEvent):
|
147 | 189 | """
|
148 | 190 | Aggregate all the attributes of an activity in a single entry.
|
|
0 commit comments