Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Database loading statuses #5405

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions go/consensus/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ var (
StatusStateReady StatusState
// StatusStateSyncing is the syncing status state.
StatusStateSyncing StatusState = 1
// StatusStateDBLoading is the status state when the database is loading.
StatusStateDBLoading StatusState = 2
)

// String returns a string representation of a status state.
Expand All @@ -263,6 +265,8 @@ func (s StatusState) String() string {
return "ready"
case StatusStateSyncing:
return "syncing"
case StatusStateDBLoading:
return "loading database"
default:
return "[invalid status state]"
}
Expand All @@ -271,10 +275,8 @@ func (s StatusState) String() string {
// MarshalText encodes a StatusState into text form.
func (s StatusState) MarshalText() ([]byte, error) {
switch s {
case StatusStateReady:
return []byte(StatusStateReady.String()), nil
case StatusStateSyncing:
return []byte(StatusStateSyncing.String()), nil
case StatusStateReady, StatusStateSyncing, StatusStateDBLoading:
return []byte(s.String()), nil
default:
return nil, fmt.Errorf("invalid StatusState: %d", s)
}
Expand All @@ -287,6 +289,8 @@ func (s *StatusState) UnmarshalText(text []byte) error {
*s = StatusStateReady
case StatusStateSyncing.String():
*s = StatusStateSyncing
case StatusStateDBLoading.String():
*s = StatusStateDBLoading
default:
return fmt.Errorf("invalid StatusState: %s", string(text))
}
Expand Down
13 changes: 11 additions & 2 deletions go/consensus/cometbft/full/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,19 @@ type commonNode struct {
// Lifecycle states of the common node, stored in commonNode.state and
// manipulated only via sync/atomic. The numeric ordering is significant:
// the initialized/dbLoaded/started accessors use >= comparisons, so each
// state must be strictly greater than every state that precedes it.
const (
	// stateNotReady is the zero value: nothing has been set up yet.
	stateNotReady = 0
	// stateInitialized: the node has been initialized but the database
	// has not finished loading.
	stateInitialized = 1
	// stateDBLoaded: the consensus database has finished loading.
	stateDBLoaded = 2
	// stateStarted: the node has fully started.
	stateStarted = 3
	// stateStopping: the node is shutting down.
	stateStopping = 4
)

// initialized reports whether the node has reached at least the
// initialized lifecycle state.
func (n *commonNode) initialized() bool {
	current := atomic.LoadUint32(&n.state)
	return current >= stateInitialized
}

// dbLoaded reports whether the node's database has finished loading
// (i.e. the lifecycle state is stateDBLoaded or later).
func (n *commonNode) dbLoaded() bool {
	current := atomic.LoadUint32(&n.state)
	return current >= stateDBLoaded
}

// started reports whether the node has fully started (lifecycle state is
// stateStarted or later).
func (n *commonNode) started() bool {
	current := atomic.LoadUint32(&n.state)
	return current >= stateStarted
}
Expand Down Expand Up @@ -148,6 +153,10 @@ func (n *commonNode) start() error {
return n.mux.Start()
}

// finishDBLoading advances the node lifecycle state to stateDBLoaded.
//
// The state is only ever moved forward: if another goroutine has already
// advanced it past stateDBLoaded (e.g. to stateStarted or stateStopping),
// this is a no-op. The original unconditional StoreUint32 could regress
// the state in that case, which would make started()/dbLoaded() readers
// observe the lifecycle running backwards.
func (n *commonNode) finishDBLoading() {
	for {
		cur := atomic.LoadUint32(&n.state)
		if cur >= stateDBLoaded {
			// Already at or past the DB-loaded state; never regress.
			return
		}
		if atomic.CompareAndSwapUint32(&n.state, cur, stateDBLoaded) {
			return
		}
		// CAS lost a race with a concurrent state change; retry.
	}
}

func (n *commonNode) finishStart() {
atomic.StoreUint32(&n.state, stateStarted)
close(n.startedCh)
Expand Down
6 changes: 5 additions & 1 deletion go/consensus/cometbft/full/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,10 @@ func (t *fullService) GetStatus(ctx context.Context) (*consensusAPI.Status, erro
if err != nil {
return nil, err
}
status.Status = consensusAPI.StatusStateSyncing
status.Status = consensusAPI.StatusStateDBLoading
if t.dbLoaded() {
status.Status = consensusAPI.StatusStateSyncing
}

status.P2P = &consensusAPI.P2PStatus{}
status.P2P.PubKey = t.identity.P2PSigner.Public()
Expand Down Expand Up @@ -679,6 +682,7 @@ func (t *fullService) lazyInit() error { // nolint: gocyclo
default:
}

t.finishDBLoading()
return db, nil
}

Expand Down
21 changes: 21 additions & 0 deletions go/runtime/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type Runtime interface {
// RegisterStorage sets the given local storage backend for the runtime.
RegisterStorage(storage storageAPI.Backend)

// StorageInitFailed reports that storage initialization for the runtime failed.
StorageInitFailed(err error)

// AddRoles adds available node roles to the runtime.
AddRoles(roles node.RolesMask)

Expand Down Expand Up @@ -129,6 +132,7 @@ type runtime struct { // nolint: maligned

consensus consensus.Backend
storage storageAPI.Backend
storageErrCh chan error
localStorage localstorage.LocalStorage

history history.History
Expand Down Expand Up @@ -204,9 +208,21 @@ func (r *runtime) RegisterStorage(storage storageAPI.Backend) {
if r.storage != nil {
panic("runtime storage backend already assigned")
}
close(r.storageErrCh)
r.storage = storage
}

// StorageInitFailed reports that storage initialization for the runtime
// failed. The error is delivered to finishInitialization via storageErrCh,
// which is then closed so any later receive completes immediately.
//
// NOTE(review): this must be called at most once, and never after
// RegisterStorage — a second call (or a RegisterStorage call afterwards)
// would send on or close an already-closed channel and panic. Confirm the
// storage worker guarantees this ordering.
func (r *runtime) StorageInitFailed(err error) {
	r.Lock()
	defer r.Unlock()

	// A successfully registered backend and a reported init failure are
	// mutually exclusive; treat the combination as a programming error.
	if r.storage != nil {
		panic("runtime storage backend set already but reported init error")
	}
	// storageErrCh is created with capacity 1, so this send does not block.
	r.storageErrCh <- err
	close(r.storageErrCh)
}

func (r *runtime) AddRoles(roles node.RolesMask) {
r.Lock()
defer r.Unlock()
Expand Down Expand Up @@ -394,6 +410,10 @@ func (r *runtime) watchUpdates(ctx context.Context) {
}

func (r *runtime) finishInitialization() error {
if err, ok := <-r.storageErrCh; ok && err != nil {
return err
}

r.Lock()
defer r.Unlock()

Expand Down Expand Up @@ -555,6 +575,7 @@ func newRuntime(
id: id,
dataDir: rtDataDir,
consensus: consensus,
storageErrCh: make(chan error, 1),
localStorage: localStorage,
cancelCtx: cancel,
registryDescriptorCh: make(chan struct{}),
Expand Down
1 change: 1 addition & 0 deletions go/worker/storage/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const ModuleName = "worker/storage"
type StorageWorkerStatus string

const (
StatusDBLoading StorageWorkerStatus = "loading database"
StatusInitializing StorageWorkerStatus = "initializing"
StatusStarting StorageWorkerStatus = "starting"
StatusStopping StorageWorkerStatus = "stopping"
Expand Down
50 changes: 42 additions & 8 deletions go/worker/storage/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ type Node struct { // nolint: maligned
quitCh chan struct{}
workerQuitCh chan struct{}

initCh chan struct{}
storageInitCh chan error
initCh chan struct{}
}

func NewNode(
Expand All @@ -157,7 +158,7 @@ func NewNode(
roleProvider registration.RoleProvider,
rpcRoleProvider registration.RoleProvider,
workerCommonCfg workerCommon.Config,
localStorage storageApi.LocalBackend,
storageCtor func(context.Context) (storageApi.LocalBackend, error),
checkpointerCfg *checkpoint.CheckpointerConfig,
checkpointSyncCfg *CheckpointSyncConfig,
) (*Node, error) {
Expand All @@ -173,21 +174,21 @@ func NewNode(

workerCommonCfg: workerCommonCfg,

localStorage: localStorage,

fetchPool: fetchPool,

checkpointSyncCfg: checkpointSyncCfg,

status: api.StatusInitializing,
status: api.StatusDBLoading,

blockCh: channels.NewInfiniteChannel(),
diffCh: make(chan *fetchedDiff),
finalizeCh: make(chan finalizeResult),

quitCh: make(chan struct{}),
workerQuitCh: make(chan struct{}),
initCh: make(chan struct{}),

storageInitCh: make(chan error, 1),
initCh: make(chan struct{}),
}

// Validate checkpoint sync configuration.
Expand All @@ -200,6 +201,36 @@ func NewNode(

n.ctx, n.ctxCancel = context.WithCancel(context.Background())

go func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't doing this asynchronously dangerous? Where do we ensure that Start is not called before this completes? Related goroutines should probably block on a channel that gets closed once this deferred init completes.

defer close(n.storageInitCh)
localStorage, err := storageCtor(n.ctx)
if err != nil {
err = fmt.Errorf("error creating storage worker local backend: %w", err)
n.storageInitCh <- err
commonNode.Runtime.StorageInitFailed(err)
return
}
n.storageInitCh <- n.newBottomHalf(commonNode, rpcRoleProvider, localStorage, checkpointerCfg)
}()

return n, nil
}

func (n *Node) newBottomHalf(
commonNode *committee.Node,
rpcRoleProvider registration.RoleProvider,
localStorage storageApi.LocalBackend,
checkpointerCfg *checkpoint.CheckpointerConfig,
) error {
func() {
n.statusLock.Lock()
defer n.statusLock.Unlock()

n.status = api.StatusInitializing
}()
n.localStorage = localStorage
commonNode.Runtime.RegisterStorage(localStorage)

// Create a new checkpointer if enabled.
if checkpointerCfg != nil {
checkpointerCfg = &checkpoint.CheckpointerConfig{
Expand Down Expand Up @@ -245,7 +276,7 @@ func NewNode(
*checkpointerCfg,
)
if err != nil {
return nil, fmt.Errorf("failed to create checkpointer: %w", err)
return fmt.Errorf("failed to create checkpointer: %w", err)
}
}

Expand All @@ -264,7 +295,7 @@ func NewNode(
commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
}

return n, nil
return nil
}

// Service interface.
Expand All @@ -276,6 +307,9 @@ func (n *Node) Name() string {

// Start causes the worker to start responding to CometBFT new block events.
func (n *Node) Start() error {
if err := <-n.storageInitCh; err != nil {
return err
}
go n.watchQuit()
go n.worker()
if n.checkpointer != nil {
Expand Down
26 changes: 17 additions & 9 deletions go/worker/storage/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storage

import (
"context"
"fmt"

"github.com/oasisprotocol/oasis-core/go/common"
Expand All @@ -9,6 +10,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/workerpool"
"github.com/oasisprotocol/oasis-core/go/config"
storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common"
committeeCommon "github.com/oasisprotocol/oasis-core/go/worker/common/committee"
Expand Down Expand Up @@ -95,9 +97,8 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC
}
}

localStorage, err := NewLocalBackend(commonNode.Runtime.DataDir(), id)
if err != nil {
return fmt.Errorf("can't create local storage backend: %w", err)
storageCtor := func(ctx context.Context) (storageApi.LocalBackend, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why we don't create the local storage backend in committee/node? This would avoid passing in this factory function.

return NewLocalBackend(commonNode.Runtime.DataDir(), id)
}

node, err := committee.NewNode(
Expand All @@ -106,7 +107,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC
rp,
rpRPC,
w.commonWorker.GetConfig(),
localStorage,
storageCtor,
checkpointerCfg,
&committee.CheckpointSyncConfig{
Disabled: config.GlobalConfig.Storage.CheckpointSyncDisabled,
Expand All @@ -116,7 +117,6 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC
if err != nil {
return err
}
commonNode.Runtime.RegisterStorage(localStorage)
commonNode.AddHooks(node)
w.runtimes[id] = node

Expand Down Expand Up @@ -167,13 +167,21 @@ func (w *Worker) Start() error {
}()

// Start all runtimes and wait for initialization.
go func() {
w.logger.Info("starting storage sync services", "num_runtimes", len(w.runtimes))
var err error

for _, r := range w.runtimes {
_ = r.Start()
w.logger.Info("starting storage sync services", "num_runtimes", len(w.runtimes))
for id, r := range w.runtimes {
if err = r.Start(); err != nil {
w.logger.Error("committee node did not start successfully", "err", err, "runtime", id)
}
defer func(r *committee.Node) {
if err != nil {
r.Stop()
}
}(r)
}

go func() {
// Wait for runtimes to be initialized and the node to be registered.
for _, r := range w.runtimes {
<-r.Initialized()
Expand Down
Loading