
Enable Distributed mode for Step Function maps #1576

Closed
wants to merge 11 commits

Conversation


@bhiles bhiles commented Oct 9, 2023

This PR introduces a way to utilize the "Distributed" type of Map (vs "Inline") to enable Step Functions to scale to larger sizes (issue: #1216).

How it works

  • Add in a new flag --use-distributed-map when creating a step function (ie python helloworld.py step-functions create --use-distributed-map)
  • All map steps are then turned to use Distributed mode (vs the Inline) mode
  • Distributed Map values are written to S3 and then we need to run some additional steps (fetch the S3 manifest and run a second Distributed Map step to extract the value)
  • Lastly, the Run ID of the initial step function is passed down to maps so that it can be accessed on child Step Functions so data can be found in the S3 bucket. This is achieved by passing it into the Dynamo table.
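The mode switch described above can be sketched as follows. This is an illustrative sketch, not the PR's actual code: the function name, state names, bucket, and prefix are assumptions, but the Amazon States Language fields (`ItemProcessor`, `ProcessorConfig`, `ResultWriter`) are the real Distributed Map constructs.

```python
# Sketch (assumed names): switching a Map state from Inline to
# Distributed mode in Amazon States Language, plus a ResultWriter so
# large results land in S3 instead of the 256 KB state payload limit.
def make_map_state(iterator_states, use_distributed_map=False):
    state = {
        "Type": "Map",
        "ItemsPath": "$.Result.Item.for_each_cardinality.NS",
        "ItemProcessor": {
            "ProcessorConfig": {"Mode": "INLINE"},
            "StartAt": "worker",
            "States": iterator_states,
        },
    }
    if use_distributed_map:
        state["ItemProcessor"]["ProcessorConfig"] = {
            "Mode": "DISTRIBUTED",
            "ExecutionType": "STANDARD",
        }
        # The map's output then contains a pointer to a manifest.json
        # in S3 instead of the results themselves.
        state["ResultWriter"] = {
            "Resource": "arn:aws:states:::s3:putObject",
            "Parameters": {
                "Bucket": "my-metaflow-bucket",  # assumption
                "Prefix": "sfn-map-results",     # assumption
            },
        }
    return state
```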

Additional Notes

This PR works for my test flows (one that maps over 1k values to test the scale, and another that does a map -> map -> join -> join).

Known problems:

* The root RunID isn't available to downstream steps
* Foreach steps with a large number of values produce results that are too big, so the split ID can't be passed along
@bhiles bhiles changed the title A basic flow works for the Distributed mode [WIP] EnableDistributed mode for Step Function maps Oct 9, 2023
@bhiles bhiles changed the title [WIP] EnableDistributed mode for Step Function maps EnableDistributed mode for Step Function maps Oct 11, 2023
```diff
@@ -10,14 +10,15 @@ def __init__(self):
         self._client = get_aws_client("dynamodb")
         self.name = SFN_DYNAMO_DB_TABLE

-    def save_foreach_cardinality(self, foreach_split_task_id, foreach_cardinality, ttl):
+    def save_foreach_cardinality(self, foreach_split_task_id, foreach_cardinality, ttl, root_run_id):
```
@bhiles (Author):
This adds a place to save the root run ID so it can be extracted in child step function executions.
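A rough sketch of what persisting the root run ID alongside the foreach cardinality could look like. This is not the PR's actual implementation; the table field names and item shape are assumptions, written against the standard DynamoDB `put_item` call.

```python
# Illustrative sketch (assumed field names): store the root run ID in
# the same DynamoDB item as the foreach cardinality, so child Step
# Function executions can read it back.
def save_foreach_cardinality(client, table_name, foreach_split_task_id,
                             foreach_cardinality, ttl, root_run_id):
    client.put_item(
        TableName=table_name,
        Item={
            "pathspec": {"S": foreach_split_task_id},
            "for_each_cardinality": {
                "NS": [str(i) for i in range(foreach_cardinality)]
            },
            "ttl": {"N": str(ttl)},
            # New in this PR: the root run ID rides along in the item.
            "root_run_id": {"S": root_run_id},
        },
    )
```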

```python
.items_path("$.Result.Item.for_each_cardinality.NS")
.parameter("JobId.$", "$.JobId")
.parameter("SplitParentTaskId.$", "$.JobId")
.parameter("Parameters.$", "$.Parameters")
.parameter("Index.$", "$$.Map.Item.Value")
.next(node.matching_join)
.parameter("RootRunId.$", "$.Result.Item.root_run_id.S")
```
@bhiles (Author):
Here we are extracting the root run ID from Dynamo and putting it into the parameters as `RootRunId` so it can be used.

```python
.items_path("$.Result.Item.for_each_cardinality.NS")
.parameter("JobId.$", "$.JobId")
.parameter("SplitParentTaskId.$", "$.JobId")
.parameter("Parameters.$", "$.Parameters")
.parameter("Index.$", "$$.Map.Item.Value")
.next(node.matching_join)
.parameter("RootRunId.$", "$.Result.Item.root_run_id.S")
.next(f"{iterator_name}_GetManifest" if self.use_distributed_map else node.matching_join)
```
@bhiles (Author):
If we are using the distributed map, the next step won't be `node.matching_join` but a series of additional steps that fetch the parameters from the distributed map's S3 output. To achieve that we:

* Write the data to S3 using `ResultWriter`
* Add an `S3.GetObject` step to fetch the `manifest.json` file, which lists the files the results were saved in
* Add a distributed map step (capped at 1 item) to read just the first entry of the first file
* Add a pass step to transform the data into the format expected by the join
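The extra states listed above can be sketched roughly as follows. This is an assumed shape, not the PR's actual code: the state names, the `ResultWriterDetails` path, and the `Parameters`/`ItemReader` fields are illustrative, though `arn:aws:states:::s3:getObject` and `ReaderConfig.MaxItems` are the real Step Functions constructs used later in this diff.

```python
# Rough sketch (assumed names and shapes) of the states appended after
# a Distributed Map so the join can recover its input from S3.
def extra_states(iterator_name, bucket, join_name):
    return {
        # 1. Fetch manifest.json, which lists the result files.
        f"{iterator_name}_GetManifest": {
            "Type": "Task",
            "Resource": "arn:aws:states:::s3:getObject",
            "Parameters": {
                "Bucket": bucket,
                "Key.$": "$.ResultWriterDetails.Key",
            },
            "Next": f"{iterator_name}_ReadFirstResult",
        },
        # 2. A second Distributed Map, capped at one item, that reads
        #    just the first entry of the first result file.
        f"{iterator_name}_ReadFirstResult": {
            "Type": "Map",
            "ItemReader": {
                "Resource": "arn:aws:states:::s3:getObject",
                "ReaderConfig": {"InputType": "JSON", "MaxItems": 1},
            },
            "Next": f"{iterator_name}_Reshape",
        },
        # 3. A Pass state that reshapes the single entry into the
        #    format the join step expects.
        f"{iterator_name}_Reshape": {
            "Type": "Pass",
            "Next": join_name,
        },
    }
```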

```diff
@@ -328,8 +332,62 @@ def _visit(node, workflow, exit_node=None):
             )
         )
         .max_concurrency(self.max_workers)
-        .output_path("$.[0]")
+        .output_path("$" if self.use_distributed_map else "$.[0]")
```
@bhiles (Author):
This lets us get the S3 `ResultWriter` destination when using distributed map mode.
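The reason the output path differs can be illustrated with the (assumed, simplified) output shapes of the two map modes:

```python
# Illustrative output shapes (assumptions, simplified for clarity).
inline_output = [{"JobId": "abc", "Result": 1}]       # results inline
distributed_output = {                                # pointer to S3
    "ResultWriterDetails": {
        "Bucket": "my-metaflow-bucket",               # assumption
        "Key": "sfn-map-results/<uuid>/manifest.json",
    }
}

def output_path(use_distributed_map):
    # Inline maps: grab the first (only) element of the results list.
    # Distributed maps: keep the whole object so we can reach the
    # ResultWriterDetails pointer in the follow-up states.
    return "$" if use_distributed_map else "$.[0]"
```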

```python
# These additional states are how we pull the data from S3.
if self.use_distributed_map:
    workflow.add_state_hack(f"{iterator_name}_GetManifest", {
        "Type": "Task",
```
@bhiles (Author):
We could potentially skip this step by transforming `$bucket/$prefix/$UUID/manifest.json` into `$bucket/$prefix/$UUID/SUCCEED_0.json`, but I wanted to share this running flow before optimizing further.

```python
# we need to write the results to S3 and then get the data from S3.
# These additional states are how we pull the data from S3.
if self.use_distributed_map:
    workflow.add_state_hack(f"{iterator_name}_GetManifest", {
```
@bhiles (Author):
`add_state_hack` just lets me add states that aren't objects, only plain dictionaries. This isn't something I'd check in; I just wanted to get the flow working and get a cursory review before cleaning that up.

"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": {
"InputType": "JSON",
"MaxItems": 1
@bhiles (Author), Oct 11, 2023:
Here you can see I've added a limit (via `MaxItems`) so this distributed map step only reads a single value, which means I can fetch just the first result from the map. I know this map processes exactly one value, whereas the actual map step could process a huge number, since that's user defined.

I attempted to read the file with a plain `S3.GetObject` step, but the file can be too big to parse in a step that doesn't use a Lambda, so I found this work-around. It's definitely unfortunate that we need a second distributed map.

I also tried restricting the output of the first distributed map to just a single value, but it always exceeded the data-size limit on runs with a large number of values. So this was the scalable alternative.

```diff
@@ -538,11 +596,13 @@ def _batch(self, node):
             attrs[
                 "split_parent_task_id_%s.$" % node.split_parents[-1]
             ] = "$.SplitParentTaskId"
+            attrs["root_run_id.$"] = "$.RootRunId"
```
@bhiles (Author):
I copied the existing logic for `$.SplitParentTaskId` to pass `$.RootRunId` through the steps.
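The pass-through can be sketched like this. A minimal sketch, assuming a standalone helper (`batch_attrs` is a hypothetical name; the `split_parents` structure mirrors the diff above):

```python
# Sketch (assumed helper): propagate the root run ID through the task
# input attributes, mirroring the existing SplitParentTaskId plumbing.
def batch_attrs(split_parents):
    attrs = {}
    if split_parents:
        attrs[
            "split_parent_task_id_%s.$" % split_parents[-1]
        ] = "$.SplitParentTaskId"
    # New in this PR: the root run ID rides along the same way, so
    # child executions can locate artifacts in S3 and the Dynamo table.
    attrs["root_run_id.$"] = "$.RootRunId"
    return attrs
```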

@bhiles bhiles marked this pull request as ready for review October 11, 2023 15:02
@bhiles (Author) commented Jan 12, 2024:

Closing this in favor of #1683

@bhiles bhiles closed this Jan 12, 2024