From 1d921d919316f3d228abc510d21b8d0507764657 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Tue, 24 Oct 2023 15:36:33 +0300 Subject: [PATCH 1/3] remove batch attribute computation as unnecessary for step-functions --- metaflow/plugins/aws/step_functions/step_functions.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/metaflow/plugins/aws/step_functions/step_functions.py b/metaflow/plugins/aws/step_functions/step_functions.py index bc40083fd5..9da3e5af34 100644 --- a/metaflow/plugins/aws/step_functions/step_functions.py +++ b/metaflow/plugins/aws/step_functions/step_functions.py @@ -24,7 +24,6 @@ from metaflow.plugins.retry_decorator import RetryDecorator from metaflow.util import compress_list, dict_to_cli_options, to_pascalcase -from ..aws_utils import compute_resource_attributes from ..batch.batch import Batch from .event_bridge_client import EventBridgeClient from .step_functions_client import StepFunctionsClient @@ -664,11 +663,6 @@ def _batch(self, node): batch_deco = [deco for deco in node.decorators if deco.name == "batch"][0] resources = {} resources.update(batch_deco.attributes) - resources.update( - compute_resource_attributes( - node.decorators, batch_deco, batch_deco.resource_defaults - ) - ) # Resolve retry strategy. user_code_retries, total_retries = self._get_retries(node) From c7f670293b646e219f641b1253874122ae50e707 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Tue, 24 Oct 2023 15:37:25 +0300 Subject: [PATCH 2/3] remove unused imports --- metaflow/plugins/aws/step_functions/step_functions.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/metaflow/plugins/aws/step_functions/step_functions.py b/metaflow/plugins/aws/step_functions/step_functions.py index 9da3e5af34..fc0aaae8e2 100644 --- a/metaflow/plugins/aws/step_functions/step_functions.py +++ b/metaflow/plugins/aws/step_functions/step_functions.py @@ -4,13 +4,11 @@ import random import string import sys -import time -import uuid from collections import defaultdict from metaflow import R from metaflow.decorators import flow_decorators -from metaflow.exception import MetaflowException, MetaflowInternalError +from metaflow.exception import MetaflowException from metaflow.metaflow_config import ( EVENTS_SFN_ACCESS_IAM_ROLE, S3_ENDPOINT_URL, @@ -19,10 +17,7 @@ SFN_IAM_ROLE, ) from metaflow.parameters import deploy_time_eval -from metaflow.plugins.aws.batch.batch_decorator import BatchDecorator -from metaflow.plugins.resources_decorator import ResourcesDecorator -from metaflow.plugins.retry_decorator import RetryDecorator -from metaflow.util import compress_list, dict_to_cli_options, to_pascalcase +from metaflow.util import dict_to_cli_options, to_pascalcase from ..batch.batch import Batch from .event_bridge_client import EventBridgeClient From 30992be75caabb4dadc1b2bda7e06af53b9634f4 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Tue, 24 Oct 2023 17:14:15 +0300 Subject: [PATCH 3/3] fix shared_memory and memory value handling for step-functions --- metaflow/plugins/aws/batch/batch_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metaflow/plugins/aws/batch/batch_client.py b/metaflow/plugins/aws/batch/batch_client.py index b80a7ec586..ed4ade71e0 100644 --- a/metaflow/plugins/aws/batch/batch_client.py +++ b/metaflow/plugins/aws/batch/batch_client.py @@ -216,7 +216,7 @@ def _register_job_definition( if shared_memory is not None: if not ( isinstance(shared_memory, (int, unicode, basestring)) - and int(shared_memory) > 0 + and int(float(shared_memory)) > 0 ): raise BatchJobException( "Invalid shared memory size value ({}); " @@ -225,7 +225,7 @@ def _register_job_definition( else: job_definition["containerProperties"]["linuxParameters"][ "sharedMemorySize" - ] = int(shared_memory) + ] = int(float(shared_memory)) if swappiness is not None: if not ( isinstance(swappiness, (int, unicode, basestring)) @@ -298,7 +298,7 @@ def _register_job_definition( ) else: # default tmpfs behavior - https://man7.org/linux/man-pages/man5/tmpfs.5.html - tmpfs_size = int(memory) / 2 + tmpfs_size = int(float(memory)) / 2 job_definition["containerProperties"]["linuxParameters"]["tmpfs"] = [ {