Skip to content

Commit

Permalink
devtools/cmd/db: show waiting queries
Browse files Browse the repository at this point in the history
Add a subcommand to show which queries are waiting for others.

Change-Id: Ia5b9f40c4ca45e81b63fe24a4e987cceaf8267ce
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/555460
TryBot-Result: Gopher Robot <[email protected]>
TryBot-Bypass: Jonathan Amsterdam <[email protected]>
Reviewed-by: Michael Matloob <[email protected]>
Run-TryBot: Jonathan Amsterdam <[email protected]>
  • Loading branch information
jba committed Jan 18, 2024
1 parent 9bc5594 commit 24c36df
Showing 1 changed file with 99 additions and 0 deletions.
99 changes: 99 additions & 0 deletions devtools/cmd/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ package main

import (
"context"
"database/sql"
"flag"
"fmt"
"os"
"strings"
"time"

_ "github.com/jackc/pgx/v4/stdlib" // for pgx driver
"github.com/lib/pq"
"golang.org/x/pkgsite/internal/config/serverconfig"
"golang.org/x/pkgsite/internal/database"
"golang.org/x/pkgsite/internal/log"
Expand Down Expand Up @@ -43,6 +46,7 @@ func main() {
if err != nil {
log.Fatal(ctx, err)
}
log.SetLevel("info")

dbName := serverconfig.GetEnv("GO_DISCOVERY_DATABASE_NAME", "discovery-db")
if err := run(ctx, flag.Args()[0], dbName, cfg.DBConnInfo()); err != nil {
Expand All @@ -62,6 +66,8 @@ func run(ctx context.Context, cmd, dbName, connectionInfo string) error {
return recreate(ctx, dbName)
case "truncate":
return truncate(ctx, connectionInfo)
case "waiting":
return waiting(ctx, connectionInfo)
default:
return fmt.Errorf("unsupported arg: %q", cmd)
}
Expand Down Expand Up @@ -123,3 +129,96 @@ func truncate(ctx context.Context, connectionInfo string) error {
defer ddb.Close()
return database.ResetDB(ctx, ddb)
}

type ProcessInfo struct {
pid int64
start time.Time
state string
waitEventType *string
waitEvent *string
blockingPIDs []int64
query string
pos int
}

func waiting(ctx context.Context, connectionInfo string) error {
var processInfos []*ProcessInfo
db, err := database.Open("pgx", connectionInfo, "dbadmin")
if err != nil {
return err
}
defer db.Close()

query := `
SELECT pid, query_start, state, wait_event_type, wait_event, pg_blocking_pids(pid), query
FROM pg_stat_activity
WHERE usename='worker'
ORDER BY 2
`

err = db.RunQuery(ctx, query, func(rows *sql.Rows) error {
var pi ProcessInfo
if err := rows.Scan(&pi.pid, &pi.start, &pi.state, &pi.waitEventType, &pi.waitEvent, pq.Array(&pi.blockingPIDs), &pi.query); err != nil {
return err
}
processInfos = append(processInfos, &pi)
return nil
})
if err != nil {
return err
}

byPid := map[int64]*ProcessInfo{}
for _, pi := range processInfos {
byPid[pi.pid] = pi
}
sorted := topoSort(processInfos, byPid)
for i, p := range sorted {
p.pos = i + 1
}
for _, pi := range sorted {
var wps []int
for _, w := range pi.blockingPIDs {
wps = append(wps, byPid[w].pos)
}
pi.query = strings.TrimSpace(pi.query)
pi.query = strings.Join(strings.Fields(pi.query), " ")
secs := time.Since(pi.start).Seconds()
mins := int(secs / 60)
secs -= float64(mins) * 60
fmt.Printf("%3d %d %2d:%2.3fs %v\t %s\n", pi.pos, pi.pid, mins, secs, wps, left(pi.query, 50))
}
return nil
}

func left(s string, n int) string {
if len(s) < n {
return s
}
return s[:n]
}

func topoSort(items []*ProcessInfo, byPid map[int64]*ProcessInfo) []*ProcessInfo {
var res []*ProcessInfo

visited := map[*ProcessInfo]bool{}
var visit func(*ProcessInfo)
visit = func(pi *ProcessInfo) {
if visited[pi] {
return
}
visited[pi] = true
for _, bpid := range pi.blockingPIDs {
visit(byPid[bpid])
}
res = append(res, pi)

}
for _, it := range items {
if !visited[it] {
visit(it)
}
}

return res
}

0 comments on commit 24c36df

Please sign in to comment.