Skip to content

Commit

Permalink
Make upload asynchronous
Browse files Browse the repository at this point in the history
When Upload() is called, start an asynchronous routine for the upload,
then immediately return. The return includes an `AsyncUpload` struct
that receives updates from the upload goroutine, and can be queried for
the upload status.
  • Loading branch information
djjuhasz committed Jan 16, 2024
1 parent 6fefe84 commit 7809e74
Show file tree
Hide file tree
Showing 11 changed files with 472 additions and 387 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ gen-mock: $(MOCKGEN)
mockgen -typed -destination=./internal/api/auth/fake/mock_ticket_store.go -package=fake github.com/artefactual-sdps/enduro/internal/api/auth TicketStore
mockgen -typed -destination=./internal/package_/fake/mock_package_.go -package=fake github.com/artefactual-sdps/enduro/internal/package_ Service
mockgen -typed -destination=./internal/persistence/fake/mock_persistence.go -package=fake github.com/artefactual-sdps/enduro/internal/persistence Service
mockgen -typed -destination=./internal/sftp/fake/mock_sftp.go -package=fake github.com/artefactual-sdps/enduro/internal/sftp Client,Connection
mockgen -typed -destination=./internal/sftp/fake/mock_sftp.go -package=fake github.com/artefactual-sdps/enduro/internal/sftp Client,AsyncUpload
mockgen -typed -destination=./internal/storage/fake/mock_storage.go -package=fake github.com/artefactual-sdps/enduro/internal/storage Service
mockgen -typed -destination=./internal/storage/persistence/fake/mock_persistence.go -package=fake github.com/artefactual-sdps/enduro/internal/storage/persistence Storage
mockgen -typed -destination=./internal/upload/fake/mock_upload.go -package=fake github.com/artefactual-sdps/enduro/internal/upload Service
Expand Down
7 changes: 1 addition & 6 deletions internal/am/delete_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@ func (a *DeleteTransferActivity) Execute(ctx context.Context, params *DeleteTran
"destination", params.Destination,
)

conn, err := a.client.Dial(ctx)
if err != nil {
return fmt.Errorf("delete transfer: conn: %v", err)
}

