Skip to content

Commit

Permalink
feat(enginenetx): implement deterministic+random mixing (#1558)
Browse files Browse the repository at this point in the history
We need deterministic+random mixing of HTTPS dial tactics to ensure that
we prioritize some tactics coming from the DNS before attempting all the
previous tactics, which would make the bootstrap super slow.

Part of ooni/probe#2704.
  • Loading branch information
bassosimone committed Apr 17, 2024
1 parent 0d4dc93 commit 7dab5a2
Show file tree
Hide file tree
Showing 2 changed files with 266 additions and 0 deletions.
68 changes: 68 additions & 0 deletions internal/enginenetx/mix.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package enginenetx

import "sync"

// mixSequentially mixes entries from primary followed by entries from fallback.
//
// This function returns a channel where we emit the edited
Expand All @@ -17,3 +19,69 @@ func mixSequentially(primary, fallback <-chan *httpsDialerTactic) <-chan *httpsD
}()
return output
}

// mixDeterministicThenRandomConfig contains config for [mixDeterministicThenRandom].
type mixDeterministicThenRandomConfig struct {
// C is the channel to mix from.
C <-chan *httpsDialerTactic

// N is the number of entries to read from at the
// beginning before starting random mixing.
N int
}

// mixDeterministicThenRandom reads the first N entries from primary, if any, then the first N
// entries from fallback, if any, and then randomly mixes the entries.
func mixDeterministicThenRandom(primary, fallback *mixDeterministicThenRandomConfig) <-chan *httpsDialerTactic {
output := make(chan *httpsDialerTactic)
go func() {
defer close(output)
mixTryEmitN(primary.C, primary.N, output)
mixTryEmitN(fallback.C, fallback.N, output)
for tx := range mixRandomly(primary.C, fallback.C) {
output <- tx
}
}()
return output
}

func mixTryEmitN(input <-chan *httpsDialerTactic, numToRead int, output chan<- *httpsDialerTactic) {
for idx := 0; idx < numToRead; idx++ {
tactic, good := <-input
if !good {
return
}
output <- tactic
}
}

func mixRandomly(left, right <-chan *httpsDialerTactic) <-chan *httpsDialerTactic {
output := make(chan *httpsDialerTactic)
go func() {
// read from left
waitg := &sync.WaitGroup{}
waitg.Add(1)
go func() {
defer waitg.Done()
for tx := range left {
output <- tx
}
}()

// read from right
waitg.Add(1)
go func() {
defer waitg.Done()
for tx := range right {
output <- tx
}
}()

// close when done
go func() {
waitg.Wait()
close(output)
}()
}()
return output
}
198 changes: 198 additions & 0 deletions internal/enginenetx/mix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,201 @@ func TestMixSequentially(t *testing.T) {
t.Fatal(diff)
}
}

func TestMixDeterministicThenRandom(t *testing.T) {
// define primary data source
primary := []*httpsDialerTactic{{
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "a1.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "a2.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "a3.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "a4.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "a5.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "a6.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "a7.com",
VerifyHostname: "api.ooni.io",
}}

// define fallback data source
fallback := []*httpsDialerTactic{{
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "b1.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "b2.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "b3.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "b4.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "b5.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "b6.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "b7.com",
VerifyHostname: "api.ooni.io",
}}

// define the expectations for the beginning of the result
expectBeginning := []*httpsDialerTactic{{
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "a1.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "a2.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "b1.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "b2.com",
VerifyHostname: "api.ooni.io",
}, {
Address: "130.192.91.211",
InitialDelay: 0,
Port: "443",
SNI: "b3.com",
VerifyHostname: "api.ooni.io",
}}

// remix
outch := mixDeterministicThenRandom(
&mixDeterministicThenRandomConfig{
C: streamTacticsFromSlice(primary),
N: 2,
},
&mixDeterministicThenRandomConfig{
C: streamTacticsFromSlice(fallback),
N: 3,
},
)
var output []*httpsDialerTactic
for tx := range outch {
output = append(output, tx)
}

// make sure we have the expected number of entries
if len(output) != 14 {
t.Fatal("we need 14 entries")
}
if diff := cmp.Diff(expectBeginning, output[:5]); diff != "" {
t.Fatal(diff)
}

// make sure each entry is represented
const (
inprimary = 1 << 0
infallback
inoutput
)
mapping := make(map[string]int)
for _, entry := range primary {
mapping[entry.tacticSummaryKey()] |= inprimary
}
for _, entry := range fallback {
mapping[entry.tacticSummaryKey()] |= infallback
}
for _, entry := range output {
mapping[entry.tacticSummaryKey()] |= inoutput
}
for entry, flags := range mapping {
if flags != (inprimary|inoutput) && flags != (infallback|inoutput) {
t.Fatal("unexpected flags", flags, "for entry", entry)
}
}
}

func TestMixTryEmitNWithClosedChannel(t *testing.T) {
// create an already closed channel
inputch := make(chan *httpsDialerTactic)
close(inputch)

// create channel for collecting the results
outputch := make(chan *httpsDialerTactic)

go func() {
// Implementation note: mixTryEmitN does not close the channel
// when done, therefore we need to close it ourselves.
mixTryEmitN(inputch, 10, outputch)
close(outputch)
}()

// read the output channel
var output []*httpsDialerTactic
for tx := range outputch {
output = append(output, tx)
}

// make sure we didn't read anything
if len(output) != 0 {
t.Fatal("expected zero entries")
}
}

0 comments on commit 7dab5a2

Please sign in to comment.