Skip to content

Commit

Permalink
Make mempool not drop transactions (#501)
Browse files Browse the repository at this point in the history
* Rename deliveredrequests to deliveredtxs
* Add methods for querying client progress
* Update mempool-related protobuf definitions
* Re-generate protobufs
* Clean up and add imports for future code
* Adapt mempool transaction lookup to new protobufs
* Make mempool aware of the current epoch

This is required for not emitting the same transactions multiple times
in the same epoch, but still re-emitting transactions
that have not been delivered in a previous epoch.

The main changes to the logic of the mempool itself
are not yet included in this commit to simplify review.
They will follow in the next commit.

* Make mempool not drop transactions

The explanation of how this works is written in the comments
in the beginning of the updated formbatches.go file.

Signed-off-by: Matej Pavlovic <[email protected]>
  • Loading branch information
matejpavlovic authored Jul 26, 2023
1 parent 86ae92b commit f40ba06
Show file tree
Hide file tree
Showing 55 changed files with 1,017 additions and 617 deletions.
2 changes: 2 additions & 0 deletions pkg/availability/multisigcollector/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
msctypes "github.com/filecoin-project/mir/pkg/availability/multisigcollector/types"
cryptopbtypes "github.com/filecoin-project/mir/pkg/pb/cryptopb/types"
trantorpbtypes "github.com/filecoin-project/mir/pkg/pb/trantorpb/types"
tt "github.com/filecoin-project/mir/pkg/trantor/types"
t "github.com/filecoin-project/mir/pkg/types"
)

Expand All @@ -30,6 +31,7 @@ type ModuleConfig struct {
// All replicas are expected to use identical module parameters.
type ModuleParams struct {
InstanceUID []byte // unique identifier for this instance used to prevent replay attacks
EpochNr tt.EpochNr // The epoch in which this instance of MultisigCollector operates
Membership *trantorpbtypes.Membership // the list of participating nodes
Limit int // the maximum number of certificates to generate before a request is completed
MaxRequests int // the maximum number of requests to be provided by this module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ import (
mscpbdsl "github.com/filecoin-project/mir/pkg/pb/availabilitypb/mscpb/dsl"
mscpbmsgs "github.com/filecoin-project/mir/pkg/pb/availabilitypb/mscpb/msgs"
mscpbtypes "github.com/filecoin-project/mir/pkg/pb/availabilitypb/mscpb/types"
cryptopbdsl "github.com/filecoin-project/mir/pkg/pb/cryptopb/dsl"
transportpbdsl "github.com/filecoin-project/mir/pkg/pb/transportpb/dsl"
tt "github.com/filecoin-project/mir/pkg/trantor/types"
"github.com/filecoin-project/mir/pkg/util/membutil"
"github.com/filecoin-project/mir/pkg/util/sliceutil"

apbtypes "github.com/filecoin-project/mir/pkg/pb/availabilitypb/types"
cryptopbdsl "github.com/filecoin-project/mir/pkg/pb/cryptopb/dsl"
mempooldsl "github.com/filecoin-project/mir/pkg/pb/mempoolpb/dsl"
transportpbdsl "github.com/filecoin-project/mir/pkg/pb/transportpb/dsl"
trantorpbtypes "github.com/filecoin-project/mir/pkg/pb/trantorpb/types"
tt "github.com/filecoin-project/mir/pkg/trantor/types"
t "github.com/filecoin-project/mir/pkg/types"
"github.com/filecoin-project/mir/pkg/util/maputil"
"github.com/filecoin-project/mir/pkg/util/membutil"
"github.com/filecoin-project/mir/pkg/util/sliceutil"
)

// certCreationState represents the state related to this part of the module.
Expand Down Expand Up @@ -74,7 +73,7 @@ func IncludeCreatingCertificates(
receivedSig: make(map[t.NodeID]bool),
sigs: make(map[t.NodeID][]byte),
}
mempooldsl.RequestBatch(m, mc.Mempool, &requestBatchFromMempoolContext{reqID})
mempooldsl.RequestBatch(m, mc.Mempool, params.EpochNr, &requestBatchFromMempoolContext{reqID})
return nil
})

