Skip to content

Commit

Permalink
Merge pull request #5606 from oasisprotocol/jberci/feature/observers
Browse files Browse the repository at this point in the history
Add observer nodes automatically to ACL
  • Loading branch information
jberci authored Mar 25, 2024
2 parents 0dedeaf + c630125 commit e39ffaf
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .changelog/5606.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Add observer nodes automatically to the keymanager's access list

Observer nodes for a given paratime had to be added manually. This
change brings observer nodes in line with compute nodes, which were
added automatically.
78 changes: 62 additions & 16 deletions go/worker/keymanager/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,32 +267,54 @@ func (w *rtNodeWatcher) watch(ctx context.Context) {
}
defer epoSub.Close()

watcher, err := nodes.NewVersionedNodeDescriptorWatcher(ctx, w.consensus)
// Maintain lists of observer and compute nodes.
observerNodes := map[signature.PublicKey]*node.Node{}
computeNodes := map[signature.PublicKey]*node.Node{}

getFlatList := func() []*node.Node {
list := make([]*node.Node, 0, len(observerNodes)+len(computeNodes))
for _, nd := range observerNodes {
list = append(list, nd)
}
for id, nd := range computeNodes {
if _, dup := observerNodes[id]; !dup {
list = append(list, nd)
}
}
return list
}

nodeCh, nodeSub, err := w.consensus.Registry().WatchNodes(ctx)
if err != nil {
w.logger.Error("failed to create node watcher",
w.logger.Error("failed to subscribe to registry node updates",
"err", err,
)
return
}
defer nodeSub.Close()

ch, sub, err := watcher.WatchNodeUpdates()
// And populate the list of observer nodes with the currently known set of them.
nodes, err := w.consensus.Registry().GetNodes(ctx, consensus.HeightLatest)
if err != nil {
w.logger.Error("failed to watch node updates",
w.logger.Error("failed to fetch list of nodes from the registry",
"err", err,
)
return
}
defer sub.Close()
for _, nd := range nodes {
if nd.HasRoles(node.RoleObserver) && nd.HasRuntime(w.runtimeID) {
observerNodes[nd.ID] = nd
}
}
w.accessList.UpdateNodes(w.runtimeID, getFlatList())

for {
select {
case <-ctx.Done():
return
case <-epoCh:
func() {
watcher.Reset()
defer watcher.Freeze(0)

// Get executor committee members.
cms, err := w.consensus.Scheduler().GetCommittees(ctx, &scheduler.GetCommitteesRequest{
Height: consensus.HeightLatest,
RuntimeID: w.runtimeID,
Expand All @@ -303,26 +325,50 @@ func (w *rtNodeWatcher) watch(ctx context.Context) {
)
return
}
clear(computeNodes)

for _, cm := range cms {
if cm.Kind != scheduler.KindComputeExecutor {
continue
}

for _, member := range cm.Members {
_, _ = watcher.WatchNode(ctx, member.PublicKey)
nd, err := w.consensus.Registry().GetNode(ctx, &registry.IDQuery{
ID: member.PublicKey,
Height: consensus.HeightLatest,
})
if err != nil {
w.logger.Error("failed to fetch node descriptor for committee member",
"err", err,
"member", member.PublicKey,
)
continue
}
computeNodes[nd.ID] = nd
}
}
}()
case nu := <-ch:
if nu.Reset {
// Ignore reset events to avoid clearing the access list before setting a new one.
// This is safe because a reset event is always followed by a freeze event after the
// nodes have been set (even if the new set is empty).
continue
case ne := <-nodeCh:
switch ne.IsRegistration {
case true:
// A new node registered, which may need to be added to the ACL.
if ne.Node.HasRoles(node.RoleObserver) && ne.Node.HasRuntime(w.runtimeID) {
observerNodes[ne.Node.ID] = ne.Node
}
if _, known := computeNodes[ne.Node.ID]; known {
computeNodes[ne.Node.ID] = ne.Node
}

case false:
// A node's descriptor expired, it potentially needs to be removed.
if _, known := observerNodes[ne.Node.ID]; known {
delete(observerNodes, ne.Node.ID)
} else {
continue
}
}
}

w.accessList.UpdateNodes(w.runtimeID, watcher.GetNodes())
w.accessList.UpdateNodes(w.runtimeID, getFlatList())
}
}

0 comments on commit e39ffaf

Please sign in to comment.