Send heartbeat with progress during SFTP upload #827
Looks like I forgot to commit the fakes generated by a new version of github.com/uber-go/mock? Perhaps that work could go into its own dedicated commit.
1f168dd to 7809e74
Looking good so far. This reminds me of http.Get which unblocks when the headers are received so you can read resp.Body (the underlying conn remains open until resp.Body.Close() is called).
Thanks @sevein! It's been a great opportunity to learn more about async processing with goroutines and channels. My brain was smoking a bit yesterday trying to figure out the bugs, but I learned a lot. :) And it turns out atomics came in handy after all! 😆
1303ef2 to 863854c
LGTM
// Upload asynchronously copies data from the src reader to the specified
// dest on the SFTP server.
Upload(ctx context.Context, src io.Reader, dest string) (remotePath string, upload AsyncUpload, err error)
I'm interested to see how our approach to this API evolves over time. Lately, I've noticed a trend in the community towards more synchronous APIs, and moving away from using channels due to their complexity (semantics) and potential for errors.
If we had to build a public API that prioritizes simplicity and ease of use, we could explore these two alternatives:
- Provide an io.Writer, e.g. gocloud.dev's bucket.NewWriter,
- Provide a callback, e.g.:
type UploadStatusCallback func(bytesWritten int64)
// Upload blocks.
Upload(ctx context.Context, src io.Reader, dest string, updater UploadStatusCallback) (remotePath string, err error)
Just to be clear, I'm not suggesting we need to change the current design.
@sevein my original plan was to add an UploadChunk() method that would take start and width parameters, and then just loop over it in the upload activity, sending a heartbeat with the uploaded byte count on each iteration. I thought it would be fairly easy to do chunked reads/writes with the io interfaces, but my reading gave me the impression that reading and writing chunks like that is not very efficient. The best answer I found for how to do an (up/down)load progress update was to use io.TeeReader() like I've used here. Unfortunately I don't see any way to use the TeeReader for updates without goroutines and channels for communication. 🤷
I'm not sure how you get around the blocking problem with io.Writer or a callback to provide heartbeats during the upload, without using goroutines somehow. It seems like you would have to push the async complexity to the caller instead of handling it in the Upload method.
I've read somewhere that the Go SDK throttles heartbeats. I suspect that's why in the code examples they just show RecordHeartbeat being used in loops. The SDK makes sure we don't flood the Temporal frontend service with unnecessary heartbeats. I'll try to find where I saw that.
So the activity code could look like:
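type progressReader struct {
	r        io.Reader
	callback func(int)
}

// Read reports every chunk it reads, including a final chunk that is
// returned together with an error such as io.EOF.
func (pr *progressReader) Read(p []byte) (int, error) {
	n, err := pr.r.Read(p)
	if n > 0 {
		pr.callback(n)
	}
	return n, err
}

// Read has a pointer receiver, so pass a pointer to io.Copy.
io.Copy(w, &progressReader{myFileReader, myHeartbeater})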
Not sure if that's any easier, maybe?
863854c to 6119236
Fixes issue #815.
- Make the `sftp.Upload()` method asynchronous, and return an `AsyncUpload` struct that receives upload status updates, including upload progress
- Add a heartbeat to the UploadTransferActivity that includes the number of bytes uploaded
- Use sync/atomic to prevent a data race from simultaneous writes and reads of the `AsyncUploadImpl.bytes` field
6119236 to 1ce6503
Fixes issue #815.