Stream-level composability for state artifacts #6
Comments
The inverse of this question came up in a recent discussion: "Should state artifacts be splittable?" Meaning, from the state output of a tap execution with 3 streams: can I split that state artifact into three distinct states, one for each stream? Another way to phrase this could be "composable and decomposable states". cc @tayloramurphy, @DouweM, @dmosorast
@aaronsteers great call out. Perhaps this issue could be used to provide some structure for state in the spec? Currently documented in the spec is this phrase:
But there is a "recommended" structure. Perhaps we could add a few optional fields to a given state message (or within the |
@tayloramurphy - I like this approach personally. I think at the top level, this already exists for 95% of taps:
{
  "bookmarks": {
    "stream_1": {},
    "stream_2": {}
  },
  "nonessential_extra_1": {},
  "nonessential_extra_2": {}
}
Important to codify:
@dmosorast - What do you think of this approach, especially point 3? I know you use markers to re-sort/order streams. Could that (and other similar use cases) be deemed "non-essential" in the case of splitting/merging?
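To make the splitting question concrete, here is a minimal sketch of decomposing a state artifact into one state per stream. It assumes the `bookmarks` layout shown above. The function name `split_state` and the `keep_extras` flag are hypothetical, and whether top-level keys outside `bookmarks` should be dropped or copied is exactly the open question in this thread.

```python
from copy import deepcopy
from typing import Any, Dict

State = Dict[str, Any]


def split_state(state: State, keep_extras: bool = False) -> Dict[str, State]:
    """Split a combined state into one standalone state per stream.

    Assumption: per-stream bookmarks live under the top-level "bookmarks" key.
    Keys outside "bookmarks" are treated as non-essential and dropped unless
    keep_extras is True, in which case they are copied into every part.
    """
    bookmarks = state.get("bookmarks", {})
    extras = {k: v for k, v in state.items() if k != "bookmarks"}

    parts: Dict[str, State] = {}
    for stream, bookmark in bookmarks.items():
        part: State = {"bookmarks": {stream: deepcopy(bookmark)}}
        if keep_extras:
            part.update(deepcopy(extras))
        parts[stream] = part
    return parts
```

Calling `split_state(state)` on a three-stream state would yield three dicts, each with a single entry under `bookmarks`.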
@aaronsteers Yes, I think the structure you specify there is the most common, and I think there is quite a bit of frameworky code that encodes this assumption. I agree with the For the non-essentials portion, I'm wondering what values are put up there in practice. The only reason I can think of that would require values at the top level is if they do not apply to a specific stream, and the only key that comes to mind right now is However, I think it is feasible that a tap could have a need for some other more crucial stateful values that would apply to all streams (e.g., a global min/max transaction ID for a database, or the last sync's start for a complex nested bookmarking strategy). Though, in this case, maybe those keys would be nested at the top level under The more I think about it, the less I'm convinced that we can definitively say any keys outside of Clarifying Questions To add some clarity for me, is this being proposed as a Standard to codify that per-stream bookmarks should be specified in this structure? or is it a spec change (e.g., changing how the The reason I ask is that a core concept of the |
I think it would be the former. No need to change how taps communicate STATE or how the STATE message works. The action here would just be to clarify guidance so that orchestrators can take this structure as a given and do what is needed. 👍
I agree, at least in theory, this is a possibility. I think in those cases, we'd encourage any critical functionality to be encoded redundantly per stream. This is also important in general, since presumably toggling a stream to "selected=true" after it was previously deselected would run into the same downsides: for instance, if global progress in a binlog tracker is inferred from a stream that wasn't previously scanned to that point.
@aaronsteers Great, as a stated Standard, I'm currently on board. For the global state, that's a good point. In the case of global values, the ability to select and deselect streams requires the tap to be able to 1) Detect a stale bookmark entry on the global piece of state (even if it was a top-level piece) and 2) Be able to process a state to account for the earliest viable bookmark across streams. I think this is how |
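Point 2 above could be sketched roughly as follows. This is an illustration under stated assumptions, not how any particular tap does it: the function name `earliest_viable_bookmark` and the per-stream `position` key are hypothetical, and the sketch assumes bookmark values are directly comparable (for example, integer log positions).

```python
from typing import Any, Dict, Iterable, Optional

State = Dict[str, Any]


def earliest_viable_bookmark(
    state: State,
    selected_streams: Iterable[str],
    key: str = "position",  # hypothetical per-stream bookmark key
) -> Optional[Any]:
    """Return the earliest bookmark value across the selected streams.

    Returns None when any selected stream has no bookmark yet (for example,
    it was just switched to selected), meaning a shared reader cannot safely
    resume from a later point and has to start from the beginning.
    """
    bookmarks = state.get("bookmarks", {})
    values = []
    for stream in selected_streams:
        value = bookmarks.get(stream, {}).get(key)
        if value is None:
            return None
        values.append(value)
    return min(values) if values else None
```

With this approach a global value becomes derivable from the per-stream entries, so it would not need to survive a split or merge on its own.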
Maybe as an added "versioning" concern: there are some taps that don't subscribe to this standard, so versioning the standards might need to be considered as well. As an example, What does that mean as a whole to orchestrators using that tap-stripe? The beneficial assumptions won't be able to be made for it, but a refactor of how state is written is a pretty huge breaking change that would require some kind of step-wise upgrade procedure. I'm curious about your thoughts on that kind of "certification path".
It's a good point, and this probably warrants its own issue. How we approach it probably depends on the specifics of the part of the spec and the type/degree of incompatibility.
State format changes are specifically tricky. In the SDK, we have an overloadable
Sweet! FYI @tayloramurphy, I think this is ready to be drafted up and commented on. |
Just a note to say Meltano already merges STATE messages for failed/incomplete previous pipeline runs. This is effectively done to update the 'last good run' with any streams that were able to successfully complete and emit STATE (even if the overall job failed). Logic is here. |
Adding to the above, the merge logic is here: https://gitlab.com/meltano/meltano/-/blob/master/src/meltano/core/utils/__init__.py#L141-157 Based on a simple recursive dict merge logic as shared here: https://stackoverflow.com/a/20666342/4298208 |
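For readers without access to the links, a simple recursive dict merge of this kind can be sketched as below. This is a generic illustration, not a copy of Meltano's code; the function name `merge_states` is hypothetical, and the sketch assumes values from the newer state win whenever both sides are not dicts.

```python
from copy import deepcopy
from typing import Any, Dict

State = Dict[str, Any]


def merge_states(old: State, new: State) -> State:
    """Recursively merge `new` into a copy of `old`.

    Nested dicts are merged key by key; any other value in `new`
    replaces the value in `old`. Neither input is mutated.
    """
    merged = deepcopy(old)
    for key, value in new.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_states(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged
```

Merging the partial state of a failed run into the last good state this way updates only the streams that emitted STATE and leaves the others untouched.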
@aaronsteers I think the merge algorithm (as pseudocode) should probably be part of the SIP. Wdyt? |
@edgarrmondragon - Yes, I think so! I'm not sure though if we should infinitely recurse dictionaries or just root + "bookmarks" + optionally "partitions" when it is a grandchild of "bookmarks". Whatever is deemed safe and most interoperable, would be good to include in the SIP text. |
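As one possible reading of the bounded alternative, the sketch below merges only at the root and at the stream level under "bookmarks", replacing each stream's entry wholesale instead of recursing into it. The name `merge_states_bounded` is hypothetical, and the optional "partitions" level is deliberately left out because its structure is not specified in this thread.

```python
from copy import deepcopy
from typing import Any, Dict

State = Dict[str, Any]


def merge_states_bounded(old: State, new: State) -> State:
    """Merge at the root and under "bookmarks" only.

    Root keys from `new` replace root keys in `old`. Under "bookmarks",
    each stream entry in `new` replaces the whole entry in `old`, with
    no deeper recursion. Neither input is mutated.
    """
    merged = deepcopy(old)
    for key, value in new.items():
        if key == "bookmarks" and isinstance(value, dict):
            bookmarks = merged.get("bookmarks")
            if not isinstance(bookmarks, dict):
                bookmarks = {}
            merged["bookmarks"] = bookmarks
            for stream, bookmark in value.items():
                bookmarks[stream] = deepcopy(bookmark)
        else:
            merged[key] = deepcopy(value)
    return merged
```

The practical difference from an unbounded recursive merge is that stale keys inside a stream's bookmark do not survive: if the new entry for a stream omits a key, that key is gone after the merge.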
Related: as noted in the last paragraph of my comment here, we may also want the orchestrator to treat null value assignments such as |
As discussed in: https://gitlab.com/meltano/meltano/-/issues/2903
Summary: We'd like to be able to run multiple copies of a tap in parallel and then merge the state from the two jobs. While this likely works in practice with most taps today, it requires some assumptions about how state is stored and where/whether merge conflicts are expected.
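A minimal sketch of the parallel case, assuming each job handles a disjoint set of streams and that per-stream bookmarks live under a top-level "bookmarks" key. The names `merge_parallel_states` and `StateConflictError` are hypothetical, and the choice to raise on overlap and to drop other top-level keys is an assumption made for illustration only.

```python
from copy import deepcopy
from typing import Any, Dict

State = Dict[str, Any]


class StateConflictError(ValueError):
    """Raised when two parallel jobs both wrote state for the same stream."""


def merge_parallel_states(state_a: State, state_b: State) -> State:
    """Combine the states of two parallel jobs into one state.

    Assumption: the jobs sync disjoint streams, so any stream present in
    both states is reported as a conflict instead of being resolved.
    Top-level keys other than "bookmarks" are not carried over.
    """
    bookmarks_a = state_a.get("bookmarks", {})
    bookmarks_b = state_b.get("bookmarks", {})

    overlap = sorted(set(bookmarks_a) & set(bookmarks_b))
    if overlap:
        raise StateConflictError(f"streams present in both states: {overlap}")

    return {"bookmarks": deepcopy({**bookmarks_a, **bookmarks_b})}
```

An orchestrator could instead pick the later or the more conservative bookmark on overlap; which policy is safe is part of what this issue aims to settle.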
For example: