diff --git a/.changelog/5572.feature.md b/.changelog/5572.feature.md new file mode 100644 index 00000000000..8b745215417 --- /dev/null +++ b/.changelog/5572.feature.md @@ -0,0 +1 @@ +go/worker/keymanager: Add churp worker diff --git a/go/consensus/cometbft/apps/keymanager/churp/ext.go b/go/consensus/cometbft/apps/keymanager/churp/ext.go index 7865ab6f2fa..126e43664d4 100644 --- a/go/consensus/cometbft/apps/keymanager/churp/ext.go +++ b/go/consensus/cometbft/apps/keymanager/churp/ext.go @@ -3,14 +3,11 @@ package churp import ( "fmt" - "github.com/cometbft/cometbft/abci/types" - "github.com/oasisprotocol/oasis-core/go/common/cbor" "github.com/oasisprotocol/oasis-core/go/consensus/api" "github.com/oasisprotocol/oasis-core/go/consensus/api/transaction" tmapi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api" registryState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/registry/state" - genesis "github.com/oasisprotocol/oasis-core/go/genesis/api" "github.com/oasisprotocol/oasis-core/go/keymanager/churp" ) @@ -88,11 +85,6 @@ func (*churpExt) EndBlock(*tmapi.Context) error { return nil } -// InitChain implements api.Extension. -func (ext *churpExt) InitChain(*tmapi.Context, types.RequestInitChain, *genesis.Document) error { - return nil -} - func (*churpExt) enabled(ctx *tmapi.Context) (bool, error) { regState := registryState.NewMutableState(ctx.State()) regParams, err := regState.ConsensusParameters(ctx) diff --git a/go/consensus/cometbft/apps/keymanager/churp/genesis.go b/go/consensus/cometbft/apps/keymanager/churp/genesis.go new file mode 100644 index 00000000000..b40db6bc54e --- /dev/null +++ b/go/consensus/cometbft/apps/keymanager/churp/genesis.go @@ -0,0 +1,27 @@ +package churp + +import ( + "fmt" + + "github.com/cometbft/cometbft/abci/types" + + tmapi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api" + churpState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/keymanager/churp/state" + genesis "github.com/oasisprotocol/oasis-core/go/genesis/api" + "github.com/oasisprotocol/oasis-core/go/keymanager/churp" +) + +// InitChain implements api.Extension. +func (ext *churpExt) InitChain(ctx *tmapi.Context, _ types.RequestInitChain, _ *genesis.Document) error { + if enabled, err := ext.enabled(ctx); err != nil || !enabled { + return nil + } + + state := churpState.NewMutableState(ctx.State()) + + if err := state.SetConsensusParameters(ctx, &churp.DefaultConsensusParameters); err != nil { + return fmt.Errorf("cometbft/keymanager/churp: failed to set consensus parameters: %w", err) + } + + return nil +} diff --git a/go/keymanager/churp/rpc.go b/go/keymanager/churp/rpc.go new file mode 100644 index 00000000000..cce2289b890 --- /dev/null +++ b/go/keymanager/churp/rpc.go @@ -0,0 +1,12 @@ +package churp + +// RPCMethodInit is the name of the `init` method. +var RPCMethodInit = "churp/init" + +// InitRequest represents an initialization request. +type InitRequest struct { + Identity + + // Round is the round for which the node would like to register. + Round uint64 `json:"round,omitempty"` +} diff --git a/go/oasis-test-runner/oasis/fixture.go b/go/oasis-test-runner/oasis/fixture.go index ccb9e367670..be60faae827 100644 --- a/go/oasis-test-runner/oasis/fixture.go +++ b/go/oasis-test-runner/oasis/fixture.go @@ -326,6 +326,7 @@ type KeymanagerFixture struct { LogWatcherHandlerFactories []log.WatcherHandlerFactory `json:"-"` PrivatePeerPubKeys []string `json:"private_peer_pub_keys,omitempty"` + ChurpIDs []uint8 `json:"churp_ids,omitempty"` } // Create instantiates the key manager described by the fixture. @@ -364,6 +365,7 @@ func (f *KeymanagerFixture) Create(net *Network) (*Keymanager, error) { Policy: policy, SentryIndices: f.Sentries, PrivatePeerPubKeys: f.PrivatePeerPubKeys, + ChurpIDs: f.ChurpIDs, }) } diff --git a/go/oasis-test-runner/oasis/keymanager.go b/go/oasis-test-runner/oasis/keymanager.go index b41ebfb72a5..3b3fff1b98b 100644 --- a/go/oasis-test-runner/oasis/keymanager.go +++ b/go/oasis-test-runner/oasis/keymanager.go @@ -18,6 +18,7 @@ import ( registry "github.com/oasisprotocol/oasis-core/go/registry/api" "github.com/oasisprotocol/oasis-core/go/runtime/bundle" runtimeConfig "github.com/oasisprotocol/oasis-core/go/runtime/config" + keymanagerConfig "github.com/oasisprotocol/oasis-core/go/worker/keymanager/config" ) const ( @@ -164,6 +165,7 @@ type Keymanager struct { // nolint: maligned p2pPort uint16 privatePeerPubKeys []string + churpIDs []uint8 } // KeymanagerCfg is the Oasis key manager provisioning configuration. @@ -178,6 +180,9 @@ type KeymanagerCfg struct { // PrivatePeerPubKeys is a list of base64-encoded libp2p public keys of peers who may call non-public methods. PrivatePeerPubKeys []string + + // ChurpIDs is a list of supported CHURP schemes. + ChurpIDs []uint8 } // IdentityKeyPath returns the paths to the node's identity key. @@ -296,6 +301,15 @@ func (km *Keymanager) ModifyConfig() error { km.Config.Keymanager.RuntimeID = km.runtime.ID().String() km.Config.Keymanager.PrivatePeerPubKeys = km.privatePeerPubKeys + // Configuration for the CHURP key manager extension. + schemes := make([]keymanagerConfig.ChurpSchemeConfig, len(km.churpIDs)) + for i, id := range km.churpIDs { + schemes[i] = keymanagerConfig.ChurpSchemeConfig{ + ID: id, + } + } + km.Config.Keymanager.Churp.Schemes = schemes + // Sentry configuration. sentries, err := resolveSentries(km.net, km.sentryIndices) if err != nil { @@ -349,6 +363,7 @@ func (net *Network) NewKeymanager(cfg *KeymanagerCfg) (*Keymanager, error) { consensusPort: host.getProvisionedPort(nodePortConsensus), p2pPort: host.getProvisionedPort(nodePortP2P), privatePeerPubKeys: cfg.PrivatePeerPubKeys, + churpIDs: cfg.ChurpIDs, } // Remove any exploded bundles on cleanup. diff --git a/go/oasis-test-runner/scenario/e2e/runtime/keymanager_churp.go b/go/oasis-test-runner/scenario/e2e/runtime/keymanager_churp.go new file mode 100644 index 00000000000..a995a27dbe9 --- /dev/null +++ b/go/oasis-test-runner/scenario/e2e/runtime/keymanager_churp.go @@ -0,0 +1,120 @@ +package runtime + +import ( + "context" + "fmt" + + "github.com/oasisprotocol/oasis-core/go/consensus/api/transaction" + "github.com/oasisprotocol/oasis-core/go/keymanager/churp" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario" +) + +// KeymanagerChurp is the key manager CHURP scenario. +var KeymanagerChurp scenario.Scenario = newKmChurpImpl() + +type kmChurpImpl struct { + Scenario +} + +func newKmChurpImpl() scenario.Scenario { + return &kmChurpImpl{ + Scenario: *NewScenario( + "keymanager-churp", + NewTestClient(), + ), + } +} + +func (sc *kmChurpImpl) Clone() scenario.Scenario { + return &kmChurpImpl{ + Scenario: *sc.Scenario.Clone().(*Scenario), + } +} + +func (sc *kmChurpImpl) Fixture() (*oasis.NetworkFixture, error) { + f, err := sc.Scenario.Fixture() + if err != nil { + return nil, err + } + + // We don't need compute workers. + f.ComputeWorkers = []oasis.ComputeWorkerFixture{} + + // Ensure that the key manager workers participate in the CHURP scheme. + f.Keymanagers[0].ChurpIDs = []uint8{0} + + // Enable CHURP extension. + f.Network.EnableKeyManagerCHURP = true + + return f, nil +} + +func (sc *kmChurpImpl) Run(ctx context.Context, _ *env.Env) error { + var nonce uint64 + + if err := sc.Net.Start(); err != nil { + return err + } + + if err := sc.Net.ClientController().WaitReady(ctx); err != nil { + return err + } + + stCh, stSub, err := sc.Net.ClientController().Keymanager.Churp().WatchStatuses(ctx) + if err != nil { + return err + } + defer stSub.Close() + + // Create a new CHURP instance. + identity := churp.Identity{ + ID: 0, + RuntimeID: KeyManagerRuntimeID, + } + req := churp.CreateRequest{ + Identity: identity, + GroupID: churp.EccNistP384, + Threshold: 2, + HandoffInterval: 1, + Policy: churp.SignedPolicySGX{ + Policy: churp.PolicySGX{ + Identity: identity, + }, + }, + } + + tx := churp.NewCreateTx(nonce, &transaction.Fee{Gas: 10000}, &req) + entSigner := sc.Net.Entities()[0].Signer() + sigTx, err := transaction.Sign(entSigner, tx) + if err != nil { + return fmt.Errorf("failed signing create churp transaction: %w", err) + } + err = sc.Net.Controller().Consensus.SubmitTx(ctx, sigTx) + if err != nil { + return fmt.Errorf("failed submitting create churp transaction: %w", err) + } + + // Test wether the key manager node submits an application every round. + var st *churp.Status + for i := range 4 { + select { + case st = <-stCh: + case <-ctx.Done(): + return ctx.Err() + } + + round := uint64(i / 2) + if st.Round != round { + return fmt.Errorf("expected round %d, not round %d", round, st.Round) + } + + // New round started or node just submitted an application. + if appSize := i % 2; len(st.Applications) != appSize { + return fmt.Errorf("status should have %d applications", appSize) + } + } + + return nil +} diff --git a/go/oasis-test-runner/scenario/e2e/runtime/scenario.go b/go/oasis-test-runner/scenario/e2e/runtime/scenario.go index 34870718627..5414c0b5a05 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/scenario.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/scenario.go @@ -326,6 +326,7 @@ func RegisterScenarios() error { KeymanagerReplicateMany, KeymanagerRotationFailure, KeymanagerUpgrade, + KeymanagerChurp, // Dump/restore test. DumpRestore, DumpRestoreRuntimeRoundAdvance, diff --git a/go/worker/keymanager/api/api.go b/go/worker/keymanager/api/api.go index c10f069b6a6..986af3f4af1 100644 --- a/go/worker/keymanager/api/api.go +++ b/go/worker/keymanager/api/api.go @@ -9,6 +9,7 @@ import ( beacon "github.com/oasisprotocol/oasis-core/go/beacon/api" "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/version" + "github.com/oasisprotocol/oasis-core/go/keymanager/churp" "github.com/oasisprotocol/oasis-core/go/keymanager/secrets" enclaverpc "github.com/oasisprotocol/oasis-core/go/runtime/enclaverpc/api" ) @@ -104,6 +105,9 @@ type Status struct { // Secrets is the master and ephemeral secrets status. Secrets *SecretsStatus `json:"secrets"` + + // Churp is the CHURP status. + Churp ChurpStatus `json:"churp"` } // SecretsStatus is the key manager master and ephemeral secrets status. @@ -170,6 +174,18 @@ type EphemeralSecretStats struct { LastGenerated beacon.EpochTime `json:"last_generated_epoch"` } +// ChurpStatus represents the status of the key manager CHURP extension. +type ChurpStatus struct { + // Schemes is a list of CHURP scheme configurations. + Schemes map[uint8]ChurpSchemeStatus `json:"schemes,omitempty"` +} + +// ChurpSchemeStatus represents the status of a CHURP scheme. +type ChurpSchemeStatus struct { + // Status is the consensus status of the CHURP scheme. + Status *churp.Status `json:"status,omitempty"` +} + // RPCAccessController handles the authorization of enclave RPC calls. type RPCAccessController interface { // Methods returns a list of allowed methods. diff --git a/go/worker/keymanager/churp.go b/go/worker/keymanager/churp.go new file mode 100644 index 00000000000..be9e6c2f0d0 --- /dev/null +++ b/go/worker/keymanager/churp.go @@ -0,0 +1,474 @@ +package keymanager + +import ( + "container/heap" + "context" + "fmt" + "sync" + + "github.com/cenkalti/backoff/v4" + "github.com/libp2p/go-libp2p/core" + "golang.org/x/exp/maps" + + beacon "github.com/oasisprotocol/oasis-core/go/beacon/api" + cmnBackoff "github.com/oasisprotocol/oasis-core/go/common/backoff" + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/config" + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + "github.com/oasisprotocol/oasis-core/go/keymanager/churp" + enclaverpc "github.com/oasisprotocol/oasis-core/go/runtime/enclaverpc/api" + "github.com/oasisprotocol/oasis-core/go/runtime/host" + workerKm "github.com/oasisprotocol/oasis-core/go/worker/keymanager/api" +) + +// maxSubmissionAttempts is the maximum number of attempts to submit +// an application for a round. +const maxSubmissionAttempts = 10 + +var ( + insecureChurpRPCMethods = map[string]struct{}{} + secureChurpRPCMethods = map[string]struct{}{} +) + +// Ensure the CHURP worker implements the RPCAccessController interface. +var _ workerKm.RPCAccessController = (*churpWorker)(nil) + +// churpWorker executes the CHURP protocol for the configured schemes. +type churpWorker struct { + logger *logging.Logger + + initCh chan struct{} + + kmWorker *Worker + + mu sync.Mutex + churps map[uint8]*churp.Status // Guarded by mutex. + + submissions *submissionScheduler +} + +// newChurpWorker constructs a new key manager CHURP worker. +func newChurpWorker( + kmWorker *Worker, +) (*churpWorker, error) { + // Read the configuration to determine in which schemes the worker + // should participate. + churps := make(map[uint8]*churp.Status) + for _, cfg := range config.GlobalConfig.Keymanager.Churp.Schemes { + churps[cfg.ID] = nil + } + + return &churpWorker{ + logger: logging.GetLogger("worker/keymanager/churp"), + initCh: make(chan struct{}), + kmWorker: kmWorker, + churps: churps, + submissions: newSubmissionScheduler(kmWorker), + }, nil +} + +// Methods implements RPCAccessController interface. +func (w *churpWorker) Methods() []string { + var methods []string + methods = append(methods, maps.Keys(secureChurpRPCMethods)...) + methods = append(methods, maps.Keys(insecureChurpRPCMethods)...) + return methods +} + +// Connect implements RPCAccessController interface. +func (w *churpWorker) Connect(core.PeerID) bool { + return false +} + +// Authorize implements RPCAccessController interface. +func (w *churpWorker) Authorize(string, enclaverpc.Kind, core.PeerID) error { + return nil +} + +// Initialized returns a channel that will be closed when the worker +// is initialized. +func (w *churpWorker) Initialized() <-chan struct{} { + return w.initCh +} + +// GetStatus returns the worker status. +func (w *churpWorker) GetStatus() workerKm.ChurpStatus { + w.mu.Lock() + defer w.mu.Unlock() + + status := workerKm.ChurpStatus{ + Schemes: make(map[uint8]workerKm.ChurpSchemeStatus), + } + + for id, st := range w.churps { + status.Schemes[id] = workerKm.ChurpSchemeStatus{ + Status: st, + } + } + + return status +} + +func (w *churpWorker) work(ctx context.Context, _ host.RichRuntime) { + w.logger.Info("starting worker") + + stCh, stSub := w.kmWorker.backend.Churp().WatchStatuses() + defer stSub.Close() + + epoCh, epoSub, err := w.kmWorker.commonWorker.Consensus.Beacon().WatchEpochs(ctx) + if err != nil { + w.logger.Error("failed to watch epochs", + "err", err, + ) + return + } + defer epoSub.Close() + + blkCh, blkSub, err := w.kmWorker.commonWorker.Consensus.WatchBlocks(ctx) + if err != nil { + w.logger.Error("failed to watch blocks", + "err", err, + ) + return + } + defer blkSub.Close() + + close(w.initCh) + + for { + select { + case epoch := <-epoCh: + w.handleNewEpoch(epoch) + case blk := <-blkCh: + w.handleNewBlock(ctx, blk) + case status := <-stCh: + w.handleStatusUpdate(status) + case <-ctx.Done(): + w.logger.Info("stopping worker") + w.submissions.Stop() + return + } + } +} + +// handleNewEpoch is responsible for handling a new epoch. +func (w *churpWorker) handleNewEpoch(epoch beacon.EpochTime) { + w.submissions.Cancel(epoch) + w.submissions.Clear(epoch) +} + +// handleNewBlock is responsible for handling a new block. +func (w *churpWorker) handleNewBlock(ctx context.Context, blk *consensus.Block) { + w.submissions.Start(ctx, blk.Height) +} + +// handleStatusUpdate is responsible for handling status update. +func (w *churpWorker) handleStatusUpdate(status *churp.Status) { + w.mu.Lock() + defer w.mu.Unlock() + + // Skip schemes we are not involved in. + if status.RuntimeID != w.kmWorker.runtimeID { + return + } + if _, ok := w.churps[status.ID]; !ok { + return + } + w.churps[status.ID] = status + + w.logger.Debug("handle status update", + "status", status, + ) + + w.submissions.Queue(status) +} + +// submissionInfo contains details about a scheduled application submission. +type submissionInfo struct { + // churpID represents the identifier of the CHURP scheme. + churpID uint8 + + // round indicates the round number for which an application is scheduled + // to be submitted. + round uint64 + + // epoch signifies the opening epoch for submissions. + epoch beacon.EpochTime + + // height denotes the minimum block height for the submission. + height int64 + + // index specifies the position in the submission queue, + // or -1 if not queued. + index int + + // cancel is a function to cancel the submission if it's in progress, + // otherwise, it's nil. + cancel context.CancelCauseFunc +} + +// submissionScheduler is responsible for generating and submitting +// application requests one epoch before handoffs. +type submissionScheduler struct { + logger *logging.Logger + + wg sync.WaitGroup + + // kmWorker provides access to the common key manager services. + kmWorker *Worker + + // queue contains submissions waiting to be processed, ordered by + // the minimum block height required for the submission. + queue SubmissionQueue + + // running contains submissions that are currently in progress. + running map[uint8]*submissionInfo + + // submissions contains both queued and running submissions. + submissions map[uint8]*submissionInfo +} + +// newSubmissionScheduler creates a new submission scheduler. +func newSubmissionScheduler(kmWorker *Worker) *submissionScheduler { + return &submissionScheduler{ + logger: logging.GetLogger("worker/keymanager/churp/submissions"), + kmWorker: kmWorker, + queue: make([]*submissionInfo, 0), + running: make(map[uint8]*submissionInfo), + submissions: make(map[uint8]*submissionInfo), + } +} + +// Queue schedules the given CHURP scheme for application submission, +// updating or removing the scheduled submission if already present. +func (s *submissionScheduler) Queue(status *churp.Status) { + removeFn := func(info *submissionInfo, cause error) { + if info.cancel != nil { + info.cancel(cause) + delete(s.running, status.ID) + } + if info.index != -1 { + heap.Remove(&s.queue, info.index) + } + delete(s.submissions, status.ID) + } + + // Stop and remove submission for the previous round. + info, ok := s.submissions[status.ID] + if ok && info.round < status.Round { + removeFn(info, fmt.Errorf("new round")) + } + + // Schedule submission for the current round. + info, ok = s.submissions[status.ID] + if !ok { + epoch := status.NextHandoff - 1 + height, err := s.kmWorker.randomBlockHeight(epoch, 50) + if err != nil { + s.logger.Error("failed to select a random block height", + "err", err, + ) + return + } + + info = &submissionInfo{ + churpID: status.ID, + round: status.Round, + epoch: epoch, + height: height, + index: -1, + cancel: nil, + } + + s.submissions[status.ID] = info + heap.Push(&s.queue, info) + } + + // Stop and remove the submission if the application has already + // been submitted or if handoffs are disabled. + switch _, submitted := status.Applications[s.kmWorker.nodeID]; { + case submitted: + removeFn(info, fmt.Errorf("already submitted")) + case status.HandoffsDisabled(): + removeFn(info, fmt.Errorf("handoffs disabled")) + } +} + +// Start starts all eligible queued submissions. +func (s *submissionScheduler) Start(ctx context.Context, height int64) { + for { + if len(s.queue) == 0 { + return + } + info := s.queue.Peek().(*submissionInfo) + if info.height > height { + return + } + _ = heap.Pop(&s.queue) + + submitCtx, submitCancel := context.WithCancelCause(ctx) + + info.cancel = submitCancel + s.running[info.churpID] = info + + s.wg.Add(1) + go s.submitApplication(submitCtx, info.churpID, info.round) + } +} + +// Stop stops all submissions currently in progress and waits for them +// to complete. +func (s *submissionScheduler) Stop() { + cause := fmt.Errorf("stopped") + for _, info := range s.running { + info.cancel(cause) + } + + s.wg.Wait() +} + +// Clear removes queued submissions that didn't complete in time, +// while retaining those that are still pending. +func (s *submissionScheduler) Clear(epoch beacon.EpochTime) { + for { + if len(s.queue) == 0 { + return + } + info := s.queue.Peek().(*submissionInfo) + if info.epoch >= epoch { + return + } + _ = heap.Pop(&s.queue) + } +} + +// Cancel sends stop signal to submissions in progress which are not allowed +// to submit applications in the given epoch. +func (s *submissionScheduler) Cancel(epoch beacon.EpochTime) { + cause := fmt.Errorf("submissions closed: epoch %d", epoch) + for id, churp := range s.running { + if churp.epoch == epoch { + continue + } + churp.cancel(cause) + delete(s.running, id) + } +} + +// submitApplication tries to submit an application, retrying if generation +// or transaction fails. +func (s *submissionScheduler) submitApplication(ctx context.Context, churpID uint8, round uint64) { + defer s.wg.Done() + + ticker := backoff.NewTicker(cmnBackoff.NewExponentialBackOff()) + + for attempt := 1; attempt <= maxSubmissionAttempts; attempt++ { + err := s.trySubmitApplication(ctx, churpID, round) + if err == nil { + return + } + + s.logger.Debug("failed to submit application", + "id", churpID, + "round", round, + "attempt", attempt, + "err", err, + ) + + select { + case <-ticker.C: + case <-ctx.Done(): + return + } + } +} + +// trySubmitApplication tries to submit an application. +func (s *submissionScheduler) trySubmitApplication(ctx context.Context, churpID uint8, round uint64) error { + s.logger.Info("trying to submit application", + "id", churpID, + "round", round, + ) + + // Ask enclave to prepare a dealer and return signed verification matrix. + req := churp.InitRequest{ + Identity: churp.Identity{ + ID: churpID, + RuntimeID: s.kmWorker.runtimeID, + }, + Round: round, + } + var rsp churp.SignedApplicationRequest + if err := s.kmWorker.callEnclaveLocal(churp.RPCMethodInit, req, &rsp); err != nil { + return fmt.Errorf("failed to generate verification matrix: %w", err) + } + + // Validate the signature. + rak, err := s.kmWorker.runtimeAttestationKey() + if err != nil { + return err + } + if err = rsp.VerifyRAK(rak); err != nil { + return fmt.Errorf("failed to verify generate verification matrix response: %w", err) + } + + // Publish transaction. + tx := churp.NewApplyTx(0, nil, &rsp) + if err = consensus.SignAndSubmitTx(ctx, s.kmWorker.commonWorker.Consensus, s.kmWorker.commonWorker.Identity.NodeSigner, tx); err != nil { + return err + } + + return nil +} + +// Ensure that the submission queue implements heap.Interface. +var _ heap.Interface = (*SubmissionQueue)(nil) + +// SubmissionQueue is a queue of CHURP instances ordered by the time they +// are allowed to submit an application. +type SubmissionQueue []*submissionInfo + +// Len implements heap.Interface. +func (q SubmissionQueue) Len() int { + return len(q) +} + +// Less implements heap.Interface. +func (q SubmissionQueue) Less(i, j int) bool { + return q[i].height < q[j].height +} + +// Swap implements heap.Interface. +func (q SubmissionQueue) Swap(i, j int) { + q[i].index = j + q[j].index = i + + q[i], q[j] = q[j], q[i] +} + +// Push implements heap.Interface. +func (q *SubmissionQueue) Push(x any) { + x.(*submissionInfo).index = len(*q) + *q = append(*q, x.(*submissionInfo)) +} + +// Pop implements heap.Interface. +func (q *SubmissionQueue) Pop() any { + old := *q + n := len(old) + x := old[n-1] + x.index = -1 + old[n-1] = nil + *q = old[0 : n-1] + return x +} + +// Peek returns the smallest element in the heap. +func (q SubmissionQueue) Peek() any { + switch l := len(q); l { + case 0: + return nil + default: + return q[l-1] + } +} diff --git a/go/worker/keymanager/config/config.go b/go/worker/keymanager/config/config.go index 6ad60c16495..9a1c980b99e 100644 --- a/go/worker/keymanager/config/config.go +++ b/go/worker/keymanager/config/config.go @@ -1,12 +1,27 @@ // Package config implements global configuration options. package config +// ChurpConfig holds configuration details for the CHURP extension. +type ChurpConfig struct { + // Schemes is a list of CHURP scheme configurations. + Schemes []ChurpSchemeConfig `yaml:"schemes,omitempty"` +} + +// ChurpSchemeConfig holds configuration details for a CHURP scheme. +type ChurpSchemeConfig struct { + // ID is the unique identifier of the CHURP scheme. + ID uint8 `yaml:"id,omitempty"` +} + // Config is the keymanager worker configuration structure. type Config struct { - // Key manager Runtime ID. + // Key manager runtime ID. RuntimeID string `yaml:"runtime_id"` // Base64-encoded public keys of unadvertised peers that may call protected methods. PrivatePeerPubKeys []string `yaml:"private_peer_pub_keys"` + + // Churp holds configuration details for the CHURP extension. + Churp ChurpConfig `yaml:"churp,omitempty"` } // Validate validates the configuration settings. @@ -19,5 +34,8 @@ func DefaultConfig() Config { return Config{ RuntimeID: "", PrivatePeerPubKeys: []string{}, + Churp: ChurpConfig{ + Schemes: []ChurpSchemeConfig{}, + }, } } diff --git a/go/worker/keymanager/init.go b/go/worker/keymanager/init.go index d3d53473645..8c9676f243a 100644 --- a/go/worker/keymanager/init.go +++ b/go/worker/keymanager/init.go @@ -39,6 +39,7 @@ func New( cancelCtx: cancelFn, quitCh: make(chan struct{}), initCh: make(chan struct{}), + nodeID: commonWorker.Identity.NodeSigner.Public(), accessList: NewAccessList(), commonWorker: commonWorker, backend: backend, @@ -86,16 +87,23 @@ func New( if err != nil { return nil, fmt.Errorf("worker/keymanager: failed to create secrets worker: %w", err) } + w.churpWorker, err = newChurpWorker(w) + if err != nil { + return nil, fmt.Errorf("worker/keymanager: failed to create churp worker: %w", err) + } // Prepare access controllers and register their methods. - w.accessControllers = []workerKeymanager.RPCAccessController{w.secretsWorker} + w.accessControllers = []workerKeymanager.RPCAccessController{ + w.secretsWorker, + w.churpWorker, + } w.accessControllersByMethod = make(map[string]workerKeymanager.RPCAccessController) for _, ctrl := range w.accessControllers { for _, m := range ctrl.Methods() { if _, ok := w.accessControllersByMethod[m]; ok { return nil, fmt.Errorf("worker/keymanager: duplicate enclave RPC method: %s", m) } - w.accessControllersByMethod[m] = w.secretsWorker + w.accessControllersByMethod[m] = ctrl } } diff --git a/go/worker/keymanager/status.go b/go/worker/keymanager/status.go index db87a7eabaa..f75acf1015e 100644 --- a/go/worker/keymanager/status.go +++ b/go/worker/keymanager/status.go @@ -19,40 +19,46 @@ func (w *Worker) GetStatus() (*api.Status, error) { default: } - var ss api.StatusState + var status api.StatusState switch { case !w.enabled: - ss = api.StatusStateDisabled + status = api.StatusStateDisabled case stopped: - ss = api.StatusStateStopped + status = api.StatusStateStopped case initialized: - ss = api.StatusStateReady + status = api.StatusStateReady default: - ss = api.StatusStateStarting + status = api.StatusStateStarting } - av, _ := w.GetHostedRuntimeActiveVersion() - al := w.accessList.RuntimeAccessLists() + activeVersion, _ := w.GetHostedRuntimeActiveVersion() + accessList := w.accessList.RuntimeAccessLists() - var rts []common.Namespace + var runtimeClients []common.Namespace if w.kmRuntimeWatcher != nil { - rts = w.kmRuntimeWatcher.Runtimes() + runtimeClients = w.kmRuntimeWatcher.Runtimes() } - var s *api.SecretsStatus + var secrets *api.SecretsStatus if w.secretsWorker != nil { - s = w.secretsWorker.GetStatus() + secrets = w.secretsWorker.GetStatus() + } + + var churp api.ChurpStatus + if w.churpWorker != nil { + churp = w.churpWorker.GetStatus() } w.RLock() defer w.RUnlock() return &api.Status{ - Status: ss, - ActiveVersion: av, + Status: status, + ActiveVersion: activeVersion, RuntimeID: &w.runtimeID, - ClientRuntimes: rts, - AccessList: al, - Secrets: s, + ClientRuntimes: runtimeClients, + AccessList: accessList, + Secrets: secrets, + Churp: churp, }, nil } diff --git a/go/worker/keymanager/worker.go b/go/worker/keymanager/worker.go index b54119d1004..2c89da3f933 100644 --- a/go/worker/keymanager/worker.go +++ b/go/worker/keymanager/worker.go @@ -55,6 +55,8 @@ type Worker struct { // nolint: maligned quitCh chan struct{} initCh chan struct{} + nodeID signature.PublicKey + runtime runtimeRegistry.Runtime runtimeID common.Namespace runtimeLabel string @@ -62,6 +64,7 @@ type Worker struct { // nolint: maligned kmNodeWatcher *kmNodeWatcher kmRuntimeWatcher *kmRuntimeWatcher secretsWorker *secretsWorker + churpWorker *churpWorker accessControllers []workerKeymanager.RPCAccessController accessControllersByMethod map[string]workerKeymanager.RPCAccessController @@ -465,7 +468,7 @@ func (w *Worker) worker() { var wg sync.WaitGroup defer wg.Wait() - wg.Add(4) + wg.Add(5) // Need to explicitly watch for updates related to the key manager runtime // itself. @@ -487,6 +490,12 @@ func (w *Worker) worker() { w.secretsWorker.work(w.ctx, hrt) }() + // Serve CHURP secrets. + go func() { + defer wg.Done() + w.churpWorker.work(w.ctx, hrt) + }() + // Watch runtime updates and register with new capabilities on restarts. go func() { defer wg.Done() @@ -508,5 +517,11 @@ func (w *Worker) worker() { return } + select { + case <-w.churpWorker.Initialized(): + case <-w.ctx.Done(): + return + } + close(w.initCh) } diff --git a/keymanager/src/churp/types.rs b/keymanager/src/churp/types.rs index 842ffd5c909..c1c30466f39 100644 --- a/keymanager/src/churp/types.rs +++ b/keymanager/src/churp/types.rs @@ -1,7 +1,7 @@ //! CHURP types used by the worker-host protocol. use oasis_core_runtime::common::{crypto::signature::Signature, namespace::Namespace}; -/// Signed public key. +/// Initialization request. #[derive(Clone, Debug, Default, PartialEq, Eq, cbor::Encode, cbor::Decode)] pub struct InitRequest { /// A unique identifier within the key manager runtime.