Skip to content

Commit

Permalink
feat(workflows): adds registry syncer (#15277)
Browse files Browse the repository at this point in the history
* feat(workflows): adds registry syncer

* chore(relay): tests secrets updating

* chore(job): adjust db tests

* chore(syncer): removes signaler ticker
  • Loading branch information
MStreet3 authored Nov 21, 2024
1 parent 212bde3 commit 64aeef9
Show file tree
Hide file tree
Showing 20 changed files with 2,069 additions and 28 deletions.
15 changes: 15 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,21 @@ packages:
github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer:
interfaces:
ORM:
github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer:
interfaces:
ORM:
ContractReader:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: contract_reader_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
Handler:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: handler_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
github.com/smartcontractkit/chainlink/v2/core/capabilities/targets:
interfaces:
ContractValueGetter:
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

// TODO: wire this up to config so we only instantiate it
// if a workflow registry address is provided.
workflowRegistrySyncer := syncer.NewWorkflowRegistry()
workflowRegistrySyncer := syncer.NewNullWorkflowRegistrySyncer()
srvcs = append(srvcs, workflowRegistrySyncer)

var externalPeerWrapper p2ptypes.PeerWrapper
Expand Down
42 changes: 33 additions & 9 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/relay"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs"
"github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight"
)
Expand Down Expand Up @@ -1873,6 +1874,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
c.ID = s.ID
c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow99", addr1) // insert with mismatched name
c.SpecType = job.YamlSpec
c.SecretsID = s.SecretsID
return mustInsertWFJob(t, o, &c)
},
},
Expand All @@ -1892,29 +1894,40 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
var c job.WorkflowSpec
c.ID = s.ID
c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow03", addr2) // insert with mismatched owner
c.SecretsID = s.SecretsID
return mustInsertWFJob(t, o, &c)
},
},
wantErr: true,
},
}

for _, tt := range tests {
for i, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := testutils.Context(t)
ks := cltest.NewKeyStore(t, tt.fields.ds)

secretsORM := syncer.NewWorkflowRegistryDS(tt.fields.ds, logger.TestLogger(t))

sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz")
require.NoError(t, err)
tt.args.spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true}

pipelineORM := pipeline.NewORM(tt.fields.ds, logger.TestLogger(t), configtest.NewTestGeneralConfig(t).JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(tt.fields.ds)
o := NewTestORM(t, tt.fields.ds, pipelineORM, bridgesORM, ks)

var wantJobID int32
if tt.args.before != nil {
wantJobID = tt.args.before(t, o, tt.args.spec)
}
ctx := testutils.Context(t)

gotJ, err := o.FindJobIDByWorkflow(ctx, *tt.args.spec)
if (err != nil) != tt.wantErr {
t.Errorf("orm.FindJobByWorkflow() error = %v, wantErr %v", err, tt.wantErr)
return
}

if err == nil {
assert.Equal(t, wantJobID, gotJ, "mismatch job id")
}
Expand All @@ -1936,25 +1949,36 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) {
bridges.NewORM(db),
cltest.NewKeyStore(t, db))
ctx := testutils.Context(t)
secretsORM := syncer.NewWorkflowRegistryDS(db, logger.TestLogger(t))

var sids []int64
for i := 0; i < 3; i++ {
sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz")
require.NoError(t, err)
sids = append(sids, sid)
}

wfYaml1 := pkgworkflows.WFYamlSpec(t, "workflow00", addr1)
s1 := job.WorkflowSpec{
Workflow: wfYaml1,
SpecType: job.YamlSpec,
Workflow: wfYaml1,
SpecType: job.YamlSpec,
SecretsID: sql.NullInt64{Int64: sids[0], Valid: true},
}
wantJobID1 := mustInsertWFJob(t, o, &s1)

wfYaml2 := pkgworkflows.WFYamlSpec(t, "workflow01", addr1)
s2 := job.WorkflowSpec{
Workflow: wfYaml2,
SpecType: job.YamlSpec,
Workflow: wfYaml2,
SpecType: job.YamlSpec,
SecretsID: sql.NullInt64{Int64: sids[1], Valid: true},
}
wantJobID2 := mustInsertWFJob(t, o, &s2)

wfYaml3 := pkgworkflows.WFYamlSpec(t, "workflow00", addr2)
s3 := job.WorkflowSpec{
Workflow: wfYaml3,
SpecType: job.YamlSpec,
Workflow: wfYaml3,
SpecType: job.YamlSpec,
SecretsID: sql.NullInt64{Int64: sids[2], Valid: true},
}
wantJobID3 := mustInsertWFJob(t, o, &s3)

Expand Down Expand Up @@ -1992,7 +2016,7 @@ func mustInsertWFJob(t *testing.T, orm job.ORM, s *job.WorkflowSpec) int32 {
}

err = orm.CreateJob(ctx, &j)
require.NoError(t, err, "failed to insert job with wf spec %v %s", s, s.Workflow)
require.NoError(t, err, "failed to insert job with wf spec %+v %s", s, err)
return j.ID
}

Expand Down
4 changes: 4 additions & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package job

