Skip to content

Commit

Permalink
Merge pull request #185 from xssnick/dev-v19
Browse files Browse the repository at this point in the history
DHT Improvements
  • Loading branch information
xssnick authored Apr 20, 2024
2 parents 3b432be + 86a0803 commit db2bc98
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 77 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<img align="right" width="425px" src="https://github.com/xssnick/props/blob/master/logoimg.png?raw=true">

[![Based on TON][ton-svg]][ton]
![Coverage](https://img.shields.io/badge/Coverage-74.1%25-brightgreen)
![Coverage](https://img.shields.io/badge/Coverage-74.0%25-brightgreen)

Golang library for interacting with TON blockchain.

Expand Down
6 changes: 3 additions & 3 deletions adnl/dht/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (b *Bucket) getNodes() dhtNodeList {
b.mx.RLock()
defer b.mx.RUnlock()

return b.nodes
return append(dhtNodeList{}, b.nodes...)
}

func (b *Bucket) getNode(id string) *dhtNode {
Expand Down Expand Up @@ -56,7 +56,7 @@ func (b *Bucket) addNode(node *dhtNode) {

func (b *Bucket) sortAndFilter() {
sort.Sort(b.nodes)
if len(b.nodes) > int(b.k*2) {
b.nodes = b.nodes[:b.k*2]
if len(b.nodes) > int(b.k*5) {
b.nodes = b.nodes[:b.k*5]
}
}
48 changes: 23 additions & 25 deletions adnl/dht/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,12 @@ func (c *Client) Store(
cancel()
if err != nil {
return
} else {
Logger("Adding nodes", len(nodes))
for _, n := range nodes {
if _, err = c.addNode(n); err != nil {
continue
}
}

Logger("Adding nodes", len(nodes))
for _, n := range nodes {
if _, err = c.addNode(n); err != nil {
continue
}
}
}()
Expand Down Expand Up @@ -543,35 +543,33 @@ func (c *Client) FindValue(ctx context.Context, key *Key, continuation ...*Conti
}

func (c *Client) buildPriorityList(id []byte) *priorityList {
plist := newPriorityList(_K*3, id)

added := 0
plistGood := newPriorityList(_K*3, id)
plistBad := newPriorityList(_K, id)

for i := 255; i >= 0; i-- {
bucket := c.buckets[i]
knownNodes := bucket.getNodes()
for _, node := range knownNodes {
if node != nil && node.badScore == 0 {
if plist.addNode(node) {
added++
}
if node == nil {
continue
}

if atomic.LoadInt32(&node.badScore) == 0 {
plistGood.addNode(node)
} else {
plistBad.addNode(node)
}
}
}

if added < _K {
for i := 255; i >= 0; i-- {
bucket := c.buckets[i]
knownNodes := bucket.getNodes()
for _, node := range knownNodes {
if node != nil && node.badScore > 0 {
if plist.addNode(node) {
added++
}
}
}
// add K not good nodes to retry them if they can be better
for {
node, _ := plistBad.getNode()
if node == nil {
break
}
plistGood.addNode(node)
}

return plist
return plistGood
}
6 changes: 3 additions & 3 deletions adnl/dht/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func TestClient_FindAddressesIntegration(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

dhtClient, err := NewClientFromConfigUrl(ctx, gateway, "https://ton.org/global.config.json")
dhtClient, err := NewClientFromConfigUrl(ctx, gateway, "https://tonutils.com/global.config.json")
if err != nil {
t.Fatalf("failed to init DHT client: %s", err.Error())
}
Expand Down Expand Up @@ -724,7 +724,7 @@ func TestClient_Close(t *testing.T) {
// ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
// defer cancel()
//
// dhtClient, err := NewClientFromConfigUrl(ctx, gateway, "https://ton.org/global.config.json")
// dhtClient, err := NewClientFromConfigUrl(ctx, gateway, "https://tonutils.com/global.config.json")
// if err != nil {
// t.Fatalf("failed to init DHT client: %s", err.Error())
// }
Expand Down Expand Up @@ -775,7 +775,7 @@ func TestClient_StoreAddressIntegration(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
defer cancel()

dhtClient, err := NewClientFromConfigUrl(ctx, gateway, "https://ton.org/global.config.json")
dhtClient, err := NewClientFromConfigUrl(ctx, gateway, "https://tonutils.com/global.config.json")
if err != nil {
t.Fatalf("failed to init DHT client: %s", err.Error())
}
Expand Down
74 changes: 44 additions & 30 deletions adnl/dht/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/xssnick/tonutils-go/tl"
)

const _MaxFailCount = 5
const _MaxFailCount = 3

type dhtNode struct {
adnlId []byte
Expand Down Expand Up @@ -89,6 +89,27 @@ func (n *dhtNode) findNodes(ctx context.Context, id []byte, K int32) (result []*
return nil, fmt.Errorf("failed to find nodes, unexpected response type %s", reflect.TypeOf(res).String())
}

func (n *dhtNode) doPing(ctx context.Context) (err error) {
val, err := tl.Serialize(Ping{
ID: time.Now().Unix(),
}, true)
if err != nil {
return fmt.Errorf("failed to serialize dht ping query: %w", err)
}

var res any
err = n.query(ctx, tl.Raw(val), &res)
if err != nil {
return fmt.Errorf("failed to ping dht node: %w", err)
}

switch res.(type) {
case Pong:
return nil
}
return fmt.Errorf("failed to ping node, unexpected response type %s", reflect.TypeOf(res).String())
}

func (n *dhtNode) storeValue(ctx context.Context, id []byte, value *Value) error {
if err := checkValue(id, value); err != nil {
return fmt.Errorf("corrupted value: %w", err)
Expand Down Expand Up @@ -124,23 +145,11 @@ func (n *dhtNode) findValue(ctx context.Context, id []byte, K int32) (result any
return nil, fmt.Errorf("failed to serialize dht value: %w", err)
}

if err = ctx.Err(); err != nil {
// if context is canceled we are not trying to query
return nil, err
}

reportLimit := time.Now().Add(3 * time.Second)

var res any
err = n.query(ctx, tl.Raw(val), &res)
if err != nil {
if time.Now().After(reportLimit) {
// to not report good nodes, because of our short deadline
n.updateStatus(false)
}
return nil, fmt.Errorf("failed to do query to dht node: %w", err)
}
n.updateStatus(true)

switch r := res.(type) {
case ValueNotFoundResult:
Expand Down Expand Up @@ -239,6 +248,11 @@ func checkValue(id []byte, value *Value) error {
}

func (n *dhtNode) query(ctx context.Context, req, res tl.Serializable) error {
if err := ctx.Err(); err != nil {
// if context is canceled we are not trying to query
return err
}

atomic.AddInt32(&n.inFlyQueries, 1)

peer, err := n.client.gateway.RegisterClient(n.addr, n.serverKey)
Expand All @@ -248,42 +262,42 @@ func (n *dhtNode) query(ctx context.Context, req, res tl.Serializable) error {
}

defer func() {
if atomic.AddInt32(&n.inFlyQueries, -1) == 0 && n.badScore > 0 {
if atomic.AddInt32(&n.inFlyQueries, -1) == 0 && n.badScore > 1 {
peer.Close()
}
}()

t := time.Now()
reportLimit := t.Add(queryTimeout - 500*time.Millisecond)
atomic.StoreInt64(&n.lastQueryAt, t.Unix())
err = peer.Query(ctx, req, res)
if err != nil {
if time.Now().After(reportLimit) {
// to not report good nodes, because of our short deadline
n.updateStatus(false)
}
return err
}
ping := time.Since(t)
atomic.StoreInt64(&n.ping, int64(ping))

n.updateStatus(true)

return nil
}

func (n *dhtNode) updateStatus(isGood bool) {
if isGood {
for {
badScore := atomic.LoadInt32(&n.badScore)
if badScore >= _MaxFailCount {
if !atomic.CompareAndSwapInt32(&n.badScore, badScore, badScore-1) {
continue
}
Logger("Make DHT peer {} feel good {}", n.id(), badScore-1)
}
break
}
} else {
badScore := atomic.LoadInt32(&n.badScore)
if badScore <= _MaxFailCount {
badScore = atomic.AddInt32(&n.badScore, 2)
}
Logger("Make DHT peer {} feel bad {}", n.id(), badScore)
atomic.StoreInt32(&n.badScore, 0)
Logger("Make DHT peer {} feel good {}", n.id(), 0)
return
}

badScore := atomic.LoadInt32(&n.badScore)
if badScore <= _MaxFailCount {
badScore = atomic.AddInt32(&n.badScore, 1)
}
Logger("Make DHT peer {} feel bad {}", n.id(), badScore)
}

func (n *dhtNode) id() string {
Expand Down
4 changes: 2 additions & 2 deletions adnl/rldp/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func TestTransport_RoundTripIntegration(t *testing.T) {
t.Fatal(err)
}

dhtClient, err := dht.NewClientFromConfigUrl(context.Background(), gateway, "https://ton.org/global.config.json")
dhtClient, err := dht.NewClientFromConfigUrl(context.Background(), gateway, "https://tonutils.com/global.config.json")
if err != nil {
t.Fatal(err)
}
Expand All @@ -438,7 +438,7 @@ func getDNSResolver() *dns.Client {
client := liteclient.NewConnectionPool()

// connect to testnet lite server
err := client.AddConnectionsFromConfigUrl(context.Background(), "https://ton.org/global.config.json")
err := client.AddConnectionsFromConfigUrl(context.Background(), "https://tonutils.com/global.config.json")
if err != nil {
panic(err)
}
Expand Down
6 changes: 3 additions & 3 deletions liteclient/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Test_Conn(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err := client.AddConnectionsFromConfigUrl(ctx, "https://ton.org/global.config.json")
err := client.AddConnectionsFromConfigUrl(ctx, "https://tonutils.com/global.config.json")
if err != nil {
t.Fatal("add connections err", err)
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func Test_ConnSticky(t *testing.T) {
defer cancel()
ctx = client.StickyContext(ctx)

err := client.AddConnectionsFromConfigUrl(ctx, "https://ton.org/global.config.json")
err := client.AddConnectionsFromConfigUrl(ctx, "https://tonutils.com/global.config.json")
if err != nil {
t.Fatal("add connections err", err)
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func Test_ConnSticky(t *testing.T) {
func Test_ServerProxy(t *testing.T) {
client := NewConnectionPool()

err := client.AddConnectionsFromConfigUrl(context.Background(), "https://ton.org/global.config.json")
err := client.AddConnectionsFromConfigUrl(context.Background(), "https://tonutils.com/global.config.json")
if err != nil {
t.Fatal("add connections err", err)
}
Expand Down
2 changes: 1 addition & 1 deletion ton/dns/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var api = func() ton.APIClientWrapped {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := client.AddConnectionsFromConfigUrl(ctx, "https://ton.org/global.config.json")
err := client.AddConnectionsFromConfigUrl(ctx, "https://tonutils.com/global.config.json")
if err != nil {
panic(err)
}
Expand Down
6 changes: 3 additions & 3 deletions ton/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var apiTestNet = func() APIClientWrapped {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := client.AddConnectionsFromConfigUrl(ctx, "https://ton-blockchain.github.io/testnet-global.config.json")
err := client.AddConnectionsFromConfigUrl(ctx, "https://tonutils.com/testnet-global.config.json")
if err != nil {
panic(err)
}
Expand All @@ -37,7 +37,7 @@ var api = func() APIClientWrapped {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

cfg, err := liteclient.GetConfigFromUrl(ctx, "https://ton.org/global.config.json")
cfg, err := liteclient.GetConfigFromUrl(ctx, "https://tonutils.com/global.config.json")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -621,7 +621,7 @@ func TestAccountStorage_LoadFromCell_ExtraCurrencies(t *testing.T) {
}

func TestAPIClient_GetBlockProofForward(t *testing.T) {
cfg, err := liteclient.GetConfigFromUrl(context.Background(), "https://ton.org/global.config.json")
cfg, err := liteclient.GetConfigFromUrl(context.Background(), "https://tonutils.com/global.config.json")
if err != nil {
t.Fatal("get cfg err:", err.Error())
return
Expand Down
2 changes: 1 addition & 1 deletion ton/jetton/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var api = func() ton.APIClientWrapped {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := client.AddConnectionsFromConfigUrl(ctx, "https://ton-blockchain.github.io/testnet-global.config.json")
err := client.AddConnectionsFromConfigUrl(ctx, "https://tonutils.com/testnet-global.config.json")
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ton/nft/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var api = func() ton.APIClientWrapped {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

err := client.AddConnectionsFromConfigUrl(ctx, "https://ton-blockchain.github.io/testnet-global.config.json")
err := client.AddConnectionsFromConfigUrl(ctx, "https://tonutils.com/testnet-global.config.json")
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ton/payments/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var api = func() ton.APIClientWrapped {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := client.AddConnectionsFromConfigUrl(ctx, "https://ton-blockchain.github.io/testnet-global.config.json")
err := client.AddConnectionsFromConfigUrl(ctx, "https://tonutils.com/testnet-global.config.json")
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ton/runmethod.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type RunSmcMethod struct {
ID *BlockIDExt `tl:"struct"`
Account AccountID `tl:"struct"`
MethodID uint64 `tl:"long"`
Params *cell.Cell `tl:"cell"`
Params *cell.Cell `tl:"cell optional"`
}

type RunMethodResult struct {
Expand Down
4 changes: 2 additions & 2 deletions ton/wallet/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var api = func() ton.APIClientWrapped {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := client.AddConnectionsFromConfigUrl(ctx, "https://ton-blockchain.github.io/testnet-global.config.json")
err := client.AddConnectionsFromConfigUrl(ctx, "https://tonutils.com/testnet-global.config.json")
if err != nil {
panic(err)
}
Expand All @@ -41,7 +41,7 @@ var apiMain = func() ton.APIClientWrapped {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := client.AddConnectionsFromConfigUrl(ctx, "https://ton.org/global.config.json")
err := client.AddConnectionsFromConfigUrl(ctx, "https://tonutils.com/global.config.json")
if err != nil {
panic(err)
}
Expand Down

0 comments on commit db2bc98

Please sign in to comment.