Skip to content

Commit

Permalink
ignore non-canonical tx messages
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Aug 1, 2024
1 parent 0bced19 commit 10bfe53
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 53 deletions.
56 changes: 38 additions & 18 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ func (handler *TxHandler) Start() {
})

// libp2p pubsub validator and handler abstracted as TaggedMessageProcessor
// TODO: rename to validators
handler.net.RegisterValidatorHandlers([]network.TaggedMessageValidatorHandler{
{
Tag: protocol.TxnTag,
Expand Down Expand Up @@ -559,7 +558,7 @@ func (handler *TxHandler) deleteFromCaches(msgKey *crypto.Digest, canonicalKey *

// dedupCanonical checks if the transaction group has been seen before after reencoding to canonical representation.
// returns a key used for insertion if the group was not found.
func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.SignedTxn, consumed int) (key *crypto.Digest, isDup bool) {
func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.SignedTxn, consumed int) (key *crypto.Digest, reencoded []byte, isDup bool) {
// consider situations where someone want to censor transactions A
// 1. Txn A is not part of a group => txn A with a valid signature is OK
// Censorship attempts are:
Expand All @@ -576,14 +575,16 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed
// - using individual txn from a group: {A, Z} could be poisoned by {A, B}, where B is invalid

var d crypto.Digest
var reencodedBuf []byte
ntx := len(unverifiedTxGroup)
if ntx == 1 {
// a single transaction => cache/dedup canonical txn with its signature
enc := unverifiedTxGroup[0].MarshalMsg(nil)
d = crypto.Hash(enc)
if handler.txCanonicalCache.CheckAndPut(&d) {
return nil, true
return nil, nil, true
}
reencodedBuf = enc
} else {
// a transaction group => cache/dedup the entire group canonical group
encodeBuf := make([]byte, 0, unverifiedTxGroup[0].Msgsize()*ntx)
Expand All @@ -594,14 +595,15 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed
// reallocated, some assumption on size was wrong
// log and skip
logging.Base().Warnf("Decoded size %d does not match to encoded %d", consumed, len(encodeBuf))
return nil, false
return nil, nil, false

Check warning on line 598 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L598

Added line #L598 was not covered by tests
}
d = crypto.Hash(encodeBuf)
if handler.txCanonicalCache.CheckAndPut(&d) {
return nil, true
return nil, nil, true
}
reencodedBuf = encodeBuf
}
return &d, false
return &d, reencodedBuf, false
}

// incomingMsgDupCheck runs the duplicate check on a raw incoming message.
Expand Down Expand Up @@ -696,28 +698,32 @@ func decodeMsg(data []byte) (unverifiedTxGroup []transactions.SignedTxn, consume
return unverifiedTxGroup, consumed, false
}

// incomingTxGroupDupRateLimit checks
// - if the incoming transaction group has been seen before after reencoding to canonical representation, and
// - if the sender is rate limited by the per-application rate limiter.
func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int, sender network.DisconnectablePeer) (*crypto.Digest, bool) {
// incomingTxGroupDupRateLimit checks if the incoming transaction group has been seen before after reencoding to canonical representation.
// It also return canonical representation of the transaction group allowing the caller to compare it with the input.
func (handler *TxHandler) incomingTxGroupCanonicalDedup(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int) (*crypto.Digest, []byte, bool) {
var canonicalKey *crypto.Digest
var reencoded []byte
if handler.txCanonicalCache != nil {
var isDup bool
if canonicalKey, isDup = handler.dedupCanonical(unverifiedTxGroup, encodedExpectedSize); isDup {
if canonicalKey, reencoded, isDup = handler.dedupCanonical(unverifiedTxGroup, encodedExpectedSize); isDup {
transactionMessagesDupCanonical.Inc(nil)
return canonicalKey, true
return nil, nil, true
}
}
return canonicalKey, reencoded, false
}

// incomingTxGroupAppRateLimit checks if the sender is rate limited by the per-application rate limiter.
func (handler *TxHandler) incomingTxGroupAppRateLimit(unverifiedTxGroup []transactions.SignedTxn, sender network.DisconnectablePeer) bool {
// rate limit per application in a group. Limiting any app in a group drops the entire message.
if handler.appLimiter != nil {
congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold
if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.(network.IPAddressable).RoutingAddr()) {
transactionMessagesAppLimiterDrop.Inc(nil)
return canonicalKey, true
return true

Check warning on line 723 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L723

Added line #L723 was not covered by tests
}
}
return canonicalKey, false
return false
}

// processIncomingTxn decodes a transaction group from incoming message and enqueues into the back log for processing.
Expand Down Expand Up @@ -753,13 +759,17 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
return network.OutgoingMessage{Action: network.Disconnect}
}

canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender)
canonicalKey, _, drop := handler.incomingTxGroupCanonicalDedup(unverifiedTxGroup, consumed)
if drop {
// this re-serialized txgroup was detected as a duplicate by the canonical message cache,
// or it was rate-limited by the per-app rate limiter
return network.OutgoingMessage{Action: network.Ignore}
}

