Skip to content

Commit

Permalink
Optionally send unzipped bags to Archivematica
Browse files Browse the repository at this point in the history
  • Loading branch information
mcantelon committed Feb 10, 2025
1 parent 48d050c commit cd97bb7
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 77 deletions.
3 changes: 3 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ transferDeadline = "1h"
# used.
transferSourcePath = ""

# transferType specifies the transfer type sent from Enduro to Archivematica.
transferType = "zipped bag"

[am.sftp]
host = "" # The Archivematica Storage Service hostname.
port = ""
Expand Down
2 changes: 2 additions & 0 deletions internal/am/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Config struct {
// "749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload").
TransferSourcePath string

TransferType string

// Capacity sets the maximum number of worker sessions the worker can
// handle at one time (default: 1).
Capacity int
Expand Down
5 changes: 4 additions & 1 deletion internal/am/start_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type StartTransferActivityParams struct {
// Name of the transfer.
Name string

// Type of the transfer.
Type string

// RelativePath is the PIP path relative to the Archivematica transfer
// source directory.
RelativePath string
Expand Down Expand Up @@ -57,7 +60,7 @@ func (a *StartTransferActivity) Execute(

payload, resp, err := a.amps.Create(ctx, &amclient.PackageCreateRequest{
Name: opts.Name,
Type: "zipped bag",
Type: opts.Type,
Path: filepath.Join(a.cfg.TransferSourcePath, opts.RelativePath),
ProcessingConfig: processingConfig,
AutoApprove: true,
Expand Down
1 change: 1 addition & 0 deletions internal/am/start_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestStartTransferActivity(t *testing.T) {
transferID := uuid.New().String()
opts := am.StartTransferActivityParams{
Name: "Testing",
Type: "zipped bag",
RelativePath: "/tmp",
}

Expand Down
70 changes: 50 additions & 20 deletions internal/am/upload_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,35 +54,65 @@ func (a *UploadTransferActivity) Execute(
logger := temporal_tools.GetLogger(ctx)
logger.V(1).Info("Execute UploadTransferActivity", "SourcePath", params.SourcePath)

src, err := os.Open(params.SourcePath)
info, err := os.Stat(params.SourcePath)
if err != nil {
return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err)
}
defer src.Close()

var path string
var upload sftp.AsyncUpload

filename := filepath.Base(params.SourcePath)
path, upload, err := a.client.Upload(ctx, src, filename)
if err != nil {
e := fmt.Errorf("%s: %v", UploadTransferActivityName, err)
if info.IsDir() {
path, upload, err = a.client.UploadDirectory(ctx, params.SourcePath)
if err != nil {
e := fmt.Errorf("%s: %v", UploadTransferActivityName, err)

switch err.(type) {
case *sftp.AuthError:
return nil, temporal.NewNonRetryableError(e)
default:
return nil, e

Check warning on line 75 in internal/am/upload_transfer.go

View check run for this annotation

Codecov / codecov/patch

internal/am/upload_transfer.go#L67-L75

Added lines #L67 - L75 were not covered by tests
}
}

switch err.(type) {
case *sftp.AuthError:
return nil, temporal.NewNonRetryableError(e)
default:
return nil, e
// Block (with a heartbeat) until ctx is cancelled, the upload is done, or
// it stops with an error.
err = a.Heartbeat(ctx, upload, 100)
if err != nil {
return nil, err

Check warning on line 83 in internal/am/upload_transfer.go

View check run for this annotation

Codecov / codecov/patch

internal/am/upload_transfer.go#L81-L83

Added lines #L81 - L83 were not covered by tests
}
}

fi, err := src.Stat()
if err != nil {
return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err)
}
} else {
src, err := os.Open(params.SourcePath)
if err != nil {
return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err)
}

Check warning on line 90 in internal/am/upload_transfer.go

View check run for this annotation

Codecov / codecov/patch

internal/am/upload_transfer.go#L89-L90

Added lines #L89 - L90 were not covered by tests
defer src.Close()

path, upload, err = a.client.UploadFile(ctx, src, filename)
if err != nil {
e := fmt.Errorf("%s: %v", UploadTransferActivityName, err)

switch err.(type) {
case *sftp.AuthError:
return nil, temporal.NewNonRetryableError(e)
default:
return nil, e
}
}

// Block (with a heartbeat) until ctx is cancelled, the upload is done, or
// it stops with an error.
err = a.Heartbeat(ctx, upload, fi.Size())
if err != nil {
return nil, err
fi, err := src.Stat()
if err != nil {
return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err)
}

Check warning on line 108 in internal/am/upload_transfer.go

View check run for this annotation

Codecov / codecov/patch

internal/am/upload_transfer.go#L107-L108

Added lines #L107 - L108 were not covered by tests

// Block (with a heartbeat) until ctx is cancelled, the upload is done, or
// it stops with an error.
err = a.Heartbeat(ctx, upload, fi.Size())
if err != nil {
return nil, err
}

Check warning on line 115 in internal/am/upload_transfer.go

View check run for this annotation

Codecov / codecov/patch

internal/am/upload_transfer.go#L114-L115

Added lines #L114 - L115 were not covered by tests
}

return &UploadTransferActivityResult{
Expand Down
10 changes: 5 additions & 5 deletions internal/am/upload_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ func TestUploadTransferActivity(t *testing.T) {
var fp *os.File

client := sftp_fake.NewMockClient(ctrl)
upload := sftp_fake.NewMockAsyncUpload(ctrl)
upload := sftp_fake.NewMockAsyncUploadFile(ctrl)

client.EXPECT().
Upload(
UploadFile(
mockutil.Context(),
gomock.AssignableToTypeOf(fp),
filename,
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestUploadTransferActivity(t *testing.T) {
params: am.UploadTransferActivityParams{
SourcePath: td.Join("missing"),
},
wantErr: fmt.Sprintf("activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: open %s: no such file or directory", td.Join("missing")),
wantErr: fmt.Sprintf("activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: stat %s: no such file or directory", td.Join("missing")),
},
{
name: "Retryable error when SSH connection fails",
Expand All @@ -93,7 +93,7 @@ func TestUploadTransferActivity(t *testing.T) {

client := sftp_fake.NewMockClient(ctrl)
client.EXPECT().
Upload(
UploadFile(
mockutil.Context(),
gomock.AssignableToTypeOf(fp),
filename,
Expand All @@ -118,7 +118,7 @@ func TestUploadTransferActivity(t *testing.T) {

client := sftp_fake.NewMockClient(ctrl)
client.EXPECT().
Upload(
UploadFile(
mockutil.Context(),
gomock.AssignableToTypeOf(fp),
filename,
Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func Read(config *Configuration, configFile string) (found bool, configFileUsed
v.SetDefault("a3m.processing", a3m.ProcessingDefault)
v.SetDefault("am.capacity", 1)
v.SetDefault("am.pollInterval", 10*time.Second)
v.SetDefault("am.transferType", "zipped bag")
v.SetDefault("api.listen", "127.0.0.1:9000")
v.SetDefault("debugListen", "127.0.0.1:9001")
v.SetDefault("preservation.taskqueue", temporal.A3mWorkerTaskQueue)
Expand Down
3 changes: 2 additions & 1 deletion internal/sftp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type Client interface {
Delete(ctx context.Context, dest string) error
// Upload asynchronously copies data from the src reader to the specified
// dest on the SFTP server.
Upload(ctx context.Context, src io.Reader, dest string) (remotePath string, upload AsyncUpload, err error)
UploadFile(ctx context.Context, src io.Reader, dest string) (remotePath string, upload AsyncUpload, err error)
UploadDirectory(ctx context.Context, srcPath string) (remotePath string, upload AsyncUpload, err error)
}

// AsyncUpload provides information about an upload happening asynchronously in
Expand Down
86 changes: 60 additions & 26 deletions internal/sftp/fake/mock_sftp.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cd97bb7

Please sign in to comment.