Skip to content

Commit

Permalink
Log for each stack to metadata, update query logic
Browse files Browse the repository at this point in the history
  • Loading branch information
talsperre committed Jan 12, 2025
1 parent 18a293f commit 7644058
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 7 deletions.
8 changes: 6 additions & 2 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import print_function

import time
import json
import os
import tarfile
Expand Down Expand Up @@ -1177,7 +1178,7 @@ def _get_metadata_query_vals(
# the same step.
query_task = self._get_task_for_queried_step(flow_id, run_id, steps[0])
query_foreach_stack_len = len(
query_task.metadata_dict.get("foreach-stack", [])
query_task.metadata_dict.get("foreach-indices", [])
)

if query_foreach_stack_len == cur_foreach_stack_len:
Expand Down Expand Up @@ -1221,6 +1222,7 @@ def _get_metadata_query_vals(
return field_name, field_value

def _get_related_tasks(self, relation_type: str) -> Dict[str, List[str]]:
start_time = time.time()
flow_id, run_id, _, _ = self.path_components
steps = (
self.metadata_dict.get("previous_steps")
Expand All @@ -1234,11 +1236,13 @@ def _get_related_tasks(self, relation_type: str) -> Dict[str, List[str]]:
field_name, field_value = self._get_metadata_query_vals(
flow_id,
run_id,
len(self.metadata_dict.get("foreach-stack", [])),
len(self.metadata_dict.get("foreach-indices", [])),
steps,
relation_type,
)

cur_time = time.time()

return {
step: [
f"{flow_id}/{run_id}/{step}/{task_id}"
Expand Down
3 changes: 1 addition & 2 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@
# Default container registry
DEFAULT_CONTAINER_REGISTRY = from_conf("DEFAULT_CONTAINER_REGISTRY")
# Controls whether to include foreach stack information in metadata.
# TODO(Darin, 05/01/24): Remove this flag once we are confident with this feature.
INCLUDE_FOREACH_STACK = from_conf("INCLUDE_FOREACH_STACK", False)
INCLUDE_FOREACH_STACK = from_conf("INCLUDE_FOREACH_STACK", True)
# Maximum length of the foreach value string to be stored in each ForeachFrame.
MAXIMUM_FOREACH_VALUE_CHARS = from_conf("MAXIMUM_FOREACH_VALUE_CHARS", 30)
# The default runtime limit (In seconds) of jobs launched by any compute provider. Default of 5 days.
Expand Down
1 change: 0 additions & 1 deletion metaflow/plugins/metadata_providers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ def _read_metadata_value(file_path: str) -> dict:
meta_path = LocalMetadataProvider._get_metadir(
flow_id, run_id, query_step, task_id
)

latest_file = _get_latest_metadata_file(meta_path, field_name_prefix)
if not latest_file:
continue
Expand Down
1 change: 1 addition & 0 deletions metaflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ def run_step(
current_foreach_path_length += len(foreach_step)
foreach_stack_formatted.append(foreach_step)


if foreach_stack_formatted:
metadata.append(
MetaDatum(
Expand Down
2 changes: 1 addition & 1 deletion test/core/contexts.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": [ "python3-cli", "python3-metadata"],
"checks": ["python3-cli", "python3-metadata"],
"disabled_tests": [
"LargeArtifactTest",
"S3FailureTest",
Expand Down
1 change: 0 additions & 1 deletion test/core/tests/client_ancestors.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def check_results(self, flow, checker):
# For each task in the step
for task in step:
ancestors = task.immediate_ancestors
print(f"Task is {task.data.task_pathspec} and ancestors are {ancestors}")
ancestor_pathspecs = set(chain.from_iterable(ancestors.values()))

# Compare with stored parent_task_pathspecs
Expand Down

0 comments on commit 7644058

Please sign in to comment.