Skip to content

Commit

Permalink
swarm: implement blackhole detection and happy eyeballs dialing
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jun 5, 2023
1 parent 6f27081 commit b2e8657
Show file tree
Hide file tree
Showing 7 changed files with 550 additions and 95 deletions.
188 changes: 188 additions & 0 deletions p2p/net/swarm/black_hole_detector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package swarm

import (
"sync"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

type outcome int

const (
outcomeSuccess outcome = iota
outcomeFailed
)

type blackholeState int

const (
blackholeStateAllowed blackholeState = iota
blackholeStateBlocked
)

// blackHoleDetector provides black hole detection for dials to addresses selected by the
// selector. On detecting that dials to addresses selected by the selector are failing,
// subsequent dials to the addresses are refused and only 1 dial every n attempts is allowed.
type blackHoleDetector struct {
// selector selects addresses for blackhole detection. Dials to addresses for which
// the selector returns false are always allowed.
selector func(addr ma.Multiaddr) bool
// every nth dial to the address is permitted irrespective of the wrapper status
n int
// minDials is the minimum number of completed dials required before dials are blocked
minDials int
// minSuccessFraction is the minimum success fraction required to allow dials
minSuccessFraction float64
// name for the detector. Useful for debugging
name string

// requests counts number of dial requests up to nth request. Resets to 0 every nth request.
requests int
// allowed counts the number of dials allowed up to `minDials`
allowed int
// outcomes of the last x allowed dials
outcomes []outcome
// outcomeIdx is the index of the next outcome in the sliding window
outcomeIdx int
// successes is the count of successful dials in outcomes
successes int
// failures is the count of failed dials in outcomes
failures int
// full is true when we have a full sliding window worth of outcomes.
// Keeping this as a separate variable helps avoid clearing out the entire sliding window on
// reset.
full bool
// state is the current state of the detector
state blackholeState

mu sync.Mutex
}

func newBlackHoleDetector(selector func(addr ma.Multiaddr) bool, name string, allowNth int, minDials int, slidingWindowSize int, minSuccessFraction float64) *blackHoleDetector {
return &blackHoleDetector{
selector: selector,
n: allowNth,
minDials: minDials,
minSuccessFraction: minSuccessFraction,
outcomes: make([]outcome, slidingWindowSize),
name: name,
}
}

func newIPv6BlackHoleDetector() *blackHoleDetector {
return newBlackHoleDetector(
func(addr ma.Multiaddr) bool {
return manet.IsPublicAddr(addr) && isProtocolAddr(addr, ma.P_IP6)
},
"IPv6",
100,
100,
1000,
0.01,
)
}

func newUDPBlackHoleDetector() *blackHoleDetector {
return newBlackHoleDetector(
func(addr ma.Multiaddr) bool {
return manet.IsPublicAddr(addr) && isProtocolAddr(addr, ma.P_UDP)
},
"UDP",
100,
100,
1000,
0.01,
)
}

func (b *blackHoleDetector) CompletedDial(addr ma.Multiaddr, success bool) {
if b == nil || !b.selector(addr) {
return
}
b.mu.Lock()
defer b.mu.Unlock()

if b.state == blackholeStateBlocked && success {
// If the call succeeds in a blocked state we reset to allowed.
// This is better than slowly accumulating values till we cross the minSuccessFraction
// threshold since a blackhole is a binary property.
b.reset()
return
}

b.allowed++
if b.allowed > b.minDials {
b.allowed = b.minDials
}

// Discard the earliest outcome
if b.full {
if b.outcomes[b.outcomeIdx] == outcomeSuccess {
b.successes--
} else {
b.failures--
}
}
if success {
b.successes++
b.outcomes[b.outcomeIdx] = outcomeSuccess
} else {
b.failures++
b.outcomes[b.outcomeIdx] = outcomeFailed
}

b.outcomeIdx++
if b.outcomeIdx == len(b.outcomes) {
b.outcomeIdx = 0
b.full = true
}

b.updateState()
}

func (b *blackHoleDetector) IsDialAllowed(addr ma.Multiaddr) bool {
if b == nil || !b.selector(addr) {
return true
}

b.mu.Lock()
defer b.mu.Unlock()
b.requests++
if b.requests == b.n {
b.requests = 0
return true
}
return b.state == blackholeStateAllowed
}

func (b *blackHoleDetector) reset() {
b.allowed = 0
b.successes = 0
b.failures = 0
b.outcomeIdx = 0
b.full = false
b.updateState()
}

func (b *blackHoleDetector) updateState() {
st := b.state
successFraction := 0.0
if b.allowed < b.minDials {
b.state = blackholeStateAllowed
} else {
successFraction = float64(b.successes) / float64(b.successes+b.failures)
if successFraction >= b.minSuccessFraction {
b.state = blackholeStateAllowed
} else {
b.state = blackholeStateBlocked
}
}
if st != b.state {
if b.state == blackholeStateAllowed {
log.Debugf("%s blackHoleDetector state changed to Allowed", b.name)
} else {
log.Debugf("%s blackHoleDetector state changed to Blocked. Success fraction is %0.3f", b.name, successFraction)
}
}
}
137 changes: 137 additions & 0 deletions p2p/net/swarm/black_hole_detector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package swarm

import (
"testing"

ma "github.com/multiformats/go-multiaddr"
)

func TestBlackHoleDetectorInapplicableAddress(t *testing.T) {
bhd := newBlackHoleDetector(func(_ ma.Multiaddr) bool { return false }, "",
10, 10, 10, 1.0)
addr := ma.StringCast("/ip4/127.0.0.1/tcp/1234")
for i := 0; i < 1000; i++ {
bhd.CompletedDial(addr, false)
}
if !bhd.IsDialAllowed(addr) {
t.Errorf("expected dial to inapplicable address to be always allowed")
}
}

func TestBlackHoleDetectorReset(t *testing.T) {
n := 10
minDials := 10
bhd := newBlackHoleDetector(func(_ ma.Multiaddr) bool { return true }, "",
n, minDials, 100, 0.2)
addr := ma.StringCast("/ip4/127.0.0.1/tcp/1024")
var i = 0
// calls up to threshold should be allowed
for i = 1; i <= minDials; i++ {
if !bhd.IsDialAllowed(addr) {
t.Fatalf("expected calls up to minDials to be allowed")
}
bhd.CompletedDial(addr, false)
}
// after threshold calls every nth call should be allowed
for i = minDials + 1; i < 42; i++ {
isAllowed := bhd.IsDialAllowed(addr)
if (i%n == 0 && !isAllowed) || (i%n != 0 && isAllowed) {
t.Fatalf("expected every nth dial to be allowed")
}
}

bhd.CompletedDial(addr, true)
// check if calls up to threshold are allowed after success
for i = 0; i < minDials; i++ {
if !bhd.IsDialAllowed(addr) {
t.Fatalf("expected black hole detector state to reset after success")
}
bhd.CompletedDial(addr, false)
}

// next call should be refused
if bhd.IsDialAllowed(addr) {
t.Fatalf("expected dial to be blocked")
}
}

func TestBlackHoleDetector(t *testing.T) {
n := 100
threshold := 10
windowSize := 10
bhd := newBlackHoleDetector(func(_ ma.Multiaddr) bool { return true }, "",
n, threshold, windowSize, 0.4)
addr := ma.StringCast("/ip4/127.0.0.1/tcp/1024")
var i = 0
// 5 success and 5 fails
for i = 1; i <= 5; i++ {
bhd.CompletedDial(addr, true)
}
for i = 1; i <= 5; i++ {
bhd.CompletedDial(addr, false)
}

if !bhd.IsDialAllowed(addr) {
t.Fatalf("expected dial to be allowed")
}
// 4 success and 6 fails
bhd.CompletedDial(addr, false)

if !bhd.IsDialAllowed(addr) {
t.Fatalf("expected dial to be allowed")
}
// 3 success and 7 fails
bhd.CompletedDial(addr, false)

// should be blocked
if bhd.IsDialAllowed(addr) {
t.Fatalf("expected dial to be blocked")
}

bhd.CompletedDial(addr, true)
// 5 success and 5 fails
for i = 1; i <= 5; i++ {
bhd.CompletedDial(addr, true)
}
for i = 1; i <= 5; i++ {
bhd.CompletedDial(addr, false)
}

if !bhd.IsDialAllowed(addr) {
t.Fatalf("expected dial to be allowed")
}
// 4 success and 6 fails
bhd.CompletedDial(addr, false)

if !bhd.IsDialAllowed(addr) {
t.Fatalf("expected dial to be allowed")
}
// 3 success and 7 fails
bhd.CompletedDial(addr, false)

// should be blocked
if bhd.IsDialAllowed(addr) {
t.Fatalf("expected dial to be blocked")
}

}

func TestBlackHoleDetectorSlidingWindowAndMinDials(t *testing.T) {
n := 100
// sliding window can be less than minDials
threshold := 10
windowSize := 5
bhd := newBlackHoleDetector(func(_ ma.Multiaddr) bool { return true }, "",
n, threshold, windowSize, 0.4)
addr := ma.StringCast("/ip4/127.0.0.1/tcp/1024")
for i := 0; i < threshold; i++ {
if !bhd.IsDialAllowed(addr) {
t.Errorf("expected dials to be allowed up to minDials")
}
bhd.CompletedDial(addr, false)
}
// dial should be blocked
if bhd.IsDialAllowed(addr) {
t.Errorf("expected dial to be blocked")
}
}
Loading

0 comments on commit b2e8657

Please sign in to comment.