Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky vreplication tests: correct logic that checks for workflow state in test helper #17498

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,29 +368,33 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
require.NoError(t, err, output)
done = true
state := ""
streams := gjson.Get(output, "workflows.0.shard_streams.*.streams")
streams.ForEach(func(streamId, stream gjson.Result) bool { // For each stream
info := stream.Map()
// We need to wait for all streams to have the desired state.
state = info["state"].String()
if state == wantState {
for i := 0; i < len(fieldEqualityChecks); i++ {
if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 {
key := kvparts[0]
val := kvparts[1]
res := info[key].String()
if !strings.EqualFold(res, val) {
done = false
shardStreams := gjson.Get(output, "workflows.0.shard_streams")
// We need to wait for all streams in all shard streams to have the desired state.
shardStreams.ForEach(func(shardStreamId, shardStream gjson.Result) bool {
Comment on lines +371 to +373
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you used https://gjson.dev with sample outputs to be sure that this is different? I thought that this would get all streams, but maybe not:

streams := gjson.Get(output, "workflows.0.shard_streams.*.streams")
streams.ForEach(func(streamId, stream gjson.Result) bool { // For each stream

Just a note that there's no need to backport this PR as the code you're modifying here is new from this recent PR: #17441

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I confirmed that workflows.0.shard_streams.*.streams only returns 1 stream. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I had tested the json parsing locally and that is how I figured out the problem and the fix. The gjson document is quite confusing with regard to wildcard filtering!

streams := shardStream.Get("*")
streams.ForEach(func(streamId, stream gjson.Result) bool {
info := stream.Map()
state = info["state"].String()
if state == wantState {
for i := 0; i < len(fieldEqualityChecks); i++ {
if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 {
key := kvparts[0]
val := kvparts[1]
res := info[key].String()
if !strings.EqualFold(res, val) {
done = false
}
}
}
}
if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() &&
(info["position"].Exists() && info["position"].String() == "") {
if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() &&
(info["position"].Exists() && info["position"].String() == "") {
done = false
}
} else {
done = false
}
} else {
done = false
}
return true
})
return true
})
if done {
Expand Down
Loading