Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PMM-6579 Add monitoring for current and recent slow queries #157

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 246 additions & 0 deletions collector/mongod/database_profiler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package mongod

import (
"context"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

var (
	// slowQueries is the gauge vector behind the "slow_query" metric of the
	// "profiler" subsystem. It carries one series per (db, collection) pair.
	// NOTE(review): "recoded" in the Help text looks like a typo for
	// "recorded"; it is runtime text, so it is left untouched here.
	slowQueries = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: Namespace,
		Subsystem: "profiler",
		Name: "slow_query",
		Help: "Number of slow queries recoded in the system.profile collection",
	}, []string{"db", "collection"})
	// slowOps is the gauge vector behind the "slow_ops" metric of the
	// "profiler" subsystem. It carries one series per (db, collection) pair.
	slowOps = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: Namespace,
		Subsystem: "profiler",
		Name: "slow_ops",
		Help: "Number of slow operations reported by $currentOp",
	}, []string{"db", "collection"})

	// Track collections for which slow queries have been seen so they can be reset to
	// zero in case no slow queries are detected during a metrics refresh.
	// This is done to avoid gaps in reported metrics that would otherwise confuse prometheus.
	// The outer key is the database name, the inner key is the collection name.
	// NOTE(review): this is a plain map with no locking in this file — confirm
	// that exports are never run concurrently.
	trackedLabelsSet = make(map[string]map[string]bool)
)

// DatabaseProfilerRecord represents records returned by the aggregate query.
// Each record is one $group bucket: a namespace and the number of documents
// that were grouped under it.
type DatabaseProfilerRecord struct {
	// Namespace is decoded from the group key "_id".
	Namespace string `bson:"_id,omitempty"`
	// SlowQueries is decoded from the "count" field of the group stage.
	SlowQueries int `bson:"count,omitempty"`
}

// DatabaseProfilerStatsList contains stats from all databases.
type DatabaseProfilerStatsList struct {
	// Members holds one entry per (database, collection) pair.
	Members []DatabaseProfilerStats
}

// Describe describes database stats for prometheus.
// It forwards the descriptors of the slowQueries gauge vector to ch.
func (dbStatsList *DatabaseProfilerStatsList) Describe(ch chan<- *prometheus.Desc) {
	slowQueries.Describe(ch)
}

// Export publishes the slow query count of every member on the slowQueries
// gauge vector, zeroes the series of previously seen collections that are
// absent from this list, and finally sends all series to ch.
func (dbStatsList *DatabaseProfilerStatsList) Export(ch chan<- prometheus.Metric) {
	// mark flags db/coll in a two-level set, creating the inner level on demand.
	mark := func(set map[string]map[string]bool, db, coll string) {
		inner, found := set[db]
		if !found {
			inner = make(map[string]bool)
			set[db] = inner
		}
		inner[coll] = true
	}

	// fresh holds the pairs reported by this list; they must not be zeroed.
	fresh := make(map[string]map[string]bool)
	for i := range dbStatsList.Members {
		m := &dbStatsList.Members[i]
		labels := prometheus.Labels{"db": m.Database, "collection": m.Collection}
		slowQueries.With(labels).Set(float64(m.SlowQueries))

		mark(fresh, m.Database, m.Collection)
		mark(trackedLabelsSet, m.Database, m.Collection)
	}

	// Anything tracked earlier but not reported now drops back to zero.
	for db, colls := range trackedLabelsSet {
		for coll := range colls {
			if !fresh[db][coll] {
				labels := prometheus.Labels{"db": db, "collection": coll}
				slowQueries.With(labels).Set(0.0)
			}
		}
	}

	slowQueries.Collect(ch)
}

// DatabaseProfilerStats represents profiler aggregated data grouped by db and collection.
type DatabaseProfilerStats struct {
	// Database is exported as the "db" label.
	Database string
	// Collection is exported as the "collection" label.
	Collection string
	// SlowQueries is the value the gauge is set to for this pair.
	SlowQueries int
}

