Enable `Distributed` mode for Step Function maps #1576
Conversation
How it works:
- Add a new flag `--use-distributed-map` when creating a step function (i.e. `python helloworld.py step-functions create --use-distributed-map`)
- All map steps are then switched to `Distributed` mode (vs the `Inline` mode)

Problems:
- The root run ID isn't available to be accessed by downstream steps
- Foreaches with lots of values produce results that are too big and thus can't pass the split ID
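The mode switch described above can be sketched as a toggle on the Amazon States Language Map state. This is a hypothetical helper, not Metaflow's actual internals; the `ItemProcessor`/`ProcessorConfig` field names come from the ASL spec for distributed maps:

```python
def map_state(iterator_name, use_distributed_map):
    """Build a skeletal ASL Map state.

    Illustrative sketch: with --use-distributed-map every Map state
    gets DISTRIBUTED mode instead of the default INLINE mode.
    """
    mode = "DISTRIBUTED" if use_distributed_map else "INLINE"
    return {
        "Type": "Map",
        "ItemProcessor": {
            "ProcessorConfig": {"Mode": mode},
            "StartAt": iterator_name,
            "States": {},  # iterator states elided
        },
    }
```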
Distributed
mode for Step Function maps
Distributed
mode for Step Function mapsDistributed
mode for Step Function maps
@@ -10,14 +10,15 @@ def __init__(self):
        self._client = get_aws_client("dynamodb")
        self.name = SFN_DYNAMO_DB_TABLE

-    def save_foreach_cardinality(self, foreach_split_task_id, foreach_cardinality, ttl):
+    def save_foreach_cardinality(self, foreach_split_task_id, foreach_cardinality, ttl, root_run_id):
This adds a slot so that we can save the root Run ID, which can then be extracted in child step function executions.
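As a rough sketch of what the widened signature might persist: the item below is inferred from the JSONPath expressions elsewhere in this PR (`for_each_cardinality` as a DynamoDB number set `NS`, `root_run_id` as a string `S`); the key name and TTL handling are assumptions, not the PR's exact code:

```python
import time

def build_cardinality_item(foreach_split_task_id, foreach_cardinality, ttl, root_run_id):
    """Illustrative DynamoDB item for save_foreach_cardinality.

    Attribute names are inferred from this PR's JSONPath lookups
    ($.Result.Item.for_each_cardinality.NS, $.Result.Item.root_run_id.S);
    the partition key name and TTL attribute are hypothetical.
    """
    return {
        "pathspec": {"S": foreach_split_task_id},
        "for_each_cardinality": {"NS": [str(i) for i in range(foreach_cardinality)]},
        "root_run_id": {"S": root_run_id},
        "ttl": {"N": str(int(time.time()) + ttl)},
    }
```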
    .items_path("$.Result.Item.for_each_cardinality.NS")
    .parameter("JobId.$", "$.JobId")
    .parameter("SplitParentTaskId.$", "$.JobId")
    .parameter("Parameters.$", "$.Parameters")
    .parameter("Index.$", "$$.Map.Item.Value")
    .next(node.matching_join)
+   .parameter("RootRunId.$", "$.Result.Item.root_run_id.S")
Here we are extracting the root Run ID from DynamoDB and putting it into the parameters as `RootRunId` so it can be used downstream.
    .items_path("$.Result.Item.for_each_cardinality.NS")
    .parameter("JobId.$", "$.JobId")
    .parameter("SplitParentTaskId.$", "$.JobId")
    .parameter("Parameters.$", "$.Parameters")
    .parameter("Index.$", "$$.Map.Item.Value")
-   .next(node.matching_join)
    .parameter("RootRunId.$", "$.Result.Item.root_run_id.S")
+   .next(f"{iterator_name}_GetManifest" if self.use_distributed_map else node.matching_join)
If we are using the distributed map, the next step won't be `node.matching_join`, but some additional steps to get the parameters from the distributed map's S3 data. To achieve that we add:
- Write the data to S3 using `ResultWriter`
- Add an `S3.GetObject` step to get the `manifest.json` file, which gives us the list of files our results are saved in
- Add a distributed map step (with a limit of 1 value) to read just the first entry of the first file
- Add a pass step to transform the data into the expected format to be passed to the join
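The chain of extra states above could be sketched as raw ASL dictionaries, which is roughly the shape `add_state_hack` consumes in this PR. State names and most fields here are illustrative assumptions, not the PR's exact output:

```python
def manifest_fetch_states(iterator_name, join_name):
    """Hypothetical sketch of the extra post-map states: fetch the
    manifest, read one entry via a second (MaxItems=1) distributed
    map, then reshape for the join. Names are illustrative."""
    get_manifest = f"{iterator_name}_GetManifest"
    read_first = f"{iterator_name}_ReadFirstEntry"
    pass_to_join = f"{iterator_name}_PassToJoin"
    return {
        get_manifest: {
            "Type": "Task",
            "Resource": "arn:aws:states:::s3:getObject",
            "Next": read_first,
        },
        read_first: {
            # Second distributed map, capped at a single item, so only
            # the first entry of the first result file is read.
            "Type": "Map",
            "ItemReader": {
                "Resource": "arn:aws:states:::s3:getObject",
                "ReaderConfig": {"InputType": "JSON", "MaxItems": 1},
            },
            "Next": pass_to_join,
        },
        pass_to_join: {
            # Pass state reshapes the single entry for the join step.
            "Type": "Pass",
            "Next": join_name,
        },
    }
```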
@@ -328,8 +332,62 @@ def _visit(node, workflow, exit_node=None):
        )
    )
    .max_concurrency(self.max_workers)
-   .output_path("$.[0]")
+   .output_path("$" if self.use_distributed_map else "$.[0]")
This allows us to get the S3 result writer destination when using the distributed map mode.
    # These additional states are how we pull the data from S3.
    if self.use_distributed_map:
        workflow.add_state_hack(f"{iterator_name}_GetManifest", {
            "Type": "Task",
We could potentially skip this step if we just transform `$bucket/$prefix/$UUID/manifest.json` into `$bucket/$prefix/$UUID/SUCCEED_0.json`, but I wanted to share this running flow before optimizing further.
    # we need to write the results to S3 and then get the data from S3.
    # These additional states are how we pull the data from S3.
    if self.use_distributed_map:
        workflow.add_state_hack(f"{iterator_name}_GetManifest", {
`add_state_hack` just allowed me to add states that weren't objects but plain dictionaries. This isn't something I'd check in; I just wanted to get the flow functional and get a cursory review before optimizing that functionality.
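A minimal stand-in for what such a helper might look like, assuming the workflow builder keeps states in a name-to-state mapping (the class and attribute names here are illustrative, not the PR's code):

```python
class Workflow:
    """Minimal sketch of the PR's workflow builder, just to show what
    add_state_hack does: append a raw ASL dictionary instead of a
    typed State object."""

    def __init__(self):
        self.states = {}

    def add_state_hack(self, name, state_dict):
        # Bypass the usual State-object API and store the dict as-is.
        self.states[name] = state_dict
        return self
```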
"Resource": "arn:aws:states:::s3:getObject", | ||
"ReaderConfig": { | ||
"InputType": "JSON", | ||
"MaxItems": 1 |
Here you can see I've added a limit (via `MaxItems`) so the distributed map step only gets a single value, and I can fetch just the first result from this map, since I know I'm only processing a single value (whereas on the actual map step it could be a huge number, since it's user-defined).
I attempted to just read the file via an `S3.GetObject` step, but the file can be too big to parse in a step that doesn't use a Lambda, so I found this workaround. It is definitely unfortunate that we need to add a second distributed map.
I also tried restricting the output of the distributed map to just give me a single value, but it always hit the data-size-exceeded error on runs with a large number of values. So this was the scalable alternative.
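The capped reader described here can be shown in isolation. The `ItemReader`/`ReaderConfig`/`MaxItems` fields follow the ASL distributed-map spec; the bucket and key parameters are illustrative placeholders:

```python
def single_item_reader(bucket, key):
    """Sketch of an ItemReader that pulls only one entry from a result
    file, so the state machine never has to parse the (potentially
    huge) file itself. Bucket/key are hypothetical inputs."""
    return {
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": {"InputType": "JSON", "MaxItems": 1},
        "Parameters": {"Bucket": bucket, "Key": key},
    }
```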
@@ -538,11 +596,13 @@ def _batch(self, node):
        attrs[
            "split_parent_task_id_%s.$" % node.split_parents[-1]
        ] = "$.SplitParentTaskId"
+       attrs["root_run_id.$"] = "$.RootRunId"
I tried to copy the logic for `$.SplitParentTaskId` to pass `$.RootRunId` through the steps.
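The wiring in the diff above boils down to adding one more JSONPath-valued attribute alongside the split parent task ID. A self-contained sketch (function name and input shape are illustrative):

```python
def foreach_attrs(split_parents):
    """Mirror the diff above: the root run ID rides along the batch
    step's input attributes the same way the split parent task ID
    does. split_parents is a list of parent step names."""
    attrs = {}
    attrs["split_parent_task_id_%s.$" % split_parents[-1]] = "$.SplitParentTaskId"
    attrs["root_run_id.$"] = "$.RootRunId"
    return attrs
```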
Closing this in favor of #1683
This PR introduces a way to utilize the "Distributed" type of Map (vs "Inline") to enable Step Functions to scale to larger sizes (issue: #1216).
How it works
- Add a new flag `--use-distributed-map` when creating a step function (i.e. `python helloworld.py step-functions create --use-distributed-map`)
- All map steps are then turned to use `Distributed` mode (vs the `Inline` mode)

Additional Notes
This PR works for my test flows (one that maps to 1k values to test the scale, and another that does a map -> map -> join -> join).