diff --git a/enduro.toml b/enduro.toml index 29e874817..596c92f8d 100644 --- a/enduro.toml +++ b/enduro.toml @@ -179,6 +179,9 @@ transferDeadline = "1h" # used. transferSourcePath = "" +# transferType specifies the transfer type sent from Enduro to Archivematica. +transferType = "zipped bag" + [am.sftp] host = "" # The Archivematica Storage Service hostname. port = "" diff --git a/internal/am/config.go b/internal/am/config.go index 2e77ed96c..2e794f3a0 100644 --- a/internal/am/config.go +++ b/internal/am/config.go @@ -30,6 +30,8 @@ type Config struct { // "749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload"). TransferSourcePath string + TransferType string + // Capacity sets the maximum number of worker sessions the worker can // handle at one time (default: 1). Capacity int diff --git a/internal/am/start_transfer.go b/internal/am/start_transfer.go index 14287149e..618535e17 100644 --- a/internal/am/start_transfer.go +++ b/internal/am/start_transfer.go @@ -19,6 +19,9 @@ type StartTransferActivityParams struct { // Name of the transfer. Name string + // Type of the transfer. + Type string + // RelativePath is the PIP path relative to the Archivematica transfer // source directory. RelativePath string @@ -57,7 +60,7 @@ func (a *StartTransferActivity) Execute( payload, resp, err := a.amps.Create(ctx, &amclient.PackageCreateRequest{ Name: opts.Name, - Type: "zipped bag", + Type: opts.Type, Path: filepath.Join(a.cfg.TransferSourcePath, opts.RelativePath), ProcessingConfig: processingConfig, AutoApprove: true, diff --git a/internal/am/start_transfer_test.go b/internal/am/start_transfer_test.go index 74b05748a..f288d0f12 100644 --- a/internal/am/start_transfer_test.go +++ b/internal/am/start_transfer_test.go @@ -23,6 +23,7 @@ func TestStartTransferActivity(t *testing.T) { transferID := uuid.New().String() opts := am.StartTransferActivityParams{ Name: "Testing", + Type: "zipped bag", RelativePath: "/tmp", } diff --git a/internal/am/upload_transfer.go b/internal/am/upload_transfer.go index 3bc847c78..92bbe7ca2 100644 --- a/internal/am/upload_transfer.go +++ b/internal/am/upload_transfer.go @@ -54,35 +54,65 @@ func (a *UploadTransferActivity) Execute( logger := temporal_tools.GetLogger(ctx) logger.V(1).Info("Execute UploadTransferActivity", "SourcePath", params.SourcePath) - src, err := os.Open(params.SourcePath) + info, err := os.Stat(params.SourcePath) if err != nil { return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err) } - defer src.Close() + + var path string + var upload sftp.AsyncUpload filename := filepath.Base(params.SourcePath) - path, upload, err := a.client.Upload(ctx, src, filename) - if err != nil { - e := fmt.Errorf("%s: %v", UploadTransferActivityName, err) + if info.IsDir() { + path, upload, err = a.client.UploadDirectory(ctx, params.SourcePath) + if err != nil { + e := fmt.Errorf("%s: %v", UploadTransferActivityName, err) + + switch err.(type) { + case *sftp.AuthError: + return nil, temporal.NewNonRetryableError(e) + default: + return nil, e + } + } - switch err.(type) { - case *sftp.AuthError: - return nil, temporal.NewNonRetryableError(e) - default: - return nil, e + // Block (with a heartbeat) until ctx is cancelled, the upload is done, or + // it stops with an error. + err = a.Heartbeat(ctx, upload, 100) + if err != nil { + return nil, err } - } - fi, err := src.Stat() - if err != nil { - return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err) - } + } else { + src, err := os.Open(params.SourcePath) + if err != nil { + return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err) + } + defer src.Close() + + path, upload, err = a.client.UploadFile(ctx, src, filename) + if err != nil { + e := fmt.Errorf("%s: %v", UploadTransferActivityName, err) + + switch err.(type) { + case *sftp.AuthError: + return nil, temporal.NewNonRetryableError(e) + default: + return nil, e + } + } - // Block (with a heartbeat) until ctx is cancelled, the upload is done, or - // it stops with an error. - err = a.Heartbeat(ctx, upload, fi.Size()) - if err != nil { - return nil, err + fi, err := src.Stat() + if err != nil { + return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err) + } + + // Block (with a heartbeat) until ctx is cancelled, the upload is done, or + // it stops with an error. + err = a.Heartbeat(ctx, upload, fi.Size()) + if err != nil { + return nil, err + } } return &UploadTransferActivityResult{ diff --git a/internal/am/upload_transfer_test.go b/internal/am/upload_transfer_test.go index 220047771..46fbd8a12 100644 --- a/internal/am/upload_transfer_test.go +++ b/internal/am/upload_transfer_test.go @@ -46,10 +46,10 @@ func TestUploadTransferActivity(t *testing.T) { var fp *os.File client := sftp_fake.NewMockClient(ctrl) - upload := sftp_fake.NewMockAsyncUpload(ctrl) + upload := sftp_fake.NewMockAsyncUploadFile(ctrl) client.EXPECT(). - Upload( + UploadFile( mockutil.Context(), gomock.AssignableToTypeOf(fp), filename, @@ -81,7 +81,7 @@ func TestUploadTransferActivity(t *testing.T) { params: am.UploadTransferActivityParams{ SourcePath: td.Join("missing"), }, - wantErr: fmt.Sprintf("activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: open %s: no such file or directory", td.Join("missing")), + wantErr: fmt.Sprintf("activity error (type: UploadTransferActivity, scheduledEventID: 0, startedEventID: 0, identity: ): UploadTransferActivity: stat %s: no such file or directory", td.Join("missing")), }, { name: "Retryable error when SSH connection fails", @@ -93,7 +93,7 @@ func TestUploadTransferActivity(t *testing.T) { client := sftp_fake.NewMockClient(ctrl) client.EXPECT(). - Upload( + UploadFile( mockutil.Context(), gomock.AssignableToTypeOf(fp), filename, @@ -118,7 +118,7 @@ func TestUploadTransferActivity(t *testing.T) { client := sftp_fake.NewMockClient(ctrl) client.EXPECT(). - Upload( + UploadFile( mockutil.Context(), gomock.AssignableToTypeOf(fp), filename, diff --git a/internal/config/config.go b/internal/config/config.go index 33e3f04a4..2b9988e66 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -86,6 +86,7 @@ func Read(config *Configuration, configFile string) (found bool, configFileUsed v.SetDefault("a3m.processing", a3m.ProcessingDefault) v.SetDefault("am.capacity", 1) v.SetDefault("am.pollInterval", 10*time.Second) + v.SetDefault("am.transferType", "zipped bag") v.SetDefault("api.listen", "127.0.0.1:9000") v.SetDefault("debugListen", "127.0.0.1:9001") v.SetDefault("preservation.taskqueue", temporal.A3mWorkerTaskQueue) diff --git a/internal/sftp/client.go b/internal/sftp/client.go index 398036e8e..48c5d76ff 100644 --- a/internal/sftp/client.go +++ b/internal/sftp/client.go @@ -32,7 +32,8 @@ type Client interface { Delete(ctx context.Context, dest string) error // Upload asynchronously copies data from the src reader to the specified // dest on the SFTP server. - Upload(ctx context.Context, src io.Reader, dest string) (remotePath string, upload AsyncUpload, err error) + UploadFile(ctx context.Context, src io.Reader, dest string) (remotePath string, upload AsyncUpload, err error) + UploadDirectory(ctx context.Context, srcPath string) (remotePath string, upload AsyncUpload, err error) } // AsyncUpload provides information about an upload happening asynchronously in diff --git a/internal/sftp/fake/mock_sftp.go b/internal/sftp/fake/mock_sftp.go index 1608c166f..8113ffb38 100644 --- a/internal/sftp/fake/mock_sftp.go +++ b/internal/sftp/fake/mock_sftp.go @@ -80,7 +80,7 @@ func (c *MockClientDeleteCall) DoAndReturn(f func(context.Context, string) error } // Upload mocks base method. -func (m *MockClient) Upload(arg0 context.Context, arg1 io.Reader, arg2 string) (string, sftp.AsyncUpload, error) { +func (m *MockClient) UploadFile(arg0 context.Context, arg1 io.Reader, arg2 string) (string, sftp.AsyncUpload, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Upload", arg0, arg1, arg2) ret0, _ := ret[0].(string) @@ -89,10 +89,27 @@ func (m *MockClient) Upload(arg0 context.Context, arg1 io.Reader, arg2 string) ( return ret0, ret1, ret2 } +// UploadDirectory mocks base method. +func (m *MockClient) UploadDirectory(arg0 context.Context, arg1 string) (string, sftp.AsyncUpload, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Upload", arg0, arg1) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(sftp.AsyncUpload) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + // Upload indicates an expected call of Upload. -func (mr *MockClientMockRecorder) Upload(arg0, arg1, arg2 any) *MockClientUploadCall { +func (mr *MockClientMockRecorder) UploadFile(arg0, arg1, arg2 any) *MockClientUploadCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Upload", reflect.TypeOf((*MockClient)(nil).UploadFile), arg0, arg1, arg2) + return &MockClientUploadCall{Call: call} +} + +// UploadDirectory indicates an expected call of Upload. +func (mr *MockClientMockRecorder) UploadDirectory(arg0, arg1, arg2 any) *MockClientUploadCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Upload", reflect.TypeOf((*MockClient)(nil).Upload), arg0, arg1, arg2) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UploadDirectory", reflect.TypeOf((*MockClient)(nil).UploadFile), arg0, arg1, arg2) return &MockClientUploadCall{Call: call} } @@ -101,6 +118,11 @@ type MockClientUploadCall struct { *gomock.Call } +// MockClientUploadDirectoryCall wrap *gomock.Call +type MockClientUploadDirectoryCall struct { + *gomock.Call +} + // Return rewrite *gomock.Call.Return func (c *MockClientUploadCall) Return(arg0 string, arg1 sftp.AsyncUpload, arg2 error) *MockClientUploadCall { c.Call = c.Call.Return(arg0, arg1, arg2) @@ -119,31 +141,43 @@ func (c *MockClientUploadCall) DoAndReturn(f func(context.Context, io.Reader, st return c } +// Do rewrite *gomock.Call.Do +func (c *MockClientUploadDirectoryCall) Do(f func(context.Context, io.Reader, string) (string, sftp.AsyncUpload, error)) *MockClientUploadDirectoryCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockClientUploadDirectoryCall) DoAndReturn(f func(context.Context, io.Reader, string) (string, sftp.AsyncUpload, error)) *MockClientUploadDirectoryCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // MockAsyncUpload is a mock of AsyncUpload interface. -type MockAsyncUpload struct { +type MockAsyncUploadFile struct { ctrl *gomock.Controller - recorder *MockAsyncUploadMockRecorder + recorder *MockAsyncUploadFileMockRecorder } // MockAsyncUploadMockRecorder is the mock recorder for MockAsyncUpload. -type MockAsyncUploadMockRecorder struct { - mock *MockAsyncUpload +type MockAsyncUploadFileMockRecorder struct { + mock *MockAsyncUploadFile } // NewMockAsyncUpload creates a new mock instance. -func NewMockAsyncUpload(ctrl *gomock.Controller) *MockAsyncUpload { - mock := &MockAsyncUpload{ctrl: ctrl} - mock.recorder = &MockAsyncUploadMockRecorder{mock} +func NewMockAsyncUploadFile(ctrl *gomock.Controller) *MockAsyncUploadFile { + mock := &MockAsyncUploadFile{ctrl: ctrl} + mock.recorder = &MockAsyncUploadFileMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockAsyncUpload) EXPECT() *MockAsyncUploadMockRecorder { +func (m *MockAsyncUploadFile) EXPECT() *MockAsyncUploadFileMockRecorder { return m.recorder } // Bytes mocks base method. -func (m *MockAsyncUpload) Bytes() int64 { +func (m *MockAsyncUploadFile) Bytes() int64 { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Bytes") ret0, _ := ret[0].(int64) @@ -151,9 +185,9 @@ func (m *MockAsyncUpload) Bytes() int64 { } // Bytes indicates an expected call of Bytes. -func (mr *MockAsyncUploadMockRecorder) Bytes() *MockAsyncUploadBytesCall { +func (mr *MockAsyncUploadFileMockRecorder) Bytes() *MockAsyncUploadBytesCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bytes", reflect.TypeOf((*MockAsyncUpload)(nil).Bytes)) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bytes", reflect.TypeOf((*MockAsyncUploadFile)(nil).Bytes)) return &MockAsyncUploadBytesCall{Call: call} } @@ -181,7 +215,7 @@ func (c *MockAsyncUploadBytesCall) DoAndReturn(f func() int64) *MockAsyncUploadB } // Close mocks base method. -func (m *MockAsyncUpload) Close() error { +func (m *MockAsyncUploadFile) Close() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close") ret0, _ := ret[0].(error) @@ -189,9 +223,9 @@ func (m *MockAsyncUpload) Close() error { } // Close indicates an expected call of Close. -func (mr *MockAsyncUploadMockRecorder) Close() *MockAsyncUploadCloseCall { +func (mr *MockAsyncUploadFileMockRecorder) Close() *MockAsyncUploadCloseCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockAsyncUpload)(nil).Close)) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockAsyncUploadFile)(nil).Close)) return &MockAsyncUploadCloseCall{Call: call} } @@ -219,7 +253,7 @@ func (c *MockAsyncUploadCloseCall) DoAndReturn(f func() error) *MockAsyncUploadC } // Done mocks base method. -func (m *MockAsyncUpload) Done() chan bool { +func (m *MockAsyncUploadFile) Done() chan bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Done") ret0, _ := ret[0].(chan bool) @@ -227,9 +261,9 @@ func (m *MockAsyncUpload) Done() chan bool { } // Done indicates an expected call of Done. -func (mr *MockAsyncUploadMockRecorder) Done() *MockAsyncUploadDoneCall { +func (mr *MockAsyncUploadFileMockRecorder) Done() *MockAsyncUploadDoneCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockAsyncUpload)(nil).Done)) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockAsyncUploadFile)(nil).Done)) return &MockAsyncUploadDoneCall{Call: call} } @@ -257,7 +291,7 @@ func (c *MockAsyncUploadDoneCall) DoAndReturn(f func() chan bool) *MockAsyncUplo } // Err mocks base method. -func (m *MockAsyncUpload) Err() chan error { +func (m *MockAsyncUploadFile) Err() chan error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Err") ret0, _ := ret[0].(chan error) @@ -265,9 +299,9 @@ func (m *MockAsyncUpload) Err() chan error { } // Err indicates an expected call of Err. -func (mr *MockAsyncUploadMockRecorder) Err() *MockAsyncUploadErrCall { +func (mr *MockAsyncUploadFileMockRecorder) Err() *MockAsyncUploadErrCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockAsyncUpload)(nil).Err)) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockAsyncUploadFile)(nil).Err)) return &MockAsyncUploadErrCall{Call: call} } @@ -295,7 +329,7 @@ func (c *MockAsyncUploadErrCall) DoAndReturn(f func() chan error) *MockAsyncUplo } // Write mocks base method. -func (m *MockAsyncUpload) Write(arg0 []byte) (int, error) { +func (m *MockAsyncUploadFile) Write(arg0 []byte) (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Write", arg0) ret0, _ := ret[0].(int) @@ -304,9 +338,9 @@ func (m *MockAsyncUpload) Write(arg0 []byte) (int, error) { } // Write indicates an expected call of Write. -func (mr *MockAsyncUploadMockRecorder) Write(arg0 any) *MockAsyncUploadWriteCall { +func (mr *MockAsyncUploadFileMockRecorder) Write(arg0 any) *MockAsyncUploadWriteCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockAsyncUpload)(nil).Write), arg0) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockAsyncUploadFile)(nil).Write), arg0) return &MockAsyncUploadWriteCall{Call: call} } diff --git a/internal/sftp/goclient.go b/internal/sftp/goclient.go index 8b821b863..e16782da4 100644 --- a/internal/sftp/goclient.go +++ b/internal/sftp/goclient.go @@ -7,6 +7,7 @@ import ( "io" "io/fs" "os" + "path/filepath" "regexp" "strconv" @@ -55,7 +56,7 @@ func (c *GoClient) Delete(ctx context.Context, dest string) error { return nil } -// Upload asynchronously copies the src data to dest over an SFTP connection. +// UploadFile asynchronously copies the src data to dest over an SFTP connection. // // When Upload is called it starts the upload in an asynchronous goroutine, then // immediately returns the full remote path, and an AsyncUpload struct that @@ -66,7 +67,7 @@ func (c *GoClient) Delete(ctx context.Context, dest string) error { // `AsyncUpload.Error()` channel and the upload is terminated. If a ctx // cancellation signal is received, the `ctx.Err()` error will be sent to the // `AsyncUpload.Error()` channel, and the upload is terminated. -func (c *GoClient) Upload(ctx context.Context, src io.Reader, dest string) (string, AsyncUpload, error) { +func (c *GoClient) UploadFile(ctx context.Context, src io.Reader, dest string) (string, AsyncUpload, error) { remotePath := sftp.Join(c.cfg.RemoteDir, dest) conn, err := c.dial(ctx) @@ -76,11 +77,67 @@ func (c *GoClient) Upload(ctx context.Context, src io.Reader, dest string) (stri // Asynchronously upload file. upload := NewAsyncUpload(conn) - go remoteCopy(ctx, &upload, src, remotePath) + go remoteCopy(ctx, &upload, src, remotePath, true) return remotePath, &upload, nil } +// UploadDirectory asynchronously copies a directory to dest over an SFTP connection. +func (c *GoClient) UploadDirectory(ctx context.Context, srcPath string) (string, AsyncUpload, error) { + transferDir := filepath.Base(srcPath) + + conn, err := c.dial(ctx) + if err != nil { + return "", nil, err + } + + upload := NewAsyncUpload(conn) + go uploadDirectory(ctx, conn, srcPath, c.cfg.RemoteDir, &upload) + + return sftp.Join(c.cfg.RemoteDir, transferDir), &upload, nil +} + +func uploadDirectory(ctx context.Context, conn *connection, srcPath string, remoteDir string, upload *AsyncUploadImpl) { + defer upload.Close() + + transferDir := filepath.Base(srcPath) + + err := filepath.WalkDir(srcPath, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + relPath, err := filepath.Rel(srcPath, path) + if err != nil { + return err + } + + remotePath := sftp.Join(remoteDir, transferDir, relPath) + + if !d.IsDir() { + f, err := os.Open(path) // #nosec G304 -- trusted file path. + if err != nil { + return err + } + defer f.Close() + + remoteCopy(ctx, upload, f, remotePath, false) + } else { + err = conn.Client.MkdirAll(remotePath) + if err != nil { + return err + } + } + + return nil + }) + if err != nil { + upload.Err() <- fmt.Errorf("goclient: %v", err) + } + + upload.Done() <- true +} + // dial connects to an SSH host, creates an SFTP client on the connection, then // returns conn. When conn is no longer needed, conn.close() must be called to // prevent leaks. @@ -107,8 +164,10 @@ func (c *GoClient) dial(ctx context.Context) (*connection, error) { // remoteCopy copies data from the src reader to a remote file at dest, and // updates upload progress asynchronously. Upload status and progress will be // sent to the upload struct via the `upload.Done()` and `upload.Error()` channels. -func remoteCopy(ctx context.Context, upload *AsyncUploadImpl, src io.Reader, dest string) { - defer upload.Close() +func remoteCopy(ctx context.Context, upload *AsyncUploadImpl, src io.Reader, dest string, async bool) { + if async { + defer upload.Close() + } // Note: Some SFTP servers don't support O_RDWR mode. w, err := upload.conn.OpenFile(dest, (os.O_WRONLY | os.O_CREATE | os.O_TRUNC)) @@ -127,10 +186,11 @@ func remoteCopy(ctx context.Context, upload *AsyncUploadImpl, src io.Reader, des _, err = io.Copy(contextio.NewWriter(ctx, w), src) if err != nil { upload.Err() <- fmt.Errorf("remote copy: %v", err) - return } - upload.Done() <- true + if async { + upload.Done() <- true + } } var statusCodeRegex = regexp.MustCompile(`\(SSH_[A-Z_]+\)$`) diff --git a/internal/sftp/goclient_test.go b/internal/sftp/goclient_test.go index 27fa6ea5b..0ecd33f6a 100644 --- a/internal/sftp/goclient_test.go +++ b/internal/sftp/goclient_test.go @@ -306,7 +306,7 @@ func TestUpload(t *testing.T) { tc.cfg.RemoteDir = remoteDir.Path() client := sftp.NewGoClient(logr.Discard(), tc.cfg) - remotePath, upload, err := client.Upload(context.Background(), tc.params.src, tc.params.dest) + remotePath, upload, err := client.UploadFile(context.Background(), tc.params.src, tc.params.dest) if tc.wantErr != nil { assert.Error(t, err, tc.wantErr.Error()) assert.Assert(t, reflect.TypeOf(err) == reflect.TypeOf(tc.wantErr)) diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index fb16e68ec..f5ddba650 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -908,23 +908,29 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo return err } - // Zip PIP. - activityOpts := withActivityOptsForLocalAction(ctx) - var zipResult archivezip.Result - err = temporalsdk_workflow.ExecuteActivity( - activityOpts, - archivezip.Name, - &archivezip.Params{SourceDir: tinfo.TempPath}, - ).Get(activityOpts, &zipResult) - if err != nil { - return err - } + var sourcePath string + if w.cfg.AM.TransferType == "zipped bag" { + // Zip PIP. + activityOpts := withActivityOptsForLocalAction(ctx) + var zipResult archivezip.Result + err = temporalsdk_workflow.ExecuteActivity( + activityOpts, + archivezip.Name, + &archivezip.Params{SourceDir: tinfo.TempPath}, + ).Get(activityOpts, &zipResult) + if err != nil { + return err + } - tinfo.SendToFailed.Path = zipResult.Path - tinfo.SendToFailed.ActivityName = activities.SendToFailedPIPsName + tinfo.SendToFailed.Path = zipResult.Path + tinfo.SendToFailed.ActivityName = activities.SendToFailedPIPsName + sourcePath = zipResult.Path + } else { + sourcePath = tinfo.TempPath + } // Upload PIP to AMSS. - activityOpts = temporalsdk_workflow.WithActivityOptions(ctx, + activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{ StartToCloseTimeout: time.Hour * 2, HeartbeatTimeout: 2 * tinfo.req.PollInterval, @@ -942,7 +948,7 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo err = temporalsdk_workflow.ExecuteActivity( activityOpts, am.UploadTransferActivityName, - &am.UploadTransferActivityParams{SourcePath: zipResult.Path}, + &am.UploadTransferActivityParams{SourcePath: sourcePath}, ).Get(activityOpts, &uploadResult) if err != nil { return err @@ -956,6 +962,7 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo am.StartTransferActivityName, &am.StartTransferActivityParams{ Name: tinfo.req.Key, + Type: w.cfg.AM.TransferType, RelativePath: uploadResult.RemoteRelativePath, }, ).Get(activityOpts, &transferResult) diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 1a7cd04e1..ecdefbe95 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -645,6 +645,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { cfg := config.Configuration{ A3m: a3m.Config{ShareDir: s.CreateTransferDir()}, + AM: am.Config{TransferType: "zipped bag"}, Preservation: pres.Config{TaskQueue: temporal.AmWorkerTaskQueue}, Storage: storage.Config{DefaultPermanentLocationID: amssLocationID}, ValidatePREMIS: premis.Config{ @@ -753,7 +754,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { ) s.env.OnActivity(am.StartTransferActivityName, sessionCtx, - &am.StartTransferActivityParams{Name: key, RelativePath: "transfer.zip"}, + &am.StartTransferActivityParams{Name: key, Type: "zipped bag", RelativePath: "transfer.zip"}, ).Return( &am.StartTransferActivityResult{TransferID: transferID.String()}, nil, ) @@ -1517,6 +1518,7 @@ func (s *ProcessingWorkflowTestSuite) TestFailedPIPA3m() { func (s *ProcessingWorkflowTestSuite) TestFailedPIPAM() { cfg := config.Configuration{ + AM: am.Config{TransferType: "zipped bag"}, Preservation: pres.Config{TaskQueue: temporal.AmWorkerTaskQueue}, Storage: storage.Config{DefaultPermanentLocationID: amssLocationID}, }