// GetDatabaseProfilerStats returns profiler stats for all databases.
//
// For every database except admin, config, local and test it aggregates the
// system.profile collection and counts, per namespace, the entries whose "ts"
// is newer than lookback seconds ago and whose "millis" is at least millis.
//
// It returns nil when listing the databases fails or when querying or
// decoding any profile collection fails; the failure is logged.
func GetDatabaseProfilerStats(client *mongo.Client, lookback int64, millis int64) *DatabaseProfilerStatsList {
	dbNames, err := client.ListDatabaseNames(context.TODO(), bson.M{})
	if err != nil {
		log.Errorf("Failed to get database names, %v", err)
		return nil
	}
	dbsToSkip := map[string]bool{
		"admin":  true,
		"config": true,
		"local":  true,
		"test":   true,
	}

	// The pipeline does not depend on the database, so it is built once.
	from := time.Unix(time.Now().UTC().Unix()-lookback, 0)
	pipeline := []bson.M{
		{"$match": bson.M{
			"ts":     bson.M{"$gt": from},
			"millis": bson.M{"$gte": millis},
		}},
		{"$group": bson.M{
			"_id":   "$ns",
			"count": bson.M{"$sum": 1},
		}},
	}

	dbStatsList := &DatabaseProfilerStatsList{}
	for _, db := range dbNames {
		if dbsToSkip[db] {
			continue
		}
		members, ok := profilerStatsForDatabase(client, db, pipeline)
		if !ok {
			return nil
		}
		dbStatsList.Members = append(dbStatsList.Members, members...)
	}
	return dbStatsList
}

// profilerStatsForDatabase runs pipeline against the system.profile collection
// of db and converts every record into a DatabaseProfilerStats. The boolean is
// false when the aggregation or the cursor iteration failed; the failure has
// already been logged in that case.
func profilerStatsForDatabase(client *mongo.Client, db string, pipeline []bson.M) ([]DatabaseProfilerStats, bool) {
	cursor, err := client.Database(db).Collection("system.profile").Aggregate(context.TODO(), pipeline)
	if err != nil {
		log.Errorf("Failed to get database profiler stats: %s.", err)
		return nil, false
	}
	// Deferred per helper call, so each cursor is released before the next
	// database is queried instead of piling up until the caller returns.
	defer cursor.Close(context.TODO())

	var members []DatabaseProfilerStats
	for cursor.Next(context.TODO()) {
		record := DatabaseProfilerRecord{}
		if err := cursor.Decode(&record); err != nil {
			log.Errorf("Failed to iterate database profiler stats: %s.", err)
			return nil, false
		}
		ns := strings.SplitN(record.Namespace, ".", 2)
		if len(ns) != 2 {
			// Without a "." there is no collection part to label the
			// metric with; indexing ns[1] would panic.
			continue
		}
		members = append(members, DatabaseProfilerStats{
			Database:    ns[0],
			Collection:  ns[1],
			SlowQueries: record.SlowQueries,
		})
	}
	if err := cursor.Err(); err != nil {
		log.Errorf("Failed to iterate database profiler stats: %s.", err)
		return nil, false
	}
	return members, true
}

// DatabaseCurrentOpStatsList contains stats from all databases.
type DatabaseCurrentOpStatsList struct {
	// Members holds one entry per (database, collection) pair; the
	// SlowQueries field of each entry is exported as the slow_ops value.
	Members []DatabaseProfilerStats
}

// Describe describes $currentOp stats for prometheus.
// It forwards the descriptors of the slowOps gauge vector to ch.
func (dbStatsList *DatabaseCurrentOpStatsList) Describe(ch chan<- *prometheus.Desc) {
	slowOps.Describe(ch)
}

// trackedSlowOpsLabels remembers every (db, collection) pair that has been
// exported as a slow_ops series, so the series can be reset to zero once
// $currentOp stops reporting it. It is deliberately separate from
// trackedLabelsSet: sharing one set made slow_ops emit zero-valued series for
// collections that had only ever shown up in the profiler, and vice versa.
var trackedSlowOpsLabels = make(map[string]map[string]bool)

