Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send heartbeat with progress during SFTP upload #827

Merged
merged 1 commit into from
Jan 19, 2024

Conversation

djjuhasz
Copy link
Collaborator

Fixes issue #815.

@djjuhasz djjuhasz requested a review from sevein January 12, 2024 02:29
Copy link
Contributor

@sevein sevein left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like I forgot to commit the fakes generated by a new version of github.com/uber-go/mock? Perhaps that work could go into its own dedicated commit.

@djjuhasz djjuhasz force-pushed the dev/issue-815-sftp-upload-progress branch 2 times, most recently from 1f168dd to 7809e74 Compare January 16, 2024 23:35
@sevein
Copy link
Contributor

sevein commented Jan 17, 2024

Looking good so far. This reminds me of http.Get which unblocks when the headers are received so you can read resp.Body (the underlying conn remains open until resp.Body.Close() is called).

@djjuhasz
Copy link
Collaborator Author

Looking good so far. This reminds me of http.Get which unblocks when the headers are received so you can read resp.Body (the underlying conn remains open until resp.Body.Close() is called).

Thanks @sevein! It's been a great opportunity to learn more about async processing with goroutines and channels. My brain was smoking a bit yesterday trying to figure out the bugs, but I learned a lot. :)

And it turns out atomics came in handy after all! 😆

@djjuhasz djjuhasz force-pushed the dev/issue-815-sftp-upload-progress branch 7 times, most recently from 1303ef2 to 863854c Compare January 18, 2024 01:21
@djjuhasz djjuhasz requested a review from sevein January 18, 2024 01:29
Copy link
Contributor

@sevein sevein left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines +33 to +35
// Upload asynchronously copies data from the src reader to the specified
// dest on the SFTP server.
Upload(ctx context.Context, src io.Reader, dest string) (remotePath string, upload AsyncUpload, err error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm interested to see how our approach to this API evolves over time. Lately, I've noticed a trend in the community towards more synchronous APIs, and moving away from using channels due to their complexity (semantics) and potential for errors.

If we had to buidl a public API that prioritizes simplicity and ease of use, we could explore these two alternatives:

  1. Provide a io.Writer, e.g. gocloude.dev's bucket.NewWriter,
  2. Provide a callback, e.g.:
type UploadStatusCallback func(bytesWritten int64)
    
// Upload blocks.
Upload(ctx context.Context, src io.Reader, dest string, updater UploadStatusCallback) (remotePath string, err error)

Just to be clear, I'm not suggesting we need to change the current design.

Copy link
Collaborator Author

@djjuhasz djjuhasz Jan 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sevein my original plan was to add an UploadChunk() method that would take start and width parameters, and then just loop it in the upload activity with a heartbeat + bytes upload count each iteration. I thought it would fairly easy to do chunked read/writes with the io interfaces, but my reading on it gave me the impression that reading/writing chunks like that is not very efficient. The best answer I found for how to do an (up/down)load progress update was to use io.TeeWriter() like I've used here. Unfortunately I don't see any way to use the TeeWriter for updates without goroutines and channels for communication. 🤷

I'm not sure how you get around the blocking problem with io.Writer or a callback to provide heartbeats during the upload, without using goroutines somehow. It seems like you would have to push the aysnc complexity to the caller instead of handling it in the Upload method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've read somewhere that the Go SDK throttles heartbeats. I suspect that's why in the code examples they just show RecordHeartbeat being used in loops. The SDK makes sure we don't flood the Temporal frontend service with unnecessary heartbeats. I'll try to find where I saw that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@sevein sevein Jan 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the activity code could look like:

type progressReader struct {
    r        io.Reader
    callback func(int)
}

func (pr *progressReader) Read(p []byte) (int, error) {
    n, err := pr.r.Read(p)
    if err == nil {
        pr.callback(n)
    }
    return n, err
}

io.Copy(w, progressReader{myFileReader, myHeartbeater})

Not sure if that's any easier, maybe?

@djjuhasz djjuhasz force-pushed the dev/issue-815-sftp-upload-progress branch from 863854c to 6119236 Compare January 19, 2024 19:01
Fixes issue #815.

- Make the `sftp.Upload()` method asynchronous, and return an
  `AsyncUpload` struct that receives upload status updates, including
  upload progress
- Add a heartbeat to the UploadTransferActivity that includes the number
  of bytes uploaded
- Use sync/atomic to prevent a data race from simultaneous writes and
  reads of the `AsyncUploadImpl.bytes` field
@djjuhasz djjuhasz force-pushed the dev/issue-815-sftp-upload-progress branch from 6119236 to 1ce6503 Compare January 19, 2024 19:09
@djjuhasz djjuhasz merged commit 8228ec6 into main Jan 19, 2024
11 checks passed
@djjuhasz djjuhasz deleted the dev/issue-815-sftp-upload-progress branch January 19, 2024 19:15
@djjuhasz djjuhasz linked an issue Mar 1, 2024 that may be closed by this pull request
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Problem: Archivematica SFTP upload provides no progress updates
2 participants