Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Keystone] Launcher: don't fail if some capabilities are missing locally #15174

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist
}
}

// - remote capability DONs (with IsPublic = true) the current node is a part of.
// These need server-side shims.
// Capability DONs (with IsPublic = true) the current node is a part of.
// These need server-side shims to expose my own capabilities externally.
myCapabilityDONs := []registrysyncer.DON{}
remoteCapabilityDONs := []registrysyncer.DON{}
for _, d := range publicDONs {
Expand Down Expand Up @@ -223,11 +223,11 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist
}
}

// Finally, if I'm a capability DON, let's enable external access
// Finally, if I'm in a capability DON, let's enable external access
// to the capability.
if len(myCapabilityDONs) > 0 {
for _, mcd := range myCapabilityDONs {
err := w.exposeCapabilities(ctx, myID, mcd, state, remoteWorkflowDONs)
for _, myDON := range myCapabilityDONs {
err := w.exposeCapabilities(ctx, myID, myDON, state, remoteWorkflowDONs)
if err != nil {
return err
}
Expand Down Expand Up @@ -395,10 +395,10 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee

switch capability.CapabilityType {
case capabilities.CapabilityTypeTrigger:
newTriggerPublisher := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
newTriggerPublisher := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
publisher := remote.NewTriggerPublisher(
capabilityConfig.RemoteTriggerConfig,
capability.(capabilities.TriggerCapability),
cap.(capabilities.TriggerCapability),
info,
don.DON,
idsToDONs,
Expand All @@ -410,18 +410,19 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee

err := w.addReceiver(ctx, capability, don, newTriggerPublisher)
if err != nil {
return fmt.Errorf("failed to add server-side receiver: %w", err)
w.lggr.Errorw("failed to add server-side receiver for a trigger capability - it won't be exposed remotely", "id", cid, "error", err)
// continue attempting other capabilities
}
case capabilities.CapabilityTypeAction:
w.lggr.Warn("no remote client configured for capability type action, skipping configuration")
case capabilities.CapabilityTypeConsensus:
w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration")
case capabilities.CapabilityTypeTarget:
newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
newTargetServer := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
return target.NewServer(
capabilityConfig.RemoteTargetConfig,
myPeerID,
capability.(capabilities.TargetCapability),
cap.(capabilities.TargetCapability),
info,
don.DON,
idsToDONs,
Expand All @@ -433,7 +434,8 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee

err := w.addReceiver(ctx, capability, don, newTargetServer)
if err != nil {
return fmt.Errorf("failed to add server-side receiver: %w", err)
w.lggr.Errorw("failed to add server-side receiver for a target capability - it won't be exposed remotely", "id", cid, "error", err)
// continue attempting other capabilities
}
default:
w.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability)
Expand Down
20 changes: 14 additions & 6 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) {

triggerCapID := randomWord()
targetCapID := randomWord()
// one capability from onchain registry is not set up locally
fullMissingTargetID := "[email protected]"
missingTargetCapID := randomWord()
dID := uint32(1)
// The below state describes a Workflow DON (AcceptsWorkflows = true),
// which exposes the streams-trigger and write_chain capabilities.
Expand All @@ -130,8 +133,9 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) {
Members: nodes,
},
CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{
fullTriggerCapID: {},
fullTargetID: {},
fullTriggerCapID: {},
fullTargetID: {},
fullMissingTargetID: {},
},
},
},
Expand All @@ -144,35 +148,39 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) {
ID: "[email protected]",
CapabilityType: capabilities.CapabilityTypeTarget,
},
fullMissingTargetID: {
ID: fullMissingTargetID,
CapabilityType: capabilities.CapabilityTypeTarget,
},
},
IDsToNodes: map[p2ptypes.PeerID]kcr.INodeInfoProviderNodeInfo{
nodes[0]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[0],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
nodes[1]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[1],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
nodes[2]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[2],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
nodes[3]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[3],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
},
}
Expand Down
20 changes: 10 additions & 10 deletions core/services/registrysyncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (s *registrySyncer) updateStateLoop() {
}
}

func (s *registrySyncer) localRegistry(ctx context.Context) (*LocalRegistry, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name was really confusing.

func (s *registrySyncer) importOnchainRegistry(ctx context.Context) (*LocalRegistry, error) {
caps := []kcr.CapabilitiesRegistryCapabilityInfo{}

err := s.reader.GetLatestValue(ctx, s.capabilitiesContract.ReadIdentifier("getCapabilities"), primitives.Unconfirmed, nil, &caps)
Expand Down Expand Up @@ -288,33 +288,33 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error {
s.reader = reader
}

var lr *LocalRegistry
var latestRegistry *LocalRegistry
var err error

if isInitialSync {
s.lggr.Debug("syncing with local registry")
lr, err = s.orm.LatestLocalRegistry(ctx)
latestRegistry, err = s.orm.LatestLocalRegistry(ctx)
if err != nil {
s.lggr.Warnw("failed to sync with local registry, using remote registry instead", "error", err)
} else {
lr.lggr = s.lggr
lr.getPeerID = s.getPeerID
latestRegistry.lggr = s.lggr
latestRegistry.getPeerID = s.getPeerID
}
}

if lr == nil {
if latestRegistry == nil {
s.lggr.Debug("syncing with remote registry")
localRegistry, err := s.localRegistry(ctx)
importedRegistry, err := s.importOnchainRegistry(ctx)
if err != nil {
return fmt.Errorf("failed to sync with remote registry: %w", err)
}
lr = localRegistry
latestRegistry = importedRegistry
// Attempt to send local registry to the update channel without blocking
// This is to prevent the tests from hanging if they are not calling `Start()` on the syncer
select {
case <-s.stopCh:
s.lggr.Debug("sync cancelled, stopping")
case s.updateChan <- lr:
case s.updateChan <- latestRegistry:
// Successfully sent state
s.lggr.Debug("remote registry update triggered successfully")
default:
Expand All @@ -324,7 +324,7 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error {
}

for _, h := range s.launchers {
lrCopy := deepCopyLocalRegistry(lr)
lrCopy := deepCopyLocalRegistry(latestRegistry)
if err := h.Launch(ctx, &lrCopy); err != nil {
s.lggr.Errorf("error calling launcher: %s", err)
s.metrics.incrementLauncherFailureCounter(ctx)
Expand Down
Loading