if handler.incomingTxGroupAppRateLimit(unverifiedTxGroup, rawmsg.Sender) {
return network.OutgoingMessage{Action: network.Ignore}

Check warning on line 770 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L770

Added line #L770 was not covered by tests
}

select {
case handler.backlogQueue <- &txBacklogMsg{
rawmsg: &rawmsg,
Expand Down Expand Up @@ -794,10 +804,20 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
return network.OutgoingMessage{Action: network.Disconnect}
}

canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender)
canonicalKey, reencoded, drop := handler.incomingTxGroupCanonicalDedup(unverifiedTxGroup, consumed)
if drop {
// this re-serialized txgroup was detected as a duplicate by the canonical message cache,
// or it was rate-limited by the per-app rate limiter
return network.OutgoingMessage{Action: network.Ignore}

Check warning on line 809 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L809

Added line #L809 was not covered by tests
}
if reencoded == nil {
reencoded = reencode(unverifiedTxGroup)
}

if !bytes.Equal(rawmsg.Data, reencoded) {
// ignore non-canonically encoded messages
return network.OutgoingMessage{Action: network.Ignore}
}

if handler.incomingTxGroupAppRateLimit(unverifiedTxGroup, rawmsg.Sender) {
return network.OutgoingMessage{Action: network.Ignore}

Check warning on line 821 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L821

Added line #L821 was not covered by tests
}

Expand Down
97 changes: 62 additions & 35 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,42 +646,42 @@ func TestTxHandlerProcessIncomingGroup(t *testing.T) {
}
}

func craftNonCanonical(t *testing.T, stxn *transactions.SignedTxn, blobStxn []byte) []byte {
// make non-canonical encoding and ensure it is not accepted
stxnNonCanTxn := transactions.SignedTxn{Txn: stxn.Txn}
blobTxn := protocol.Encode(&stxnNonCanTxn)
stxnNonCanAuthAddr := transactions.SignedTxn{AuthAddr: stxn.AuthAddr}
blobAuthAddr := protocol.Encode(&stxnNonCanAuthAddr)
stxnNonCanAuthSig := transactions.SignedTxn{Sig: stxn.Sig}
blobSig := protocol.Encode(&stxnNonCanAuthSig)

if blobStxn == nil {
blobStxn = protocol.Encode(stxn)
}

// double check our skills for transactions.SignedTxn creation by creating a new canonical encoding and comparing to the original
blobValidation := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig))
blobValidation = append(blobValidation[:], blobAuthAddr...)
blobValidation = append(blobValidation[:], blobSig[1:]...) // cut transactions.SignedTxn's field count
blobValidation = append(blobValidation[:], blobTxn[1:]...) // cut transactions.SignedTxn's field count
blobValidation[0] += 2 // increase field count
require.Equal(t, blobStxn, blobValidation)

// craft non-canonical
blobNonCan := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig))
blobNonCan = append(blobNonCan[:], blobTxn...)
blobNonCan = append(blobNonCan[:], blobAuthAddr[1:]...) // cut transactions.SignedTxn's field count
blobNonCan = append(blobNonCan[:], blobSig[1:]...) // cut transactions.SignedTxn's field count
blobNonCan[0] += 2 // increase field count
require.Len(t, blobNonCan, len(blobStxn))
require.NotEqual(t, blobStxn, blobNonCan)
return blobNonCan
}

