Skip to content

Commit

Permalink
[coordinator] Add source label to dropped metrics counter (#3964)
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored Dec 3, 2021
1 parent 9acdf67 commit 00f21b1
Showing 1 changed file with 39 additions and 31 deletions.
70 changes: 39 additions & 31 deletions src/cmd/services/m3coordinator/ingest/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ var (
unaggregatedStoragePolicies = []policy.StoragePolicy{
unaggregatedStoragePolicy,
}

sourceTags = map[ts.SourceType]string{
ts.SourceTypePrometheus: "prometheus",
ts.SourceTypeGraphite: "graphite",
ts.SourceTypeOpenMetrics: "open-metrics",
}
)

// IterValue is the value returned by the iterator.
Expand Down Expand Up @@ -105,10 +111,21 @@ type WriteOptions struct {
}

type downsamplerAndWriterMetrics struct {
dropped tally.Counter
dropped metricsBySource
written metricsBySource
}

type metricsBySource struct {
bySource map[ts.SourceType]tally.Counter
byUnknown tally.Counter
}

writtenBySource map[ts.SourceType]tally.Counter
writtenByUnknown tally.Counter
func (m metricsBySource) report(source ts.SourceType) {
counter, ok := m.bySource[source]
if !ok {
counter = m.byUnknown
}
counter.Inc(1)
}

// downsamplerAndWriter encapsulates the logic for writing data to the downsampler,
Expand All @@ -130,31 +147,30 @@ func NewDownsamplerAndWriter(
) DownsamplerAndWriter {
scope := instrumentOpts.MetricsScope().SubScope("downsampler")

sourceTags := map[ts.SourceType]string{
ts.SourceTypePrometheus: "prometheus",
ts.SourceTypeGraphite: "graphite",
ts.SourceTypeOpenMetrics: "open-metrics",
}
writtenBySource := make(map[ts.SourceType]tally.Counter)
writtenName := "metrics_written"
for source, tag := range sourceTags {
c := scope.Tagged(map[string]string{"source": tag}).Counter(writtenName)
writtenBySource[source] = c
}
writtenByUnknown := scope.Tagged(map[string]string{"source": "unknown"}).Counter(writtenName)

return &downsamplerAndWriter{
store: store,
downsampler: downsampler,
workerPool: workerPool,
metrics: downsamplerAndWriterMetrics{
dropped: scope.Counter("metrics_dropped"),
writtenBySource: writtenBySource,
writtenByUnknown: writtenByUnknown,
dropped: newMetricsBySource(scope, "metrics_dropped"),
written: newMetricsBySource(scope, "metrics_written"),
},
}
}

func newMetricsBySource(scope tally.Scope, name string) metricsBySource {
metrics := metricsBySource{
bySource: make(map[ts.SourceType]tally.Counter, len(sourceTags)),
byUnknown: scope.Tagged(map[string]string{"source": "unknown"}).Counter(name),
}

for source, tag := range sourceTags {
metrics.bySource[source] = scope.Tagged(map[string]string{"source": tag}).Counter(name)
}

return metrics
}

func (d *downsamplerAndWriter) Write(
ctx context.Context,
tags models.Tags,
Expand All @@ -178,7 +194,7 @@ func (d *downsamplerAndWriter) Write(
}

if dropUnaggregated {
d.metrics.dropped.Inc(1)
d.metrics.dropped.report(source)
} else if d.shouldWrite(overrides) {
err := d.writeToStorage(ctx, tags, datapoints, unit, annotation, overrides, source)
if err != nil {
Expand Down Expand Up @@ -321,7 +337,7 @@ func (d *downsamplerAndWriter) writeToStorage(
overrides WriteOptions,
source ts.SourceType,
) error {
d.reportWrittenBySource(source)
d.metrics.written.report(source)

storagePolicies, ok := d.writeOverrideStoragePolicies(overrides)
if !ok {
Expand Down Expand Up @@ -426,11 +442,11 @@ func (d *downsamplerAndWriter) WriteBatch(
for iter.Next() {
value := iter.Current()
if value.Metadata.DropUnaggregated {
d.metrics.dropped.Inc(1)
d.metrics.dropped.report(value.Attributes.Source)
continue
}

d.reportWrittenBySource(value.Attributes.Source)
d.metrics.written.report(value.Attributes.Source)

for _, p := range storagePolicies {
p := p // Capture for lambda.
Expand Down Expand Up @@ -572,14 +588,6 @@ func (d *downsamplerAndWriter) Storage() storage.Storage {
return d.store
}

func (d *downsamplerAndWriter) reportWrittenBySource(source ts.SourceType) {
written, ok := d.metrics.writtenBySource[source]
if !ok {
written = d.metrics.writtenByUnknown
}
written.Inc(1)
}

func storageAttributesFromPolicy(
p policy.StoragePolicy,
) storagemetadata.Attributes {
Expand Down

0 comments on commit 00f21b1

Please sign in to comment.