Skip to content

Commit

Permalink
Remove Watcher Service from Bundle activity
Browse files Browse the repository at this point in the history
Fixes #870
  • Loading branch information
djjuhasz committed Feb 29, 2024
1 parent 1ff97e0 commit e23712e
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 104 deletions.
2 changes: 1 addition & 1 deletion cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func main() {
}

w.RegisterActivityWithOptions(activities.NewDownloadActivity(logger, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewBundleActivity(logger, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
w.RegisterActivityWithOptions(activities.NewBundleActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
w.RegisterActivityWithOptions(a3m.NewCreateAIPActivity(logger, &cfg.A3m, pkgsvc).Execute, temporalsdk_activity.RegisterOptions{Name: a3m.CreateAIPActivityName})
w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName})

Expand Down
2 changes: 1 addition & 1 deletion cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func main() {
temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName},
)
w.RegisterActivityWithOptions(
activities.NewBundleActivity(logger, wsvc).Execute,
activities.NewBundleActivity(logger).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName},
)
w.RegisterActivityWithOptions(
Expand Down
62 changes: 26 additions & 36 deletions internal/workflow/activities/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/artefactual-sdps/enduro/internal/bagit"
"github.com/artefactual-sdps/enduro/internal/bundler"
"github.com/artefactual-sdps/enduro/internal/watcher"
)

