Skip to content

Commit

Permalink
Merge branch 'master' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Shinderuk committed Sep 30, 2022
2 parents b984f33 + 57c1ecf commit 034456d
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 224 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
*.o
*.so
/results
*pg_wait_sampling--1.1.sql
.log
Dockerfile
/log/
25 changes: 13 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ script:
- docker-compose run tests

env:
- PG_VERSION=9.6 CHECK_CODE=clang
- PG_VERSION=9.6 CHECK_CODE=cppcheck
- PG_VERSION=9.6 CHECK_CODE=false
- PG_VERSION=10 CHECK_CODE=clang
- PG_VERSION=10 CHECK_CODE=cppcheck
- PG_VERSION=10 CHECK_CODE=false
- PG_VERSION=11 CHECK_CODE=clang
- PG_VERSION=11 CHECK_CODE=false
- PG_VERSION=12 CHECK_CODE=clang
- PG_VERSION=12 CHECK_CODE=false
- PG_VERSION=13 CHECK_CODE=clang
- PG_VERSION=13 CHECK_CODE=false
- PG_VERSION=10 CHECK_CODE=clang
- PG_VERSION=10 CHECK_CODE=cppcheck
- PG_VERSION=10 CHECK_CODE=false
- PG_VERSION=11 CHECK_CODE=clang
- PG_VERSION=11 CHECK_CODE=false
- PG_VERSION=12 CHECK_CODE=clang
- PG_VERSION=12 CHECK_CODE=false
- PG_VERSION=13 CHECK_CODE=clang
- PG_VERSION=13 CHECK_CODE=false
- PG_VERSION=14 CHECK_CODE=clang
- PG_VERSION=14 CHECK_CODE=false
- PG_VERSION=15beta3 CHECK_CODE=clang
- PG_VERSION=15beta3 CHECK_CODE=false
4 changes: 2 additions & 2 deletions META.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "pg_wait_sampling",
"abstract": "Sampling based statistics of wait events",
"description": "pg_wait_sampling provides functions for detailed per backend and per query statistics about PostgreSQL wait events",
"version": "1.1.3",
"version": "1.1.4",
"maintainer": [
"Alexander Korotkov <[email protected]>",
"Ildus Kurbangaliev <[email protected]>"
Expand All @@ -21,7 +21,7 @@
"pg_wait_sampling": {
"file": "pg_wait_sampling--1.1.sql",
"docfile": "README.md",
"version": "1.1.3",
"version": "1.1.4",
"abstract": "Sampling based statistics of wait events"
}
},
Expand Down
19 changes: 6 additions & 13 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
# contrib/pg_wait_sampling/Makefile

MODULE_big = pg_wait_sampling
OBJS = pg_wait_sampling.o collector.o compat.o
OBJS = pg_wait_sampling.o collector.o

EXTENSION = pg_wait_sampling
EXTVERSION = 1.1
DATA_built = pg_wait_sampling--$(EXTVERSION).sql
DATA = pg_wait_sampling--1.0--1.1.sql
DATA = pg_wait_sampling--1.1.sql pg_wait_sampling--1.0--1.1.sql

REGRESS = load queries

EXTRA_REGRESS_OPTS=--temp-config=$(top_srcdir)/$(subdir)/conf.add
EXTRA_CLEAN = pg_wait_sampling--$(EXTVERSION).sql

ifdef USE_PGXS
PG_CONFIG = pg_config
Expand All @@ -24,15 +21,11 @@ include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

$(EXTENSION)--$(EXTVERSION).sql: setup.sql
cat $^ > $@

# Prepare the package for PGXN submission
DISTVERSION := $(shell git tag -l | tail -n 1 | cut -d 'v' -f 2)
package: dist dist/$(EXTENSION)-$(DISTVERSION).zip
package: dist .git
$(eval DISTVERSION := $(shell git tag -l | tail -n 1 | cut -d 'v' -f 2))
$(info Generating zip file for version $(DISTVERSION)...)
git archive --format zip --prefix=$(EXTENSION)-${DISTVERSION}/ --output dist/$(EXTENSION)-${DISTVERSION}.zip HEAD

dist:
mkdir -p dist

dist/$(EXTENSION)-$(DISTVERSION).zip:
git archive --format zip --prefix=$(EXTENSION)-$(DISTVERSION)/ --output $@ HEAD
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ GUCs.
| pg_wait_sampling.history_period | int4 | Period for history sampling in milliseconds | 10 |
| pg_wait_sampling.profile_period | int4 | Period for profile sampling in milliseconds | 10 |
| pg_wait_sampling.profile_pid | bool | Whether profile should be per pid | true |
| pg_wait_sampling.profile_queries | bool | Whether profile should be per query | false |
| pg_wait_sampling.profile_queries | bool | Whether profile should be per query | true |

If `pg_wait_sampling.profile_pid` is set to false, sampling profile wouldn't be
collected in per-process manner. In this case the value of pid could would
Expand Down
148 changes: 74 additions & 74 deletions collector.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "utils/resowner.h"
#include "pgstat.h"

#include "compat.h"
#include "pg_wait_sampling.h"