func TestTxHandlerProcessIncomingCensoring(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

craftNonCanonical := func(t *testing.T, stxn *transactions.SignedTxn, blobStxn []byte) []byte {
// make non-canonical encoding and ensure it is not accepted
stxnNonCanTxn := transactions.SignedTxn{Txn: stxn.Txn}
blobTxn := protocol.Encode(&stxnNonCanTxn)
stxnNonCanAuthAddr := transactions.SignedTxn{AuthAddr: stxn.AuthAddr}
blobAuthAddr := protocol.Encode(&stxnNonCanAuthAddr)
stxnNonCanAuthSig := transactions.SignedTxn{Sig: stxn.Sig}
blobSig := protocol.Encode(&stxnNonCanAuthSig)

if blobStxn == nil {
blobStxn = protocol.Encode(stxn)
}

// double check our skills for transactions.SignedTxn creation by creating a new canonical encoding and comparing to the original
blobValidation := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig))
blobValidation = append(blobValidation[:], blobAuthAddr...)
blobValidation = append(blobValidation[:], blobSig[1:]...) // cut transactions.SignedTxn's field count
blobValidation = append(blobValidation[:], blobTxn[1:]...) // cut transactions.SignedTxn's field count
blobValidation[0] += 2 // increase field count
require.Equal(t, blobStxn, blobValidation)

// craft non-canonical
blobNonCan := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig))
blobNonCan = append(blobNonCan[:], blobTxn...)
blobNonCan = append(blobNonCan[:], blobAuthAddr[1:]...) // cut transactions.SignedTxn's field count
blobNonCan = append(blobNonCan[:], blobSig[1:]...) // cut transactions.SignedTxn's field count
blobNonCan[0] += 2 // increase field count
require.Len(t, blobNonCan, len(blobStxn))
require.NotEqual(t, blobStxn, blobNonCan)
return blobNonCan
}

forgeSig := func(t *testing.T, stxn *transactions.SignedTxn, blobStxn []byte) (transactions.SignedTxn, []byte) {
stxnForged := *stxn
crypto.RandBytes(stxnForged.Sig[:])
Expand Down Expand Up @@ -2750,8 +2750,6 @@ func TestTxHandlerCapGuard(t *testing.T) {
}

func TestTxHandlerValidateIncomingTxMessage(t *testing.T) {
partitiontest.PartitionTest(t)

partitiontest.PartitionTest(t)
t.Parallel()

Expand All @@ -2777,8 +2775,16 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) {
outmsg := handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob})
require.Equal(t, outmsg.Action, network.Accept)

// non-canonical message
// for some reason craftNonCanonical cannot handle makeTxns output so make a simpler random txn
stxns, blob := makeRandomTransactions(1)
stxn := stxns[0]
blobNonCan := craftNonCanonical(t, &stxn, blob)
outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blobNonCan})
require.Equal(t, outmsg.Action, network.Ignore)

// invalid signature
stxns, _ := makeTxns(addresses, secrets, 1, 2, genesisHash)
stxns, _ = makeTxns(addresses, secrets, 1, 2, genesisHash)
stxns[0].Sig[0] = stxns[0].Sig[0] + 1
blob2 := protocol.Encode(&stxns[0])
outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob2})
Expand All @@ -2789,4 +2795,25 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) {
blob[0] = blob[0] + 1
outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob})
require.Equal(t, outmsg.Action, network.Disconnect)

t.Run("with-canonical", func(t *testing.T) {
// make sure the reencoding from the canonical dedup checker's reencoding buf is correctly reused
cfg.TxIncomingFilteringFlags = 2
require.True(t, cfg.TxFilterCanonicalEnabled())
handler, err := makeTestTxHandler(ledger, cfg)
require.NoError(t, err)

// valid message
_, blob := makeTxns(addresses, secrets, 1, 2, genesisHash)
outmsg := handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob})
require.Equal(t, outmsg.Action, network.Accept)

// non-canonical message
// for some reason craftNonCanonical cannot handle makeTxns output so make a simpler random txn
stxns, blob := makeRandomTransactions(1)
stxn := stxns[0]
blobNonCan := craftNonCanonical(t, &stxn, blob)
outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blobNonCan})
require.Equal(t, outmsg.Action, network.Ignore)
})
}

0 comments on commit 10bfe53

Please sign in to comment.