Skip to content

Commit

Permalink
Add an unarchive activity
Browse files Browse the repository at this point in the history
Fixes #869

- Move all archive extraction code from the bundle activity to the
  unarchive activity
- Add an `fsutil.SetFileModes()` method to recursively set file modes
  on a directory
- Add an unzipped bag testdata transfer
  • Loading branch information
djjuhasz committed Mar 4, 2024
1 parent aff8ee3 commit 5b74d28
Show file tree
Hide file tree
Showing 15 changed files with 533 additions and 169 deletions.
1 change: 1 addition & 0 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func main() {
}

w.RegisterActivityWithOptions(activities.NewDownloadActivity(logger, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewUnarchiveActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.UnarchiveActivityName})

Check warning on line 139 in cmd/enduro-a3m-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-a3m-worker/main.go#L139

Added line #L139 was not covered by tests
w.RegisterActivityWithOptions(activities.NewBundleActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
w.RegisterActivityWithOptions(a3m.NewCreateAIPActivity(logger, &cfg.A3m, pkgsvc).Execute, temporalsdk_activity.RegisterOptions{Name: a3m.CreateAIPActivityName})
w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName})
Expand Down
4 changes: 4 additions & 0 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func main() {
activities.NewDownloadActivity(logger, wsvc).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName},
)
w.RegisterActivityWithOptions(
activities.NewUnarchiveActivity(logger).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.UnarchiveActivityName},
)

Check warning on line 149 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L146-L149

Added lines #L146 - L149 were not covered by tests
w.RegisterActivityWithOptions(
activities.NewBundleActivity(logger).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName},
Expand Down
24 changes: 24 additions & 0 deletions internal/fsutil/fsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package fsutil

import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -48,3 +50,25 @@ func Move(src, dst string) error {

return err
}

// SetFileModes recursively sets the file mode of root and its contents.
func SetFileModes(root string, dirMode, fileMode int) error {
return filepath.WalkDir(root,
func(path string, d os.DirEntry, err error) error {
if err != nil {
return err

Check warning on line 59 in internal/fsutil/fsutil.go

View check run for this annotation

Codecov / codecov/patch

internal/fsutil/fsutil.go#L59

Added line #L59 was not covered by tests
}

mode := fs.FileMode(fileMode)
if d.IsDir() {
mode = fs.FileMode(dirMode)
}

if err := os.Chmod(path, mode); err != nil {
return fmt.Errorf("set permissions: %v", err)

Check warning on line 68 in internal/fsutil/fsutil.go

View check run for this annotation

Codecov / codecov/patch

internal/fsutil/fsutil.go#L68

Added line #L68 was not covered by tests
}

return nil
},
)
}
25 changes: 25 additions & 0 deletions internal/fsutil/fsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,28 @@ func TestMove(t *testing.T) {
assert.Assert(t, fs.Equal(dst, srcManifest))
})
}

func TestSetFileModes(t *testing.T) {
td := fs.NewDir(t, "enduro-test-fsutil",
fs.WithDir("transfer", fs.WithMode(0o755),
fs.WithFile("test1", "I'm a test file.", fs.WithMode(0o644)),
fs.WithDir("subdir", fs.WithMode(0o755),
fs.WithFile("test2", "Another test file.", fs.WithMode(0o644)),
),
),
)

err := fsutil.SetFileModes(td.Join("transfer"), 0o700, 0o600)
assert.NilError(t, err)
assert.Assert(t, fs.Equal(
td.Path(),
fs.Expected(t,
fs.WithDir("transfer", fs.WithMode(0o700),
fs.WithFile("test1", "I'm a test file.", fs.WithMode(0o600)),
fs.WithDir("subdir", fs.WithMode(0o700),
fs.WithFile("test2", "Another test file.", fs.WithMode(0o600)),
),
),
),
))
}
3 changes: 3 additions & 0 deletions internal/testdata/bag/small_bag/bag-info.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Bag-Software-Agent: bagit.py v1.8.1 <https://github.com/LibraryOfCongress/bagit-python>
Bagging-Date: 2023-12-12
Payload-Oxum: 19.1
2 changes: 2 additions & 0 deletions internal/testdata/bag/small_bag/bagit.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
BagIt-Version: 0.97
Tag-File-Character-Encoding: UTF-8
1 change: 1 addition & 0 deletions internal/testdata/bag/small_bag/data/small.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
I am a small file.
1 change: 1 addition & 0 deletions internal/testdata/bag/small_bag/manifest-sha256.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
4450c8a88130a3b397bfc659245c4f0f87a8c79d017a60bdb1bd32f4b51c8133 data/small.txt
3 changes: 3 additions & 0 deletions internal/testdata/bag/small_bag/tagmanifest-sha256.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ac3f0fa6e7763ba403c1bca2b6e785a51bfcd5102fe7cbc1cfcf05be77ffdf24 manifest-sha256.txt
fd696a4957ed3f8329860c7191e518162b99c942b26b42291386da69bb3c2bc8 bag-info.txt
e91f941be5973ff71f1dccbdd1a32d598881893a7f21be516aca743da38b1689 bagit.txt
133 changes: 29 additions & 104 deletions internal/workflow/activities/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