static volatile sig_atomic_t shutdown_requested = false;
Expand All @@ -36,18 +37,18 @@ static void handle_sigterm(SIGNAL_ARGS);
* Register background worker for collecting waits history.
*/
void
register_wait_collector(void)
pgws_register_wait_collector(void)
{
BackgroundWorker worker;

/* Set up background worker parameters */
memset(&worker, 0, sizeof(worker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = 0;
worker.bgw_restart_time = 1;
worker.bgw_notify_pid = 0;
snprintf(worker.bgw_library_name, BGW_MAXLEN, "pg_wait_sampling");
snprintf(worker.bgw_function_name, BGW_MAXLEN, CppAsString(collector_main));
snprintf(worker.bgw_function_name, BGW_MAXLEN, CppAsString(pgws_collector_main));
snprintf(worker.bgw_name, BGW_MAXLEN, "pg_wait_sampling collector");
worker.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&worker);
Expand All @@ -56,7 +57,7 @@ register_wait_collector(void)
/*
* Allocate memory for waits history.
*/
void
static void
alloc_history(History *observations, int count)
{
observations->items = (HistoryItem *) palloc0(sizeof(HistoryItem) * count);
Expand Down Expand Up @@ -150,7 +151,7 @@ probe_waits(History *observations, HTAB *profile_hash,
TimestampTz ts = GetCurrentTimestamp();

/* Realloc waits history if needed */
newSize = collector_hdr->historySize;
newSize = pgws_collector_hdr->historySize;
if (observations->count != newSize)
realloc_history(observations, newSize);

Expand All @@ -172,8 +173,8 @@ probe_waits(History *observations, HTAB *profile_hash,
item.pid = proc->pid;
item.wait_event_info = proc->wait_event_info;

if (collector_hdr->profileQueries)
item.queryId = proc_queryids[i];
if (pgws_collector_hdr->profileQueries)
item.queryId = pgws_proc_queryids[i];
else
item.queryId = 0;

Expand Down Expand Up @@ -220,7 +221,7 @@ send_history(History *observations, shm_mq_handle *mqh)
else
count = observations->index;

mq_result = shm_mq_send(mqh, sizeof(count), &count, false);
mq_result = shm_mq_send_compat(mqh, sizeof(count), &count, false, true);
if (mq_result == SHM_MQ_DETACHED)
{
ereport(WARNING,
Expand All @@ -230,10 +231,11 @@ send_history(History *observations, shm_mq_handle *mqh)
}
for (i = 0; i < count; i++)
{
mq_result = shm_mq_send(mqh,
mq_result = shm_mq_send_compat(mqh,
sizeof(HistoryItem),
&observations->items[i],
false);
false,
true);
if (mq_result == SHM_MQ_DETACHED)
{
ereport(WARNING,
Expand All @@ -255,7 +257,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
Size count = hash_get_num_entries(profile_hash);
shm_mq_result mq_result;

mq_result = shm_mq_send(mqh, sizeof(count), &count, false);
mq_result = shm_mq_send_compat(mqh, sizeof(count), &count, false, true);
if (mq_result == SHM_MQ_DETACHED)
{
ereport(WARNING,
Expand All @@ -266,7 +268,8 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
hash_seq_init(&scan_status, profile_hash);
while ((item = (ProfileItem *) hash_seq_search(&scan_status)) != NULL)
{
mq_result = shm_mq_send(mqh, sizeof(ProfileItem), item, false);
mq_result = shm_mq_send_compat(mqh, sizeof(ProfileItem), item, false,
true);
if (mq_result == SHM_MQ_DETACHED)
{
hash_seq_term(&scan_status);
Expand All @@ -289,7 +292,7 @@ make_profile_hash()
hash_ctl.hash = tag_hash;
hash_ctl.hcxt = TopMemoryContext;

if (collector_hdr->profileQueries)
if (pgws_collector_hdr->profileQueries)
hash_ctl.keysize = offsetof(ProfileItem, count);
else
hash_ctl.keysize = offsetof(ProfileItem, queryId);
Expand Down Expand Up @@ -318,7 +321,7 @@ millisecs_diff(TimestampTz tz1, TimestampTz tz2)
* Main routine of wait history collector.
*/
void
collector_main(Datum main_arg)
pgws_collector_main(Datum main_arg)
{
HTAB *profile_hash = NULL;
History observations;
Expand All @@ -337,28 +340,31 @@ collector_main(Datum main_arg)
* any equivalent of the backend's command-read loop, where interrupts can
* be processed immediately, so make sure ImmediateInterruptOK is turned
* off.
*
* We also want to respond to the ProcSignal notifications. This is done
* in the upstream provided procsignal_sigusr1_handler, which is
* automatically used if a bgworker connects to a database. But since our
* worker doesn't connect to any database even though it calls
* InitPostgres, which will still initializze a new backend and thus
* partitipate to the ProcSignal infrastructure.
*/
pqsignal(SIGTERM, handle_sigterm);
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
BackgroundWorkerUnblockSignals();

#if PG_VERSION_NUM >= 110000
InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL, false);
#else
InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL);
#endif
InitPostgresCompat(NULL, InvalidOid, NULL, InvalidOid, false, false, NULL);
SetProcessingMode(NormalProcessing);

/* Make pg_wait_sampling recognisable in pg_stat_activity */
pgstat_report_appname("pg_wait_sampling collector");

profile_hash = make_profile_hash();
collector_hdr->latch = &MyProc->procLatch;
pgws_collector_hdr->latch = &MyProc->procLatch;

CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_wait_sampling collector");
collector_context = AllocSetContextCreate(TopMemoryContext,
"pg_wait_sampling context", ALLOCSET_DEFAULT_SIZES);
old_context = MemoryContextSwitchTo(collector_context);
alloc_history(&observations, collector_hdr->historySize);
alloc_history(&observations, pgws_collector_hdr->historySize);
MemoryContextSwitchTo(old_context);

ereport(LOG, (errmsg("pg_wait_sampling collector started")));
Expand All @@ -377,21 +383,24 @@ collector_main(Datum main_arg)
bool write_history,
write_profile;

/* We need an explicit call for at least ProcSignal notifications. */
CHECK_FOR_INTERRUPTS();

/* Wait calculate time to next sample for history or profile */
current_ts = GetCurrentTimestamp();

history_diff = millisecs_diff(history_ts, current_ts);
profile_diff = millisecs_diff(profile_ts, current_ts);
history_period = collector_hdr->historyPeriod;
profile_period = collector_hdr->profilePeriod;
history_period = pgws_collector_hdr->historyPeriod;
profile_period = pgws_collector_hdr->profilePeriod;

write_history = (history_diff >= (int64)history_period);
write_profile = (profile_diff >= (int64)profile_period);

if (write_history || write_profile)
{
probe_waits(&observations, profile_hash,
write_history, write_profile, collector_hdr->profilePid);
write_history, write_profile, pgws_collector_hdr->profilePid);

if (write_history)
{
Expand Down Expand Up @@ -430,67 +439,58 @@ collector_main(Datum main_arg)
ResetLatch(&MyProc->procLatch);

/* Handle request if any */
if (collector_hdr->request != NO_REQUEST)
if (pgws_collector_hdr->request != NO_REQUEST)
{
LOCKTAG tag;
SHMRequest request = collector_hdr->request;
SHMRequest request;

init_lock_tag(&tag, PGWS_COLLECTOR_LOCK);
pgws_init_lock_tag(&tag, PGWS_COLLECTOR_LOCK);

LockAcquire(&tag, ExclusiveLock, false, false);
collector_hdr->request = NO_REQUEST;
request = pgws_collector_hdr->request;
pgws_collector_hdr->request = NO_REQUEST;

PG_TRY();
if (request == HISTORY_REQUEST || request == PROFILE_REQUEST)
{
if (request == HISTORY_REQUEST || request == PROFILE_REQUEST)
{
shm_mq_result mq_result;

/* Send history or profile */
shm_mq_set_sender(collector_mq, MyProc);
mqh = shm_mq_attach(collector_mq, NULL, NULL);
mq_result = shm_mq_wait_for_attach(mqh);
switch (mq_result)
{
case SHM_MQ_SUCCESS:
switch (request)
{
case HISTORY_REQUEST:
send_history(&observations, mqh);
break;
case PROFILE_REQUEST:
send_profile(profile_hash, mqh);
break;
default:
AssertState(false);
}
break;
case SHM_MQ_DETACHED:
ereport(WARNING,
(errmsg("pg_wait_sampling collector: "
"receiver of message queue has been "
"detached")));
break;
default:
AssertState(false);
}
shm_mq_detach_compat(mqh, collector_mq);
}
else if (request == PROFILE_RESET)
shm_mq_result mq_result;

/* Send history or profile */
shm_mq_set_sender(pgws_collector_mq, MyProc);
mqh = shm_mq_attach(pgws_collector_mq, NULL, NULL);
mq_result = shm_mq_wait_for_attach(mqh);
switch (mq_result)
{
/* Reset profile hash */
hash_destroy(profile_hash);
profile_hash = make_profile_hash();
case SHM_MQ_SUCCESS:
switch (request)
{
case HISTORY_REQUEST:
send_history(&observations, mqh);
break;
case PROFILE_REQUEST:
send_profile(profile_hash, mqh);
break;
default:
AssertState(false);
}
break;
case SHM_MQ_DETACHED:
ereport(WARNING,
(errmsg("pg_wait_sampling collector: "
"receiver of message queue have been "
"detached")));
break;
default:
AssertState(false);
}

LockRelease(&tag, ExclusiveLock, false);
shm_mq_detach_compat(mqh, pgws_collector_mq);
}
PG_CATCH();
else if (request == PROFILE_RESET)
{
LockRelease(&tag, ExclusiveLock, false);
PG_RE_THROW();
/* Reset profile hash */
hash_destroy(profile_hash);
profile_hash = make_profile_hash();
}
PG_END_TRY();
LockRelease(&tag, ExclusiveLock, false);
}
}

Expand Down
Loading

0 comments on commit 034456d

Please sign in to comment.