p2p: fan in incoming txns into backlog worker (algorand#6126)
algorandskiy authored Feb 21, 2025
1 parent 116794e commit 3c4d601
Showing 3 changed files with 72 additions and 61 deletions.
88 changes: 43 additions & 45 deletions data/txHandler.go
@@ -102,12 +102,13 @@ const (

// The txBacklogMsg structure used to track a single incoming transaction from the gossip network,
type txBacklogMsg struct {
rawmsg *network.IncomingMessage // the raw message from the network
unverifiedTxGroup []transactions.SignedTxn // the unverified ( and signed ) transaction group
rawmsgDataHash crypto.Digest // hash (or IsZero) of raw message data from the network
unverifiedTxGroupHash crypto.Digest // hash (or IsZero) of the unverifiedTxGroup
verificationErr error // The verification error generated by the verification function, if any.
capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued
rawmsg *network.IncomingMessage // the raw message from the network
unverifiedTxGroup []transactions.SignedTxn // the unverified ( and signed ) transaction group
rawmsgDataHash crypto.Digest // hash (or IsZero) of raw message data from the network
unverifiedTxGroupHash crypto.Digest // hash (or IsZero) of the unverifiedTxGroup
verificationErr error // The verification error generated by the verification function, if any.
capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued
syncCh chan network.ForwardingPolicy // channel to signal completion of the operation when running in synchronous mode
}

// TxHandler handles transaction messages
@@ -346,13 +347,21 @@ func (handler *TxHandler) backlogWorker() {
if wi.capguard != nil {
wi.capguard.Served()
}
// if in synchronous mode, signal the completion of the operation
if wi.syncCh != nil {
wi.syncCh <- network.Ignore
}
continue
}
// handler.streamVerifierChan does not receive if ctx is cancelled
select {
case handler.streamVerifierChan <- &verify.UnverifiedTxnSigJob{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi}:
case <-handler.ctx.Done():
transactionMessagesDroppedFromBacklog.Inc(nil)
// if in synchronous mode, signal the completion of the operation
if wi.syncCh != nil {
wi.syncCh <- network.Ignore
}
return
}
if wi.capguard != nil {
@@ -366,7 +375,6 @@ func (handler *TxHandler) backlogWorker() {
m := wi.BacklogMessage.(*txBacklogMsg)
m.verificationErr = wi.Err
handler.postProcessCheckedTxn(m)

case <-handler.ctx.Done():
return
}
@@ -508,6 +516,11 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) {
// disconnect from peer.
handler.postProcessReportErrors(wi.verificationErr)
logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, wi.verificationErr)
// if in synchronous mode, signal the completion of the operation
if wi.syncCh != nil {
wi.syncCh <- network.Disconnect
return
}
handler.net.Disconnect(wi.rawmsg.Sender)
return
}
@@ -523,6 +536,10 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) {
if err != nil {
handler.rememberReportErrors(err)
logging.Base().Debugf("could not remember tx: %v", err)
// if in synchronous mode, signal the completion of the operation
if wi.syncCh != nil {
wi.syncCh <- network.Ignore
}
return
}

@@ -533,6 +550,11 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) {
if err != nil {
logging.Base().Infof("unable to pin transaction: %v", err)
}
// if in synchronous mode, signal the completion of the operation
if wi.syncCh != nil {
wi.syncCh <- network.Accept
return
}

// We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings
handler.net.Relay(handler.ctx, protocol.TxnTag, reencode(verifiedTxGroup), false, wi.rawmsg.Sender)
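[Editor's note: a minimal sketch of the completion signalling added to postProcessCheckedTxn above. The names (policy, workItem, signalOutcome) are hypothetical stand-ins, not the real algorand types, and the real function also handles metrics, relaying, and hybrid bridging; this only shows how each exit maps to a ForwardingPolicy when a synchronous caller is waiting.]

package main

import (
	"errors"
	"fmt"
)

type policy int

const (
	ignore policy = iota
	disconnect
	accept
)

type workItem struct {
	syncCh chan policy // nil on the asynchronous gossip path
}

// signalOutcome mirrors the three exits added in this commit:
// verification failure -> disconnect, pool rejection -> ignore, success -> accept.
func signalOutcome(wi *workItem, verifyErr, rememberErr error) {
	if wi.syncCh == nil {
		return // gossip path: nobody is waiting for a result
	}
	switch {
	case verifyErr != nil:
		wi.syncCh <- disconnect
	case rememberErr != nil:
		wi.syncCh <- ignore
	default:
		wi.syncCh <- accept
	}
}

func main() {
	wi := &workItem{syncCh: make(chan policy, 1)}
	signalOutcome(wi, errors.New("malformed tx group"), nil)
	fmt.Println(<-wi.syncCh) // 1 (disconnect)
}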
@@ -793,11 +815,6 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage {

// validateIncomingTxMessage is the validator for the MessageProcessor implementation used by P2PNetwork.
func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.OutgoingMessage {
msgKey, isDup := handler.incomingMsgDupCheck(rawmsg.Data)
if isDup {
return network.OutgoingMessage{Action: network.Ignore}
}

unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data)
if invalid {
// invalid encoding or exceeding txgroup, disconnect from this peer
@@ -827,52 +844,33 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.OutgoingMessage {
wi := &txBacklogMsg{
rawmsg: &rawmsg,
unverifiedTxGroup: unverifiedTxGroup,
rawmsgDataHash: msgKey,
unverifiedTxGroupHash: canonicalKey,
capguard: nil,
syncCh: make(chan network.ForwardingPolicy, 1),
}

if handler.checkAlreadyCommitted(wi) {
transactionMessagesAlreadyCommitted.Inc(nil)
return network.OutgoingMessage{
Action: network.Ignore,
}
}

err := handler.batchVerifier.Verify(wi.unverifiedTxGroup)
if err != nil {
handler.postProcessReportErrors(err)
logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, err)
return network.OutgoingMessage{
Action: network.Disconnect,
}
}
verifiedTxGroup := wi.unverifiedTxGroup

// save the transaction, if it has high enough fee and not already in the cache
err = handler.txPool.Remember(verifiedTxGroup)
if err != nil {
handler.rememberReportErrors(err)
logging.Base().Debugf("could not remember tx: %v", err)
return network.OutgoingMessage{
Action: network.Ignore,
}
}
var action network.ForwardingPolicy
select {
case handler.backlogQueue <- wi:
action = <-wi.syncCh
default:
// if we failed here we want to increase the corresponding metric. It might suggest that we
// want to increase the queue size.
transactionMessagesDroppedFromBacklog.Inc(nil)

transactionMessagesRemember.Inc(nil)
// additionally, remove the txn from duplicate caches to ensure it can be re-submitted
handler.deleteFromCaches(crypto.Digest{}, canonicalKey)

// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup)
if err != nil {
logging.Base().Infof("unable to pin transaction: %v", err)
// queue is full; we cannot tell whether the message is valid or not, so ignore it
action = network.Ignore
}

if hybridNet, ok := handler.net.(HybridRelayer); ok {
_ = hybridNet.BridgeP2PToWS(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender)
}

return network.OutgoingMessage{
Action: network.Accept,
Action: action,
}
}
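[Editor's note: the core of this change is the handshake above: the P2P validator enqueues the work item with a one-slot reply channel and blocks until the backlog worker finishes, answering Ignore if the queue is full. Below is a self-contained sketch under simplified assumptions — forwardingPolicy, backlogMsg, and the worker are stand-ins, not the real algorand types, and the worker just accepts everything.]

package main

import "fmt"

type forwardingPolicy int

const (
	ignore forwardingPolicy = iota
	disconnect
	accept
)

type backlogMsg struct {
	data   []byte
	syncCh chan forwardingPolicy // buffered (size 1) so the worker's reply never blocks
}

// worker stands in for backlogWorker + postProcessCheckedTxn: it decides a
// policy for each message and reports it back on syncCh when one is present.
func worker(queue <-chan *backlogMsg) {
	for wi := range queue {
		if wi.syncCh != nil {
			wi.syncCh <- accept // simplified: accept everything
		}
	}
}

func main() {
	queue := make(chan *backlogMsg, 8)
	go worker(queue)

	wi := &backlogMsg{data: []byte("txn"), syncCh: make(chan forwardingPolicy, 1)}
	var action forwardingPolicy
	select {
	case queue <- wi:
		// synchronous mode: block until the worker has fully processed the message
		action = <-wi.syncCh
	default:
		// queue full: drop without judging validity, mirroring the Ignore path above
		action = ignore
	}
	fmt.Println("action:", action) // 2 (accept)
}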

4 changes: 4 additions & 0 deletions data/txHandler_test.go
@@ -2774,6 +2774,8 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) {

handler, err := makeTestTxHandler(ledger, cfg)
require.NoError(t, err)
handler.Start()
defer handler.Stop()

// valid message
_, blob := makeTxns(addresses, secrets, 1, 2, genesisHash)
@@ -2807,6 +2809,8 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) {
require.True(t, cfg.TxFilterCanonicalEnabled())
handler, err := makeTestTxHandler(ledger, cfg)
require.NoError(t, err)
handler.Start()
defer handler.Stop()

// valid message
_, blob := makeTxns(addresses, secrets, 1, 2, genesisHash)
41 changes: 25 additions & 16 deletions network/p2pNetwork.go
@@ -927,24 +927,33 @@ func (n *P2PNetwork) txTopicHandleLoop() {
}
n.log.Debugf("Subscribed to topic %s", p2p.TXTopicName)

for {
// msg from sub.Next not used since all work done by txTopicValidator
_, err := sub.Next(n.ctx)
if err != nil {
if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled {
n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID())
const threads = incomingThreads / 2 // perf tests showed that 10 (half of incomingThreads) was optimal in terms of TPS (attempted 1, 5, 10, 20)
var wg sync.WaitGroup
wg.Add(threads)
for i := 0; i < threads; i++ {
go func(ctx context.Context, sub p2p.SubNextCancellable, wantTXGossip *atomic.Bool, peerID peer.ID, log logging.Logger) {
defer wg.Done()
for {
// msg from sub.Next not used since all work done by txTopicValidator
_, err := sub.Next(ctx)
if err != nil {
if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled {
log.Errorf("Error reading from subscription %v, peerId %s", err, peerID)
}
log.Debugf("Cancelling subscription to topic %s due to Subscription.Next error: %v", p2p.TXTopicName, err)
sub.Cancel()
return
}
// participation or configuration change, cancel subscription and quit
if !wantTXGossip.Load() {
log.Debugf("Cancelling subscription to topic %s due to participation change", p2p.TXTopicName)
sub.Cancel()
return
}
}
n.log.Debugf("Cancelling subscription to topic %s due Subscription.Next error: %v", p2p.TXTopicName, err)
sub.Cancel()
return
}
// participation or configuration change, cancel subscription and quit
if !n.wantTXGossip.Load() {
n.log.Debugf("Cancelling subscription to topic %s due participation change", p2p.TXTopicName)
sub.Cancel()
return
}
}(n.ctx, sub, &n.wantTXGossip, n.service.ID(), n.log)
}
wg.Wait()
}
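[Editor's note: a rough sketch of the reader fan-in above, under stated assumptions — fakeSub is a stand-in for p2p.SubNextCancellable, its Cancel is a no-op here, and the shutdown trigger is contrived. Several goroutines share one subscription, each exiting on context cancellation or when TX gossip is no longer wanted, with a WaitGroup holding the loop open until all readers return.]

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// fakeSub stands in for p2p.SubNextCancellable.
type fakeSub struct{ ch chan string }

func (s *fakeSub) Next(ctx context.Context) (string, error) {
	select {
	case m := <-s.ch:
		return m, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func (s *fakeSub) Cancel() {} // no-op in this sketch

func main() {
	const threads = 4
	ctx, cancel := context.WithCancel(context.Background())
	sub := &fakeSub{ch: make(chan string, 16)}
	var wantTXGossip atomic.Bool
	wantTXGossip.Store(true)

	for i := 0; i < 8; i++ {
		sub.ch <- fmt.Sprintf("msg-%d", i)
	}

	var wg sync.WaitGroup
	wg.Add(threads)
	for i := 0; i < threads; i++ {
		go func() {
			defer wg.Done()
			for {
				if _, err := sub.Next(ctx); err != nil {
					sub.Cancel() // context cancelled or subscription closed
					return
				}
				if !wantTXGossip.Load() {
					sub.Cancel() // participation/config change: stop reading
					return
				}
			}
		}()
	}
	cancel() // simulate shutdown; all readers observe ctx.Done and exit
	wg.Wait()
	fmt.Println("all readers exited")
}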

type gsPeer struct {