// Export exports $currentOp stats to prometheus.
//
// It sets the slowOps gauge of every member, zeroes the series of pairs that
// were exported before but are missing from this list, and sends all series
// to ch.
func (dbStatsList *DatabaseCurrentOpStatsList) Export(ch chan<- prometheus.Metric) {
	skipValueReset := make(map[string]map[string]bool)
	for _, member := range dbStatsList.Members {
		ls := prometheus.Labels{"db": member.Database, "collection": member.Collection}
		slowOps.With(ls).Set(float64(member.SlowQueries))

		// Book-keeping around seen collections for resetting correctly.
		if _, ok := skipValueReset[member.Database]; !ok {
			skipValueReset[member.Database] = make(map[string]bool)
		}
		skipValueReset[member.Database][member.Collection] = true
		if _, ok := trackedSlowOpsLabels[member.Database]; !ok {
			trackedSlowOpsLabels[member.Database] = make(map[string]bool)
		}
		trackedSlowOpsLabels[member.Database][member.Collection] = true
	}

	// Set stale slow ops back to 0
	for db, colls := range trackedSlowOpsLabels {
		for coll := range colls {
			if skipValueReset[db][coll] {
				continue
			}
			ls := prometheus.Labels{"db": db, "collection": coll}
			slowOps.With(ls).Set(0.0)
		}
	}
	slowOps.Collect(ch)
}

// GetDatabaseCurrentOpStats returns $currentOp stats for all databases.
//
// It runs a $currentOp aggregation on the admin database, keeps the
// operations whose "microsecs_running" is at least millis*1000 and counts them
// per namespace. Namespaces in admin, config, local and test are dropped, as
// are records whose namespace has no collection part.
//
// It returns nil when the command or the cursor iteration fails; the failure
// is logged.
func GetDatabaseCurrentOpStats(client *mongo.Client, millis int64) *DatabaseCurrentOpStatsList {
	dbStatsList := &DatabaseCurrentOpStatsList{}
	currentOp := bson.M{"$currentOp": bson.M{
		"allUsers": true,
	}}
	match := bson.M{"$match": bson.M{
		"microsecs_running": bson.M{"$gte": millis * 1000},
	}}
	group := bson.M{"$group": bson.M{
		"_id":   "$ns",
		"count": bson.M{"$sum": 1},
	}}
	pipeline := []bson.M{currentOp, match, group}
	// Need the command version of aggregate to use $currentOp.
	// https://docs.mongodb.com/manual/reference/command/aggregate/#dbcmd.aggregate
	aggregate := bson.D{
		{Key: "aggregate", Value: 1},
		{Key: "pipeline", Value: pipeline},
		{Key: "cursor", Value: bson.M{}},
	}
	cursor, err := client.Database("admin").RunCommandCursor(context.TODO(), aggregate)
	if err != nil {
		log.Errorf("Failed to get $currentOp stats: %v", err)
		return nil
	}
	defer cursor.Close(context.TODO())
	dbsToSkip := map[string]bool{
		"admin":  true,
		"config": true,
		"local":  true,
		"test":   true,
	}
	for cursor.Next(context.TODO()) {
		record := DatabaseProfilerRecord{}
		if err := cursor.Decode(&record); err != nil {
			log.Errorf("Failed to iterate $currentOp stats: %s.", err)
			return nil
		}
		ns := strings.SplitN(record.Namespace, ".", 2)
		if len(ns) != 2 {
			// A record whose namespace is empty or has no "." cannot be
			// attributed to a collection; indexing ns[1] would panic.
			continue
		}
		if dbsToSkip[ns[0]] {
			continue
		}
		dbStatsList.Members = append(dbStatsList.Members, DatabaseProfilerStats{
			Database:    ns[0],
			Collection:  ns[1],
			SlowQueries: record.SlowQueries,
		})
	}
	if err := cursor.Err(); err != nil {
		log.Errorf("Failed to iterate $currentOp stats: %s.", err)
		return nil
	}
	return dbStatsList
}
105 changes: 105 additions & 0 deletions collector/mongod/database_profiler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package mongod

import (
"context"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/bson"

"github.com/percona/mongodb_exporter/shared"
"github.com/percona/mongodb_exporter/testutils"
)

