
Commit

Remove debug statements
talsperre committed Jan 11, 2025
1 parent 2bf7c3e commit b692130
Showing 8 changed files with 8 additions and 44 deletions.
26 changes: 4 additions & 22 deletions metaflow/client/core.py
@@ -1177,14 +1177,10 @@ def _get_metadata_query_vals(
         # For linear steps, or foreach splits and joins, ancestor and successor tasks will all belong to
         # the same step.
         query_task = self._get_task_for_queried_step(flow_id, run_id, steps[0])
-        print(f"flow_id: {flow_id}, run_id: {run_id}, query_step: {steps[0]}, query_task: {query_task}")
-        print(f"query_task.metadata_dict: {query_task.metadata_dict}")
         query_foreach_stack_len = len(
             query_task.metadata_dict.get("foreach-stack", [])
         )

-        print(f"cur_foreach_stack_len: {cur_foreach_stack_len}, query_foreach_stack_len: {query_foreach_stack_len}")
-
         if query_foreach_stack_len == cur_foreach_stack_len:
             # The successor or ancestor tasks belong to the same foreach stack level
             field_name = "foreach-indices"
@@ -1237,39 +1233,25 @@ def _get_related_tasks(self, relation_type: str) -> Dict[str, List[str]]:
         if not steps:
             return {}

-        print("-" * 50)
-        print(f"Getting related tasks for {relation_type} and path {self.path_components} and query steps {steps}")
         field_name, field_value = self._get_metadata_query_vals(
             flow_id,
             run_id,
             len(self.metadata_dict.get("foreach-stack", [])),
             steps,
             relation_type,
         )
-        print(f"fetched field_name: {field_name} and field_value: {field_value}")

-        cur_time = time.time()

-        tp = {}
-        for step in steps:
-            tp[step] = [
+        return {
+            step: [
                 f"{flow_id}/{run_id}/{step}/{task_id}"
                 for task_id in self._metaflow.metadata.filter_tasks_by_metadata(
                     flow_id, run_id, step, field_name, field_value
                 )
             ]
-        cur_time = time.time()

-        return tp
-        # return {
-        #     step: [
-        #         f"{flow_id}/{run_id}/{step}/{task_id}"
-        #         for task_id in self._metaflow.metadata.filter_tasks_by_metadata(
-        #             flow_id, run_id, step, field_name, field_value
-        #         )
-        #     ]
-        #     for step in steps
-        # }
+            for step in steps
+        }

     @property
     def immediate_ancestors(self) -> Dict[str, List[str]]:
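With the prints, timing variables, and the commented-out duplicate removed, `_get_related_tasks` ends in a single dict comprehension keyed by step name. The following is a sketch of the method's tail reconstructed from the hunk above, not the full method; `flow_id`, `run_id`, `steps`, `field_name`, and `field_value` are produced earlier in the method as shown in the diff.

    # Sketch of the post-commit tail of Task._get_related_tasks (from the diff above).
    # filter_tasks_by_metadata is assumed to return the matching task ids for one step.
    return {
        step: [
            f"{flow_id}/{run_id}/{step}/{task_id}"
            for task_id in self._metaflow.metadata.filter_tasks_by_metadata(
                flow_id, run_id, step, field_name, field_value
            )
        ]
        for step in steps
    }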
1 change: 0 additions & 1 deletion metaflow/metaflow_config.py
@@ -248,7 +248,6 @@
 # Default container registry
 DEFAULT_CONTAINER_REGISTRY = from_conf("DEFAULT_CONTAINER_REGISTRY")
 # Controls whether to include foreach stack information in metadata.
-# TODO(Darin, 05/01/24): Remove this flag once we are confident with this feature.
 INCLUDE_FOREACH_STACK = from_conf("INCLUDE_FOREACH_STACK", True)
 # Maximum length of the foreach value string to be stored in each ForeachFrame.
 MAXIMUM_FOREACH_VALUE_CHARS = from_conf("MAXIMUM_FOREACH_VALUE_CHARS", 30)
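The hunk above only drops the removal TODO; `INCLUDE_FOREACH_STACK` still defaults to True. As a usage note, and assuming the usual `metaflow_config` behavior where `from_conf` values can be overridden by `METAFLOW_`-prefixed environment variables (an assumption, not something this diff shows), the flag could be toggled and inspected roughly like this:

    # Hedged sketch: assumes METAFLOW_* environment overrides for from_conf values.
    #   export METAFLOW_INCLUDE_FOREACH_STACK=false   # would disable foreach-stack metadata
    from metaflow.metaflow_config import (
        INCLUDE_FOREACH_STACK,
        MAXIMUM_FOREACH_VALUE_CHARS,
    )

    if INCLUDE_FOREACH_STACK:
        # Foreach values stored per ForeachFrame are truncated to this many characters.
        print(f"foreach stack metadata enabled, values capped at {MAXIMUM_FOREACH_VALUE_CHARS} chars")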
6 changes: 0 additions & 6 deletions metaflow/plugins/metadata_providers/local.py
@@ -291,18 +291,12 @@ def _read_metadata_value(file_path: str) -> dict:
             meta_path = LocalMetadataProvider._get_metadir(
                 flow_id, run_id, query_step, task_id
             )
-            print(f"Meta path: {meta_path}")
-            print(f"Field name prefix: {field_name_prefix}, field value: {field_value}")
-            print(f"Files in meta path: {os.listdir(meta_path)}")
-
             latest_file = _get_latest_metadata_file(meta_path, field_name_prefix)
-            print(f"Latest file: {latest_file}")
             if not latest_file:
                 continue

             # Read metadata and check value
             metadata = _read_metadata_value(latest_file[0])
-            print(f"Metadata: {metadata}")
             if metadata.get("value") == field_value:
                 filtered_task_ids.append(task_id)

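With the prints gone, the per-task filtering in the local metadata provider reduces to: locate the task's metadata directory, pick the newest file matching the field prefix, and keep the task when the stored value matches. A rough sketch assembled from the context lines above; the enclosing loop over candidate task ids and the `filtered_task_ids` accumulator are assumed from that context rather than shown here.

    # Sketch of the loop body after this commit (reconstructed from the context above).
    meta_path = LocalMetadataProvider._get_metadir(flow_id, run_id, query_step, task_id)

    latest_file = _get_latest_metadata_file(meta_path, field_name_prefix)
    if not latest_file:
        continue  # no metadata of this kind recorded for this task

    # Read metadata and check value
    metadata = _read_metadata_value(latest_file[0])
    if metadata.get("value") == field_value:
        filtered_task_ids.append(task_id)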
3 changes: 1 addition & 2 deletions metaflow/task.py
@@ -501,7 +501,7 @@ def run_step(
             current_foreach_path_length += len(foreach_step)
             foreach_stack_formatted.append(foreach_step)

-        print(f"Foreach stack formatted: {foreach_stack_formatted}")
+
         if foreach_stack_formatted:
             metadata.append(
                 MetaDatum(
@@ -516,7 +516,6 @@
         foreach_indices, foreach_indices_truncated, foreach_step_names = (
             self._dynamic_runtime_metadata(foreach_stack)
         )
-        print(f"Task id: {task_id}, foreach_stack: {foreach_stack}, foreach_indices: {foreach_indices}")
         metadata.extend(
             [
                 MetaDatum(
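The foreach metadata that `run_step` keeps recording here is what the client-side queries in `core.py` above compare against. A purely hypothetical illustration (the field layout is assumed for the example and is not taken from this diff): a task two levels deep in nested foreaches might carry metadata along these lines, and ancestor/successor lookups match on "foreach-indices" when the stack depths are equal.

    # Hypothetical example values; the real encoding is produced by
    # _dynamic_runtime_metadata and may differ.
    task_metadata = {
        "foreach-stack": ["outer frame, index 0", "inner frame, index 2"],  # one frame per nesting level
        "foreach-indices": [0, 2],                                          # index at each level
    }

    # Mirrors the depth comparison in _get_metadata_query_vals: equal depth means
    # related tasks are matched on the "foreach-indices" field.
    same_level = len(task_metadata["foreach-stack"]) == 2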
4 changes: 2 additions & 2 deletions test/core/contexts.json
@@ -24,14 +24,14 @@
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": [ "python3-metadata"],
"checks": ["python3-cli", "python3-metadata"],
"disabled_tests": [
"LargeArtifactTest",
"S3FailureTest",
"CardComponentRefreshTest",
"CardWithRefreshTest"
],
"executors": ["api"]
"executors": ["cli", "api"]
},
{
"name": "python3-all-local-cards-realtime",
9 changes: 0 additions & 9 deletions test/core/run_tests.py
@@ -236,15 +236,6 @@ def construct_arg_dicts_from_click_api():
         )
         return called_processes[-1].returncode, path

-    # Print the test_flow.py file
-    subprocess.run(
-        ["cat", "test_flow.py"],
-        env=env,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        check=False,
-    )
-
     # run flow
     if executor == "cli":
         called_processes.append(
1 change: 0 additions & 1 deletion test/core/tests/client_ancestors.py
@@ -63,7 +63,6 @@ def check_results(self, flow, checker):
             # For each task in the step
             for task in step:
                 ancestors = task.immediate_ancestors
-                print(f"Task is {task.data.task_pathspec} and ancestors are {ancestors}")
                 ancestor_pathspecs = set(chain.from_iterable(ancestors.values()))

                 # Compare with stored parent_task_pathspecs
2 changes: 1 addition & 1 deletion test_runner
@@ -16,7 +16,7 @@ install_extensions() {
 }

 run_tests() {
-    cd test/core && PYTHONPATH=`pwd`/../../ python3 run_tests.py --num-parallel 8 --tests ImmediateAncestorTest && cd ../../
+    cd test/core && PYTHONPATH=`pwd`/../../ python3 run_tests.py --num-parallel 8 && cd ../../
 }

 # We run realtime cards tests separately because there these tests validate the asynchronous updates to the
