diff --git a/cmd/enduro-a3m-worker/main.go b/cmd/enduro-a3m-worker/main.go index f2f283b56..ada2a1287 100644 --- a/cmd/enduro-a3m-worker/main.go +++ b/cmd/enduro-a3m-worker/main.go @@ -136,6 +136,7 @@ func main() { } w.RegisterActivityWithOptions(activities.NewDownloadActivity(logger, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}) + w.RegisterActivityWithOptions(activities.NewUnarchiveActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.UnarchiveActivityName}) w.RegisterActivityWithOptions(activities.NewBundleActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName}) w.RegisterActivityWithOptions(a3m.NewCreateAIPActivity(logger, &cfg.A3m, pkgsvc).Execute, temporalsdk_activity.RegisterOptions{Name: a3m.CreateAIPActivityName}) w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName}) diff --git a/cmd/enduro-am-worker/main.go b/cmd/enduro-am-worker/main.go index c9fbd8cd2..30dca822f 100644 --- a/cmd/enduro-am-worker/main.go +++ b/cmd/enduro-am-worker/main.go @@ -143,6 +143,10 @@ func main() { activities.NewDownloadActivity(logger, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}, ) + w.RegisterActivityWithOptions( + activities.NewUnarchiveActivity(logger).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.UnarchiveActivityName}, + ) w.RegisterActivityWithOptions( activities.NewBundleActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName}, diff --git a/internal/fsutil/fsutil.go b/internal/fsutil/fsutil.go index cb4737bcd..b94ac0361 100644 --- a/internal/fsutil/fsutil.go +++ b/internal/fsutil/fsutil.go @@ -2,6 +2,8 @@ package fsutil import ( "errors" + "fmt" + "io/fs" "os" "path/filepath" "strings" @@ -48,3 +50,25 @@ func Move(src, dst string) error { return err } + +// 
// SetFileModes recursively sets the file mode of root and its contents.
//
// Directories (root included) are changed to dirMode and every other entry to
// fileMode. Symbolic links are left untouched: os.Chmod follows links, so
// changing a link would modify its target, which may be a directory or a path
// outside of root.
//
// The walk stops at the first error, which is returned. Errors from os.Chmod
// are wrapped with the "set permissions" prefix.
func SetFileModes(root string, dirMode, fileMode int) error {
	return filepath.WalkDir(root,
		func(path string, d fs.DirEntry, err error) error {
			if err != nil {
				// d may be nil here (e.g. root does not exist), so bail
				// out before inspecting it.
				return err
			}

			// Never chmod through a symlink.
			if d.Type()&fs.ModeSymlink != 0 {
				return nil
			}

			mode := fs.FileMode(fileMode)
			if d.IsDir() {
				mode = fs.FileMode(dirMode)
			}

			if err := os.Chmod(path, mode); err != nil {
				return fmt.Errorf("set permissions: %w", err)
			}

			return nil
		},
	)
}
@@ +BagIt-Version: 0.97 +Tag-File-Character-Encoding: UTF-8 diff --git a/internal/testdata/bag/small_bag/data/small.txt b/internal/testdata/bag/small_bag/data/small.txt new file mode 100644 index 000000000..95240149d --- /dev/null +++ b/internal/testdata/bag/small_bag/data/small.txt @@ -0,0 +1 @@ +I am a small file. diff --git a/internal/testdata/bag/small_bag/manifest-sha256.txt b/internal/testdata/bag/small_bag/manifest-sha256.txt new file mode 100644 index 000000000..8fbee63e4 --- /dev/null +++ b/internal/testdata/bag/small_bag/manifest-sha256.txt @@ -0,0 +1 @@ +4450c8a88130a3b397bfc659245c4f0f87a8c79d017a60bdb1bd32f4b51c8133 data/small.txt diff --git a/internal/testdata/bag/small_bag/tagmanifest-sha256.txt b/internal/testdata/bag/small_bag/tagmanifest-sha256.txt new file mode 100644 index 000000000..92eaf1e2c --- /dev/null +++ b/internal/testdata/bag/small_bag/tagmanifest-sha256.txt @@ -0,0 +1,3 @@ +ac3f0fa6e7763ba403c1bca2b6e785a51bfcd5102fe7cbc1cfcf05be77ffdf24 manifest-sha256.txt +fd696a4957ed3f8329860c7191e518162b99c942b26b42291386da69bb3c2bc8 bag-info.txt +e91f941be5973ff71f1dccbdd1a32d598881893a7f21be516aca743da38b1689 bagit.txt diff --git a/internal/workflow/activities/bundle.go b/internal/workflow/activities/bundle.go index 165d28e97..1d9edd892 100644 --- a/internal/workflow/activities/bundle.go +++ b/internal/workflow/activities/bundle.go @@ -12,7 +12,6 @@ import ( securejoin "github.com/cyphar/filepath-securejoin" "github.com/go-logr/logr" - "github.com/mholt/archiver/v3" "github.com/otiai10/copy" temporal_tools "go.artefactual.dev/tools/temporal" @@ -51,9 +50,7 @@ type BundleActivityParams struct { } type BundleActivityResult struct { - RelPath string // Path of the transfer relative to the transfer directory. - FullPath string // Full path to the transfer in the worker running the session. - FullPathBeforeStrip string // Same as FullPath but includes the top-level dir even when stripped. 
+ FullPath string // Full path to the transfer in the worker running the session. } func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityParams) (*BundleActivityResult, error) { @@ -77,22 +74,14 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara } if params.IsDir { - res.FullPath, res.FullPathBeforeStrip, err = a.Copy( - ctx, - params.SourcePath, - params.TransferDir, - false, - ) + res.FullPath, err = a.Bundle(ctx, params.SourcePath, params.TransferDir) + if err != nil { + err = fmt.Errorf("bundle dir: %v", err) + } } else { - unar := a.Unarchiver(params.SourcePath) - if unar == nil { - res.FullPath, err = a.SingleFile(ctx, params.TransferDir, params.SourcePath) - if err != nil { - err = fmt.Errorf("bundle single file: %v", err) - } - res.FullPathBeforeStrip = res.FullPath - } else { - res.FullPath, res.FullPathBeforeStrip, err = a.Bundle(ctx, unar, params.TransferDir, params.SourcePath, params.StripTopLevelDir) + res.FullPath, err = a.SingleFile(ctx, params.SourcePath, params.TransferDir) + if err != nil { + err = fmt.Errorf("bundle single file: %v", err) } } if err != nil { @@ -101,51 +90,28 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara err = unbag(res.FullPath) if err != nil { - return nil, temporal_tools.NewNonRetryableError(err) - } - - res.RelPath, err = filepath.Rel(params.TransferDir, res.FullPath) - if err != nil { - return nil, temporal_tools.NewNonRetryableError(fmt.Errorf( - "error calculating relative path to transfer (base=%q, target=%q): %v", - params.TransferDir, res.FullPath, err, - )) + return nil, temporal_tools.NewNonRetryableError( + fmt.Errorf("bundle: unbag: %v", err), + ) } if err = setPermissions(res.FullPath); err != nil { return nil, temporal_tools.NewNonRetryableError( - fmt.Errorf("set permissions: %v", err), + fmt.Errorf("bundle: set permissions: %v", err), ) } return res, nil } -// Unarchiver returns the unarchiver suited for the archival 
format. -func (a *BundleActivity) Unarchiver(sourcePath string) archiver.Unarchiver { - if iface, err := archiver.ByExtension(filepath.Base(sourcePath)); err == nil { - if u, ok := iface.(archiver.Unarchiver); ok { - return u - } - } - - r, err := os.Open(sourcePath) // #nosec G304 -- trusted file path. - if err != nil { - return nil - } - defer r.Close() - - if u, err := archiver.ByHeader(r); err == nil { - return u - } - - return nil -} - // SingleFile bundles a transfer with the downloaded blob in it. // // TODO: Write metadata.csv and checksum files to the metadata dir. -func (a *BundleActivity) SingleFile(ctx context.Context, transferDir, sourcePath string) (string, error) { +func (a *BundleActivity) SingleFile( + ctx context.Context, + sourcePath string, + transferDir string, +) (string, error) { b, err := bundler.NewBundlerWithTempDir(transferDir) if err != nil { return "", fmt.Errorf("create bundler: %v", err) @@ -170,76 +136,35 @@ func (a *BundleActivity) SingleFile(ctx context.Context, transferDir, sourcePath } // Bundle a transfer with the contents found in the archive. -func (a *BundleActivity) Bundle(ctx context.Context, unar archiver.Unarchiver, transferDir, sourcePath string, stripTopLevelDir bool) (string, string, error) { - // Create a new directory for our transfer with the name randomized. 
- const prefix = "enduro" - tempDir, err := os.MkdirTemp(transferDir, prefix) +func (a *BundleActivity) Bundle( + ctx context.Context, + sourcePath string, + transferDir string, +) (string, error) { + tempDir, err := a.Copy(ctx, sourcePath, transferDir) if err != nil { - return "", "", fmt.Errorf("error creating temporary directory: %s", err) - } - - if err := unar.Unarchive(sourcePath, tempDir); err != nil { - return "", "", fmt.Errorf("error unarchiving file: %v", err) - } - - tempDirBeforeStrip := tempDir - if stripTopLevelDir { - tempDir, err = stripDirContainer(tempDir) - if err != nil { - return "", "", err - } + return "", fmt.Errorf("bundle: %v", err) } // Delete the archive. We still have a copy in the watched source. _ = os.Remove(sourcePath) - return tempDir, tempDirBeforeStrip, nil + return tempDir, nil } // Copy a transfer in the given destination using an intermediate temp. directory. -func (a *BundleActivity) Copy(ctx context.Context, src, dst string, stripTopLevelDir bool) (string, string, error) { +func (a *BundleActivity) Copy(ctx context.Context, src, dst string) (string, error) { const prefix = "enduro" tempDir, err := os.MkdirTemp(dst, prefix) if err != nil { - return "", "", fmt.Errorf("error creating temporary directory: %s", err) + return "", fmt.Errorf("error creating temporary directory: %s", err) } if err := copy.Copy(src, tempDir); err != nil { - return "", "", fmt.Errorf("error copying transfer: %v", err) - } - - tempDirBeforeStrip := tempDir - if stripTopLevelDir { - tempDir, err = stripDirContainer(tempDir) - if err != nil { - return "", "", err - } + return "", fmt.Errorf("error copying transfer: %v", err) } - return tempDir, tempDirBeforeStrip, nil -} - -// stripDirContainer strips the top-level directory of a transfer. 
-func stripDirContainer(path string) (string, error) { - const errPrefix = "error stripping top-level dir" - ff, err := os.Open(filepath.Clean(path)) - if err != nil { - return "", fmt.Errorf("%s: cannot open path: %v", errPrefix, err) - } - defer ff.Close() - - fis, err := ff.Readdir(2) - if err != nil { - return "", fmt.Errorf("%s: error reading dir: %v", errPrefix, err) - } - if len(fis) != 1 { - return "", fmt.Errorf("%s: directory has more than one child", errPrefix) - } - if !fis[0].IsDir() { - return "", fmt.Errorf("%s: top-level item is not a directory", errPrefix) - } - securePath, _ := securejoin.SecureJoin(path, fis[0].Name()) - return securePath, nil + return tempDir, nil } // unbag converts a bagged transfer into a standard Archivematica transfer. diff --git a/internal/workflow/activities/bundle_test.go b/internal/workflow/activities/bundle_test.go index 5e18a00c9..f3af5f6b1 100644 --- a/internal/workflow/activities/bundle_test.go +++ b/internal/workflow/activities/bundle_test.go @@ -54,22 +54,11 @@ func TestBundleActivity(t *testing.T) { ), }, { - name: "Bundles a zipped standard transfer", + name: "Bundles a BagIt transfer", params: &activities.BundleActivityParams{ - SourcePath: sourceDir.Join("zipped_transfer", "small.zip"), - TransferDir: destDir.Path(), - StripTopLevelDir: true, - }, - wantFs: fs.Expected(t, fs.WithMode(activities.ModeDir), - fs.WithFile("small.txt", "I am a small file.\n", fs.WithMode(activities.ModeFile)), - ), - }, - { - name: "Bundles a tarred and gzipped bag transfer", - params: &activities.BundleActivityParams{ - SourcePath: sourceDir.Join("gzipped_bag", "small_bag.tgz"), - TransferDir: destDir.Path(), - StripTopLevelDir: true, + SourcePath: sourceDir.Join("bag", "small_bag"), + TransferDir: destDir.Path(), + IsDir: true, }, wantFs: fs.Expected(t, fs.WithMode(activities.ModeDir), fs.WithFile( @@ -148,16 +137,11 @@ e91f941be5973ff71f1dccbdd1a32d598881893a7f21be516aca743da38b1689 bagit.txt if tt.wantFs != (fs.Manifest{}) { 
assert.Assert(t, fs.Equal(res.FullPath, tt.wantFs)) } + assert.NilError(t, err) - if tt.params.StripTopLevelDir { - assert.Equal(t, res.FullPathBeforeStrip, filepath.Dir(res.FullPath)) - } else { - assert.Equal(t, res.FullPathBeforeStrip, res.FullPath) - } - - p, err := filepath.Rel(destDir.Path(), res.FullPath) + rp, err := filepath.Rel(tt.params.TransferDir, res.FullPath) assert.NilError(t, err) - assert.Equal(t, res.RelPath, p) + assert.Assert(t, len(rp) > 0) }) } } diff --git a/internal/workflow/activities/unarchive.go b/internal/workflow/activities/unarchive.go new file mode 100644 index 000000000..ee5cdc528 --- /dev/null +++ b/internal/workflow/activities/unarchive.go @@ -0,0 +1,168 @@ +package activities + +import ( + "context" + "fmt" + "io/fs" + "os" + "path/filepath" + + "github.com/go-logr/logr" + "github.com/mholt/archiver/v3" + + "github.com/artefactual-sdps/enduro/internal/fsutil" +) + +const UnarchiveActivityName = "unarchive-activity" + +// UnarchiveActivity extracts transfer files from an archive (e.g. zip, tgz). +type UnarchiveActivity struct { + logger logr.Logger +} + +type UnarchiveParams struct { + SourcePath string + StripTopLevelDir bool +} + +type UnarchiveResult struct { + // DestPath is the path to the extracted archive contents. + DestPath string + + // IsDir is true if DestPath is a directory. 
+ IsDir bool +} + +func NewUnarchiveActivity(logger logr.Logger) *UnarchiveActivity { + return &UnarchiveActivity{logger: logger} +} + +func (a *UnarchiveActivity) Execute(ctx context.Context, params *UnarchiveParams) (*UnarchiveResult, error) { + a.logger.V(1).Info("Executing UnarchiveActivity", "Path", params.SourcePath) + + fi, err := os.Stat(params.SourcePath) + if err != nil { + return nil, fmt.Errorf("unarchive: stat: %v", err) + } + if fi.IsDir() { + a.logger.V(2).Info("Unarchive: skipping directory") + return &UnarchiveResult{DestPath: params.SourcePath, IsDir: true}, nil + } + + u, err := unarchiver(params.SourcePath) + if err != nil { + return nil, fmt.Errorf("unarchive: unarchiver: %v", err) + } + if u == nil { + // Couldn't find an unarchiver, so this is probably a regular file. + // Return the source path unaltered, and IsDir false. + a.logger.V(2).Info("Unarchive: not an archive, skipping") + return &UnarchiveResult{DestPath: params.SourcePath}, nil + } + + dest := filepath.Join(filepath.Dir(params.SourcePath), "extract") + if err := u.Unarchive(params.SourcePath, dest); err != nil { + return nil, fmt.Errorf("unarchive: unarchive: %v", err) + } + + if params.StripTopLevelDir { + if err = stripDirContainer(dest); err != nil { + return nil, fmt.Errorf("unarchive: strip top-level dir: %v", err) + } + } + + if err := fsutil.SetFileModes(dest, ModeDir, ModeFile); err != nil { + return nil, fmt.Errorf("unarchive: %v", err) + } + + if err := os.Remove(params.SourcePath); err != nil { + a.logger.V(1).Info("Unarchive: couldn't delete source archive: %v", err) + } + + return &UnarchiveResult{DestPath: dest, IsDir: true}, err +} + +// Unarchiver returns the unarchiver suited for the archival format. 
// stripDirContainer strips the top-level directory of a transfer: dir must
// contain exactly one entry, a directory, whose contents are moved up into
// dir before the now-empty directory is removed.
func stripDirContainer(dir string) error {
	tld, err := topLevelDir(dir)
	if err != nil {
		return fmt.Errorf("get top-level dir: %w", err)
	}

	// Stage inside dir itself so that every rename below stays on a single
	// filesystem; os.Rename fails when source and target are on different
	// devices, which can happen with the system temp directory.
	tmpDir, err := os.MkdirTemp(dir, ".enduro-strip-")
	if err != nil {
		return fmt.Errorf("make temp dir: %w", err)
	}
	defer func() { _ = os.RemoveAll(tmpDir) }()

	// Move the top-level directory out of the way first, so that a child
	// sharing its name can be moved up without clashing.
	tmpPath := filepath.Join(tmpDir, tld)
	if err := os.Rename(filepath.Join(dir, tld), tmpPath); err != nil {
		return fmt.Errorf("move: %w", err)
	}

	// Move the contents of the top-level directory up into dir.
	err = filepath.WalkDir(tmpPath, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if path == tmpPath {
			return nil
		}

		if err := os.Rename(path, filepath.Join(dir, d.Name())); err != nil {
			return fmt.Errorf("move %q: %w", d.Name(), err)
		}

		// The directory has just been moved away; don't descend into it.
		if d.IsDir() {
			return filepath.SkipDir
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("move back to top-level dir: %w", err)
	}

	return nil
}

// topLevelDir returns the name of the only entry in path, which must be a
// directory. It returns an error if path cannot be read, is empty, has more
// than one entry, or its single entry is not a directory.
func topLevelDir(path string) (string, error) {
	entries, err := os.ReadDir(filepath.Clean(path))
	if err != nil {
		return "", fmt.Errorf("error reading dir: %w", err)
	}

	switch {
	case len(entries) == 0:
		return "", fmt.Errorf("directory %q is empty", path)
	case len(entries) > 1:
		return "", fmt.Errorf("directory %q has more than one child", path)
	case !entries[0].IsDir():
		return "", fmt.Errorf(
			"top-level item %q is not a directory",
			filepath.Join(path, entries[0].Name()),
		)
	}

	return entries[0].Name(), nil
}
"tagmanifest-sha256.txt", + `ac3f0fa6e7763ba403c1bca2b6e785a51bfcd5102fe7cbc1cfcf05be77ffdf24 manifest-sha256.txt +fd696a4957ed3f8329860c7191e518162b99c942b26b42291386da69bb3c2bc8 bag-info.txt +e91f941be5973ff71f1dccbdd1a32d598881893a7f21be516aca743da38b1689 bagit.txt +`, + fs.WithMode(activities.ModeFile), + ), +} + +func TestUnarchiveActivity(t *testing.T) { + type test struct { + name string + params *activities.UnarchiveParams + want *activities.UnarchiveResult + wantFs fs.Manifest + wantErr string + } + for _, tt := range []test{ + { + name: "Unarchives a zipped standard transfer", + params: &activities.UnarchiveParams{ + SourcePath: filepath.Join("zipped_transfer", "small.zip"), + }, + want: &activities.UnarchiveResult{ + DestPath: filepath.Join("zipped_transfer", "extract"), + IsDir: true, + }, + wantFs: fs.Expected(t, fs.WithMode(activities.ModeDir), + fs.WithDir("small", fs.WithMode(activities.ModeDir), + fs.WithFile("small.txt", + "I am a small file.\n", + fs.WithMode(activities.ModeFile), + ), + ), + ), + }, + { + name: "Unarchives a zipped transfer and strips the top-level dir", + params: &activities.UnarchiveParams{ + SourcePath: filepath.Join("zipped_transfer", "small.zip"), + StripTopLevelDir: true, + }, + want: &activities.UnarchiveResult{ + DestPath: filepath.Join("zipped_transfer", "extract"), + IsDir: true, + }, + wantFs: fs.Expected(t, fs.WithMode(activities.ModeDir), + fs.WithFile("small.txt", + "I am a small file.\n", + fs.WithMode(activities.ModeFile), + ), + ), + }, + { + name: "Unarchives a tarred and gzipped bag transfer", + params: &activities.UnarchiveParams{ + SourcePath: filepath.Join("gzipped_bag", "small_bag.tgz"), + }, + want: &activities.UnarchiveResult{ + DestPath: filepath.Join("gzipped_bag", "extract"), + IsDir: true, + }, + wantFs: fs.Expected(t, fs.WithMode(activities.ModeDir), + fs.WithDir("small_bag", append( + []fs.PathOp{fs.WithMode(activities.ModeDir)}, + bagPathOps..., + )...), + ), + }, + { + name: "Unarchives a tgz bag 
transfer and strips top-level dir", + params: &activities.UnarchiveParams{ + SourcePath: filepath.Join("gzipped_bag", "small_bag.tgz"), + StripTopLevelDir: true, + }, + want: &activities.UnarchiveResult{ + DestPath: filepath.Join("gzipped_bag", "extract"), + IsDir: true, + }, + wantFs: fs.Expected(t, append( + []fs.PathOp{fs.WithMode(activities.ModeDir)}, + bagPathOps..., + )...), + }, + { + name: "Returns a directory path unaltered", + params: &activities.UnarchiveParams{ + SourcePath: filepath.Join("bag", "small_bag"), + }, + want: &activities.UnarchiveResult{ + DestPath: filepath.Join("bag", "small_bag"), + IsDir: true, + }, + }, + { + name: "Returns a non-archive file path unaltered", + params: &activities.UnarchiveParams{ + SourcePath: filepath.Join("single_file_transfer", "small.txt"), + }, + want: &activities.UnarchiveResult{ + DestPath: filepath.Join("single_file_transfer", "small.txt"), + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ts := &temporalsdk_testsuite.WorkflowTestSuite{} + env := ts.NewTestActivityEnvironment() + env.RegisterActivityWithOptions( + activities.NewUnarchiveActivity(logr.Discard()).Execute, + temporalsdk_activity.RegisterOptions{ + Name: activities.UnarchiveActivityName, + }, + ) + + // New source dir for each test run. 
+ sourceDir := fs.NewDir(t, "enduro-test-unarchive", + fs.FromDir("../../testdata"), + ) + tt.params.SourcePath = sourceDir.Join(tt.params.SourcePath) + + enc, err := env.ExecuteActivity(activities.UnarchiveActivityName, tt.params) + if tt.wantErr != "" { + assert.Error(t, err, tt.wantErr) + assert.Assert(t, temporal.NonRetryableError(err)) + return + } + assert.NilError(t, err) + + var res activities.UnarchiveResult + _ = enc.Get(&res) + + if tt.want != nil { + tt.want.DestPath = sourceDir.Join(tt.want.DestPath) + assert.DeepEqual(t, &res, tt.want) + } + if tt.wantFs != (fs.Manifest{}) { + assert.Assert(t, fs.Equal(res.DestPath, tt.wantFs)) + } + }) + } +} diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 5e26f8081..f4c6b15a1 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -53,10 +53,8 @@ type TransferInfo struct { // It is populated by the workflow request. req package_.ProcessingWorkflowRequest - // TempFile is the temporary location where the blob is downloaded. - // - // It is populated by the workflow with the result of DownloadActivity. - TempFile string + // TempPath is the temporary location of a working copy of the transfer. + TempPath string // SIPID given by a3m. // @@ -298,24 +296,35 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context // Download. { - if tinfo.req.WatcherName != "" && !tinfo.req.IsDir { - // TODO: even if TempFile is defined, we should confirm that the file is - // locally available in disk, just in case we're in the context of a - // session retry where a different worker is doing the work. In that - // case, the activity whould be executed again. 
- if tinfo.TempFile == "" { - var downloadResult activities.DownloadActivityResult - activityOpts := withActivityOptsForLongLivedRequest(sessCtx) - err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.DownloadActivityName, &activities.DownloadActivityParams{ - Key: tinfo.req.Key, - WatcherName: tinfo.req.WatcherName, - }).Get(activityOpts, &downloadResult) - if err != nil { - return err - } - tinfo.TempFile = downloadResult.Path - } + var downloadResult activities.DownloadActivityResult + activityOpts := withActivityOptsForLongLivedRequest(sessCtx) + err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.DownloadActivityName, &activities.DownloadActivityParams{ + Key: tinfo.req.Key, + WatcherName: tinfo.req.WatcherName, + }).Get(activityOpts, &downloadResult) + if err != nil { + return err + } + tinfo.TempPath = downloadResult.Path + } + + // Unarchive. + { + activityOpts := withActivityOptsForLocalAction(sessCtx) + var result activities.UnarchiveResult + err := temporalsdk_workflow.ExecuteActivity( + activityOpts, + activities.UnarchiveActivityName, + &activities.UnarchiveParams{ + SourcePath: tinfo.TempPath, + StripTopLevelDir: tinfo.req.StripTopLevelDir, + }, + ).Get(activityOpts, &result) + if err != nil { + return err } + tinfo.TempPath = result.DestPath + tinfo.req.IsDir = result.IsDir } // Bundle. @@ -333,7 +342,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context activityOpts, activities.BundleActivityName, &activities.BundleActivityParams{ - SourcePath: tinfo.TempFile, + SourcePath: tinfo.TempPath, TransferDir: transferDir, StripTopLevelDir: tinfo.req.StripTopLevelDir, IsDir: tinfo.req.IsDir, @@ -349,10 +358,10 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context // Delete local temporary files. defer func() { // TODO: call clean up here to enforce that we always destroy TempDir. 
- if tinfo.Bundle.FullPathBeforeStrip != "" { + if tinfo.Bundle.FullPath != "" { activityOpts := withActivityOptsForRequest(sessCtx) _ = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ - FullPath: tinfo.Bundle.FullPathBeforeStrip, + FullPath: tinfo.Bundle.FullPath, }).Get(activityOpts, nil) } }() diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 81c56f7fb..a79f6ac27 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -67,6 +67,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) { sftpc := sftp_fake.NewMockClient(ctrl) s.env.RegisterActivityWithOptions(activities.NewDownloadActivity(logger, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}) + s.env.RegisterActivityWithOptions(activities.NewUnarchiveActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.UnarchiveActivityName}) s.env.RegisterActivityWithOptions(activities.NewBundleActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName}) s.env.RegisterActivityWithOptions(a3m.NewCreateAIPActivity(logger, &a3m.Config{}, pkgsvc).Execute, temporalsdk_activity.RegisterOptions{Name: a3m.CreateAIPActivityName}) s.env.RegisterActivityWithOptions(activities.NewUploadActivity(nil).Execute, temporalsdk_activity.RegisterOptions{Name: activities.UploadActivityName}) @@ -162,16 +163,20 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { &activities.DownloadActivityResult{Path: "/tmp/enduro123456/" + key}, nil, ) + s.env.OnActivity(activities.UnarchiveActivityName, sessionCtx, + &activities.UnarchiveParams{SourcePath: "/tmp/enduro123456/" + key}, + ).Return( + &activities.UnarchiveResult{DestPath: "/tmp/enduro123456/extract"}, nil, + ) + s.env.OnActivity(activities.BundleActivityName, sessionCtx, 
&activities.BundleActivityParams{ - SourcePath: "/tmp/enduro123456/" + key, + SourcePath: "/tmp/enduro123456/extract", TransferDir: s.transferDir, }, ).Return( &activities.BundleActivityResult{ - RelPath: "enduro4162369760/transfer", - FullPath: "/tmp/2098266580-enduro-transfer/enduro4162369760/transfer", - FullPathBeforeStrip: "/tmp/2098266580-enduro-transfer/enduro4162369760", + FullPath: "/tmp/2098266580-enduro-transfer/enduro4162369760/transfer", }, nil, ) @@ -235,16 +240,20 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { &activities.DownloadActivityResult{Path: "/tmp/enduro123456/" + key}, nil, ) + s.env.OnActivity(activities.UnarchiveActivityName, sessionCtx, + &activities.UnarchiveParams{SourcePath: "/tmp/enduro123456/" + key}, + ).Return( + &activities.UnarchiveResult{DestPath: "/tmp/enduro123456/extract"}, nil, + ) + s.env.OnActivity(activities.BundleActivityName, sessionCtx, &activities.BundleActivityParams{ - SourcePath: "/tmp/enduro123456/" + key, + SourcePath: "/tmp/enduro123456/extract", TransferDir: s.transferDir, }, ).Return( &activities.BundleActivityResult{ - RelPath: "enduro4162369760/transfer", - FullPath: "/tmp/2098266580-enduro-transfer/enduro4162369760/transfer", - FullPathBeforeStrip: "/tmp/2098266580-enduro-transfer/enduro4162369760", + FullPath: "/tmp/2098266580-enduro-transfer/enduro4162369760/transfer", }, nil, ) @@ -313,15 +322,17 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { &activities.DownloadActivityResult{Path: "/tmp/enduro123456/" + key}, nil, ) + s.env.OnActivity(activities.UnarchiveActivityName, sessionCtx, + &activities.UnarchiveParams{SourcePath: "/tmp/enduro123456/" + key}, + ).Return( + &activities.UnarchiveResult{DestPath: "/tmp/enduro123456/extract"}, nil, + ) + s.env.OnActivity(activities.BundleActivityName, sessionCtx, - &activities.BundleActivityParams{ - SourcePath: "/tmp/enduro123456/" + key, - }, + &activities.BundleActivityParams{SourcePath: "/tmp/enduro123456/extract"}, ).Return( 
&activities.BundleActivityResult{ - RelPath: "enduro4162369760/transfer", - FullPath: "/tmp/2098266580-enduro-transfer/enduro4162369760/transfer", - FullPathBeforeStrip: "/tmp/2098266580-enduro-transfer/enduro4162369760", + FullPath: "/tmp/2098266580-enduro-transfer/enduro4162369760/transfer", }, nil, ) @@ -414,16 +425,20 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() { &activities.DownloadActivityResult{Path: "/tmp/enduro123456/" + key}, nil, ) + s.env.OnActivity(activities.UnarchiveActivityName, sessionCtx, + &activities.UnarchiveParams{SourcePath: "/tmp/enduro123456/" + key}, + ).Return( + &activities.UnarchiveResult{DestPath: "/tmp/enduro123456/extract"}, nil, + ) + s.env.OnActivity(activities.BundleActivityName, sessionCtx, &activities.BundleActivityParams{ - SourcePath: "/tmp/enduro123456/" + key, + SourcePath: "/tmp/enduro123456/extract", TransferDir: s.transferDir, }, ).Return( &activities.BundleActivityResult{ - RelPath: "enduro4162369760/transfer", - FullPath: "/tmp/2098266580-enduro-transfer/enduro4162369760/transfer", - FullPathBeforeStrip: "/tmp/2098266580-enduro-transfer/enduro4162369760", + FullPath: "/tmp/2098266580-enduro-transfer/enduro4162369760/transfer", }, nil, )