Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann committed Feb 26, 2025
1 parent 1a455ee commit d83729f
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ func (j *JobServiceClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobR
if err != nil {
return nil, fmt.Errorf("failed to list proposals: %w", err)
}

proposalVersion := int32(len(proposals) + 1) //nolint:gosec // G115
appProposalID, err := n.App.GetFeedsService().ProposeJob(ctx, &feeds.ProposeJobArgs{
FeedsManagerID: 1,
Spec: in.Spec,
RemoteUUID: uuid.MustParse(extractor.ExternalJobID),
Version: int32(len(proposals) + 1),
Version: proposalVersion,
})
if err != nil {
return nil, fmt.Errorf("failed to propose job: %w", err)
Expand All @@ -155,10 +155,11 @@ func (j *JobServiceClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobR
if err != nil {
return nil, fmt.Errorf("failed to list specs: %w", err)
}
if len(proposedSpec) != 1 {
return nil, fmt.Errorf("expected 1 spec, got %d", len(proposedSpec))
// possible to have multiple specs for the same job proposal id; take the last one
if len(proposedSpec) == 0 {
return nil, fmt.Errorf("no specs found for job proposal id: %d", appProposalID)
}
err = n.App.GetFeedsService().ApproveSpec(ctx, proposedSpec[0].ID, true)
err = n.App.GetFeedsService().ApproveSpec(ctx, proposedSpec[len(proposedSpec)-1].ID, true)
if err != nil {
return nil, fmt.Errorf("failed to approve job: %w", err)
}
Expand All @@ -167,7 +168,8 @@ func (j *JobServiceClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobR
p := &jobv1.ProposeJobResponse{Proposal: &jobv1.Proposal{
// make the proposal id the same as the job id for further reference
// if you are changing this make sure to change the GetProposal and ListJobs method implementation
Id: storeProposalID,
Id: storeProposalID,
Revision: int64(proposalVersion),
// Auto approve for now
Status: jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED,
DeliveryStatus: jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED,
Expand All @@ -193,7 +195,7 @@ func (j *JobServiceClient) ProposeJob(ctx context.Context, in *jobv1.ProposeJobR
defer func() {
// cleanup if we fail to save the job
if storeErr != nil {
j.proposalStore.delete(storeProposalID)
j.proposalStore.delete(storeProposalID) //nolint:errcheck // ignore error nothing to do
}
}()

Expand Down Expand Up @@ -332,7 +334,8 @@ func (m *mapJobStore) list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job,

wantedJobIDs := make(map[string]struct{})
// use node ids to construct wanted job ids
if filter.NodeIds != nil {
switch {
case filter.NodeIds != nil:
for _, nodeID := range filter.NodeIds {
jobIDs, ok := m.nodesToJobIDs[nodeID]
if !ok {
Expand All @@ -342,7 +345,7 @@ func (m *mapJobStore) list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job,
wantedJobIDs[jobID] = struct{}{}
}
}
} else if filter.Uuids != nil {
case filter.Uuids != nil:
for _, uuid := range filter.Uuids {
jobIDs, ok := m.uuidToJobIDs[uuid]
if !ok {
Expand All @@ -352,11 +355,14 @@ func (m *mapJobStore) list(filter *jobv1.ListJobsRequest_Filter) ([]*jobv1.Job,
wantedJobIDs[jobID] = struct{}{}
}
}
} else if filter.Ids != nil {
case filter.Ids != nil:
for _, jobID := range filter.Ids {
wantedJobIDs[jobID] = struct{}{}
}
default:
panic("this should never happen because of the nil filter check")
}

for _, job := range m.jobs {
if _, ok := wantedJobIDs[job.Id]; ok {
jobs = append(jobs, job)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,99 @@ import (
"github.com/smartcontractkit/chainlink/deployment/environment/memory"
)

func TestJobClientProposeJob(t *testing.T) {
t.Parallel()
ctx := testutils.Context(t)
chains, _ := memory.NewMemoryChains(t, 1, 1)
ports := freeport.GetN(t, 1)
testNode := memory.NewNode(t, ports[0], chains, nil, zapcore.DebugLevel, false, deployment.CapabilityRegistryConfig{})

// Set up the JobClient with a mock node
nodeID := "node-1"
nodes := map[string]memory.Node{
nodeID: *testNode,
}
jobClient := memory.NewMemoryJobClient(nodes)

type testCase struct {
name string
req *jobv1.ProposeJobRequest
checkErr func(t *testing.T, err error)
checkResp func(t *testing.T, resp *jobv1.ProposeJobResponse)
}
cases := []testCase{
{
name: "valid request",
req: &jobv1.ProposeJobRequest{
NodeId: "node-1",
Spec: testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"),
},
checkResp: func(t *testing.T, resp *jobv1.ProposeJobResponse) {
assert.NotNil(t, resp)
assert.Equal(t, int64(1), resp.Proposal.Revision)
assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status)
assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus)
assert.Equal(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466", resp.Proposal.JobId)
assert.Equal(t, testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), resp.Proposal.Spec)
},
},
{
name: "idempotent request bumps version",
req: &jobv1.ProposeJobRequest{
NodeId: "node-1",
Spec: testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"),
},
// the feeds service doesn't allow duplicate job names
checkResp: func(t *testing.T, resp *jobv1.ProposeJobResponse) {
assert.NotNil(t, resp)
assert.Equal(t, int64(2), resp.Proposal.Revision)
assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status)
assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus)
assert.Equal(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466", resp.Proposal.JobId)
assert.Equal(t, testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"), resp.Proposal.Spec)
},
},
{
name: "another request",
req: &jobv1.ProposeJobRequest{
NodeId: "node-1",
Spec: testJobProposalTOML(t, "11115211-ab79-4c31-ba1c-0997b72aaaaa"),
},
checkResp: func(t *testing.T, resp *jobv1.ProposeJobResponse) {
assert.NotNil(t, resp)
assert.Equal(t, int64(1), resp.Proposal.Revision)
assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status)
assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus)
assert.Equal(t, "11115211-ab79-4c31-ba1c-0997b72aaaaa", resp.Proposal.JobId)
assert.Equal(t, testJobProposalTOML(t, "11115211-ab79-4c31-ba1c-0997b72aaaaa"), resp.Proposal.Spec)
},
},
{
name: "node does not exist",
req: &jobv1.ProposeJobRequest{
NodeId: "node-2",
Spec: testJobProposalTOML(t, "f1ac5211-ab79-4c31-ba1c-0997b72db466"),
},
checkErr: func(t *testing.T, err error) {
require.Error(t, err)
assert.Contains(t, err.Error(), "node not found")
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
// Call the ProposeJob method
resp, err := jobClient.ProposeJob(ctx, c.req)
if c.checkErr != nil {
c.checkErr(t, err)
return
}
require.NoError(t, err)
c.checkResp(t, resp)
})
}
}

func TestJobClientJobAPI(t *testing.T) {
t.Parallel()
ctx := testutils.Context(t)
Expand Down Expand Up @@ -239,56 +332,10 @@ func testJobProposalTOML(t *testing.T, externalJobId string) string {
type = "standardcapabilities"
schemaVersion = 1
externalJobID = "%s"
name = "hacking"
name = "hacking-%s"
forwardingAllowed = false
command = "/home/capabilities/nowhere"
config = ""
`
return fmt.Sprintf(tomlString, externalJobId)
}

func setupOne(t *testing.T) *memory.JobClient {
t.Helper()
ctx := testutils.Context(t)
// Create a new memory node

chains, _ := memory.NewMemoryChains(t, 1, 1)
ports := freeport.GetN(t, 1)
testNode := memory.NewNode(t, ports[0], chains, nil, zapcore.DebugLevel, false, deployment.CapabilityRegistryConfig{})

// Set up the JobClient with a mock node
nodeID := "node-1"
externalJobID := "f1ac5211-ab79-4c31-ba1c-0997b72db466"
// need some non-ocr job type to avoid the ocr validation and the p2pwrapper check

jobSpecToml := testJobProposalTOML(t, externalJobID)
nodes := map[string]memory.Node{
nodeID: *testNode,
}
jobClient := memory.NewMemoryJobClient(nodes)

// Create a mock request
req := &jobv1.ProposeJobRequest{
NodeId: nodeID,
Spec: jobSpecToml,
Labels: []*ptypes.Label{
{
Key: "label-key",
Value: ptr("label-value"),
},
},
}

// Call the ProposeJob method
resp, err := jobClient.ProposeJob(ctx, req)

// Validate the response
require.NoError(t, err)
assert.NotNil(t, resp)
assert.Equal(t, externalJobID, resp.Proposal.Id)
assert.Equal(t, jobv1.ProposalStatus_PROPOSAL_STATUS_APPROVED, resp.Proposal.Status)
assert.Equal(t, jobv1.ProposalDeliveryStatus_PROPOSAL_DELIVERY_STATUS_DELIVERED, resp.Proposal.DeliveryStatus)
assert.Equal(t, jobSpecToml, resp.Proposal.Spec)
assert.Equal(t, externalJobID, resp.Proposal.JobId)
return jobClient
return fmt.Sprintf(tomlString, externalJobId, externalJobId)
}
3 changes: 2 additions & 1 deletion deployment/environment/memory/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"github.com/ethereum/go-ethereum/common"
gethtypes "github.com/ethereum/go-ethereum/core/types"
chainsel "github.com/smartcontractkit/chain-selectors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/test-go/testify/mock"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/maps"

Expand Down Expand Up @@ -615,6 +615,7 @@ func (e KeystoreSim) CSA() keystore.CSA {
func setupJD(t *testing.T, app chainlink.Application) {
secret := randomBytes32(t)
pkey, err := crypto.PublicKeyFromHex(hex.EncodeToString(secret))
require.NoError(t, err)
m := feeds2.RegisterManagerParams{
Name: "In memory env test",
URI: "http://dev.null:8080",
Expand Down
120 changes: 120 additions & 0 deletions deployment/environment/memory/node_service_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package memory

import (
"context"
"errors"
"fmt"

"google.golang.org/grpc"

nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node"
"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"
)

func (j JobClient) EnableNode(ctx context.Context, in *nodev1.EnableNodeRequest, opts ...grpc.CallOption) (*nodev1.EnableNodeResponse, error) {
// TODO CCIP-3108 implement me
panic("implement me")
}

func (j JobClient) DisableNode(ctx context.Context, in *nodev1.DisableNodeRequest, opts ...grpc.CallOption) (*nodev1.DisableNodeResponse, error) {
// TODO CCIP-3108 implement me
panic("implement me")
}

func (j *JobClient) RegisterNode(ctx context.Context, in *nodev1.RegisterNodeRequest, opts ...grpc.CallOption) (*nodev1.RegisterNodeResponse, error) {
if in == nil || in.GetPublicKey() == "" {
return nil, errors.New("public key is required")
}

if _, exists := j.RegisteredNodes[in.GetPublicKey()]; exists {
return nil, fmt.Errorf("node with Public Key %s is already registered", in.GetPublicKey())
}

var foundNode *Node
for _, node := range j.nodeStore.list() {
if node.Keys.CSA.ID() == in.GetPublicKey() {
foundNode = node
break
}
}

if foundNode == nil {
return nil, fmt.Errorf("node with Public Key %s is not known", in.GetPublicKey())
}

j.RegisteredNodes[in.GetPublicKey()] = *foundNode

return &nodev1.RegisterNodeResponse{
Node: &nodev1.Node{
Id: in.GetPublicKey(),
PublicKey: in.GetPublicKey(),
IsEnabled: true,
IsConnected: true,
Labels: in.Labels,
},
}, nil
}

func (j JobClient) UpdateNode(ctx context.Context, in *nodev1.UpdateNodeRequest, opts ...grpc.CallOption) (*nodev1.UpdateNodeResponse, error) {
// TODO CCIP-3108 implement me
panic("implement me")
}

func (j JobClient) GetNode(ctx context.Context, in *nodev1.GetNodeRequest, opts ...grpc.CallOption) (*nodev1.GetNodeResponse, error) {
n, err := j.nodeStore.get(in.Id)
if err != nil {
return nil, err
}
return &nodev1.GetNodeResponse{
Node: &nodev1.Node{
Id: in.Id,
PublicKey: n.Keys.CSA.PublicKeyString(),
IsEnabled: true,
IsConnected: true,
},
}, nil
}

func (j JobClient) ListNodes(ctx context.Context, in *nodev1.ListNodesRequest, opts ...grpc.CallOption) (*nodev1.ListNodesResponse, error) {
var nodes []*nodev1.Node
for id, n := range j.nodeStore.asMap() {
node := &nodev1.Node{
Id: id,
PublicKey: n.Keys.CSA.ID(),
IsEnabled: true,
IsConnected: true,
Labels: []*ptypes.Label{
{
Key: "p2p_id",
Value: ptr(n.Keys.PeerID.String()),
},
},
}
if ApplyNodeFilter(in.Filter, node) {
nodes = append(nodes, node)
}
}
return &nodev1.ListNodesResponse{
Nodes: nodes,
}, nil
}

func (j JobClient) ListNodeChainConfigs(ctx context.Context, in *nodev1.ListNodeChainConfigsRequest, opts ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) {
if in.Filter == nil {
return nil, errors.New("filter is required")
}
if len(in.Filter.NodeIds) != 1 {
return nil, errors.New("only one node id is supported")
}
n, err := j.nodeStore.get(in.Filter.NodeIds[0]) // j.Nodes[in.Filter.NodeIds[0]]
if err != nil {
return nil, fmt.Errorf("node id not found: %s", in.Filter.NodeIds[0])
}
chainConfigs, err := n.JDChainConfigs()
if err != nil {
return nil, err
}
return &nodev1.ListNodeChainConfigsResponse{
ChainConfigs: chainConfigs,
}, nil
}
Loading

0 comments on commit d83729f

Please sign in to comment.