Export metrics about metric ingestion (siglens#2036)
* Add metrics to track metric ingestion

* Implement gauge for number of metrics

* Implement gauge for number of series

* Implement gauges for tag keys and values

* Implement gauge for number of datapoints

* Implement gauge for metric on-disk bytes

* Cleanup

* Compute more expensive metrics every 15 minutes

---------

Co-authored-by: Kunal Nawale <[email protected]>
AndrewHess and nkunal authored Jan 7, 2025
1 parent a812492 commit cc6d605
Showing 8 changed files with 245 additions and 39 deletions.
14 changes: 14 additions & 0 deletions pkg/common/fileutils/fileutils.go
@@ -209,3 +209,17 @@ func DoesFileExist(filePath string) bool {
_, err := os.Stat(filePath)
return err == nil
}

func GetDirSize(path string) (uint64, error) {
var size uint64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += uint64(info.Size())
}
return nil
})
return size, err
}
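
The new helper walks the tree with filepath.Walk and sums only regular-file sizes, so directory entries themselves contribute nothing. A minimal caller sketch (the "./data" path is hypothetical):

    package main

    import (
        "fmt"
        "log"

        "github.com/siglens/siglens/pkg/common/fileutils"
    )

    func main() {
        // "./data" is a placeholder directory for illustration.
        size, err := fileutils.GetDirSize("./data")
        if err != nil {
            log.Fatalf("failed to size directory: %v", err)
        }
        fmt.Printf("./data holds %d bytes on disk\n", size)
    }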
37 changes: 37 additions & 0 deletions pkg/common/fileutils/fileutils_test.go
@@ -3,6 +3,7 @@ package fileutils
import (
"os"
"path/filepath"
"strings"
"testing"

"github.com/stretchr/testify/assert"
@@ -90,3 +91,39 @@ func Test_GetAllFilesWithSpecificExtensions(t *testing.T) {
assert.Equal(t, 4, len(retVal))
assert.ElementsMatch(t, expectedPaths, retVal)
}

func Test_GetDirSize_empty(t *testing.T) {
tmpDir := t.TempDir()
size, err := GetDirSize(tmpDir)
assert.NoError(t, err)
assert.Equal(t, int64(0), int64(size))
}

func Test_GetDirSize_nonExistent(t *testing.T) {
size, err := GetDirSize("non-existent")
assert.Error(t, err)
assert.Equal(t, int64(0), int64(size))
}

func Test_GetDirSize(t *testing.T) {
tmpDir := t.TempDir()
file1Size := 100
file2Size := 200
file3Size := 300
expectedSize := int64(file1Size + file2Size + file3Size)

rootFile := filepath.Join(tmpDir, "root.txt")
assert.NoError(t, os.WriteFile(rootFile, []byte(strings.Repeat("a", file1Size)), 0644))

nestedDir := filepath.Join(tmpDir, "nested")
assert.NoError(t, os.Mkdir(nestedDir, 0755))

file1 := filepath.Join(nestedDir, "file1.txt")
file2 := filepath.Join(nestedDir, "file2.txt")
assert.NoError(t, os.WriteFile(file1, []byte(strings.Repeat("b", file2Size)), 0644))
assert.NoError(t, os.WriteFile(file2, []byte(strings.Repeat("c", file3Size)), 0644))

size, err := GetDirSize(tmpDir)
assert.NoError(t, err)
assert.Equal(t, expectedSize, int64(size))
}
103 changes: 101 additions & 2 deletions pkg/ingest/ingestionStats.go
@@ -18,16 +18,23 @@
package ingest

import (
"path/filepath"
"time"

dtu "github.com/siglens/siglens/pkg/common/dtypeutils"
"github.com/siglens/siglens/pkg/common/fileutils"
"github.com/siglens/siglens/pkg/config"
"github.com/siglens/siglens/pkg/instrumentation"
rutils "github.com/siglens/siglens/pkg/readerUtils"
"github.com/siglens/siglens/pkg/segment/query"
"github.com/siglens/siglens/pkg/segment/query/summary"
segwriter "github.com/siglens/siglens/pkg/segment/writer"

log "github.com/sirupsen/logrus"
)

func InitIngestionMetrics() {
go ingestionMetricsLooper()
go metricsLooper()
}

func ingestionMetricsLooper() {
@@ -65,6 +72,98 @@ func ingestionMetricsLooper() {

instrumentation.SetTotalEventCount(currentEventCount)
instrumentation.SetTotalBytesReceived(currentBytesReceived)
instrumentation.SetTotalOnDiskBytes(currentOnDiskBytes)
instrumentation.SetTotalLogOnDiskBytes(currentOnDiskBytes)
}
}

func metricsLooper() {
oneMinuteTicker := time.NewTicker(1 * time.Minute)
fifteenMinuteTicker := time.NewTicker(15 * time.Minute)
for {
select {
case <-oneMinuteTicker.C:
setNumMetricNames()
setMetricOnDiskBytes()
case <-fifteenMinuteTicker.C:
setNumSeries()
setNumKeysAndValues()
}
}
}

func setNumMetricNames() {
allPreviousTime := &dtu.MetricsTimeRange{
StartEpochSec: 0,
EndEpochSec: uint32(time.Now().Unix()),
}
names, err := query.GetAllMetricNamesOverTheTimeRange(allPreviousTime, 0)
if err != nil {
log.Errorf("setNumMetricNames: failed to get all metric names: %v", err)
return
}

instrumentation.SetTotalMetricNames(int64(len(names)))
}

func setNumSeries() {
allPreviousTime := &dtu.MetricsTimeRange{
StartEpochSec: 0,
EndEpochSec: uint32(time.Now().Unix()),
}
numSeries, err := query.GetSeriesCardinalityOverTimeRange(allPreviousTime, 0)
if err != nil {
log.Errorf("setNumSeries: failed to get all series: %v", err)
return
}

instrumentation.SetTotalTimeSeries(int64(numSeries))
}

func setNumKeysAndValues() {
allPreviousTime := &dtu.MetricsTimeRange{
StartEpochSec: 0,
EndEpochSec: uint32(time.Now().Unix()),
}
myid := uint64(0)
querySummary := summary.InitQuerySummary(summary.METRICS, rutils.GetNextQid())
defer querySummary.LogMetricsQuerySummary(myid)
tagsTreeReaders, err := query.GetAllTagsTreesWithinTimeRange(allPreviousTime, myid, querySummary)
if err != nil {
log.Errorf("setNumKeysAndValues: failed to get tags trees: %v", err)
return
}

keys := make(map[string]struct{})
values := make(map[string]struct{})
for _, segmentTagTreeReader := range tagsTreeReaders {
segmentTagPairs := segmentTagTreeReader.GetAllTagPairs()

for key, valueSet := range segmentTagPairs {
keys[key] = struct{}{}
for value := range valueSet {
values[value] = struct{}{}
}
}
}

instrumentation.SetTotalTagKeyCount(int64(len(keys)))
instrumentation.SetTotalTagValueCount(int64(len(values)))
}

func setMetricOnDiskBytes() {
tagsTreeHolderDir := filepath.Join(config.GetDataPath(), config.GetHostID(), "final", "tth")
tagsTreeHolderSize, err := fileutils.GetDirSize(tagsTreeHolderDir)
if err != nil {
log.Errorf("setMetricOnDiskBytes: failed to get tags tree holder size: %v", err)
return
}

timeSeriesDir := filepath.Join(config.GetDataPath(), config.GetHostID(), "final", "ts")
timeSeriesSize, err := fileutils.GetDirSize(timeSeriesDir)
if err != nil {
log.Errorf("setMetricOnDiskBytes: failed to get time series size: %v", err)
return
}

instrumentation.SetTotalMetricOnDiskBytes(int64(tagsTreeHolderSize + timeSeriesSize))
}
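
For reference, setMetricOnDiskBytes assumes this on-disk layout under the data path; only the "final/tth" and "final/ts" segments come from the code above, and the host id shown is hypothetical:

    <dataPath>/
      <hostID>/          e.g. host-abc
        final/
          tth/           tags-tree holder files, summed via fileutils.GetDirSize
          ts/            time-series files, summed via fileutils.GetDirSize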
60 changes: 51 additions & 9 deletions pkg/instrumentation/ssgauges.go
@@ -64,28 +64,39 @@ type Gauge int
const (
TotalEventCount Gauge = iota + 1
TotalBytesReceived
TotalOnDiskBytes
TotalLogOnDiskBytes
TotalMetricOnDiskBytes
TotalSegstoreCount
TotalSegmentMicroindexCount
TotalEventsSearched
TotalEventsMatched
TotalMetricNames
TotalTimeSeries
PastMinuteNumDataPoints
TotalTagKeyCount
TotalTagValueCount
)

var allSimpleGauges = map[Gauge]*simpleInt64Gauge{
TotalEventCount: {
name: "ss.current.event.count",
name: "ss.total.event.count",
unit: "count",
description: "Current total number of events",
description: "Total number of events",
},
TotalBytesReceived: {
name: "ss.current.bytes.received",
name: "ss.total.bytes.received",
unit: "bytes",
description: "Current count of bytes received",
description: "Total number of bytes received",
},
TotalOnDiskBytes: {
name: "ss.current.on.disk.bytes",
TotalLogOnDiskBytes: {
name: "ss.total.logs.on.disk.bytes",
unit: "bytes",
description: "Current number of bytes on disk",
description: "Total number of bytes on disk for log data",
},
TotalMetricOnDiskBytes: {
name: "ss.total.metrics.on.disk.bytes",
unit: "bytes",
description: "Total number of metric bytes on disk",
},
TotalSegstoreCount: {
name: "ss.current.segstore.count",
@@ -107,16 +118,47 @@ var allSimpleGauges = map[Gauge]*simpleInt64Gauge{
unit: "count",
description: "Current number of events matched",
},
TotalMetricNames: {
name: "ss.total.metric.names",
unit: "count",
description: "Total number of metric names",
},
TotalTimeSeries: {
name: "ss.total.time.series",
unit: "count",
description: "Total number of time series",
},
PastMinuteNumDataPoints: {
name: "ss.past.minute.num.data.points",
unit: "count",
description: "Number of metric data points ingested in the past minute",
},
TotalTagKeyCount: {
name: "ss.total.tag.key.count",
unit: "count",
description: "Total number of tag keys",
},
TotalTagValueCount: {
name: "ss.total.tag.value.count",
unit: "count",
description: "Total number of tag values",
},
}

var (
SetTotalEventCount = makeGaugeSetter(TotalEventCount)
SetTotalBytesReceived = makeGaugeSetter(TotalBytesReceived)
SetTotalOnDiskBytes = makeGaugeSetter(TotalOnDiskBytes)
SetTotalLogOnDiskBytes = makeGaugeSetter(TotalLogOnDiskBytes)
SetTotalMetricOnDiskBytes = makeGaugeSetter(TotalMetricOnDiskBytes)
SetTotalSegstoreCount = makeGaugeSetter(TotalSegstoreCount)
SetTotalSegmentMicroindexCount = makeGaugeSetter(TotalSegmentMicroindexCount)
SetTotalEventsSearched = makeGaugeSetter(TotalEventsSearched)
SetTotalEventsMatched = makeGaugeSetter(TotalEventsMatched)
SetTotalMetricNames = makeGaugeSetter(TotalMetricNames)
SetTotalTimeSeries = makeGaugeSetter(TotalTimeSeries)
SetPastMinuteNumDataPoints = makeGaugeSetter(PastMinuteNumDataPoints)
SetTotalTagKeyCount = makeGaugeSetter(TotalTagKeyCount)
SetTotalTagValueCount = makeGaugeSetter(TotalTagValueCount)
)

func init() {
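
The diff truncates here, so makeGaugeSetter and the simpleInt64Gauge methods are not shown. As an assumption about its shape only, a setter built this way would close over the Gauge key and write through to the registered instrument:

    // Hypothetical sketch; the real makeGaugeSetter is not part of this diff.
    func makeGaugeSetter(g Gauge) func(int64) {
        return func(value int64) {
            if gauge, ok := allSimpleGauges[g]; ok {
                gauge.set(value) // assumes simpleInt64Gauge exposes a setter
            }
        }
    }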
29 changes: 3 additions & 26 deletions pkg/integrations/prometheus/promql/metricsSearchHandler.go
@@ -955,36 +955,13 @@ func ProcessGetMetricSeriesCardinalityRequest(ctx *fasthttp.RequestCtx, myid uint64) {
return
}

querySummary := summary.InitQuerySummary(summary.METRICS, rutils.GetNextQid())
defer querySummary.LogMetricsQuerySummary(myid)
tagsTreeReaders, err := query.GetAllTagsTreesWithinTimeRange(timeRange, myid, querySummary)
cardinality, err := query.GetSeriesCardinalityOverTimeRange(timeRange, myid)
if err != nil {
utils.SendInternalError(ctx, "Failed to search metrics", "Failed to get tags trees", err)
return
}

tagKeys := make(map[string]struct{})
for _, segmentTagTreeReader := range tagsTreeReaders {
tagKeys = utils.MergeMaps(tagKeys, segmentTagTreeReader.GetAllTagKeys())
}

allTsids := structs.CreateNewHll()
for _, segmentTagTreeReader := range tagsTreeReaders {
for tagKey := range tagKeys {
tsids, err := segmentTagTreeReader.GetTSIDsForKey(tagKey)
if err != nil {
utils.SendInternalError(ctx, "Failed to search metrics", fmt.Sprintf("Failed to get tsids for key %v", tagKey), err)
return
}

for tsid := range tsids {
allTsids.AddRaw(tsid)
}
}
utils.SendInternalError(ctx, "Failed to compute cardinality", "", err)
}

output := outputStruct{
SeriesCardinality: allTsids.Cardinality(),
SeriesCardinality: cardinality,
}

ctx.SetStatusCode(fasthttp.StatusOK)
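
The tail of the handler is truncated above. Assuming outputStruct marshals its SeriesCardinality field under a lower-camel JSON tag (the struct definition is not in this hunk), a successful response would resemble:

    { "seriesCardinality": 1523 }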
34 changes: 34 additions & 0 deletions pkg/segment/query/metricsquery.go
@@ -26,6 +26,7 @@ import (
"github.com/cespare/xxhash"
dtu "github.com/siglens/siglens/pkg/common/dtypeutils"
"github.com/siglens/siglens/pkg/config"
rutils "github.com/siglens/siglens/pkg/readerUtils"
segmetadata "github.com/siglens/siglens/pkg/segment/metadata"
"github.com/siglens/siglens/pkg/segment/query/summary"
"github.com/siglens/siglens/pkg/segment/reader/metrics/series"
@@ -36,6 +37,7 @@ import (
"github.com/siglens/siglens/pkg/segment/structs"
"github.com/siglens/siglens/pkg/segment/utils"
"github.com/siglens/siglens/pkg/segment/writer/metrics"
toputils "github.com/siglens/siglens/pkg/utils"
log "github.com/sirupsen/logrus"
)

@@ -413,3 +415,35 @@ func getRegexMatchedMetricNames(mSegSearchReq *structs.MetricsSearchRequest, reg

return metricNames, nil
}

func GetSeriesCardinalityOverTimeRange(timeRange *dtu.MetricsTimeRange, myid uint64) (uint64, error) {
querySummary := summary.InitQuerySummary(summary.METRICS, rutils.GetNextQid())
defer querySummary.LogMetricsQuerySummary(myid)
tagsTreeReaders, err := GetAllTagsTreesWithinTimeRange(timeRange, myid, querySummary)
if err != nil {
log.Errorf("GetSeriesCardinalityOverTimeRange: failed to get tags trees within time range %+v; err=%v", timeRange, err)
return 0, err
}

tagKeys := make(map[string]struct{})
for _, segmentTagTreeReader := range tagsTreeReaders {
tagKeys = toputils.MergeMaps(tagKeys, segmentTagTreeReader.GetAllTagKeys())
}

allTsids := structs.CreateNewHll()
for _, segmentTagTreeReader := range tagsTreeReaders {
for tagKey := range tagKeys {
tsids, err := segmentTagTreeReader.GetTSIDsForKey(tagKey)
if err != nil {
log.Errorf("GetSeriesCardinalityOverTimeRange: failed to get tsids for key %v; err=%v", tagKey, err)
return 0, err
}

for tsid := range tsids {
allTsids.AddRaw(tsid)
}
}
}

return allTsids.Cardinality(), nil
}
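
The HyperLogLog accumulator is what makes the nested loop above safe: the same TSID can surface under many tag keys and in many segments, but re-adding it does not inflate the estimate. A small illustration using only the API visible in this diff (the TSID values are arbitrary):

    package main

    import (
        "fmt"

        "github.com/siglens/siglens/pkg/segment/structs"
    )

    func main() {
        hll := structs.CreateNewHll()
        hll.AddRaw(42) // a TSID seen in one segment
        hll.AddRaw(42) // the same TSID seen again under another tag key
        hll.AddRaw(7)  // a second distinct TSID
        fmt.Println(hll.Cardinality()) // ~2; HLL trades exactness for bounded memory
    }

This is also why the endpoint reports an approximate, rather than exact, series count.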
4 changes: 2 additions & 2 deletions pkg/segment/writer/metrics/metricssegment.go
@@ -356,7 +356,7 @@ func getBaseMetricsKey(suffix uint64, mId string) (string, error) {
/*
Returns <<dataDir>>/<<hostname>>/final/<<mid>>/suffix
*/
func getFinalMetricsDir(mId string, suffix uint64) string {
func GetFinalMetricsDir(mId string, suffix uint64) string {
// TODO: use filepath.Join
var sb strings.Builder
sb.WriteString(config.GetRunningConfig().DataPath)
@@ -1124,7 +1124,7 @@ func (ms *MetricsSegment) rotateSegment(forceRotate bool) error {
log.Errorf("rotateSegment: failed to flush metric names for base=%s, suffix=%d, orgid=%v. Error %+v", ms.metricsKeyBase, ms.Suffix, ms.Orgid, err)
return err
}
finalDir := getFinalMetricsDir(ms.Mid, ms.Suffix)
finalDir := GetFinalMetricsDir(ms.Mid, ms.Suffix)
metaEntry := ms.getMetaEntry(finalDir, ms.Suffix)
err = os.MkdirAll(path.Dir(path.Dir(finalDir)), 0764)
if err != nil {
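
As the TODO in GetFinalMetricsDir notes, the string builder could be replaced with filepath.Join. A sketch of that refactor, assuming config.GetHostID() supplies the <<hostname>> segment (as in setMetricOnDiskBytes above) and that the suffix is the final path element; the trailing-separator behavior of the original is not visible in this hunk and should be verified first:

    // Hypothetical refactor; confirm it matches the strings.Builder output.
    func GetFinalMetricsDir(mId string, suffix uint64) string {
        dir := filepath.Join(config.GetRunningConfig().DataPath,
            config.GetHostID(), "final", mId, strconv.FormatUint(suffix, 10))
        return dir + string(filepath.Separator)
    }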
