Skip to content

Commit

Permalink
fix transaction cache queue broadcast destination peer selection
Browse files Browse the repository at this point in the history
  • Loading branch information
cedricfung committed May 7, 2024
1 parent ffa75d4 commit f2fc5e0
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
11 changes: 11 additions & 0 deletions kernel/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,17 @@ func (node *Node) PledgingNode(timestamp uint64) *CNode {
return nil
}

func (node *Node) ListWorkingAcceptedNodes(timestamp uint64) []*CNode {
nodes := node.NodesListWithoutState(timestamp, true)
if len(nodes) == 0 {
return nodes
}
if node.GetRemovingOrSlashingNode(nodes[0].IdForNetwork) != nil {
return nodes[1:]
}
return nodes
}

func (node *Node) GetAcceptedOrPledgingNode(id crypto.Hash) *CNode {
nodes := node.NodesListWithoutState(uint64(clock.Now().UnixNano()), false)
for _, cn := range nodes {
Expand Down
27 changes: 15 additions & 12 deletions kernel/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func (node *Node) loopCacheQueue() error {
continue
}

neighbors := node.Peer.Neighbors()
if len(neighbors) <= 0 {
working := node.ListWorkingAcceptedNodes(uint64(clock.Now().UnixNano()))
if len(working) <= 0 {
continue
}
var stale []crypto.Hash
Expand Down Expand Up @@ -93,19 +93,22 @@ func (node *Node) loopCacheQueue() error {
nbor := node.electSnapshotNode(tx.TransactionType(), uint64(now.UnixNano()))
if !nbor.HasValue() {
hb := new(big.Int).SetBytes(hash[:])
mb := big.NewInt(now.Unix() / 60)
mb := big.NewInt(now.UnixNano() / int64(time.Minute))
ib := new(big.Int).Add(hb, mb)
idx := new(big.Int).Mod(ib, big.NewInt(int64(len(neighbors))))
nbor = neighbors[idx.Int64()].IdForNetwork
idx := new(big.Int).Mod(ib, big.NewInt(int64(len(working))))
nbor = working[idx.Int64()].IdForNetwork
}
node.SendTransactionToPeer(nbor, hash)

s := &common.Snapshot{
Version: common.SnapshotVersionCommonEncoding,
NodeId: node.IdForNetwork,
if nbor != node.IdForNetwork {
err := node.SendTransactionToPeer(nbor, hash)
logger.Debugf("queue.SendTransactionToPeer(%s, %s) => %v", hash, nbor, err)
} else {
s := &common.Snapshot{
Version: common.SnapshotVersionCommonEncoding,
NodeId: node.IdForNetwork,
}
s.AddSoleTransaction(hash)
node.chain.AppendSelfEmpty(s)
}
s.AddSoleTransaction(hash)
node.chain.AppendSelfEmpty(s)
}
if err != nil {
logger.Printf("LoopCacheQueue CacheRetrieveTransactions ERROR %s\n", err)
Expand Down
9 changes: 6 additions & 3 deletions rpc/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func testConsensus(t *testing.T, withRelayers bool) {
require.Equal(transactionsCount, len(tl))
require.Equal(transactionsCount, len(sl))
gt := testVerifyInfo(require, nodes)
require.Truef(gt.Timestamp.Before(epoch.Add(1*time.Second)), "%s should before %s", gt.Timestamp, epoch.Add(1*time.Second))
gts := epoch.Add(time.Second)
require.Truef(gt.Timestamp.Before(gts), "%s should before %s", gt.Timestamp, gts)

genesisAmount := (13439 + 3.5) / float64(INPUTS)
domainAddress := accounts[0].String()
Expand All @@ -129,7 +130,8 @@ func testConsensus(t *testing.T, withRelayers bool) {
testVerifyDeposits(require, nodes, deposits)

gt = testVerifyInfo(require, nodes)
require.Truef(gt.Timestamp.Before(epoch.Add(20*time.Second)), "%s should before %s", gt.Timestamp, epoch.Add(20*time.Second))
gts = epoch.Add(20 * time.Second)
require.Truef(gt.Timestamp.Before(gts), "%s should before %s", gt.Timestamp, gts)
hr := testDumpGraphHead(nodes[0].Host, instances[0].IdForNetwork)
require.NotNil(hr)
require.GreaterOrEqual(hr.Round, uint64(0))
Expand Down Expand Up @@ -157,7 +159,8 @@ func testConsensus(t *testing.T, withRelayers bool) {
require.Equal(transactionsCount, len(tl))

gt = testVerifyInfo(require, nodes)
require.True(gt.Timestamp.Before(epoch.Add(31 * time.Second)))
gts = epoch.Add(31 * time.Second)
require.Truef(gt.Timestamp.Before(gts), "%s should before %s", gt.Timestamp, gts)
hr = testDumpGraphHead(nodes[0].Host, instances[0].IdForNetwork)
require.NotNil(hr)
require.Greater(hr.Round, uint64(0))
Expand Down

0 comments on commit f2fc5e0

Please sign in to comment.