err = conn.Delete(ctx, params.Destination)
err := a.client.Delete(ctx, params.Destination)
if err != nil {
return fmt.Errorf("delete transfer: path: %q: %v", params.Destination, err)
}
Expand Down
17 changes: 0 additions & 17 deletions internal/am/delete_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,8 @@ func TestDeleteTransferActivity(t *testing.T) {
},
mock: func(ctrl *gomock.Controller) *sftp_fake.MockClient {
mclient := sftp_fake.NewMockClient(ctrl)
mconn := sftp_fake.NewMockConnection(ctrl)

mclient.EXPECT().
Dial(mockutil.Context()).
Return(mconn, nil)

mconn.EXPECT().
Delete(mockutil.Context(), td.Path()).
Return(nil)

Expand All @@ -60,13 +55,7 @@ func TestDeleteTransferActivity(t *testing.T) {
},
mock: func(ctrl *gomock.Controller) *sftp_fake.MockClient {
mclient := sftp_fake.NewMockClient(ctrl)
mconn := sftp_fake.NewMockConnection(ctrl)

mclient.EXPECT().
Dial(mockutil.Context()).
Return(mconn, nil)

mconn.EXPECT().
Delete(mockutil.Context(), td.Join("missing")).
Return(
errors.New("SFTP: unable to remove file \"test.txt\": file does not exist"),
Expand All @@ -83,13 +72,7 @@ func TestDeleteTransferActivity(t *testing.T) {
},
mock: func(ctrl *gomock.Controller) *sftp_fake.MockClient {
mclient := sftp_fake.NewMockClient(ctrl)
mconn := sftp_fake.NewMockConnection(ctrl)

mclient.EXPECT().
Dial(mockutil.Context()).
Return(mconn, nil)

mconn.EXPECT().
Delete(mockutil.Context(), td.Join(filename)).
Return(
errors.New("SSH: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused"),
Expand Down
43 changes: 20 additions & 23 deletions internal/am/upload_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,8 @@ func (a *UploadTransferActivity) Execute(ctx context.Context, params *UploadTran
}
defer src.Close()

fi, err := src.Stat()
if err != nil {
return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err)
}

conn, err := a.client.Dial(ctx)
if err != nil {
return nil, fmt.Errorf("sftp: dial: %v", err)
}
defer conn.Close()

done := make(chan bool, 1)
go a.Heartbeat(ctx, conn, done, fi.Size())

filename := filepath.Base(params.SourcePath)
bytes, path, err := conn.Upload(ctx, src, filename)
path, upload, err := a.client.Upload(ctx, src, filename)
if err != nil {
e := fmt.Errorf("%s: %v", UploadTransferActivityName, err)

Expand All @@ -82,30 +68,41 @@ func (a *UploadTransferActivity) Execute(ctx context.Context, params *UploadTran
}
}

done <- true
fi, err := src.Stat()
if err != nil {
return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err)
}

// Block (with heartbeat) until ctx is cancelled, or the upload is complete
// or stops with an error.
err = a.Heartbeat(ctx, upload, fi.Size())
if err != nil {
return nil, err
}

return &UploadTransferActivityResult{
BytesCopied: bytes,
BytesCopied: upload.Bytes(),
RemoteFullPath: path,
RemoteRelativePath: filename,
}, nil
}

func (a *UploadTransferActivity) Heartbeat(ctx context.Context, conn sftp.Connection, done chan bool, fileSize int64) {
func (a *UploadTransferActivity) Heartbeat(ctx context.Context, upload sftp.AsyncUpload, fileSize int64) error {
ticker := time.NewTicker(a.heartRate)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-done:
return
return ctx.Err()
case err := <-upload.Error():
return err
case <-upload.Done():
return nil
case <-ticker.C:
temporalsdk_activity.RecordHeartbeat(ctx,
fmt.Sprintf("uploaded %d bytes of %d.", conn.Progress(), fileSize),
fmt.Sprintf("uploaded %d bytes of %d.", upload.Bytes(), fileSize),
)
continue
}
}
}
198 changes: 97 additions & 101 deletions internal/am/upload_transfer_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package am_test

import (
"context"
"errors"
"fmt"
"io"
"os"
"testing"
"time"
Expand Down Expand Up @@ -32,7 +28,7 @@ func TestUploadTransferActivity(t *testing.T) {
type test struct {
name string
params am.UploadTransferActivityParams
mock func(*gomock.Controller) *sftp_fake.MockClient
mock func(*gomock.Controller) (sftp.Client, sftp.AsyncUpload)
want am.UploadTransferActivityResult
wantErr string
wantNonRetryErr bool
Expand All @@ -43,116 +39,116 @@ func TestUploadTransferActivity(t *testing.T) {
params: am.UploadTransferActivityParams{
SourcePath: td.Join(filename),
},
mock: func(ctrl *gomock.Controller) *sftp_fake.MockClient {
mock: func(ctrl *gomock.Controller) (sftp.Client, sftp.AsyncUpload) {
var fp *os.File

mclient := sftp_fake.NewMockClient(ctrl)
mconn := sftp_fake.NewMockConnection(ctrl)
client := sftp_fake.NewMockClient(ctrl)
upload := sftp_fake.NewMockAsyncUpload(ctrl)

mclient.EXPECT().
Dial(mockutil.Context()).
Return(mconn, nil)

mconn.EXPECT().
client.EXPECT().
Upload(
mockutil.Context(),
gomock.AssignableToTypeOf(fp),
filename,
).
DoAndReturn(
func(context.Context, io.Reader, string) (int64, string, error) {
time.Sleep(3 * time.Millisecond)
return int64(14), "/transfer_dir/" + filename, nil
},
)
Return("/transfer_dir/"+filename, upload, nil)

doneCh := make(chan bool, 1)
upload.EXPECT().Done().Return(doneCh).AnyTimes()

errCh := make(chan error, 1)
upload.EXPECT().Error().Return(errCh).AnyTimes()

mconn.EXPECT().Close()
mconn.EXPECT().Progress().Return(7)
upload.EXPECT().Bytes().DoAndReturn(func() int64 {
doneCh <- true
return int64(7)
})
upload.EXPECT().Bytes().Return(14)

return mclient
return client, upload
},
want: am.UploadTransferActivityResult{
BytesCopied: int64(14),
RemoteFullPath: "/transfer_dir/" + filename,
RemoteRelativePath: filename,
},
},
{
name: "Errors when local file can't be read",
params: am.UploadTransferActivityParams{
SourcePath: td.Join("missing"),
},
wantErr: fmt.Sprintf("activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: open %s: no such file or directory", td.Join("missing")),
},
{
name: "Retryable error when SSH connection fails",
params: am.UploadTransferActivityParams{
SourcePath: td.Join(filename),
},
mock: func(ctrl *gomock.Controller) *sftp_fake.MockClient {
var fp *os.File

mclient := sftp_fake.NewMockClient(ctrl)
mconn := sftp_fake.NewMockConnection(ctrl)

mclient.EXPECT().
Dial(mockutil.Context()).
Return(mconn, nil)

mconn.EXPECT().
Upload(
mockutil.Context(),
gomock.AssignableToTypeOf(fp),
filename,
).
Return(
0,
"",
errors.New("ssh: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused"),
)

mconn.EXPECT().Close()

return mclient
},
wantErr: "activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: ssh: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused",
},
{
name: "Non-retryable error when authentication fails",
params: am.UploadTransferActivityParams{
SourcePath: td.Join(filename),
},
mock: func(ctrl *gomock.Controller) *sftp_fake.MockClient {
var fp *os.File

mclient := sftp_fake.NewMockClient(ctrl)
mconn := sftp_fake.NewMockConnection(ctrl)

mclient.EXPECT().
Dial(mockutil.Context()).
Return(mconn, nil)

mconn.EXPECT().
Upload(
mockutil.Context(),
gomock.AssignableToTypeOf(fp),
filename,
).
Return(
0,
"",
&sftp.AuthError{
Message: "ssh: handshake failed: ssh: unable to authenticate, attempted methods [none publickey], no supported methods remain",
},
)

mconn.EXPECT().Close()

return mclient
},
wantErr: "activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: auth: ssh: handshake failed: ssh: unable to authenticate, attempted methods [none publickey], no supported methods remain",
wantNonRetryErr: true,
},
// {
// name: "Errors when local file can't be read",
// params: am.UploadTransferActivityParams{
// SourcePath: td.Join("missing"),
// },
// wantErr: fmt.Sprintf("activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: open %s: no such file or directory", td.Join("missing")),
// },
// {
// name: "Retryable error when SSH connection fails",
// params: am.UploadTransferActivityParams{
// SourcePath: td.Join(filename),
// },
// mock: func(ctrl *gomock.Controller) *sftp_fake.MockClient {
// var fp *os.File

// mclient := sftp_fake.NewMockClient(ctrl)
// mconn := sftp_fake.NewMockConnection(ctrl)

// mclient.EXPECT().
// Dial(mockutil.Context()).
// Return(mconn, nil)

// mconn.EXPECT().
// Upload(
// mockutil.Context(),
// gomock.AssignableToTypeOf(fp),
// filename,
// ).
// Return(
// 0,
// "",
// errors.New("ssh: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused"),
// )

// mconn.EXPECT().Close()

// return mclient
// },
// wantErr: "activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: ssh: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused",
// },
// {
// name: "Non-retryable error when authentication fails",
// params: am.UploadTransferActivityParams{
// SourcePath: td.Join(filename),
// },
// mock: func(ctrl *gomock.Controller) *sftp_fake.MockClient {
// var fp *os.File

// mclient := sftp_fake.NewMockClient(ctrl)
// mconn := sftp_fake.NewMockConnection(ctrl)

// mclient.EXPECT().
// Dial(mockutil.Context()).
// Return(mconn, nil)

// mconn.EXPECT().
// Upload(
// mockutil.Context(),
// gomock.AssignableToTypeOf(fp),
// filename,
// ).
// Return(
// 0,
// "",
// &sftp.AuthError{
// Message: "ssh: handshake failed: ssh: unable to authenticate, attempted methods [none publickey], no supported methods remain",
// },
// )

// mconn.EXPECT().Close()

// return mclient
// },
// wantErr: "activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: auth: ssh: handshake failed: ssh: unable to authenticate, attempted methods [none publickey], no supported methods remain",
// wantNonRetryErr: true,
// },
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -162,13 +158,13 @@ func TestUploadTransferActivity(t *testing.T) {
env := ts.NewTestActivityEnvironment()
ctrl := gomock.NewController(t)

mclient := sftp_fake.NewMockClient(ctrl)
var client sftp.Client
if tt.mock != nil {
mclient = tt.mock(ctrl)
client, _ = tt.mock(ctrl)
}

env.RegisterActivityWithOptions(
am.NewUploadTransferActivity(logr.Discard(), mclient, 2*time.Millisecond).Execute,
am.NewUploadTransferActivity(logr.Discard(), client, 2*time.Millisecond).Execute,
temporalsdk_activity.RegisterOptions{
Name: am.UploadTransferActivityName,
},
Expand Down
Loading

0 comments on commit 7809e74

Please sign in to comment.