diff --git a/enhanced/metrics.go b/enhanced/metrics.go index c62b29cd..cf62f930 100644 --- a/enhanced/metrics.go +++ b/enhanced/metrics.go @@ -11,6 +11,27 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +var ( + diskRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "node", + Subsystem: "disk", + Name: "read_bytes_total", + Help: "The total number of bytes read successfully.", + }, + []string{"region", "instance", "device"}, + ) + diskWritten = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "node", + Subsystem: "disk", + Name: "written_bytes_total", + Help: "The total number of bytes written successfully.", + }, + []string{"region", "instance", "device"}, + ) +) + // osMetrics represents available Enhanced Monitoring OS metrics from CloudWatch Logs. // // See https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_Monitoring.OS.html#USER_Monitoring.OS.CloudWatchLogs @@ -265,19 +286,23 @@ func makeRDSDiskIOMetrics(s *diskIO, constLabels prometheus.Labels) []prometheus // makeNodeDiskMetrics returns node_exporter-like node_disk_ metrics. func makeNodeDiskMetrics(s *diskIO, constLabels prometheus.Labels) []prometheus.Metric { - // move device name to label - labelKeys := []string{"device"} - labelValues := []string{s.Device} + labels := make(prometheus.Labels, len(constLabels)+1) + + for k, v := range constLabels { + labels[k] = v + } + + labels["device"] = s.Device res := make([]prometheus.Metric, 0, 2) if s.ReadKb != nil { - desc := prometheus.NewDesc("node_disk_read_bytes_total", "The total number of bytes read successfully.", labelKeys, constLabels) - m := prometheus.MustNewConstMetric(desc, prometheus.CounterValue, float64(*s.ReadKb*1024), labelValues...) + m := diskRead.With(labels) + m.Add(float64(*s.ReadKb * 1024)) res = append(res, m) } if s.WriteKb != nil { - desc := prometheus.NewDesc("node_disk_written_bytes_total", "The total number of bytes written successfully.", labelKeys, constLabels) - m := prometheus.MustNewConstMetric(desc, prometheus.CounterValue, float64(*s.WriteKb*1024), labelValues...) + m := diskWritten.With(labels) + m.Add(float64(*s.WriteKb * 1024)) res = append(res, m) } diff --git a/enhanced/metrics_test.go b/enhanced/metrics_test.go index a9182fda..c7a5e6cb 100644 --- a/enhanced/metrics_test.go +++ b/enhanced/metrics_test.go @@ -23,13 +23,21 @@ func TestParse(t *testing.T) { t.Run(data.instance, func(t *testing.T) { // Test that metrics created from fixed testdata JSON produce expected result. - m, err := parseOSMetrics(readTestDataJSON(t, data.instance), true) + d := readTestDataJSON(t, data.instance) + + m, err := parseOSMetrics(d, true) + require.NoError(t, err) + + m2, err := parseOSMetrics(d, true) require.NoError(t, err) actualMetrics := helpers.ReadMetrics(m.makePrometheusMetrics(data.region, nil)) sort.Slice(actualMetrics, func(i, j int) bool { return actualMetrics[i].Less(actualMetrics[j]) }) actualLines := helpers.Format(helpers.WriteMetrics(actualMetrics)) + actualMetrics2 := helpers.ReadMetrics(m2.makePrometheusMetrics(data.region, nil)) + sort.Slice(actualMetrics2, func(i, j int) bool { return actualMetrics2[i].Less(actualMetrics2[j]) }) + if *goldenTXT { writeTestDataMetrics(t, data.instance, actualLines) } @@ -41,6 +49,15 @@ func TestParse(t *testing.T) { // compare both to try to avoid go-difflib bug assert.Equal(t, expectedLines, actualLines) assert.Equal(t, expectedMetrics, actualMetrics) + + for i, v := range actualMetrics { + switch v.Name { + case "node_disk_read_bytes_total", "node_disk_written_bytes_total": + assert.Equal(t, 2*v.Value, actualMetrics2[i].Value) + default: + assert.Equal(t, v.Value, actualMetrics2[i].Value) + } + } }) } } diff --git a/enhanced/scraper.go b/enhanced/scraper.go index e00d79f8..be4009a1 100644 --- a/enhanced/scraper.go +++ b/enhanced/scraper.go @@ -79,9 +79,10 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m l = l.With("IngestionTime", aws.MillisecondsTimeValue(event.IngestionTime).UTC()) var instance *sessions.Instance - for _, i := range s.instances { - if i.ResourceID == *event.LogStreamName { - instance = &i + + for i := range s.instances { + if s.instances[i].ResourceID == *event.LogStreamName { + instance = &s.instances[i] break } } @@ -89,6 +90,12 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m l.Errorf("Failed to find instance.") continue } + + if *event.Timestamp <= instance.LastEventTimestamp { + continue + } + + instance.LastEventTimestamp = *event.Timestamp l = l.With("region", instance.Region).With("instance", instance.Instance) // l.Debugf("Message:\n%s", *event.Message) diff --git a/sessions/sessions.go b/sessions/sessions.go index d93ccf4d..2f849070 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -23,6 +23,7 @@ type Instance struct { ResourceID string Labels map[string]string EnhancedMonitoringInterval time.Duration + LastEventTimestamp int64 } func (i Instance) String() string {