Skip to content

Commit

Permalink
Merge pull request #62 from m-lab/single-serving-scamper
Browse files Browse the repository at this point in the history
Set up scamper in single-serving mode.
  • Loading branch information
pboothe authored Dec 16, 2019
2 parents 966aa47 + 9728454 commit 8106ad9
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 69 deletions.
19 changes: 12 additions & 7 deletions caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
waitTime = flag.Duration("waitTime", 5*time.Second, "how long to wait between subsequent listings of open connections")
poll = flag.Bool("poll", true, "Whether the polling method should be used to see new connections.")
tracerType = flagx.Enum{
Options: []string{"paris-traceroute", "scamper", "scamper-with-paris-backup"},
Options: []string{"paris-traceroute", "scamper", "scamper-daemon", "scamper-daemon-with-paris-backup"},
Value: "scamper",
}

Expand Down Expand Up @@ -63,13 +63,16 @@ func main() {
promSrv := prometheusx.MustServeMetrics()
defer promSrv.Shutdown(ctx)

scamper := &tracer.Scamper{
Binary: *scamperBin,
OutputPath: *outputPath,
ScamperTimeout: *scamperTimeout,
}
scamperDaemon := &tracer.ScamperDaemon{
Binary: *scamperBin,
Scamper: scamper,
AttachBinary: *scattachBin,
Warts2JSONBinary: *scwarts2jsonBin,
OutputPath: *outputPath,
ControlSocket: *scamperCtrlSocket,
ScamperTimeout: *scamperTimeout,
}
parisTracer := &tracer.Paris{
Binary: *parisBin,
Expand All @@ -81,7 +84,11 @@ func main() {

// Set up the cache three different ways, depending on the trace method requested.
switch tracerType.Value {
case "paris-traceroute":
cache = ipcache.New(ctx, parisTracer, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod)
case "scamper":
cache = ipcache.New(ctx, scamper, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod)
case "scamper-daemon":
cache = ipcache.New(ctx, scamperDaemon, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod)
wg.Add(1)
go func() {
Expand All @@ -90,9 +97,7 @@ func main() {
cancel()
wg.Done()
}()
case "paris-traceroute":
cache = ipcache.New(ctx, parisTracer, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod)
case "scamper-with-paris-backup":
case "scamper-daemon-with-paris-backup":
cache = ipcache.New(ctx, scamperDaemon, *ipcache.IPCacheTimeout, *ipcache.IPCacheUpdatePeriod)
wg.Add(1)
go func() {
Expand Down
24 changes: 23 additions & 1 deletion caller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,28 @@ func TestMain(t *testing.T) {
*outputPath = dir
*poll = true
*scamperBin = "scamper"
tracerType.Value = "scamper-daemon"
ctx, cancel = context.WithCancel(context.Background())
go func() {
time.Sleep(1 * time.Second)
cancel()
}()
main()
}

func TestScamper(t *testing.T) {
dir, err := ioutil.TempDir("", "TestMain")
rtx.Must(err, "Could not create temp dir")
defer os.RemoveAll(dir)

// Verify that main doesn't crash, and that it does exit when the context is canceled.
// TODO: verify more in this test.
*prometheusx.ListenAddress = ":0"
*scamperCtrlSocket = ""
*waitTime = time.Nanosecond // Run through the loop a few times.
*outputPath = dir
*poll = true
*scamperBin = "scamper"
tracerType.Value = "scamper"
ctx, cancel = context.WithCancel(context.Background())
go func() {
Expand Down Expand Up @@ -77,7 +99,7 @@ func TestMainWithBackupPT(t *testing.T) {
*poll = false
*scamperCtrlSocket = dir + "/scamper.sock"
*scamperBin = "false"
tracerType.Value = "scamper-with-paris-backup"
tracerType.Value = "scamper-daemon-with-paris-backup"

ctx, cancel = context.WithCancel(context.Background())
go srv.Serve(ctx)
Expand Down
148 changes: 101 additions & 47 deletions tracer/scamper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
Expand All @@ -20,6 +21,98 @@ import (
pipe "gopkg.in/m-lab/pipe.v3"
)

// Scamper uses scamper in non-daemon mode to perform traceroutes. This is much
// less efficient, but when scamper crashes, it has a much lower "blast radius".
type Scamper struct {
Binary, OutputPath string
ScamperTimeout time.Duration
}

// generatesFilename creates the string filename for storing the data.
func (*Scamper) generateFilename(cookie string, t time.Time) string {
c, err := strconv.ParseInt(cookie, 16, 64)
rtx.PanicOnError(err, "Could not turn cookie into number")
return t.Format("20060102T150405Z") + "_" + uuid.FromCookie(uint64(c)) + ".jsonl"
}

// TraceFromCachedTrace creates a file containing traceroute results that came from a
// cache result, rather than performing the traceroute with scamper. Because
// scamper-in-standalone and scamper-as-daemon use the same output format, this
// function is the same code for both.
func (s *Scamper) TraceFromCachedTrace(conn connection.Connection, t time.Time, cachedTest string) error {
dir, err := createTimePath(s.OutputPath, t)
if err != nil {
log.Println("Could not create directories")
tracerCacheErrors.WithLabelValues("scamper", "baddir").Inc()
return err
}
filename := dir + s.generateFilename(conn.Cookie, t)
log.Println("Starting a cached trace to be put in", filename)

// remove the first line of cachedTest
split := strings.Index(cachedTest, "\n")

if split <= 0 || split == len(cachedTest) {
log.Println("Invalid cached test")
tracerCacheErrors.WithLabelValues("scamper", "badcache").Inc()
return errors.New("Invalid cached test")
}

// Get the uuid from the first line of cachedTest
newTest := GetMetaline(conn, true, extractUUID(cachedTest[:split])) + cachedTest[split+1:]
return ioutil.WriteFile(filename, []byte(newTest), 0666)
}

// DontTrace does not perform a trace that would have been performed, had the
// previous round not already returned an error. This should increment a counter
// that tracks the number of tests which have been "transitively failed".
func (*Scamper) DontTrace(conn connection.Connection, err error) {
tracesNotPerformed.WithLabelValues("scamper").Inc()
}

// Trace starts a new scamper process running the paris-traceroute algorithm to
// every node. This uses more resources per-traceroute, but segfaults in the
// called binaries have a much smaller "blast radius".
func (s *Scamper) Trace(conn connection.Connection, t time.Time) (out string, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("Recovered (%v) a crashed trace for %v at %v\n", r, conn, t)
crashedTraces.WithLabelValues("scamper").Inc()
err = errors.New(fmt.Sprint(r))
}
}()
tracesInProgress.WithLabelValues("scamper").Inc()
defer tracesInProgress.WithLabelValues("scamper").Dec()
return s.trace(conn, t)
}

// trace a single connection using scamper as a standalone binary.
func (s *Scamper) trace(conn connection.Connection, t time.Time) (string, error) {
dir, err := createTimePath(s.OutputPath, t)
rtx.PanicOnError(err, "Could not create directory")
filename := dir + s.generateFilename(conn.Cookie, t)
log.Println("Starting a trace to be put in", filename)
buff := bytes.Buffer{}

_, err = buff.WriteString(GetMetaline(conn, false, ""))
rtx.PanicOnError(err, "Could not write to buffer")

cmd := pipe.Line(
pipe.Exec(s.Binary, "-I", "tracelb -P icmp-echo -q 3 -O ptr "+conn.RemoteIP, "-o-", "-O", "json"),
pipe.Write(&buff),
)
err = pipe.RunTimeout(cmd, s.ScamperTimeout)
tracesPerformed.WithLabelValues("scamper").Inc()
if err != nil && err.Error() == pipe.ErrTimeout.Error() {
log.Println("TimeOut for Trace: ", cmd)
return "", err
}

rtx.PanicOnError(err, "Command %v failed", cmd)
rtx.PanicOnError(ioutil.WriteFile(filename, buff.Bytes(), 0666), "Could not save output to file")
return string(buff.Bytes()), nil
}

// ScamperDaemon contains a single instance of a scamper process. Once the ScamperDaemon has
// been started, you can call Trace and then all traces will be centrally run
// and managed.
Expand All @@ -29,8 +122,8 @@ import (
// all traces are centrally managed, so if the central daemon goes wrong for
// some reason, there is a much larger blast radius.
type ScamperDaemon struct {
Binary, AttachBinary, Warts2JSONBinary, ControlSocket, OutputPath string
ScamperTimeout time.Duration
*Scamper
AttachBinary, Warts2JSONBinary, ControlSocket string
}

// MustStart starts a scamper binary running and listening to the given context.
Expand Down Expand Up @@ -76,15 +169,16 @@ func (d *ScamperDaemon) MustStart(ctx context.Context) {
// All checks inside of this function and its subfunctions should call
// PanicOnError instead of Must because each trace is independent of the others,
// so we should prevent a single failed trace from crashing everything.
func (d *ScamperDaemon) Trace(conn connection.Connection, t time.Time) (string, error) {
func (d *ScamperDaemon) Trace(conn connection.Connection, t time.Time) (out string, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("Recovered (%v) a crashed trace for %v at %v\n", r, conn, t)
crashedTraces.WithLabelValues("scamper").Inc()
crashedTraces.WithLabelValues("scamper-daemon").Inc()
err = errors.New(fmt.Sprint(r))
}
}()
tracesInProgress.WithLabelValues("scamper").Inc()
defer tracesInProgress.WithLabelValues("scamper").Dec()
tracesInProgress.WithLabelValues("scamper-daemon").Inc()
defer tracesInProgress.WithLabelValues("scamper-daemon").Dec()
return d.trace(conn, t)
}

Expand All @@ -96,39 +190,6 @@ func (d *ScamperDaemon) TraceAll(connections []connection.Connection) {
}
}

// generatesFilename creates the string filename for storing the data.
func (*ScamperDaemon) generateFilename(cookie string, t time.Time) string {
c, err := strconv.ParseInt(cookie, 16, 64)
rtx.PanicOnError(err, "Could not turn cookie into number")
return t.Format("20060102T150405Z") + "_" + uuid.FromCookie(uint64(c)) + ".jsonl"
}

// TraceFromCachedTrace creates a file containing traceroute results that came from a
// cache result, rather than performing the traceroute with scamper.
func (d *ScamperDaemon) TraceFromCachedTrace(conn connection.Connection, t time.Time, cachedTest string) error {
dir, err := createTimePath(d.OutputPath, t)
if err != nil {
log.Println("Could not create directories")
tracerCacheErrors.WithLabelValues("scamper", "baddir").Inc()
return err
}
filename := dir + d.generateFilename(conn.Cookie, t)
log.Println("Starting a cached trace to be put in", filename)

// remove the first line of cachedTest
split := strings.Index(cachedTest, "\n")

if split <= 0 || split == len(cachedTest) {
log.Println("Invalid cached test")
tracerCacheErrors.WithLabelValues("scamper", "badcache").Inc()
return errors.New("Invalid cached test")
}

// Get the uuid from the first line of cachedTest
newTest := GetMetaline(conn, true, extractUUID(cachedTest[:split])) + cachedTest[split+1:]
return ioutil.WriteFile(filename, []byte(newTest), 0666)
}

func (d *ScamperDaemon) trace(conn connection.Connection, t time.Time) (string, error) {
dir, err := createTimePath(d.OutputPath, t)
rtx.PanicOnError(err, "Could not create directory")
Expand All @@ -149,7 +210,7 @@ func (d *ScamperDaemon) trace(conn connection.Connection, t time.Time) (string,
pipe.Write(&buff),
)
err = pipe.RunTimeout(cmd, d.ScamperTimeout)
tracesPerformed.WithLabelValues("scamper").Inc()
tracesPerformed.WithLabelValues("scamper-daemon").Inc()
if err != nil && err.Error() == pipe.ErrTimeout.Error() {
log.Println("TimeOut for Trace: ", cmd)
return "", err
Expand All @@ -159,10 +220,3 @@ func (d *ScamperDaemon) trace(conn connection.Connection, t time.Time) (string,
rtx.PanicOnError(ioutil.WriteFile(filename, buff.Bytes(), 0666), "Could not save output to file")
return string(buff.Bytes()), nil
}

// DontTrace does not perform a trace that would have been performed, had the
// previous round not already returned an error. This should increment a counter
// that tracks the number of tests which have been "transitively failed".
func (d *ScamperDaemon) DontTrace(conn connection.Connection, err error) {
tracesNotPerformed.WithLabelValues("scamper").Inc()
}
Loading

0 comments on commit 8106ad9

Please sign in to comment.