Skip to content

Commit

Permalink
Provide periodic SFTP upload progress updates
Browse files Browse the repository at this point in the history
Fixes issue #815.

- Make the `sftp.Upload()` method asynchronous, and return an
  `AsyncUpload` struct that receives upload status updates, including
  upload progress
- Add a heartbeat to the UploadTransferActivity that includes the number
  of bytes uploaded
- Use sync/atomic to prevent a data race from simultaneous writes and
  reads of the `AsyncUploadImpl.bytes` field
  • Loading branch information
djjuhasz committed Jan 19, 2024
1 parent ef5328b commit 6119236
Show file tree
Hide file tree
Showing 14 changed files with 591 additions and 171 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ gen-mock: $(MOCKGEN)
mockgen -typed -destination=./internal/api/auth/fake/mock_ticket_store.go -package=fake github.com/artefactual-sdps/enduro/internal/api/auth TicketStore
mockgen -typed -destination=./internal/package_/fake/mock_package_.go -package=fake github.com/artefactual-sdps/enduro/internal/package_ Service
mockgen -typed -destination=./internal/persistence/fake/mock_persistence.go -package=fake github.com/artefactual-sdps/enduro/internal/persistence Service
mockgen -typed -destination=./internal/sftp/fake/mock_sftp.go -package=fake github.com/artefactual-sdps/enduro/internal/sftp Client
mockgen -typed -destination=./internal/sftp/fake/mock_sftp.go -package=fake github.com/artefactual-sdps/enduro/internal/sftp Client,AsyncUpload
mockgen -typed -destination=./internal/storage/fake/mock_storage.go -package=fake github.com/artefactual-sdps/enduro/internal/storage Service
mockgen -typed -destination=./internal/storage/persistence/fake/mock_persistence.go -package=fake github.com/artefactual-sdps/enduro/internal/storage/persistence Storage
mockgen -typed -destination=./internal/upload/fake/mock_upload.go -package=fake github.com/artefactual-sdps/enduro/internal/upload Service
Expand Down
2 changes: 1 addition & 1 deletion cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func main() {
activities.NewZipActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.ZipActivityName},
)
w.RegisterActivityWithOptions(
am.NewUploadTransferActivity(logger, sftpClient).Execute,
am.NewUploadTransferActivity(logger, sftpClient, cfg.AM.PollInterval).Execute,
temporalsdk_activity.RegisterOptions{Name: am.UploadTransferActivityName},
)
w.RegisterActivityWithOptions(
Expand Down
60 changes: 31 additions & 29 deletions internal/am/delete_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,40 @@ func TestDeleteTransferActivity(t *testing.T) {
)

type test struct {
name string
params am.DeleteTransferActivityParams
recorder func(*sftp_fake.MockClientMockRecorder, am.DeleteTransferActivityParams)
errMsg string
name string
params am.DeleteTransferActivityParams
mock func(*gomock.Controller) *sftp_fake.MockClient
errMsg string
}
for _, tt := range []test{
{
name: "Deletes transfer",
params: am.DeleteTransferActivityParams{
Destination: td.Path(),
},
recorder: func(m *sftp_fake.MockClientMockRecorder, params am.DeleteTransferActivityParams) {
m.Delete(
mockutil.Context(),
params.Destination,
).Return(nil)
mock: func(ctrl *gomock.Controller) *sftp_fake.MockClient {
client := sftp_fake.NewMockClient(ctrl)
client.EXPECT().
Delete(mockutil.Context(), td.Path()).
Return(nil)

return client
},
},
{
name: "Errors when file does not exist",
params: am.DeleteTransferActivityParams{
Destination: td.Join("missing"),
},
recorder: func(m *sftp_fake.MockClientMockRecorder, params am.DeleteTransferActivityParams) {
m.Delete(
mockutil.Context(),
params.Destination,
).Return(
errors.New("SFTP: unable to remove file \"test.txt\": file does not exist"),
)
mock: func(ctrl *gomock.Controller) *sftp_fake.MockClient {
client := sftp_fake.NewMockClient(ctrl)
client.EXPECT().
Delete(mockutil.Context(), td.Join("missing")).
Return(
errors.New("SFTP: unable to remove file \"test.txt\": file does not exist"),
)

return client
},
errMsg: fmt.Sprintf("delete transfer: path: %q: %v", td.Join("missing"), errors.New("SFTP: unable to remove file \"test.txt\": file does not exist")),
},
Expand All @@ -65,13 +69,15 @@ func TestDeleteTransferActivity(t *testing.T) {
params: am.DeleteTransferActivityParams{
Destination: td.Join(filename),
},
recorder: func(m *sftp_fake.MockClientMockRecorder, params am.DeleteTransferActivityParams) {
m.Delete(
mockutil.Context(),
params.Destination,
).Return(
errors.New("SSH: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused"),
)
mock: func(ctrl *gomock.Controller) *sftp_fake.MockClient {
client := sftp_fake.NewMockClient(ctrl)
client.EXPECT().
Delete(mockutil.Context(), td.Join(filename)).
Return(
errors.New("SSH: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused"),
)

return client
},
errMsg: fmt.Sprintf("delete transfer: path: %q: %v", td.Join(filename), errors.New("SSH: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused")),
},
Expand All @@ -82,14 +88,10 @@ func TestDeleteTransferActivity(t *testing.T) {

ts := &temporalsdk_testsuite.WorkflowTestSuite{}
env := ts.NewTestActivityEnvironment()
msvc := sftp_fake.NewMockClient(gomock.NewController(t))

if tt.recorder != nil {
tt.recorder(msvc.EXPECT(), tt.params)
}
ctrl := gomock.NewController(t)

env.RegisterActivityWithOptions(
am.NewDeleteTransferActivity(logr.Discard(), msvc).Execute,
am.NewDeleteTransferActivity(logr.Discard(), tt.mock(ctrl)).Execute,
temporalsdk_activity.RegisterOptions{
Name: am.DeleteTransferActivityName,
},
Expand Down
68 changes: 60 additions & 8 deletions internal/am/upload_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,54 @@ import (
"fmt"
"os"
"path/filepath"
"time"

"github.com/go-logr/logr"
"go.artefactual.dev/tools/temporal"
temporalsdk_activity "go.temporal.io/sdk/activity"

"github.com/artefactual-sdps/enduro/internal/sftp"
)

const UploadTransferActivityName = "UploadTransferActivity"

type UploadTransferActivityParams struct {
// Local path of the source file.
SourcePath string
}

type UploadTransferActivityResult struct {
// Bytes copied to the remote file over the SFTP connection.
BytesCopied int64
// Full path including `remoteDir` config path.
// Full path of the destination file including `remoteDir` config path.
RemoteFullPath string
// Relative path to the `remoteDir` config path.
// Path of the destination file relative to the `remoteDir` config path.
RemoteRelativePath string
}

// UploadTransferActivity uploads a transfer via the SFTP client, and sends
// a periodic Temporal Heartbeat at the given heartRate.
type UploadTransferActivity struct {
client sftp.Client
logger logr.Logger
client sftp.Client
logger logr.Logger
heartRate time.Duration
}

func NewUploadTransferActivity(logger logr.Logger, client sftp.Client) *UploadTransferActivity {
return &UploadTransferActivity{client: client, logger: logger}
// NewUploadTransferActivity initializes and returns a new
// UploadTransferActivity.
func NewUploadTransferActivity(
logger logr.Logger,
client sftp.Client,
heartRate time.Duration,
) *UploadTransferActivity {
return &UploadTransferActivity{
client: client,
logger: logger,
heartRate: heartRate,
}
}

// Execute copies the source transfer to the destination via SFTP.
func (a *UploadTransferActivity) Execute(ctx context.Context, params *UploadTransferActivityParams) (*UploadTransferActivityResult, error) {
a.logger.V(1).Info("Execute UploadTransferActivity", "SourcePath", params.SourcePath)

Expand All @@ -45,7 +63,7 @@ func (a *UploadTransferActivity) Execute(ctx context.Context, params *UploadTran
defer src.Close()

filename := filepath.Base(params.SourcePath)
bytes, path, err := a.client.Upload(ctx, src, filename)
path, upload, err := a.client.Upload(ctx, src, filename)
if err != nil {
e := fmt.Errorf("%s: %v", UploadTransferActivityName, err)

Expand All @@ -57,9 +75,43 @@ func (a *UploadTransferActivity) Execute(ctx context.Context, params *UploadTran
}
}

fi, err := src.Stat()
if err != nil {
return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err)
}

// Block (with a heartbeat) until ctx is cancelled, the upload is done, or
// it stops with an error.
err = a.Heartbeat(ctx, upload, fi.Size())
if err != nil {
return nil, err
}

return &UploadTransferActivityResult{
BytesCopied: bytes,
BytesCopied: upload.Bytes(),
RemoteFullPath: path,
RemoteRelativePath: filename,
}, nil
}

// Heartbeat sends a periodic Temporal heartbeat, which includes the number of
// bytes uploaded, until the upload is complete, cancelled or returns an error.
func (a *UploadTransferActivity) Heartbeat(ctx context.Context, upload sftp.AsyncUpload, fileSize int64) error {
ticker := time.NewTicker(a.heartRate)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-upload.Err():
return err
case <-upload.Done():
return nil
case <-ticker.C:
temporalsdk_activity.RecordHeartbeat(ctx,
fmt.Sprintf("Uploaded %d bytes of %d.", upload.Bytes(), fileSize),
)
}
}
}
106 changes: 70 additions & 36 deletions internal/am/upload_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"testing"
"time"

"github.com/go-logr/logr"
"go.artefactual.dev/tools/mockutil"
Expand All @@ -29,7 +30,7 @@ func TestUploadTransferActivity(t *testing.T) {
type test struct {
name string
params am.UploadTransferActivityParams
recorder func(*sftp_fake.MockClientMockRecorder)
mock func(*gomock.Controller) (sftp.Client, sftp.AsyncUpload)
want am.UploadTransferActivityResult
wantErr string
wantNonRetryErr bool
Expand All @@ -40,13 +41,33 @@ func TestUploadTransferActivity(t *testing.T) {
params: am.UploadTransferActivityParams{
SourcePath: td.Join(filename),
},
recorder: func(m *sftp_fake.MockClientMockRecorder) {
var t *os.File
m.Upload(
mockutil.Context(),
gomock.AssignableToTypeOf(t),
filename,
).Return(int64(14), "/transfer_dir/"+filename, nil)
mock: func(ctrl *gomock.Controller) (sftp.Client, sftp.AsyncUpload) {
var fp *os.File

client := sftp_fake.NewMockClient(ctrl)
upload := sftp_fake.NewMockAsyncUpload(ctrl)

client.EXPECT().
Upload(
mockutil.Context(),
gomock.AssignableToTypeOf(fp),
filename,
).
Return("/transfer_dir/"+filename, upload, nil)

doneCh := make(chan bool, 1)
upload.EXPECT().Done().Return(doneCh).Times(2)

errCh := make(chan error, 1)
upload.EXPECT().Err().Return(errCh).Times(2)

upload.EXPECT().Bytes().DoAndReturn(func() int64 {
doneCh <- true
return int64(7)
})
upload.EXPECT().Bytes().Return(14)

return client, upload
},
want: am.UploadTransferActivityResult{
BytesCopied: int64(14),
Expand All @@ -66,17 +87,23 @@ func TestUploadTransferActivity(t *testing.T) {
params: am.UploadTransferActivityParams{
SourcePath: td.Join(filename),
},
recorder: func(m *sftp_fake.MockClientMockRecorder) {
var t *os.File
m.Upload(
mockutil.Context(),
gomock.AssignableToTypeOf(t),
filename,
).Return(
0,
"",
errors.New("ssh: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused"),
)
mock: func(ctrl *gomock.Controller) (sftp.Client, sftp.AsyncUpload) {
var fp *os.File

client := sftp_fake.NewMockClient(ctrl)
client.EXPECT().
Upload(
mockutil.Context(),
gomock.AssignableToTypeOf(fp),
filename,
).
Return(
"",
nil,
errors.New("ssh: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused"),
)

return client, nil
},
wantErr: "activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: ssh: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused",
},
Expand All @@ -85,19 +112,25 @@ func TestUploadTransferActivity(t *testing.T) {
params: am.UploadTransferActivityParams{
SourcePath: td.Join(filename),
},
recorder: func(m *sftp_fake.MockClientMockRecorder) {
var t *os.File
m.Upload(
mockutil.Context(),
gomock.AssignableToTypeOf(t),
filename,
).Return(
0,
"",
&sftp.AuthError{
Message: "ssh: handshake failed: ssh: unable to authenticate, attempted methods [none publickey], no supported methods remain",
},
)
mock: func(ctrl *gomock.Controller) (sftp.Client, sftp.AsyncUpload) {
var fp *os.File

client := sftp_fake.NewMockClient(ctrl)
client.EXPECT().
Upload(
mockutil.Context(),
gomock.AssignableToTypeOf(fp),
filename,
).
Return(
"",
nil,
&sftp.AuthError{
Message: "ssh: handshake failed: ssh: unable to authenticate, attempted methods [none publickey], no supported methods remain",
},
)

return client, nil
},
wantErr: "activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: auth: ssh: handshake failed: ssh: unable to authenticate, attempted methods [none publickey], no supported methods remain",
wantNonRetryErr: true,
Expand All @@ -109,14 +142,15 @@ func TestUploadTransferActivity(t *testing.T) {

ts := &temporalsdk_testsuite.WorkflowTestSuite{}
env := ts.NewTestActivityEnvironment()
msvc := sftp_fake.NewMockClient(gomock.NewController(t))
ctrl := gomock.NewController(t)

if tt.recorder != nil {
tt.recorder(msvc.EXPECT())
var client sftp.Client
if tt.mock != nil {
client, _ = tt.mock(ctrl)
}

env.RegisterActivityWithOptions(
am.NewUploadTransferActivity(logr.Discard(), msvc).Execute,
am.NewUploadTransferActivity(logr.Discard(), client, 2*time.Millisecond).Execute,
temporalsdk_activity.RegisterOptions{
Name: am.UploadTransferActivityName,
},
Expand Down
Loading

0 comments on commit 6119236

Please sign in to comment.