Skip to content

Commit

Permalink
Add stateful LeaderSelectionPolicy to EpochData for snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
ranchalp committed Jan 13, 2023
1 parent 740e2ff commit e0cead7
Show file tree
Hide file tree
Showing 44 changed files with 256 additions and 175 deletions.
7 changes: 5 additions & 2 deletions cmd/bench/cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,14 @@ func runNode() error {
}

localCrypto := deploytest.NewLocalCryptoSystem("pseudo", membership.GetIDs(initialMembership), logger)

genesisCheckpoint, err := trantor.GenesisCheckpoint([]byte{}, smrParams, logger)
if err != nil {
return fmt.Errorf("could not create genesis checkpoint: %w", err)
}
benchApp, err := trantor.New(
ownID,
transport,
trantor.GenesisCheckpoint([]byte{}, smrParams),
genesisCheckpoint,
localCrypto.Crypto(ownID),
&App{Logger: logger, Membership: initialMembership},
smrParams,
Expand Down
6 changes: 5 additions & 1 deletion cmd/mircat/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,15 @@ func debuggerNode(id t.NodeID, membership map[t.NodeID]t.NodeAddress) (*mir.Node
// Instantiate an ISS protocol module with the default configuration.
// TODO: The initial app state must be involved here. Otherwise checkpoint hashes might not match.
issConfig := issutil.DefaultParams(membership)
stateSnapshotpb, err := iss.InitialStateSnapshot([]byte{}, issConfig, logger)
if err != nil {
return nil, err
}
protocol, err := iss.New(
id,
iss.DefaultModuleConfig(),
issConfig,
checkpoint.Genesis(iss.InitialStateSnapshot([]byte{}, issConfig)),
checkpoint.Genesis(stateSnapshotpb),
crypto.SHA256,
cryptoImpl,
logging.Decorate(logger, "ISS: "),
Expand Down
46 changes: 34 additions & 12 deletions pkg/iss/iss.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ type ISS struct {
// If no stable checkpoint has been observed yet, lastStableCheckpoint is initialized to a stable checkpoint value
// corresponding to the initial state and associated with sequence number 0.
lastStableCheckpoint *checkpoint.StableCheckpoint

// The logic for selecting leader nodes in each epoch.
// For details see the documentation of the LeaderSelectionPolicy type.
// ATTENTION: The leader selection policy is stateful!
// Must not be nil.
LeaderPolicy issutil.LeaderSelectionPolicy
}

// New returns a new initialized instance of the ISS protocol module to be used when instantiating a mir.Node.
Expand Down Expand Up @@ -132,6 +138,7 @@ func New(

// Logger the ISS implementation uses to output log messages.
logger logging.Logger,

) (*ISS, error) {
if logger == nil {
logger = logging.ConsoleErrorLogger
Expand All @@ -144,6 +151,10 @@ func New(

// TODO: Make sure that startingChkp is consistent with params.

leaderPolicy, err := issutil.LeaderPolicyFromBytes(startingChkp.Snapshot.EpochData.LeaderPolicy)
if err != nil {
return nil, fmt.Errorf("invalid leader policy in starting checkpoint: %w", err)
}
// Initialize a new ISS object.
iss := &ISS{
ownID: ownID,
Expand All @@ -161,6 +172,7 @@ func New(
nextDeliveredSN: startingChkp.SeqNr(),
newEpochSN: startingChkp.SeqNr(),
lastStableCheckpoint: startingChkp,
LeaderPolicy: leaderPolicy,
// TODO: Make sure that verification of the stable checkpoint certificate for epoch 0 is handled properly.
// (Probably "always valid", if the membership is right.) There is no epoch -1 with nodes to sign it.
}
Expand All @@ -182,7 +194,8 @@ func New(
func InitialStateSnapshot(
appState []byte,
params *issutil.ModuleParams,
) *commonpb.StateSnapshot {
logger logging.Logger,
) (*commonpb.StateSnapshot, error) {

// Create the first membership and the ConfigOffset memberships following it (all derived from the initial one).
memberships := make([]map[t.NodeID]t.NodeAddress, params.ConfigOffset+1)
Expand All @@ -192,14 +205,18 @@ func InitialStateSnapshot(

// TODO: This assumes the simple leader selection policy. Generalize!
firstEpochLength := params.SegmentLength * len(params.InitialMembership)

leaderPolicy, err := issutil.NewBlackListLeaderPolicy(maputil.GetSortedKeys(params.InitialMembership), issutil.StrongQuorum(len(params.InitialMembership))).Bytes()
if err != nil {
return nil, err
}
return &commonpb.StateSnapshot{
AppData: appState,
EpochData: &commonpb.EpochData{
EpochConfig: events.EpochConfig(0, 0, firstEpochLength, memberships),
ClientProgress: clientprogress.NewClientProgress(nil).Pb(),
LeaderPolicy: leaderPolicy,
},
}
}, nil
}

// ============================================================
Expand Down Expand Up @@ -264,7 +281,7 @@ func (iss *ISS) applyInit() (*events.EventList, error) {
eventsOut.PushBack(events.AppRestoreState(iss.moduleConfig.App, iss.lastStableCheckpoint.Pb()))

// Start the first epoch (not necessarily epoch 0, depending on the starting checkpoint).
eventsOut.PushBackList(iss.startEpoch(iss.lastStableCheckpoint.Epoch(), iss.Params.LeaderPolicy))
eventsOut.PushBackList(iss.startEpoch(iss.lastStableCheckpoint.Epoch()))

return eventsOut, nil
}
Expand Down Expand Up @@ -541,6 +558,12 @@ func (iss *ISS) applyStableCheckpointMessage(chkpPb *checkpointpb.StableCheckpoi

iss.logger.Log(logging.LevelDebug, "Installing state snapshot.", "epoch", chkp.Epoch())

result, err := issutil.LeaderPolicyFromBytes(chkp.Snapshot.EpochData.LeaderPolicy)
if err != nil {
iss.logger.Log(logging.LevelWarn, "Error deserializing leader selection policy from checkpoint", err)
return events.EmptyList(), nil
}
iss.LeaderPolicy = result
// Clean up the global ISS state belonging to the current epoch
// instance, in which the local replica got stuck.
iss.epochs = make(map[t.EpochNr]*epochInfo)
Expand All @@ -553,10 +576,8 @@ func (iss *ISS) applyStableCheckpointMessage(chkpPb *checkpointpb.StableCheckpoi
// and initialize the corresponding availability submodules.
iss.memberships = chkp.Memberships()
iss.nextNewMembership = nil

// Update the last stable checkpoint stored in the global ISS structure.
iss.lastStableCheckpoint = chkp

// Create an event requesting the application module to
// restore its state from the snapshot received in the new
// stable checkpoint message.
Expand All @@ -568,7 +589,6 @@ func (iss *ISS) applyStableCheckpointMessage(chkpPb *checkpointpb.StableCheckpoi
// TODO: Properly serialize and deserialize the leader selection policy and pass it here.
eventsOut.PushBackList(iss.startEpoch(
chkp.Epoch(),
iss.Params.LeaderPolicy.Reconfigure(maputil.GetSortedKeys(iss.memberships[0])),
))

// Prune the old state of all related modules.
Expand All @@ -591,15 +611,15 @@ func (iss *ISS) applyStableCheckpointMessage(chkpPb *checkpointpb.StableCheckpoi
// startEpoch emits the events necessary for a new epoch to start operating.
// This includes informing the application about the new epoch and initializing all the necessary external modules
// such as availability and orderers.
func (iss *ISS) startEpoch(epochNr t.EpochNr, leaderPolicy issutil.LeaderSelectionPolicy) *events.EventList {
func (iss *ISS) startEpoch(epochNr t.EpochNr) *events.EventList {
eventsOut := events.EmptyList()

// Initialize the internal data structures for the new epoch.
nodeIDs := maputil.GetSortedKeys(iss.memberships[0])
epoch := newEpochInfo(epochNr, iss.newEpochSN, nodeIDs, leaderPolicy)
epoch := newEpochInfo(epochNr, iss.newEpochSN, nodeIDs, iss.LeaderPolicy)
iss.epochs[epochNr] = &epoch
iss.epoch = &epoch
iss.logger.Log(logging.LevelInfo, "Initializing new epoch", "epochNr", epochNr, "nodes", nodeIDs, "leaders", leaderPolicy.Leaders())
iss.logger.Log(logging.LevelInfo, "Initializing new epoch", "epochNr", epochNr, "nodes", nodeIDs, "leaders", iss.LeaderPolicy.Leaders())

// Signal the new epoch to the application.
eventsOut.PushBack(events.NewEpoch(iss.moduleConfig.App, iss.epoch.Nr()))
Expand Down Expand Up @@ -707,6 +727,9 @@ func (iss *ISS) processCommitted() (*events.EventList, error) {
}
}

if iss.commitLog[iss.nextDeliveredSN].Aborted {
iss.LeaderPolicy.Suspect(iss.epoch.Nr(), iss.commitLog[iss.nextDeliveredSN].Suspect)
}
// Create a new DeliverCert event.
eventsOut.PushBack(events.DeliverCert(iss.moduleConfig.App, iss.nextDeliveredSN, &cert))

Expand Down Expand Up @@ -753,6 +776,7 @@ func (iss *ISS) advanceEpoch() (*events.EventList, error) {
oldNodeIDs := maputil.GetSortedKeys(iss.memberships[0])
iss.memberships = append(iss.memberships[1:], iss.nextNewMembership)
iss.nextNewMembership = nil
iss.epoch.leaderPolicy.Reconfigure(maputil.GetSortedKeys(iss.memberships[0]))

// Start executing the new epoch.
// This must happen before starting the checkpoint protocol, since the application
Expand All @@ -761,9 +785,7 @@ func (iss *ISS) advanceEpoch() (*events.EventList, error) {
// (startEpoch emits an event for the application making it transition to the new epoch).
eventsOut.PushBackList(iss.startEpoch(
newEpochNr,
iss.epoch.leaderPolicy.Reconfigure(maputil.GetSortedKeys(iss.memberships[0])),
))

// Create a new checkpoint tracker to start the checkpointing protocol.
// This must happen after initialization of the new epoch,
// as the sequence number the checkpoint will be associated with (iss.nextDeliveredSN)
Expand Down
3 changes: 2 additions & 1 deletion pkg/modules/mockmodules/internal/mock_internal/impl.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions pkg/net/grpc/grpctransport.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/net/grpc/grpctransport_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/orderers/pbftsegmentchkp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package orderers

import (
"bytes"

"github.com/filecoin-project/mir/pkg/util/issutil"

"github.com/filecoin-project/mir/pkg/events"
Expand Down
1 change: 1 addition & 0 deletions pkg/orderers/pbftslot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package orderers

import (
"bytes"

"github.com/filecoin-project/mir/pkg/util/issutil"

"github.com/filecoin-project/mir/pkg/pb/eventpb"
Expand Down
3 changes: 2 additions & 1 deletion pkg/orderers/pbftviewchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package orderers
import (
"bytes"
"fmt"
"github.com/filecoin-project/mir/pkg/util/issutil"
"sort"

"github.com/filecoin-project/mir/pkg/util/issutil"

"github.com/filecoin-project/mir/pkg/pb/eventpb"

"github.com/filecoin-project/mir/pkg/events"
Expand Down
10 changes: 6 additions & 4 deletions pkg/pb/availabilitypb/availabilitypb.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pkg/pb/availabilitypb/batchdbpb/batchdbpb.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pkg/pb/availabilitypb/mscpb/mscpb.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pkg/pb/batchfetcherpb/batchfetcherpb.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pkg/pb/bcbpb/bcbpb.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions pkg/pb/checkpointpb/checkpointpb.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e0cead7

Please sign in to comment.