From 7dab5a2981218f43b4ff84e03a8ccf0878ea357f Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Wed, 17 Apr 2024 17:08:51 +0200 Subject: [PATCH] feat(enginenetx): implement deterministic+random mixing (#1558) We need deterministic+random mixing of HTTPS dial tactics to ensure that we prioritize some tactics coming from the DNS before attempting all the previous tactics, which would make the bootstrap super slow. Part of https://github.com/ooni/probe/issues/2704. --- internal/enginenetx/mix.go | 68 +++++++++++ internal/enginenetx/mix_test.go | 198 ++++++++++++++++++++++++++++++++ 2 files changed, 266 insertions(+) diff --git a/internal/enginenetx/mix.go b/internal/enginenetx/mix.go index 5dc7281fc..1cac673ca 100644 --- a/internal/enginenetx/mix.go +++ b/internal/enginenetx/mix.go @@ -1,5 +1,7 @@ package enginenetx +import "sync" + // mixSequentially mixes entries from primary followed by entries from fallback. // // This function returns a channel where we emit the edited @@ -17,3 +19,69 @@ func mixSequentially(primary, fallback <-chan *httpsDialerTactic) <-chan *httpsD }() return output } + +// mixDeterministicThenRandomConfig contains config for [mixDeterministicThenRandom]. +type mixDeterministicThenRandomConfig struct { + // C is the channel to mix from. + C <-chan *httpsDialerTactic + + // N is the number of entries to read from at the + // beginning before starting random mixing. + N int +} + +// mixDeterministicThenRandom reads the first N entries from primary, if any, then the first N +// entries from fallback, if any, and then randomly mixes the entries. +func mixDeterministicThenRandom(primary, fallback *mixDeterministicThenRandomConfig) <-chan *httpsDialerTactic { + output := make(chan *httpsDialerTactic) + go func() { + defer close(output) + mixTryEmitN(primary.C, primary.N, output) + mixTryEmitN(fallback.C, fallback.N, output) + for tx := range mixRandomly(primary.C, fallback.C) { + output <- tx + } + }() + return output +} + +func mixTryEmitN(input <-chan *httpsDialerTactic, numToRead int, output chan<- *httpsDialerTactic) { + for idx := 0; idx < numToRead; idx++ { + tactic, good := <-input + if !good { + return + } + output <- tactic + } +} + +func mixRandomly(left, right <-chan *httpsDialerTactic) <-chan *httpsDialerTactic { + output := make(chan *httpsDialerTactic) + go func() { + // read from left + waitg := &sync.WaitGroup{} + waitg.Add(1) + go func() { + defer waitg.Done() + for tx := range left { + output <- tx + } + }() + + // read from right + waitg.Add(1) + go func() { + defer waitg.Done() + for tx := range right { + output <- tx + } + }() + + // close when done + go func() { + waitg.Wait() + close(output) + }() + }() + return output +} diff --git a/internal/enginenetx/mix_test.go b/internal/enginenetx/mix_test.go index 0b30c837b..990373af8 100644 --- a/internal/enginenetx/mix_test.go +++ b/internal/enginenetx/mix_test.go @@ -27,3 +27,201 @@ func TestMixSequentially(t *testing.T) { t.Fatal(diff) } } + +func TestMixDeterministicThenRandom(t *testing.T) { + // define primary data source + primary := []*httpsDialerTactic{{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a1.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a2.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a3.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a4.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a5.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a6.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a7.com", + VerifyHostname: "api.ooni.io", + }} + + // define fallback data source + fallback := []*httpsDialerTactic{{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b1.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b2.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b3.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b4.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b5.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b6.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b7.com", + VerifyHostname: "api.ooni.io", + }} + + // define the expectations for the beginning of the result + expectBeginning := []*httpsDialerTactic{{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a1.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a2.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b1.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b2.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b3.com", + VerifyHostname: "api.ooni.io", + }} + + // remix + outch := mixDeterministicThenRandom( + &mixDeterministicThenRandomConfig{ + C: streamTacticsFromSlice(primary), + N: 2, + }, + &mixDeterministicThenRandomConfig{ + C: streamTacticsFromSlice(fallback), + N: 3, + }, + ) + var output []*httpsDialerTactic + for tx := range outch { + output = append(output, tx) + } + + // make sure we have the expected number of entries + if len(output) != 14 { + t.Fatal("we need 14 entries") + } + if diff := cmp.Diff(expectBeginning, output[:5]); diff != "" { + t.Fatal(diff) + } + + // make sure each entry is represented + const ( + inprimary = 1 << 0 + infallback + inoutput + ) + mapping := make(map[string]int) + for _, entry := range primary { + mapping[entry.tacticSummaryKey()] |= inprimary + } + for _, entry := range fallback { + mapping[entry.tacticSummaryKey()] |= infallback + } + for _, entry := range output { + mapping[entry.tacticSummaryKey()] |= inoutput + } + for entry, flags := range mapping { + if flags != (inprimary|inoutput) && flags != (infallback|inoutput) { + t.Fatal("unexpected flags", flags, "for entry", entry) + } + } +} + +func TestMixTryEmitNWithClosedChannel(t *testing.T) { + // create an already closed channel + inputch := make(chan *httpsDialerTactic) + close(inputch) + + // create channel for collecting the results + outputch := make(chan *httpsDialerTactic) + + go func() { + // Implementation note: mixTryEmitN does not close the channel + // when done, therefore we need to close it ourselves. + mixTryEmitN(inputch, 10, outputch) + close(outputch) + }() + + // read the output channel + var output []*httpsDialerTactic + for tx := range outputch { + output = append(output, tx) + } + + // make sure we didn't read anything + if len(output) != 0 { + t.Fatal("expected zero entries") + } +}