diff --git a/.gitignore b/.gitignore index be4254b..1c13cb9 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,6 @@ *.o *.so /results -*pg_wait_sampling--1.1.sql .log Dockerfile +/log/ diff --git a/.travis.yml b/.travis.yml index 69c38e0..4ea7793 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,15 +17,16 @@ script: - docker-compose run tests env: - - PG_VERSION=9.6 CHECK_CODE=clang - - PG_VERSION=9.6 CHECK_CODE=cppcheck - - PG_VERSION=9.6 CHECK_CODE=false - - PG_VERSION=10 CHECK_CODE=clang - - PG_VERSION=10 CHECK_CODE=cppcheck - - PG_VERSION=10 CHECK_CODE=false - - PG_VERSION=11 CHECK_CODE=clang - - PG_VERSION=11 CHECK_CODE=false - - PG_VERSION=12 CHECK_CODE=clang - - PG_VERSION=12 CHECK_CODE=false - - PG_VERSION=13 CHECK_CODE=clang - - PG_VERSION=13 CHECK_CODE=false + - PG_VERSION=10 CHECK_CODE=clang + - PG_VERSION=10 CHECK_CODE=cppcheck + - PG_VERSION=10 CHECK_CODE=false + - PG_VERSION=11 CHECK_CODE=clang + - PG_VERSION=11 CHECK_CODE=false + - PG_VERSION=12 CHECK_CODE=clang + - PG_VERSION=12 CHECK_CODE=false + - PG_VERSION=13 CHECK_CODE=clang + - PG_VERSION=13 CHECK_CODE=false + - PG_VERSION=14 CHECK_CODE=clang + - PG_VERSION=14 CHECK_CODE=false + - PG_VERSION=15beta3 CHECK_CODE=clang + - PG_VERSION=15beta3 CHECK_CODE=false diff --git a/META.json b/META.json index f26554e..0f09ab0 100644 --- a/META.json +++ b/META.json @@ -2,7 +2,7 @@ "name": "pg_wait_sampling", "abstract": "Sampling based statistics of wait events", "description": "pg_wait_sampling provides functions for detailed per backend and per query statistics about PostgreSQL wait events", - "version": "1.1.3", + "version": "1.1.4", "maintainer": [ "Alexander Korotkov ", "Ildus Kurbangaliev " @@ -21,7 +21,7 @@ "pg_wait_sampling": { "file": "pg_wait_sampling--1.1.sql", "docfile": "README.md", - "version": "1.1.3", + "version": "1.1.4", "abstract": "Sampling based statistics of wait events" } }, diff --git a/Makefile b/Makefile index 65a54e3..67de107 100644 --- a/Makefile +++ b/Makefile @@ -1,17 +1,14 @@ # contrib/pg_wait_sampling/Makefile MODULE_big = pg_wait_sampling -OBJS = pg_wait_sampling.o collector.o compat.o +OBJS = pg_wait_sampling.o collector.o EXTENSION = pg_wait_sampling -EXTVERSION = 1.1 -DATA_built = pg_wait_sampling--$(EXTVERSION).sql -DATA = pg_wait_sampling--1.0--1.1.sql +DATA = pg_wait_sampling--1.1.sql pg_wait_sampling--1.0--1.1.sql REGRESS = load queries EXTRA_REGRESS_OPTS=--temp-config=$(top_srcdir)/$(subdir)/conf.add -EXTRA_CLEAN = pg_wait_sampling--$(EXTVERSION).sql ifdef USE_PGXS PG_CONFIG = pg_config @@ -24,15 +21,11 @@ include $(top_builddir)/src/Makefile.global include $(top_srcdir)/contrib/contrib-global.mk endif -$(EXTENSION)--$(EXTVERSION).sql: setup.sql - cat $^ > $@ - # Prepare the package for PGXN submission -DISTVERSION := $(shell git tag -l | tail -n 1 | cut -d 'v' -f 2) -package: dist dist/$(EXTENSION)-$(DISTVERSION).zip +package: dist .git + $(eval DISTVERSION := $(shell git tag -l | tail -n 1 | cut -d 'v' -f 2)) + $(info Generating zip file for version $(DISTVERSION)...) + git archive --format zip --prefix=$(EXTENSION)-${DISTVERSION}/ --output dist/$(EXTENSION)-${DISTVERSION}.zip HEAD dist: mkdir -p dist - -dist/$(EXTENSION)-$(DISTVERSION).zip: - git archive --format zip --prefix=$(EXTENSION)-$(DISTVERSION)/ --output $@ HEAD diff --git a/README.md b/README.md index 0e47ad1..aa539d8 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,7 @@ GUCs. | pg_wait_sampling.history_period | int4 | Period for history sampling in milliseconds | 10 | | pg_wait_sampling.profile_period | int4 | Period for profile sampling in milliseconds | 10 | | pg_wait_sampling.profile_pid | bool | Whether profile should be per pid | true | -| pg_wait_sampling.profile_queries | bool | Whether profile should be per query | false | +| pg_wait_sampling.profile_queries | bool | Whether profile should be per query | true | If `pg_wait_sampling.profile_pid` is set to false, sampling profile wouldn't be collected in per-process manner. In this case the value of pid could would diff --git a/collector.c b/collector.c index f3f141c..e1e00cf 100644 --- a/collector.c +++ b/collector.c @@ -26,6 +26,7 @@ #include "utils/resowner.h" #include "pgstat.h" +#include "compat.h" #include "pg_wait_sampling.h" static volatile sig_atomic_t shutdown_requested = false; @@ -36,7 +37,7 @@ static void handle_sigterm(SIGNAL_ARGS); * Register background worker for collecting waits history. */ void -register_wait_collector(void) +pgws_register_wait_collector(void) { BackgroundWorker worker; @@ -44,10 +45,10 @@ register_wait_collector(void) memset(&worker, 0, sizeof(worker)); worker.bgw_flags = BGWORKER_SHMEM_ACCESS; worker.bgw_start_time = BgWorkerStart_ConsistentState; - worker.bgw_restart_time = 0; + worker.bgw_restart_time = 1; worker.bgw_notify_pid = 0; snprintf(worker.bgw_library_name, BGW_MAXLEN, "pg_wait_sampling"); - snprintf(worker.bgw_function_name, BGW_MAXLEN, CppAsString(collector_main)); + snprintf(worker.bgw_function_name, BGW_MAXLEN, CppAsString(pgws_collector_main)); snprintf(worker.bgw_name, BGW_MAXLEN, "pg_wait_sampling collector"); worker.bgw_main_arg = (Datum) 0; RegisterBackgroundWorker(&worker); @@ -56,7 +57,7 @@ register_wait_collector(void) /* * Allocate memory for waits history. */ -void +static void alloc_history(History *observations, int count) { observations->items = (HistoryItem *) palloc0(sizeof(HistoryItem) * count); @@ -150,7 +151,7 @@ probe_waits(History *observations, HTAB *profile_hash, TimestampTz ts = GetCurrentTimestamp(); /* Realloc waits history if needed */ - newSize = collector_hdr->historySize; + newSize = pgws_collector_hdr->historySize; if (observations->count != newSize) realloc_history(observations, newSize); @@ -172,8 +173,8 @@ probe_waits(History *observations, HTAB *profile_hash, item.pid = proc->pid; item.wait_event_info = proc->wait_event_info; - if (collector_hdr->profileQueries) - item.queryId = proc_queryids[i]; + if (pgws_collector_hdr->profileQueries) + item.queryId = pgws_proc_queryids[i]; else item.queryId = 0; @@ -220,7 +221,7 @@ send_history(History *observations, shm_mq_handle *mqh) else count = observations->index; - mq_result = shm_mq_send(mqh, sizeof(count), &count, false); + mq_result = shm_mq_send_compat(mqh, sizeof(count), &count, false, true); if (mq_result == SHM_MQ_DETACHED) { ereport(WARNING, @@ -230,10 +231,11 @@ send_history(History *observations, shm_mq_handle *mqh) } for (i = 0; i < count; i++) { - mq_result = shm_mq_send(mqh, + mq_result = shm_mq_send_compat(mqh, sizeof(HistoryItem), &observations->items[i], - false); + false, + true); if (mq_result == SHM_MQ_DETACHED) { ereport(WARNING, @@ -255,7 +257,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh) Size count = hash_get_num_entries(profile_hash); shm_mq_result mq_result; - mq_result = shm_mq_send(mqh, sizeof(count), &count, false); + mq_result = shm_mq_send_compat(mqh, sizeof(count), &count, false, true); if (mq_result == SHM_MQ_DETACHED) { ereport(WARNING, @@ -266,7 +268,8 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh) hash_seq_init(&scan_status, profile_hash); while ((item = (ProfileItem *) hash_seq_search(&scan_status)) != NULL) { - mq_result = shm_mq_send(mqh, sizeof(ProfileItem), item, false); + mq_result = shm_mq_send_compat(mqh, sizeof(ProfileItem), item, false, + true); if (mq_result == SHM_MQ_DETACHED) { hash_seq_term(&scan_status); @@ -289,7 +292,7 @@ make_profile_hash() hash_ctl.hash = tag_hash; hash_ctl.hcxt = TopMemoryContext; - if (collector_hdr->profileQueries) + if (pgws_collector_hdr->profileQueries) hash_ctl.keysize = offsetof(ProfileItem, count); else hash_ctl.keysize = offsetof(ProfileItem, queryId); @@ -318,7 +321,7 @@ millisecs_diff(TimestampTz tz1, TimestampTz tz2) * Main routine of wait history collector. */ void -collector_main(Datum main_arg) +pgws_collector_main(Datum main_arg) { HTAB *profile_hash = NULL; History observations; @@ -337,28 +340,31 @@ collector_main(Datum main_arg) * any equivalent of the backend's command-read loop, where interrupts can * be processed immediately, so make sure ImmediateInterruptOK is turned * off. + * + * We also want to respond to the ProcSignal notifications. This is done + * in the upstream provided procsignal_sigusr1_handler, which is + * automatically used if a bgworker connects to a database. But since our + * worker doesn't connect to any database even though it calls + * InitPostgres, which will still initializze a new backend and thus + * partitipate to the ProcSignal infrastructure. */ pqsignal(SIGTERM, handle_sigterm); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); BackgroundWorkerUnblockSignals(); - -#if PG_VERSION_NUM >= 110000 - InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL, false); -#else - InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL); -#endif + InitPostgresCompat(NULL, InvalidOid, NULL, InvalidOid, false, false, NULL); SetProcessingMode(NormalProcessing); /* Make pg_wait_sampling recognisable in pg_stat_activity */ pgstat_report_appname("pg_wait_sampling collector"); profile_hash = make_profile_hash(); - collector_hdr->latch = &MyProc->procLatch; + pgws_collector_hdr->latch = &MyProc->procLatch; CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_wait_sampling collector"); collector_context = AllocSetContextCreate(TopMemoryContext, "pg_wait_sampling context", ALLOCSET_DEFAULT_SIZES); old_context = MemoryContextSwitchTo(collector_context); - alloc_history(&observations, collector_hdr->historySize); + alloc_history(&observations, pgws_collector_hdr->historySize); MemoryContextSwitchTo(old_context); ereport(LOG, (errmsg("pg_wait_sampling collector started"))); @@ -377,13 +383,16 @@ collector_main(Datum main_arg) bool write_history, write_profile; + /* We need an explicit call for at least ProcSignal notifications. */ + CHECK_FOR_INTERRUPTS(); + /* Wait calculate time to next sample for history or profile */ current_ts = GetCurrentTimestamp(); history_diff = millisecs_diff(history_ts, current_ts); profile_diff = millisecs_diff(profile_ts, current_ts); - history_period = collector_hdr->historyPeriod; - profile_period = collector_hdr->profilePeriod; + history_period = pgws_collector_hdr->historyPeriod; + profile_period = pgws_collector_hdr->profilePeriod; write_history = (history_diff >= (int64)history_period); write_profile = (profile_diff >= (int64)profile_period); @@ -391,7 +400,7 @@ collector_main(Datum main_arg) if (write_history || write_profile) { probe_waits(&observations, profile_hash, - write_history, write_profile, collector_hdr->profilePid); + write_history, write_profile, pgws_collector_hdr->profilePid); if (write_history) { @@ -430,67 +439,58 @@ collector_main(Datum main_arg) ResetLatch(&MyProc->procLatch); /* Handle request if any */ - if (collector_hdr->request != NO_REQUEST) + if (pgws_collector_hdr->request != NO_REQUEST) { LOCKTAG tag; - SHMRequest request = collector_hdr->request; + SHMRequest request; - init_lock_tag(&tag, PGWS_COLLECTOR_LOCK); + pgws_init_lock_tag(&tag, PGWS_COLLECTOR_LOCK); LockAcquire(&tag, ExclusiveLock, false, false); - collector_hdr->request = NO_REQUEST; + request = pgws_collector_hdr->request; + pgws_collector_hdr->request = NO_REQUEST; - PG_TRY(); + if (request == HISTORY_REQUEST || request == PROFILE_REQUEST) { - if (request == HISTORY_REQUEST || request == PROFILE_REQUEST) - { - shm_mq_result mq_result; - - /* Send history or profile */ - shm_mq_set_sender(collector_mq, MyProc); - mqh = shm_mq_attach(collector_mq, NULL, NULL); - mq_result = shm_mq_wait_for_attach(mqh); - switch (mq_result) - { - case SHM_MQ_SUCCESS: - switch (request) - { - case HISTORY_REQUEST: - send_history(&observations, mqh); - break; - case PROFILE_REQUEST: - send_profile(profile_hash, mqh); - break; - default: - AssertState(false); - } - break; - case SHM_MQ_DETACHED: - ereport(WARNING, - (errmsg("pg_wait_sampling collector: " - "receiver of message queue has been " - "detached"))); - break; - default: - AssertState(false); - } - shm_mq_detach_compat(mqh, collector_mq); - } - else if (request == PROFILE_RESET) + shm_mq_result mq_result; + + /* Send history or profile */ + shm_mq_set_sender(pgws_collector_mq, MyProc); + mqh = shm_mq_attach(pgws_collector_mq, NULL, NULL); + mq_result = shm_mq_wait_for_attach(mqh); + switch (mq_result) { - /* Reset profile hash */ - hash_destroy(profile_hash); - profile_hash = make_profile_hash(); + case SHM_MQ_SUCCESS: + switch (request) + { + case HISTORY_REQUEST: + send_history(&observations, mqh); + break; + case PROFILE_REQUEST: + send_profile(profile_hash, mqh); + break; + default: + AssertState(false); + } + break; + case SHM_MQ_DETACHED: + ereport(WARNING, + (errmsg("pg_wait_sampling collector: " + "receiver of message queue have been " + "detached"))); + break; + default: + AssertState(false); } - - LockRelease(&tag, ExclusiveLock, false); + shm_mq_detach_compat(mqh, pgws_collector_mq); } - PG_CATCH(); + else if (request == PROFILE_RESET) { - LockRelease(&tag, ExclusiveLock, false); - PG_RE_THROW(); + /* Reset profile hash */ + hash_destroy(profile_hash); + profile_hash = make_profile_hash(); } - PG_END_TRY(); + LockRelease(&tag, ExclusiveLock, false); } } diff --git a/compat.c b/compat.c deleted file mode 100644 index 249a53a..0000000 --- a/compat.c +++ /dev/null @@ -1,24 +0,0 @@ -#include "postgres.h" -#include "access/tupdesc.h" - -#include "pg_wait_sampling.h" - -inline void -shm_mq_detach_compat(shm_mq_handle *mqh, shm_mq *mq) -{ -#if PG_VERSION_NUM >= 100000 - shm_mq_detach(mqh); -#else - shm_mq_detach(mq); -#endif -} - -inline TupleDesc -CreateTemplateTupleDescCompat(int nattrs, bool hasoid) -{ -#if PG_VERSION_NUM >= 120000 - return CreateTemplateTupleDesc(nattrs); -#else - return CreateTemplateTupleDesc(nattrs, hasoid); -#endif -} diff --git a/compat.h b/compat.h new file mode 100644 index 0000000..3f471ce --- /dev/null +++ b/compat.h @@ -0,0 +1,68 @@ +/* + * compat.h + * Definitions for function wrappers compatible between PG versions. + * + * Copyright (c) 2015-2022, Postgres Professional + * + * IDENTIFICATION + * contrib/pg_wait_sampling/compat.h + */ +#ifndef __COMPAT_H__ +#define __COMPAT_H__ + +#include "postgres.h" + +#include "access/tupdesc.h" +#include "miscadmin.h" +#include "storage/shm_mq.h" + +static inline TupleDesc +CreateTemplateTupleDescCompat(int nattrs, bool hasoid) +{ +#if PG_VERSION_NUM >= 120000 + return CreateTemplateTupleDesc(nattrs); +#else + return CreateTemplateTupleDesc(nattrs, hasoid); +#endif +} + +static inline void +shm_mq_detach_compat(shm_mq_handle *mqh, shm_mq *mq) +{ +#if PG_VERSION_NUM >= 100000 + shm_mq_detach(mqh); +#else + shm_mq_detach(mq); +#endif +} + +static inline shm_mq_result +shm_mq_send_compat(shm_mq_handle *mqh, Size nbytes, const void *data, + bool nowait, bool force_flush) +{ +#if PG_VERSION_NUM >= 150000 + return shm_mq_send(mqh, nbytes, data, nowait, force_flush); +#else + return shm_mq_send(mqh, nbytes, data, nowait); +#endif +} + +static inline void +InitPostgresCompat(const char *in_dbname, Oid dboid, + const char *username, Oid useroid, + bool load_session_libraries, + bool override_allow_connections, + char *out_dbname) +{ +#if PG_VERSION_NUM >= 150000 + InitPostgres(in_dbname, dboid, username, useroid, load_session_libraries, + override_allow_connections, out_dbname); +#elif PG_VERSION_NUM >= 110000 + InitPostgres(in_dbname, dboid, username, useroid, out_dbname, + override_allow_connections); +#else + InitPostgres(in_dbname, dboid, username, useroid, out_dbname); +#endif +} + +#endif diff --git a/setup.sql b/pg_wait_sampling--1.1.sql similarity index 100% rename from setup.sql rename to pg_wait_sampling--1.1.sql diff --git a/pg_wait_sampling.c b/pg_wait_sampling.c index a90a981..c77f980 100644 --- a/pg_wait_sampling.c +++ b/pg_wait_sampling.c @@ -17,6 +17,10 @@ #include "miscadmin.h" #include "optimizer/planner.h" #include "pgstat.h" +#include "postmaster/autovacuum.h" +#if PG_VERSION_NUM >= 120000 +#include "replication/walsender.h" +#endif #include "storage/ipc.h" #include "storage/pg_shmem.h" #include "storage/procarray.h" @@ -30,31 +34,32 @@ #include "utils/memutils.h" /* TopMemoryContext. Actually for PG 9.6 only, * but there should be no harm for others. */ +#include "compat.h" #include "pg_wait_sampling.h" PG_MODULE_MAGIC; void _PG_init(void); -void _PG_fini(void); -/* Global variables */ -bool shmem_initialized = false; +static bool shmem_initialized = false; /* Hooks */ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL; static planner_hook_type planner_hook_next = NULL; -/* Shared memory variables */ -shm_toc *toc = NULL; -shm_mq *collector_mq = NULL; -uint64 *proc_queryids = NULL; -CollectorShmqHeader *collector_hdr = NULL; +/* Pointers to shared memory objects */ +shm_mq *pgws_collector_mq = NULL; +uint64 *pgws_proc_queryids = NULL; +CollectorShmqHeader *pgws_collector_hdr = NULL; /* Receiver (backend) local shm_mq pointers and lock */ -shm_mq *recv_mq = NULL; -shm_mq_handle *recv_mqh = NULL; -LOCKTAG queueTag; +static shm_mq *recv_mq = NULL; +static shm_mq_handle *recv_mqh = NULL; +static LOCKTAG queueTag; +#if PG_VERSION_NUM >= 150000 +static shmem_request_hook_type prev_shmem_request_hook = NULL; +#endif static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static PGPROC * search_proc(int backendPid); static PlannedStmt *pgws_planner_hook(Query *parse, @@ -66,20 +71,64 @@ static void pgws_ExecutorEnd(QueryDesc *queryDesc); /* * Calculate max processes count. - * Look at InitProcGlobal (proc.c) and TotalProcs variable in it - * if something wrong here. + * + * The value has to be in sync with ProcGlobal->allProcCount, initialized in + * InitProcGlobal() (proc.c). + * */ static int get_max_procs_count(void) { int count = 0; - /* MyProcs, including autovacuum workers and launcher */ + /* First, add the maximum number of backends (MaxBackends). */ +#if PG_VERSION_NUM >= 150000 + /* + * On pg15+, we can directly access the MaxBackends variable, as it will + * have already been initialized in shmem_request_hook. + */ + Assert(MaxBackends > 0); count += MaxBackends; - /* AuxiliaryProcs */ +#else + /* + * On older versions, we need to compute MaxBackends: bgworkers, autovacuum + * workers and launcher. + * This has to be in sync with the value computed in + * InitializeMaxBackends() (postinit.c) + * + * Note that we need to calculate the value as it won't initialized when we + * need it during _PG_init(). + * + * Note also that the value returned during _PG_init() might be different + * from the value returned later if some third-party modules change one of + * the underlying GUC. This isn't ideal but can't lead to a crash, as the + * value returned during _PG_init() is only used to ask for additional + * shmem with RequestAddinShmemSpace(), and postgres has an extra 100kB of + * shmem to compensate some small unaccounted usage. So if the value later + * changes, we will allocate and initialize the new (and correct) memory + * size, which will either work thanks for the extra 100kB of shmem, of + * fail (and prevent postgres startup) due to an out of shared memory + * error. + */ + count += MaxConnections + autovacuum_max_workers + 1 + + max_worker_processes; + +#if PG_VERSION_NUM >= 140000 && defined(PGPRO_EE) + count += MaxATX; +#endif + + /* + * Starting with pg12, wal senders aren't part of MaxConnections anymore + * and have to be accounted for. + */ +#if PG_VERSION_NUM >= 120000 + count += max_wal_senders; +#endif /* pg 12+ */ +#endif /* pg 15- */ + /* End of MaxBackends calculation. */ + + /* Add AuxiliaryProcs */ count += NUM_AUXILIARY_PROCS; - /* Prepared xacts */ - count += max_prepared_xacts; return count; } @@ -167,63 +216,63 @@ setup_gucs() if (!strcmp(name, "pg_wait_sampling.history_size")) { history_size_found = true; - var->integer.variable = &collector_hdr->historySize; - collector_hdr->historySize = 5000; + var->integer.variable = &pgws_collector_hdr->historySize; + pgws_collector_hdr->historySize = 5000; } else if (!strcmp(name, "pg_wait_sampling.history_period")) { history_period_found = true; - var->integer.variable = &collector_hdr->historyPeriod; - collector_hdr->historyPeriod = 10; + var->integer.variable = &pgws_collector_hdr->historyPeriod; + pgws_collector_hdr->historyPeriod = 10; } else if (!strcmp(name, "pg_wait_sampling.profile_period")) { profile_period_found = true; - var->integer.variable = &collector_hdr->profilePeriod; - collector_hdr->profilePeriod = 10; + var->integer.variable = &pgws_collector_hdr->profilePeriod; + pgws_collector_hdr->profilePeriod = 10; } else if (!strcmp(name, "pg_wait_sampling.profile_pid")) { profile_pid_found = true; - var->_bool.variable = &collector_hdr->profilePid; - collector_hdr->profilePid = true; + var->_bool.variable = &pgws_collector_hdr->profilePid; + pgws_collector_hdr->profilePid = true; } else if (!strcmp(name, "pg_wait_sampling.profile_queries")) { profile_queries_found = true; - var->_bool.variable = &collector_hdr->profileQueries; - collector_hdr->profileQueries = true; + var->_bool.variable = &pgws_collector_hdr->profileQueries; + pgws_collector_hdr->profileQueries = true; } } if (!history_size_found) DefineCustomIntVariable("pg_wait_sampling.history_size", "Sets size of waits history.", NULL, - &collector_hdr->historySize, 5000, 100, INT_MAX, + &pgws_collector_hdr->historySize, 5000, 100, INT_MAX, PGC_SUSET, 0, shmem_int_guc_check_hook, NULL, NULL); if (!history_period_found) DefineCustomIntVariable("pg_wait_sampling.history_period", "Sets period of waits history sampling.", NULL, - &collector_hdr->historyPeriod, 10, 1, INT_MAX, + &pgws_collector_hdr->historyPeriod, 10, 1, INT_MAX, PGC_SUSET, 0, shmem_int_guc_check_hook, NULL, NULL); if (!profile_period_found) DefineCustomIntVariable("pg_wait_sampling.profile_period", "Sets period of waits profile sampling.", NULL, - &collector_hdr->profilePeriod, 10, 1, INT_MAX, + &pgws_collector_hdr->profilePeriod, 10, 1, INT_MAX, PGC_SUSET, 0, shmem_int_guc_check_hook, NULL, NULL); if (!profile_pid_found) DefineCustomBoolVariable("pg_wait_sampling.profile_pid", "Sets whether profile should be collected per pid.", NULL, - &collector_hdr->profilePid, true, + &pgws_collector_hdr->profilePid, true, PGC_SUSET, 0, shmem_bool_guc_check_hook, NULL, NULL); if (!profile_queries_found) DefineCustomBoolVariable("pg_wait_sampling.profile_queries", "Sets whether profile should be collected per query.", NULL, - &collector_hdr->profileQueries, true, + &pgws_collector_hdr->profileQueries, true, PGC_SUSET, 0, shmem_bool_guc_check_hook, NULL, NULL); if (history_size_found @@ -236,15 +285,33 @@ setup_gucs() } } +#if PG_VERSION_NUM >= 150000 +/* + * shmem_request hook: request additional shared memory resources. + * + * If you change code here, don't forget to also report the modifications in + * _PG_init() for pg14 and below. + */ +static void +pgws_shmem_request(void) +{ + if (prev_shmem_request_hook) + prev_shmem_request_hook(); + + RequestAddinShmemSpace(pgws_shmem_size()); +} +#endif + /* * Distribute shared memory. */ static void pgws_shmem_startup(void) { - bool found; - Size segsize = pgws_shmem_size(); - void *pgws; + bool found; + Size segsize = pgws_shmem_size(); + void *pgws; + shm_toc *toc; pgws = ShmemInitStruct("pg_wait_sampling", segsize, &found); @@ -252,14 +319,14 @@ pgws_shmem_startup(void) { toc = shm_toc_create(PG_WAIT_SAMPLING_MAGIC, pgws, segsize); - collector_hdr = shm_toc_allocate(toc, sizeof(CollectorShmqHeader)); - shm_toc_insert(toc, 0, collector_hdr); - collector_mq = shm_toc_allocate(toc, COLLECTOR_QUEUE_SIZE); - shm_toc_insert(toc, 1, collector_mq); - proc_queryids = shm_toc_allocate(toc, + pgws_collector_hdr = shm_toc_allocate(toc, sizeof(CollectorShmqHeader)); + shm_toc_insert(toc, 0, pgws_collector_hdr); + pgws_collector_mq = shm_toc_allocate(toc, COLLECTOR_QUEUE_SIZE); + shm_toc_insert(toc, 1, pgws_collector_mq); + pgws_proc_queryids = shm_toc_allocate(toc, sizeof(uint64) * get_max_procs_count()); - shm_toc_insert(toc, 2, proc_queryids); - MemSet(proc_queryids, 0, sizeof(uint64) * get_max_procs_count()); + shm_toc_insert(toc, 2, pgws_proc_queryids); + MemSet(pgws_proc_queryids, 0, sizeof(uint64) * get_max_procs_count()); /* Initialize GUC variables in shared memory */ setup_gucs(); @@ -269,13 +336,13 @@ pgws_shmem_startup(void) toc = shm_toc_attach(PG_WAIT_SAMPLING_MAGIC, pgws); #if PG_VERSION_NUM >= 100000 - collector_hdr = shm_toc_lookup(toc, 0, false); - collector_mq = shm_toc_lookup(toc, 1, false); - proc_queryids = shm_toc_lookup(toc, 2, false); + pgws_collector_hdr = shm_toc_lookup(toc, 0, false); + pgws_collector_mq = shm_toc_lookup(toc, 1, false); + pgws_proc_queryids = shm_toc_lookup(toc, 2, false); #else - collector_hdr = shm_toc_lookup(toc, 0); - collector_mq = shm_toc_lookup(toc, 1); - proc_queryids = shm_toc_lookup(toc, 2); + pgws_collector_hdr = shm_toc_lookup(toc, 0); + pgws_collector_mq = shm_toc_lookup(toc, 1); + pgws_proc_queryids = shm_toc_lookup(toc, 2); #endif } @@ -288,7 +355,7 @@ pgws_shmem_startup(void) /* * Check shared memory is initialized. Report an error otherwise. */ -void +static void check_shmem(void) { if (!shmem_initialized) @@ -315,18 +382,27 @@ _PG_init(void) if (!process_shared_preload_libraries_in_progress) return; +#if PG_VERSION_NUM < 150000 /* * Request additional shared resources. (These are no-ops if we're not in * the postmaster process.) We'll allocate or attach to the shared * resources in pgws_shmem_startup(). + * + * If you change code here, don't forget to also report the modifications + * in pgsp_shmem_request() for pg15 and later. */ RequestAddinShmemSpace(pgws_shmem_size()); +#endif - register_wait_collector(); + pgws_register_wait_collector(); /* * Install hooks. */ +#if PG_VERSION_NUM >= 150000 + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = pgws_shmem_request; +#endif prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = pgws_shmem_startup; planner_hook_next = planner_hook; @@ -335,16 +411,6 @@ _PG_init(void) ExecutorEnd_hook = pgws_ExecutorEnd; } -/* - * Module unload callback - */ -void -_PG_fini(void) -{ - /* Uninstall hooks. */ - shmem_startup_hook = prev_shmem_startup_hook; -} - /* * Find PGPROC entry responsible for given pid assuming ProcArrayLock was * already taken. @@ -423,7 +489,7 @@ pg_wait_sampling_get_current(PG_FUNCTION_ARGS) item = ¶ms->items[0]; item->pid = proc->pid; item->wait_event_info = proc->wait_event_info; - item->queryId = proc_queryids[proc - ProcGlobal->allProcs]; + item->queryId = pgws_proc_queryids[proc - ProcGlobal->allProcs]; funcctx->max_calls = 1; } else @@ -441,7 +507,7 @@ pg_wait_sampling_get_current(PG_FUNCTION_ARGS) { params->items[j].pid = proc->pid; params->items[j].wait_event_info = proc->wait_event_info; - params->items[j].queryId = proc_queryids[i]; + params->items[j].queryId = pgws_proc_queryids[i]; j++; } } @@ -502,7 +568,7 @@ typedef struct } Profile; void -init_lock_tag(LOCKTAG *tag, uint32 lock) +pgws_init_lock_tag(LOCKTAG *tag, uint32 lock) { tag->locktag_field1 = PG_WAIT_SAMPLING_MAGIC; tag->locktag_field2 = lock; @@ -525,22 +591,20 @@ receive_array(SHMRequest request, Size item_size, Size *count) MemoryContext oldctx; /* Ensure nobody else trying to send request to queue */ - init_lock_tag(&queueTag, PGWS_QUEUE_LOCK); + pgws_init_lock_tag(&queueTag, PGWS_QUEUE_LOCK); LockAcquire(&queueTag, ExclusiveLock, false, false); - /* Ensure collector has processed previous request */ - init_lock_tag(&collectorTag, PGWS_COLLECTOR_LOCK); + pgws_init_lock_tag(&collectorTag, PGWS_COLLECTOR_LOCK); LockAcquire(&collectorTag, ExclusiveLock, false, false); + recv_mq = shm_mq_create(pgws_collector_mq, COLLECTOR_QUEUE_SIZE); + pgws_collector_hdr->request = request; LockRelease(&collectorTag, ExclusiveLock, false); - recv_mq = shm_mq_create(collector_mq, COLLECTOR_QUEUE_SIZE); - collector_hdr->request = request; - - if (!collector_hdr->latch) + if (!pgws_collector_hdr->latch) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("pg_wait_sampling collector wasn't started"))); - SetLatch(collector_hdr->latch); + SetLatch(pgws_collector_hdr->latch); shm_mq_set_receiver(recv_mq, MyProc); @@ -671,7 +735,7 @@ pg_wait_sampling_get_profile(PG_FUNCTION_ARGS) else nulls[2] = true; - if (collector_hdr->profileQueries) + if (pgws_collector_hdr->profileQueries) values[3] = Int64GetDatumFast(item->queryId); else values[3] = (Datum) 0; @@ -693,23 +757,22 @@ PG_FUNCTION_INFO_V1(pg_wait_sampling_reset_profile); Datum pg_wait_sampling_reset_profile(PG_FUNCTION_ARGS) { - LOCKTAG tag; - LOCKTAG tagCollector; + LOCKTAG collectorTag; check_shmem(); - init_lock_tag(&tag, PGWS_QUEUE_LOCK); + pgws_init_lock_tag(&queueTag, PGWS_QUEUE_LOCK); - LockAcquire(&tag, ExclusiveLock, false, false); + LockAcquire(&queueTag, ExclusiveLock, false, false); - init_lock_tag(&tagCollector, PGWS_COLLECTOR_LOCK); - LockAcquire(&tagCollector, ExclusiveLock, false, false); - LockRelease(&tagCollector, ExclusiveLock, false); + pgws_init_lock_tag(&collectorTag, PGWS_COLLECTOR_LOCK); + LockAcquire(&collectorTag, ExclusiveLock, false, false); + pgws_collector_hdr->request = PROFILE_RESET; + LockRelease(&collectorTag, ExclusiveLock, false); - collector_hdr->request = PROFILE_RESET; - SetLatch(collector_hdr->latch); + SetLatch(pgws_collector_hdr->latch); - LockRelease(&tag, ExclusiveLock, false); + LockRelease(&queueTag, ExclusiveLock, false); PG_RETURN_VOID(); } @@ -829,8 +892,8 @@ pgws_planner_hook(Query *parse, StaticAssertExpr(sizeof(parse->queryId) == sizeof(uint32), "queryId size is not uint32"); #endif - if (!proc_queryids[i]) - proc_queryids[i] = parse->queryId; + if (!pgws_proc_queryids[i]) + pgws_proc_queryids[i] = parse->queryId; } @@ -856,7 +919,7 @@ static void pgws_ExecutorEnd(QueryDesc *queryDesc) { if (MyProc) - proc_queryids[MyProc - ProcGlobal->allProcs] = UINT64CONST(0); + pgws_proc_queryids[MyProc - ProcGlobal->allProcs] = UINT64CONST(0); if (prev_ExecutorEnd) prev_ExecutorEnd(queryDesc); diff --git a/pg_wait_sampling.h b/pg_wait_sampling.h index 1001f6f..29425fc 100644 --- a/pg_wait_sampling.h +++ b/pg_wait_sampling.h @@ -71,19 +71,13 @@ typedef struct } CollectorShmqHeader; /* pg_wait_sampling.c */ -extern void check_shmem(void); -extern CollectorShmqHeader *collector_hdr; -extern shm_mq *collector_mq; -extern uint64 *proc_queryids; -extern void read_current_wait(PGPROC *proc, HistoryItem *item); -extern void init_lock_tag(LOCKTAG *tag, uint32 lock); +extern CollectorShmqHeader *pgws_collector_hdr; +extern shm_mq *pgws_collector_mq; +extern uint64 *pgws_proc_queryids; +extern void pgws_init_lock_tag(LOCKTAG *tag, uint32 lock); /* collector.c */ -extern void register_wait_collector(void); -extern void alloc_history(History *, int); -extern void collector_main(Datum main_arg); - -extern void shm_mq_detach_compat(shm_mq_handle *mqh, shm_mq *mq); -extern TupleDesc CreateTemplateTupleDescCompat(int nattrs, bool hasoid); +extern void pgws_register_wait_collector(void); +extern PGDLLEXPORT void pgws_collector_main(Datum main_arg); #endif