diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 06c964130cf..b789749a384 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -705,7 +705,7 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, if exists { // note: CLO auto-increments the version number on re-proposal, so this should never happen - return 0, errors.New("proposed job spec version already exists") + return 0, fmt.Errorf("external job id %s: version conflict: version %d already exists at job proposal id %d %v", args.RemoteUUID, args.Version, existing.ID, *existing) } } diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 1cf14b00ef5..bc95a1d061e 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -1285,7 +1285,7 @@ func Test_Service_ProposeJob(t *testing.T) { svc.orm.On("ExistsSpecByJobProposalIDAndVersion", mock.Anything, jpFluxMonitor.ID, argsFluxMonitor.Version).Return(true, nil) }, args: argsFluxMonitor, - wantErr: "proposed job spec version already exists", + wantErr: "version conflict", }, { name: "upsert error", diff --git a/deployment/ccip/changeset/v1_5/cs_jobspec.go b/deployment/ccip/changeset/v1_5/cs_jobspec.go index 882d78c6ff2..de6182c2742 100644 --- a/deployment/ccip/changeset/v1_5/cs_jobspec.go +++ b/deployment/ccip/changeset/v1_5/cs_jobspec.go @@ -90,7 +90,7 @@ func JobSpecsForLanesChangeset(env deployment.Environment, c JobSpecsForLanesCon // JOBID will be empty if the proposal failed. return deployment.ChangesetOutput{ Jobs: Jobs, - }, fmt.Errorf("failed to propose job: %w", err) + }, fmt.Errorf("failed to propose job %s: %w", job, err) } Jobs[len(Jobs)-1].JobID = res.Proposal.JobId } diff --git a/deployment/data-streams/changeset/jd_register_nodes_test.go b/deployment/data-streams/changeset/jd_register_nodes_test.go index c0b539a1f15..731d013a6b6 100644 --- a/deployment/data-streams/changeset/jd_register_nodes_test.go +++ b/deployment/data-streams/changeset/jd_register_nodes_test.go @@ -6,6 +6,8 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" + "github.com/smartcontractkit/chainlink-integrations/evm/testutils" + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" "github.com/smartcontractkit/chainlink/deployment" "github.com/smartcontractkit/chainlink/deployment/common/changeset" "github.com/smartcontractkit/chainlink/deployment/environment/memory" @@ -14,19 +16,21 @@ import ( func TestRegisterNodesWithJD(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) lggr := logger.TestLogger(t) e := memory.NewMemoryEnvironment(t, lggr, zapcore.InfoLevel, memory.MemoryEnvironmentConfig{Chains: 1, Nodes: 1}) - nodeP2pKey := e.NodeIDs[0] - jobClient, ok := e.Offchain.(*memory.JobClient) require.True(t, ok, "expected Offchain to be of type *memory.JobClient") - require.Lenf(t, jobClient.Nodes, 1, "expected exactly 1 node") + + resp, err := jobClient.ListNodes(ctx, &nodev1.ListNodesRequest{}) + require.NoError(t, err) + require.Lenf(t, resp.Nodes, 1, "expected exactly 1 node") require.Emptyf(t, jobClient.RegisteredNodes, "no registered nodes expected") - csaKey := jobClient.Nodes[nodeP2pKey].Keys.CSA.PublicKeyString() + csaKey := resp.Nodes[0].GetPublicKey() - e, err := changeset.Apply(t, e, nil, + e, err = changeset.Apply(t, e, nil, changeset.Configure( deployment.CreateLegacyChangeSet(RegisterNodesWithJD), RegisterNodesInput{ diff --git a/deployment/environment/memory/job_client.go b/deployment/environment/memory/job_client.go deleted file mode 100644 index 2b8adec6a14..00000000000 --- a/deployment/environment/memory/job_client.go +++ /dev/null @@ -1,333 +0,0 @@ -package memory - -import ( - "context" - "errors" - "fmt" - "slices" - "strings" - - "github.com/pelletier/go-toml/v2" - "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/timestamppb" - - csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa" - jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" - nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" - "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes" - - "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/validate" - ocr2validate "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate" - "github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap" -) - -type JobClient struct { - Nodes map[string]Node - RegisteredNodes map[string]Node -} - -func (j JobClient) BatchProposeJob(ctx context.Context, in *jobv1.BatchProposeJobRequest, opts ...grpc.CallOption) (*jobv1.BatchProposeJobResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) UpdateJob(ctx context.Context, in *jobv1.UpdateJobRequest, opts ...grpc.CallOption) (*jobv1.UpdateJobResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) DisableNode(ctx context.Context, in *nodev1.DisableNodeRequest, opts ...grpc.CallOption) (*nodev1.DisableNodeResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) EnableNode(ctx context.Context, in *nodev1.EnableNodeRequest, opts ...grpc.CallOption) (*nodev1.EnableNodeResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j *JobClient) RegisterNode(ctx context.Context, in *nodev1.RegisterNodeRequest, opts ...grpc.CallOption) (*nodev1.RegisterNodeResponse, error) { - if in == nil || in.GetPublicKey() == "" { - return nil, errors.New("public key is required") - } - - if _, exists := j.RegisteredNodes[in.GetPublicKey()]; exists { - return nil, fmt.Errorf("node with Public Key %s is already registered", in.GetPublicKey()) - } - - var foundNode *Node - for _, node := range j.Nodes { - if node.Keys.CSA.ID() == in.GetPublicKey() { - foundNode = &node - break - } - } - - if foundNode == nil { - return nil, fmt.Errorf("node with Public Key %s is not known", in.GetPublicKey()) - } - - j.RegisteredNodes[in.GetPublicKey()] = *foundNode - - return &nodev1.RegisterNodeResponse{ - Node: &nodev1.Node{ - Id: in.GetPublicKey(), - PublicKey: in.GetPublicKey(), - IsEnabled: true, - IsConnected: true, - Labels: in.Labels, - }, - }, nil -} - -func (j JobClient) UpdateNode(ctx context.Context, in *nodev1.UpdateNodeRequest, opts ...grpc.CallOption) (*nodev1.UpdateNodeResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) GetKeypair(ctx context.Context, in *csav1.GetKeypairRequest, opts ...grpc.CallOption) (*csav1.GetKeypairResponse, error) { - // TODO implement me - panic("implement me") -} - -func (j JobClient) ListKeypairs(ctx context.Context, in *csav1.ListKeypairsRequest, opts ...grpc.CallOption) (*csav1.ListKeypairsResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) GetNode(ctx context.Context, in *nodev1.GetNodeRequest, opts ...grpc.CallOption) (*nodev1.GetNodeResponse, error) { - n, ok := j.Nodes[in.Id] - if !ok { - return nil, errors.New("node not found") - } - return &nodev1.GetNodeResponse{ - Node: &nodev1.Node{ - Id: in.Id, - PublicKey: n.Keys.CSA.PublicKeyString(), - IsEnabled: true, - IsConnected: true, - }, - }, nil -} - -func (j JobClient) ListNodes(ctx context.Context, in *nodev1.ListNodesRequest, opts ...grpc.CallOption) (*nodev1.ListNodesResponse, error) { - var nodes []*nodev1.Node - for id, n := range j.Nodes { - node := &nodev1.Node{ - Id: id, - PublicKey: n.Keys.CSA.ID(), - IsEnabled: true, - IsConnected: true, - Labels: []*ptypes.Label{ - { - Key: "p2p_id", - Value: ptr(n.Keys.PeerID.String()), - }, - }, - } - if ApplyNodeFilter(in.Filter, node) { - nodes = append(nodes, node) - } - } - return &nodev1.ListNodesResponse{ - Nodes: nodes, - }, nil -} - -func (j JobClient) ListNodeChainConfigs(ctx context.Context, in *nodev1.ListNodeChainConfigsRequest, opts ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) { - if in.Filter == nil { - return nil, errors.New("filter is required") - } - if len(in.Filter.NodeIds) != 1 { - return nil, errors.New("only one node id is supported") - } - n, ok := j.Nodes[in.Filter.NodeIds[0]] - if !ok { - return nil, fmt.Errorf("node id not found: %s", in.Filter.NodeIds[0]) - } - chainConfigs, err := n.JDChainConfigs() - if err != nil { - return nil, err - } - return &nodev1.ListNodeChainConfigsResponse{ - ChainConfigs: chainConfigs, - }, nil -} - -func (j JobClient) GetJob(ctx context.Context, in *jobv1.GetJobRequest, opts ...grpc.CallOption) (*jobv1.GetJobResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) GetProposal(ctx context.Context, in *jobv1.GetProposalRequest, opts ...grpc.CallOption) (*jobv1.GetProposalResponse, error) { - // we are using proposal id as job id - // refer to ListJobs and ProposeJobs for the assignment of proposal id - for _, node := range j.Nodes { - jobs, _, err := node.App.JobORM().FindJobs(ctx, 0, 1000) - if err != nil { - return nil, err - } - for _, job := range jobs { - if job.ExternalJobID.String() == in.Id { - specBytes, err := toml.Marshal(job.CCIPSpec) - if err != nil { - return nil, fmt.Errorf("failed to marshal job spec: %w", err) - } - return &jobv1.GetProposalResponse{ - Proposal: &jobv1.Proposal{ - Id: job.ExternalJobID.String(), - Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, - Spec: string(specBytes), - JobId: job.ExternalJobID.String(), - }, - }, nil - } - } - } - return nil, fmt.Errorf("job not found: %s", in.Id) -} - -func (j JobClient) ListJobs(ctx context.Context, in *jobv1.ListJobsRequest, opts ...grpc.CallOption) (*jobv1.ListJobsResponse, error) { - jobResponse := make([]*jobv1.Job, 0) - for _, req := range in.Filter.NodeIds { - if _, ok := j.Nodes[req]; !ok { - return nil, fmt.Errorf("node not found: %s", req) - } - n := j.Nodes[req] - jobs, _, err := n.App.JobORM().FindJobs(ctx, 0, 1000) - if err != nil { - return nil, err - } - for _, job := range jobs { - jobResponse = append(jobResponse, &jobv1.Job{ - Id: string(job.ID), - Uuid: job.ExternalJobID.String(), - NodeId: req, - // based on the current implementation, there is only one proposal per job - // see ProposeJobs for ProposalId assignment - ProposalIds: []string{job.ExternalJobID.String()}, - CreatedAt: timestamppb.New(job.CreatedAt), - UpdatedAt: timestamppb.New(job.CreatedAt), - }) - } - } - return &jobv1.ListJobsResponse{ - Jobs: jobResponse, - }, nil -} - -func (j JobClient) ListProposals(ctx context.Context, in *jobv1.ListProposalsRequest, opts ...grpc.CallOption) (*jobv1.ListProposalsResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption) (*jobv1.ProposeJobResponse, error) { - n := j.Nodes[in.NodeId] - // TODO: Use FMS - jb, err := validate.ValidatedCCIPSpec(in.Spec) - if err != nil { - if !strings.Contains(err.Error(), "the only supported type is currently 'ccip'") { - return nil, err - } - // check if it's offchainreporting2 job - jb, err = ocr2validate.ValidatedOracleSpecToml( - ctx, - n.App.GetConfig().OCR2(), - n.App.GetConfig().Insecure(), - in.Spec, - nil, // not required for validation - ) - if err != nil { - if !strings.Contains(err.Error(), "the only supported type is currently 'offchainreporting2'") { - return nil, err - } - // check if it's bootstrap job - jb, err = ocrbootstrap.ValidatedBootstrapSpecToml(in.Spec) - if err != nil { - return nil, fmt.Errorf("failed to validate job spec only ccip, bootstrap and offchainreporting2 are supported: %w", err) - } - } - } - err = n.App.AddJobV2(ctx, &jb) - if err != nil { - return nil, err - } - return &jobv1.ProposeJobResponse{Proposal: &jobv1.Proposal{ - // make the proposal id the same as the job id for further reference - // if you are changing this make sure to change the GetProposal and ListJobs method implementation - Id: jb.ExternalJobID.String(), - // Auto approve for now - Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, - DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, - Spec: in.Spec, - JobId: jb.ExternalJobID.String(), - CreatedAt: nil, - UpdatedAt: nil, - AckedAt: nil, - ResponseReceivedAt: nil, - }}, nil -} - -func (j JobClient) RevokeJob(ctx context.Context, in *jobv1.RevokeJobRequest, opts ...grpc.CallOption) (*jobv1.RevokeJobResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) DeleteJob(ctx context.Context, in *jobv1.DeleteJobRequest, opts ...grpc.CallOption) (*jobv1.DeleteJobResponse, error) { - // TODO CCIP-3108 implement me - panic("implement me") -} - -func (j JobClient) ReplayLogs(selectorToBlock map[uint64]uint64) error { - for _, node := range j.Nodes { - if err := node.ReplayLogs(selectorToBlock); err != nil { - return err - } - } - return nil -} - -func NewMemoryJobClient(nodesByPeerID map[string]Node) *JobClient { - return &JobClient{nodesByPeerID, make(map[string]Node)} -} - -func ApplyNodeFilter(filter *nodev1.ListNodesRequest_Filter, node *nodev1.Node) bool { - if filter == nil { - return true - } - if len(filter.Ids) > 0 { - idx := slices.IndexFunc(filter.Ids, func(id string) bool { - return node.Id == id - }) - if idx < 0 { - return false - } - } - for _, selector := range filter.Selectors { - idx := slices.IndexFunc(node.Labels, func(label *ptypes.Label) bool { - return label.Key == selector.Key - }) - if idx < 0 { - return false - } - label := node.Labels[idx] - - switch selector.Op { - case ptypes.SelectorOp_IN: - values := strings.Split(*selector.Value, ",") - found := slices.Contains(values, *label.Value) - if !found { - return false - } - case ptypes.SelectorOp_EQ: - if *label.Value != *selector.Value { - return false - } - case ptypes.SelectorOp_EXIST: - // do nothing - default: - panic("unimplemented selector") - } - } - return true -} diff --git a/deployment/environment/memory/job_service_client.go b/deployment/environment/memory/job_service_client.go new file mode 100644 index 00000000000..ef2c52b4944 --- /dev/null +++ b/deployment/environment/memory/job_service_client.go @@ -0,0 +1,551 @@ +package memory + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/google/uuid" + "github.com/pelletier/go-toml/v2" + "google.golang.org/grpc" + + jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" + "github.com/smartcontractkit/chainlink/v2/core/services/feeds" + "github.com/smartcontractkit/chainlink/v2/core/services/job" +) + +type JobServiceClient struct { + jobStore + proposalStore + nodeStore +} + +func NewJobServiceClient(ns nodeStore) *JobServiceClient { + return &JobServiceClient{ + jobStore: newMapJobStore(), + proposalStore: newMapProposalStore(), + nodeStore: ns, + } +} + +func (j *JobServiceClient) BatchProposeJob(ctx context.Context, in *jobv1.BatchProposeJobRequest, opts ...grpc.CallOption) (*jobv1.BatchProposeJobResponse, error) { + targets := make(map[string]Node) + for _, nodeID := range in.NodeIds { + node, err := j.nodeStore.get(nodeID) + if err != nil { + return nil, fmt.Errorf("node not found: %s", nodeID) + } + targets[nodeID] = *node + } + if len(targets) == 0 { + return nil, errors.New("no nodes found") + } + out := &jobv1.BatchProposeJobResponse{ + SuccessResponses: make(map[string]*jobv1.ProposeJobResponse), + FailedResponses: make(map[string]*jobv1.ProposeJobFailure), + } + var totalErr error + for id := range targets { + singleReq := &jobv1.ProposeJobRequest{ + NodeId: id, + Spec: in.Spec, + Labels: in.Labels, + } + resp, err := j.ProposeJob(ctx, singleReq) + if err != nil { + out.FailedResponses[id] = &jobv1.ProposeJobFailure{ + ErrorMessage: err.Error(), + } + totalErr = errors.Join(totalErr, fmt.Errorf("failed to propose job for node %s: %w", id, err)) + } + out.SuccessResponses[id] = resp + } + return out, totalErr +} + +func (j *JobServiceClient) UpdateJob(ctx context.Context, in *jobv1.UpdateJobRequest, opts ...grpc.CallOption) (*jobv1.UpdateJobResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j *JobServiceClient) GetJob(ctx context.Context, in *jobv1.GetJobRequest, opts ...grpc.CallOption) (*jobv1.GetJobResponse, error) { + // implementation detail that job id and uuid is the same + jb, err := j.jobStore.get(in.GetId()) + if err != nil { + return nil, fmt.Errorf("failed to get job: %w", err) + } + // TODO CCIP-3108 implement me + return &jobv1.GetJobResponse{ + Job: jb, + }, nil +} + +func (j *JobServiceClient) GetProposal(ctx context.Context, in *jobv1.GetProposalRequest, opts ...grpc.CallOption) (*jobv1.GetProposalResponse, error) { + p, err := j.proposalStore.get(in.Id) + if err != nil { + return nil, fmt.Errorf("failed to get proposal: %w", err) + } + return &jobv1.GetProposalResponse{ + Proposal: p, + }, nil +} + +func (j *JobServiceClient) ListJobs(ctx context.Context, in *jobv1.ListJobsRequest, opts ...grpc.CallOption) (*jobv1.ListJobsResponse, error) { + jbs, err := j.jobStore.list(in.Filter) + if err != nil { + return nil, fmt.Errorf("failed to list jobs: %w", err) + } + + return &jobv1.ListJobsResponse{ + Jobs: jbs, + }, nil +} + +func (j *JobServiceClient) ListProposals(ctx context.Context, in *jobv1.ListProposalsRequest, opts ...grpc.CallOption) (*jobv1.ListProposalsResponse, error) { + proposals, err := j.proposalStore.list(in.Filter) + if err != nil { + return nil, fmt.Errorf("failed to list proposals: %w", err) + } + return &jobv1.ListProposalsResponse{ + Proposals: proposals, + }, nil +} + +// ProposeJob is used to propose a job to the node +// It auto approves the job +func (j *JobServiceClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption) (*jobv1.ProposeJobResponse, error) { + n, err := j.nodeStore.get(in.NodeId) + if err != nil { + return nil, fmt.Errorf("node not found: %w", err) + } + _, err = job.ValidateSpec(in.Spec) + if err != nil { + return nil, fmt.Errorf("failed to validate job spec: %w", err) + } + var extractor ExternalJobIDExtractor + err = toml.Unmarshal([]byte(in.Spec), &extractor) + if err != nil { + return nil, fmt.Errorf("failed to load job spec: %w", err) + } + if extractor.ExternalJobID == "" { + return nil, errors.New("externalJobID is required") + } + + // must auto increment the version to avoid collision on the node side + proposals, err := j.proposalStore.list(&jobv1.ListProposalsRequest_Filter{ + JobIds: []string{extractor.ExternalJobID}, + }) + if err != nil { + return nil, fmt.Errorf("failed to list proposals: %w", err) + } + proposalVersion := int32(len(proposals) + 1) //nolint:gosec // G115 + appProposalID, err := n.App.GetFeedsService().ProposeJob(ctx, &feeds.ProposeJobArgs{ + FeedsManagerID: 1, + Spec: in.Spec, + RemoteUUID: uuid.MustParse(extractor.ExternalJobID), + Version: proposalVersion, + }) + if err != nil { + return nil, fmt.Errorf("failed to propose job: %w", err) + } + fmt.Printf("proposed job uuid %s with id, spec, version: %d\n%s\n%d\n", extractor.ExternalJobID, appProposalID, in.Spec, len(proposals)+1) + // auto approve for now + proposedSpec, err := n.App.GetFeedsService().ListSpecsByJobProposalIDs(ctx, []int64{appProposalID}) + if err != nil { + return nil, fmt.Errorf("failed to list specs: %w", err) + } + // possible to have multiple specs for the same job proposal id; take the last one + if len(proposedSpec) == 0 { + return nil, fmt.Errorf("no specs found for job proposal id: %d", appProposalID) + } + err = n.App.GetFeedsService().ApproveSpec(ctx, proposedSpec[len(proposedSpec)-1].ID, true) + if err != nil { + return nil, fmt.Errorf("failed to approve job: %w", err) + } + + storeProposalID := uuid.Must(uuid.NewRandom()).String() + p := &jobv1.ProposeJobResponse{Proposal: &jobv1.Proposal{ + // make the proposal id the same as the job id for further reference + // if you are changing this make sure to change the GetProposal and ListJobs method implementation + Id: storeProposalID, + Revision: int64(proposalVersion), + // Auto approve for now + Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, + DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, + Spec: in.Spec, + JobId: extractor.ExternalJobID, + CreatedAt: nil, + UpdatedAt: nil, + AckedAt: nil, + ResponseReceivedAt: nil, + }} + + // save the proposal and job + { + var ( + storeErr error // used to cleanup if we fail to save the job + job *jobv1.Job + ) + + storeErr = j.proposalStore.put(storeProposalID, p.Proposal) + if err != nil { + return nil, fmt.Errorf("failed to save proposal: %w", err) + } + defer func() { + // cleanup if we fail to save the job + if storeErr != nil { + j.proposalStore.delete(storeProposalID) //nolint:errcheck // ignore error nothing to do + } + }() + + job, storeErr = j.jobStore.get(extractor.ExternalJobID) + if storeErr != nil && !errors.Is(storeErr, errNoExist) { + return nil, fmt.Errorf("failed to get job: %w", storeErr) + } + if errors.Is(storeErr, errNoExist) { + job = &jobv1.Job{ + Id: extractor.ExternalJobID, + Uuid: extractor.ExternalJobID, + NodeId: in.NodeId, + ProposalIds: []string{storeProposalID}, + Labels: in.Labels, + } + } else { + job.ProposalIds = append(job.ProposalIds, storeProposalID) + } + storeErr = j.jobStore.put(extractor.ExternalJobID, job) + if storeErr != nil { + return nil, fmt.Errorf("failed to save job: %w", storeErr) + } + } + return p, nil +} + +func (j *JobServiceClient) RevokeJob(ctx context.Context, in *jobv1.RevokeJobRequest, opts ...grpc.CallOption) (*jobv1.RevokeJobResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j *JobServiceClient) DeleteJob(ctx context.Context, in *jobv1.DeleteJobRequest, opts ...grpc.CallOption) (*jobv1.DeleteJobResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +type ExternalJobIDExtractor struct { + ExternalJobID string `toml:"externalJobID"` +} + +var errNoExist = errors.New("does not exist") + +// proposalStore is an interface for storing job proposals. +type proposalStore interface { + put(proposalID string, proposal *jobv1.Proposal) error + get(proposalID string) (*jobv1.Proposal, error) + list(filter *jobv1.ListProposalsRequest_Filter) ([]*jobv1.Proposal, error) + delete(proposalID string) error +} + +// jobStore is an interface for storing jobs. +type jobStore interface { + put(jobID string, job *jobv1.Job) error + get(jobID string) (*jobv1.Job, error) + list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job, error) + delete(jobID string) error +} + +// nodeStore is an interface for storing nodes. +type nodeStore interface { + put(nodeID string, node *Node) error + get(nodeID string) (*Node, error) + list() []*Node + asMap() map[string]*Node + delete(nodeID string) error +} + +var _ jobStore = &mapJobStore{} + +type mapJobStore struct { + mu sync.Mutex + jobs map[string]*jobv1.Job + nodesToJobIDs map[string][]string + uuidToJobIDs map[string][]string +} + +func newMapJobStore() *mapJobStore { + return &mapJobStore{ + jobs: make(map[string]*jobv1.Job), + nodesToJobIDs: make(map[string][]string), + uuidToJobIDs: make(map[string][]string), + } +} + +func (m *mapJobStore) put(jobID string, job *jobv1.Job) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.jobs == nil { + m.jobs = make(map[string]*jobv1.Job) + m.nodesToJobIDs = make(map[string][]string) + m.uuidToJobIDs = make(map[string][]string) + } + m.jobs[jobID] = job + if _, ok := m.nodesToJobIDs[job.NodeId]; !ok { + m.nodesToJobIDs[job.NodeId] = make([]string, 0) + } + m.nodesToJobIDs[job.NodeId] = append(m.nodesToJobIDs[job.NodeId], jobID) + if _, ok := m.uuidToJobIDs[job.Uuid]; !ok { + m.uuidToJobIDs[job.Uuid] = make([]string, 0) + } + m.uuidToJobIDs[job.Uuid] = append(m.uuidToJobIDs[job.Uuid], jobID) + return nil +} + +func (m *mapJobStore) get(jobID string) (*jobv1.Job, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.jobs == nil { + return nil, fmt.Errorf("%w: job not found: %s", errNoExist, jobID) + } + job, ok := m.jobs[jobID] + if !ok { + return nil, fmt.Errorf("%w: job not found: %s", errNoExist, jobID) + } + return job, nil +} + +func (m *mapJobStore) list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job, error) { + if filter != nil && filter.NodeIds != nil && filter.Uuids != nil && filter.Ids != nil { + return nil, errors.New("only one of NodeIds, Uuids or Ids can be set") + } + m.mu.Lock() + defer m.mu.Unlock() + if m.jobs == nil { + return []*jobv1.Job{}, nil + } + + jobs := make([]*jobv1.Job, 0, len(m.jobs)) + + if filter == nil || (filter.NodeIds == nil && filter.Uuids == nil && filter.Ids == nil) { + for _, job := range m.jobs { + jobs = append(jobs, job) + } + return jobs, nil + } + + wantedJobIDs := make(map[string]struct{}) + // use node ids to construct wanted job ids + switch { + case filter.NodeIds != nil: + for _, nodeID := range filter.NodeIds { + jobIDs, ok := m.nodesToJobIDs[nodeID] + if !ok { + continue + } + for _, jobID := range jobIDs { + wantedJobIDs[jobID] = struct{}{} + } + } + case filter.Uuids != nil: + for _, uuid := range filter.Uuids { + jobIDs, ok := m.uuidToJobIDs[uuid] + if !ok { + continue + } + for _, jobID := range jobIDs { + wantedJobIDs[jobID] = struct{}{} + } + } + case filter.Ids != nil: + for _, jobID := range filter.Ids { + wantedJobIDs[jobID] = struct{}{} + } + default: + panic("this should never happen because of the nil filter check") + } + + for _, job := range m.jobs { + if _, ok := wantedJobIDs[job.Id]; ok { + jobs = append(jobs, job) + } + } + return jobs, nil +} + +func (m *mapJobStore) delete(jobID string) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.jobs == nil { + return fmt.Errorf("job not found: %s", jobID) + } + job, ok := m.jobs[jobID] + if !ok { + return nil + } + delete(m.jobs, jobID) + delete(m.nodesToJobIDs, job.NodeId) + delete(m.uuidToJobIDs, job.Uuid) + return nil +} + +var _ proposalStore = &mapProposalStore{} + +type mapProposalStore struct { + mu sync.Mutex + proposals map[string]*jobv1.Proposal + jobIdToProposalId map[string]string +} + +func newMapProposalStore() *mapProposalStore { + return &mapProposalStore{ + proposals: make(map[string]*jobv1.Proposal), + jobIdToProposalId: make(map[string]string), + } +} + +func (m *mapProposalStore) put(proposalID string, proposal *jobv1.Proposal) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.proposals == nil { + m.proposals = make(map[string]*jobv1.Proposal) + } + if m.jobIdToProposalId == nil { + m.jobIdToProposalId = make(map[string]string) + } + m.proposals[proposalID] = proposal + m.jobIdToProposalId[proposal.JobId] = proposalID + return nil +} +func (m *mapProposalStore) get(proposalID string) (*jobv1.Proposal, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.proposals == nil { + return nil, fmt.Errorf("proposal not found: %s", proposalID) + } + proposal, ok := m.proposals[proposalID] + if !ok { + return nil, fmt.Errorf("%w: proposal not found: %s", errNoExist, proposalID) + } + return proposal, nil +} +func (m *mapProposalStore) list(filter *jobv1.ListProposalsRequest_Filter) ([]*jobv1.Proposal, error) { + if filter != nil && filter.GetIds() != nil && filter.GetJobIds() != nil { + return nil, errors.New("only one of Ids or JobIds can be set") + } + m.mu.Lock() + defer m.mu.Unlock() + if m.proposals == nil { + return nil, nil + } + proposals := make([]*jobv1.Proposal, 0) + // all proposals + if filter == nil || (filter.GetIds() == nil && filter.GetJobIds() == nil) { + for _, proposal := range m.proposals { + proposals = append(proposals, proposal) + } + return proposals, nil + } + + // can't both be nil at this point + wantedProposalIDs := filter.GetIds() + if wantedProposalIDs == nil { + wantedProposalIDs = make([]string, 0) + for _, jobId := range filter.GetJobIds() { + proposalID, ok := m.jobIdToProposalId[jobId] + if !ok { + continue + } + wantedProposalIDs = append(wantedProposalIDs, proposalID) + } + } + + for _, want := range wantedProposalIDs { + p, ok := m.proposals[want] + if !ok { + continue + } + proposals = append(proposals, p) + } + return proposals, nil +} +func (m *mapProposalStore) delete(proposalID string) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.proposals == nil { + return fmt.Errorf("proposal not found: %s", proposalID) + } + + delete(m.proposals, proposalID) + return nil +} + +var _ nodeStore = &mapNodeStore{} + +type mapNodeStore struct { + mu sync.Mutex + nodes map[string]*Node +} + +func newMapNodeStore(n map[string]*Node) *mapNodeStore { + return &mapNodeStore{ + nodes: n, + } +} +func (m *mapNodeStore) put(nodeID string, node *Node) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.nodes == nil { + m.nodes = make(map[string]*Node) + } + m.nodes[nodeID] = node + return nil +} +func (m *mapNodeStore) get(nodeID string) (*Node, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.nodes == nil { + return nil, fmt.Errorf("node not found: %s", nodeID) + } + node, ok := m.nodes[nodeID] + if !ok { + return nil, fmt.Errorf("%w: node not found: %s", errNoExist, nodeID) + } + return node, nil +} +func (m *mapNodeStore) list() []*Node { + m.mu.Lock() + defer m.mu.Unlock() + if m.nodes == nil { + return nil + } + nodes := make([]*Node, 0) + for _, node := range m.nodes { + nodes = append(nodes, node) + } + return nodes +} +func (m *mapNodeStore) delete(nodeID string) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.nodes == nil { + return fmt.Errorf("node not found: %s", nodeID) + } + _, ok := m.nodes[nodeID] + if !ok { + return nil + } + delete(m.nodes, nodeID) + return nil +} + +func (m *mapNodeStore) asMap() map[string]*Node { + m.mu.Lock() + defer m.mu.Unlock() + if m.nodes == nil { + return nil + } + nodes := make(map[string]*Node) + for k, v := range m.nodes { + nodes[k] = v + } + return nodes +} diff --git a/deployment/environment/memory/job_service_client_test.go b/deployment/environment/memory/job_service_client_test.go new file mode 100644 index 00000000000..4f8d096010d --- /dev/null +++ b/deployment/environment/memory/job_service_client_test.go @@ -0,0 +1,341 @@ +package memory_test + +import ( + "fmt" + "testing" + + "github.com/hashicorp/consul/sdk/freeport" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + + "github.com/smartcontractkit/chainlink-integrations/evm/testutils" + jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job" + "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes" + "github.com/smartcontractkit/chainlink/deployment" + "github.com/smartcontractkit/chainlink/deployment/environment/memory" +) + +func TestJobClientProposeJob(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + chains, _ := memory.NewMemoryChains(t, 1, 1) + ports := freeport.GetN(t, 1) + testNode := memory.NewNode(t, ports[0], chains, nil, zapcore.DebugLevel, false, deployment.CapabilityRegistryConfig{}) + + // Set up the JobClient with a mock node + nodeID := "node-1" + nodes := map[string]memory.Node{ + nodeID: *testNode, + } + jobClient := memory.NewMemoryJobClient(nodes) + + type testCase struct { + name string + req *jobv1.ProposeJobRequest + checkErr func(t *testing.T, err error) + checkResp func(t *testing.T, resp *jobv1.ProposeJobResponse) + } + cases := []testCase{ + { + name: "valid request", + req: &jobv1.ProposeJobRequest{ + NodeId: "node-1", + Spec: testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), + }, + checkResp: func(t *testing.T, resp *jobv1.ProposeJobResponse) { + assert.NotNil(t, resp) + assert.Equal(t, int64(1), resp.Proposal.Revision) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status) + assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus) + assert.Equal(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466", resp.Proposal.JobId) + assert.Equal(t, testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), resp.Proposal.Spec) + }, + }, + { + name: "idempotent request bumps version", + req: &jobv1.ProposeJobRequest{ + NodeId: "node-1", + Spec: testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), + }, + // the feeds service doesn't allow duplicate job names + checkResp: func(t *testing.T, resp *jobv1.ProposeJobResponse) { + assert.NotNil(t, resp) + assert.Equal(t, int64(2), resp.Proposal.Revision) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status) + assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus) + assert.Equal(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466", resp.Proposal.JobId) + assert.Equal(t, testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), resp.Proposal.Spec) + }, + }, + { + name: "another request", + req: &jobv1.ProposeJobRequest{ + NodeId: "node-1", + Spec: testJobProposalTOML(t, "11115211-ab79-4c31-ba1c-0997b72aaaaa"), + }, + checkResp: func(t *testing.T, resp *jobv1.ProposeJobResponse) { + assert.NotNil(t, resp) + assert.Equal(t, int64(1), resp.Proposal.Revision) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status) + assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus) + assert.Equal(t, "11115211-ab79-4c31-ba1c-0997b72aaaaa", resp.Proposal.JobId) + assert.Equal(t, testJobProposalTOML(t, "11115211-ab79-4c31-ba1c-0997b72aaaaa"), resp.Proposal.Spec) + }, + }, + { + name: "node does not exist", + req: &jobv1.ProposeJobRequest{ + NodeId: "node-2", + Spec: testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), + }, + checkErr: func(t *testing.T, err error) { + require.Error(t, err) + assert.Contains(t, err.Error(), "node not found") + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Call the ProposeJob method + resp, err := jobClient.ProposeJob(ctx, c.req) + if c.checkErr != nil { + c.checkErr(t, err) + return + } + require.NoError(t, err) + c.checkResp(t, resp) + }) + } +} + +func TestJobClientJobAPI(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + chains, _ := memory.NewMemoryChains(t, 1, 1) + ports := freeport.GetN(t, 1) + testNode := memory.NewNode(t, ports[0], chains, nil, zapcore.DebugLevel, false, deployment.CapabilityRegistryConfig{}) + + // Set up the JobClient with a mock node + nodeID := "node-1" + externalJobID := "f1ac5211-ab79-4c31-ba1c-0997b72db466" + + jobSpecToml := testJobProposalTOML(t, externalJobID) + nodes := map[string]memory.Node{ + nodeID: *testNode, + } + jobClient := memory.NewMemoryJobClient(nodes) + + // Create a mock request + req := &jobv1.ProposeJobRequest{ + NodeId: nodeID, + Spec: jobSpecToml, + Labels: []*ptypes.Label{ + { + Key: "label-key", + Value: ptr("label-value"), + }, + }, + } + + // Call the ProposeJob method + resp, err := jobClient.ProposeJob(ctx, req) + + // Validate the response + require.NoError(t, err) + assert.NotNil(t, resp) + assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status) + assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus) + assert.Equal(t, jobSpecToml, resp.Proposal.Spec) + assert.Equal(t, externalJobID, resp.Proposal.JobId) + + expectedProposalID := resp.Proposal.Id + expectedProposal := resp.Proposal + + t.Run("GetJob", func(t *testing.T) { + t.Run("existing job", func(t *testing.T) { + // Create a mock request + getReq := &jobv1.GetJobRequest{ + IdOneof: &jobv1.GetJobRequest_Id{Id: externalJobID}, + } + + getResp, err := jobClient.GetJob(ctx, getReq) + require.NoError(t, err) + assert.NotNil(t, getResp) + assert.Equal(t, externalJobID, getResp.Job.Id) + }) + + t.Run("non-existing job", func(t *testing.T) { + // Create a mock request + getReq := &jobv1.GetJobRequest{ + IdOneof: &jobv1.GetJobRequest_Id{Id: "non-existing-job"}, + } + + getResp, err := jobClient.GetJob(ctx, getReq) + require.Error(t, err) + assert.Nil(t, getResp) + }) + }) + + t.Run("ListJobs", func(t *testing.T) { + type listCase struct { + name string + req *jobv1.ListJobsRequest + checkErr func(t *testing.T, err error) + checkResp func(t *testing.T, resp *jobv1.ListJobsResponse) + } + cases := []listCase{ + { + name: "no filters", + req: &jobv1.ListJobsRequest{}, + checkResp: func(t *testing.T, resp *jobv1.ListJobsResponse) { + assert.NotNil(t, resp) + assert.Len(t, resp.Jobs, 1) + assert.Equal(t, externalJobID, resp.Jobs[0].Id) + }, + }, + { + name: "with id filter", + req: &jobv1.ListJobsRequest{ + Filter: &jobv1.ListJobsRequest_Filter{ + Ids: []string{externalJobID}, + }, + }, + checkResp: func(t *testing.T, resp *jobv1.ListJobsResponse) { + assert.NotNil(t, resp) + assert.Len(t, resp.Jobs, 1) + assert.Equal(t, externalJobID, resp.Jobs[0].Id) + }, + }, + { + name: "non-existing job id", + req: &jobv1.ListJobsRequest{ + Filter: &jobv1.ListJobsRequest_Filter{ + Ids: []string{"non-existing-job-id"}, + }, + }, + checkResp: func(t *testing.T, resp *jobv1.ListJobsResponse) { + require.NotNil(t, resp) + assert.Empty(t, resp.Jobs) + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Call the ListJobs method + listResp, err := jobClient.ListJobs(ctx, c.req) + if c.checkErr != nil { + c.checkErr(t, err) + return + } + require.NoError(t, err) + c.checkResp(t, listResp) + }) + } + }) + + t.Run("GetProposal", func(t *testing.T) { + t.Run("existing proposal", func(t *testing.T) { + // Create a mock request + getReq := &jobv1.GetProposalRequest{ + Id: expectedProposalID, + } + + getResp, err := jobClient.GetProposal(ctx, getReq) + require.NoError(t, err) + assert.NotNil(t, getResp) + assert.Equal(t, expectedProposal, getResp.Proposal) + }) + + t.Run("non-existing proposal", func(t *testing.T) { + // Create a mock request + getReq := &jobv1.GetProposalRequest{ + Id: "non-existing-job", + } + + getResp, err := jobClient.GetProposal(ctx, getReq) + require.Error(t, err) + assert.Nil(t, getResp) + }) + }) + + t.Run("ListProposals", func(t *testing.T) { + type listCase struct { + name string + req *jobv1.ListProposalsRequest + checkErr func(t *testing.T, err error) + checkResp func(t *testing.T, resp *jobv1.ListProposalsResponse) + } + cases := []listCase{ + + { + name: "no filters", + req: &jobv1.ListProposalsRequest{}, + checkResp: func(t *testing.T, resp *jobv1.ListProposalsResponse) { + assert.NotNil(t, resp) + assert.Len(t, resp.Proposals, 1) + assert.Equal(t, expectedProposalID, resp.Proposals[0].Id) + assert.Equal(t, expectedProposal, resp.Proposals[0]) + }, + }, + { + name: "with id filter", + req: &jobv1.ListProposalsRequest{ + Filter: &jobv1.ListProposalsRequest_Filter{ + Ids: []string{expectedProposalID}, + }, + }, + checkResp: func(t *testing.T, resp *jobv1.ListProposalsResponse) { + assert.NotNil(t, resp) + assert.Len(t, resp.Proposals, 1) + assert.Equal(t, expectedProposalID, resp.Proposals[0].Id) + assert.Equal(t, expectedProposal, resp.Proposals[0]) + }, + }, + + { + name: "non-existing job id", + req: &jobv1.ListProposalsRequest{ + Filter: &jobv1.ListProposalsRequest_Filter{ + Ids: []string{"non-existing-job-id"}, + }, + }, + checkResp: func(t *testing.T, resp *jobv1.ListProposalsResponse) { + require.NotNil(t, resp) + assert.Empty(t, resp.Proposals, "expected no proposals %v", resp.Proposals) + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + listResp, err := jobClient.ListProposals(ctx, c.req) + if c.checkErr != nil { + c.checkErr(t, err) + return + } + require.NoError(t, err) + c.checkResp(t, listResp) + }) + } + }) +} + +func ptr(s string) *string { + return &s +} + +// need some non-ocr job type to avoid the ocr validation and the p2pwrapper check +func testJobProposalTOML(t *testing.T, externalJobId string) string { + tomlString := ` +type = "standardcapabilities" +schemaVersion = 1 +externalJobID = "%s" +name = "hacking-%s" +forwardingAllowed = false +command = "/home/capabilities/nowhere" +config = "" +` + return fmt.Sprintf(tomlString, externalJobId, externalJobId) +} diff --git a/deployment/environment/memory/node.go b/deployment/environment/memory/node.go index d4a950f431f..4593024c71a 100644 --- a/deployment/environment/memory/node.go +++ b/deployment/environment/memory/node.go @@ -2,6 +2,9 @@ package memory import ( "context" + "crypto/rand" + + "encoding/hex" "fmt" "math/big" "net" @@ -15,6 +18,7 @@ import ( "github.com/ethereum/go-ethereum/common" gethtypes "github.com/ethereum/go-ethereum/core/types" chainsel "github.com/smartcontractkit/chain-selectors" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "golang.org/x/exp/maps" @@ -35,6 +39,7 @@ import ( "github.com/smartcontractkit/chainlink-integrations/evm/assets" "github.com/smartcontractkit/chainlink-integrations/evm/client" v2toml "github.com/smartcontractkit/chainlink-integrations/evm/config/toml" + "github.com/smartcontractkit/chainlink-integrations/evm/testutils" evmutils "github.com/smartcontractkit/chainlink-integrations/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" @@ -49,8 +54,13 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey" "github.com/smartcontractkit/chainlink/v2/core/services/relay" "github.com/smartcontractkit/chainlink/v2/core/utils" + "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight" "github.com/smartcontractkit/chainlink/v2/plugins" + + pb "github.com/smartcontractkit/chainlink-protos/orchestrator/feedsmanager" + feeds2 "github.com/smartcontractkit/chainlink/v2/core/services/feeds" + feedsMocks "github.com/smartcontractkit/chainlink/v2/core/services/feeds/mocks" ) type Node struct { @@ -380,6 +390,9 @@ func NewNode( }) keys := CreateKeys(t, app, chains, solchains) + // JD + + setupJD(t, app) return &Node{ App: app, Chains: slices.Concat( @@ -603,3 +616,54 @@ func (e KeystoreSim) Eth() keystore.Eth { func (e KeystoreSim) CSA() keystore.CSA { return e.csa } + +func setupJD(t *testing.T, app chainlink.Application) { + secret := randomBytes32(t) + pkey, err := crypto.PublicKeyFromHex(hex.EncodeToString(secret)) + require.NoError(t, err) + m := feeds2.RegisterManagerParams{ + Name: "In memory env test", + URI: "http://dev.null:8080", + PublicKey: *pkey, + } + f := app.GetFeedsService() + connManager := feedsMocks.NewConnectionsManager(t) + connManager.On("Connect", mock.Anything).Maybe() + connManager.On("GetClient", mock.Anything).Maybe().Return(noopFeedsClient{}, nil) + connManager.On("Close").Maybe().Return() + connManager.On("IsConnected", mock.Anything).Maybe().Return(true) + f.Unsafe_SetConnectionsManager(connManager) + + _, err = f.RegisterManager(testutils.Context(t), m) + require.NoError(t, err) +} + +func randomBytes32(t *testing.T) []byte { + t.Helper() + b := make([]byte, 32) + _, err := rand.Read(b) + require.NoError(t, err) + return b +} + +type noopFeedsClient struct{} + +func (n noopFeedsClient) ApprovedJob(context.Context, *pb.ApprovedJobRequest) (*pb.ApprovedJobResponse, error) { + return &pb.ApprovedJobResponse{}, nil +} + +func (n noopFeedsClient) Healthcheck(context.Context, *pb.HealthcheckRequest) (*pb.HealthcheckResponse, error) { + return &pb.HealthcheckResponse{}, nil +} + +func (n noopFeedsClient) UpdateNode(context.Context, *pb.UpdateNodeRequest) (*pb.UpdateNodeResponse, error) { + return &pb.UpdateNodeResponse{}, nil +} + +func (n noopFeedsClient) RejectedJob(context.Context, *pb.RejectedJobRequest) (*pb.RejectedJobResponse, error) { + return &pb.RejectedJobResponse{}, nil +} + +func (n noopFeedsClient) CancelledJob(context.Context, *pb.CancelledJobRequest) (*pb.CancelledJobResponse, error) { + return &pb.CancelledJobResponse{}, nil +} diff --git a/deployment/environment/memory/node_service_client.go b/deployment/environment/memory/node_service_client.go new file mode 100644 index 00000000000..d11ce8fe5b8 --- /dev/null +++ b/deployment/environment/memory/node_service_client.go @@ -0,0 +1,120 @@ +package memory + +import ( + "context" + "errors" + "fmt" + + "google.golang.org/grpc" + + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" + "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes" +) + +func (j JobClient) EnableNode(ctx context.Context, in *nodev1.EnableNodeRequest, opts ...grpc.CallOption) (*nodev1.EnableNodeResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j JobClient) DisableNode(ctx context.Context, in *nodev1.DisableNodeRequest, opts ...grpc.CallOption) (*nodev1.DisableNodeResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j *JobClient) RegisterNode(ctx context.Context, in *nodev1.RegisterNodeRequest, opts ...grpc.CallOption) (*nodev1.RegisterNodeResponse, error) { + if in == nil || in.GetPublicKey() == "" { + return nil, errors.New("public key is required") + } + + if _, exists := j.RegisteredNodes[in.GetPublicKey()]; exists { + return nil, fmt.Errorf("node with Public Key %s is already registered", in.GetPublicKey()) + } + + var foundNode *Node + for _, node := range j.nodeStore.list() { + if node.Keys.CSA.ID() == in.GetPublicKey() { + foundNode = node + break + } + } + + if foundNode == nil { + return nil, fmt.Errorf("node with Public Key %s is not known", in.GetPublicKey()) + } + + j.RegisteredNodes[in.GetPublicKey()] = *foundNode + + return &nodev1.RegisterNodeResponse{ + Node: &nodev1.Node{ + Id: in.GetPublicKey(), + PublicKey: in.GetPublicKey(), + IsEnabled: true, + IsConnected: true, + Labels: in.Labels, + }, + }, nil +} + +func (j JobClient) UpdateNode(ctx context.Context, in *nodev1.UpdateNodeRequest, opts ...grpc.CallOption) (*nodev1.UpdateNodeResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j JobClient) GetNode(ctx context.Context, in *nodev1.GetNodeRequest, opts ...grpc.CallOption) (*nodev1.GetNodeResponse, error) { + n, err := j.nodeStore.get(in.Id) + if err != nil { + return nil, err + } + return &nodev1.GetNodeResponse{ + Node: &nodev1.Node{ + Id: in.Id, + PublicKey: n.Keys.CSA.PublicKeyString(), + IsEnabled: true, + IsConnected: true, + }, + }, nil +} + +func (j JobClient) ListNodes(ctx context.Context, in *nodev1.ListNodesRequest, opts ...grpc.CallOption) (*nodev1.ListNodesResponse, error) { + var nodes []*nodev1.Node + for id, n := range j.nodeStore.asMap() { + node := &nodev1.Node{ + Id: id, + PublicKey: n.Keys.CSA.ID(), + IsEnabled: true, + IsConnected: true, + Labels: []*ptypes.Label{ + { + Key: "p2p_id", + Value: ptr(n.Keys.PeerID.String()), + }, + }, + } + if ApplyNodeFilter(in.Filter, node) { + nodes = append(nodes, node) + } + } + return &nodev1.ListNodesResponse{ + Nodes: nodes, + }, nil +} + +func (j JobClient) ListNodeChainConfigs(ctx context.Context, in *nodev1.ListNodeChainConfigsRequest, opts ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) { + if in.Filter == nil { + return nil, errors.New("filter is required") + } + if len(in.Filter.NodeIds) != 1 { + return nil, errors.New("only one node id is supported") + } + n, err := j.nodeStore.get(in.Filter.NodeIds[0]) // j.Nodes[in.Filter.NodeIds[0]] + if err != nil { + return nil, fmt.Errorf("node id not found: %s", in.Filter.NodeIds[0]) + } + chainConfigs, err := n.JDChainConfigs() + if err != nil { + return nil, err + } + return &nodev1.ListNodeChainConfigsResponse{ + ChainConfigs: chainConfigs, + }, nil +} diff --git a/deployment/environment/memory/offchain_client.go b/deployment/environment/memory/offchain_client.go new file mode 100644 index 00000000000..6e40a1a6498 --- /dev/null +++ b/deployment/environment/memory/offchain_client.go @@ -0,0 +1,96 @@ +package memory + +import ( + "context" + "slices" + "strings" + + "google.golang.org/grpc" + + csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa" + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" + "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes" + "github.com/smartcontractkit/chainlink/deployment" +) + +var _ deployment.OffchainClient = &JobClient{} + +type JobClient struct { + RegisteredNodes map[string]Node + nodeStore + *JobServiceClient +} + +func NewMemoryJobClient(nodesByPeerID map[string]Node) *JobClient { + m := make(map[string]*Node) + for id, node := range nodesByPeerID { + m[id] = &node + } + ns := newMapNodeStore(m) + return &JobClient{ + // Nodes: nodesByPeerID, + RegisteredNodes: make(map[string]Node), + JobServiceClient: NewJobServiceClient(ns), + nodeStore: ns, + } +} + +func (j JobClient) GetKeypair(ctx context.Context, in *csav1.GetKeypairRequest, opts ...grpc.CallOption) (*csav1.GetKeypairResponse, error) { + // TODO implement me + panic("implement me") +} + +func (j JobClient) ListKeypairs(ctx context.Context, in *csav1.ListKeypairsRequest, opts ...grpc.CallOption) (*csav1.ListKeypairsResponse, error) { + // TODO CCIP-3108 implement me + panic("implement me") +} + +func (j JobClient) ReplayLogs(selectorToBlock map[uint64]uint64) error { + for _, node := range j.nodeStore.list() { + if err := node.ReplayLogs(selectorToBlock); err != nil { + return err + } + } + return nil +} + +func ApplyNodeFilter(filter *nodev1.ListNodesRequest_Filter, node *nodev1.Node) bool { + if filter == nil { + return true + } + if len(filter.Ids) > 0 { + idx := slices.IndexFunc(filter.Ids, func(id string) bool { + return node.Id == id + }) + if idx < 0 { + return false + } + } + for _, selector := range filter.Selectors { + idx := slices.IndexFunc(node.Labels, func(label *ptypes.Label) bool { + return label.Key == selector.Key + }) + if idx < 0 { + return false + } + label := node.Labels[idx] + + switch selector.Op { + case ptypes.SelectorOp_IN: + values := strings.Split(*selector.Value, ",") + found := slices.Contains(values, *label.Value) + if !found { + return false + } + case ptypes.SelectorOp_EQ: + if *label.Value != *selector.Value { + return false + } + case ptypes.SelectorOp_EXIST: + // do nothing + default: + panic("unimplemented selector") + } + } + return true +} diff --git a/deployment/go.mod b/deployment/go.mod index bbaf6492c22..00ffe0cba27 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -37,6 +37,7 @@ require ( github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441-3d6cea220efb github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02 github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 + github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 github.com/smartcontractkit/chainlink-solana v1.1.2-0.20250213203720-e15b1333a14a github.com/smartcontractkit/chainlink-testing-framework/framework v0.5.3 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.22 @@ -356,7 +357,6 @@ require ( github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250224190032-809e4b8cf29e // indirect github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250207205350-420ccacab78a // indirect - github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 // indirect github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 // indirect github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.10 // indirect