const (
Expand All @@ -28,26 +27,19 @@ const (

type BundleActivity struct {
logger logr.Logger
wsvc watcher.Service
}

func NewBundleActivity(logger logr.Logger, wsvc watcher.Service) *BundleActivity {
return &BundleActivity{logger: logger, wsvc: wsvc}
func NewBundleActivity(logger logr.Logger) *BundleActivity {
return &BundleActivity{logger: logger}
}

type BundleActivityParams struct {
// WatcherName is the name of the watcher that saw the transfer deposit.
WatcherName string
// SourcePath is the path of the transfer file or directory.
SourcePath string

// TransferDir is the target directory for the bundled package.
TransferDir string

// Key is the blob (file) name of the transfer.
Key string

// TempFile is the path to a downloaded transfer file.
TempFile string

// StripTopLevelDir indicates that the top-level directory in an archive
// transfer (e.g. zip, tar) should be removed from the bundled package
// filepaths when true.
Expand All @@ -71,10 +63,8 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara
)

a.logger.V(1).Info("Executing BundleActivity",
"WatcherName", params.WatcherName,
"SourcePath", params.SourcePath,
"TransferDir", params.TransferDir,
"Key", params.Key,
"TempFile", params.TempFile,
"StripTopLevelDir", params.StripTopLevelDir,
"IsDir", params.IsDir,
)
Expand All @@ -87,23 +77,22 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara
}

if params.IsDir {
var w watcher.Watcher
w, err = a.wsvc.ByName(params.WatcherName)
if err == nil {
src, _ := securejoin.SecureJoin(w.Path(), params.Key)
dst := params.TransferDir
res.FullPath, res.FullPathBeforeStrip, err = a.Copy(ctx, src, dst, false)
}
res.FullPath, res.FullPathBeforeStrip, err = a.Copy(
ctx,
params.SourcePath,
params.TransferDir,
false,
)
} else {
unar := a.Unarchiver(params.Key, params.TempFile)
unar := a.Unarchiver(params.SourcePath)
if unar == nil {
res.FullPath, err = a.SingleFile(ctx, params.TransferDir, params.Key, params.TempFile)
res.FullPath, err = a.SingleFile(ctx, params.TransferDir, params.SourcePath)
if err != nil {
err = fmt.Errorf("bundle single file: %v", err)
}
res.FullPathBeforeStrip = res.FullPath
} else {
res.FullPath, res.FullPathBeforeStrip, err = a.Bundle(ctx, unar, params.TransferDir, params.Key, params.TempFile, params.StripTopLevelDir)
res.FullPath, res.FullPathBeforeStrip, err = a.Bundle(ctx, unar, params.TransferDir, params.SourcePath, params.StripTopLevelDir)
}
}
if err != nil {
Expand Down Expand Up @@ -133,19 +122,20 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara
}

// Unarchiver returns the unarchiver suited for the archival format.
func (a *BundleActivity) Unarchiver(key, filename string) archiver.Unarchiver {
if iface, err := archiver.ByExtension(key); err == nil {
func (a *BundleActivity) Unarchiver(sourcePath string) archiver.Unarchiver {
if iface, err := archiver.ByExtension(filepath.Base(sourcePath)); err == nil {
if u, ok := iface.(archiver.Unarchiver); ok {
return u
}
}

file, err := os.Open(filepath.Clean(filename))
r, err := os.Open(sourcePath) // #nosec G304 -- trusted file path.
if err != nil {
return nil
}
defer file.Close() //#nosec G307 -- Errors returned by Close() here do not require specific handling.
if u, err := archiver.ByHeader(file); err == nil {
defer r.Close()

if u, err := archiver.ByHeader(r); err == nil {
return u
}

Expand All @@ -155,19 +145,19 @@ func (a *BundleActivity) Unarchiver(key, filename string) archiver.Unarchiver {
// SingleFile bundles a transfer with the downloaded blob in it.
//
// TODO: Write metadata.csv and checksum files to the metadata dir.
func (a *BundleActivity) SingleFile(ctx context.Context, transferDir, key, tempFile string) (string, error) {
func (a *BundleActivity) SingleFile(ctx context.Context, transferDir, sourcePath string) (string, error) {
b, err := bundler.NewBundlerWithTempDir(transferDir)
if err != nil {
return "", fmt.Errorf("create bundler: %v", err)
}

src, err := os.Open(tempFile) // #nosec G304 -- trusted file path.
src, err := os.Open(sourcePath) // #nosec G304 -- trusted file path.
if err != nil {
return "", fmt.Errorf("open source file: %v", err)
}
defer src.Close()

err = b.Write(filepath.Join("objects", key), src)
err = b.Write(filepath.Join("objects", filepath.Base(sourcePath)), src)
if err != nil {
return "", fmt.Errorf("write file: %v", err)
}
Expand All @@ -180,15 +170,15 @@ func (a *BundleActivity) SingleFile(ctx context.Context, transferDir, key, tempF
}

// Bundle a transfer with the contents found in the archive.
func (a *BundleActivity) Bundle(ctx context.Context, unar archiver.Unarchiver, transferDir, key, tempFile string, stripTopLevelDir bool) (string, string, error) {
func (a *BundleActivity) Bundle(ctx context.Context, unar archiver.Unarchiver, transferDir, sourcePath string, stripTopLevelDir bool) (string, string, error) {
// Create a new directory for our transfer with the name randomized.
const prefix = "enduro"
tempDir, err := os.MkdirTemp(transferDir, prefix)
if err != nil {
return "", "", fmt.Errorf("error creating temporary directory: %s", err)
}

if err := unar.Unarchive(tempFile, tempDir); err != nil {
if err := unar.Unarchive(sourcePath, tempDir); err != nil {
return "", "", fmt.Errorf("error unarchiving file: %v", err)
}

Expand All @@ -201,7 +191,7 @@ func (a *BundleActivity) Bundle(ctx context.Context, unar archiver.Unarchiver, t
}

// Delete the archive. We still have a copy in the watched source.
_ = os.Remove(tempFile)
_ = os.Remove(sourcePath)

return tempDir, tempDirBeforeStrip, nil
}
Expand Down
58 changes: 9 additions & 49 deletions internal/workflow/activities/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,30 @@ import (
"go.artefactual.dev/tools/temporal"
temporalsdk_activity "go.temporal.io/sdk/activity"
temporalsdk_testsuite "go.temporal.io/sdk/testsuite"
"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"
"gotest.tools/v3/fs"

watcherfake "github.com/artefactual-sdps/enduro/internal/watcher/fake"
"github.com/artefactual-sdps/enduro/internal/workflow/activities"
)

func TestBundleActivity(t *testing.T) {
watcherName := "watcher"
sourceDir := fs.NewDir(t, "enduro-bundle-test",
fs.FromDir("../../testdata"),
)
destDir := fs.NewDir(t, "enduro-bundle-test")

type test struct {
name string
params *activities.BundleActivityParams
msvc func(*gomock.Controller) *watcherfake.MockService
watchRec func(*watcherfake.MockWatcherMockRecorder)
wantFs fs.Manifest
wantErr string
name string
params *activities.BundleActivityParams
wantFs fs.Manifest
wantErr string
}
for _, tt := range []test{
{
name: "Bundles a single file",
params: &activities.BundleActivityParams{
WatcherName: watcherName,
SourcePath: sourceDir.Join("single_file_transfer", "small.txt"),
TransferDir: destDir.Path(),
Key: "small.txt",
TempFile: sourceDir.Join("single_file_transfer", "small.txt"),
},
wantFs: fs.Expected(t, fs.WithMode(activities.ModeDir),
fs.WithDir("objects", fs.WithMode(activities.ModeDir),
Expand All @@ -52,43 +45,19 @@ func TestBundleActivity(t *testing.T) {
{
name: "Bundles a local standard transfer directory",
params: &activities.BundleActivityParams{
WatcherName: watcherName,
SourcePath: sourceDir.Join("standard_transfer", "small"),
TransferDir: destDir.Path(),
Key: "small",
TempFile: sourceDir.Join("standard_transfer", "small"),
IsDir: true,
},
msvc: func(ctrl *gomock.Controller) *watcherfake.MockService {
svc := watcherfake.NewMockService(ctrl)
watcher := watcherfake.NewMockWatcher(ctrl)

svc.EXPECT().
ByName(watcherName).
Return(
watcher,
nil,
)

watcher.EXPECT().
Path().
Return(sourceDir.Join("standard_transfer"))

return svc
},
watchRec: func(watcher *watcherfake.MockWatcherMockRecorder) {
watcher.Path().Return(sourceDir.Join("standard_transfer"))
},
wantFs: fs.Expected(t, fs.WithMode(activities.ModeDir),
fs.WithFile("small.txt", "I am a small file.\n", fs.WithMode(activities.ModeFile)),
),
},
{
name: "Bundles a zipped standard transfer",
params: &activities.BundleActivityParams{
WatcherName: watcherName,
SourcePath: sourceDir.Join("zipped_transfer", "small.zip"),
TransferDir: destDir.Path(),
Key: "small.zip",
TempFile: sourceDir.Join("zipped_transfer", "small.zip"),
StripTopLevelDir: true,
},
wantFs: fs.Expected(t, fs.WithMode(activities.ModeDir),
Expand All @@ -98,10 +67,8 @@ func TestBundleActivity(t *testing.T) {
{
name: "Bundles a tarred and gzipped bag transfer",
params: &activities.BundleActivityParams{
WatcherName: watcherName,
SourcePath: sourceDir.Join("gzipped_bag", "small_bag.tgz"),
TransferDir: destDir.Path(),
Key: "small_bag.tgz",
TempFile: sourceDir.Join("gzipped_bag", "small_bag.tgz"),
StripTopLevelDir: true,
},
wantFs: fs.Expected(t, fs.WithMode(activities.ModeDir),
Expand Down Expand Up @@ -157,17 +124,10 @@ e91f941be5973ff71f1dccbdd1a32d598881893a7f21be516aca743da38b1689 bagit.txt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

var wsvc *watcherfake.MockService

ts := &temporalsdk_testsuite.WorkflowTestSuite{}
env := ts.NewTestActivityEnvironment()
ctrl := gomock.NewController(t)
if tt.msvc != nil {
wsvc = tt.msvc(ctrl)
}

env.RegisterActivityWithOptions(
activities.NewBundleActivity(logr.Discard(), wsvc).Execute,
activities.NewBundleActivity(logr.Discard()).Execute,
temporalsdk_activity.RegisterOptions{
Name: activities.BundleActivityName,
},
Expand Down
6 changes: 2 additions & 4 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,10 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
activityOpts,
activities.BundleActivityName,
&activities.BundleActivityParams{
WatcherName: tinfo.req.WatcherName,
SourcePath: tinfo.TempFile,
TransferDir: transferDir,
Key: tinfo.req.Key,
IsDir: tinfo.req.IsDir,
TempFile: tinfo.TempFile,
StripTopLevelDir: tinfo.req.StripTopLevelDir,
IsDir: tinfo.req.IsDir,
},
).Get(activityOpts, &bundleResult)
if err != nil {
Expand Down
18 changes: 5 additions & 13 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
sftpc := sftp_fake.NewMockClient(ctrl)

s.env.RegisterActivityWithOptions(activities.NewDownloadActivity(logger, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
s.env.RegisterActivityWithOptions(activities.NewBundleActivity(logger, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
s.env.RegisterActivityWithOptions(activities.NewBundleActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
s.env.RegisterActivityWithOptions(a3m.NewCreateAIPActivity(logger, &a3m.Config{}, pkgsvc).Execute, temporalsdk_activity.RegisterOptions{Name: a3m.CreateAIPActivityName})
s.env.RegisterActivityWithOptions(activities.NewUploadActivity(nil).Execute, temporalsdk_activity.RegisterOptions{Name: activities.UploadActivityName})
s.env.RegisterActivityWithOptions(activities.NewMoveToPermanentStorageActivity(nil).Execute, temporalsdk_activity.RegisterOptions{Name: activities.MoveToPermanentStorageActivityName})
Expand Down Expand Up @@ -164,10 +164,8 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() {

s.env.OnActivity(activities.BundleActivityName, sessionCtx,
&activities.BundleActivityParams{
WatcherName: watcherName,
SourcePath: "/tmp/enduro123456/" + key,
TransferDir: s.transferDir,
Key: key,
TempFile: "/tmp/enduro123456/" + key,
},
).Return(
&activities.BundleActivityResult{
Expand Down Expand Up @@ -239,10 +237,8 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() {

s.env.OnActivity(activities.BundleActivityName, sessionCtx,
&activities.BundleActivityParams{
WatcherName: watcherName,
SourcePath: "/tmp/enduro123456/" + key,
TransferDir: s.transferDir,
Key: key,
TempFile: "/tmp/enduro123456/" + key,
},
).Return(
&activities.BundleActivityResult{
Expand Down Expand Up @@ -319,9 +315,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() {

s.env.OnActivity(activities.BundleActivityName, sessionCtx,
&activities.BundleActivityParams{
WatcherName: watcherName,
Key: key,
TempFile: "/tmp/enduro123456/" + key,
SourcePath: "/tmp/enduro123456/" + key,
},
).Return(
&activities.BundleActivityResult{
Expand Down Expand Up @@ -422,10 +416,8 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() {

s.env.OnActivity(activities.BundleActivityName, sessionCtx,
&activities.BundleActivityParams{
WatcherName: watcherName,
SourcePath: "/tmp/enduro123456/" + key,
TransferDir: s.transferDir,
Key: key,
TempFile: "/tmp/enduro123456/" + key,
},
).Return(
&activities.BundleActivityResult{
Expand Down

0 comments on commit e23712e

Please sign in to comment.