diff --git a/caller.go b/caller.go index 20ff1a5..10312e6 100644 --- a/caller.go +++ b/caller.go @@ -34,7 +34,7 @@ var ( waitTime = flag.Duration("waitTime", 5*time.Second, "how long to wait between subsequent listings of open connections") poll = flag.Bool("poll", true, "Whether the polling method should be used to see new connections.") tracerType = flagx.Enum{ - Options: []string{"paris-traceroute", "scamper", "scamper-with-paris-backup"}, + Options: []string{"paris-traceroute", "scamper", "scamper-daemon", "scamper-daemon-with-paris-backup"}, Value: "scamper", } @@ -63,13 +63,16 @@ func main() { promSrv := prometheusx.MustServeMetrics() defer promSrv.Shutdown(ctx) + scamper := &tracer.Scamper{ + Binary: *scamperBin, + OutputPath: *outputPath, + ScamperTimeout: *scamperTimeout, + } scamperDaemon := &tracer.ScamperDaemon{ - Binary: *scamperBin, + Scamper: scamper, AttachBinary: *scattachBin, Warts2JSONBinary: *scwarts2jsonBin, - OutputPath: *outputPath, ControlSocket: *scamperCtrlSocket, - ScamperTimeout: *scamperTimeout, } parisTracer := &tracer.Paris{ Binary: *parisBin, @@ -81,7 +84,11 @@ func main() { // Set up the cache three different ways, depending on the trace method requested. switch tracerType.Value { + case "paris-traceroute": + cache = ipcache.New(ctx, parisTracer, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod) case "scamper": + cache = ipcache.New(ctx, scamper, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod) + case "scamper-daemon": cache = ipcache.New(ctx, scamperDaemon, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod) wg.Add(1) go func() { @@ -90,9 +97,7 @@ func main() { cancel() wg.Done() }() - case "paris-traceroute": - cache = ipcache.New(ctx, parisTracer, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod) - case "scamper-with-paris-backup": + case "scamper-daemon-with-paris-backup": cache = ipcache.New(ctx, scamperDaemon, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod) wg.Add(1) go func() { diff --git a/caller_test.go b/caller_test.go index 8894f28..81f65b0 100644 --- a/caller_test.go +++ b/caller_test.go @@ -33,6 +33,28 @@ func TestMain(t *testing.T) { *outputPath = dir *poll = true *scamperBin = "scamper" + tracerType.Value = "scamper-daemon" + ctx, cancel = context.WithCancel(context.Background()) + go func() { + time.Sleep(1 * time.Second) + cancel() + }() + main() +} + +func TestScamper(t *testing.T) { + dir, err := ioutil.TempDir("", "TestMain") + rtx.Must(err, "Could not create temp dir") + defer os.RemoveAll(dir) + + // Verify that main doesn't crash, and that it does exit when the context is canceled. + // TODO: verify more in this test. + *prometheusx.ListenAddress = ":0" + *scamperCtrlSocket = "" + *waitTime = time.Nanosecond // Run through the loop a few times. + *outputPath = dir + *poll = true + *scamperBin = "scamper" tracerType.Value = "scamper" ctx, cancel = context.WithCancel(context.Background()) go func() { @@ -77,7 +99,7 @@ func TestMainWithBackupPT(t *testing.T) { *poll = false *scamperCtrlSocket = dir + "/scamper.sock" *scamperBin = "false" - tracerType.Value = "scamper-with-paris-backup" + tracerType.Value = "scamper-daemon-with-paris-backup" ctx, cancel = context.WithCancel(context.Background()) go srv.Serve(ctx) diff --git a/tracer/scamper.go b/tracer/scamper.go index 915a20d..04bd0c9 100644 --- a/tracer/scamper.go +++ b/tracer/scamper.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "errors" + "fmt" "io/ioutil" "log" "os" @@ -20,6 +21,98 @@ import ( pipe "gopkg.in/m-lab/pipe.v3" ) +// Scamper uses scamper in non-daemon mode to perform traceroutes. This is much +// less efficient, but when scamper crashes, it has a much lower "blast radius". +type Scamper struct { + Binary, OutputPath string + ScamperTimeout time.Duration +} + +// generatesFilename creates the string filename for storing the data. +func (*Scamper) generateFilename(cookie string, t time.Time) string { + c, err := strconv.ParseInt(cookie, 16, 64) + rtx.PanicOnError(err, "Could not turn cookie into number") + return t.Format("20060102T150405Z") + "_" + uuid.FromCookie(uint64(c)) + ".jsonl" +} + +// TraceFromCachedTrace creates a file containing traceroute results that came from a +// cache result, rather than performing the traceroute with scamper. Because +// scamper-in-standalone and scamper-as-daemon use the same output format, this +// function is the same code for both. +func (s *Scamper) TraceFromCachedTrace(conn connection.Connection, t time.Time, cachedTest string) error { + dir, err := createTimePath(s.OutputPath, t) + if err != nil { + log.Println("Could not create directories") + tracerCacheErrors.WithLabelValues("scamper", "baddir").Inc() + return err + } + filename := dir + s.generateFilename(conn.Cookie, t) + log.Println("Starting a cached trace to be put in", filename) + + // remove the first line of cachedTest + split := strings.Index(cachedTest, "\n") + + if split <= 0 || split == len(cachedTest) { + log.Println("Invalid cached test") + tracerCacheErrors.WithLabelValues("scamper", "badcache").Inc() + return errors.New("Invalid cached test") + } + + // Get the uuid from the first line of cachedTest + newTest := GetMetaline(conn, true, extractUUID(cachedTest[:split])) + cachedTest[split+1:] + return ioutil.WriteFile(filename, []byte(newTest), 0666) +} + +// DontTrace does not perform a trace that would have been performed, had the +// previous round not already returned an error. This should increment a counter +// that tracks the number of tests which have been "transitively failed". +func (*Scamper) DontTrace(conn connection.Connection, err error) { + tracesNotPerformed.WithLabelValues("scamper").Inc() +} + +// Trace starts a new scamper process running the paris-traceroute algorithm to +// every node. This uses more resources per-traceroute, but segfaults in the +// called binaries have a much smaller "blast radius". +func (s *Scamper) Trace(conn connection.Connection, t time.Time) (out string, err error) { + defer func() { + if r := recover(); r != nil { + log.Printf("Recovered (%v) a crashed trace for %v at %v\n", r, conn, t) + crashedTraces.WithLabelValues("scamper").Inc() + err = errors.New(fmt.Sprint(r)) + } + }() + tracesInProgress.WithLabelValues("scamper").Inc() + defer tracesInProgress.WithLabelValues("scamper").Dec() + return s.trace(conn, t) +} + +// trace a single connection using scamper as a standalone binary. +func (s *Scamper) trace(conn connection.Connection, t time.Time) (string, error) { + dir, err := createTimePath(s.OutputPath, t) + rtx.PanicOnError(err, "Could not create directory") + filename := dir + s.generateFilename(conn.Cookie, t) + log.Println("Starting a trace to be put in", filename) + buff := bytes.Buffer{} + + _, err = buff.WriteString(GetMetaline(conn, false, "")) + rtx.PanicOnError(err, "Could not write to buffer") + + cmd := pipe.Line( + pipe.Exec(s.Binary, "-I", "tracelb -P icmp-echo -q 3 -O ptr "+conn.RemoteIP, "-o-", "-O", "json"), + pipe.Write(&buff), + ) + err = pipe.RunTimeout(cmd, s.ScamperTimeout) + tracesPerformed.WithLabelValues("scamper").Inc() + if err != nil && err.Error() == pipe.ErrTimeout.Error() { + log.Println("TimeOut for Trace: ", cmd) + return "", err + } + + rtx.PanicOnError(err, "Command %v failed", cmd) + rtx.PanicOnError(ioutil.WriteFile(filename, buff.Bytes(), 0666), "Could not save output to file") + return string(buff.Bytes()), nil +} + // ScamperDaemon contains a single instance of a scamper process. Once the ScamperDaemon has // been started, you can call Trace and then all traces will be centrally run // and managed. @@ -29,8 +122,8 @@ import ( // all traces are centrally managed, so if the central daemon goes wrong for // some reason, there is a much larger blast radius. type ScamperDaemon struct { - Binary, AttachBinary, Warts2JSONBinary, ControlSocket, OutputPath string - ScamperTimeout time.Duration + *Scamper + AttachBinary, Warts2JSONBinary, ControlSocket string } // MustStart starts a scamper binary running and listening to the given context. @@ -76,15 +169,16 @@ func (d *ScamperDaemon) MustStart(ctx context.Context) { // All checks inside of this function and its subfunctions should call // PanicOnError instead of Must because each trace is independent of the others, // so we should prevent a single failed trace from crashing everything. -func (d *ScamperDaemon) Trace(conn connection.Connection, t time.Time) (string, error) { +func (d *ScamperDaemon) Trace(conn connection.Connection, t time.Time) (out string, err error) { defer func() { if r := recover(); r != nil { log.Printf("Recovered (%v) a crashed trace for %v at %v\n", r, conn, t) - crashedTraces.WithLabelValues("scamper").Inc() + crashedTraces.WithLabelValues("scamper-daemon").Inc() + err = errors.New(fmt.Sprint(r)) } }() - tracesInProgress.WithLabelValues("scamper").Inc() - defer tracesInProgress.WithLabelValues("scamper").Dec() + tracesInProgress.WithLabelValues("scamper-daemon").Inc() + defer tracesInProgress.WithLabelValues("scamper-daemon").Dec() return d.trace(conn, t) } @@ -96,39 +190,6 @@ func (d *ScamperDaemon) TraceAll(connections []connection.Connection) { } } -// generatesFilename creates the string filename for storing the data. -func (*ScamperDaemon) generateFilename(cookie string, t time.Time) string { - c, err := strconv.ParseInt(cookie, 16, 64) - rtx.PanicOnError(err, "Could not turn cookie into number") - return t.Format("20060102T150405Z") + "_" + uuid.FromCookie(uint64(c)) + ".jsonl" -} - -// TraceFromCachedTrace creates a file containing traceroute results that came from a -// cache result, rather than performing the traceroute with scamper. -func (d *ScamperDaemon) TraceFromCachedTrace(conn connection.Connection, t time.Time, cachedTest string) error { - dir, err := createTimePath(d.OutputPath, t) - if err != nil { - log.Println("Could not create directories") - tracerCacheErrors.WithLabelValues("scamper", "baddir").Inc() - return err - } - filename := dir + d.generateFilename(conn.Cookie, t) - log.Println("Starting a cached trace to be put in", filename) - - // remove the first line of cachedTest - split := strings.Index(cachedTest, "\n") - - if split <= 0 || split == len(cachedTest) { - log.Println("Invalid cached test") - tracerCacheErrors.WithLabelValues("scamper", "badcache").Inc() - return errors.New("Invalid cached test") - } - - // Get the uuid from the first line of cachedTest - newTest := GetMetaline(conn, true, extractUUID(cachedTest[:split])) + cachedTest[split+1:] - return ioutil.WriteFile(filename, []byte(newTest), 0666) -} - func (d *ScamperDaemon) trace(conn connection.Connection, t time.Time) (string, error) { dir, err := createTimePath(d.OutputPath, t) rtx.PanicOnError(err, "Could not create directory") @@ -149,7 +210,7 @@ func (d *ScamperDaemon) trace(conn connection.Connection, t time.Time) (string, pipe.Write(&buff), ) err = pipe.RunTimeout(cmd, d.ScamperTimeout) - tracesPerformed.WithLabelValues("scamper").Inc() + tracesPerformed.WithLabelValues("scamper-daemon").Inc() if err != nil && err.Error() == pipe.ErrTimeout.Error() { log.Println("TimeOut for Trace: ", cmd) return "", err @@ -159,10 +220,3 @@ func (d *ScamperDaemon) trace(conn connection.Connection, t time.Time) (string, rtx.PanicOnError(ioutil.WriteFile(filename, buff.Bytes(), 0666), "Could not save output to file") return string(buff.Bytes()), nil } - -// DontTrace does not perform a trace that would have been performed, had the -// previous round not already returned an error. This should increment a counter -// that tracks the number of tests which have been "transitively failed". -func (d *ScamperDaemon) DontTrace(conn connection.Connection, err error) { - tracesNotPerformed.WithLabelValues("scamper").Inc() -} diff --git a/tracer/scamper_test.go b/tracer/scamper_test.go index 9274cc8..192343b 100644 --- a/tracer/scamper_test.go +++ b/tracer/scamper_test.go @@ -19,18 +19,74 @@ import ( "github.com/m-lab/uuid/prefix" ) +func TestScamper(t *testing.T) { + dir, err := ioutil.TempDir("", "TestScamper") + rtx.Must(err, "Could not create tempdir") + + s := &Scamper{ + OutputPath: dir, + Binary: "echo", + ScamperTimeout: time.Duration(time.Hour), + } + + // Test that it can perform a trace + now := time.Date(2003, 11, 9, 15, 55, 59, 0, time.UTC) + conn := connection.Connection{ + RemoteIP: "10.1.1.1", + RemotePort: 123, + LocalIP: "192.768.0.1", + LocalPort: 456, + Cookie: "12AB", + } + s.DontTrace(conn, nil) // No crash == success + + // Test Trace + out, err := s.Trace(conn, now) + if err != nil { + t.Error(err) + } + uuid, err := conn.UUID() + rtx.Must(err, "Could not make uuid") + expected := `{"UUID":"` + uuid + `","TracerouteCallerVersion":"` + prometheusx.GitShortCommit + `","CachedResult":false,"CachedUUID":""} +-I tracelb -P icmp-echo -q 3 -O ptr 10.1.1.1 -o- -O json +` + if strings.TrimSpace(out) != strings.TrimSpace(expected) { + t.Error("Bad output:", out) + } + contents, err := ioutil.ReadFile(dir + "/2003/11/09/20031109T155559Z_" + prefix.UnsafeString() + "_00000000000012AB.jsonl") + rtx.Must(err, "Could not read file") + if string(contents) != out { + t.Error("The contents of the file should equal the returned values from scraper") + } + + s.Binary = "false" + _, err = s.Trace(conn, now) + if err == nil { + t.Error("A failed call to the scamper binary should cause an error") + } + + s.Binary = "yes" + s.ScamperTimeout = time.Nanosecond + _, err = s.Trace(conn, now) + if err == nil { + t.Error("A timed-out call to the scamper binary should cause an error") + } +} + func TestCancelStopsDaemon(t *testing.T) { tempdir, err := ioutil.TempDir("", "CancelStopsDaemon") rtx.Must(err, "Could not create tempdir") defer os.RemoveAll(tempdir) d := ScamperDaemon{ // Let the shell use the path to discover these. - Binary: "scamper", + Scamper: &Scamper{ + Binary: "scamper", + OutputPath: tempdir, + ScamperTimeout: 1 * time.Minute, + }, AttachBinary: "sc_attach", Warts2JSONBinary: "sc_warts2json", ControlSocket: tempdir + "/ctrl", - OutputPath: tempdir, - ScamperTimeout: 1 * time.Minute, } d.DontTrace(connection.Connection{}, errors.New("")) ctx, cancel := context.WithCancel(context.Background()) @@ -78,12 +134,14 @@ func TestExistingFileStopsDaemonCreation(t *testing.T) { rtx.Must(ioutil.WriteFile(tempdir+"/ctrl", []byte("test"), 0666), "Could not create file") d := ScamperDaemon{ // Let the shell use the path to discover these. - Binary: "scamper", + Scamper: &Scamper{ + Binary: "scamper", + OutputPath: tempdir, + ScamperTimeout: 1 * time.Minute, + }, AttachBinary: "sc_attach", Warts2JSONBinary: "sc_warts2json", ControlSocket: tempdir + "/ctrl", - OutputPath: tempdir, - ScamperTimeout: 1 * time.Minute, } defer func() { @@ -108,10 +166,12 @@ func TestTraceWritesMeta(t *testing.T) { hostname = "testhostname" d := ScamperDaemon{ + Scamper: &Scamper{ + OutputPath: tempdir, + ScamperTimeout: 1 * time.Minute, + }, AttachBinary: "echo", Warts2JSONBinary: "cat", - OutputPath: tempdir, - ScamperTimeout: 1 * time.Minute, } c := connection.Connection{ @@ -154,10 +214,12 @@ func TestTraceTimeout(t *testing.T) { defer os.RemoveAll(tempdir) d := ScamperDaemon{ + Scamper: &Scamper{ + OutputPath: tempdir, + ScamperTimeout: 1 * time.Nanosecond, + }, AttachBinary: "yes", Warts2JSONBinary: "cat", - OutputPath: tempdir, - ScamperTimeout: 1 * time.Nanosecond, } defer func() { @@ -195,10 +257,12 @@ func TestCreateCacheTest(t *testing.T) { hostname = "testhostname" d := ScamperDaemon{ + Scamper: &Scamper{ + OutputPath: tempdir, + ScamperTimeout: 1 * time.Minute, + }, AttachBinary: "echo", Warts2JSONBinary: "cat", - OutputPath: tempdir, - ScamperTimeout: 1 * time.Minute, } c := connection.Connection{ @@ -269,10 +333,12 @@ func TestRecovery(t *testing.T) { hostname = "testhostname" d := ScamperDaemon{ + Scamper: &Scamper{ + OutputPath: tempdir, + ScamperTimeout: 1 * time.Minute, + }, AttachBinary: "echo", Warts2JSONBinary: "cat", - OutputPath: tempdir, - ScamperTimeout: 1 * time.Minute, } c := connection.Connection{