// TestGetDatabaseProfilerStatsDecodesFine enables full profiling on a scratch
// database, performs one insert and checks that GetDatabaseProfilerStats
// reports at least one profiled operation for it.
func TestGetDatabaseProfilerStatsDecodesFine(t *testing.T) {
	// setup
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	client := testutils.MustGetConnectedReplSetClient(ctx, t)
	defer client.Disconnect(ctx)

	// enable profiling and run a query
	db := client.Database("test-profile")
	assert.NotNil(t, db)
	res := db.RunCommand(ctx, bson.D{{Key: "profile", Value: 2}})
	assert.NoErrorf(t, res.Err(), "failed to enable profiling")
	coll := db.Collection("test")
	assert.NotNil(t, coll)
	_, err := coll.InsertOne(ctx, bson.M{})
	assert.NoErrorf(t, err, "failed to run a profiled insert")

	// run
	lookback := int64(10) // seconds.
	threshold := int64(0) // milliseconds.
	stats := GetDatabaseProfilerStats(client, lookback, threshold)

	// test
	// assert does not abort the test, so bail out explicitly instead of
	// dereferencing a nil result below.
	if !assert.NotNil(t, stats) {
		return
	}
	assert.Truef(t, len(stats.Members) >= 1, "expected at least one slow query")
}

// TestGetDatabaseProfilerStatsMetrics checks that the stats returned by
// GetDatabaseProfilerStats export at least one metric. It is skipped when
// -short is passed.
func TestGetDatabaseProfilerStatsMetrics(t *testing.T) {
	if testing.Short() {
		t.Skip("-short is passed, skipping functional test")
	}

	// setup
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	client := testutils.MustGetConnectedReplSetClient(ctx, t)
	defer client.Disconnect(ctx)

	// enable profiling and run a query
	db := client.Database("test-profile")
	assert.NotNil(t, db)
	res := db.RunCommand(ctx, bson.D{{Key: "profile", Value: 2}})
	assert.NoErrorf(t, res.Err(), "failed to enable profiling")
	coll := db.Collection("test")
	assert.NotNil(t, coll)
	_, err := coll.InsertOne(ctx, bson.M{})
	assert.NoErrorf(t, err, "failed to run a profiled insert")

	// run
	lookback := int64(10) // seconds.
	threshold := int64(1) // milliseconds.
	stats := GetDatabaseProfilerStats(client, lookback, threshold)

	// test
	// A nil result would panic inside the goroutine below and take the whole
	// test binary down, so stop here once the assertion has failed.
	if !assert.NotNil(t, stats) {
		return
	}
	metricCh := make(chan prometheus.Metric)
	go func() {
		stats.Export(metricCh)
		close(metricCh)
	}()

	var metricsCount int
	for range metricCh {
		metricsCount++
	}
	assert.Truef(t, metricsCount >= 1, "expected at least one slow query metric")
}

// TestGetDatabaseCurrentOpStatsDecodesFine checks that
// GetDatabaseCurrentOpStats returns a non-nil result on servers that support
// $currentOp.
func TestGetDatabaseCurrentOpStatsDecodesFine(t *testing.T) {
	// setup
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	client := testutils.MustGetConnectedReplSetClient(ctx, t)
	defer client.Disconnect(ctx)

	// GetDatabaseCurrentOpStats requires MongoDB 3.6+
	// Skip this test if the version does not match.
	buildInfo, err := shared.GetBuildInfo(client)
	// Without build info the version check below cannot be evaluated, so stop
	// once the failure has been recorded.
	if !assert.NoErrorf(t, err, "failed to check MongoDB version") {
		return
	}
	major, minor := buildInfo.VersionArray[0], buildInfo.VersionArray[1]
	if major < 3 || (major == 3 && minor < 6) {
		t.Skip("MongoDB is not 3.6+, skipping test that requires $currentOp")
	}

	// run
	threshold := int64(1) // milliseconds.
	stats := GetDatabaseCurrentOpStats(client, threshold)

	// test
	assert.NotNil(t, stats)
}
Loading