From 90f4991430e3d08d26452a221bfc89690357043f Mon Sep 17 00:00:00 2001 From: rebrowning Date: Sat, 13 Jan 2024 13:25:10 -0800 Subject: [PATCH 1/6] ton of logging, narrowed hot spots down to a couple of spots --- orquesta/conducting.py | 73 +++++++++++++++++++++++++++++- orquesta/specs/native/v1/models.py | 41 +++++++++++++++-- orquesta/utils/context.py | 9 +++- 3 files changed, 115 insertions(+), 8 deletions(-) diff --git a/orquesta/conducting.py b/orquesta/conducting.py index a0c07d1d..420c4773 100644 --- a/orquesta/conducting.py +++ b/orquesta/conducting.py @@ -15,6 +15,8 @@ import logging import six +import time +import uuid from six.moves import queue @@ -181,10 +183,25 @@ def get_unreachable_barriers(self): return unreachable_barriers def get_staged_tasks(self, filtered=True): + gnt_uuid = uuid.uuid4() + start = time.time() + LOG.info("get_staged_tasks - 1 - %s, %s", gnt_uuid, time.time() - start) if not filtered: return self.staged - - return [x for x in self.staged if x["ready"] and not x.get("completed", False)] + LOG.info("get_staged_tasks - 2 - %s, %s", gnt_uuid, time.time() - start) + + # resp = [x for x in self.staged if x["ready"] and not x.get("completed", False)] + resp = [] + it_cnt = 0 + for x in self.staged: + LOG.info("get_staged_tasks - 3.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) + if x["ready"] and not x.get("completed", False): + LOG.info("get_staged_tasks - 4.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) + resp.append(x) + LOG.info("get_staged_tasks - 5.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) + it_cnt += 1 + LOG.info("get_staged_tasks - 6 - %s, %s", gnt_uuid, time.time() - start) + return resp @property def has_staged_tasks(self): @@ -566,17 +583,29 @@ def get_inbound_criteria_status(self, task_id, route): return constants.INBOUND_CRITERIA_NOT_SATISFIED def get_task(self, task_id, route): + gnt_uuid = uuid.uuid4() + start = time.time() + LOG.info("get_task - 1 - %s, %s", gnt_uuid, time.time() - start) try: task_ctx = self.get_task_initial_context(task_id, route) + LOG.info("get_task - 2 - %s, %s", gnt_uuid, time.time() - start) except ValueError: + LOG.info("get_task - 3 - %s, %s", gnt_uuid, time.time() - start) task_ctx = self.get_workflow_initial_context() + LOG.info("get_task - 4 - %s, %s", gnt_uuid, time.time() - start) + LOG.info("get_task - 5 - %s, %s", gnt_uuid, time.time() - start) state_ctx = {"__state": self.workflow_state.serialize()} + LOG.info("get_task - 6 - %s, %s", gnt_uuid, time.time() - start) current_task = {"id": task_id, "route": route} task_ctx = ctx_util.set_current_task(task_ctx, current_task) + LOG.info("get_task - 7 - %s, %s", gnt_uuid, time.time() - start) task_ctx = dict_util.merge_dicts(task_ctx, state_ctx, True) + LOG.info("get_task - 8 - %s, %s", gnt_uuid, time.time() - start) task_spec = self.spec.tasks.get_task(task_id).copy() + LOG.info("get_task - 9 - %s, %s", gnt_uuid, time.time() - start) task_spec, action_specs = task_spec.render(task_ctx) + LOG.info("get_task - 10 - %s, %s", gnt_uuid, time.time() - start) task = { "id": task_id, @@ -588,23 +617,37 @@ def get_task(self, task_id, route): # If there is a task delay specified, evaluate the delay value. if getattr(task_spec, "delay", None): + LOG.info("get_task - 11 - %s, %s", gnt_uuid, time.time() - start) task_delay = task_spec.delay + LOG.info("get_task - 12 - %s, %s", gnt_uuid, time.time() - start) if isinstance(task_delay, six.string_types): + LOG.info("get_task - 13 - %s, %s", gnt_uuid, time.time() - start) task_delay = expr_base.evaluate(task_delay, task_ctx) + LOG.info("get_task - 14 - %s, %s", gnt_uuid, time.time() - start) + LOG.info("get_task - 15 - %s, %s", gnt_uuid, time.time() - start) if not isinstance(task_delay, int): + LOG.info("get_task - 16 - %s, %s", gnt_uuid, time.time() - start) raise TypeError("The value of task delay is not type of integer.") task["delay"] = task_delay + LOG.info("get_task - 17 - %s, %s", gnt_uuid, time.time() - start) # Add items and related meta data to the task details. + LOG.info("get_task - 18 - %s, %s", gnt_uuid, time.time() - start) if task_spec.has_items(): + LOG.info("get_task - 19 - %s, %s", gnt_uuid, time.time() - start) items_spec = getattr(task_spec, "with") + LOG.info("get_task - 20 - %s, %s", gnt_uuid, time.time() - start) concurrency = getattr(items_spec, "concurrency", None) + LOG.info("get_task - 21 - %s, %s", gnt_uuid, time.time() - start) task["items_count"] = len(action_specs) + LOG.info("get_task - 22 - %s, %s", gnt_uuid, time.time() - start) task["concurrency"] = expr_base.evaluate(concurrency, task_ctx) + LOG.info("get_task - 23 - %s, %s", gnt_uuid, time.time() - start) + LOG.info("get_task - 24 - %s, %s", gnt_uuid, time.time() - start) return task def _evaluate_task_actions(self, task): @@ -689,47 +732,73 @@ def has_next_tasks(self, task_id=None, route=None): return self._has_next(task_id, route=route) def get_next_tasks(self): + gnt_uuid = uuid.uuid4() + start = time.time() + LOG.info("get_next_tasks - 1 - %s, %s", gnt_uuid, time.time() - start) fail_on_task_rendering = False staged_tasks = self.workflow_state.get_staged_tasks() + LOG.info("get_next_tasks - 2 - %s, %s", gnt_uuid, time.time() - start) remediation_tasks = [] next_tasks = [] # Identify remediation tasks if workflow failed. if self.get_workflow_status() == statuses.FAILED: + LOG.info("get_next_tasks - 3 - %s, %s", gnt_uuid, time.time() - start) remediation_tasks = [s for s in staged_tasks if s.get("run_on_fail", False) is True] + LOG.info("get_next_tasks - 4 - %s, %s", gnt_uuid, time.time() - start) # Return an empty list if the workflow is not running and there is no remediation tasks. + LOG.info("get_next_tasks - 5 - %s, %s", gnt_uuid, time.time() - start) if self.get_workflow_status() not in statuses.RUNNING_STATUSES and not remediation_tasks: + LOG.info("get_next_tasks - 6 - %s, %s", gnt_uuid, time.time() - start) return next_tasks + LOG.info("get_next_tasks - 7 - %s, %s", gnt_uuid, time.time() - start) # Return the list of tasks that are staged and readied. If there is exception on # task rendering, then log the error and continue. This allows user to know about # all task rendering errors for this task transition instead of getting rendering # error one at a time during runtime. + it_cnt = 0 for staged_task in remediation_tasks or staged_tasks: + LOG.info("get_next_tasks - 8.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) try: next_task = self.get_task(staged_task["id"], staged_task["route"]) + LOG.info("get_next_tasks - 9.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) next_task = self._evaluate_task_actions(next_task) + LOG.info("get_next_tasks - 10.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) # Assign the task retry delay which will overwrite any task delay # specified in the task definition. if "retry" in staged_task: + LOG.info("get_next_tasks - 11.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) next_task["delay"] = staged_task["retry"].get("delay") or 0 + LOG.info("get_next_tasks - 12.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) if "actions" in next_task and len(next_task["actions"]) > 0: + LOG.info("get_next_tasks - 13.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) next_tasks.append(next_task) + LOG.info("get_next_tasks - 14.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) + elif "items_count" in next_task and next_task["items_count"] == 0: + LOG.info("get_next_tasks - 15.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) next_tasks.append(next_task) + LOG.info("get_next_tasks - 16.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) except Exception as e: + LOG.info("get_next_tasks - 17.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) fail_on_task_rendering = True self.log_error(e, task_id=staged_task["id"], route=staged_task["route"]) continue + it_cnt += 1 # Return nothing if there is error(s) on determining next tasks. + LOG.info("get_next_tasks - 18 - %s, %s", gnt_uuid, time.time() - start) if fail_on_task_rendering: + LOG.info("get_next_tasks - 19 - %s, %s", gnt_uuid, time.time() - start) self.request_workflow_status(statuses.FAILED) + LOG.info("get_next_tasks - 20 - %s, %s", gnt_uuid, time.time() - start) return [] + LOG.info("get_next_tasks - 21 - %s, %s", gnt_uuid, time.time() - start) return sorted(next_tasks, key=lambda x: (x["id"], x["route"])) def _get_task_state_idx(self, task_id, route): diff --git a/orquesta/specs/native/v1/models.py b/orquesta/specs/native/v1/models.py index 1712dfac..ece1a08a 100644 --- a/orquesta/specs/native/v1/models.py +++ b/orquesta/specs/native/v1/models.py @@ -15,6 +15,8 @@ import logging import six +import time +import uuid from six.moves import queue from orquesta import events @@ -154,17 +156,27 @@ def has_retry(self): return hasattr(self, "retry") and self.retry def render(self, in_ctx): + gnt_uuid = uuid.uuid4() + start = time.time() + LOG.info("render - 1 - %s, %s", gnt_uuid, time.time() - start) action_specs = [] + item_ctx_value = ctx_util.copy_context(in_ctx) + if not self.has_items(): + # LOG.info("render - 2 - %s, %s", gnt_uuid, time.time() - start) action_spec = { "action": expr_base.evaluate(self.action, in_ctx), "input": expr_base.evaluate(getattr(self, "input", {}), in_ctx), } + # LOG.info("render - 3 - %s, %s", gnt_uuid, time.time() - start) action_specs.append(action_spec) + # LOG.info("render - 4 - %s, %s", gnt_uuid, time.time() - start) else: + # LOG.info("render - 5 - %s, %s", gnt_uuid, time.time() - start) items_spec = self.get_items_spec() + # LOG.info("render - 6 - %s, %s", gnt_uuid, time.time() - start) if " in " not in items_spec.items: items_expr = items_spec.items.strip() @@ -172,7 +184,10 @@ def render(self, in_ctx): start_idx = items_spec.items.index(" in ") + 4 items_expr = items_spec.items[start_idx:].strip() + # LOG.info("render - 7 - %s, %s", gnt_uuid, time.time() - start) + items = expr_base.evaluate(items_expr, in_ctx) + # LOG.info("render - 8 - %s, %s", gnt_uuid, time.time() - start) if not isinstance(items, list): raise TypeError('The value of "%s" is not type of list.' % items_expr) @@ -182,23 +197,39 @@ def render(self, in_ctx): if " in " not in items_spec.items else items_spec.items[: items_spec.items.index(" in ")].replace(" ", "").split(",") ) + LOG.info("render - 9 - %s, %s", gnt_uuid, time.time() - start) for idx, item in enumerate(items): + LOG.info("render - 10.%s - %s, %s", idx, gnt_uuid, time.time() - start) if item_keys and (isinstance(item, tuple) or isinstance(item, list)): + LOG.info("render - 11.%s - %s, %s", idx, gnt_uuid, time.time() - start) item = dict(zip(item_keys, list(item))) + LOG.info("render - 12.%s - %s, %s", idx, gnt_uuid, time.time() - start) elif item_keys and len(item_keys) == 1: + LOG.info("render - 13.%s - %s, %s", idx, gnt_uuid, time.time() - start) item = {item_keys[0]: item} - - item_ctx_value = ctx_util.set_current_item(in_ctx, item) - + LOG.info("render - 14.%s - %s, %s", idx, gnt_uuid, time.time() - start) + + LOG.info("render - 15.%s - %s, %s", idx, gnt_uuid, time.time() - start) + item_ctx_value = ctx_util.set_current_item(item_ctx_value, item) + LOG.info("render - 16.%s - %s, %s", idx, gnt_uuid, time.time() - start) + LOG.info("render - 16.0.%s - %s, %s - self.action - %s", idx, gnt_uuid, time.time() - start, self.action) + LOG.info("render - 16.1.%s - %s, %s - getattr(self, 'input', {}) - %s", idx, gnt_uuid, time.time() - start, getattr(self, "input", {})) + action = expr_base.evaluate(self.action, item_ctx_value) + LOG.info("render - 16.2.%s - %s, %s - action - %s", idx, gnt_uuid, time.time() - start, action) + gen_input = expr_base.evaluate(getattr(self, "input", {}), item_ctx_value) + LOG.info("render - 16.3.%s - %s, %s - gen_input - %s", idx, gnt_uuid, time.time() - start, gen_input) action_spec = { - "action": expr_base.evaluate(self.action, item_ctx_value), - "input": expr_base.evaluate(getattr(self, "input", {}), item_ctx_value), + "action": action, + "input": gen_input, "item_id": idx, } + LOG.info("render - 17.%s - %s, %s, action_spec - %s", idx, gnt_uuid, time.time() - start, action_spec) action_specs.append(action_spec) + # LOG.info("render - 18.%s - %s, %s", idx, gnt_uuid, time.time() - start) + LOG.info("render - 19 - %s, %s, action_specs - %s", gnt_uuid, time.time() - start, action_specs) return self, action_specs def finalize_context(self, next_task_name, task_transition_meta, in_ctx): diff --git a/orquesta/utils/context.py b/orquesta/utils/context.py index b720e0ae..e3adbb00 100644 --- a/orquesta/utils/context.py +++ b/orquesta/utils/context.py @@ -40,11 +40,18 @@ def set_current_task(context, task): return ctx -def set_current_item(context, item): +def copy_context(context): if context and not isinstance(context, dict): raise TypeError("The context is not type of dict.") ctx = json_util.deepcopy(context) if context else dict() + return ctx + +def set_current_item(context, item): + if context and not isinstance(context, dict): + raise TypeError("The context is not type of dict.") + + ctx = {**context} ctx["__current_item"] = item return ctx From 4c0f102c11a1644240f76152a363f19ef330919d Mon Sep 17 00:00:00 2001 From: rebrowning Date: Mon, 15 Jan 2024 17:54:51 -0800 Subject: [PATCH 2/6] clean up current log statements --- orquesta/conducting.py | 59 ------------------------------ orquesta/specs/native/v1/models.py | 29 --------------- 2 files changed, 88 deletions(-) diff --git a/orquesta/conducting.py b/orquesta/conducting.py index 420c4773..c0a937b6 100644 --- a/orquesta/conducting.py +++ b/orquesta/conducting.py @@ -15,8 +15,6 @@ import logging import six -import time -import uuid from six.moves import queue @@ -183,24 +181,16 @@ def get_unreachable_barriers(self): return unreachable_barriers def get_staged_tasks(self, filtered=True): - gnt_uuid = uuid.uuid4() - start = time.time() - LOG.info("get_staged_tasks - 1 - %s, %s", gnt_uuid, time.time() - start) if not filtered: return self.staged - LOG.info("get_staged_tasks - 2 - %s, %s", gnt_uuid, time.time() - start) # resp = [x for x in self.staged if x["ready"] and not x.get("completed", False)] resp = [] it_cnt = 0 for x in self.staged: - LOG.info("get_staged_tasks - 3.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) if x["ready"] and not x.get("completed", False): - LOG.info("get_staged_tasks - 4.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) resp.append(x) - LOG.info("get_staged_tasks - 5.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) it_cnt += 1 - LOG.info("get_staged_tasks - 6 - %s, %s", gnt_uuid, time.time() - start) return resp @property @@ -583,29 +573,17 @@ def get_inbound_criteria_status(self, task_id, route): return constants.INBOUND_CRITERIA_NOT_SATISFIED def get_task(self, task_id, route): - gnt_uuid = uuid.uuid4() - start = time.time() - LOG.info("get_task - 1 - %s, %s", gnt_uuid, time.time() - start) try: task_ctx = self.get_task_initial_context(task_id, route) - LOG.info("get_task - 2 - %s, %s", gnt_uuid, time.time() - start) except ValueError: - LOG.info("get_task - 3 - %s, %s", gnt_uuid, time.time() - start) task_ctx = self.get_workflow_initial_context() - LOG.info("get_task - 4 - %s, %s", gnt_uuid, time.time() - start) - LOG.info("get_task - 5 - %s, %s", gnt_uuid, time.time() - start) state_ctx = {"__state": self.workflow_state.serialize()} - LOG.info("get_task - 6 - %s, %s", gnt_uuid, time.time() - start) current_task = {"id": task_id, "route": route} task_ctx = ctx_util.set_current_task(task_ctx, current_task) - LOG.info("get_task - 7 - %s, %s", gnt_uuid, time.time() - start) task_ctx = dict_util.merge_dicts(task_ctx, state_ctx, True) - LOG.info("get_task - 8 - %s, %s", gnt_uuid, time.time() - start) task_spec = self.spec.tasks.get_task(task_id).copy() - LOG.info("get_task - 9 - %s, %s", gnt_uuid, time.time() - start) task_spec, action_specs = task_spec.render(task_ctx) - LOG.info("get_task - 10 - %s, %s", gnt_uuid, time.time() - start) task = { "id": task_id, @@ -617,37 +595,23 @@ def get_task(self, task_id, route): # If there is a task delay specified, evaluate the delay value. if getattr(task_spec, "delay", None): - LOG.info("get_task - 11 - %s, %s", gnt_uuid, time.time() - start) task_delay = task_spec.delay - LOG.info("get_task - 12 - %s, %s", gnt_uuid, time.time() - start) if isinstance(task_delay, six.string_types): - LOG.info("get_task - 13 - %s, %s", gnt_uuid, time.time() - start) task_delay = expr_base.evaluate(task_delay, task_ctx) - LOG.info("get_task - 14 - %s, %s", gnt_uuid, time.time() - start) - LOG.info("get_task - 15 - %s, %s", gnt_uuid, time.time() - start) if not isinstance(task_delay, int): - LOG.info("get_task - 16 - %s, %s", gnt_uuid, time.time() - start) raise TypeError("The value of task delay is not type of integer.") task["delay"] = task_delay - LOG.info("get_task - 17 - %s, %s", gnt_uuid, time.time() - start) # Add items and related meta data to the task details. - LOG.info("get_task - 18 - %s, %s", gnt_uuid, time.time() - start) if task_spec.has_items(): - LOG.info("get_task - 19 - %s, %s", gnt_uuid, time.time() - start) items_spec = getattr(task_spec, "with") - LOG.info("get_task - 20 - %s, %s", gnt_uuid, time.time() - start) concurrency = getattr(items_spec, "concurrency", None) - LOG.info("get_task - 21 - %s, %s", gnt_uuid, time.time() - start) task["items_count"] = len(action_specs) - LOG.info("get_task - 22 - %s, %s", gnt_uuid, time.time() - start) task["concurrency"] = expr_base.evaluate(concurrency, task_ctx) - LOG.info("get_task - 23 - %s, %s", gnt_uuid, time.time() - start) - LOG.info("get_task - 24 - %s, %s", gnt_uuid, time.time() - start) return task def _evaluate_task_actions(self, task): @@ -732,27 +696,18 @@ def has_next_tasks(self, task_id=None, route=None): return self._has_next(task_id, route=route) def get_next_tasks(self): - gnt_uuid = uuid.uuid4() - start = time.time() - LOG.info("get_next_tasks - 1 - %s, %s", gnt_uuid, time.time() - start) fail_on_task_rendering = False staged_tasks = self.workflow_state.get_staged_tasks() - LOG.info("get_next_tasks - 2 - %s, %s", gnt_uuid, time.time() - start) remediation_tasks = [] next_tasks = [] # Identify remediation tasks if workflow failed. if self.get_workflow_status() == statuses.FAILED: - LOG.info("get_next_tasks - 3 - %s, %s", gnt_uuid, time.time() - start) remediation_tasks = [s for s in staged_tasks if s.get("run_on_fail", False) is True] - LOG.info("get_next_tasks - 4 - %s, %s", gnt_uuid, time.time() - start) # Return an empty list if the workflow is not running and there is no remediation tasks. - LOG.info("get_next_tasks - 5 - %s, %s", gnt_uuid, time.time() - start) if self.get_workflow_status() not in statuses.RUNNING_STATUSES and not remediation_tasks: - LOG.info("get_next_tasks - 6 - %s, %s", gnt_uuid, time.time() - start) return next_tasks - LOG.info("get_next_tasks - 7 - %s, %s", gnt_uuid, time.time() - start) # Return the list of tasks that are staged and readied. If there is exception on # task rendering, then log the error and continue. This allows user to know about @@ -760,45 +715,31 @@ def get_next_tasks(self): # error one at a time during runtime. it_cnt = 0 for staged_task in remediation_tasks or staged_tasks: - LOG.info("get_next_tasks - 8.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) try: next_task = self.get_task(staged_task["id"], staged_task["route"]) - LOG.info("get_next_tasks - 9.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) next_task = self._evaluate_task_actions(next_task) - LOG.info("get_next_tasks - 10.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) # Assign the task retry delay which will overwrite any task delay # specified in the task definition. if "retry" in staged_task: - LOG.info("get_next_tasks - 11.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) next_task["delay"] = staged_task["retry"].get("delay") or 0 - LOG.info("get_next_tasks - 12.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) if "actions" in next_task and len(next_task["actions"]) > 0: - LOG.info("get_next_tasks - 13.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) next_tasks.append(next_task) - LOG.info("get_next_tasks - 14.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) elif "items_count" in next_task and next_task["items_count"] == 0: - LOG.info("get_next_tasks - 15.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) next_tasks.append(next_task) - LOG.info("get_next_tasks - 16.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) except Exception as e: - LOG.info("get_next_tasks - 17.%s - %s, %s", it_cnt, gnt_uuid, time.time() - start) fail_on_task_rendering = True self.log_error(e, task_id=staged_task["id"], route=staged_task["route"]) continue it_cnt += 1 # Return nothing if there is error(s) on determining next tasks. - LOG.info("get_next_tasks - 18 - %s, %s", gnt_uuid, time.time() - start) if fail_on_task_rendering: - LOG.info("get_next_tasks - 19 - %s, %s", gnt_uuid, time.time() - start) self.request_workflow_status(statuses.FAILED) - LOG.info("get_next_tasks - 20 - %s, %s", gnt_uuid, time.time() - start) return [] - LOG.info("get_next_tasks - 21 - %s, %s", gnt_uuid, time.time() - start) return sorted(next_tasks, key=lambda x: (x["id"], x["route"])) def _get_task_state_idx(self, task_id, route): diff --git a/orquesta/specs/native/v1/models.py b/orquesta/specs/native/v1/models.py index ece1a08a..4a0599c4 100644 --- a/orquesta/specs/native/v1/models.py +++ b/orquesta/specs/native/v1/models.py @@ -15,8 +15,6 @@ import logging import six -import time -import uuid from six.moves import queue from orquesta import events @@ -156,27 +154,19 @@ def has_retry(self): return hasattr(self, "retry") and self.retry def render(self, in_ctx): - gnt_uuid = uuid.uuid4() - start = time.time() - LOG.info("render - 1 - %s, %s", gnt_uuid, time.time() - start) action_specs = [] item_ctx_value = ctx_util.copy_context(in_ctx) if not self.has_items(): - # LOG.info("render - 2 - %s, %s", gnt_uuid, time.time() - start) action_spec = { "action": expr_base.evaluate(self.action, in_ctx), "input": expr_base.evaluate(getattr(self, "input", {}), in_ctx), } - # LOG.info("render - 3 - %s, %s", gnt_uuid, time.time() - start) action_specs.append(action_spec) - # LOG.info("render - 4 - %s, %s", gnt_uuid, time.time() - start) else: - # LOG.info("render - 5 - %s, %s", gnt_uuid, time.time() - start) items_spec = self.get_items_spec() - # LOG.info("render - 6 - %s, %s", gnt_uuid, time.time() - start) if " in " not in items_spec.items: items_expr = items_spec.items.strip() @@ -184,10 +174,7 @@ def render(self, in_ctx): start_idx = items_spec.items.index(" in ") + 4 items_expr = items_spec.items[start_idx:].strip() - # LOG.info("render - 7 - %s, %s", gnt_uuid, time.time() - start) - items = expr_base.evaluate(items_expr, in_ctx) - # LOG.info("render - 8 - %s, %s", gnt_uuid, time.time() - start) if not isinstance(items, list): raise TypeError('The value of "%s" is not type of list.' % items_expr) @@ -197,39 +184,23 @@ def render(self, in_ctx): if " in " not in items_spec.items else items_spec.items[: items_spec.items.index(" in ")].replace(" ", "").split(",") ) - LOG.info("render - 9 - %s, %s", gnt_uuid, time.time() - start) for idx, item in enumerate(items): - LOG.info("render - 10.%s - %s, %s", idx, gnt_uuid, time.time() - start) if item_keys and (isinstance(item, tuple) or isinstance(item, list)): - LOG.info("render - 11.%s - %s, %s", idx, gnt_uuid, time.time() - start) item = dict(zip(item_keys, list(item))) - LOG.info("render - 12.%s - %s, %s", idx, gnt_uuid, time.time() - start) elif item_keys and len(item_keys) == 1: - LOG.info("render - 13.%s - %s, %s", idx, gnt_uuid, time.time() - start) item = {item_keys[0]: item} - LOG.info("render - 14.%s - %s, %s", idx, gnt_uuid, time.time() - start) - LOG.info("render - 15.%s - %s, %s", idx, gnt_uuid, time.time() - start) item_ctx_value = ctx_util.set_current_item(item_ctx_value, item) - LOG.info("render - 16.%s - %s, %s", idx, gnt_uuid, time.time() - start) - LOG.info("render - 16.0.%s - %s, %s - self.action - %s", idx, gnt_uuid, time.time() - start, self.action) - LOG.info("render - 16.1.%s - %s, %s - getattr(self, 'input', {}) - %s", idx, gnt_uuid, time.time() - start, getattr(self, "input", {})) action = expr_base.evaluate(self.action, item_ctx_value) - LOG.info("render - 16.2.%s - %s, %s - action - %s", idx, gnt_uuid, time.time() - start, action) gen_input = expr_base.evaluate(getattr(self, "input", {}), item_ctx_value) - LOG.info("render - 16.3.%s - %s, %s - gen_input - %s", idx, gnt_uuid, time.time() - start, gen_input) action_spec = { "action": action, "input": gen_input, "item_id": idx, } - LOG.info("render - 17.%s - %s, %s, action_spec - %s", idx, gnt_uuid, time.time() - start, action_spec) - action_specs.append(action_spec) - # LOG.info("render - 18.%s - %s, %s", idx, gnt_uuid, time.time() - start) - LOG.info("render - 19 - %s, %s, action_specs - %s", gnt_uuid, time.time() - start, action_specs) return self, action_specs def finalize_context(self, next_task_name, task_transition_meta, in_ctx): From 2b2d48c1b0d6aaa287f4bb5ddd3d01270e03cf68 Mon Sep 17 00:00:00 2001 From: rebrowning Date: Mon, 15 Jan 2024 17:58:47 -0800 Subject: [PATCH 3/6] a little bit more cleanup --- orquesta/conducting.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/orquesta/conducting.py b/orquesta/conducting.py index c0a937b6..beaaf3fa 100644 --- a/orquesta/conducting.py +++ b/orquesta/conducting.py @@ -184,13 +184,7 @@ def get_staged_tasks(self, filtered=True): if not filtered: return self.staged - # resp = [x for x in self.staged if x["ready"] and not x.get("completed", False)] - resp = [] - it_cnt = 0 - for x in self.staged: - if x["ready"] and not x.get("completed", False): - resp.append(x) - it_cnt += 1 + resp = [x for x in self.staged if x["ready"] and not x.get("completed", False)] return resp @property @@ -713,7 +707,6 @@ def get_next_tasks(self): # task rendering, then log the error and continue. This allows user to know about # all task rendering errors for this task transition instead of getting rendering # error one at a time during runtime. - it_cnt = 0 for staged_task in remediation_tasks or staged_tasks: try: next_task = self.get_task(staged_task["id"], staged_task["route"]) @@ -726,14 +719,12 @@ def get_next_tasks(self): if "actions" in next_task and len(next_task["actions"]) > 0: next_tasks.append(next_task) - elif "items_count" in next_task and next_task["items_count"] == 0: next_tasks.append(next_task) except Exception as e: fail_on_task_rendering = True self.log_error(e, task_id=staged_task["id"], route=staged_task["route"]) continue - it_cnt += 1 # Return nothing if there is error(s) on determining next tasks. if fail_on_task_rendering: From d7db46caeaf4142f6f16d3ffa162f056f5b296f3 Mon Sep 17 00:00:00 2001 From: rebrowning Date: Wed, 17 Jan 2024 20:15:26 -0800 Subject: [PATCH 4/6] fix linting error --- orquesta/utils/context.py | 1 + 1 file changed, 1 insertion(+) diff --git a/orquesta/utils/context.py b/orquesta/utils/context.py index e3adbb00..fc0a075f 100644 --- a/orquesta/utils/context.py +++ b/orquesta/utils/context.py @@ -47,6 +47,7 @@ def copy_context(context): ctx = json_util.deepcopy(context) if context else dict() return ctx + def set_current_item(context, item): if context and not isinstance(context, dict): raise TypeError("The context is not type of dict.") From 52b824559df51187c7aae1cf1a35642be68d345e Mon Sep 17 00:00:00 2001 From: rebrowning Date: Mon, 22 Jan 2024 17:31:33 -0800 Subject: [PATCH 5/6] a bit of cleanup based on feedback --- orquesta/conducting.py | 3 +-- orquesta/specs/native/v1/models.py | 10 +++------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/orquesta/conducting.py b/orquesta/conducting.py index beaaf3fa..a0c07d1d 100644 --- a/orquesta/conducting.py +++ b/orquesta/conducting.py @@ -184,8 +184,7 @@ def get_staged_tasks(self, filtered=True): if not filtered: return self.staged - resp = [x for x in self.staged if x["ready"] and not x.get("completed", False)] - return resp + return [x for x in self.staged if x["ready"] and not x.get("completed", False)] @property def has_staged_tasks(self): diff --git a/orquesta/specs/native/v1/models.py b/orquesta/specs/native/v1/models.py index 4a0599c4..b8284b11 100644 --- a/orquesta/specs/native/v1/models.py +++ b/orquesta/specs/native/v1/models.py @@ -156,8 +156,6 @@ def has_retry(self): def render(self, in_ctx): action_specs = [] - item_ctx_value = ctx_util.copy_context(in_ctx) - if not self.has_items(): action_spec = { "action": expr_base.evaluate(self.action, in_ctx), @@ -166,6 +164,7 @@ def render(self, in_ctx): action_specs.append(action_spec) else: + item_ctx_value = ctx_util.copy_context(in_ctx) items_spec = self.get_items_spec() if " in " not in items_spec.items: @@ -184,7 +183,6 @@ def render(self, in_ctx): if " in " not in items_spec.items else items_spec.items[: items_spec.items.index(" in ")].replace(" ", "").split(",") ) - for idx, item in enumerate(items): if item_keys and (isinstance(item, tuple) or isinstance(item, list)): item = dict(zip(item_keys, list(item))) @@ -192,11 +190,9 @@ def render(self, in_ctx): item = {item_keys[0]: item} item_ctx_value = ctx_util.set_current_item(item_ctx_value, item) - action = expr_base.evaluate(self.action, item_ctx_value) - gen_input = expr_base.evaluate(getattr(self, "input", {}), item_ctx_value) action_spec = { - "action": action, - "input": gen_input, + "action": expr_base.evaluate(self.action, item_ctx_value), + "input": expr_base.evaluate(getattr(self, "input", {}), item_ctx_value), "item_id": idx, } action_specs.append(action_spec) From f8d20219285b108fa297cf2e6ebad5f5d1e4c013 Mon Sep 17 00:00:00 2001 From: rebrowning Date: Mon, 22 Jan 2024 17:35:49 -0800 Subject: [PATCH 6/6] cleanup --- orquesta/specs/native/v1/models.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/orquesta/specs/native/v1/models.py b/orquesta/specs/native/v1/models.py index b8284b11..534d167b 100644 --- a/orquesta/specs/native/v1/models.py +++ b/orquesta/specs/native/v1/models.py @@ -183,6 +183,7 @@ def render(self, in_ctx): if " in " not in items_spec.items else items_spec.items[: items_spec.items.index(" in ")].replace(" ", "").split(",") ) + for idx, item in enumerate(items): if item_keys and (isinstance(item, tuple) or isinstance(item, list)): item = dict(zip(item_keys, list(item))) @@ -190,11 +191,13 @@ def render(self, in_ctx): item = {item_keys[0]: item} item_ctx_value = ctx_util.set_current_item(item_ctx_value, item) + action_spec = { "action": expr_base.evaluate(self.action, item_ctx_value), "input": expr_base.evaluate(getattr(self, "input", {}), item_ctx_value), "item_id": idx, } + action_specs.append(action_spec) return self, action_specs