What is this?
The issue of how to handle processing items that depend on n other items has been a recurring theme lately when discussing how best to structure cirrus workflows that model such dependencies. Some purpose-built solutions have been suggested for particular instances of this issue, but a larger question has been asked: can we find a solution worthy of integration into cirrus to support this use case more generally?
An idea: workflow callbacks
In thinking through this problem, I stumbled upon support within step functions for task tokens, which can be used to pause a step function execution until a token is returned. It works like this:
- A step function task is configured to wait for a task token; it sends a message (for example, to an SQS queue) that includes the token, and the execution pauses at that task.
- An external process later calls SendTaskSuccess or SendTaskFailure, as appropriate, with the specified token, and the paused execution resumes.

Within cirrus, we could use this mechanism to wait on items to be published from other workflows. For example, the SQS queue could be the cirrus process queue and the message could be the cirrus process payload for the child workflow. That payload would include a task token corresponding to the waiting step function task.
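To make the callback side concrete, here is a minimal sketch using boto3; the function name, output shape, and error string are placeholders I am assuming, not anything that exists in cirrus today.

```python
# Minimal sketch of the callback side: whatever detects that the dependency
# is done calls one of these Step Functions APIs with the stored task token.
import json

import boto3

sfn = boto3.client("stepfunctions")


def notify_waiting_task(task_token: str, succeeded: bool, result: dict) -> None:
    """Resume the paused step function task identified by task_token."""
    if succeeded:
        sfn.send_task_success(
            taskToken=task_token,
            output=json.dumps(result),
        )
    else:
        sfn.send_task_failure(
            taskToken=task_token,
            error="DependencyFailed",
            cause="A dependency workflow ended in a failed/INVALID state",
        )
```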
If we could guarantee a one-to-one relation between workflows in this process, we could simply have update-state call SendTaskSuccess or SendTaskFailure using the task token from the input payload, if one is present. However, we have no such guarantee: multiple workflows might depend on the same items, so we have to add some complexity to handle these possible many-to-one relations.

I propose we use a dynamodb table to track these relations. The schema would look something like:
- payload_id: the ID from a workflow payload (partition key)
- task_token: the token of the waiting step function task (sort key)
- workflow_state: the final state of the workflow corresponding to payload_id
- expiration_time: a field that can be used to track whether the callback has been executed, and that can be used with a table TTL to expire records (so we don't grow the table indefinitely but can keep records around for a limited time as part of an audit trail)
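As a rough illustration of that schema, here is what creating the table might look like with boto3; the table name, billing mode, and stream view type are assumptions, not settled choices.

```python
# Sketch of the proposed callback-tracking table, with a stream enabled so a
# lambda can react to state changes, and a TTL on expiration_time.
import boto3

dynamodb = boto3.client("dynamodb")

dynamodb.create_table(
    TableName="cirrus-workflow-callbacks",  # placeholder name
    AttributeDefinitions=[
        {"AttributeName": "payload_id", "AttributeType": "S"},
        {"AttributeName": "task_token", "AttributeType": "S"},
    ],
    KeySchema=[
        {"AttributeName": "payload_id", "KeyType": "HASH"},   # partition key
        {"AttributeName": "task_token", "KeyType": "RANGE"},  # sort key
    ],
    BillingMode="PAY_PER_REQUEST",
    StreamSpecification={
        "StreamEnabled": True,
        "StreamViewType": "NEW_IMAGE",  # stream records carry the updated item
    },
)

# expiration_time becomes the TTL attribute in a separate call:
dynamodb.update_time_to_live(
    TableName="cirrus-workflow-callbacks",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "expiration_time"},
)
```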
We could use a table stream to have a lambda process all records when workflow_state is set and make the callback to SendTaskSuccess or SendTaskFailure, setting expiration_time when that happens.
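A hedged sketch of what that stream-triggered lambda could look like, assuming NEW_IMAGE stream records and the state mapping described later in this proposal; the retention window, output shape, and error string are placeholders.

```python
# Stream-triggered lambda: when a record gains a final workflow_state, make
# the callback and stamp expiration_time so the TTL can clean the row up.
import json
import time

import boto3

sfn = boto3.client("stepfunctions")
table = boto3.resource("dynamodb").Table("cirrus-workflow-callbacks")


def handler(event, context):
    for record in event.get("Records", []):
        image = record.get("dynamodb", {}).get("NewImage", {})
        state = image.get("workflow_state", {}).get("S")
        token = image.get("task_token", {}).get("S")
        payload_id = image.get("payload_id", {}).get("S")

        # Skip removals, rows without a final state yet, and PROCESSING rows.
        if not token or state in (None, "PROCESSING"):
            continue

        if state == "COMPLETED":
            sfn.send_task_success(
                taskToken=token,
                output=json.dumps({"payload_id": payload_id}),
            )
        else:  # e.g. INVALID
            sfn.send_task_failure(
                taskToken=token, error="DependencyFailed", cause=state
            )

        # Mark the callback as done; keep the record ~7 days for audit, then TTL.
        table.update_item(
            Key={"payload_id": payload_id, "task_token": token},
            UpdateExpression="SET expiration_time = :t",
            ExpressionAttributeValues={":t": int(time.time()) + 7 * 24 * 3600},
        )
```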
Then update-state would only have to query for all records in this table with the payload_id and update them with the workflow's final state.
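For completeness, a sketch of the query-and-update step update-state might perform, assuming boto3; pagination is omitted and the table name is the same placeholder as above.

```python
# update-state fan-out: write the workflow's final state onto every callback
# row registered for this payload; the table stream does the rest.
import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb").Table("cirrus-workflow-callbacks")


def record_final_state(payload_id: str, final_state: str) -> None:
    """Propagate the final workflow state to all waiting callback records."""
    resp = table.query(KeyConditionExpression=Key("payload_id").eq(payload_id))
    for item in resp.get("Items", []):
        table.update_item(
            Key={"payload_id": payload_id, "task_token": item["task_token"]},
            UpdateExpression="SET workflow_state = :s",
            ExpressionAttributeValues={":s": final_state},
        )
```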
process would persist a task token found in an input payload to that dynamo table regardless of whether it kicks off a workflow. We could map the workflow_state values to the same values used for payload items, where PROCESSING means another update is coming, COMPLETED means to send a success notification, and INVALID triggers a failure notification (the other states encountered in process would trigger a workflow run and would therefore default to PROCESSING).
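And a sketch of that process-side step, assuming the payload carries the token under a hypothetical task_token key and that process already knows the current item state; both names and the mapping helper are assumptions for illustration.

```python
# process registers the waiting task whenever a payload arrives with a token,
# mapping the item state it sees onto the callback table's workflow_state.
import boto3

table = boto3.resource("dynamodb").Table("cirrus-workflow-callbacks")


def register_callback(payload: dict, current_state: str) -> None:
    """Record the waiting step function task so it can be notified later."""
    token = payload.get("task_token")  # hypothetical key name
    if not token:
        return  # nothing is waiting on this payload

    # COMPLETED and INVALID trigger an immediate callback via the table stream;
    # every other state kicks off (or re-runs) a workflow, so it stays PROCESSING.
    state = current_state if current_state in ("COMPLETED", "INVALID") else "PROCESSING"

    table.put_item(
        Item={
            "payload_id": payload["id"],
            "task_token": token,
            "workflow_state": state,
        }
    )
```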
Here is a diagram of this flow, though I'm not sure whether it makes sense:
Handling multiple dependencies within a workflow
It should be possible to use dynamic parallelism to create as many parallel tasks as required to queue up all other workflows, waiting until they all complete to resume the dependent root workflow.
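As an illustration of that idea, here is a rough ASL fragment, expressed as a Python dict, for a Map state whose iterator queues one child payload per dependency and pauses each branch on a task token; the queue URL, paths, and state names are all placeholders.

```python
# Dynamic parallelism sketch: one Map iteration per dependency, each sending
# the child payload (plus its task token) to the cirrus process queue and
# waiting for the callback before the root workflow continues.
wait_for_dependencies = {
    "Type": "Map",
    "ItemsPath": "$.dependencies",  # one entry per required child payload
    "Iterator": {
        "StartAt": "QueueChildWorkflow",
        "States": {
            "QueueChildWorkflow": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
                "Parameters": {
                    "QueueUrl": "https://sqs.us-west-2.amazonaws.com/123456789012/cirrus-process",
                    "MessageBody": {
                        "payload.$": "$",
                        "task_token.$": "$$.Task.Token",
                    },
                },
                "End": True,
            }
        },
    },
    "Next": "RunRootWorkflowSteps",  # placeholder for the dependent work
}
```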
Preventing hanging workflows
It would be possible to combine the waiting tasks with a timeout, so if the dependencies are not completed in a reasonable time the root workflow will fail, triggering awareness of the issue.
It would also be possible to have tooling/alarms looking for long-running step function executions.
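For example, the waiting task could carry a timeout (and optionally a heartbeat), sketched below with placeholder values; if no callback arrives in time, the task fails with States.Timeout and the root execution surfaces the problem.

```python
# Same callback-style task as above, but bounded: the wait fails after a day
# even if no SendTaskSuccess/SendTaskFailure ever arrives.
queue_child_workflow_with_timeout = {
    "Type": "Task",
    "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
    "Parameters": {
        "QueueUrl": "https://sqs.us-west-2.amazonaws.com/123456789012/cirrus-process",
        "MessageBody": {"payload.$": "$", "task_token.$": "$$.Task.Token"},
    },
    "TimeoutSeconds": 86400,   # fail if dependencies are not done within a day
    "HeartbeatSeconds": 3600,  # optional: also fail if no heartbeat is received
    "End": True,
}
```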
Potential Negatives
Race conditions
Depending on how we implement this, we could end up with a risk of race conditions. I think that by using the table stream and always having process add a row to the table we could eliminate that risk entirely, but I might be overlooking some edge cases.

Complexity
This seems like it might be a complex solution, but we need some external state tracking to make this work, so I’m not sure any alternatives would be any less complex.
AWS-centric
It could be hard to implement a mechanism like this on alternative platforms.
Capacity limitations
An advantage of this solution is that it does not require any daemon-like process to poll for state updates or handle events, outside the step function platform itself. Therefore we do not incur any cost for a workflow waiting on dependencies, aside from the fact that each waiting execution counts against the 1,000,000 open-execution limit for step functions (and it appears this limit can be increased upon request).
In high-volume workflow situations, it could therefore be possible to cause a deadlock, where so many workflows are waiting on items to be processed that no capacity remains to start the workflows that would process those items. I don't see a good solution to this issue beyond preventing it in the first place.
As a best practice, then, I would suggest not relying solely on this workflow dependency relationship to trigger processing for dependencies in high-volume situations. Generally it is better to have a process that ensures all lower-layer items are as present as possible using an alternative feeder method, and to rely on this dependency relationship only for a minimal number of items on “the edge” of the other feeder's processing.