Skip to content

Commit

Permalink
go/keymanager/churp: Ignore suspended runtimes on epoch change
Browse files Browse the repository at this point in the history
  • Loading branch information
peternose committed Feb 13, 2024
1 parent 17cf53a commit 111541a
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 48 deletions.
80 changes: 43 additions & 37 deletions go/consensus/cometbft/apps/keymanager/churp/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,61 @@ import (
beacon "github.com/oasisprotocol/oasis-core/go/beacon/api"
tmapi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api"
churpState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/keymanager/churp/state"
registryState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/registry/state"
"github.com/oasisprotocol/oasis-core/go/keymanager/churp"
)

func (ext *churpExt) onEpochChange(ctx *tmapi.Context, epoch beacon.EpochTime) error {
// Query the runtime and node lists.
state := churpState.NewMutableState(ctx.State())
regState := registryState.NewMutableState(ctx.State())
runtimes, _ := regState.Runtimes(ctx)

statuses, err := state.Statuses(ctx)
if err != nil {
return fmt.Errorf("keymanager: churp: failed to fetch statuses: %w", err)
}

for _, status := range statuses {
if status.NextHandoff == churp.HandoffsDisabled {
continue
for _, rt := range runtimes {
statuses, err := state.Statuses(ctx, rt.ID)
if err != nil {
return fmt.Errorf("keymanager: churp: failed to fetch runtime statuses: %w", err)
}

switch epoch {
case status.NextHandoff:
// The epoch for the handoff just started, meaning that registrations
// are now closed. If not enough nodes applied for the next committee,
// we need to reset applications and start collecting again.
minCommitteeSize := int(status.Threshold)*2 + 1
if len(status.Applications) >= minCommitteeSize {
for _, status := range statuses {
if status.NextHandoff == churp.HandoffsDisabled {
continue
}
case status.NextHandoff + 1:
// Handoff ended. Not all nodes replicated the secret and confirmed it,
// as otherwise the next handoff epoch would be updated.
// Reset and start collecting again
default:
continue
}

// The handoff failed, so postpone the round to the next epoch, giving
// nodes one epoch time to submit applications.
status.Applications = nil
status.Checksum = nil
status.NextHandoff = epoch + 1

if err := state.SetStatus(ctx, status); err != nil {
ctx.Logger().Error("keymanager: churp: failed to set status",
"err", err,
)
return fmt.Errorf("keymanager: churp: failed to set status: %w", err)
}
switch epoch {
case status.NextHandoff:
// The epoch for the handoff just started, meaning that registrations
// are now closed. If not enough nodes applied for the next committee,
// we need to reset applications and start collecting again.
minCommitteeSize := int(status.Threshold)*2 + 1
if len(status.Applications) >= minCommitteeSize {
continue
}
case status.NextHandoff + 1:
// Handoff ended. Not all nodes replicated the secret and confirmed it,
// as otherwise the next handoff epoch would be updated.
// Reset and start collecting again
default:
continue
}

// The handoff failed, so postpone the round to the next epoch, giving
// nodes one epoch time to submit applications.
status.Applications = nil
status.Checksum = nil
status.NextHandoff = epoch + 1

ctx.EmitEvent(tmapi.NewEventBuilder(ext.appName).TypedAttribute(&churp.UpdateEvent{
Status: status,
}))
if err := state.SetStatus(ctx, status); err != nil {
ctx.Logger().Error("keymanager: churp: failed to set status",
"err", err,
)
return fmt.Errorf("keymanager: churp: failed to set status: %w", err)
}

ctx.EmitEvent(tmapi.NewEventBuilder(ext.appName).TypedAttribute(&churp.UpdateEvent{
Status: status,
}))
}
}

return nil
Expand Down
52 changes: 43 additions & 9 deletions go/consensus/cometbft/apps/keymanager/churp/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ var (
// Value is CBOR-serialized churp.ConsensusParameters.
parametersKeyFmt = consensus.KeyFormat.New(0x74)

// satusKeyFmt is the status key format.
// statusKeyFmt is the status key format.
//
// Key format is: 0x75 <runtime-id> <churp-id>.
// Value is CBOR-serialized churp.Status.
satusKeyFmt = consensus.KeyFormat.New(0x75, keyformat.H(&common.Namespace{}), uint8(0))
statusKeyFmt = consensus.KeyFormat.New(0x75, keyformat.H(&common.Namespace{}), uint8(0))
)

// ImmutableState is a immutable state wrapper.
Expand All @@ -48,9 +48,9 @@ func (st *ImmutableState) ConsensusParameters(ctx context.Context) (*churp.Conse
return &params, nil
}

// Status returns the CHURP status for the specified runtime ID and CHURP ID.
// Status returns the CHURP status for the specified runtime and CHURP instance.
func (st *ImmutableState) Status(ctx context.Context, runtimeID common.Namespace, churpID uint8) (*churp.Status, error) {
data, err := st.is.Get(ctx, satusKeyFmt.Encode(&runtimeID, churpID))
data, err := st.is.Get(ctx, statusKeyFmt.Encode(&runtimeID, churpID))
if err != nil {
return nil, abciAPI.UnavailableStateError(err)
}
Expand All @@ -65,14 +65,48 @@ func (st *ImmutableState) Status(ctx context.Context, runtimeID common.Namespace
return &status, nil
}

// Statuses returns the CHURP statuses for all runtimes.
func (st *ImmutableState) Statuses(ctx context.Context) ([]*churp.Status, error) {
// Statuses returns the CHURP statuses for the specified runtime.
func (st *ImmutableState) Statuses(ctx context.Context, runtimeID common.Namespace) ([]*churp.Status, error) {
it := st.is.NewIterator(ctx)
defer it.Close()

// We need to pre-hash the runtime ID, so we can compare it below.
runtimeIDHash := keyformat.PreHashed(runtimeID.Hash())

var statuses []*churp.Status
for it.Seek(statusKeyFmt.Encode(&runtimeID)); it.Valid(); it.Next() {
var (
hash keyformat.PreHashed
churpID uint8
)
if !statusKeyFmt.Decode(it.Key(), &hash, &churpID) {
break
}
if runtimeIDHash != hash {
break
}

var status churp.Status
if err := cbor.Unmarshal(it.Value(), &status); err != nil {
return nil, abciAPI.UnavailableStateError(err)
}
statuses = append(statuses, &status)
}
if it.Err() != nil {
return nil, abciAPI.UnavailableStateError(it.Err())
}

return statuses, nil
}

// AllStatuses returns the CHURP statuses for all runtimes.
func (st *ImmutableState) AllStatuses(ctx context.Context) ([]*churp.Status, error) {
it := st.is.NewIterator(ctx)
defer it.Close()

var statuses []*churp.Status
for it.Seek(satusKeyFmt.Encode()); it.Valid(); it.Next() {
if !satusKeyFmt.Decode(it.Key()) {
for it.Seek(statusKeyFmt.Encode()); it.Valid(); it.Next() {
if !statusKeyFmt.Decode(it.Key()) {
break
}

Expand Down Expand Up @@ -118,7 +152,7 @@ func (st *MutableState) SetConsensusParameters(ctx context.Context, params *chur

// SetStatus updates the state using the provided CHURP status.
func (st *MutableState) SetStatus(ctx context.Context, status *churp.Status) error {
err := st.ms.Insert(ctx, satusKeyFmt.Encode(&status.RuntimeID, status.ID), cbor.Marshal(status))
err := st.ms.Insert(ctx, statusKeyFmt.Encode(&status.RuntimeID, status.ID), cbor.Marshal(status))
return abciAPI.UnavailableStateError(err)
}

Expand Down
53 changes: 51 additions & 2 deletions go/consensus/cometbft/apps/keymanager/churp/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestStatuses(t *testing.T) {
}

// Empty state.
fetched, err := st.Statuses(ctx)
fetched, err := st.Statuses(ctx, statuses[0].RuntimeID)
require.NoError(t, err)
require.Empty(t, fetched)

Expand All @@ -135,7 +135,56 @@ func TestStatuses(t *testing.T) {
}

// New state.
fetched, err = st.Statuses(ctx)
fetched, err = st.Statuses(ctx, statuses[0].RuntimeID)
require.NoError(t, err)
require.ElementsMatch(t, statuses[:2], fetched)
}

func TestAllStatuses(t *testing.T) {
appState := abciAPI.NewMockApplicationState(&abciAPI.MockApplicationStateConfig{})
ctx := appState.NewContext(abciAPI.ContextBeginBlock)
defer ctx.Close()

st := NewMutableState(ctx.State())

// Prepare.
statuses := []*churp.Status{
{
Identity: churp.Identity{
ID: 1,
RuntimeID: common.NewTestNamespaceFromSeed([]byte{1}, common.NamespaceTest),
},
Threshold: 1,
},
{
Identity: churp.Identity{
ID: 2,
RuntimeID: common.NewTestNamespaceFromSeed([]byte{1}, common.NamespaceTest),
},
Threshold: 2,
},
{
Identity: churp.Identity{
ID: 1,
RuntimeID: common.NewTestNamespaceFromSeed([]byte{2}, common.NamespaceTest),
},
Threshold: 1,
},
}

// Empty state.
fetched, err := st.AllStatuses(ctx)
require.NoError(t, err)
require.Empty(t, fetched)

// Set state.
for _, status := range statuses {
err = st.SetStatus(ctx, status)
require.NoError(t, err)
}

// New state.
fetched, err = st.AllStatuses(ctx)
require.NoError(t, err)
require.ElementsMatch(t, statuses, fetched)
}

0 comments on commit 111541a

Please sign in to comment.