From 6985b40df7b953f7eb1492e0d9135a59f8e20780 Mon Sep 17 00:00:00 2001 From: David Juhasz Date: Mon, 12 Aug 2024 13:20:43 -0700 Subject: [PATCH] Send PIPs to Archivematica as BagIt bags Fixes #805 - Change the package type to "zipped bag" when starting a transfer via the Archivematica API - Bag the PIP before sending it to Archivematica, if it's not already a bag - Add "TransferSourcePath" config value to specify the API path to the Transfer Source directory where PIPs are uploaded --- cmd/enduro-am-worker/main.go | 4 ++ enduro.toml | 24 ++++++++++-- hack/kube/overlays/dev-am/enduro-am.yaml | 5 +++ internal/am/config.go | 8 ++++ internal/am/start_transfer.go | 17 ++++++-- internal/am/start_transfer_test.go | 13 +++--- internal/config/config.go | 15 ++++--- internal/config/config_test.go | 1 + internal/sftp/config.go | 3 +- internal/workflow/processing.go | 50 +++++++++++++++++------- internal/workflow/processing_test.go | 12 +++++- 11 files changed, 115 insertions(+), 37 deletions(-) diff --git a/cmd/enduro-am-worker/main.go b/cmd/enduro-am-worker/main.go index 7f2004733..6fb8eeede 100644 --- a/cmd/enduro-am-worker/main.go +++ b/cmd/enduro-am-worker/main.go @@ -245,6 +245,10 @@ func main() { activities.NewBundleActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName}, ) + w.RegisterActivityWithOptions( + bagit_activity.NewCreateBagActivity(cfg.BagIt).Execute, + temporalsdk_activity.RegisterOptions{Name: bagit_activity.CreateBagActivityName}, + ) w.RegisterActivityWithOptions( activities.NewZipActivity( logger, diff --git a/enduro.toml b/enduro.toml index c73307ba3..b2a435a00 100644 --- a/enduro.toml +++ b/enduro.toml @@ -20,7 +20,7 @@ corsOrigin = "http://localhost" [api.auth] # Enable API authentication. OIDC is the only protocol supported at the -# moment. When enabled the API verifies the access token submitted with +# moment. When enabled the API verifies the access token submitted with # each request. 
The API client is responsible for obtaining an access # token from the provider. enabled = true @@ -29,7 +29,7 @@ enabled = true # OIDC provider URL. Required when auth. is enabled. providerURL = "http://keycloak:7470/realms/artefactual" # OIDC client ID. The client ID must be included in the `aud` claim of -# the access token. Required when auth. is enabled. +# the access token. Required when auth. is enabled. clientID = "enduro" [api.auth.oidc.abac] @@ -39,7 +39,7 @@ clientID = "enduro" enabled = true # Claim path of the Enduro attributes within the access token. If the claim # path is nested then include all fields separated by `claimPathSeparator` -# (see below). E.g. "attributes.enduro" with `claimPathSeparator = "."`. +# (see below). E.g. "attributes.enduro" with `claimPathSeparator = "."`. # Required when ABAC is enabled. claimPath = "enduro" # Separator used to split the claim path fields. The default value of "" will @@ -152,12 +152,28 @@ pollInterval = "10s" # no time limit. transferDeadline = "1h" +# TransferSourcePath is the path to an Archivematica transfer source directory. +# It is used in the POST /api/v2beta/package "path" parameter to start a +# transfer via the API. TransferSourcePath must be prefixed with the UUID of an +# AMSS transfer source directory, optionally followed by a relative path from +# the source dir (e.g. "749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload"). If +# no transferSourcePath is specified, the default transfer source path will be +# used. +transferSourcePath = "" + [am.sftp] host = "" # The Archivematica Storage Service hostname. port = "" user = "" + +# knownHostsFile is the absolute path to a local SSH "known_hosts" file that +# includes a public host key for the AM SFTP server. +# Default: "/home/[user]/.ssh/known_hosts" (where [user] is your local user). knownHostsFile = "" -remoteDir = "/transfer_source" + +# remoteDir is the directory path, relative to the SFTP root directory, where +# PIPs should be uploaded. 
+remoteDir = "" [am.sftp.privateKey] path = "" diff --git a/hack/kube/overlays/dev-am/enduro-am.yaml b/hack/kube/overlays/dev-am/enduro-am.yaml index 4971ce80f..4eb5e41ee 100644 --- a/hack/kube/overlays/dev-am/enduro-am.yaml +++ b/hack/kube/overlays/dev-am/enduro-am.yaml @@ -81,6 +81,11 @@ spec: secretKeyRef: name: enduro-am-secret key: api_key + - name: ENDURO_AM_TRANSFERSOURCEPATH + valueFrom: + secretKeyRef: + name: enduro-am-secret + key: transfer_source_path - name: ENDURO_AM_SFTP_HOST valueFrom: secretKeyRef: diff --git a/internal/am/config.go b/internal/am/config.go index 2e421025a..2e77ed96c 100644 --- a/internal/am/config.go +++ b/internal/am/config.go @@ -22,6 +22,14 @@ type Config struct { // SFTP configuration for uploading transfers to Archivematica. SFTP sftp.Config + // TransferSourcePath is the path to an Archivematica transfer source + // directory. It is used in the POST /api/v2beta/package "path" parameter + // to start a transfer via the API. TransferSourcePath must be prefixed with + // the UUID of an AMSS transfer source directory, optionally followed by a + // relative path from the source dir (e.g. + // "749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload"). + TransferSourcePath string + // Capacity sets the maximum number of worker sessions the worker can // handle at one time (default: 1). Capacity int diff --git a/internal/am/start_transfer.go b/internal/am/start_transfer.go index 8f05fee00..e7043d859 100644 --- a/internal/am/start_transfer.go +++ b/internal/am/start_transfer.go @@ -2,6 +2,7 @@ package am import ( context "context" + "path/filepath" "github.com/go-logr/logr" "go.artefactual.dev/amclient" @@ -16,8 +17,12 @@ type StartTransferActivity struct { } type StartTransferActivityParams struct { + // Name of the transfer. Name string - Path string + + // RelativePath is the PIP path relative to the Archivematica transfer + // source directory. 
+ RelativePath string } type StartTransferActivityResult struct { @@ -40,7 +45,11 @@ func (a *StartTransferActivity) Execute( ctx context.Context, opts *StartTransferActivityParams, ) (*StartTransferActivityResult, error) { - a.logger.V(1).Info("Executing StartTransferActivity", "Name", opts.Name, "Path", opts.Path) + a.logger.V(1).Info( + "Executing StartTransferActivity", + "Name", opts.Name, + "RelativePath", opts.RelativePath, + ) processingConfig := a.cfg.ProcessingConfig if processingConfig == "" { @@ -49,8 +58,8 @@ func (a *StartTransferActivity) Execute( payload, resp, err := a.amps.Create(ctx, &amclient.PackageCreateRequest{ Name: opts.Name, - Type: "zipfile", - Path: opts.Path, + Type: "zipped bag", + Path: filepath.Join(a.cfg.TransferSourcePath, opts.RelativePath), ProcessingConfig: processingConfig, AutoApprove: true, }) diff --git a/internal/am/start_transfer_test.go b/internal/am/start_transfer_test.go index 55b179039..ed4e03567 100644 --- a/internal/am/start_transfer_test.go +++ b/internal/am/start_transfer_test.go @@ -23,8 +23,8 @@ func TestStartTransferActivity(t *testing.T) { transferID := uuid.New().String() opts := am.StartTransferActivityParams{ - Name: "Testing", - Path: "/tmp", + Name: "Testing", + RelativePath: "/tmp", } amcrDefault := func(m *amclienttest.MockPackageServiceMockRecorder, st http.Response) { @@ -32,8 +32,8 @@ func TestStartTransferActivity(t *testing.T) { mockutil.Context(), &amclient.PackageCreateRequest{ Name: opts.Name, - Type: "zipfile", - Path: opts.Path, + Type: "zipped bag", + Path: opts.RelativePath, ProcessingConfig: "automated", AutoApprove: true, }, @@ -59,8 +59,8 @@ func TestStartTransferActivity(t *testing.T) { mockutil.Context(), &amclient.PackageCreateRequest{ Name: opts.Name, - Type: "zipfile", - Path: opts.Path, + Type: "zipped bag", + Path: opts.RelativePath, ProcessingConfig: "automated", AutoApprove: true, }, @@ -117,6 +117,7 @@ func TestStartTransferActivity(t *testing.T) { return } + assert.NilError(t, 
err) var r am.StartTransferActivityResult err = future.Get(&r) diff --git a/internal/config/config.go b/internal/config/config.go index 584bb965b..b9e9793c1 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,6 +9,7 @@ import ( "time" "github.com/artefactual-sdps/temporal-activities/archive" + "github.com/artefactual-sdps/temporal-activities/bagit" "github.com/google/uuid" "github.com/mitchellh/mapstructure" "github.com/spf13/viper" @@ -40,6 +41,7 @@ type Configuration struct { AM am.Config InternalAPI api.Config API api.Config + BagIt bagit.Config Database db.Config Event event.Config ExtractActivity archive.Config @@ -52,13 +54,14 @@ type Configuration struct { Telemetry telemetry.Config } -func (c Configuration) Validate() error { +func (c *Configuration) Validate() error { // TODO: should this validate all the fields in Configuration? - apiAuthErr := c.API.Auth.Validate() - preprocessingErr := c.Preprocessing.Validate() - uploadErr := c.Upload.Validate() - - return errors.Join(apiAuthErr, preprocessingErr, uploadErr) + return errors.Join( + c.API.Auth.Validate(), + c.BagIt.Validate(), + c.Preprocessing.Validate(), + c.Upload.Validate(), + ) } func Read(config *Configuration, configFile string) (found bool, configFileUsed string, err error) { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 3f5055d53..79e0ae0a8 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -72,6 +72,7 @@ func TestConfig(t *testing.T) { assert.Equal(t, c.AM.Capacity, 1) assert.Equal(t, c.AM.PollInterval, 10*time.Second) assert.Equal(t, c.API.Listen, "127.0.0.1:9000") + assert.Equal(t, c.BagIt.ChecksumAlgorithm, "sha512") assert.Equal(t, c.DebugListen, "127.0.0.1:9001") assert.Equal(t, c.Preservation.TaskQueue, temporal.A3mWorkerTaskQueue) assert.Equal(t, c.Storage.TaskQueue, temporal.GlobalTaskQueue) diff --git a/internal/sftp/config.go b/internal/sftp/config.go index 750c22413..cb9dae179 100644 --- 
a/internal/sftp/config.go +++ b/internal/sftp/config.go @@ -25,7 +25,8 @@ type Config struct { // Private key used for authentication. PrivateKey PrivateKey - // Default directory on SFTP server for file transfers. + // RemoteDir is the directory path, relative to the SFTP root directory, + // where PIPs should be uploaded. RemoteDir string } diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index a3107434d..173e876f5 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -772,7 +772,11 @@ func (w *ProcessingWorkflow) waitForReview(ctx temporalsdk_workflow.Context) *pa return &review } -func (w *ProcessingWorkflow) transferA3m(sessCtx temporalsdk_workflow.Context, tinfo *TransferInfo, cleanup *cleanupRegistry) error { +func (w *ProcessingWorkflow) transferA3m( + sessCtx temporalsdk_workflow.Context, + tinfo *TransferInfo, + cleanup *cleanupRegistry, +) error { // Bundle PIP as an Archivematica standard transfer. { activityOpts := withActivityOptsForLongLivedRequest(sessCtx) @@ -814,7 +818,8 @@ func (w *ProcessingWorkflow) transferA3m(sessCtx temporalsdk_workflow.Context, t } result := a3m.CreateAIPActivityResult{} - err := temporalsdk_workflow.ExecuteActivity(activityOpts, a3m.CreateAIPActivityName, params).Get(sessCtx, &result) + err := temporalsdk_workflow.ExecuteActivity(activityOpts, a3m.CreateAIPActivityName, params). + Get(sessCtx, &result) if err != nil { return err } @@ -827,11 +832,26 @@ func (w *ProcessingWorkflow) transferA3m(sessCtx temporalsdk_workflow.Context, t return nil } -func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, tinfo *TransferInfo) error { +func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo *TransferInfo) error { var err error - // Zip transfer. - activityOpts := withActivityOptsForLongLivedRequest(sessCtx) + // Bag PIP if it's not already a bag. 
+ if tinfo.PackageType != enums.PackageTypeBagIt { + lctx := withActivityOptsForLocalAction(ctx) + var zipResult bagit_activity.CreateBagActivityResult + err = temporalsdk_workflow.ExecuteActivity( + lctx, + bagit_activity.CreateBagActivityName, + &bagit_activity.CreateBagActivityParams{SourcePath: tinfo.TempPath}, + ).Get(lctx, &zipResult) + if err != nil { + return err + } + tinfo.PackageType = enums.PackageTypeBagIt + } + + // Zip PIP. + activityOpts := withActivityOptsForLongLivedRequest(ctx) var zipResult activities.ZipActivityResult err = temporalsdk_workflow.ExecuteActivity( activityOpts, @@ -842,8 +862,8 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti return err } - // Upload transfer to AMSS. - activityOpts = temporalsdk_workflow.WithActivityOptions(sessCtx, + // Upload PIP to AMSS. + activityOpts = temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{ StartToCloseTimeout: time.Hour * 2, HeartbeatTimeout: 2 * tinfo.req.PollInterval, @@ -868,14 +888,14 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti } // Start AM transfer. 
- activityOpts = withActivityOptsForRequest(sessCtx) + activityOpts = withActivityOptsForRequest(ctx) transferResult := am.StartTransferActivityResult{} err = temporalsdk_workflow.ExecuteActivity( activityOpts, am.StartTransferActivityName, &am.StartTransferActivityParams{ - Name: tinfo.req.Key, - Path: uploadResult.RemoteFullPath, + Name: tinfo.req.Key, + RelativePath: uploadResult.RemoteRelativePath, }, ).Get(activityOpts, &transferResult) if err != nil { @@ -883,7 +903,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti } pollOpts := temporalsdk_workflow.WithActivityOptions( - sessCtx, + ctx, temporalsdk_workflow.ActivityOptions{ HeartbeatTimeout: 2 * tinfo.req.PollInterval, StartToCloseTimeout: tinfo.req.TransferDeadline, @@ -928,11 +948,11 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti } // Set AIP "stored at" time. - tinfo.StoredAt = temporalsdk_workflow.Now(sessCtx).UTC() + tinfo.StoredAt = temporalsdk_workflow.Now(ctx).UTC() // Set package location { - ctx := withLocalActivityOpts(sessCtx) + ctx := withLocalActivityOpts(ctx) err := temporalsdk_workflow.ExecuteLocalActivity( ctx, setLocationIDLocalActivity, @@ -947,7 +967,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti // Create storage package record and set location to AMSS location. { - activityOpts := withLocalActivityOpts(sessCtx) + activityOpts := withLocalActivityOpts(ctx) err := temporalsdk_workflow.ExecuteActivity( activityOpts, activities.CreateStoragePackageActivityName, @@ -965,7 +985,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti } // Delete transfer. - activityOpts = withActivityOptsForRequest(sessCtx) + activityOpts = withActivityOptsForRequest(ctx) err = temporalsdk_workflow.ExecuteActivity(activityOpts, am.DeleteTransferActivityName, am.DeleteTransferActivityParams{ Destination: uploadResult.RemoteRelativePath, }). 
diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 0213b0c2d..5fd61b45d 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -152,6 +152,10 @@ func (s *ProcessingWorkflowTestSuite) setupAMWorkflowTest( clock := clockwork.NewFakeClock() sftpc := sftp_fake.NewMockClient(ctrl) + s.env.RegisterActivityWithOptions( + bagit_activity.NewCreateBagActivity(bagit_activity.Config{}).Execute, + temporalsdk_activity.RegisterOptions{Name: bagit_activity.CreateBagActivityName}, + ) s.env.RegisterActivityWithOptions( activities.NewZipActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.ZipActivityName}, @@ -602,6 +606,12 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { ) // Archivematica specific activities. + s.env.OnActivity(bagit_activity.CreateBagActivityName, sessionCtx, + &bagit_activity.CreateBagActivityParams{SourcePath: extractPath}, + ).Return( + &bagit_activity.CreateBagActivityResult{BagPath: extractPath}, nil, + ) + s.env.OnActivity(activities.ZipActivityName, sessionCtx, &activities.ZipActivityParams{SourceDir: extractPath}, ).Return( @@ -618,7 +628,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { ) s.env.OnActivity(am.StartTransferActivityName, sessionCtx, - &am.StartTransferActivityParams{Name: key, Path: "transfer.zip"}, + &am.StartTransferActivityParams{Name: key, RelativePath: "transfer.zip"}, ).Return( &am.StartTransferActivityResult{TransferID: transferID.String()}, nil, )