Expand Down
6 changes: 3 additions & 3 deletions pkg/availability/multisigcollector/multisigcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package multisigcollector
import (
"math"

"github.com/filecoin-project/mir/pkg/availability/multisigcollector/common"
"github.com/filecoin-project/mir/pkg/availability/multisigcollector/internal/parts/batchreconstruction"
"github.com/filecoin-project/mir/pkg/availability/multisigcollector/internal/parts/certcreation"
"github.com/filecoin-project/mir/pkg/availability/multisigcollector/internal/parts/certverification"

"github.com/filecoin-project/mir/pkg/availability/multisigcollector/common"
"github.com/filecoin-project/mir/pkg/dsl"
"github.com/filecoin-project/mir/pkg/factorymodule"
"github.com/filecoin-project/mir/pkg/logging"
Expand All @@ -24,7 +23,7 @@ type ModuleConfig = common.ModuleConfig
type ModuleParams = common.ModuleParams

// DefaultParamsTemplate returns the availability module parameters structure partially filled with default values.
// Fields without a meaningful default value (like InstanceUID and Membership)
// Fields without a meaningful default value (like InstanceUID, Epoch, and Membership)
// are left empty (zero values for their corresponding type).
func DefaultParamsTemplate() ModuleParams {
return ModuleParams{
Expand Down Expand Up @@ -70,6 +69,7 @@ func NewReconfigurableModule(mc ModuleConfig, paramsTemplate ModuleParams, logge
// Fill in instance-specific parameters.
moduleParams := paramsTemplate
moduleParams.InstanceUID = []byte(mscID)
moduleParams.EpochNr = mscParams.Epoch
moduleParams.Membership = mscParams.Membership
moduleParams.MaxRequests = int(mscParams.MaxRequests)
// TODO: Use InstanceUIDs properly.
Expand Down
36 changes: 19 additions & 17 deletions pkg/batchfetcher/batchfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,28 @@ package batchfetcher
import (
"fmt"

bfevents "github.com/filecoin-project/mir/pkg/batchfetcher/events"
"github.com/filecoin-project/mir/pkg/checkpoint"
"github.com/filecoin-project/mir/pkg/clientprogress"
"github.com/filecoin-project/mir/pkg/dsl"
"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/modules"
apppbdsl "github.com/filecoin-project/mir/pkg/pb/apppb/dsl"
apppbevents "github.com/filecoin-project/mir/pkg/pb/apppb/events"
availabilitypbdsl "github.com/filecoin-project/mir/pkg/pb/availabilitypb/dsl"
apbtypes "github.com/filecoin-project/mir/pkg/pb/availabilitypb/types"
"github.com/filecoin-project/mir/pkg/pb/batchfetcherpb"
bfeventstypes "github.com/filecoin-project/mir/pkg/pb/batchfetcherpb/events"
checkpointpbtypes "github.com/filecoin-project/mir/pkg/pb/checkpointpb/types"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
eventpbdsl "github.com/filecoin-project/mir/pkg/pb/eventpb/dsl"
eventpbtypes "github.com/filecoin-project/mir/pkg/pb/eventpb/types"
isspbdsl "github.com/filecoin-project/mir/pkg/pb/isspb/dsl"
mppbdsl "github.com/filecoin-project/mir/pkg/pb/mempoolpb/dsl"
"github.com/filecoin-project/mir/pkg/pb/trantorpb"
tt "github.com/filecoin-project/mir/pkg/trantor/types"

availabilitypbdsl "github.com/filecoin-project/mir/pkg/pb/availabilitypb/dsl"
apbtypes "github.com/filecoin-project/mir/pkg/pb/availabilitypb/types"
bfeventstypes "github.com/filecoin-project/mir/pkg/pb/batchfetcherpb/events"
trantorpbdsl "github.com/filecoin-project/mir/pkg/pb/trantorpb/dsl"
trantorpbtypes "github.com/filecoin-project/mir/pkg/pb/trantorpb/types"

eventpbdsl "github.com/filecoin-project/mir/pkg/pb/eventpb/dsl"

"github.com/filecoin-project/mir/pkg/clientprogress"
"github.com/filecoin-project/mir/pkg/dsl"
"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
tt "github.com/filecoin-project/mir/pkg/trantor/types"
t "github.com/filecoin-project/mir/pkg/types"
)

Expand Down Expand Up @@ -80,6 +78,10 @@ func NewModule(mc ModuleConfig, epochNr tt.EpochNr, clientProgress *clientprogre
epochNr = newEpochNr
output.Enqueue(&outputItem{
event: apppbevents.NewEpoch(mc.Destination, epochNr, protocolModule).Pb(),
f: func(_ *eventpb.Event) {
clientProgress.GarbageCollect()
mppbdsl.NewEpoch(m, mc.Mempool, epochNr, trantorpbtypes.ClientProgressFromPb(clientProgress.Pb()))
},
})
output.Flush(m)
return nil
Expand Down Expand Up @@ -141,10 +143,10 @@ func NewModule(mc ModuleConfig, epochNr tt.EpochNr, clientProgress *clientprogre
// At the time of forwarding, submit the client progress to the checkpointing protocol.
f: func(_ *eventpb.Event) {
clientProgress.GarbageCollect()
dsl.EmitEvent(m, bfevents.ClientProgress(
trantorpbdsl.ClientProgress(m,
mc.Checkpoint.Then(t.ModuleID(fmt.Sprintf("%v", epochNr))),
clientProgress.Pb(),
))
trantorpbtypes.ClientProgressFromPb(clientProgress.Pb()).Progress,
)
},
})
output.Flush(m)
Expand Down
1 change: 1 addition & 0 deletions pkg/batchfetcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ type ModuleConfig struct {
Availability t.ModuleID // ID of the factory module containing the availability modules.
Checkpoint t.ModuleID // ID of the checkpoint factory module to which to submit client progress.
Destination t.ModuleID // ID of the module to deliver the produced event stream to (usually the application).
Mempool t.ModuleID // ID of the mempool module to send updates on client progress to.
}
7 changes: 7 additions & 0 deletions pkg/clientprogress/clientprogress.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ func NewClientProgress(logger logging.Logger) *ClientProgress {
}
}

func (cp *ClientProgress) Contains(clID tt.ClientID, txNo tt.TxNo) bool {
if _, ok := cp.ClientTrackers[clID]; !ok {
return false
}
return cp.ClientTrackers[clID].Contains(txNo)
}

func (cp *ClientProgress) Add(clID tt.ClientID, txNo tt.TxNo) bool {
if _, ok := cp.ClientTrackers[clID]; !ok {
cp.ClientTrackers[clID] = EmptyDeliveredTXs(logging.Decorate(cp.logger, "", "clID", clID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,44 +39,55 @@ func DeliveredTXsFromPb(pb *trantorpb.DeliveredTXs, logger logging.Logger) *Deli
return dr
}

// Contains returns true if the given txNo has already been added.
func (dt *DeliveredTXs) Contains(txNo tt.TxNo) bool {

if txNo < dt.lowWM {
return true
}

_, alreadyPresent := dt.delivered[txNo]
return alreadyPresent
}

// Add adds a transaction number that is considered delivered to the DeliveredTXs.
// Returns true if the transaction number has been added now (after not being previously present).
// Returns false if the transaction number has already been added before the call to Add.
func (dr *DeliveredTXs) Add(txNo tt.TxNo) bool {
func (dt *DeliveredTXs) Add(txNo tt.TxNo) bool {

if txNo < dr.lowWM {
dr.logger.Log(logging.LevelDebug, "Transaction number below client's watermark window.",
"lowWM", dr.lowWM, "txNo", txNo)
if txNo < dt.lowWM {
dt.logger.Log(logging.LevelDebug, "Transaction number below client's watermark window.",
"lowWM", dt.lowWM, "txNo", txNo)
return false
}

_, alreadyPresent := dr.delivered[txNo]
dr.delivered[txNo] = struct{}{}
_, alreadyPresent := dt.delivered[txNo]
dt.delivered[txNo] = struct{}{}
return !alreadyPresent
}

// GarbageCollect reduces the memory footprint of the DeliveredTXs
// by deleting a contiguous prefix of delivered transaction numbers
// and increasing the low watermark accordingly.
// Returns the new low watermark.
func (dr *DeliveredTXs) GarbageCollect() tt.TxNo {
func (dt *DeliveredTXs) GarbageCollect() tt.TxNo {

for _, ok := dr.delivered[dr.lowWM]; ok; _, ok = dr.delivered[dr.lowWM] {
delete(dr.delivered, dr.lowWM)
dr.lowWM++
for _, ok := dt.delivered[dt.lowWM]; ok; _, ok = dt.delivered[dt.lowWM] {
delete(dt.delivered, dt.lowWM)
dt.lowWM++
}

return dr.lowWM
return dt.lowWM
}

func (dr *DeliveredTXs) Pb() *trantorpb.DeliveredTXs {
delivered := make([]uint64, len(dr.delivered))
for i, txNo := range maputil.GetSortedKeys(dr.delivered) {
func (dt *DeliveredTXs) Pb() *trantorpb.DeliveredTXs {
delivered := make([]uint64, len(dt.delivered))
for i, txNo := range maputil.GetSortedKeys(dt.delivered) {
delivered[i] = txNo.Pb()
}

return &trantorpb.DeliveredTXs{
LowWm: dr.lowWM.Pb(),
LowWm: dt.lowWM.Pb(),
Delivered: delivered,
}
}
7 changes: 3 additions & 4 deletions pkg/iss/iss.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import (
es "github.com/go-errors/errors"
"google.golang.org/protobuf/proto"

cvpbdsl "github.com/filecoin-project/mir/pkg/pb/checkpointpb/chkpvalidatorpb/dsl"

"github.com/filecoin-project/mir/pkg/orderers/common"

"github.com/filecoin-project/mir/pkg/checkpoint"
"github.com/filecoin-project/mir/pkg/clientprogress"
"github.com/filecoin-project/mir/pkg/crypto"
Expand All @@ -33,11 +29,13 @@ import (
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/orderers"
"github.com/filecoin-project/mir/pkg/orderers/common"
apppbdsl "github.com/filecoin-project/mir/pkg/pb/apppb/dsl"
"github.com/filecoin-project/mir/pkg/pb/availabilitypb"
apbdsl "github.com/filecoin-project/mir/pkg/pb/availabilitypb/dsl"
mscpbtypes "github.com/filecoin-project/mir/pkg/pb/availabilitypb/mscpb/types"
apbtypes "github.com/filecoin-project/mir/pkg/pb/availabilitypb/types"
cvpbdsl "github.com/filecoin-project/mir/pkg/pb/checkpointpb/chkpvalidatorpb/dsl"
chkppbdsl "github.com/filecoin-project/mir/pkg/pb/checkpointpb/dsl"
chkppbmsgs "github.com/filecoin-project/mir/pkg/pb/checkpointpb/msgs"
checkpointpbtypes "github.com/filecoin-project/mir/pkg/pb/checkpointpb/types"
Expand Down Expand Up @@ -629,6 +627,7 @@ func (iss *ISS) initAvailability() {
&factorypbtypes.GeneratorParams{
Type: &factorypbtypes.GeneratorParams_MultisigCollector{
MultisigCollector: &mscpbtypes.InstanceParams{
Epoch: iss.epoch.Nr(),
Membership: iss.memberships[0],
MaxRequests: uint64(iss.Params.SegmentLength)},
},
Expand Down
8 changes: 7 additions & 1 deletion pkg/mempool/simplemempool/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"time"

trantorpbtypes "github.com/filecoin-project/mir/pkg/pb/trantorpb/types"
tt "github.com/filecoin-project/mir/pkg/trantor/types"
t "github.com/filecoin-project/mir/pkg/types"
"github.com/filecoin-project/mir/pkg/util/indexedlist"
)

// ModuleConfig sets the module ids. All replicas are expected to use identical module configurations.
Expand Down Expand Up @@ -40,6 +42,10 @@ type ModuleParams struct {
}

// State represents the common state accessible to all parts of the module implementation.
// TODO: Consider moving this definition inside the `internal` subdirectory, as it is only used by the mempool.
type State struct {
TxByID map[string]*trantorpbtypes.Transaction
// All the transactions in the mempool.
// Incoming transactions that have not yet been delivered are added to this list.
// They are removed upon delivery.
Transactions *indexedlist.IndexedList[tt.TxID, *trantorpbtypes.Transaction]
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func IncludeBatchCreation(
return nil
})

mpdsl.UponRequestBatch(m, func(origin *mppbtypes.RequestBatchOrigin) error {
mpdsl.UponRequestBatch(m, func(_ tt.EpochNr, origin *mppbtypes.RequestBatchOrigin) error {
txs := fetchTransactions()
mpdsl.RequestTransactionIDs(m, mc.Self, txs, &requestTxIDsContext{
txs: txs,
Expand Down
Loading

0 comments on commit f40ba06

Please sign in to comment.