-
Notifications
You must be signed in to change notification settings - Fork 847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(sftp): make sure to delete last file when watch
and delete_on_finish
are enabled
#3037
base: main
Are you sure you want to change the base?
Conversation
83668cd
to
2e47f2a
Compare
I don't think there is a utility so either you need to do option 1 or implement the retry logic - which I don't think should be too bad? Here's the code that drives this in benthos AFAIK: https://github.com/redpanda-data/benthos/blob/dad70374cd8fb323f0c7f47452498ea94c2ed7aa/internal/component/input/async_reader.go#L115 The pipeline option (number 1) might be the best route, but I'm not too familiar with that test helper myself. |
This commit reduces the scope of critical sections guarded by scannerMut to remove a deadlock that causes the last file to not be deleted when the SFTP input is used with watching enabled.
`(*watcherPathProvider).Next()` currently uses recursion to loop until a path is found. This commit refactors that function to use a for loop instead which is more straight forward to read.
This integration test makes sure that when `delete_on_finish` is true and watching is enabled that we delete every file.
1bbf6fa
to
ab133f4
Compare
builder := service.NewStreamBuilder() | ||
require.NoError(t, builder.SetYAML(config)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there is a utility so either you need to do option 1 or implement the retry logic - which I don't think should be too bad?
Here's the code that drives this in benthos AFAIK: https://github.com/redpanda-data/benthos/blob/dad70374cd8fb323f0c7f47452498ea94c2ed7aa/internal/component/input/async_reader.go#L115
The pipeline option (number 1) might be the best route, but I'm not too familiar with that test helper myself.
Knowledge from the great and powerful @mihaitodor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great tests thanks :)
// path from the cache then we skip that path because the | ||
// watcher will eventually poll again, and the cache.Get | ||
// operation will re-run. | ||
if v, err := cache.Get(ctx, path); errors.Is(err, service.ErrKeyNotFound) || (!w.followUpPoll && string(v) == "!") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: separately it would be nice to use constants for the pending symbol and other cache values :)
s.scannerMut.Lock() | ||
defer s.scannerMut.Unlock() | ||
|
||
if s.scanner != nil { | ||
return nil | ||
skip = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we skip if there is a scanner (and what are we skipping)? Can you add a comment?
outErr = fmt.Errorf("remove %v: %w", nextPath, outErr) | ||
} | ||
} | ||
s.scannerMut.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we always return after this block we could defer this right? I just get worried if the unlock is not right after.
|
||
details := service.NewScannerSourceDetails() | ||
details.SetName(nextPath) | ||
if s.scanner, err = s.scannerCtor.Create(file, func(ctx context.Context, aErr error) (outErr error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the assignment of s.scanner
needs to be under the mutex as well based on the ReadBatch
function right?
@@ -242,11 +241,22 @@ func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPat | |||
s.pathProvider = s.getFilePathProvider(ctx) | |||
} | |||
|
|||
return s.client, s.pathProvider, false, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a newcomer to this code this feels unsafe.
If the only important part of this code that all these variables are accessed/set together atomically, I do wonder if an atomic is better suited. You can use Swap
to set the new value and destroy in Close
, Store
in Connect and Load
in ReadBatch. I don't quite understand the higher level contract here of why it's only required that they are accessed concurrently and we don't have to worry about Close
clobbering something ongoing in Connect
or ReadBatch
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And when I talk about using atomics I mean using typed.AtomicValue
in our typed package in internal/typed
and wrapping all this state into a struct so it becomes typed.AtomicValue[*sftpReaderState]
builder := service.NewStreamBuilder() | ||
require.NoError(t, builder.SetYAML(config)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great tests thanks :)
Fixes #2435
Questions
I believe I have fixed the underlying issue, but I am not sure how to write an integration test to verify the fix. I have created a new integration test function with a
TODO
comment on where I got stuck. The questions I have around this are:watch
anddelete_on_finished
enabled, the use an SFTP client directly to inspect which files exist on the server to make sure they are all deleted after the pipeline runs. However, I'm not sure how to actually run the pipeline. Is too specific of a test to run usingintegration.StreamTests()
, and if not, could you point me in the right direction?newSFTPReaderFromParsed()
directly from the tests then useConnect()
, andReadBatch()
to interact with the plugin. However this plugin appears to be unusually structured in the way that it progresses through the input files. What it does is finds the first file inConnect()
and sets up the scanner for the file. InReadBatch()
, when the file is exhausted,ReadBatch()
returnsservice.ErrNotConnected
which will cause the engine to re-runConnect()
which advances to the next file. If the plugin only requiredConnect()
to be called once, I would be happy to drive the plugin directly in the tests, but because of the reconnection logic required, I was hesitant to reimplement the reconnection loop in the tests. Is there a utility somewhere that I can use from a test that implements the reconnect logic?