Merge pull request #5553 from oasisprotocol/kostko/feature/rt-executor-iosingleroot

go/worker/compute: Simplify I/O root commit
kostko authored Feb 12, 2024
2 parents 93ecca9 + cb75520 commit 8102918
Showing 3 changed files with 16 additions and 171 deletions.
4 changes: 4 additions & 0 deletions .changelog/5553.internal.md
@@ -0,0 +1,4 @@
+ go/worker/compute: Simplify I/O root commit
+
+ This also avoids an intermediate committed IO root which complicates the
+ required database layout.
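
To make the simplification concrete, here is a minimal, runnable sketch of the before/after commit pattern. Store, WriteLog, and the string root names are hypothetical stand-ins for oasis-core's storage backend; only the two-step versus one-step shape of the Apply calls mirrors the diff below.

```go
// Toy model of the change: commit the I/O root in one apply instead of two.
// Store and WriteLog are hypothetical stand-ins for the real storage backend.
package main

import "fmt"

// WriteLog is an ordered list of key/value updates.
type WriteLog []struct{ Key, Value string }

// Store records which storage roots have been committed.
type Store struct{ roots map[string]bool }

// Apply replays a write log on top of srcRoot and commits dstRoot.
func (s *Store) Apply(srcRoot, dstRoot string, wl WriteLog) {
	s.roots[dstRoot] = true
	fmt.Printf("apply %s -> %s (%d entries)\n", srcRoot, dstRoot, len(wl))
}

func main() {
	inputLog := WriteLog{{"tx/0", "raw transaction bytes"}} // transaction inputs
	outputLog := WriteLog{{"result/0", "execution result"}} // execution outputs

	// Before: the input write log was committed on its own, leaving an
	// intermediate, inputs-only I/O root in the database.
	before := &Store{roots: map[string]bool{}}
	before.Apply("empty", "inputRoot", inputLog) // intermediate committed root
	before.Apply("inputRoot", "ioRoot", outputLog)

	// After: a single apply from the empty root with the concatenated logs,
	// mirroring WriteLog: append(processed.txInputWriteLog, batch.IOWriteLog...).
	after := &Store{roots: map[string]bool{}}
	combined := append(append(WriteLog{}, inputLog...), outputLog...)
	after.Apply("empty", "ioRoot", combined)
}
```

In the new flow only the final I/O root is ever committed; the inputs-only intermediate root named in the changelog disappears, along with the scheduler-side storeTransactions and executor-side replication machinery removed below.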
182 changes: 12 additions & 170 deletions go/worker/compute/executor/committee/node.go
@@ -238,7 +238,6 @@ func (n *Node) transitionStateToProcessingFailure(
proposal: proposal,
rank: rank,
computed: nil,
- raw: nil,
}
}

@@ -436,21 +435,6 @@ func (n *Node) scheduleBatch(ctx context.Context, round uint64, force bool) {
}()
}

- func (n *Node) storeTransactions(ctx context.Context, blk *block.Block, inputWriteLog storage.WriteLog, inputRoot hash.Hash) error {
-     var emptyRoot hash.Hash
-     emptyRoot.Empty()
-
-     return n.storage.Apply(ctx, &storage.ApplyRequest{
-         Namespace: blk.Header.Namespace,
-         RootType:  storage.RootTypeIO,
-         SrcRound:  blk.Header.Round + 1,
-         SrcRoot:   emptyRoot,
-         DstRound:  blk.Header.Round + 1,
-         DstRoot:   inputRoot,
-         WriteLog:  inputWriteLog,
-     })
- }
-
func (n *Node) publishProposal(ctx context.Context, proposal *commitment.Proposal) error {
if err := proposal.Sign(n.commonNode.Identity.NodeSigner, n.commonNode.Runtime.ID()); err != nil {
return fmt.Errorf("failed to sign proposal header: %w", err)
@@ -528,115 +512,6 @@ func (n *Node) startSchedulingBatch(ctx context.Context, batch []*txpool.TxQueue
}
}

- func (n *Node) startLocalStorageReplication(
-     ctx context.Context,
-     blk *block.Block,
-     ioRootHash hash.Hash,
-     batch transaction.RawBatch,
- ) <-chan error {
-     ch := make(chan error, 1)
-
-     ioRoot := storage.Root{
-         Namespace: blk.Header.Namespace,
-         Version:   blk.Header.Round + 1,
-         Type:      storage.RootTypeIO,
-         Hash:      ioRootHash,
-     }
-
-     // If we have a local storage node, replicate batch locally so we will be able to Apply
-     // locally later when proposing a batch. This also avoids needless replication for things
-     // that we already have.
-     replicateIO := make(chan error)
-     go func() {
-         defer close(replicateIO)
-
-         // Check if the root is already present as in this case no replication is needed.
-         if n.storage.NodeDB().HasRoot(ioRoot) {
-             replicateIO <- nil
-             return
-         }
-
-         n.logger.Debug("replicating I/O root locally",
-             "io_root", ioRoot,
-         )
-
-         emptyRoot := ioRoot
-         emptyRoot.Hash.Empty()
-
-         ioTree := transaction.NewTree(nil, emptyRoot)
-         defer ioTree.Close()
-
-         for idx, tx := range batch {
-             if err := ioTree.AddTransaction(ctx, transaction.Transaction{Input: tx, BatchOrder: uint32(idx)}, nil); err != nil {
-                 n.logger.Error("failed to create I/O tree",
-                     "err", err,
-                 )
-                 replicateIO <- err
-                 return
-             }
-         }
-
-         ioWriteLog, ioRootHashCheck, err := ioTree.Commit(ctx)
-         if err != nil {
-             n.logger.Error("failed to create I/O tree",
-                 "err", err,
-             )
-             replicateIO <- err
-             return
-         }
-         if !ioRootHashCheck.Equal(&ioRootHash) {
-             n.logger.Error("inconsistent I/O root",
-                 "io_root_hash", ioRootHashCheck,
-                 "expected", ioRootHash,
-             )
-             replicateIO <- fmt.Errorf("inconsistent I/O root")
-             return
-         }
-
-         err = n.storage.Apply(ctx, &storage.ApplyRequest{
-             Namespace: ioRoot.Namespace,
-             RootType:  ioRoot.Type,
-             SrcRound:  ioRoot.Version,
-             SrcRoot:   emptyRoot.Hash,
-             DstRound:  ioRoot.Version,
-             DstRoot:   ioRoot.Hash,
-             WriteLog:  ioWriteLog,
-         })
-         if err != nil {
-             n.logger.Error("failed to apply I/O tree locally",
-                 "err", err,
-             )
-             replicateIO <- err
-             return
-         }
-
-         replicateIO <- nil
-     }()
-
-     // Wait for replication to complete.
-     go func() {
-         defer close(ch)
-
-         var combinedErr error
-         select {
-         case <-ctx.Done():
-             combinedErr = ctx.Err()
-         case err := <-replicateIO:
-             if err != nil {
-                 combinedErr = fmt.Errorf("failed to replicate I/O root: %w", err)
-             }
-         }
-
-         n.logger.Debug("local storage replication done",
-             "io_root", ioRoot,
-         )
-
-         ch <- combinedErr
-     }()
-
-     return ch
- }
-
func (n *Node) runtimeExecuteTxBatch(
ctx context.Context,
rt host.RichRuntime,
@@ -747,9 +622,6 @@ func (n *Node) startProcessingBatch(ctx context.Context, proposal *commitment.Pr
"batch_size", len(batch),
)

- // Optionally start local storage replication in parallel to batch dispatch.
- replicateCh := n.startLocalStorageReplication(ctx, n.blockInfo.RuntimeBlock, proposal.Header.BatchHash, batch)
-
// Ask the runtime to execute the batch.
rsp, err := n.runtimeExecuteTxBatch(
ctx,
@@ -770,26 +642,12 @@ func (n *Node) startProcessingBatch(ctx context.Context, proposal *commitment.Pr
return
}

- // Wait for replication to complete before proposing a batch to ensure that we can cleanly
- // apply any updates.
- select {
- case <-ctx.Done():
-     return
- case err = <-replicateCh:
-     if err != nil {
-         n.logger.Error("local storage replication failed",
-             "err", err,
-         )
-         return
-     }
- }
-
// Submit response to the round worker.
n.processedBatchCh <- &processedBatch{
- proposal: proposal,
- rank:     rank,
- computed: &rsp.Batch,
- raw:      batch,
+ proposal:        proposal,
+ rank:            rank,
+ computed:        &rsp.Batch,
+ txInputWriteLog: rsp.TxInputWriteLog,
}
}

@@ -822,7 +680,7 @@ func (n *Node) proposeBatch(
n.logger.Debug("proposing batch",
"scheduler_id", processed.proposal.NodeID,
"node_id", n.commonNode.Identity.NodeSigner.Public(),
"batch_size", len(processed.raw),
"batch_size", len(processed.proposal.Batch),
"io_root", *batch.Header.IORoot,
"state_root", *batch.Header.StateRoot,
"messages_hash", *batch.Header.MessagesHash,
@@ -845,8 +703,6 @@
ec.Messages = batch.Messages
}

- inputRoot := processed.proposal.Header.BatchHash
-
// Commit I/O and state write logs to storage.
storageErr := func() error {
start := time.Now()
@@ -858,14 +714,17 @@
defer cancel()

// Store final I/O root.
+ var emptyRoot hash.Hash
+ emptyRoot.Empty()
+
err := n.storage.Apply(ctx, &storage.ApplyRequest{
Namespace: lastHeader.Namespace,
RootType: storage.RootTypeIO,
SrcRound: lastHeader.Round + 1,
- SrcRoot:  inputRoot,
+ SrcRoot:  emptyRoot,
DstRound: lastHeader.Round + 1,
DstRoot: *batch.Header.IORoot,
- WriteLog: batch.IOWriteLog,
+ WriteLog: append(processed.txInputWriteLog, batch.IOWriteLog...),
})
if err != nil {
return err
@@ -926,20 +785,10 @@ func (n *Node) proposeBatch(
return
}

- // Due to backwards compatibility with runtimes that don't provide transaction hashes as output
- // we need to manually compute them here.
- txHashes := processed.proposal.Batch
- if len(processed.raw) > 0 && len(txHashes) == 0 {
-     txHashes = make([]hash.Hash, 0, len(processed.raw))
-     for _, tx := range processed.raw {
-         txHashes = append(txHashes, hash.NewFromBytes(tx))
-     }
- }
-
n.proposedBatch = &proposedBatch{
batchStartTime: state.batchStartTime,
proposedIORoot: *ec.Header.Header.IORoot,
- txHashes: txHashes,
+ txHashes: processed.proposal.Batch,
}

n.transitionState(StateWaitingForBatch{})
@@ -1232,16 +1081,9 @@ func (n *Node) handleProcessedBatch(ctx context.Context, batch *processedBatch)
"input_root", batch.proposal.Header.BatchHash,
"tx_hashes", batch.proposal.Batch,
)
- err := n.storeTransactions(ctx, n.blockInfo.RuntimeBlock, batch.txInputWriteLog, batch.proposal.Header.BatchHash)
- if err != nil {
-     n.logger.Error("failed to store transaction",
-         "err", err,
-     )
-     return
- }

// Sign and submit the proposal to P2P network.
- err = n.publishProposal(ctx, batch.proposal)
+ err := n.publishProposal(ctx, batch.proposal)
if err != nil {
n.logger.Error("failed to sign and publish proposal",
"err", err,
1 change: 0 additions & 1 deletion go/worker/compute/executor/committee/state.go
@@ -167,7 +167,6 @@ type processedBatch struct {
rank uint64

computed *protocol.ComputedBatch
- raw transaction.RawBatch

txInputWriteLog storage.WriteLog
}
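
Assembled from the hunks above, the resulting processedBatch should look roughly like the sketch below. The proposal field's type is an inference from publishProposal taking a *commitment.Proposal; it is not shown in the state.go hunk itself.

```go
// processedBatch, post-commit: the raw transactions are gone; the input
// write log travels with the computed results so proposeBatch can commit
// the final I/O root in a single apply from the empty root.
type processedBatch struct {
	proposal *commitment.Proposal // inferred from publishProposal's signature
	rank     uint64

	computed *protocol.ComputedBatch

	txInputWriteLog storage.WriteLog
}
```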
