From d5bfbd530166c947d4da611b1cf9d0d012d52f8f Mon Sep 17 00:00:00 2001 From: krehermann <16602512+krehermann@users.noreply.github.com> Date: Fri, 21 Feb 2025 18:30:36 -0700 Subject: [PATCH 1/3] feat(deployment): high fidelty offchain client --- core/scripts/go.mod | 1 + .../changeset/jd_register_nodes_test.go | 14 +- .../environment/memory/jd_job_client_test.go | 296 ++++++++++ .../memory/jd_job_service_client.go | 538 ++++++++++++++++++ deployment/environment/memory/job_client.go | 333 ----------- deployment/environment/memory/node.go | 63 ++ .../environment/memory/offchain_client.go | 207 +++++++ deployment/go.mod | 3 +- integration-tests/go.mod | 1 + integration-tests/load/go.mod | 1 + system-tests/lib/go.mod | 1 + system-tests/tests/go.mod | 1 + 12 files changed, 1120 insertions(+), 339 deletions(-) create mode 100644 deployment/environment/memory/jd_job_client_test.go create mode 100644 deployment/environment/memory/jd_job_service_client.go delete mode 100644 deployment/environment/memory/job_client.go create mode 100644 deployment/environment/memory/offchain_client.go diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 1f2e0a7dd94..44741ca3a10 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -366,6 +366,7 @@ require ( github.com/supranational/blst v0.3.13 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tendermint/go-amino v0.16.0 // indirect + github.com/test-go/testify v1.1.4 // indirect github.com/testcontainers/testcontainers-go v0.35.0 // indirect github.com/theodesp/go-heaps v0.0.0-20190520121037-88e35354fe0a // indirect github.com/tidwall/gjson v1.18.0 // indirect diff --git a/deployment/data-streams/changeset/jd_register_nodes_test.go b/deployment/data-streams/changeset/jd_register_nodes_test.go index c0b539a1f15..731d013a6b6 100644 --- a/deployment/data-streams/changeset/jd_register_nodes_test.go +++ b/deployment/data-streams/changeset/jd_register_nodes_test.go @@ -6,6 +6,8 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" + "github.com/smartcontractkit/chainlink-integrations/evm/testutils" + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" "github.com/smartcontractkit/chainlink/deployment" "github.com/smartcontractkit/chainlink/deployment/common/changeset" "github.com/smartcontractkit/chainlink/deployment/environment/memory" @@ -14,19 +16,21 @@ import ( func TestRegisterNodesWithJD(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) lggr := logger.TestLogger(t) e := memory.NewMemoryEnvironment(t, lggr, zapcore.InfoLevel, memory.MemoryEnvironmentConfig{Chains: 1, Nodes: 1}) - nodeP2pKey := e.NodeIDs[0] - jobClient, ok := e.Offchain.(*memory.JobClient) require.True(t, ok, "expected Offchain to be of type *memory.JobClient") - require.Lenf(t, jobClient.Nodes, 1, "expected exactly 1 node") + + resp, err := jobClient.ListNodes(ctx, &nodev1.ListNodesRequest{}) + require.NoError(t, err) + require.Lenf(t, resp.Nodes, 1, "expected exactly 1 node") require.Emptyf(t, jobClient.RegisteredNodes, "no registered nodes expected") - csaKey := jobClient.Nodes[nodeP2pKey].Keys.CSA.PublicKeyString() + csaKey := resp.Nodes[0].GetPublicKey() - e, err := changeset.Apply(t, e, nil, + e, err = changeset.Apply(t, e, nil, changeset.Configure( deployment.CreateLegacyChangeSet(RegisterNodesWithJD), RegisterNodesInput{ diff --git a/deployment/environment/memory/jd_job_client_test.go b/deployment/environment/memory/jd_job_client_test.go new file mode 100644 index 00000000000..07f80b45017 --- /dev/null +++ b/deployment/environment/memory/jd_job_client_test.go @@ -0,0 +1,296 @@ +package memory_test + +import ( + "fmt" + "testing" + + "github.com/hashicorp/consul/sdk/freeport" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + + "github.com/smartcontractkit/chainlink-integrations/evm/testutils" + jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" + "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes" + "github.com/smartcontractkit/chainlink/deployment" + "github.com/smartcontractkit/chainlink/deployment/environment/memory" +) + +func TestJobClientJobAPI(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + chains, _ := memory.NewMemoryChains(t, 1, 1) + ports := freeport.GetN(t, 1) + testNode := memory.NewNode(t, ports[0], chains, nil, zapcore.DebugLevel, false, deployment.CapabilityRegistryConfig{}) + + // Set up the JobClient with a mock node + nodeID := "node-1" + externalJobID := "f1ac5211-ab79-4c31-ba1c-0997b72db466" + + jobSpecToml := testJobProposalTOML(t, externalJobID) + nodes := map[string]memory.Node{ + nodeID: *testNode, + } + jobClient := memory.NewMemoryJobClient(nodes) + + // Create a mock request + req := &jobv1.ProposeJobRequest{ + NodeId: nodeID, + Spec: jobSpecToml, + Labels: []*ptypes.Label{ + { + Key: "label-key", + Value: ptr("label-value"), + }, + }, + } + + // Call the ProposeJob method + resp, err := jobClient.ProposeJob(ctx, req) + + // Validate the response + require.NoError(t, err) + assert.NotNil(t, resp) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status) + assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus) + assert.Equal(t, jobSpecToml, resp.Proposal.Spec) + assert.Equal(t, externalJobID, resp.Proposal.JobId) + + expectedProposalID := resp.Proposal.Id + expectedProposal := resp.Proposal + + t.Run("GetJob", func(t *testing.T) { + t.Run("existing job", func(t *testing.T) { + // Create a mock request + getReq := &jobv1.GetJobRequest{ + IdOneof: &jobv1.GetJobRequest_Id{Id: externalJobID}, + } + + getResp, err := jobClient.GetJob(ctx, getReq) + require.NoError(t, err) + assert.NotNil(t, getResp) + assert.Equal(t, externalJobID, getResp.Job.Id) + }) + + t.Run("non-existing job", func(t *testing.T) { + // Create a mock request + getReq := &jobv1.GetJobRequest{ + IdOneof: &jobv1.GetJobRequest_Id{Id: "non-existing-job"}, + } + + getResp, err := jobClient.GetJob(ctx, getReq) + require.Error(t, err) + assert.Nil(t, getResp) + }) + + }) + + t.Run("ListJobs", func(t *testing.T) { + type listCase struct { + name string + req *jobv1.ListJobsRequest + checkErr func(t *testing.T, err error) + checkResp func(t *testing.T, resp *jobv1.ListJobsResponse) + } + cases := []listCase{ + { + name: "no filters", + req: &jobv1.ListJobsRequest{}, + checkResp: func(t *testing.T, resp *jobv1.ListJobsResponse) { + assert.NotNil(t, resp) + assert.Len(t, resp.Jobs, 1) + assert.Equal(t, externalJobID, resp.Jobs[0].Id) + }, + }, + { + name: "with id filter", + req: &jobv1.ListJobsRequest{ + Filter: &jobv1.ListJobsRequest_Filter{ + Ids: []string{externalJobID}, + }, + }, + checkResp: func(t *testing.T, resp *jobv1.ListJobsResponse) { + assert.NotNil(t, resp) + assert.Len(t, resp.Jobs, 1) + assert.Equal(t, externalJobID, resp.Jobs[0].Id) + }, + }, + { + name: "non-existing job id", + req: &jobv1.ListJobsRequest{ + Filter: &jobv1.ListJobsRequest_Filter{ + Ids: []string{"non-existing-job-id"}, + }, + }, + checkResp: func(t *testing.T, resp *jobv1.ListJobsResponse) { + require.NotNil(t, resp) + assert.Len(t, resp.Jobs, 0) + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Call the ListJobs method + listResp, err := jobClient.ListJobs(ctx, c.req) + if c.checkErr != nil { + c.checkErr(t, err) + return + } + require.NoError(t, err) + c.checkResp(t, listResp) + }) + } + }) + + t.Run("GetProposal", func(t *testing.T) { + t.Run("existing proposal", func(t *testing.T) { + // Create a mock request + getReq := &jobv1.GetProposalRequest{ + Id: expectedProposalID, + } + + getResp, err := jobClient.GetProposal(ctx, getReq) + require.NoError(t, err) + assert.NotNil(t, getResp) + assert.Equal(t, expectedProposal, getResp.Proposal) + }) + + t.Run("non-existing proposal", func(t *testing.T) { + // Create a mock request + getReq := &jobv1.GetProposalRequest{ + Id: "non-existing-job", + } + + getResp, err := jobClient.GetProposal(ctx, getReq) + require.Error(t, err) + assert.Nil(t, getResp) + }) + }) + + t.Run("ListProposals", func(t *testing.T) { + type listCase struct { + name string + req *jobv1.ListProposalsRequest + checkErr func(t *testing.T, err error) + checkResp func(t *testing.T, resp *jobv1.ListProposalsResponse) + } + cases := []listCase{ + + { + name: "no filters", + req: &jobv1.ListProposalsRequest{}, + checkResp: func(t *testing.T, resp *jobv1.ListProposalsResponse) { + assert.NotNil(t, resp) + assert.Len(t, resp.Proposals, 1) + assert.Equal(t, expectedProposalID, resp.Proposals[0].Id) + assert.Equal(t, expectedProposal, resp.Proposals[0]) + }, + }, + { + name: "with id filter", + req: &jobv1.ListProposalsRequest{ + Filter: &jobv1.ListProposalsRequest_Filter{ + Ids: []string{expectedProposalID}, + }, + }, + checkResp: func(t *testing.T, resp *jobv1.ListProposalsResponse) { + assert.NotNil(t, resp) + assert.Len(t, resp.Proposals, 1) + assert.Equal(t, expectedProposalID, resp.Proposals[0].Id) + assert.Equal(t, expectedProposal, resp.Proposals[0]) + }, + }, + + { + name: "non-existing job id", + req: &jobv1.ListProposalsRequest{ + Filter: &jobv1.ListProposalsRequest_Filter{ + Ids: []string{"non-existing-job-id"}, + }, + }, + checkResp: func(t *testing.T, resp *jobv1.ListProposalsResponse) { + require.NotNil(t, resp) + assert.Len(t, resp.Proposals, 0, "expected no proposals %v", resp.Proposals) + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + listResp, err := jobClient.ListProposals(ctx, c.req) + if c.checkErr != nil { + c.checkErr(t, err) + return + } + require.NoError(t, err) + c.checkResp(t, listResp) + }) + } + }) + +} + +func ptr(s string) *string { + return &s +} + +// need some non-ocr job type to avoid the ocr validation and the p2pwrapper check +func testJobProposalTOML(t *testing.T, externalJobId string) string { + tomlString := ` +type = "standardcapabilities" +schemaVersion = 1 +externalJobID = "%s" +name = "hacking" +forwardingAllowed = false +command = "/home/capabilities/nowhere" +config = "" +` + return fmt.Sprintf(tomlString, externalJobId) +} + +func setupOne(t *testing.T) *memory.JobClient { + t.Helper() + ctx := testutils.Context(t) + // Create a new memory node + + chains, _ := memory.NewMemoryChains(t, 1, 1) + ports := freeport.GetN(t, 1) + testNode := memory.NewNode(t, ports[0], chains, nil, zapcore.DebugLevel, false, deployment.CapabilityRegistryConfig{}) + + // Set up the JobClient with a mock node + nodeID := "node-1" + externalJobID := "f1ac5211-ab79-4c31-ba1c-0997b72db466" + // need some non-ocr job type to avoid the ocr validation and the p2pwrapper check + + jobSpecToml := testJobProposalTOML(t, externalJobID) + nodes := map[string]memory.Node{ + nodeID: *testNode, + } + jobClient := memory.NewMemoryJobClient(nodes) + + // Create a mock request + req := &jobv1.ProposeJobRequest{ + NodeId: nodeID, + Spec: jobSpecToml, + Labels: []*ptypes.Label{ + { + Key: "label-key", + Value: ptr("label-value"), + }, + }, + } + + // Call the ProposeJob method + resp, err := jobClient.ProposeJob(ctx, req) + + // Validate the response + require.NoError(t, err) + assert.NotNil(t, resp) + assert.Equal(t, externalJobID, resp.Proposal.Id) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status) + assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus) + assert.Equal(t, jobSpecToml, resp.Proposal.Spec) + assert.Equal(t, externalJobID, resp.Proposal.JobId) + return jobClient +} diff --git a/deployment/environment/memory/jd_job_service_client.go b/deployment/environment/memory/jd_job_service_client.go new file mode 100644 index 00000000000..e784e6d7014 --- /dev/null +++ b/deployment/environment/memory/jd_job_service_client.go @@ -0,0 +1,538 @@ +package memory + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/google/uuid" + "github.com/pelletier/go-toml/v2" + "google.golang.org/grpc" + + jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" + "github.com/smartcontractkit/chainlink/v2/core/services/feeds" + "github.com/smartcontractkit/chainlink/v2/core/services/job" +) + +type JobServiceClient struct { + jobStore + proposalStore + nodeStore +} + +func NewJobServiceClient(ns nodeStore) *JobServiceClient { + return &JobServiceClient{ + jobStore: newMapJobStore(), + proposalStore: newMapProposalStore(), + nodeStore: ns, + } +} + +func (j *JobServiceClient) BatchProposeJob(ctx context.Context, in *jobv1.BatchProposeJobRequest, opts ...grpc.CallOption) (*jobv1.BatchProposeJobResponse, error) { + targets := make(map[string]Node) + for _, nodeID := range in.NodeIds { + node, err := j.nodeStore.get(nodeID) + if err != nil { + return nil, fmt.Errorf("node not found: %s", nodeID) + } + targets[nodeID] = *node + } + if len(targets) == 0 { + return nil, errors.New("no nodes found") + } + out := &jobv1.BatchProposeJobResponse{ + SuccessResponses: make(map[string]*jobv1.ProposeJobResponse), + FailedResponses: make(map[string]*jobv1.ProposeJobFailure), + } + var totalErr error + for id := range targets { + singleReq := &jobv1.ProposeJobRequest{ + NodeId: id, + Spec: in.Spec, + Labels: in.Labels, + } + resp, err := j.ProposeJob(ctx, singleReq) + if err != nil { + out.FailedResponses[id] = &jobv1.ProposeJobFailure{ + ErrorMessage: err.Error(), + } + totalErr = errors.Join(totalErr, fmt.Errorf("failed to propose job for node %s: %w", id, err)) + } + out.SuccessResponses[id] = resp + } + return out, totalErr + +} + +func (j *JobServiceClient) UpdateJob(ctx context.Context, in *jobv1.UpdateJobRequest, opts ...grpc.CallOption) (*jobv1.UpdateJobResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j *JobServiceClient) GetJob(ctx context.Context, in *jobv1.GetJobRequest, opts ...grpc.CallOption) (*jobv1.GetJobResponse, error) { + // implementation detail that job id and uuid is the same + jb, err := j.jobStore.get(in.GetId()) + if err != nil { + return nil, fmt.Errorf("failed to get job: %w", err) + } + // TODO CCIP-3108 implement me + return &jobv1.GetJobResponse{ + Job: jb, + }, nil +} + +func (j *JobServiceClient) GetProposal(ctx context.Context, in *jobv1.GetProposalRequest, opts ...grpc.CallOption) (*jobv1.GetProposalResponse, error) { + p, err := j.proposalStore.get(in.Id) + if err != nil { + return nil, fmt.Errorf("failed to get proposal: %w", err) + } + return &jobv1.GetProposalResponse{ + Proposal: p, + }, nil +} + +func (j *JobServiceClient) ListJobs(ctx context.Context, in *jobv1.ListJobsRequest, opts ...grpc.CallOption) (*jobv1.ListJobsResponse, error) { + jbs, err := j.jobStore.list(in.Filter) + if err != nil { + return nil, fmt.Errorf("failed to list jobs: %w", err) + } + + return &jobv1.ListJobsResponse{ + Jobs: jbs, + }, nil + +} + +func (j *JobServiceClient) ListProposals(ctx context.Context, in *jobv1.ListProposalsRequest, opts ...grpc.CallOption) (*jobv1.ListProposalsResponse, error) { + proposals, err := j.proposalStore.list(in.Filter) + if err != nil { + return nil, fmt.Errorf("failed to list proposals: %w", err) + } + return &jobv1.ListProposalsResponse{ + Proposals: proposals, + }, nil +} + +// ProposeJob is used to propose a job to the node +// It auto approves the job +func (j *JobServiceClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption) (*jobv1.ProposeJobResponse, error) { + n, err := j.nodeStore.get(in.NodeId) + if err != nil { + return nil, fmt.Errorf("node not found: %w", err) + } + _, err = job.ValidateSpec(in.Spec) + if err != nil { + return nil, fmt.Errorf("failed to validate job spec: %w", err) + } + var extractor ExternalJobIDExtractor + err = toml.Unmarshal([]byte(in.Spec), &extractor) + if err != nil { + return nil, fmt.Errorf("failed to load job spec: %w", err) + } + if extractor.ExternalJobID == "" { + return nil, fmt.Errorf("externalJobID is required") + } + + appProposalID, err := n.App.GetFeedsService().ProposeJob(ctx, &feeds.ProposeJobArgs{ + FeedsManagerID: 1, + Spec: in.Spec, + }) + if err != nil { + return nil, fmt.Errorf("failed to propose job: %w", err) + } + + // auto approve for now + proposedSpec, err := n.App.GetFeedsService().ListSpecsByJobProposalIDs(ctx, []int64{appProposalID}) + if err != nil { + return nil, fmt.Errorf("failed to list specs: %w", err) + } + if len(proposedSpec) != 1 { + return nil, fmt.Errorf("expected 1 spec, got %d", len(proposedSpec)) + } + err = n.App.GetFeedsService().ApproveSpec(ctx, proposedSpec[0].ID, true) + if err != nil { + return nil, fmt.Errorf("failed to approve job: %w", err) + } + + storeProposalID := uuid.Must(uuid.NewRandom()).String() + p := &jobv1.ProposeJobResponse{Proposal: &jobv1.Proposal{ + // make the proposal id the same as the job id for further reference + // if you are changing this make sure to change the GetProposal and ListJobs method implementation + Id: storeProposalID, + // Auto approve for now + Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, + DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, + Spec: in.Spec, + JobId: extractor.ExternalJobID, + CreatedAt: nil, + UpdatedAt: nil, + AckedAt: nil, + ResponseReceivedAt: nil, + }} + + // save the proposal and job + { + var ( + storeErr error // used to cleanup if we fail to save the job + job *jobv1.Job + ) + + storeErr = j.proposalStore.put(storeProposalID, p.Proposal) + if err != nil { + return nil, fmt.Errorf("failed to save proposal: %w", err) + } + defer func() { + // cleanup if we fail to save the job + if storeErr != nil { + j.proposalStore.delete(storeProposalID) + } + }() + + job, storeErr = j.jobStore.get(extractor.ExternalJobID) + if storeErr != nil && !errors.Is(storeErr, errNoExist) { + return nil, fmt.Errorf("failed to get job: %w", storeErr) + } + if errors.Is(storeErr, errNoExist) { + job = &jobv1.Job{ + Id: extractor.ExternalJobID, + Uuid: extractor.ExternalJobID, + NodeId: in.NodeId, + ProposalIds: []string{storeProposalID}, + Labels: in.Labels, + } + } else { + job.ProposalIds = append(job.ProposalIds, storeProposalID) + } + storeErr = j.jobStore.put(extractor.ExternalJobID, job) + if storeErr != nil { + return nil, fmt.Errorf("failed to save job: %w", storeErr) + } + } + return p, nil +} + +func (j *JobServiceClient) RevokeJob(ctx context.Context, in *jobv1.RevokeJobRequest, opts ...grpc.CallOption) (*jobv1.RevokeJobResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j *JobServiceClient) DeleteJob(ctx context.Context, in *jobv1.DeleteJobRequest, opts ...grpc.CallOption) (*jobv1.DeleteJobResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +type ExternalJobIDExtractor struct { + ExternalJobID string `toml:"externalJobID"` +} + +var errNoExist = errors.New("does not exist") + +// proposalStore is an interface for storing job proposals. +type proposalStore interface { + put(proposalID string, proposal *jobv1.Proposal) error + get(proposalID string) (*jobv1.Proposal, error) + list(filter *jobv1.ListProposalsRequest_Filter) ([]*jobv1.Proposal, error) + delete(proposalID string) error +} + +// jobStore is an interface for storing jobs. +type jobStore interface { + put(jobID string, job *jobv1.Job) error + get(jobID string) (*jobv1.Job, error) + list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job, error) + delete(jobID string) error +} + +// nodeStore is an interface for storing nodes. +type nodeStore interface { + put(nodeID string, node *Node) error + get(nodeID string) (*Node, error) + list() []*Node + asMap() map[string]*Node + delete(nodeID string) error +} + +var _ jobStore = &mapJobStore{} + +type mapJobStore struct { + mu sync.Mutex + jobs map[string]*jobv1.Job + nodesToJobIDs map[string][]string + uuidToJobIDs map[string][]string +} + +func newMapJobStore() *mapJobStore { + return &mapJobStore{ + jobs: make(map[string]*jobv1.Job), + nodesToJobIDs: make(map[string][]string), + uuidToJobIDs: make(map[string][]string), + } +} + +func (m *mapJobStore) put(jobID string, job *jobv1.Job) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.jobs == nil { + m.jobs = make(map[string]*jobv1.Job) + m.nodesToJobIDs = make(map[string][]string) + m.uuidToJobIDs = make(map[string][]string) + } + m.jobs[jobID] = job + if _, ok := m.nodesToJobIDs[job.NodeId]; !ok { + m.nodesToJobIDs[job.NodeId] = make([]string, 0) + } + m.nodesToJobIDs[job.NodeId] = append(m.nodesToJobIDs[job.NodeId], jobID) + if _, ok := m.uuidToJobIDs[job.Uuid]; !ok { + m.uuidToJobIDs[job.Uuid] = make([]string, 0) + } + m.uuidToJobIDs[job.Uuid] = append(m.uuidToJobIDs[job.Uuid], jobID) + return nil +} + +func (m *mapJobStore) get(jobID string) (*jobv1.Job, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.jobs == nil { + return nil, fmt.Errorf("%w: job not found: %s", errNoExist, jobID) + } + job, ok := m.jobs[jobID] + if !ok { + return nil, fmt.Errorf("%w: job not found: %s", errNoExist, jobID) + } + return job, nil +} + +func (m *mapJobStore) list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job, error) { + if filter != nil && filter.NodeIds != nil && filter.Uuids != nil && filter.Ids != nil { + return nil, errors.New("only one of NodeIds, Uuids or Ids can be set") + } + m.mu.Lock() + defer m.mu.Unlock() + if m.jobs == nil { + return []*jobv1.Job{}, nil + } + + jobs := make([]*jobv1.Job, 0, len(m.jobs)) + + if filter == nil || (filter.NodeIds == nil && filter.Uuids == nil && filter.Ids == nil) { + for _, job := range m.jobs { + jobs = append(jobs, job) + } + return jobs, nil + } + + wantedJobIDs := make(map[string]struct{}) + // use node ids to contruct wanted job ids + if filter.NodeIds != nil { + for _, nodeID := range filter.NodeIds { + jobIDs, ok := m.nodesToJobIDs[nodeID] + if !ok { + continue + } + for _, jobID := range jobIDs { + wantedJobIDs[jobID] = struct{}{} + } + } + } else if filter.Uuids != nil { + for _, uuid := range filter.Uuids { + jobIDs, ok := m.uuidToJobIDs[uuid] + if !ok { + continue + } + for _, jobID := range jobIDs { + wantedJobIDs[jobID] = struct{}{} + } + } + + } else if filter.Ids != nil { + for _, jobID := range filter.Ids { + wantedJobIDs[jobID] = struct{}{} + } + } + for _, job := range m.jobs { + if _, ok := wantedJobIDs[job.Id]; ok { + jobs = append(jobs, job) + } + } + return jobs, nil +} + +func (m *mapJobStore) delete(jobID string) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.jobs == nil { + return fmt.Errorf("job not found: %s", jobID) + } + job, ok := m.jobs[jobID] + if !ok { + return nil + } + delete(m.jobs, jobID) + delete(m.nodesToJobIDs, job.NodeId) + delete(m.uuidToJobIDs, job.Uuid) + return nil +} + +var _ proposalStore = &mapProposalStore{} + +type mapProposalStore struct { + mu sync.Mutex + proposals map[string]*jobv1.Proposal + jobIdToProposalId map[string]string +} + +func newMapProposalStore() *mapProposalStore { + return &mapProposalStore{ + proposals: make(map[string]*jobv1.Proposal), + jobIdToProposalId: make(map[string]string), + } +} + +func (m *mapProposalStore) put(proposalID string, proposal *jobv1.Proposal) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.proposals == nil { + m.proposals = make(map[string]*jobv1.Proposal) + } + if m.jobIdToProposalId == nil { + m.jobIdToProposalId = make(map[string]string) + } + m.proposals[proposalID] = proposal + m.jobIdToProposalId[proposal.JobId] = proposalID + return nil +} +func (m *mapProposalStore) get(proposalID string) (*jobv1.Proposal, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.proposals == nil { + return nil, fmt.Errorf("proposal not found: %s", proposalID) + } + proposal, ok := m.proposals[proposalID] + if !ok { + return nil, fmt.Errorf("%w: proposal not found: %s", errNoExist, proposalID) + } + return proposal, nil +} +func (m *mapProposalStore) list(filter *jobv1.ListProposalsRequest_Filter) ([]*jobv1.Proposal, error) { + if filter != nil && filter.GetIds() != nil && filter.GetJobIds() != nil { + return nil, errors.New("only one of Ids or JobIds can be set") + } + m.mu.Lock() + defer m.mu.Unlock() + if m.proposals == nil { + return nil, nil + } + proposals := make([]*jobv1.Proposal, 0) + // all proposals + if filter == nil || (filter.GetIds() == nil && filter.GetJobIds() == nil) { + for _, proposal := range m.proposals { + proposals = append(proposals, proposal) + } + return proposals, nil + } + + // can't both be nil at this point + wantedProposalIDs := filter.GetIds() + if wantedProposalIDs == nil { + wantedProposalIDs = make([]string, 0) + for _, jobId := range filter.GetJobIds() { + proposalID, ok := m.jobIdToProposalId[jobId] + if !ok { + continue + } + wantedProposalIDs = append(wantedProposalIDs, proposalID) + } + } + + for _, want := range wantedProposalIDs { + p, ok := m.proposals[want] + if !ok { + continue + } + proposals = append(proposals, p) + } + return proposals, nil +} +func (m *mapProposalStore) delete(proposalID string) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.proposals == nil { + return fmt.Errorf("proposal not found: %s", proposalID) + } + + delete(m.proposals, proposalID) + return nil +} + +var _ nodeStore = &mapNodeStore{} + +type mapNodeStore struct { + mu sync.Mutex + nodes map[string]*Node +} + +func newMapNodeStore(n map[string]*Node) *mapNodeStore { + return &mapNodeStore{ + nodes: n, + } +} +func (m *mapNodeStore) put(nodeID string, node *Node) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.nodes == nil { + m.nodes = make(map[string]*Node) + } + m.nodes[nodeID] = node + return nil +} +func (m *mapNodeStore) get(nodeID string) (*Node, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.nodes == nil { + return nil, fmt.Errorf("node not found: %s", nodeID) + } + node, ok := m.nodes[nodeID] + if !ok { + return nil, fmt.Errorf("%w: node not found: %s", errNoExist, nodeID) + } + return node, nil +} +func (m *mapNodeStore) list() []*Node { + m.mu.Lock() + defer m.mu.Unlock() + if m.nodes == nil { + return nil + } + nodes := make([]*Node, 0) + for _, node := range m.nodes { + nodes = append(nodes, node) + } + return nodes +} +func (m *mapNodeStore) delete(nodeID string) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.nodes == nil { + return fmt.Errorf("node not found: %s", nodeID) + } + _, ok := m.nodes[nodeID] + if !ok { + return nil + } + delete(m.nodes, nodeID) + return nil +} + +func (m *mapNodeStore) asMap() map[string]*Node { + m.mu.Lock() + defer m.mu.Unlock() + if m.nodes == nil { + return nil + } + nodes := make(map[string]*Node) + for k, v := range m.nodes { + nodes[k] = v + } + return nodes +} diff --git a/deployment/environment/memory/job_client.go b/deployment/environment/memory/job_client.go deleted file mode 100644 index 2b8adec6a14..00000000000 --- a/deployment/environment/memory/job_client.go +++ /dev/null @@ -1,333 +0,0 @@ -package memory - -import ( - "context" - "errors" - "fmt" - "slices" - "strings" - - "github.com/pelletier/go-toml/v2" - "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/timestamppb" - - csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa" - jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" - nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" - "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes" - - "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/validate" - ocr2validate "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate" - "github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap" -) - -type JobClient struct { - Nodes map[string]Node - RegisteredNodes map[string]Node -} - -func (j JobClient) BatchProposeJob(ctx context.Context, in *jobv1.BatchProposeJobRequest, opts ...grpc.CallOption) (*jobv1.BatchProposeJobResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) UpdateJob(ctx context.Context, in *jobv1.UpdateJobRequest, opts ...grpc.CallOption) (*jobv1.UpdateJobResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) DisableNode(ctx context.Context, in *nodev1.DisableNodeRequest, opts ...grpc.CallOption) (*nodev1.DisableNodeResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) EnableNode(ctx context.Context, in *nodev1.EnableNodeRequest, opts ...grpc.CallOption) (*nodev1.EnableNodeResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j *JobClient) RegisterNode(ctx context.Context, in *nodev1.RegisterNodeRequest, opts ...grpc.CallOption) (*nodev1.RegisterNodeResponse, error) { - if in == nil || in.GetPublicKey() == "" { - return nil, errors.New("public key is required") - } - - if _, exists := j.RegisteredNodes[in.GetPublicKey()]; exists { - return nil, fmt.Errorf("node with Public Key %s is already registered", in.GetPublicKey()) - } - - var foundNode *Node - for _, node := range j.Nodes { - if node.Keys.CSA.ID() == in.GetPublicKey() { - foundNode = &node - break - } - } - - if foundNode == nil { - return nil, fmt.Errorf("node with Public Key %s is not known", in.GetPublicKey()) - } - - j.RegisteredNodes[in.GetPublicKey()] = *foundNode - - return &nodev1.RegisterNodeResponse{ - Node: &nodev1.Node{ - Id: in.GetPublicKey(), - PublicKey: in.GetPublicKey(), - IsEnabled: true, - IsConnected: true, - Labels: in.Labels, - }, - }, nil -} - -func (j JobClient) UpdateNode(ctx context.Context, in *nodev1.UpdateNodeRequest, opts ...grpc.CallOption) (*nodev1.UpdateNodeResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) GetKeypair(ctx context.Context, in *csav1.GetKeypairRequest, opts ...grpc.CallOption) (*csav1.GetKeypairResponse, error) { - // TODO implement me - panic("implement me") -} - -func (j JobClient) ListKeypairs(ctx context.Context, in *csav1.ListKeypairsRequest, opts ...grpc.CallOption) (*csav1.ListKeypairsResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) GetNode(ctx context.Context, in *nodev1.GetNodeRequest, opts ...grpc.CallOption) (*nodev1.GetNodeResponse, error) { - n, ok := j.Nodes[in.Id] - if !ok { - return nil, errors.New("node not found") - } - return &nodev1.GetNodeResponse{ - Node: &nodev1.Node{ - Id: in.Id, - PublicKey: n.Keys.CSA.PublicKeyString(), - IsEnabled: true, - IsConnected: true, - }, - }, nil -} - -func (j JobClient) ListNodes(ctx context.Context, in *nodev1.ListNodesRequest, opts ...grpc.CallOption) (*nodev1.ListNodesResponse, error) { - var nodes []*nodev1.Node - for id, n := range j.Nodes { - node := &nodev1.Node{ - Id: id, - PublicKey: n.Keys.CSA.ID(), - IsEnabled: true, - IsConnected: true, - Labels: []*ptypes.Label{ - { - Key: "p2p_id", - Value: ptr(n.Keys.PeerID.String()), - }, - }, - } - if ApplyNodeFilter(in.Filter, node) { - nodes = append(nodes, node) - } - } - return &nodev1.ListNodesResponse{ - Nodes: nodes, - }, nil -} - -func (j JobClient) ListNodeChainConfigs(ctx context.Context, in *nodev1.ListNodeChainConfigsRequest, opts ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) { - if in.Filter == nil { - return nil, errors.New("filter is required") - } - if len(in.Filter.NodeIds) != 1 { - return nil, errors.New("only one node id is supported") - } - n, ok := j.Nodes[in.Filter.NodeIds[0]] - if !ok { - return nil, fmt.Errorf("node id not found: %s", in.Filter.NodeIds[0]) - } - chainConfigs, err := n.JDChainConfigs() - if err != nil { - return nil, err - } - return &nodev1.ListNodeChainConfigsResponse{ - ChainConfigs: chainConfigs, - }, nil -} - -func (j JobClient) GetJob(ctx context.Context, in *jobv1.GetJobRequest, opts ...grpc.CallOption) (*jobv1.GetJobResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) GetProposal(ctx context.Context, in *jobv1.GetProposalRequest, opts ...grpc.CallOption) (*jobv1.GetProposalResponse, error) { - // we are using proposal id as job id - // refer to ListJobs and ProposeJobs for the assignment of proposal id - for _, node := range j.Nodes { - jobs, _, err := node.App.JobORM().FindJobs(ctx, 0, 1000) - if err != nil { - return nil, err - } - for _, job := range jobs { - if job.ExternalJobID.String() == in.Id { - specBytes, err := toml.Marshal(job.CCIPSpec) - if err != nil { - return nil, fmt.Errorf("failed to marshal job spec: %w", err) - } - return &jobv1.GetProposalResponse{ - Proposal: &jobv1.Proposal{ - Id: job.ExternalJobID.String(), - Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, - Spec: string(specBytes), - JobId: job.ExternalJobID.String(), - }, - }, nil - } - } - } - return nil, fmt.Errorf("job not found: %s", in.Id) -} - -func (j JobClient) ListJobs(ctx context.Context, in *jobv1.ListJobsRequest, opts ...grpc.CallOption) (*jobv1.ListJobsResponse, error) { - jobResponse := make([]*jobv1.Job, 0) - for _, req := range in.Filter.NodeIds { - if _, ok := j.Nodes[req]; !ok { - return nil, fmt.Errorf("node not found: %s", req) - } - n := j.Nodes[req] - jobs, _, err := n.App.JobORM().FindJobs(ctx, 0, 1000) - if err != nil { - return nil, err - } - for _, job := range jobs { - jobResponse = append(jobResponse, &jobv1.Job{ - Id: string(job.ID), - Uuid: job.ExternalJobID.String(), - NodeId: req, - // based on the current implementation, there is only one proposal per job - // see ProposeJobs for ProposalId assignment - ProposalIds: []string{job.ExternalJobID.String()}, - CreatedAt: timestamppb.New(job.CreatedAt), - UpdatedAt: timestamppb.New(job.CreatedAt), - }) - } - } - return &jobv1.ListJobsResponse{ - Jobs: jobResponse, - }, nil -} - -func (j JobClient) ListProposals(ctx context.Context, in *jobv1.ListProposalsRequest, opts ...grpc.CallOption) (*jobv1.ListProposalsResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption) (*jobv1.ProposeJobResponse, error) { - n := j.Nodes[in.NodeId] - // TODO: Use FMS - jb, err := validate.ValidatedCCIPSpec(in.Spec) - if err != nil { - if !strings.Contains(err.Error(), "the only supported type is currently 'ccip'") { - return nil, err - } - // check if it's offchainreporting2 job - jb, err = ocr2validate.ValidatedOracleSpecToml( - ctx, - n.App.GetConfig().OCR2(), - n.App.GetConfig().Insecure(), - in.Spec, - nil, // not required for validation - ) - if err != nil { - if !strings.Contains(err.Error(), "the only supported type is currently 'offchainreporting2'") { - return nil, err - } - // check if it's bootstrap job - jb, err = ocrbootstrap.ValidatedBootstrapSpecToml(in.Spec) - if err != nil { - return nil, fmt.Errorf("failed to validate job spec only ccip, bootstrap and offchainreporting2 are supported: %w", err) - } - } - } - err = n.App.AddJobV2(ctx, &jb) - if err != nil { - return nil, err - } - return &jobv1.ProposeJobResponse{Proposal: &jobv1.Proposal{ - // make the proposal id the same as the job id for further reference - // if you are changing this make sure to change the GetProposal and ListJobs method implementation - Id: jb.ExternalJobID.String(), - // Auto approve for now - Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, - DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, - Spec: in.Spec, - JobId: jb.ExternalJobID.String(), - CreatedAt: nil, - UpdatedAt: nil, - AckedAt: nil, - ResponseReceivedAt: nil, - }}, nil -} - -func (j JobClient) RevokeJob(ctx context.Context, in *jobv1.RevokeJobRequest, opts ...grpc.CallOption) (*jobv1.RevokeJobResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) DeleteJob(ctx context.Context, in *jobv1.DeleteJobRequest, opts ...grpc.CallOption) (*jobv1.DeleteJobResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) ReplayLogs(selectorToBlock map[uint64]uint64) error { - for _, node := range j.Nodes { - if err := node.ReplayLogs(selectorToBlock); err != nil { - return err - } - } - return nil -} - -func NewMemoryJobClient(nodesByPeerID map[string]Node) *JobClient { - return &JobClient{nodesByPeerID, make(map[string]Node)} -} - -func ApplyNodeFilter(filter *nodev1.ListNodesRequest_Filter, node *nodev1.Node) bool { - if filter == nil { - return true - } - if len(filter.Ids) > 0 { - idx := slices.IndexFunc(filter.Ids, func(id string) bool { - return node.Id == id - }) - if idx < 0 { - return false - } - } - for _, selector := range filter.Selectors { - idx := slices.IndexFunc(node.Labels, func(label *ptypes.Label) bool { - return label.Key == selector.Key - }) - if idx < 0 { - return false - } - label := node.Labels[idx] - - switch selector.Op { - case ptypes.SelectorOp_IN: - values := strings.Split(*selector.Value, ",") - found := slices.Contains(values, *label.Value) - if !found { - return false - } - case ptypes.SelectorOp_EQ: - if *label.Value != *selector.Value { - return false - } - case ptypes.SelectorOp_EXIST: - // do nothing - default: - panic("unimplemented selector") - } - } - return true -} diff --git a/deployment/environment/memory/node.go b/deployment/environment/memory/node.go index 0f81bb60b19..e98d9541777 100644 --- a/deployment/environment/memory/node.go +++ b/deployment/environment/memory/node.go @@ -2,6 +2,9 @@ package memory import ( "context" + "crypto/rand" + + "encoding/hex" "fmt" "math/big" "net" @@ -16,6 +19,7 @@ import ( gethtypes "github.com/ethereum/go-ethereum/core/types" chainsel "github.com/smartcontractkit/chain-selectors" "github.com/stretchr/testify/require" + "github.com/test-go/testify/mock" "go.uber.org/zap/zapcore" "golang.org/x/exp/maps" @@ -35,6 +39,7 @@ import ( "github.com/smartcontractkit/chainlink-integrations/evm/assets" "github.com/smartcontractkit/chainlink-integrations/evm/client" v2toml "github.com/smartcontractkit/chainlink-integrations/evm/config/toml" + "github.com/smartcontractkit/chainlink-integrations/evm/testutils" evmutils "github.com/smartcontractkit/chainlink-integrations/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" @@ -48,8 +53,13 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" "github.com/smartcontractkit/chainlink/v2/core/services/relay" "github.com/smartcontractkit/chainlink/v2/core/utils" + "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight" "github.com/smartcontractkit/chainlink/v2/plugins" + + pb "github.com/smartcontractkit/chainlink-protos/orchestrator/feedsmanager" + feeds2 "github.com/smartcontractkit/chainlink/v2/core/services/feeds" + feedsMocks "github.com/smartcontractkit/chainlink/v2/core/services/feeds/mocks" ) type Node struct { @@ -377,6 +387,9 @@ func NewNode( }) keys := CreateKeys(t, app, chains, solchains) + //JD + + setupJD(t, app) return &Node{ App: app, Chains: slices.Concat( @@ -598,3 +611,53 @@ func (e KeystoreSim) Eth() keystore.Eth { func (e KeystoreSim) CSA() keystore.CSA { return e.csa } + +func setupJD(t *testing.T, app chainlink.Application) { + secret := randomBytes32(t) + pkey, err := crypto.PublicKeyFromHex(hex.EncodeToString(secret[:])) + m := feeds2.RegisterManagerParams{ + Name: "In memory env test", + URI: "http://dev.null:8080", + PublicKey: *pkey, + } + f := app.GetFeedsService() + connManager := feedsMocks.NewConnectionsManager(t) + connManager.On("Connect", mock.Anything).Maybe() + connManager.On("GetClient", mock.Anything).Maybe().Return(noopFeedsClient{}, nil) + connManager.On("Close").Maybe().Return() + connManager.On("IsConnected", mock.Anything).Maybe().Return(true) + f.Unsafe_SetConnectionsManager(connManager) + + _, err = f.RegisterManager(testutils.Context(t), m) + require.NoError(t, err) +} + +func randomBytes32(t *testing.T) []byte { + t.Helper() + b := make([]byte, 32) + _, err := rand.Read(b) + require.NoError(t, err) + return b +} + +type noopFeedsClient struct{} + +func (n noopFeedsClient) ApprovedJob(context.Context, *pb.ApprovedJobRequest) (*pb.ApprovedJobResponse, error) { + return &pb.ApprovedJobResponse{}, nil +} + +func (n noopFeedsClient) Healthcheck(context.Context, *pb.HealthcheckRequest) (*pb.HealthcheckResponse, error) { + return &pb.HealthcheckResponse{}, nil +} + +func (n noopFeedsClient) UpdateNode(context.Context, *pb.UpdateNodeRequest) (*pb.UpdateNodeResponse, error) { + return &pb.UpdateNodeResponse{}, nil +} + +func (n noopFeedsClient) RejectedJob(context.Context, *pb.RejectedJobRequest) (*pb.RejectedJobResponse, error) { + return &pb.RejectedJobResponse{}, nil +} + +func (n noopFeedsClient) CancelledJob(context.Context, *pb.CancelledJobRequest) (*pb.CancelledJobResponse, error) { + return &pb.CancelledJobResponse{}, nil +} diff --git a/deployment/environment/memory/offchain_client.go b/deployment/environment/memory/offchain_client.go new file mode 100644 index 00000000000..a97280956f7 --- /dev/null +++ b/deployment/environment/memory/offchain_client.go @@ -0,0 +1,207 @@ +package memory + +import ( + "context" + "errors" + "fmt" + "slices" + "strings" + + "google.golang.org/grpc" + + csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa" + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" + "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes" + "github.com/smartcontractkit/chainlink/deployment" +) + +var _ deployment.OffchainClient = &JobClient{} + +type JobClient struct { + // Nodes map[string]Node + RegisteredNodes map[string]Node + nodeStore + *JobServiceClient +} + +func NewMemoryJobClient(nodesByPeerID map[string]Node) *JobClient { + m := make(map[string]*Node) + for id, node := range nodesByPeerID { + m[id] = &node + } + ns := newMapNodeStore(m) + return &JobClient{ + // Nodes: nodesByPeerID, + RegisteredNodes: make(map[string]Node), + JobServiceClient: NewJobServiceClient(ns), + nodeStore: ns, + } +} + +func (j JobClient) EnableNode(ctx context.Context, in *nodev1.EnableNodeRequest, opts ...grpc.CallOption) (*nodev1.EnableNodeResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j JobClient) DisableNode(ctx context.Context, in *nodev1.DisableNodeRequest, opts ...grpc.CallOption) (*nodev1.DisableNodeResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j *JobClient) RegisterNode(ctx context.Context, in *nodev1.RegisterNodeRequest, opts ...grpc.CallOption) (*nodev1.RegisterNodeResponse, error) { + if in == nil || in.GetPublicKey() == "" { + return nil, errors.New("public key is required") + } + + if _, exists := j.RegisteredNodes[in.GetPublicKey()]; exists { + return nil, fmt.Errorf("node with Public Key %s is already registered", in.GetPublicKey()) + } + + var foundNode *Node + for _, node := range j.nodeStore.list() { + if node.Keys.CSA.ID() == in.GetPublicKey() { + foundNode = node + break + } + } + + if foundNode == nil { + return nil, fmt.Errorf("node with Public Key %s is not known", in.GetPublicKey()) + } + + j.RegisteredNodes[in.GetPublicKey()] = *foundNode + + return &nodev1.RegisterNodeResponse{ + Node: &nodev1.Node{ + Id: in.GetPublicKey(), + PublicKey: in.GetPublicKey(), + IsEnabled: true, + IsConnected: true, + Labels: in.Labels, + }, + }, nil +} + +func (j JobClient) UpdateNode(ctx context.Context, in *nodev1.UpdateNodeRequest, opts ...grpc.CallOption) (*nodev1.UpdateNodeResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j JobClient) GetKeypair(ctx context.Context, in *csav1.GetKeypairRequest, opts ...grpc.CallOption) (*csav1.GetKeypairResponse, error) { + // TODO implement me + panic("implement me") +} + +func (j JobClient) ListKeypairs(ctx context.Context, in *csav1.ListKeypairsRequest, opts ...grpc.CallOption) (*csav1.ListKeypairsResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j JobClient) GetNode(ctx context.Context, in *nodev1.GetNodeRequest, opts ...grpc.CallOption) (*nodev1.GetNodeResponse, error) { + n, err := j.nodeStore.get(in.Id) + if err != nil { + return nil, err + } + return &nodev1.GetNodeResponse{ + Node: &nodev1.Node{ + Id: in.Id, + PublicKey: n.Keys.CSA.PublicKeyString(), + IsEnabled: true, + IsConnected: true, + }, + }, nil +} + +func (j JobClient) ListNodes(ctx context.Context, in *nodev1.ListNodesRequest, opts ...grpc.CallOption) (*nodev1.ListNodesResponse, error) { + var nodes []*nodev1.Node + for id, n := range j.nodeStore.asMap() { + node := &nodev1.Node{ + Id: id, + PublicKey: n.Keys.CSA.ID(), + IsEnabled: true, + IsConnected: true, + Labels: []*ptypes.Label{ + { + Key: "p2p_id", + Value: ptr(n.Keys.PeerID.String()), + }, + }, + } + if ApplyNodeFilter(in.Filter, node) { + nodes = append(nodes, node) + } + } + return &nodev1.ListNodesResponse{ + Nodes: nodes, + }, nil +} + +func (j JobClient) ListNodeChainConfigs(ctx context.Context, in *nodev1.ListNodeChainConfigsRequest, opts ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) { + if in.Filter == nil { + return nil, errors.New("filter is required") + } + if len(in.Filter.NodeIds) != 1 { + return nil, errors.New("only one node id is supported") + } + n, err := j.nodeStore.get(in.Filter.NodeIds[0]) //j.Nodes[in.Filter.NodeIds[0]] + if err != nil { + return nil, fmt.Errorf("node id not found: %s", in.Filter.NodeIds[0]) + } + chainConfigs, err := n.JDChainConfigs() + if err != nil { + return nil, err + } + return &nodev1.ListNodeChainConfigsResponse{ + ChainConfigs: chainConfigs, + }, nil +} + +func (j JobClient) ReplayLogs(selectorToBlock map[uint64]uint64) error { + for _, node := range j.nodeStore.list() { + if err := node.ReplayLogs(selectorToBlock); err != nil { + return err + } + } + return nil +} + +func ApplyNodeFilter(filter *nodev1.ListNodesRequest_Filter, node *nodev1.Node) bool { + if filter == nil { + return true + } + if len(filter.Ids) > 0 { + idx := slices.IndexFunc(filter.Ids, func(id string) bool { + return node.Id == id + }) + if idx < 0 { + return false + } + } + for _, selector := range filter.Selectors { + idx := slices.IndexFunc(node.Labels, func(label *ptypes.Label) bool { + return label.Key == selector.Key + }) + if idx < 0 { + return false + } + label := node.Labels[idx] + + switch selector.Op { + case ptypes.SelectorOp_IN: + values := strings.Split(*selector.Value, ",") + found := slices.Contains(values, *label.Value) + if !found { + return false + } + case ptypes.SelectorOp_EQ: + if *label.Value != *selector.Value { + return false + } + case ptypes.SelectorOp_EXIST: + // do nothing + default: + panic("unimplemented selector") + } + } + return true +} diff --git a/deployment/go.mod b/deployment/go.mod index 0d72033e49c..30fbc36df7e 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -36,12 +36,14 @@ require ( github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441-3d6cea220efb github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02 github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 + github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 github.com/smartcontractkit/chainlink-solana v1.1.2-0.20250213203720-e15b1333a14a github.com/smartcontractkit/chainlink-testing-framework/framework v0.4.7 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.22 github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 github.com/smartcontractkit/mcms v0.10.0 github.com/stretchr/testify v1.10.0 + github.com/test-go/testify v1.1.4 github.com/testcontainers/testcontainers-go v0.35.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 @@ -356,7 +358,6 @@ require ( github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250207205350-420ccacab78a // indirect - github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 // indirect github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 // indirect github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.10 // indirect diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 899a705d0e7..ead529a831f 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -455,6 +455,7 @@ require ( github.com/supranational/blst v0.3.13 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tendermint/go-amino v0.16.0 // indirect + github.com/test-go/testify v1.1.4 // indirect github.com/theodesp/go-heaps v0.0.0-20190520121037-88e35354fe0a // indirect github.com/tidwall/gjson v1.18.0 // indirect github.com/tidwall/match v1.1.1 // indirect diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 052043bfe7f..86ad32ea699 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -450,6 +450,7 @@ require ( github.com/supranational/blst v0.3.13 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tendermint/go-amino v0.16.0 // indirect + github.com/test-go/testify v1.1.4 // indirect github.com/testcontainers/testcontainers-go v0.35.0 // indirect github.com/theodesp/go-heaps v0.0.0-20190520121037-88e35354fe0a // indirect github.com/tidwall/gjson v1.18.0 // indirect diff --git a/system-tests/lib/go.mod b/system-tests/lib/go.mod index bb0dfe89443..bb4e5f67efb 100644 --- a/system-tests/lib/go.mod +++ b/system-tests/lib/go.mod @@ -368,6 +368,7 @@ require ( github.com/supranational/blst v0.3.13 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tendermint/go-amino v0.16.0 // indirect + github.com/test-go/testify v1.1.4 // indirect github.com/testcontainers/testcontainers-go v0.35.0 // indirect github.com/theodesp/go-heaps v0.0.0-20190520121037-88e35354fe0a // indirect github.com/tidwall/gjson v1.18.0 // indirect diff --git a/system-tests/tests/go.mod b/system-tests/tests/go.mod index 395b0882f5d..5aab0cf5b00 100644 --- a/system-tests/tests/go.mod +++ b/system-tests/tests/go.mod @@ -372,6 +372,7 @@ require ( github.com/supranational/blst v0.3.13 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tendermint/go-amino v0.16.0 // indirect + github.com/test-go/testify v1.1.4 // indirect github.com/testcontainers/testcontainers-go v0.35.0 // indirect github.com/theodesp/go-heaps v0.0.0-20190520121037-88e35354fe0a // indirect github.com/tidwall/gjson v1.18.0 // indirect From 1a455ee0d92f02cafc82c0a831d94570df724fa2 Mon Sep 17 00:00:00 2001 From: krehermann <16602512+krehermann@users.noreply.github.com> Date: Tue, 25 Feb 2025 16:24:22 -0700 Subject: [PATCH 2/3] fix tests --- core/services/feeds/service.go | 2 +- deployment/ccip/changeset/v1_5/cs_jobspec.go | 2 +- .../environment/memory/jd_job_client_test.go | 6 ++---- .../memory/jd_job_service_client.go | 19 +++++++++++++------ deployment/environment/memory/node.go | 4 ++-- .../environment/memory/offchain_client.go | 2 +- 6 files changed, 20 insertions(+), 15 deletions(-) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 99bbb2e0cbb..1ca9990d617 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -645,7 +645,7 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, if exists { // note: CLO auto-increments the version number on re-proposal, so this should never happen - return 0, errors.New("proposed job spec version already exists") + return 0, fmt.Errorf("external job id %d: version conflict: version %d already exists at job proposal id %d %v", args.RemoteUUID, args.Version, existing.ID, *existing) } } diff --git a/deployment/ccip/changeset/v1_5/cs_jobspec.go b/deployment/ccip/changeset/v1_5/cs_jobspec.go index 882d78c6ff2..de6182c2742 100644 --- a/deployment/ccip/changeset/v1_5/cs_jobspec.go +++ b/deployment/ccip/changeset/v1_5/cs_jobspec.go @@ -90,7 +90,7 @@ func JobSpecsForLanesChangeset(env deployment.Environment, c JobSpecsForLanesCon // JOBID will be empty if the proposal failed. return deployment.ChangesetOutput{ Jobs: Jobs, - }, fmt.Errorf("failed to propose job: %w", err) + }, fmt.Errorf("failed to propose job %s: %w", job, err) } Jobs[len(Jobs)-1].JobID = res.Proposal.JobId } diff --git a/deployment/environment/memory/jd_job_client_test.go b/deployment/environment/memory/jd_job_client_test.go index 07f80b45017..c66e9b76049 100644 --- a/deployment/environment/memory/jd_job_client_test.go +++ b/deployment/environment/memory/jd_job_client_test.go @@ -82,7 +82,6 @@ func TestJobClientJobAPI(t *testing.T) { require.Error(t, err) assert.Nil(t, getResp) }) - }) t.Run("ListJobs", func(t *testing.T) { @@ -124,7 +123,7 @@ func TestJobClientJobAPI(t *testing.T) { }, checkResp: func(t *testing.T, resp *jobv1.ListJobsResponse) { require.NotNil(t, resp) - assert.Len(t, resp.Jobs, 0) + assert.Empty(t, resp.Jobs) }, }, } @@ -211,7 +210,7 @@ func TestJobClientJobAPI(t *testing.T) { }, checkResp: func(t *testing.T, resp *jobv1.ListProposalsResponse) { require.NotNil(t, resp) - assert.Len(t, resp.Proposals, 0, "expected no proposals %v", resp.Proposals) + assert.Empty(t, resp.Proposals, "expected no proposals %v", resp.Proposals) }, }, } @@ -228,7 +227,6 @@ func TestJobClientJobAPI(t *testing.T) { }) } }) - } func ptr(s string) *string { diff --git a/deployment/environment/memory/jd_job_service_client.go b/deployment/environment/memory/jd_job_service_client.go index e784e6d7014..cc27e89a4d8 100644 --- a/deployment/environment/memory/jd_job_service_client.go +++ b/deployment/environment/memory/jd_job_service_client.go @@ -62,7 +62,6 @@ func (j *JobServiceClient) BatchProposeJob(ctx context.Context, in *jobv1.BatchP out.SuccessResponses[id] = resp } return out, totalErr - } func (j *JobServiceClient) UpdateJob(ctx context.Context, in *jobv1.UpdateJobRequest, opts ...grpc.CallOption) (*jobv1.UpdateJobResponse, error) { @@ -101,7 +100,6 @@ func (j *JobServiceClient) ListJobs(ctx context.Context, in *jobv1.ListJobsReque return &jobv1.ListJobsResponse{ Jobs: jbs, }, nil - } func (j *JobServiceClient) ListProposals(ctx context.Context, in *jobv1.ListProposalsRequest, opts ...grpc.CallOption) (*jobv1.ListProposalsResponse, error) { @@ -131,17 +129,27 @@ func (j *JobServiceClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobR return nil, fmt.Errorf("failed to load job spec: %w", err) } if extractor.ExternalJobID == "" { - return nil, fmt.Errorf("externalJobID is required") + return nil, errors.New("externalJobID is required") + } + + // must auto increment the version to avoid collision on the node side + proposals, err := j.proposalStore.list(&jobv1.ListProposalsRequest_Filter{ + JobIds: []string{extractor.ExternalJobID}, + }) + if err != nil { + return nil, fmt.Errorf("failed to list proposals: %w", err) } appProposalID, err := n.App.GetFeedsService().ProposeJob(ctx, &feeds.ProposeJobArgs{ FeedsManagerID: 1, Spec: in.Spec, + RemoteUUID: uuid.MustParse(extractor.ExternalJobID), + Version: int32(len(proposals) + 1), }) if err != nil { return nil, fmt.Errorf("failed to propose job: %w", err) } - + fmt.Printf("proposed job uuid %s with id, spec, version: %d\n%s\n%d\n", extractor.ExternalJobID, appProposalID, in.Spec, len(proposals)+1) // auto approve for now proposedSpec, err := n.App.GetFeedsService().ListSpecsByJobProposalIDs(ctx, []int64{appProposalID}) if err != nil { @@ -323,7 +331,7 @@ func (m *mapJobStore) list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job, } wantedJobIDs := make(map[string]struct{}) - // use node ids to contruct wanted job ids + // use node ids to construct wanted job ids if filter.NodeIds != nil { for _, nodeID := range filter.NodeIds { jobIDs, ok := m.nodesToJobIDs[nodeID] @@ -344,7 +352,6 @@ func (m *mapJobStore) list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job, wantedJobIDs[jobID] = struct{}{} } } - } else if filter.Ids != nil { for _, jobID := range filter.Ids { wantedJobIDs[jobID] = struct{}{} diff --git a/deployment/environment/memory/node.go b/deployment/environment/memory/node.go index e98d9541777..c5de6e742f7 100644 --- a/deployment/environment/memory/node.go +++ b/deployment/environment/memory/node.go @@ -387,7 +387,7 @@ func NewNode( }) keys := CreateKeys(t, app, chains, solchains) - //JD + // JD setupJD(t, app) return &Node{ @@ -614,7 +614,7 @@ func (e KeystoreSim) CSA() keystore.CSA { func setupJD(t *testing.T, app chainlink.Application) { secret := randomBytes32(t) - pkey, err := crypto.PublicKeyFromHex(hex.EncodeToString(secret[:])) + pkey, err := crypto.PublicKeyFromHex(hex.EncodeToString(secret)) m := feeds2.RegisterManagerParams{ Name: "In memory env test", URI: "http://dev.null:8080", diff --git a/deployment/environment/memory/offchain_client.go b/deployment/environment/memory/offchain_client.go index a97280956f7..7ebd5131d7f 100644 --- a/deployment/environment/memory/offchain_client.go +++ b/deployment/environment/memory/offchain_client.go @@ -143,7 +143,7 @@ func (j JobClient) ListNodeChainConfigs(ctx context.Context, in *nodev1.ListNode if len(in.Filter.NodeIds) != 1 { return nil, errors.New("only one node id is supported") } - n, err := j.nodeStore.get(in.Filter.NodeIds[0]) //j.Nodes[in.Filter.NodeIds[0]] + n, err := j.nodeStore.get(in.Filter.NodeIds[0]) // j.Nodes[in.Filter.NodeIds[0]] if err != nil { return nil, fmt.Errorf("node id not found: %s", in.Filter.NodeIds[0]) } From 1cef16b73cd5ed65f709f1e9fc94d7e1ca76175f Mon Sep 17 00:00:00 2001 From: krehermann <16602512+krehermann@users.noreply.github.com> Date: Tue, 25 Feb 2025 17:09:33 -0700 Subject: [PATCH 3/3] cleanup --- core/scripts/go.mod | 1 - core/services/feeds/service.go | 2 +- core/services/feeds/service_test.go | 2 +- ...ervice_client.go => job_service_client.go} | 26 ++-- ...ent_test.go => job_service_client_test.go} | 143 ++++++++++++------ deployment/environment/memory/node.go | 3 +- .../environment/memory/node_service_client.go | 120 +++++++++++++++ .../environment/memory/offchain_client.go | 111 -------------- deployment/go.mod | 1 - integration-tests/go.mod | 1 - integration-tests/load/go.mod | 1 - system-tests/lib/go.mod | 1 - system-tests/tests/go.mod | 1 - 13 files changed, 235 insertions(+), 178 deletions(-) rename deployment/environment/memory/{jd_job_service_client.go => job_service_client.go} (95%) rename deployment/environment/memory/{jd_job_client_test.go => job_service_client_test.go} (69%) create mode 100644 deployment/environment/memory/node_service_client.go diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 44741ca3a10..1f2e0a7dd94 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -366,7 +366,6 @@ require ( github.com/supranational/blst v0.3.13 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tendermint/go-amino v0.16.0 // indirect - github.com/test-go/testify v1.1.4 // indirect github.com/testcontainers/testcontainers-go v0.35.0 // indirect github.com/theodesp/go-heaps v0.0.0-20190520121037-88e35354fe0a // indirect github.com/tidwall/gjson v1.18.0 // indirect diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 1ca9990d617..120ce9f55c7 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -645,7 +645,7 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, if exists { // note: CLO auto-increments the version number on re-proposal, so this should never happen - return 0, fmt.Errorf("external job id %d: version conflict: version %d already exists at job proposal id %d %v", args.RemoteUUID, args.Version, existing.ID, *existing) + return 0, fmt.Errorf("external job id %s: version conflict: version %d already exists at job proposal id %d %v", args.RemoteUUID, args.Version, existing.ID, *existing) } } diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index ce0e933df49..04526ab5323 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -1276,7 +1276,7 @@ func Test_Service_ProposeJob(t *testing.T) { svc.orm.On("ExistsSpecByJobProposalIDAndVersion", mock.Anything, jpFluxMonitor.ID, argsFluxMonitor.Version).Return(true, nil) }, args: argsFluxMonitor, - wantErr: "proposed job spec version already exists", + wantErr: "version conflict", }, { name: "upsert error", diff --git a/deployment/environment/memory/jd_job_service_client.go b/deployment/environment/memory/job_service_client.go similarity index 95% rename from deployment/environment/memory/jd_job_service_client.go rename to deployment/environment/memory/job_service_client.go index cc27e89a4d8..ef2c52b4944 100644 --- a/deployment/environment/memory/jd_job_service_client.go +++ b/deployment/environment/memory/job_service_client.go @@ -139,12 +139,12 @@ func (j *JobServiceClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobR if err != nil { return nil, fmt.Errorf("failed to list proposals: %w", err) } - + proposalVersion := int32(len(proposals) + 1) //nolint:gosec // G115 appProposalID, err := n.App.GetFeedsService().ProposeJob(ctx, &feeds.ProposeJobArgs{ FeedsManagerID: 1, Spec: in.Spec, RemoteUUID: uuid.MustParse(extractor.ExternalJobID), - Version: int32(len(proposals) + 1), + Version: proposalVersion, }) if err != nil { return nil, fmt.Errorf("failed to propose job: %w", err) @@ -155,10 +155,11 @@ func (j *JobServiceClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobR if err != nil { return nil, fmt.Errorf("failed to list specs: %w", err) } - if len(proposedSpec) != 1 { - return nil, fmt.Errorf("expected 1 spec, got %d", len(proposedSpec)) + // possible to have multiple specs for the same job proposal id; take the last one + if len(proposedSpec) == 0 { + return nil, fmt.Errorf("no specs found for job proposal id: %d", appProposalID) } - err = n.App.GetFeedsService().ApproveSpec(ctx, proposedSpec[0].ID, true) + err = n.App.GetFeedsService().ApproveSpec(ctx, proposedSpec[len(proposedSpec)-1].ID, true) if err != nil { return nil, fmt.Errorf("failed to approve job: %w", err) } @@ -167,7 +168,8 @@ func (j *JobServiceClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobR p := &jobv1.ProposeJobResponse{Proposal: &jobv1.Proposal{ // make the proposal id the same as the job id for further reference // if you are changing this make sure to change the GetProposal and ListJobs method implementation - Id: storeProposalID, + Id: storeProposalID, + Revision: int64(proposalVersion), // Auto approve for now Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, @@ -193,7 +195,7 @@ func (j *JobServiceClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobR defer func() { // cleanup if we fail to save the job if storeErr != nil { - j.proposalStore.delete(storeProposalID) + j.proposalStore.delete(storeProposalID) //nolint:errcheck // ignore error nothing to do } }() @@ -332,7 +334,8 @@ func (m *mapJobStore) list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job, wantedJobIDs := make(map[string]struct{}) // use node ids to construct wanted job ids - if filter.NodeIds != nil { + switch { + case filter.NodeIds != nil: for _, nodeID := range filter.NodeIds { jobIDs, ok := m.nodesToJobIDs[nodeID] if !ok { @@ -342,7 +345,7 @@ func (m *mapJobStore) list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job, wantedJobIDs[jobID] = struct{}{} } } - } else if filter.Uuids != nil { + case filter.Uuids != nil: for _, uuid := range filter.Uuids { jobIDs, ok := m.uuidToJobIDs[uuid] if !ok { @@ -352,11 +355,14 @@ func (m *mapJobStore) list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job, wantedJobIDs[jobID] = struct{}{} } } - } else if filter.Ids != nil { + case filter.Ids != nil: for _, jobID := range filter.Ids { wantedJobIDs[jobID] = struct{}{} } + default: + panic("this should never happen because of the nil filter check") } + for _, job := range m.jobs { if _, ok := wantedJobIDs[job.Id]; ok { jobs = append(jobs, job) diff --git a/deployment/environment/memory/jd_job_client_test.go b/deployment/environment/memory/job_service_client_test.go similarity index 69% rename from deployment/environment/memory/jd_job_client_test.go rename to deployment/environment/memory/job_service_client_test.go index c66e9b76049..4f8d096010d 100644 --- a/deployment/environment/memory/jd_job_client_test.go +++ b/deployment/environment/memory/job_service_client_test.go @@ -16,6 +16,99 @@ import ( "github.com/smartcontractkit/chainlink/deployment/environment/memory" ) +func TestJobClientProposeJob(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + chains, _ := memory.NewMemoryChains(t, 1, 1) + ports := freeport.GetN(t, 1) + testNode := memory.NewNode(t, ports[0], chains, nil, zapcore.DebugLevel, false, deployment.CapabilityRegistryConfig{}) + + // Set up the JobClient with a mock node + nodeID := "node-1" + nodes := map[string]memory.Node{ + nodeID: *testNode, + } + jobClient := memory.NewMemoryJobClient(nodes) + + type testCase struct { + name string + req *jobv1.ProposeJobRequest + checkErr func(t *testing.T, err error) + checkResp func(t *testing.T, resp *jobv1.ProposeJobResponse) + } + cases := []testCase{ + { + name: "valid request", + req: &jobv1.ProposeJobRequest{ + NodeId: "node-1", + Spec: testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), + }, + checkResp: func(t *testing.T, resp *jobv1.ProposeJobResponse) { + assert.NotNil(t, resp) + assert.Equal(t, int64(1), resp.Proposal.Revision) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status) + assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus) + assert.Equal(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466", resp.Proposal.JobId) + assert.Equal(t, testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), resp.Proposal.Spec) + }, + }, + { + name: "idempotent request bumps version", + req: &jobv1.ProposeJobRequest{ + NodeId: "node-1", + Spec: testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), + }, + // the feeds service doesn't allow duplicate job names + checkResp: func(t *testing.T, resp *jobv1.ProposeJobResponse) { + assert.NotNil(t, resp) + assert.Equal(t, int64(2), resp.Proposal.Revision) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status) + assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus) + assert.Equal(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466", resp.Proposal.JobId) + assert.Equal(t, testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), resp.Proposal.Spec) + }, + }, + { + name: "another request", + req: &jobv1.ProposeJobRequest{ + NodeId: "node-1", + Spec: testJobProposalTOML(t, "11115211-ab79-4c31-ba1c-0997b72aaaaa"), + }, + checkResp: func(t *testing.T, resp *jobv1.ProposeJobResponse) { + assert.NotNil(t, resp) + assert.Equal(t, int64(1), resp.Proposal.Revision) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status) + assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus) + assert.Equal(t, "11115211-ab79-4c31-ba1c-0997b72aaaaa", resp.Proposal.JobId) + assert.Equal(t, testJobProposalTOML(t, "11115211-ab79-4c31-ba1c-0997b72aaaaa"), resp.Proposal.Spec) + }, + }, + { + name: "node does not exist", + req: &jobv1.ProposeJobRequest{ + NodeId: "node-2", + Spec: testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), + }, + checkErr: func(t *testing.T, err error) { + require.Error(t, err) + assert.Contains(t, err.Error(), "node not found") + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Call the ProposeJob method + resp, err := jobClient.ProposeJob(ctx, c.req) + if c.checkErr != nil { + c.checkErr(t, err) + return + } + require.NoError(t, err) + c.checkResp(t, resp) + }) + } +} + func TestJobClientJobAPI(t *testing.T) { t.Parallel() ctx := testutils.Context(t) @@ -239,56 +332,10 @@ func testJobProposalTOML(t *testing.T, externalJobId string) string { type = "standardcapabilities" schemaVersion = 1 externalJobID = "%s" -name = "hacking" +name = "hacking-%s" forwardingAllowed = false command = "/home/capabilities/nowhere" config = "" ` - return fmt.Sprintf(tomlString, externalJobId) -} - -func setupOne(t *testing.T) *memory.JobClient { - t.Helper() - ctx := testutils.Context(t) - // Create a new memory node - - chains, _ := memory.NewMemoryChains(t, 1, 1) - ports := freeport.GetN(t, 1) - testNode := memory.NewNode(t, ports[0], chains, nil, zapcore.DebugLevel, false, deployment.CapabilityRegistryConfig{}) - - // Set up the JobClient with a mock node - nodeID := "node-1" - externalJobID := "f1ac5211-ab79-4c31-ba1c-0997b72db466" - // need some non-ocr job type to avoid the ocr validation and the p2pwrapper check - - jobSpecToml := testJobProposalTOML(t, externalJobID) - nodes := map[string]memory.Node{ - nodeID: *testNode, - } - jobClient := memory.NewMemoryJobClient(nodes) - - // Create a mock request - req := &jobv1.ProposeJobRequest{ - NodeId: nodeID, - Spec: jobSpecToml, - Labels: []*ptypes.Label{ - { - Key: "label-key", - Value: ptr("label-value"), - }, - }, - } - - // Call the ProposeJob method - resp, err := jobClient.ProposeJob(ctx, req) - - // Validate the response - require.NoError(t, err) - assert.NotNil(t, resp) - assert.Equal(t, externalJobID, resp.Proposal.Id) - assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status) - assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus) - assert.Equal(t, jobSpecToml, resp.Proposal.Spec) - assert.Equal(t, externalJobID, resp.Proposal.JobId) - return jobClient + return fmt.Sprintf(tomlString, externalJobId, externalJobId) } diff --git a/deployment/environment/memory/node.go b/deployment/environment/memory/node.go index c5de6e742f7..0155701fdc6 100644 --- a/deployment/environment/memory/node.go +++ b/deployment/environment/memory/node.go @@ -18,8 +18,8 @@ import ( "github.com/ethereum/go-ethereum/common" gethtypes "github.com/ethereum/go-ethereum/core/types" chainsel "github.com/smartcontractkit/chain-selectors" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/test-go/testify/mock" "go.uber.org/zap/zapcore" "golang.org/x/exp/maps" @@ -615,6 +615,7 @@ func (e KeystoreSim) CSA() keystore.CSA { func setupJD(t *testing.T, app chainlink.Application) { secret := randomBytes32(t) pkey, err := crypto.PublicKeyFromHex(hex.EncodeToString(secret)) + require.NoError(t, err) m := feeds2.RegisterManagerParams{ Name: "In memory env test", URI: "http://dev.null:8080", diff --git a/deployment/environment/memory/node_service_client.go b/deployment/environment/memory/node_service_client.go new file mode 100644 index 00000000000..d11ce8fe5b8 --- /dev/null +++ b/deployment/environment/memory/node_service_client.go @@ -0,0 +1,120 @@ +package memory + +import ( + "context" + "errors" + "fmt" + + "google.golang.org/grpc" + + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" + "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes" +) + +func (j JobClient) EnableNode(ctx context.Context, in *nodev1.EnableNodeRequest, opts ...grpc.CallOption) (*nodev1.EnableNodeResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j JobClient) DisableNode(ctx context.Context, in *nodev1.DisableNodeRequest, opts ...grpc.CallOption) (*nodev1.DisableNodeResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j *JobClient) RegisterNode(ctx context.Context, in *nodev1.RegisterNodeRequest, opts ...grpc.CallOption) (*nodev1.RegisterNodeResponse, error) { + if in == nil || in.GetPublicKey() == "" { + return nil, errors.New("public key is required") + } + + if _, exists := j.RegisteredNodes[in.GetPublicKey()]; exists { + return nil, fmt.Errorf("node with Public Key %s is already registered", in.GetPublicKey()) + } + + var foundNode *Node + for _, node := range j.nodeStore.list() { + if node.Keys.CSA.ID() == in.GetPublicKey() { + foundNode = node + break + } + } + + if foundNode == nil { + return nil, fmt.Errorf("node with Public Key %s is not known", in.GetPublicKey()) + } + + j.RegisteredNodes[in.GetPublicKey()] = *foundNode + + return &nodev1.RegisterNodeResponse{ + Node: &nodev1.Node{ + Id: in.GetPublicKey(), + PublicKey: in.GetPublicKey(), + IsEnabled: true, + IsConnected: true, + Labels: in.Labels, + }, + }, nil +} + +func (j JobClient) UpdateNode(ctx context.Context, in *nodev1.UpdateNodeRequest, opts ...grpc.CallOption) (*nodev1.UpdateNodeResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j JobClient) GetNode(ctx context.Context, in *nodev1.GetNodeRequest, opts ...grpc.CallOption) (*nodev1.GetNodeResponse, error) { + n, err := j.nodeStore.get(in.Id) + if err != nil { + return nil, err + } + return &nodev1.GetNodeResponse{ + Node: &nodev1.Node{ + Id: in.Id, + PublicKey: n.Keys.CSA.PublicKeyString(), + IsEnabled: true, + IsConnected: true, + }, + }, nil +} + +func (j JobClient) ListNodes(ctx context.Context, in *nodev1.ListNodesRequest, opts ...grpc.CallOption) (*nodev1.ListNodesResponse, error) { + var nodes []*nodev1.Node + for id, n := range j.nodeStore.asMap() { + node := &nodev1.Node{ + Id: id, + PublicKey: n.Keys.CSA.ID(), + IsEnabled: true, + IsConnected: true, + Labels: []*ptypes.Label{ + { + Key: "p2p_id", + Value: ptr(n.Keys.PeerID.String()), + }, + }, + } + if ApplyNodeFilter(in.Filter, node) { + nodes = append(nodes, node) + } + } + return &nodev1.ListNodesResponse{ + Nodes: nodes, + }, nil +} + +func (j JobClient) ListNodeChainConfigs(ctx context.Context, in *nodev1.ListNodeChainConfigsRequest, opts ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) { + if in.Filter == nil { + return nil, errors.New("filter is required") + } + if len(in.Filter.NodeIds) != 1 { + return nil, errors.New("only one node id is supported") + } + n, err := j.nodeStore.get(in.Filter.NodeIds[0]) // j.Nodes[in.Filter.NodeIds[0]] + if err != nil { + return nil, fmt.Errorf("node id not found: %s", in.Filter.NodeIds[0]) + } + chainConfigs, err := n.JDChainConfigs() + if err != nil { + return nil, err + } + return &nodev1.ListNodeChainConfigsResponse{ + ChainConfigs: chainConfigs, + }, nil +} diff --git a/deployment/environment/memory/offchain_client.go b/deployment/environment/memory/offchain_client.go index 7ebd5131d7f..6e40a1a6498 100644 --- a/deployment/environment/memory/offchain_client.go +++ b/deployment/environment/memory/offchain_client.go @@ -2,8 +2,6 @@ package memory import ( "context" - "errors" - "fmt" "slices" "strings" @@ -18,7 +16,6 @@ import ( var _ deployment.OffchainClient = &JobClient{} type JobClient struct { - // Nodes map[string]Node RegisteredNodes map[string]Node nodeStore *JobServiceClient @@ -38,55 +35,6 @@ func NewMemoryJobClient(nodesByPeerID map[string]Node) *JobClient { } } -func (j JobClient) EnableNode(ctx context.Context, in *nodev1.EnableNodeRequest, opts ...grpc.CallOption) (*nodev1.EnableNodeResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) DisableNode(ctx context.Context, in *nodev1.DisableNodeRequest, opts ...grpc.CallOption) (*nodev1.DisableNodeResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j *JobClient) RegisterNode(ctx context.Context, in *nodev1.RegisterNodeRequest, opts ...grpc.CallOption) (*nodev1.RegisterNodeResponse, error) { - if in == nil || in.GetPublicKey() == "" { - return nil, errors.New("public key is required") - } - - if _, exists := j.RegisteredNodes[in.GetPublicKey()]; exists { - return nil, fmt.Errorf("node with Public Key %s is already registered", in.GetPublicKey()) - } - - var foundNode *Node - for _, node := range j.nodeStore.list() { - if node.Keys.CSA.ID() == in.GetPublicKey() { - foundNode = node - break - } - } - - if foundNode == nil { - return nil, fmt.Errorf("node with Public Key %s is not known", in.GetPublicKey()) - } - - j.RegisteredNodes[in.GetPublicKey()] = *foundNode - - return &nodev1.RegisterNodeResponse{ - Node: &nodev1.Node{ - Id: in.GetPublicKey(), - PublicKey: in.GetPublicKey(), - IsEnabled: true, - IsConnected: true, - Labels: in.Labels, - }, - }, nil -} - -func (j JobClient) UpdateNode(ctx context.Context, in *nodev1.UpdateNodeRequest, opts ...grpc.CallOption) (*nodev1.UpdateNodeResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - func (j JobClient) GetKeypair(ctx context.Context, in *csav1.GetKeypairRequest, opts ...grpc.CallOption) (*csav1.GetKeypairResponse, error) { // TODO implement me panic("implement me") @@ -97,65 +45,6 @@ func (j JobClient) ListKeypairs(ctx context.Context, in *csav1.ListKeypairsReque panic("implement me") } -func (j JobClient) GetNode(ctx context.Context, in *nodev1.GetNodeRequest, opts ...grpc.CallOption) (*nodev1.GetNodeResponse, error) { - n, err := j.nodeStore.get(in.Id) - if err != nil { - return nil, err - } - return &nodev1.GetNodeResponse{ - Node: &nodev1.Node{ - Id: in.Id, - PublicKey: n.Keys.CSA.PublicKeyString(), - IsEnabled: true, - IsConnected: true, - }, - }, nil -} - -func (j JobClient) ListNodes(ctx context.Context, in *nodev1.ListNodesRequest, opts ...grpc.CallOption) (*nodev1.ListNodesResponse, error) { - var nodes []*nodev1.Node - for id, n := range j.nodeStore.asMap() { - node := &nodev1.Node{ - Id: id, - PublicKey: n.Keys.CSA.ID(), - IsEnabled: true, - IsConnected: true, - Labels: []*ptypes.Label{ - { - Key: "p2p_id", - Value: ptr(n.Keys.PeerID.String()), - }, - }, - } - if ApplyNodeFilter(in.Filter, node) { - nodes = append(nodes, node) - } - } - return &nodev1.ListNodesResponse{ - Nodes: nodes, - }, nil -} - -func (j JobClient) ListNodeChainConfigs(ctx context.Context, in *nodev1.ListNodeChainConfigsRequest, opts ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) { - if in.Filter == nil { - return nil, errors.New("filter is required") - } - if len(in.Filter.NodeIds) != 1 { - return nil, errors.New("only one node id is supported") - } - n, err := j.nodeStore.get(in.Filter.NodeIds[0]) // j.Nodes[in.Filter.NodeIds[0]] - if err != nil { - return nil, fmt.Errorf("node id not found: %s", in.Filter.NodeIds[0]) - } - chainConfigs, err := n.JDChainConfigs() - if err != nil { - return nil, err - } - return &nodev1.ListNodeChainConfigsResponse{ - ChainConfigs: chainConfigs, - }, nil -} - func (j JobClient) ReplayLogs(selectorToBlock map[uint64]uint64) error { for _, node := range j.nodeStore.list() { if err := node.ReplayLogs(selectorToBlock); err != nil { diff --git a/deployment/go.mod b/deployment/go.mod index 30fbc36df7e..cd378e0c4a6 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -43,7 +43,6 @@ require ( github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 github.com/smartcontractkit/mcms v0.10.0 github.com/stretchr/testify v1.10.0 - github.com/test-go/testify v1.1.4 github.com/testcontainers/testcontainers-go v0.35.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 diff --git a/integration-tests/go.mod b/integration-tests/go.mod index ead529a831f..899a705d0e7 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -455,7 +455,6 @@ require ( github.com/supranational/blst v0.3.13 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tendermint/go-amino v0.16.0 // indirect - github.com/test-go/testify v1.1.4 // indirect github.com/theodesp/go-heaps v0.0.0-20190520121037-88e35354fe0a // indirect github.com/tidwall/gjson v1.18.0 // indirect github.com/tidwall/match v1.1.1 // indirect diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 86ad32ea699..052043bfe7f 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -450,7 +450,6 @@ require ( github.com/supranational/blst v0.3.13 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tendermint/go-amino v0.16.0 // indirect - github.com/test-go/testify v1.1.4 // indirect github.com/testcontainers/testcontainers-go v0.35.0 // indirect github.com/theodesp/go-heaps v0.0.0-20190520121037-88e35354fe0a // indirect github.com/tidwall/gjson v1.18.0 // indirect diff --git a/system-tests/lib/go.mod b/system-tests/lib/go.mod index bb4e5f67efb..bb0dfe89443 100644 --- a/system-tests/lib/go.mod +++ b/system-tests/lib/go.mod @@ -368,7 +368,6 @@ require ( github.com/supranational/blst v0.3.13 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tendermint/go-amino v0.16.0 // indirect - github.com/test-go/testify v1.1.4 // indirect github.com/testcontainers/testcontainers-go v0.35.0 // indirect github.com/theodesp/go-heaps v0.0.0-20190520121037-88e35354fe0a // indirect github.com/tidwall/gjson v1.18.0 // indirect diff --git a/system-tests/tests/go.mod b/system-tests/tests/go.mod index 5aab0cf5b00..395b0882f5d 100644 --- a/system-tests/tests/go.mod +++ b/system-tests/tests/go.mod @@ -372,7 +372,6 @@ require ( github.com/supranational/blst v0.3.13 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tendermint/go-amino v0.16.0 // indirect - github.com/test-go/testify v1.1.4 // indirect github.com/testcontainers/testcontainers-go v0.35.0 // indirect github.com/theodesp/go-heaps v0.0.0-20190520121037-88e35354fe0a // indirect github.com/tidwall/gjson v1.18.0 // indirect