From ab33346cbdb19df89d1c0ca8b7691379089eb4f1 Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Wed, 29 Mar 2023 21:25:27 -0400 Subject: [PATCH] Add the instance struct to handle connections The intent is to use the instance struct to hold the connection to the database as well as metadata about the instance. Currently this metadata only includes the version of postgres for the instance which can be used in the collectors to decide what query to run. In the future this could hold more metadata but for now it keeps the Collector interface arguments to a reasonable number. Signed-off-by: Joe Adams --- collector/collector.go | 18 ++---- collector/instance.go | 85 +++++++++++++++++++++++++ collector/pg_database.go | 4 +- collector/pg_database_test.go | 4 +- collector/pg_postmaster.go | 4 +- collector/pg_postmaster_test.go | 4 +- collector/pg_process_idle.go | 4 +- collector/pg_replication_slot.go | 4 +- collector/pg_replication_slot_test.go | 8 ++- collector/pg_stat_bgwriter.go | 4 +- collector/pg_stat_bgwriter_test.go | 4 +- collector/pg_stat_database.go | 3 +- collector/pg_stat_statements.go | 4 +- collector/pg_stat_statements_test.go | 4 +- collector/pg_stat_user_tables.go | 4 +- collector/pg_stat_user_tables_test.go | 4 +- collector/pg_statio_user_tables.go | 4 +- collector/pg_statio_user_tables_test.go | 4 +- collector/probe.go | 13 ++-- 19 files changed, 139 insertions(+), 44 deletions(-) create mode 100644 collector/instance.go diff --git a/collector/collector.go b/collector/collector.go index d50e1e72a..c1bf2af9a 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "errors" "fmt" "sync" @@ -59,7 +58,7 @@ var ( ) type Collector interface { - Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error + Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error } type collectorConfig struct { @@ -92,7 +91,7 @@ type PostgresCollector struct { Collectors map[string]Collector logger log.Logger - db *sql.DB + instance *instance } type Option func(*PostgresCollector) error @@ -149,14 +148,11 @@ func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn stri return nil, errors.New("empty dsn") } - db, err := sql.Open("postgres", dsn) + instance, err := newInstance(dsn) if err != nil { return nil, err } - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) - - p.db = db + p.instance = instance return p, nil } @@ -174,16 +170,16 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { wg.Add(len(p.Collectors)) for name, c := range p.Collectors { go func(name string, c Collector) { - execute(ctx, name, c, p.db, ch, p.logger) + execute(ctx, name, c, p.instance, ch, p.logger) wg.Done() }(name, c) } wg.Wait() } -func execute(ctx context.Context, name string, c Collector, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) { +func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) { begin := time.Now() - err := c.Update(ctx, db, ch) + err := c.Update(ctx, instance, ch) duration := time.Since(begin) var success float64 diff --git a/collector/instance.go b/collector/instance.go new file mode 100644 index 000000000..9b2bbf47f --- /dev/null +++ b/collector/instance.go @@ -0,0 +1,85 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "database/sql" + "fmt" + "regexp" + + "github.com/blang/semver/v4" +) + +type instance struct { + db *sql.DB + version semver.Version +} + +func newInstance(dsn string) (*instance, error) { + i := &instance{} + db, err := sql.Open("postgres", dsn) + if err != nil { + return nil, err + } + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + i.db = db + + version, err := queryVersion(db) + if err != nil { + db.Close() + return nil, err + } + + i.version = version + + return i, nil +} + +func (i *instance) getDB() *sql.DB { + return i.db +} + +func (i *instance) Close() error { + return i.db.Close() +} + +// Regex used to get the "short-version" from the postgres version field. +// The result of SELECT version() is something like "PostgreSQL 9.6.2 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 6.2.1 20160830, 64-bit" +var versionRegex = regexp.MustCompile(`^\w+ ((\d+)(\.\d+)?(\.\d+)?)`) +var serverVersionRegex = regexp.MustCompile(`^((\d+)(\.\d+)?(\.\d+)?)`) + +func queryVersion(db *sql.DB) (semver.Version, error) { + var version string + err := db.QueryRow("SELECT version();").Scan(&version) + if err != nil { + return semver.Version{}, err + } + submatches := versionRegex.FindStringSubmatch(version) + if len(submatches) > 1 { + return semver.ParseTolerant(submatches[1]) + } + + // We could also try to parse the version from the server_version field. + // This is of the format 13.3 (Debian 13.3-1.pgdg100+1) + err = db.QueryRow("SHOW server_version;").Scan(&version) + if err != nil { + return semver.Version{}, err + } + submatches = serverVersionRegex.FindStringSubmatch(version) + if len(submatches) > 1 { + return semver.ParseTolerant(submatches[1]) + } + return semver.Version{}, fmt.Errorf("could not parse version from %q", version) +} diff --git a/collector/pg_database.go b/collector/pg_database.go index 661f84cd8..a4ea50d0d 100644 --- a/collector/pg_database.go +++ b/collector/pg_database.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -66,7 +65,8 @@ var ( // each database individually. This is because we can't filter the // list of databases in the query because the list of excluded // databases is dynamic. -func (c PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() // Query the list of databases rows, err := db.QueryContext(ctx, pgDatabaseQuery, diff --git a/collector/pg_database_test.go b/collector/pg_database_test.go index bb108bb86..058a6d252 100644 --- a/collector/pg_database_test.go +++ b/collector/pg_database_test.go @@ -29,6 +29,8 @@ func TestPGDatabaseCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname"}). AddRow("postgres")) @@ -39,7 +41,7 @@ func TestPGDatabaseCollector(t *testing.T) { go func() { defer close(ch) c := PGDatabaseCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGDatabaseCollector.Update: %s", err) } }() diff --git a/collector/pg_postmaster.go b/collector/pg_postmaster.go index 4a0cec6d4..eae82d567 100644 --- a/collector/pg_postmaster.go +++ b/collector/pg_postmaster.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/prometheus/client_golang/prometheus" ) @@ -47,7 +46,8 @@ var ( pgPostmasterQuery = "SELECT pg_postmaster_start_time from pg_postmaster_start_time();" ) -func (c *PGPostmasterCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() row := db.QueryRowContext(ctx, pgPostmasterQuery) diff --git a/collector/pg_postmaster_test.go b/collector/pg_postmaster_test.go index 9b93a5c91..c40fe03ad 100644 --- a/collector/pg_postmaster_test.go +++ b/collector/pg_postmaster_test.go @@ -29,6 +29,8 @@ func TestPgPostmasterCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + mock.ExpectQuery(sanitizeQuery(pgPostmasterQuery)).WillReturnRows(sqlmock.NewRows([]string{"pg_postmaster_start_time"}). AddRow(1685739904)) @@ -37,7 +39,7 @@ func TestPgPostmasterCollector(t *testing.T) { defer close(ch) c := PGPostmasterCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGPostmasterCollector.Update: %s", err) } }() diff --git a/collector/pg_process_idle.go b/collector/pg_process_idle.go index 8ee65a436..06244975b 100644 --- a/collector/pg_process_idle.go +++ b/collector/pg_process_idle.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -42,7 +41,8 @@ var pgProcessIdleSeconds = prometheus.NewDesc( prometheus.Labels{}, ) -func (PGProcessIdleCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() row := db.QueryRowContext(ctx, `WITH metrics AS ( diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index 8f105ff49..4278923f8 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -73,7 +72,8 @@ var ( pg_replication_slots;` ) -func (PGReplicationSlotCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, pgReplicationSlotQuery) if err != nil { diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index 53bafafad..cb25b755a 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -29,6 +29,8 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} rows := sqlmock.NewRows(columns). AddRow("test_slot", 5, 3, true) @@ -39,7 +41,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { defer close(ch) c := PGReplicationSlotCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGPostmasterCollector.Update: %s", err) } }() @@ -68,6 +70,8 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} rows := sqlmock.NewRows(columns). AddRow("test_slot", 6, 12, false) @@ -78,7 +82,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { defer close(ch) c := PGReplicationSlotCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err) } }() diff --git a/collector/pg_stat_bgwriter.go b/collector/pg_stat_bgwriter.go index 5daf606c9..2bdef8d40 100644 --- a/collector/pg_stat_bgwriter.go +++ b/collector/pg_stat_bgwriter.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "time" "github.com/prometheus/client_golang/prometheus" @@ -117,7 +116,8 @@ var ( FROM pg_stat_bgwriter;` ) -func (PGStatBGWriterCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatBGWriterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() row := db.QueryRowContext(ctx, statBGWriterQuery) diff --git a/collector/pg_stat_bgwriter_test.go b/collector/pg_stat_bgwriter_test.go index 54f625c9e..11f55f6be 100644 --- a/collector/pg_stat_bgwriter_test.go +++ b/collector/pg_stat_bgwriter_test.go @@ -30,6 +30,8 @@ func TestPGStatBGWriterCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{ "checkpoints_timed", "checkpoints_req", @@ -57,7 +59,7 @@ func TestPGStatBGWriterCollector(t *testing.T) { defer close(ch) c := PGStatBGWriterCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatBGWriterCollector.Update: %s", err) } }() diff --git a/collector/pg_stat_database.go b/collector/pg_stat_database.go index 346ed9ea9..bb39a84b1 100644 --- a/collector/pg_stat_database.go +++ b/collector/pg_stat_database.go @@ -204,7 +204,8 @@ var ( ) ) -func (PGStatDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, `SELECT datid diff --git a/collector/pg_stat_statements.go b/collector/pg_stat_statements.go index 23e1f1567..eb629c381 100644 --- a/collector/pg_stat_statements.go +++ b/collector/pg_stat_statements.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -92,7 +91,8 @@ var ( LIMIT 100;` ) -func (PGStatStatementsCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, pgStatStatementsQuery) diff --git a/collector/pg_stat_statements_test.go b/collector/pg_stat_statements_test.go index a5c5cab57..241699ad4 100644 --- a/collector/pg_stat_statements_test.go +++ b/collector/pg_stat_statements_test.go @@ -29,6 +29,8 @@ func TestPGStateStatementsCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) @@ -39,7 +41,7 @@ func TestPGStateStatementsCollector(t *testing.T) { defer close(ch) c := PGStatStatementsCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) } }() diff --git a/collector/pg_stat_user_tables.go b/collector/pg_stat_user_tables.go index 05aced91f..48ae96eb8 100644 --- a/collector/pg_stat_user_tables.go +++ b/collector/pg_stat_user_tables.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "time" "github.com/go-kit/log" @@ -179,7 +178,8 @@ var ( pg_stat_user_tables` ) -func (c *PGStatUserTablesCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (c *PGStatUserTablesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, statUserTablesQuery) diff --git a/collector/pg_stat_user_tables_test.go b/collector/pg_stat_user_tables_test.go index 29b5d15f1..8bb9bc31b 100644 --- a/collector/pg_stat_user_tables_test.go +++ b/collector/pg_stat_user_tables_test.go @@ -30,6 +30,8 @@ func TestPGStatUserTablesCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + lastVacuumTime, err := time.Parse("2006-01-02Z", "2023-06-02Z") if err != nil { t.Fatalf("Error parsing vacuum time: %s", err) @@ -99,7 +101,7 @@ func TestPGStatUserTablesCollector(t *testing.T) { defer close(ch) c := PGStatUserTablesCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatUserTablesCollector.Update: %s", err) } }() diff --git a/collector/pg_statio_user_tables.go b/collector/pg_statio_user_tables.go index 043433d86..03d541615 100644 --- a/collector/pg_statio_user_tables.go +++ b/collector/pg_statio_user_tables.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -100,7 +99,8 @@ var ( FROM pg_statio_user_tables` ) -func (PGStatIOUserTablesCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatIOUserTablesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, statioUserTablesQuery) diff --git a/collector/pg_statio_user_tables_test.go b/collector/pg_statio_user_tables_test.go index 0a7174d80..d57cab9f5 100644 --- a/collector/pg_statio_user_tables_test.go +++ b/collector/pg_statio_user_tables_test.go @@ -29,6 +29,8 @@ func TestPGStatIOUserTablesCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{ "datname", "schemaname", @@ -60,7 +62,7 @@ func TestPGStatIOUserTablesCollector(t *testing.T) { defer close(ch) c := PGStatIOUserTablesCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatIOUserTablesCollector.Update: %s", err) } }() diff --git a/collector/probe.go b/collector/probe.go index 9044c40f9..834c65177 100644 --- a/collector/probe.go +++ b/collector/probe.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "sync" "github.com/go-kit/log" @@ -27,7 +26,7 @@ type ProbeCollector struct { registry *prometheus.Registry collectors map[string]Collector logger log.Logger - db *sql.DB + instance *instance } func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) { @@ -58,18 +57,16 @@ func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *p } } - db, err := sql.Open("postgres", dsn.GetConnectionString()) + instance, err := newInstance(dsn.GetConnectionString()) if err != nil { return nil, err } - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) return &ProbeCollector{ registry: registry, collectors: collectors, logger: logger, - db: db, + instance: instance, }, nil } @@ -81,7 +78,7 @@ func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) { wg.Add(len(pc.collectors)) for name, c := range pc.collectors { go func(name string, c Collector) { - execute(context.TODO(), name, c, pc.db, ch, pc.logger) + execute(context.TODO(), name, c, pc.instance, ch, pc.logger) wg.Done() }(name, c) } @@ -89,5 +86,5 @@ func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) { } func (pc *ProbeCollector) Close() error { - return pc.db.Close() + return pc.instance.Close() }