Skip to content

Commit

Permalink
bitswap/httpnet: ping via http request. fix tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan committed Jan 27, 2025
1 parent cd98823 commit e4d6054
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 84 deletions.
10 changes: 5 additions & 5 deletions bitswap/network/http_multiaddr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ func TestExtractHTTPAddress(t *testing.T) {
return
}

got, sni, err := ExtractHTTPAddress(ma)
got, err := ExtractHTTPAddress(ma)
if (err != nil) != tt.expectErr {
t.Errorf("got: %s", got)
t.Errorf("got: %s", got.URL)
t.Errorf("ExtractHTTPAddress() error = %v, wantErr %v", err, tt.expectErr)
return
}

if tt.want != nil && (got == nil || got.String() != tt.want.String() || tt.sni != sni) {
t.Errorf("ExtractHTTPAddress() = %v (%s), want %v (%s)", got, sni, tt.want, tt.sni)
if tt.want != nil && (got.URL == nil || got.URL.String() != tt.want.String() || tt.sni != got.SNI) {
t.Errorf("ExtractHTTPAddress() = %v (%s), want %v (%s)", got.URL, got.SNI, tt.want, tt.sni)
}
})
}
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestExtractHTTPAddressesFromPeer(t *testing.T) {

// Compare URLs
for i := range got {
if got[i].String() != tt.want[i].String() {
if got[i].URL.String() != tt.want[i].String() {
t.Errorf("ExtractHTTPAddressesFromPeer() URL at index %d = %v, want %v", i, got[i], tt.want[i])
}
}
Expand Down
16 changes: 10 additions & 6 deletions bitswap/network/httpnet/httpnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ var (

var pingCid = "bafkqaaa" // identity CID

const http2proto = "HTTP/2.0"

// Option allows to configure the Network.
type Option func(net *Network)

Expand Down Expand Up @@ -167,7 +169,6 @@ type Network struct {
func New(host host.Host, opts ...Option) network.BitSwapNetwork {
htnet := &Network{
host: host,
pinger: newPinger(host),
userAgent: defaultUserAgent(),
maxBlockSize: DefaultMaxBlockSize,
dialTimeout: DefaultDialTimeout,
Expand Down Expand Up @@ -236,10 +237,13 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork {
}

c := &http.Client{
Transport: newTransport(t),
Transport: t,
}
htnet.client = c

pinger := newPinger(host, htnet.client, pingCid, htnet.userAgent)
htnet.pinger = pinger

return htnet
}

Expand Down Expand Up @@ -353,7 +357,7 @@ func (ht *Network) Connect(ctx context.Context, p peer.AddrInfo) error {
// on success.
var workingAddrs []multiaddr.Multiaddr
for i, u := range urls {
req, err := ht.buildRequest(ctx, u, "GET", pingCid)
req, err := buildRequest(ctx, u, "GET", pingCid, ht.userAgent)
if err != nil {
log.Debug(err)
return err
Expand All @@ -370,7 +374,7 @@ func (ht *Network) Connect(ctx context.Context, p peer.AddrInfo) error {
continue
}

if resp.Proto != "HTTP/2.0" {
if resp.Proto != http2proto {
log.Warnf("%s://%q is not using HTTP/2 (%s)", req.URL.Scheme, req.URL.Host, resp.Proto)
}

Expand Down Expand Up @@ -452,7 +456,7 @@ func (ht *Network) Stats() network.Stats {
}

// buildRequests sets up common settings for making a requests.
func (ht *Network) buildRequest(ctx context.Context, u network.ParsedURL, method string, cid string) (*http.Request, error) {
func buildRequest(ctx context.Context, u network.ParsedURL, method string, cid string, userAgent string) (*http.Request, error) {
// copy url
sendURL, _ := url.Parse(u.URL.String())
sendURL.RawQuery = "format=raw"
Expand All @@ -470,7 +474,7 @@ func (ht *Network) buildRequest(ctx context.Context, u network.ParsedURL, method

headers := make(http.Header)
headers.Add("Accept", "application/vnd.ipld.raw")
headers.Add("User-Agent", ht.userAgent)
headers.Add("User-Agent", userAgent)
if u.SNI != "" {
headers.Add("Host", u.SNI)
}
Expand Down
36 changes: 26 additions & 10 deletions bitswap/network/httpnet/httpnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,49 +296,65 @@ func TestBestURL(t *testing.T) {
now := time.Now()
surls := []*senderURL{
{
url: urls[0],
ParsedURL: network.ParsedURL{
URL: urls[0],
},
cooldown: now.Add(time.Second),
clientErrors: 0,
serverErrors: 2,
},
{
url: urls[1],
ParsedURL: network.ParsedURL{
URL: urls[1],
},
cooldown: now.Add(time.Second),
clientErrors: 0,
serverErrors: 1,
},
{
url: urls[2],
ParsedURL: network.ParsedURL{
URL: urls[2],
},
cooldown: time.Time{},
clientErrors: 0,
serverErrors: 3,
},
{
url: urls[3],
ParsedURL: network.ParsedURL{
URL: urls[3],
},
cooldown: time.Time{},
clientErrors: 0,
serverErrors: 2,
},
{
url: urls[4],
ParsedURL: network.ParsedURL{
URL: urls[4],
},
cooldown: time.Time{},
clientErrors: 0,
serverErrors: 1,
},
{
url: urls[5],
ParsedURL: network.ParsedURL{
URL: urls[5],
},
cooldown: time.Time{},
clientErrors: 0,
serverErrors: 20,
},
{
url: urls[6],
ParsedURL: network.ParsedURL{
URL: urls[6],
},
cooldown: time.Time{},
clientErrors: 2,
serverErrors: 0,
},
{
url: urls[7],
ParsedURL: network.ParsedURL{
URL: urls[7],
},
cooldown: now.Add(2 * time.Second),
clientErrors: 0,
serverErrors: 0,
Expand All @@ -360,8 +376,8 @@ func TestBestURL(t *testing.T) {
}

for i, u := range ms.urls {
if u.url.String() != expected[i] {
t.Error("wrong url order", i, u.url)
if u.URL.String() != expected[i] {
t.Error("wrong url order", i, u.URL)
}
}

Expand Down
8 changes: 3 additions & 5 deletions bitswap/network/httpnet/msg_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm

ctx, cancel := context.WithTimeout(ctx, sender.opts.SendTimeout)
defer cancel()
req, err := sender.ht.buildRequest(ctx, u.ParsedURL, method, entry.Cid.String())
req, err := buildRequest(ctx, u.ParsedURL, method, entry.Cid.String(), sender.ht.userAgent)
if err != nil {
return &senderError{
Type: typeFatal,
Expand Down Expand Up @@ -386,11 +386,9 @@ func (sender *httpMsgSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessa
if entry.Cancel {
entryCtxs[i] = ctx
entryCancels[i] = nop
} else {
entryCtxs[i], entryCancels[i] = sender.ht.requestTracker.requestContext(ctx, entry.Cid)
}
// The TTL here is just for auto-cleaning the request context
// from the request tracker. It is set in a way that ensure that the request
// has run
entryCtxs[i], entryCancels[i] = sender.ht.requestTracker.requestContext(ctx, entry.Cid)
}

WANTLIST_LOOP:
Expand Down
51 changes: 26 additions & 25 deletions bitswap/network/httpnet/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ package httpnet

import (
"context"
"net"
"net/url"
"fmt"
"net/http"
"sync"
"time"

"github.com/ipfs/boxo/bitswap/network"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
probing "github.com/prometheus-community/pro-bing"
"go.uber.org/multierr"
)

// pinger pings connected hosts on regular intervals
// and tracks their latency.
type pinger struct {
host host.Host
host host.Host
pingCid string
userAgent string
client *http.Client

latenciesLock sync.RWMutex
latencies map[peer.ID]time.Duration
Expand All @@ -27,9 +29,11 @@ type pinger struct {
pings map[peer.ID]context.CancelFunc
}

func newPinger(h host.Host) *pinger {
func newPinger(h host.Host, client *http.Client, pingCid, userAgent string) *pinger {
return &pinger{
host: h,
pingCid: pingCid,
userAgent: userAgent,
latencies: make(map[peer.ID]time.Duration),
pings: make(map[peer.ID]context.CancelFunc),
}
Expand All @@ -49,36 +53,33 @@ func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result {

results := make(chan ping.Result, len(urls))
for _, u := range urls {
go func(u *url.URL) {
// Remove port from url.
host, _, err := net.SplitHostPort(u.Host)
go func(u network.ParsedURL) {
req, err := buildRequest(ctx, u, "GET", pngr.pingCid, pngr.userAgent)
if err != nil {
results <- ping.Result{
Error: err,
}
log.Debug(err)
results <- ping.Result{Error: err}
return
}

pinger, err := probing.NewPinger(host)
log.Debugf("ping request to %s", req.URL)
start := time.Now()
resp, err := pngr.client.Do(req)
if err != nil {
log.Debug("pinger error ", err)
results <- ping.Result{
Error: err,
}
results <- ping.Result{Error: err}
return
}
pinger.Count = 1

err = pinger.RunWithContext(ctx)
if err != nil {
log.Debug("ping error ", err)
results <- ping.Result{
Error: err,
}
if resp.StatusCode >= 300 { // non-success
err := fmt.Errorf("ping request to %q returned %d", req.URL, resp.StatusCode)
log.Error(err)
results <- ping.Result{Error: err}
return
}

results <- ping.Result{
RTT: pinger.Statistics().AvgRtt,
RTT: time.Since(start),
}
}(u.URL)
}(u)
}

var result ping.Result
Expand Down
28 changes: 0 additions & 28 deletions bitswap/network/httpnet/transport.go

This file was deleted.

17 changes: 12 additions & 5 deletions bitswap/network/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"go.uber.org/multierr"
)

type router struct {
Expand Down Expand Up @@ -112,10 +111,18 @@ func (rt *router) Connect(ctx context.Context, p peer.AddrInfo) error {
}

func (rt *router) DisconnectFrom(ctx context.Context, p peer.ID) error {
return multierr.Combine(
rt.HTTP.DisconnectFrom(ctx, p),
rt.Bitswap.DisconnectFrom(ctx, p),
)
// DisconnectFrom is only called from bitswap.Server, on failures
// receiving a bitswap message. Normally, if HTTP is prioritized, we
// should not have requested anything over bitswap, so this should not
// happen.
//
// Still, follow prioritization rule.
pi := rt.Peerstore.PeerInfo(p)
htaddrs, _ := SplitHTTPAddrs(pi)
if len(htaddrs.Addrs) > 0 {
return rt.HTTP.DisconnectFrom(ctx, p)
}
return rt.Bitswap.DisconnectFrom(ctx, p)
}

func (rt *router) Stats() Stats {
Expand Down

0 comments on commit e4d6054

Please sign in to comment.