import (
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -877,6 +878,9 @@ type WorkflowSpec struct {
WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow.
WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow.
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
BinaryURL string `db:"binary_url"`
ConfigURL string `db:"config_url"`
SecretsID sql.NullInt64 `db:"secrets_id"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
Expand Down
4 changes: 2 additions & 2 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
case Stream:
// 'stream' type has no associated spec, nothing to do here
case Workflow:
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type, config)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type, :config)
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, binary_url, config_url, secrets_id, created_at, updated_at, spec_type, config)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, :binary_url, :config_url, :secrets_id, NOW(), NOW(), :spec_type, :config)
RETURNING id;`
specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package workflow_registry_syncer_test

import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper"
coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"

"github.com/stretchr/testify/require"
)

func Test_SecretsWorker(t *testing.T) {
var (
ctx = coretestutils.Context(t)
lggr = logger.TestLogger(t)
backendTH = testutils.NewEVMBackendTH(t)
db = pgtest.NewSqlxDB(t)
orm = syncer.NewWorkflowRegistryDS(db, lggr)

giveTicker = time.NewTicker(500 * time.Millisecond)
giveSecretsURL = "https://original-url.com"
donID = uint32(1)
giveWorkflow = RegisterWorkflowCMD{
Name: "test-wf",
DonID: donID,
Status: uint8(1),
SecretsURL: giveSecretsURL,
}
giveContents = "contents"
wantContents = "updated contents"
fetcherFn = func(_ context.Context, _ string) ([]byte, error) {
return []byte(wantContents), nil
}
contractName = syncer.ContractName
forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent)
)

defer giveTicker.Stop()

// fill ID with randomd data
var giveID [32]byte
_, err := rand.Read((giveID)[:])
require.NoError(t, err)
giveWorkflow.ID = giveID

// Deploy a test workflow_registry
wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client())
backendTH.Backend.Commit()
require.NoError(t, err)

lggr.Infof("deployed workflow registry at %s\n", wfRegistryAddr.Hex())

// Build the ContractReader config
contractReaderCfg := evmtypes.ChainReaderConfig{
Contracts: map[string]evmtypes.ChainContractReader{
contractName: {
ContractPollingFilter: evmtypes.ContractPollingFilter{
GenericEventNames: []string{forceUpdateSecretsEvent},
},
ContractABI: workflow_registry_wrapper.WorkflowRegistryABI,
Configs: map[string]*evmtypes.ChainReaderDefinition{
forceUpdateSecretsEvent: {
ChainSpecificName: forceUpdateSecretsEvent,
ReadType: evmtypes.Event,
},
},
},
},
}

contractReaderCfgBytes, err := json.Marshal(contractReaderCfg)
require.NoError(t, err)

contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes)
require.NoError(t, err)

err = contractReader.Bind(ctx, []types.BoundContract{{Name: contractName, Address: wfRegistryAddr.Hex()}})
require.NoError(t, err)

// Seed the DB
hash, err := crypto.Keccak256(append(backendTH.ContractsOwner.From[:], []byte(giveSecretsURL)...))
require.NoError(t, err)
giveHash := hex.EncodeToString(hash)

gotID, err := orm.Create(ctx, giveSecretsURL, giveHash, giveContents)
require.NoError(t, err)

gotSecretsURL, err := orm.GetSecretsURLByID(ctx, gotID)
require.NoError(t, err)
require.Equal(t, giveSecretsURL, gotSecretsURL)

// verify the DB
contents, err := orm.GetContents(ctx, giveSecretsURL)
require.NoError(t, err)
require.Equal(t, contents, giveContents)

// Create the worker
worker := syncer.NewWorkflowRegistry(
lggr,
orm,
contractReader,
fetcherFn,
wfRegistryAddr.Hex(),
syncer.WithTicker(giveTicker.C),
)

servicetest.Run(t, worker)

// setup contract state to allow the secrets to be updated
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true)
updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true)
registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow)

// generate a log event
requestForceUpdateSecrets(t, backendTH, wfRegistryC, giveSecretsURL)

// Require the secrets contents to eventually be updated
require.Eventually(t, func() bool {
secrets, err := orm.GetContents(ctx, giveSecretsURL)
lggr.Debugf("got secrets %v", secrets)
require.NoError(t, err)
return secrets == wantContents
}, 5*time.Second, time.Second)
}

func updateAuthorizedAddress(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
addresses []common.Address,
_ bool,
) {
t.Helper()
_, err := wfRegC.UpdateAuthorizedAddresses(th.ContractsOwner, addresses, true)
require.NoError(t, err, "failed to update authorised addresses")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
gotAddresses, err := wfRegC.GetAllAuthorizedAddresses(&bind.CallOpts{
From: th.ContractsOwner.From,
})
require.NoError(t, err)
require.ElementsMatch(t, addresses, gotAddresses)
}

func updateAllowedDONs(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
donIDs []uint32,
allowed bool,
) {
t.Helper()
_, err := wfRegC.UpdateAllowedDONs(th.ContractsOwner, donIDs, allowed)
require.NoError(t, err, "failed to update DONs")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
gotDons, err := wfRegC.GetAllAllowedDONs(&bind.CallOpts{
From: th.ContractsOwner.From,
})
require.NoError(t, err)
require.ElementsMatch(t, donIDs, gotDons)
}

type RegisterWorkflowCMD struct {
Name string
ID [32]byte
DonID uint32
Status uint8
BinaryURL string
ConfigURL string
SecretsURL string
}

func registerWorkflow(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
input RegisterWorkflowCMD,
) {
t.Helper()
_, err := wfRegC.RegisterWorkflow(th.ContractsOwner, input.Name, input.ID, input.DonID,
input.Status, input.BinaryURL, input.ConfigURL, input.SecretsURL)
require.NoError(t, err, "failed to register workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func requestForceUpdateSecrets(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
secretsURL string,
) {
_, err := wfRegC.RequestForceUpdateSecrets(th.ContractsOwner, secretsURL)
require.NoError(t, err)
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}
Loading

0 comments on commit 64aeef9

Please sign in to comment.