diff --git a/cmd/bucky-fill/main.go b/cmd/bucky-fill/main.go index b4a2e1d..734437e 100644 --- a/cmd/bucky-fill/main.go +++ b/cmd/bucky-fill/main.go @@ -9,8 +9,8 @@ import ( "strings" ) -import "github.com/jjneely/buckytools" -import "github.com/jjneely/buckytools/fill" +import "github.com/Civil/buckytools" +import "github.com/Civil/buckytools/fill" var ( deleteSourceFiles bool diff --git a/cmd/bucky-isempty/main.go b/cmd/bucky-isempty/main.go index a581332..2020d4c 100644 --- a/cmd/bucky-isempty/main.go +++ b/cmd/bucky-isempty/main.go @@ -11,9 +11,9 @@ import ( ) import ( - "github.com/jjneely/buckytools" - "github.com/jjneely/buckytools/metrics" - "github.com/jjneely/buckytools/whisper" + "github.com/Civil/buckytools" + "github.com/Civil/buckytools/metrics" + "github.com/Civil/buckytools/whisper" ) // Command Line Flags diff --git a/cmd/bucky-sparsify/main.go b/cmd/bucky-sparsify/main.go index 3a22277..ec9fe7a 100644 --- a/cmd/bucky-sparsify/main.go +++ b/cmd/bucky-sparsify/main.go @@ -12,8 +12,8 @@ import ( ) import ( - "github.com/jjneely/buckytools" - "github.com/jjneely/buckytools/lock" + "github.com/Civil/buckytools" + "github.com/Civil/buckytools/lock" ) // Command Line Flags diff --git a/cmd/bucky/backfill.go b/cmd/bucky/backfill.go index 1181c1b..8fe60ba 100644 --- a/cmd/bucky/backfill.go +++ b/cmd/bucky/backfill.go @@ -10,7 +10,7 @@ import ( "time" ) -// import "github.com/jjneely/buckytools/hashing" +// import "github.com/Civil/buckytools/hashing" type MigrateWork struct { oldName string diff --git a/cmd/bucky/cluster.go b/cmd/bucky/cluster.go index 4af0585..ce12c47 100644 --- a/cmd/bucky/cluster.go +++ b/cmd/bucky/cluster.go @@ -6,7 +6,7 @@ import ( "net" ) -import "github.com/jjneely/buckytools/hashing" +import "github.com/Civil/buckytools/hashing" type ClusterConfig struct { // Port is the port remote buckyd daemons listen on diff --git a/cmd/bucky/common.go b/cmd/bucky/common.go index c3c7d40..97f062f 100644 --- a/cmd/bucky/common.go +++ b/cmd/bucky/common.go @@ -15,8 +15,8 @@ import ( import "github.com/golang/snappy" -import . "github.com/jjneely/buckytools/metrics" -import "github.com/jjneely/buckytools/hashing" +import . "github.com/Civil/buckytools/metrics" +import "github.com/Civil/buckytools/hashing" // HostPort is a convenience variable for sub-commands. This holds the // HOST:PORT to connect to if SetupHostname() is called in init() diff --git a/cmd/bucky/help.go b/cmd/bucky/help.go index b20a109..240fae7 100644 --- a/cmd/bucky/help.go +++ b/cmd/bucky/help.go @@ -5,7 +5,7 @@ import ( "os" ) -import . "github.com/jjneely/buckytools" +import . "github.com/Civil/buckytools" func init() { usage := "[sub-command]" diff --git a/cmd/bucky/inconsistent.go b/cmd/bucky/inconsistent.go index d25ca47..67bfcae 100644 --- a/cmd/bucky/inconsistent.go +++ b/cmd/bucky/inconsistent.go @@ -11,7 +11,7 @@ import ( "time" ) -// import "github.com/jjneely/buckytools/hashing" +// import "github.com/Civil/buckytools/hashing" func init() { usage := "[options]" diff --git a/cmd/bucky/main.go b/cmd/bucky/main.go index c8fa46a..bf71a84 100644 --- a/cmd/bucky/main.go +++ b/cmd/bucky/main.go @@ -9,7 +9,7 @@ import ( "strings" ) -import . "github.com/jjneely/buckytools" +import . "github.com/Civil/buckytools" // We use STDIN and STDOUT as much as possible for handing lists, and // other data. Status, errors, and other data not related to pushing diff --git a/cmd/bucky/restore.go b/cmd/bucky/restore.go index e420881..f7b5936 100644 --- a/cmd/bucky/restore.go +++ b/cmd/bucky/restore.go @@ -11,7 +11,7 @@ import ( "sync" ) -import . "github.com/jjneely/buckytools/metrics" +import . "github.com/Civil/buckytools/metrics" var tarPrefix string diff --git a/cmd/bucky/stat.go b/cmd/bucky/stat.go index 85223ab..583b811 100644 --- a/cmd/bucky/stat.go +++ b/cmd/bucky/stat.go @@ -11,7 +11,7 @@ import ( "time" ) -import . "github.com/jjneely/buckytools/metrics" +import . "github.com/Civil/buckytools/metrics" func init() { usage := "[options] " diff --git a/cmd/bucky/tar.go b/cmd/bucky/tar.go index 15e51ac..7476ce8 100644 --- a/cmd/bucky/tar.go +++ b/cmd/bucky/tar.go @@ -14,7 +14,7 @@ import ( ) import "github.com/golang/crypto/ssh/terminal" -import "github.com/jjneely/buckytools/metrics" +import "github.com/Civil/buckytools/metrics" var metricWorkers int var workerErrors bool diff --git a/cmd/buckyd/main.go b/cmd/buckyd/main.go index e784912..b812543 100644 --- a/cmd/buckyd/main.go +++ b/cmd/buckyd/main.go @@ -10,9 +10,9 @@ import ( "strings" ) -import . "github.com/jjneely/buckytools" -import "github.com/jjneely/buckytools/metrics" -import "github.com/jjneely/buckytools/hashing" +import . "github.com/Civil/buckytools" +import "github.com/Civil/buckytools/metrics" +import "github.com/Civil/buckytools/hashing" var metricsCache *metrics.MetricsCacheType var tmpDir string diff --git a/cmd/buckyd/metrics.go b/cmd/buckyd/metrics.go index 0581322..679f5d0 100644 --- a/cmd/buckyd/metrics.go +++ b/cmd/buckyd/metrics.go @@ -15,8 +15,8 @@ import ( import "github.com/golang/snappy" -import . "github.com/jjneely/buckytools/metrics" -import "github.com/jjneely/buckytools/fill" +import . "github.com/Civil/buckytools/metrics" +import "github.com/Civil/buckytools/fill" // listMetrics retrieves a list of metrics on the localhost and sends // it to the client. diff --git a/cmd/findhash/main.go b/cmd/findhash/main.go index 75af3f8..1d629c1 100644 --- a/cmd/findhash/main.go +++ b/cmd/findhash/main.go @@ -11,7 +11,7 @@ import ( "strings" ) -import "github.com/jjneely/buckytools/hashing" +import "github.com/Civil/buckytools/hashing" import "github.com/pborman/uuid" func getConfig(file string) []string { diff --git a/datapoints.go b/datapoints.go index 1bbf328..7eae76b 100644 --- a/datapoints.go +++ b/datapoints.go @@ -6,7 +6,7 @@ import ( "time" ) -import "github.com/jjneely/buckytools/whisper" +import "github.com/Civil/buckytools/whisper" // FindValidDataPoints does a backwards walk through time to examine the // highest resolution data for each archive / time period. We collect valid diff --git a/fill/fill.go b/fill/fill.go index 92008dc..c6f1f78 100644 --- a/fill/fill.go +++ b/fill/fill.go @@ -6,7 +6,7 @@ import ( "time" ) -import "github.com/jjneely/buckytools/whisper" +import "github.com/Civil/buckytools/whisper" // fillArchive() is a private function that fills data points from srcWSP // into dstWsp. Used by FIll() @@ -54,7 +54,7 @@ func fillArchive(srcWsp, dstWsp *whisper.Whisper, start, stop int) error { } tsStart += ts.Step() } - dstWsp.UpdateMany(points) + dstWsp.UpdateManyWithRetention(points, v.MaxRetention()) stop = fromTime if start >= stop { diff --git a/fill/fill_test.go b/fill/fill_test.go index 40dc6a2..73435ee 100644 --- a/fill/fill_test.go +++ b/fill/fill_test.go @@ -11,7 +11,7 @@ import ( "time" ) -import "github.com/jjneely/buckytools/whisper" +import "github.com/Civil/buckytools/whisper" func whisperCreateData(path string, ts []*whisper.TimeSeriesPoint) error { os.Remove(path) // Don't care if it fails @@ -113,7 +113,9 @@ func validateWhisper(path string, ts []*whisper.TimeSeriesPoint) error { } defer wsp.Close() - wspData, err := wsp.Fetch(0, int(time.Now().Unix())) + now := int(time.Now().Unix()) + fromTime := now - wsp.Retentions()[0].MaxRetention() + wspData, err := wsp.Fetch(fromTime, now) if err != nil { return err } @@ -151,8 +153,11 @@ func fetchFromFile(path string) ([]*whisper.TimeSeriesPoint, error) { } defer wsp.Close() + now := int(time.Now().Unix()) + fromTime := now - wsp.Retentions()[0].MaxRetention() + // Parse and fetch data from it - ts, err := wsp.Fetch(0, int(time.Now().Unix())) + ts, err := wsp.Fetch(fromTime, now) if err != nil { return tsp, err } @@ -164,7 +169,7 @@ func simulateFill(a, b []*whisper.TimeSeriesPoint) []*whisper.TimeSeriesPoint { // Assume that we are simulating the fill operation on WSP DBs created // with the above functions. // This is a shallow copy operation. - dataMerged := make([]*whisper.TimeSeriesPoint, 30) + dataMerged := make([]*whisper.TimeSeriesPoint, len(b)) // copy everything in b over to our return value copy(dataMerged, b) gapstart := -1 @@ -240,6 +245,155 @@ func TestFill(t *testing.T) { fmt.Println() } +func whisperCreateDataMany(path string, ts []*whisper.TimeSeriesPoint) error { + os.Remove(path) // Don't care if it fails + retentions, err := whisper.ParseRetentionDefs("1m:30m,5m:60m,15m:150m") + if err != nil { + return err + } + wsp, err := whisper.Create(path, retentions, whisper.Sum, 0) + if err != nil { + return err + } + defer wsp.Close() + + // Iterate through the slice so we can support null values + wsp.UpdateMany(ts) + + return nil +} + +func whisperCreateNullsManyArchives(path string) ([]*whisper.TimeSeriesPoint, error) { + values := []float64{ + math.NaN(), + math.NaN(), + math.NaN(), + 0.0, + 7.0, + 1.0, + 7.0, + 2.0, + 9.0, + 9.0, + 9.0, + 4.0, + 3.0, + 2.0, + 7.0, + 5.0, + 1.0, + 9.0, + 4.0, + 4.0, + 1.0, + 5.0, + 5.0, + 8.0, + 4.0, + 2.0, + 6.0, + 0.0, + math.NaN(), + math.NaN(), + } + + now := int(time.Now().Unix()) + ts := make([]*whisper.TimeSeriesPoint, 0, len(values)) + for _, v := range values { + ts = append(ts, &whisper.TimeSeriesPoint{Value: v, Time: now}) + now -= 60 + } + + os.Remove(path) // Don't care if it fails + retentions, err := whisper.ParseRetentionDefs("1m:30m,5m:60m,15m:150m") + if err != nil { + return ts, err + } + wsp, err := whisper.Create(path, retentions, whisper.Sum, 0) + if err != nil { + return ts, err + } + defer wsp.Close() + + for _, point := range ts { + _ = wsp.Update(point.Value, point.Time) + } + + return ts, nil +} + +func TestTwoArchives(t *testing.T) { + dataC, err := whisperCreateNullsManyArchives("c1.wsp") + if err != nil { + t.Fatal(err) + } + + // Create an identical set of test data + err = whisperCreateDataMany("c2.wsp", dataC) + if err != nil { + t.Fatal(err) + } + + err = whisperCreateDataMany("d1.wsp", dataC) + if err != nil { + t.Fatal(err) + } + + err = whisperCreateDataMany("d2.wsp", dataC) + if err != nil { + t.Fatal(err) + } + + // whisper-fill.py needs to be in the PATH somewhere + log.Println("Running whisper-fill.py...") + c := exec.Command("whisper-fill.py", "c1.wsp", "d1.wsp") + + reference_err := c.Run() + pythonFill, err := fetchFromFile("d1.wsp") + + // Run my version + err = Files("c2.wsp", "d2.wsp", int(time.Now().Unix())) + if err != nil { + t.Error(err) + } + goFill, err := fetchFromFile("d2.wsp") + if err != nil { + t.Error(err) + } + + // Compare to what we think our version should be + simuFill := simulateFill(dataC, dataC) + + err = validateWhisper("d2.wsp", simuFill) + if err != nil { + t.Error(err) + } + + // Validate the reference if whisper-fill.py was found + if reference_err == nil { + err = validateWhisper("d1.wsp", simuFill) + if err != nil { + t.Error(err) + } + } + + if len(goFill) != len(pythonFill) { + t.Fatalf("length mismatch, python=%v, go=%v, expected=%v", len(goFill), len(pythonFill)) + } + + // Now try to print out a table of C, D, Python, Go, Simu + fmt.Printf("C \tD \tPython\tGo \tSimu\n") + fmt.Printf("======\t======\t======\t======\t======\n") + for i := 0; i < len(goFill); i++ { + if reference_err != nil { + fmt.Printf("%6.1f\t%6.1f\t%6.1f\t%6.1f\t%6.1f\n", dataC[i].Value, dataC[i].Value, math.NaN(), goFill[i].Value, simuFill[i].Value) + } else { + fmt.Printf("%6.1f\t%6.1f\t%6.1f\t%6.1f\t%6.1f\n", dataC[i].Value, dataC[i].Value, pythonFill[i].Value, goFill[i].Value, simuFill[i].Value) + } + } + fmt.Println() +} + func TestReference(t *testing.T) { // Create our random test data dataA, err := whisperCreate("a1.wsp") diff --git a/whisper/whisper.go b/whisper/whisper.go index 42b5568..e1c42f5 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -369,7 +369,9 @@ func (whisper *Whisper) Update(value float64, timestamp int) (err error) { return nil } -func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { +// UpdateManyWithRetention updates only archive with specified retention. +// retention = -1 means "update all possible archives" +func (whisper *Whisper) UpdateManyWithRetention(points []*TimeSeriesPoint, retention int) { // sort the points, newest first sort.Sort(timeSeriesPointsNewestFirst{points}) @@ -377,6 +379,9 @@ func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { var currentPoints []*TimeSeriesPoint for _, archive := range whisper.archives { + if retention != -1 && retention != archive.MaxRetention() { + continue + } currentPoints, points = extractPoints(points, now, archive.MaxRetention()) if len(currentPoints) == 0 { continue @@ -393,6 +398,10 @@ func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { } } +func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { + whisper.UpdateManyWithRetention(points, -1) +} + func (whisper *Whisper) archiveUpdateMany(archive *archiveInfo, points []*TimeSeriesPoint) { alignedPoints := alignPoints(archive, points) intervals, packedBlocks := packSequences(archive, alignedPoints)