securejoin "github.com/cyphar/filepath-securejoin"
"github.com/go-logr/logr"
"github.com/mholt/archiver/v3"
"github.com/otiai10/copy"
temporal_tools "go.artefactual.dev/tools/temporal"

Expand Down Expand Up @@ -51,9 +50,7 @@ type BundleActivityParams struct {
}

type BundleActivityResult struct {
RelPath string // Path of the transfer relative to the transfer directory.
FullPath string // Full path to the transfer in the worker running the session.
FullPathBeforeStrip string // Same as FullPath but includes the top-level dir even when stripped.
FullPath string // Full path to the transfer in the worker running the session.
}

func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityParams) (*BundleActivityResult, error) {
Expand All @@ -77,22 +74,14 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara
}

if params.IsDir {
res.FullPath, res.FullPathBeforeStrip, err = a.Copy(
ctx,
params.SourcePath,
params.TransferDir,
false,
)
res.FullPath, err = a.Bundle(ctx, params.SourcePath, params.TransferDir)
if err != nil {
err = fmt.Errorf("bundle dir: %v", err)

Check warning on line 79 in internal/workflow/activities/bundle.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/bundle.go#L79

Added line #L79 was not covered by tests
}
} else {
unar := a.Unarchiver(params.SourcePath)
if unar == nil {
res.FullPath, err = a.SingleFile(ctx, params.TransferDir, params.SourcePath)
if err != nil {
err = fmt.Errorf("bundle single file: %v", err)
}
res.FullPathBeforeStrip = res.FullPath
} else {
res.FullPath, res.FullPathBeforeStrip, err = a.Bundle(ctx, unar, params.TransferDir, params.SourcePath, params.StripTopLevelDir)
res.FullPath, err = a.SingleFile(ctx, params.SourcePath, params.TransferDir)
if err != nil {
err = fmt.Errorf("bundle single file: %v", err)

Check warning on line 84 in internal/workflow/activities/bundle.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/bundle.go#L84

Added line #L84 was not covered by tests
}
}
if err != nil {
Expand All @@ -101,51 +90,28 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara

err = unbag(res.FullPath)
if err != nil {
return nil, temporal_tools.NewNonRetryableError(err)
}

res.RelPath, err = filepath.Rel(params.TransferDir, res.FullPath)
if err != nil {
return nil, temporal_tools.NewNonRetryableError(fmt.Errorf(
"error calculating relative path to transfer (base=%q, target=%q): %v",
params.TransferDir, res.FullPath, err,
))
return nil, temporal_tools.NewNonRetryableError(
fmt.Errorf("bundle: unbag: %v", err),
)

Check warning on line 95 in internal/workflow/activities/bundle.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/bundle.go#L93-L95

Added lines #L93 - L95 were not covered by tests
}

if err = setPermissions(res.FullPath); err != nil {
return nil, temporal_tools.NewNonRetryableError(
fmt.Errorf("set permissions: %v", err),
fmt.Errorf("bundle: set permissions: %v", err),

Check warning on line 100 in internal/workflow/activities/bundle.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/bundle.go#L100

Added line #L100 was not covered by tests
)
}

return res, nil
}

// Unarchiver returns the unarchiver suited for the archival format.
func (a *BundleActivity) Unarchiver(sourcePath string) archiver.Unarchiver {
if iface, err := archiver.ByExtension(filepath.Base(sourcePath)); err == nil {
if u, ok := iface.(archiver.Unarchiver); ok {
return u
}
}

r, err := os.Open(sourcePath) // #nosec G304 -- trusted file path.
if err != nil {
return nil
}
defer r.Close()

if u, err := archiver.ByHeader(r); err == nil {
return u
}

return nil
}

// SingleFile bundles a transfer with the downloaded blob in it.
//
// TODO: Write metadata.csv and checksum files to the metadata dir.
func (a *BundleActivity) SingleFile(ctx context.Context, transferDir, sourcePath string) (string, error) {
func (a *BundleActivity) SingleFile(
ctx context.Context,
sourcePath string,
transferDir string,
) (string, error) {
b, err := bundler.NewBundlerWithTempDir(transferDir)
if err != nil {
return "", fmt.Errorf("create bundler: %v", err)
Expand All @@ -170,76 +136,35 @@ func (a *BundleActivity) SingleFile(ctx context.Context, transferDir, sourcePath
}

// Bundle a transfer with the contents found in the archive.
func (a *BundleActivity) Bundle(ctx context.Context, unar archiver.Unarchiver, transferDir, sourcePath string, stripTopLevelDir bool) (string, string, error) {
// Create a new directory for our transfer with the name randomized.
const prefix = "enduro"
tempDir, err := os.MkdirTemp(transferDir, prefix)
func (a *BundleActivity) Bundle(
ctx context.Context,
sourcePath string,
transferDir string,
) (string, error) {
tempDir, err := a.Copy(ctx, sourcePath, transferDir)
if err != nil {
return "", "", fmt.Errorf("error creating temporary directory: %s", err)
}

if err := unar.Unarchive(sourcePath, tempDir); err != nil {
return "", "", fmt.Errorf("error unarchiving file: %v", err)
}

tempDirBeforeStrip := tempDir
if stripTopLevelDir {
tempDir, err = stripDirContainer(tempDir)
if err != nil {
return "", "", err
}
return "", fmt.Errorf("bundle: %v", err)

Check warning on line 146 in internal/workflow/activities/bundle.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/bundle.go#L146

Added line #L146 was not covered by tests
}

// Delete the archive. We still have a copy in the watched source.
_ = os.Remove(sourcePath)

return tempDir, tempDirBeforeStrip, nil
return tempDir, nil
}

// Copy a transfer in the given destination using an intermediate temp. directory.
func (a *BundleActivity) Copy(ctx context.Context, src, dst string, stripTopLevelDir bool) (string, string, error) {
func (a *BundleActivity) Copy(ctx context.Context, src, dst string) (string, error) {
const prefix = "enduro"
tempDir, err := os.MkdirTemp(dst, prefix)
if err != nil {
return "", "", fmt.Errorf("error creating temporary directory: %s", err)
return "", fmt.Errorf("error creating temporary directory: %s", err)

Check warning on line 160 in internal/workflow/activities/bundle.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/bundle.go#L160

Added line #L160 was not covered by tests
}

if err := copy.Copy(src, tempDir); err != nil {
return "", "", fmt.Errorf("error copying transfer: %v", err)
}

tempDirBeforeStrip := tempDir
if stripTopLevelDir {
tempDir, err = stripDirContainer(tempDir)
if err != nil {
return "", "", err
}
return "", fmt.Errorf("error copying transfer: %v", err)

Check warning on line 164 in internal/workflow/activities/bundle.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/bundle.go#L164

Added line #L164 was not covered by tests
}

return tempDir, tempDirBeforeStrip, nil
}

// stripDirContainer strips the top-level directory of a transfer.
func stripDirContainer(path string) (string, error) {
const errPrefix = "error stripping top-level dir"
ff, err := os.Open(filepath.Clean(path))
if err != nil {
return "", fmt.Errorf("%s: cannot open path: %v", errPrefix, err)
}
defer ff.Close()

fis, err := ff.Readdir(2)
if err != nil {
return "", fmt.Errorf("%s: error reading dir: %v", errPrefix, err)
}
if len(fis) != 1 {
return "", fmt.Errorf("%s: directory has more than one child", errPrefix)
}
if !fis[0].IsDir() {
return "", fmt.Errorf("%s: top-level item is not a directory", errPrefix)
}
securePath, _ := securejoin.SecureJoin(path, fis[0].Name())
return securePath, nil
return tempDir, nil
}

// unbag converts a bagged transfer into a standard Archivematica transfer.
Expand Down
30 changes: 7 additions & 23 deletions internal/workflow/activities/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,11 @@ func TestBundleActivity(t *testing.T) {
),
},
{
name: "Bundles a zipped standard transfer",
name: "Bundles a BagIt transfer",
params: &activities.BundleActivityParams{
SourcePath: sourceDir.Join("zipped_transfer", "small.zip"),
TransferDir: destDir.Path(),
StripTopLevelDir: true,
},
wantFs: fs.Expected(t, fs.WithMode(activities.ModeDir),
fs.WithFile("small.txt", "I am a small file.\n", fs.WithMode(activities.ModeFile)),
),
},
{
name: "Bundles a tarred and gzipped bag transfer",
params: &activities.BundleActivityParams{
SourcePath: sourceDir.Join("gzipped_bag", "small_bag.tgz"),
TransferDir: destDir.Path(),
StripTopLevelDir: true,
SourcePath: sourceDir.Join("bag", "small_bag"),
TransferDir: destDir.Path(),
IsDir: true,
},
wantFs: fs.Expected(t, fs.WithMode(activities.ModeDir),
fs.WithFile(
Expand Down Expand Up @@ -148,16 +137,11 @@ e91f941be5973ff71f1dccbdd1a32d598881893a7f21be516aca743da38b1689 bagit.txt
if tt.wantFs != (fs.Manifest{}) {
assert.Assert(t, fs.Equal(res.FullPath, tt.wantFs))
}
assert.NilError(t, err)

if tt.params.StripTopLevelDir {
assert.Equal(t, res.FullPathBeforeStrip, filepath.Dir(res.FullPath))
} else {
assert.Equal(t, res.FullPathBeforeStrip, res.FullPath)
}

p, err := filepath.Rel(destDir.Path(), res.FullPath)
rp, err := filepath.Rel(tt.params.TransferDir, res.FullPath)
assert.NilError(t, err)
assert.Equal(t, res.RelPath, p)
assert.Assert(t, len(rp) > 0)
})
}
}
Loading

0 comments on commit 5b74d28

Please sign in to comment.