From 601bb3e59faedd2cc0499d52e6ea4acfc303554d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Mon, 29 Apr 2024 20:40:50 +0000 Subject: [PATCH] Fix use of processingMCP.xml --- hack/ccp/hack/processingMCP.xml | 169 ++++++++++++++++++++++ hack/ccp/hack/transfer.sh | 3 + hack/ccp/internal/controller/jobs.go | 47 +++--- hack/ccp/internal/controller/package.go | 44 +++--- hack/ccp/internal/controller/transfer.go | 80 ++++++++++ hack/ccp/internal/workflow/config_test.go | 19 +++ 6 files changed, 327 insertions(+), 35 deletions(-) create mode 100644 hack/ccp/hack/processingMCP.xml create mode 100644 hack/ccp/internal/controller/transfer.go create mode 100644 hack/ccp/internal/workflow/config_test.go diff --git a/hack/ccp/hack/processingMCP.xml b/hack/ccp/hack/processingMCP.xml new file mode 100644 index 000000000..28d68e8b6 --- /dev/null +++ b/hack/ccp/hack/processingMCP.xml @@ -0,0 +1,169 @@ + + + + + + 5e58066d-e113-4383-b20b-f301ed4d751c + 8d29eb3d-a8a8-4347-806e-3d8227ed44a1 + + + + 01c651cb-c174-4ba4-b985-1d87a44d6754 + 414da421-b83f-4648-895f-a34840e3c3f5 + + + + accea2bf-ba74-4a3a-bb97-614775c74459 + e0a39199-c62a-4a2f-98de-e9d1116460a8 + + + + 087d27be-c719-47d8-9bbb-9a7d8b609c44 + 4dec164b-79b0-4459-8505-8095af9655b5 + + + + cb8e5706-e73f-472f-ad9b-d1236af8095f + 612e3609-ce9a-4df6-a9a3-63d634d2d934 + + + + 7509e7dc-1e1b-4dce-8d21-e130515fce73 + 612e3609-ce9a-4df6-a9a3-63d634d2d934 + + + + a2ba5278-459a-4638-92d9-38eb1588717d + 44a7c397-8187-4fd2-b8f7-c61737c4df49 + + + + bb194013-597c-4e4a-8493-b36d190f8717 + 7065d256-2f47-4b7d-baec-2c4699626121 + + + + f19926dd-8fb5-4c79-8ade-c83f61f55b40 + 85b1e45d-8f98-4cae-8336-72f40e12cbef + + + + 82ee9ad2-2c74-4c7c-853e-e4eaf68fc8b6 + 0a24787c-00e3-4710-b324-90e792bfb484 + + + + f09847c2-ee51-429a-9478-a860477f6b8d + d97297c7-2b49-4cfe-8c9f-0613d63ed763 + + + + cd844b6e-ab3c-4bc6-b34f-7103f88715de + /api/v2/location/default/DS/ + + + + 56eebd45-5600-4768-a8c2-ec0114555a3d + e9eaef1e-c2e0-4e3b-b942-bfb537162795 + + + + 70fc7040-d4fb-4d19-a0e6-792387ca1006 + 3e891cc4-39d2-4989-a001-5107a009a223 + + + + eeb23509-57e2-4529-8857-9d62525db048 + 5727faac-88af-40e8-8c10-268644b0142d + + + + 498f7a6d-1b8c-431a-aa5d-83f14f3c5e65 + 972fce6c-52c8-4c00-99b9-d6814e377974 + + + + 01d64f58-8295-4b7b-9cab-8f1b153a504f + 9475447c-9889-430c-9477-6287a9574c5b + + + + 2d32235c-02d4-4686-88a6-96f4d6c7b1c3 + 9efab23c-31dc-4cbd-a39d-bb1665460cbe + + + + 8ce07e94-6130-4987-96f0-2399ad45c5c2 + 76befd52-14c3-44f9-838f-15a4e01624b0 + + + + 7a024896-c4f7-4808-a240-44c87c762bc5 + 5b3c8268-5b33-4b70-b1aa-0e4540fe03d1 + + + + 153c5f41-3cfb-47ba-9150-2dd44ebc27df + b7ce05f0-9d94-4b3e-86cc-d4b2c6dba546 + + + + bd899573-694e-4d33-8c9b-df0af802437d + 2dc3f487-e4b0-4e07-a4b3-6216ed24ca14 + + + + b320ce81-9982-408a-9502-097d0daa48fa + /api/v2/location/default/AS/ + + + + d0dfa5fc-e3c2-4638-9eda-f96eea1070e0 + 65273f18-5b4e-4944-af4f-09be175a88e8 + + + + dec97e3c-5598-4b99-b26e-f87a435a6b7f + 01d80b27-4ad1-4bd1-8f8d-f819f18bf685 + + + + de909a42-c5b5-46e1-9985-c031b50e9d30 + 1e0df175-d56d-450d-8bee-7df1dc7ae815 + + + + 92879a29-45bf-4f0b-ac43-e64474f0f2f9 + 6eb8ebe7-fab3-4e4c-b9d7-14de17625baa + + + + 856d2d65-cd25-49fa-8da9-cabb78292894 + 63767e4b-9ce8-4fe2-8724-65cc1f763de0 + + + + 1dad74a2-95df-4825-bbba-dca8b91d2371 + 697c0883-798d-4af7-b8b6-101c7f709cd5 + + + + 7e81f94e-6441-4430-a12d-76df09181b66 + 77355172-b437-4324-9dcc-e2607ad27cb1 + + + + 390d6507-5029-4dae-bcd4-ce7178c9b560 + 63be6081-bee8-4cf5-a453-91893e31940f + + + + 97a5ddc0-d4e0-43ac-a571-9722405a0a9b + 7f5244fe-590b-4e38-beaf-0cf1ccb9e71b + + + diff --git a/hack/ccp/hack/transfer.sh b/hack/ccp/hack/transfer.sh index 265a7246f..39a777305 100755 --- a/hack/ccp/hack/transfer.sh +++ b/hack/ccp/hack/transfer.sh @@ -2,7 +2,10 @@ set -x +__dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" + transfer=$(mktemp -d) +cp $__dir/processingMCP.xml $transfer touch $transfer/hello.txt touch $transfer/bye.txt mv $transfer ~/.ccp/am-pipeline-data/watchedDirectories/activeTransfers/standardTransfer/ diff --git a/hack/ccp/internal/controller/jobs.go b/hack/ccp/internal/controller/jobs.go index 9d7e1772b..f9f8fc67c 100644 --- a/hack/ccp/internal/controller/jobs.go +++ b/hack/ccp/internal/controller/jobs.go @@ -160,18 +160,19 @@ func newOutputDecisionJob(j *job) (*outputDecisionJob, error) { } func (l *outputDecisionJob) exec(ctx context.Context) (uuid.UUID, error) { + if err := l.j.pkg.reload(ctx); err != nil { + return uuid.Nil, fmt.Errorf("reload: %v", err) + } + if err := l.j.save(ctx); err != nil { + return uuid.Nil, fmt.Errorf("save: %v", err) + } + // TODO: store active agent with l.j.p.saveValue. return uuid.Nil, nil } // nextChainDecisionJob. // -// 1. Reload *Package (pull state from db into memory since it can be changed by client scripts). -// 2. Persist job in database: https://github.com/artefactual/archivematica/blob/fbda1a91d6dff086e7124fa1d7a3c7953d8755bb/src/MCPServer/lib/server/jobs/base.py#L76. -// 3. Load preconfigured choices, and resolve the job if the decision is preconfigured. -// 4. Otherwise, mark as awaiting decision, put on hold. Decision must be made by the user via the API. -// 5. ... -// // Manager: linkTaskManagerChoice. // Class: NextChainDecisionJob(DecisionJob). type nextChainDecisionJob struct { @@ -194,17 +195,18 @@ func newNextChainDecisionJob(j *job) (*nextChainDecisionJob, error) { } func (l *nextChainDecisionJob) exec(ctx context.Context) (uuid.UUID, error) { + if err := l.j.pkg.reload(ctx); err != nil { + return uuid.Nil, fmt.Errorf("reload: %v", err) + } + if err := l.j.save(ctx); err != nil { + return uuid.Nil, fmt.Errorf("save: %v", err) + } + // When we have a preconfigured choice. - choice, err := l.j.pkg.PreconfiguredChoice(l.j.wl.ID) - if err != nil { + if chainID, err := l.j.pkg.PreconfiguredChoice(l.j.wl.ID); err != nil { return uuid.Nil, err - } - if choice != nil { - if ret, err := uuid.Parse(choice.GoToChain); err != nil { - l.j.logger.Info("Preconfigured choice is not a valid UUID.", "choice", choice.GoToChain, "err", err) - } else { - return ret, nil - } + } else if chainID != uuid.Nil { + return chainID, nil } // Build decision point and await resolution. @@ -264,6 +266,13 @@ func newUpdateContextDecisionJob(j *job) (*updateContextDecisionJob, error) { } func (l *updateContextDecisionJob) exec(ctx context.Context) (uuid.UUID, error) { + if err := l.j.pkg.reload(ctx); err != nil { + return uuid.Nil, fmt.Errorf("reload: %v", err) + } + if err := l.j.save(ctx); err != nil { + return uuid.Nil, fmt.Errorf("save: %v", err) + } + id := l.j.wl.ExitCodes[0].LinkID if id == nil || *id == uuid.Nil { return uuid.Nil, errors.New("ops") @@ -314,11 +323,15 @@ func (l *directoryClientScriptJob) exec(ctx context.Context) (uuid.UUID, error) return uuid.Nil, err } - if ec, ok := l.j.wl.ExitCodes[taskResult.ExitCode]; ok && ec.LinkID != nil { + if ec, ok := l.j.wl.ExitCodes[taskResult.ExitCode]; ok { + if ec.LinkID == nil { + return uuid.Nil, io.EOF // End of chain. + } return *ec.LinkID, nil } + if l.j.wl.FallbackLinkID == uuid.Nil { - return uuid.Nil, io.EOF + return uuid.Nil, io.EOF // End of chain. } return l.j.wl.FallbackLinkID, nil diff --git a/hack/ccp/internal/controller/package.go b/hack/ccp/internal/controller/package.go index 95e19463d..3ed8b247d 100644 --- a/hack/ccp/internal/controller/package.go +++ b/hack/ccp/internal/controller/package.go @@ -109,51 +109,59 @@ func (p *Package) String() string { return p.Name() } -func (p *Package) PreconfiguredChoice(linkID uuid.UUID) (*workflow.Choice, error) { - li := linkID.String() - - // TODO: automate "Approve standard transfer" until we can submit decisions. - if li == "0c94e6b5-4714-4bec-82c8-e187e0c04d77" { - return &workflow.Choice{ - AppliesTo: "0c94e6b5-4714-4bec-82c8-e187e0c04d77", - GoToChain: "b4567e89-9fea-4256-99f5-a88987026488", - }, nil +// PreconfiguredChoice looks up a pre-configured choice in the processing +// configuration file that is part of the package. +func (p *Package) PreconfiguredChoice(linkID uuid.UUID) (uuid.UUID, error) { + // TODO: auto-approval should only happen if requested by the user, but + // this is convenient during initial development. + if chainID := Transfers.Decide(linkID); chainID != uuid.Nil { + return chainID, nil } f, err := os.Open(filepath.Join(p.path, "processingMCP.xml")) if err != nil { if os.IsNotExist(err) { - return nil, nil + return uuid.Nil, nil } - return nil, err + return uuid.Nil, err } - // TODO: this could be cached if the file isn't going to change. + // TODO: this could be cached if the file isn't going to change, but + // Archivematica is not doing any caching. choices, err := workflow.ParseConfig(f) if err != nil { - return nil, err + return uuid.Nil, err } - var match *workflow.Choice + var chainID uuid.UUID + li := linkID.String() for _, choice := range choices { if choice.AppliesTo == li { - match = &choice + if id, err := uuid.Parse(choice.GoToChain); err != nil { + return uuid.Nil, err + } else { + chainID = id + } break } } // Resort to automated config. // TODO: allow user to choose the system processing config to use. - if match == nil { + if chainID == uuid.Nil { for _, choice := range workflow.AutomatedConfig.Choices.Choices { if choice.AppliesTo == li { - match = &choice + if id, err := uuid.Parse(choice.GoToChain); err != nil { + return uuid.Nil, err + } else { + chainID = id + } break } } } - return match, nil + return chainID, nil } // Decide resolves an awaiting decision. diff --git a/hack/ccp/internal/controller/transfer.go b/hack/ccp/internal/controller/transfer.go new file mode 100644 index 000000000..98db276c4 --- /dev/null +++ b/hack/ccp/internal/controller/transfer.go @@ -0,0 +1,80 @@ +package controller + +import ( + "github.com/google/uuid" +) + +type TransferType struct { + // WatcheDir is the watched directory used to trigger this type of transfer. + WatchedDir string + + // Chain is the chain used to start processing if approval is omitted. + Chain uuid.UUID + + // Link is the link used to start processing if approval is omitted. + Link uuid.UUID + + // DecisionLink is the chain link used to require user approval. + DecisionLink uuid.UUID + + // Decision is the approved chain. + Decision uuid.UUID +} + +type TransferTypes []TransferType + +// Decide resolves the workflow decision point that implements the approval. +func (t TransferTypes) Decide(linkID uuid.UUID) uuid.UUID { + for _, item := range t { + if item.DecisionLink == linkID { + return item.Decision + } + } + return uuid.Nil +} + +// List of transfer types supported by Archivematica. +var Transfers TransferTypes = []TransferType{ + { + WatchedDir: "activeTransfers/standardTransfer", + Chain: uuid.MustParse("6953950b-c101-4f4c-a0c3-0cd0684afe5e"), + Link: uuid.MustParse("045c43ae-d6cf-44f7-97d6-c8a602748565"), + DecisionLink: uuid.MustParse("0c94e6b5-4714-4bec-82c8-e187e0c04d77"), + Decision: uuid.MustParse("b4567e89-9fea-4256-99f5-a88987026488"), + }, + { + WatchedDir: "activeTransfers/zippedDirectory", + Chain: uuid.MustParse("f3caceff-5ad5-4bad-b98c-e73f8cd03450"), + Link: uuid.MustParse("541f5994-73b0-45bb-9cb5-367c06a21be7"), + }, + { + WatchedDir: "activeTransfers/baggitDirectory", + Chain: uuid.MustParse("c75ef451-2040-4511-95ac-3baa0f019b48"), + Link: uuid.MustParse("154dd501-a344-45a9-97e3-b30093da35f5"), + }, + { + WatchedDir: "activeTransfers/baggitZippedDirectory", + Chain: uuid.MustParse("167dc382-4ab1-4051-8e22-e7f1c1bf3e6f"), + Link: uuid.MustParse("3229e01f-adf3-4294-85f7-4acb01b3fbcf"), + }, + { + WatchedDir: "activeTransfers/Dspace", + Chain: uuid.MustParse("1cb2ef0e-afe8-45b5-8d8f-a1e120f06605"), + Link: uuid.MustParse("bda96b35-48c7-44fc-9c9e-d7c5a05016c1"), + }, + { + WatchedDir: "activeTransfers/maildir", + Chain: uuid.MustParse("d381cf76-9313-415f-98a1-55c91e4d78e0"), + Link: uuid.MustParse("da2d650e-8ce3-4b9a-ac97-8ca4744b019f"), + }, + { + WatchedDir: "activeTransfers/TRIM", + Chain: uuid.MustParse("e4a59e3e-3dba-4eb5-9cf1-c1fb3ae61fa9"), + Link: uuid.MustParse("2483c25a-ade8-4566-a259-c6c37350d0d6"), + }, + { + WatchedDir: "activeTransfers/dataverseTransfer", + Chain: uuid.MustParse("10c00bc8-8fc2-419f-b593-cf5518695186"), + Link: uuid.MustParse("0af6b163-5455-4a76-978b-e35cc9ee445f"), + }, +} diff --git a/hack/ccp/internal/workflow/config_test.go b/hack/ccp/internal/workflow/config_test.go new file mode 100644 index 000000000..03f823455 --- /dev/null +++ b/hack/ccp/internal/workflow/config_test.go @@ -0,0 +1,19 @@ +package workflow_test + +import ( + "os" + "testing" + + "github.com/artefactual/archivematica/hack/ccp/internal/workflow" + "gotest.tools/v3/assert" +) + +func TestParseConfig(t *testing.T) { + f, err := os.Open("../../hack/processingMCP.xml") + assert.NilError(t, err) + t.Cleanup(func() { f.Close() }) + + config, err := workflow.ParseConfig(f) + assert.NilError(t, err) + assert.Equal(t, len(config), 32) +}