diff --git a/Makefile b/Makefile index 32711a3..f9de6d9 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ MODULE_big = pg_wait_sampling OBJS = pg_wait_sampling.o collector.o EXTENSION = pg_wait_sampling -DATA = pg_wait_sampling--1.1.sql pg_wait_sampling--1.0--1.1.sql +DATA = pg_wait_sampling--1.1.sql pg_wait_sampling--1.0--1.1.sql pg_wait_sampling--1.1--1.2.sql REGRESS = load queries diff --git a/README.md b/README.md index bbdbd20..50e742e 100644 --- a/README.md +++ b/README.md @@ -9,13 +9,13 @@ Introduction PostgreSQL provides information about current wait event of particular process. However, in order to gather descriptive statistics of server -behavior user have to sample current wait event multiple times. +behavior users have to sample current wait events multiple times. `pg_wait_sampling` is an extension for collecting sampling statistics of wait events. The module must be loaded by adding `pg_wait_sampling` to `shared_preload_libraries` in postgresql.conf, because it requires additional -shared memory and launches background worker. This means that a server restart +shared memory and launches a background worker. This means that a server restart is needed to add or remove the module. When used with `pg_stat_statements` it is recommended to put `pg_stat_statements` @@ -25,17 +25,16 @@ utility statements are not rewritten by the former. When `pg_wait_sampling` is enabled, it collects two kinds of statistics. * History of waits events. It's implemented as in-memory ring buffer where - samples of each process wait events are written with given (configurable) + samples of each process' wait events are written with given (configurable) period. Therefore, for each running process user can see some number of - recent samples depending on history size (configurable). Assuming there is - a client who periodically read this history and dump it somewhere, user - can have continuous history. - * Waits profile. It's implemented as in-memory hash table where count - of samples are accumulated per each process and each wait event - (and each query with `pg_stat_statements`). This hash - table can be reset by user request. Assuming there is a client who - periodically dumps profile and resets it, user can have statistics of - intensivity of wait events among time. + recent samples depending on history size (configurable). Assuming there is + a client who periodically reads this history and dumps it somewhere, user + can have continuous history of wait events. + * Waits profile. It's implemented as in-memory hash table where samples + are accumulated per each wait event and can be divided by process, wait event, + query and other dimensions. This hash table can be reset by user request. + Assuming there is a client who periodically dumps profile and resets it, + user can have statistics of wait events over time. In combination with `pg_stat_statements` this extension can also provide per query statistics. @@ -66,10 +65,10 @@ Manual build higher. Before build and install you should ensure following: * PostgreSQL version is 13 or higher. - * You have development package of PostgreSQL installed or you built + * You have development package of PostgreSQL installed or you have built PostgreSQL from source. * Your PATH variable is configured so that `pg_config` command available, or - set PG_CONFIG variable. + PG_CONFIG variable is set. Typical installation procedure may look like this: @@ -98,14 +97,23 @@ Usage `pg_wait_sampling` interacts with user by set of views and functions. 
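A minimal usage sketch, assuming `pg_wait_sampling` is already listed in `shared_preload_libraries` (the views and functions used here are described in detail below):

```sql
-- Register the extension objects in the current database
-- (the module itself is already loaded via shared_preload_libraries).
CREATE EXTENSION pg_wait_sampling;

-- Wait events with the most accumulated samples in the profile.
SELECT event_type, event, count
FROM pg_wait_sampling_profile
ORDER BY count DESC
LIMIT 10;

-- Start a fresh measurement interval.
SELECT pg_wait_sampling_reset_profile();
```

The same pattern applies to `pg_wait_sampling_history` together with `pg_wait_sampling_reset_history()` when a continuous history of samples is wanted.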
`pg_wait_sampling_current` view – information about current wait events for -all processed including background workers. - -| Column name | Column type | Description | -| ----------- | ----------- | ----------------------- | -| pid | int4 | Id of process | -| event_type | text | Name of wait event type | -| event | text | Name of wait event | -| queryid | int8 | Id of query | +all processes including background workers. + +| Column name | Column type | Description | +| ------------------- | ----------- | --------------------------- | +| pid | int4 | Id of process | +| event_type | text | Name of wait event type | +| event | text | Name of wait event | +| queryid | int8 | Id of query | +| role_id | int4 | Id of role | +| database_id | int4 | Id of database | +| parallel_leader_pid | int4 | Id of parallel query leader | +| backend_type | text | Name of backend type | +| backend_state | text | Name of backend state | +| proc_start | timestamptz | Timestamp of process start | +| client_addr | text | Client address | +| client_hostname | text | Client hostname | +| appname | text | Application name | `pg_wait_sampling_get_current(pid int4)` returns the same table for single given process. @@ -113,38 +121,60 @@ process. `pg_wait_sampling_history` view – history of wait events obtained by sampling into in-memory ring buffer. -| Column name | Column type | Description | -| ----------- | ----------- | ----------------------- | -| pid | int4 | Id of process | -| ts | timestamptz | Sample timestamp | -| event_type | text | Name of wait event type | -| event | text | Name of wait event | -| queryid | int8 | Id of query | +| Column name | Column type | Description | +| ------------------- | ----------- | --------------------------- | +| pid | int4 | Id of process | +| event_type | text | Name of wait event type | +| event | text | Name of wait event | +| queryid | int8 | Id of query | +| role_id | int4 | Id of role | +| database_id | int4 | Id of database | +| parallel_leader_pid | int4 | Id of parallel query leader | +| backend_type | text | Name of backend type | +| backend_state | text | Name of backend state | +| proc_start | timestamptz | Timestamp of process start | +| client_addr | text | Client address | +| client_hostname | text | Client hostname | +| appname | text | Application name | +| ts | timestamptz | Sample timestamp | + +`pg_wait_sampling_reset_history()` function resets the history. `pg_wait_sampling_profile` view – profile of wait events obtained by sampling into in-memory hash table. 
-| Column name | Column type | Description | -| ----------- | ----------- | ----------------------- | -| pid | int4 | Id of process | -| event_type | text | Name of wait event type | -| event | text | Name of wait event | -| queryid | int8 | Id of query | -| count | text | Count of samples | +| Column name | Column type | Description | +| ------------------- | ----------- | --------------------------- | +| pid | int4 | Id of process | +| event_type | text | Name of wait event type | +| event | text | Name of wait event | +| queryid | int8 | Id of query | +| role_id | int4 | Id of role | +| database_id | int4 | Id of database | +| parallel_leader_pid | int4 | Id of parallel query leader | +| backend_type | text | Name of backend type | +| backend_state | text | Name of backend state | +| proc_start | timestamptz | Timestamp of process start | +| client_addr | text | Client address | +| client_hostname | text | Client hostname | +| appname | text | Application name | +| count | int8 | Count of samples | `pg_wait_sampling_reset_profile()` function resets the profile. The work of wait event statistics collector worker is controlled by following GUCs. -| Parameter name | Data type | Description | Default value | -|----------------------------------| --------- |---------------------------------------------|--------------:| -| pg_wait_sampling.history_size | int4 | Size of history in-memory ring buffer | 5000 | -| pg_wait_sampling.history_period | int4 | Period for history sampling in milliseconds | 10 | -| pg_wait_sampling.profile_period | int4 | Period for profile sampling in milliseconds | 10 | -| pg_wait_sampling.profile_pid | bool | Whether profile should be per pid | true | -| pg_wait_sampling.profile_queries | enum | Whether profile should be per query | top | -| pg_wait_sampling.sample_cpu | bool | Whether on CPU backends should be sampled | true | +| Parameter name | Data type | Description | Default value | +|-------------------------------------| --------- |---------------------------------------------|----------------------------------------------| +| pg_wait_sampling.history_size | int4 | Size of history in-memory ring buffer | 5000 | +| pg_wait_sampling.history_period | int4 | Period for history sampling in milliseconds | 10 | +| pg_wait_sampling.profile_period | int4 | Period for profile sampling in milliseconds | 10 | +| pg_wait_sampling.profile_pid | bool | Whether profile should be per pid | true | +| pg_wait_sampling.profile_queries | enum | Whether profile should be per query | top | +| pg_wait_sampling.sample_cpu | bool | Whether on CPU backends should be sampled | true | +| pg_wait_sampling.history_dimensions | text | Additional columns in extended history view | 'pid, wait_event_type, wait_event, query_id' | +| pg_wait_sampling.profile_dimensions | text | Additional columns in extended profile view | 'pid, wait_event_type, wait_event, query_id' | If `pg_wait_sampling.profile_pid` is set to false, sampling profile wouldn't be collected in per-process manner. In this case the value of pid could would @@ -158,10 +188,28 @@ If `pg_wait_sampling.sample_cpu` is set to true then processes that are not waiting on anything are also sampled. The wait event columns for such processes will be NULL. +`pg_wait_sampling.history_dimenstions` and `pg_wait_sampling.profile_dimensions` +determine what additional columns will be sampled in `history/profile_extended` +views. 
Possible values are `all`, `pid`, `wait_event_type`, `wait_event`, +`query_id`, `role_id`, `database_id`, `parallel_leader_pid`, `backend_type`, +`backend_state`, `backend_start_time`, `client_addr`, `client_hostname`, +`appname` and any combination of column names. +`all` cannot be used together with any other values and must be used alone. + +> [!WARNING] +> Turning on any of the following columns: `backend_type`, `backend_state`, +> `backend_start_time`, `client_addr`, `client_hostname`, `appname` will reduce +> performance compared to sampling none of those due to the need to look into +> BackendStatusTable. This is especially noticeable with PostgreSQL 13-16 + Values of these GUC variables can be changed only in config file or with ALTER SYSTEM. -Then you need to reload server's configuration (such as with pg_reload_conf function) +Then you need to reload server's configuration (such as with `pg_reload_conf` function) for changes to take effect. +> [!WARNING] +> When using `pg_reload_conf` you also need to invoke `pg_wait_sampling_reset_history()` +> and `pg_wait_sampling_reset_profile()` for correct application of new dimensions + See [PostgreSQL documentation](http://www.postgresql.org/docs/devel/static/monitoring-stats.html#WAIT-EVENT-TABLE) for list of possible wait events. @@ -170,7 +218,7 @@ Contribution ------------ Please, notice, that `pg_wait_sampling` is still under development and while -it's stable and tested, it may contains some bugs. Don't hesitate to raise +it's stable and tested, it may contain some bugs. Don't hesitate to raise [issues at github](https://github.com/postgrespro/pg_wait_sampling/issues) with your bug reports. diff --git a/collector.c b/collector.c index 721299f..ec03fee 100644 --- a/collector.c +++ b/collector.c @@ -10,6 +10,7 @@ #include "postgres.h" #include +#include #include "compat.h" #include "miscadmin.h" @@ -30,8 +31,18 @@ #include "utils/resowner.h" #include "utils/timestamp.h" +#define check_bestatus_dimensions(dimensions) \ + (dimensions & (PGWS_DIMENSIONS_BE_TYPE |\ + PGWS_DIMENSIONS_BE_STATE |\ + PGWS_DIMENSIONS_BE_START_TIME |\ + PGWS_DIMENSIONS_CLIENT_ADDR |\ + PGWS_DIMENSIONS_CLIENT_HOSTNAME |\ + PGWS_DIMENSIONS_APPNAME)) static volatile sig_atomic_t shutdown_requested = false; +int saved_profile_dimensions; +int saved_history_dimensions; + static void handle_sigterm(SIGNAL_ARGS); /* @@ -61,7 +72,12 @@ pgws_register_wait_collector(void) static void alloc_history(History *observations, int count) { - observations->items = (HistoryItem *) palloc0(sizeof(HistoryItem) * count); + int serialized_size; + + saved_history_dimensions = pgws_history_dimensions; + serialized_size = get_serialized_size(saved_history_dimensions, true); + + observations->serialized_items = (char *) palloc0(serialized_size * count); observations->index = 0; observations->count = count; observations->wraparound = false; @@ -73,13 +89,16 @@ alloc_history(History *observations, int count) static void realloc_history(History *observations, int count) { - HistoryItem *newitems; + char *newitems; int copyCount, i, j; + int serialized_size; + + serialized_size = get_serialized_size(saved_history_dimensions, true); /* Allocate new array for history */ - newitems = (HistoryItem *) palloc0(sizeof(HistoryItem) * count); + newitems = (char *) palloc0(serialized_size * count); /* Copy entries from old array to the new */ if (observations->wraparound) @@ -98,14 +117,16 @@ realloc_history(History *observations, int count) { if (j >= observations->count) j = 0; - 
memcpy(&newitems[i], &observations->items[j], sizeof(HistoryItem)); + memcpy((newitems + i * serialized_size), + (observations->serialized_items + j * serialized_size), + serialized_size); i++; j++; } /* Switch to new history array */ - pfree(observations->items); - observations->items = newitems; + pfree(observations->serialized_items); + observations->serialized_items = newitems; observations->index = copyCount; observations->count = count; observations->wraparound = false; @@ -125,10 +146,11 @@ handle_sigterm(SIGNAL_ARGS) /* * Get next item of history with rotation. */ -static HistoryItem * +static char * get_next_observation(History *observations) { - HistoryItem *result; + char *result; + int serialized_size = get_serialized_size(saved_history_dimensions, true); /* Check for wraparound */ if (observations->index >= observations->count) @@ -136,11 +158,404 @@ get_next_observation(History *observations) observations->index = 0; observations->wraparound = true; } - result = &observations->items[observations->index]; + result = &observations->serialized_items[observations->index * serialized_size]; observations->index++; return result; } +void +fill_dimensions(SamplingDimensions *dimensions, PGPROC *proc, + int pid, uint32 wait_event_info, uint64 queryId, + int dimensions_mask) +{ + Oid role_id = proc->roleId; + Oid database_id = proc->databaseId; + PGPROC *lockGroupLeader = proc->lockGroupLeader; +#if PG_VERSION_NUM >= 180000 + bool is_regular_backend = proc->isRegularBackend; +#else + bool is_regular_backend = !proc->isBackgroundWorker; +#endif + + if (dimensions_mask & PGWS_DIMENSIONS_PID) + dimensions->pid = pid; + + if (dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT_TYPE || + dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT) + dimensions->wait_event_info = wait_event_info; + + if (pgws_profileQueries || (dimensions_mask & PGWD_DIMENSIONS_QUERY_ID)) + dimensions->queryId = queryId; + + /* Copy everything we need from PGPROC */ + if (dimensions_mask & PGWS_DIMENSIONS_ROLE_ID) + dimensions->role_id = role_id; + + if (dimensions_mask & PGWS_DIMENSIONS_DB_ID) + dimensions->database_id = database_id; + + if (dimensions_mask & PGWS_DIMENSIONS_PARALLEL_LEADER_PID) + dimensions->parallel_leader_pid = (lockGroupLeader ? 
+ lockGroupLeader->pid : + 0); + + if (dimensions_mask & PGWS_DIMENSIONS_IS_REGULAR_BE) + dimensions->is_regular_backend = is_regular_backend; + + /* Look into BackendStatus only if necessary */ + if (check_bestatus_dimensions(dimensions_mask)) + { +#if PG_VERSION_NUM >= 170000 + PgBackendStatus *bestatus = pgstat_get_beentry_by_proc_number(GetNumberFromPGProc(proc)); +#else + PgBackendStatus *bestatus = get_beentry_by_procpid(proc->pid); +#endif + /* Copy everything we need from BackendStatus */ + if (bestatus) + { + if (dimensions_mask & PGWS_DIMENSIONS_BE_TYPE) + dimensions->backend_type = bestatus->st_backendType; + + if (dimensions_mask & PGWS_DIMENSIONS_BE_STATE) + dimensions->backend_state = bestatus->st_state; + + if (dimensions_mask & PGWS_DIMENSIONS_BE_START_TIME) + dimensions->proc_start = bestatus->st_proc_start_timestamp; + + if (dimensions_mask & PGWS_DIMENSIONS_CLIENT_ADDR) + dimensions->client_addr = bestatus->st_clientaddr; + + if (dimensions_mask & PGWS_DIMENSIONS_CLIENT_HOSTNAME) + strcpy(dimensions->client_hostname, bestatus->st_clienthostname); + + if (dimensions_mask & PGWS_DIMENSIONS_APPNAME) + strcpy(dimensions->appname, bestatus->st_appname); + } + } +} + +static void +copy_dimensions (SamplingDimensions *dst, SamplingDimensions *src, + int dst_dimensions_mask) +{ + if (dst_dimensions_mask & PGWS_DIMENSIONS_PID) + dst->pid = src->pid; + + if (dst_dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT_TYPE || + dst_dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT) + dst->wait_event_info = src->wait_event_info; + + if (dst_dimensions_mask & PGWD_DIMENSIONS_QUERY_ID) + dst->queryId = src->queryId; + + if (dst_dimensions_mask & PGWD_DIMENSIONS_QUERY_ID) + dst->role_id = src->role_id; + + if (dst_dimensions_mask & PGWS_DIMENSIONS_DB_ID) + dst->database_id = src->database_id; + + if (dst_dimensions_mask & PGWS_DIMENSIONS_PARALLEL_LEADER_PID) + dst->parallel_leader_pid = src->parallel_leader_pid; + + if (dst_dimensions_mask & PGWS_DIMENSIONS_IS_REGULAR_BE) + dst->is_regular_backend = src->is_regular_backend; + + if (dst_dimensions_mask & PGWS_DIMENSIONS_BE_TYPE) + dst->backend_type = src->backend_type; + + if (dst_dimensions_mask & PGWS_DIMENSIONS_BE_STATE) + dst->backend_state = src->backend_state; + + if (dst_dimensions_mask & PGWS_DIMENSIONS_BE_START_TIME) + dst->proc_start = src->proc_start; + + if (dst_dimensions_mask & PGWS_DIMENSIONS_CLIENT_ADDR) + dst->client_addr = src->client_addr; + + if (dst_dimensions_mask & PGWS_DIMENSIONS_CLIENT_HOSTNAME) + strcpy(dst->client_hostname, src->client_hostname); + + if (dst_dimensions_mask & PGWS_DIMENSIONS_APPNAME) + strcpy(dst->appname, src->appname); +} + +int +get_serialized_size(int dimensions_mask, bool need_last_field) +{ + int serialized_size = 0; + SamplingDimensions dimensions = {0}; /* Used only for sizeof */ + + if (dimensions_mask & PGWS_DIMENSIONS_PID) + serialized_size += sizeof(dimensions.pid); + if (dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT_TYPE || + dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT) + serialized_size += sizeof(dimensions.wait_event_info); + if (dimensions_mask & PGWD_DIMENSIONS_QUERY_ID) + serialized_size += sizeof(dimensions.queryId); + if (dimensions_mask & PGWS_DIMENSIONS_ROLE_ID) + serialized_size += sizeof(dimensions.role_id); + if (dimensions_mask & PGWS_DIMENSIONS_DB_ID) + serialized_size += sizeof(dimensions.database_id); + if (dimensions_mask & PGWS_DIMENSIONS_PARALLEL_LEADER_PID) + serialized_size += sizeof(dimensions.parallel_leader_pid); + if (dimensions_mask & PGWS_DIMENSIONS_IS_REGULAR_BE) + 
serialized_size += sizeof(dimensions.is_regular_backend); + if (dimensions_mask & PGWS_DIMENSIONS_BE_TYPE) + serialized_size += sizeof(dimensions.backend_type); + if (dimensions_mask & PGWS_DIMENSIONS_BE_STATE) + serialized_size += sizeof(dimensions.backend_state); + if (dimensions_mask & PGWS_DIMENSIONS_BE_START_TIME) + serialized_size += sizeof(dimensions.proc_start); + if (dimensions_mask & PGWS_DIMENSIONS_CLIENT_ADDR) + serialized_size += sizeof(dimensions.client_addr); + if (dimensions_mask & PGWS_DIMENSIONS_CLIENT_HOSTNAME) + serialized_size += sizeof(dimensions.client_hostname); + if (dimensions_mask & PGWS_DIMENSIONS_APPNAME) + serialized_size += sizeof(dimensions.appname); + /* timestamp of history and count of profile are both 8 bytes */ + if (need_last_field) + serialized_size += sizeof(uint64); + return serialized_size; +} + +static void +serialize_item(SamplingDimensions dimensions, int dimensions_mask, + char **serialized_item, char **serialized_key, int *serialized_size, + TimestampTz ts, uint64 count, bool is_history) +{ + char dummy_array[sizeof(SamplingDimensions) + sizeof(uint64) + 1]; + + memset(dummy_array, 0, sizeof(dummy_array)); + + if (dimensions_mask & PGWS_DIMENSIONS_PID) + { + memcpy(dummy_array + *serialized_size, &dimensions.pid, + sizeof(dimensions.pid)); + *serialized_size += sizeof(dimensions.pid); + } + + if (dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT_TYPE || + dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT) + { + memcpy(dummy_array + *serialized_size, &dimensions.wait_event_info, + sizeof(dimensions.wait_event_info)); + *serialized_size += sizeof(dimensions.wait_event_info); + } + + if (dimensions_mask & PGWD_DIMENSIONS_QUERY_ID) + { + memcpy(dummy_array + *serialized_size, &dimensions.queryId, + sizeof(dimensions.queryId)); + *serialized_size += sizeof(dimensions.queryId); + } + + if (dimensions_mask & PGWS_DIMENSIONS_ROLE_ID) + { + memcpy(dummy_array + *serialized_size, &dimensions.role_id, + sizeof(dimensions.role_id)); + *serialized_size += sizeof(dimensions.role_id); + } + + if (dimensions_mask & PGWS_DIMENSIONS_DB_ID) + { + memcpy(dummy_array + *serialized_size, &dimensions.database_id, + sizeof(dimensions.database_id)); + *serialized_size += sizeof(dimensions.database_id); + } + + if (dimensions_mask & PGWS_DIMENSIONS_PARALLEL_LEADER_PID) + { + memcpy(dummy_array + *serialized_size, &dimensions.parallel_leader_pid, + sizeof(dimensions.parallel_leader_pid)); + *serialized_size += sizeof(dimensions.parallel_leader_pid); + } + + if (dimensions_mask & PGWS_DIMENSIONS_IS_REGULAR_BE) + { + memcpy(dummy_array + *serialized_size, &dimensions.is_regular_backend, + sizeof(dimensions.is_regular_backend)); + *serialized_size += sizeof(dimensions.is_regular_backend); + } + + if (dimensions_mask & PGWS_DIMENSIONS_BE_TYPE) + { + memcpy(dummy_array + *serialized_size, &dimensions.backend_type, + sizeof(dimensions.backend_type)); + *serialized_size += sizeof(dimensions.backend_type); + } + + if (dimensions_mask & PGWS_DIMENSIONS_BE_STATE) + { + memcpy(dummy_array + *serialized_size, &dimensions.backend_state, + sizeof(dimensions.backend_state)); + *serialized_size += sizeof(dimensions.backend_state); + } + + if (dimensions_mask & PGWS_DIMENSIONS_BE_START_TIME) + { + memcpy(dummy_array + *serialized_size, &dimensions.proc_start, + sizeof(dimensions.proc_start)); + *serialized_size += sizeof(dimensions.proc_start); + } + + if (dimensions_mask & PGWS_DIMENSIONS_CLIENT_ADDR) + { + memcpy(dummy_array + *serialized_size, &dimensions.client_addr, + 
sizeof(dimensions.client_addr)); + *serialized_size += sizeof(dimensions.client_addr); + } + + if (dimensions_mask & PGWS_DIMENSIONS_CLIENT_HOSTNAME) + { + memcpy(dummy_array + *serialized_size, &dimensions.client_hostname, + sizeof(dimensions.client_hostname)); + *serialized_size += sizeof(dimensions.client_hostname); + } + + if (dimensions_mask & PGWS_DIMENSIONS_APPNAME) + { + memcpy(dummy_array + *serialized_size, &dimensions.appname, + sizeof(dimensions.appname)); + *serialized_size += sizeof(dimensions.appname); + } + + /* copy all the fields without ts/count */ + *serialized_key = palloc0(*serialized_size); + memcpy(*serialized_key, dummy_array, *serialized_size); + + if (is_history) + { + memcpy(dummy_array + *serialized_size, &ts, + sizeof(TimestampTz)); + *serialized_size += sizeof(TimestampTz); + } + else + { + memcpy(dummy_array + *serialized_size, &count, + sizeof(uint64)); + *serialized_size += sizeof(uint64); + } + + /* copy everything */ + *serialized_item = palloc0(*serialized_size); + memcpy(*serialized_item, dummy_array, *serialized_size); +} + +void +deserialize_item(SamplingDimensions *dimensions, char *serialized_item, + int dimensions_mask, TimestampTz *ts, uint64 *count) +{ + int idx = 0; + + memset(dimensions, 0, sizeof(SamplingDimensions)); + + if (dimensions_mask & PGWS_DIMENSIONS_PID) + { + memcpy(&dimensions->pid, serialized_item + idx, + sizeof(dimensions->pid)); + idx += sizeof(dimensions->pid); + } + + if (dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT_TYPE || + dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT) + { + memcpy(&dimensions->wait_event_info, serialized_item + idx, + sizeof(dimensions->wait_event_info)); + idx += sizeof(dimensions->wait_event_info); + } + + if (dimensions_mask & PGWD_DIMENSIONS_QUERY_ID) + { + memcpy(&dimensions->queryId, serialized_item + idx, + sizeof(dimensions->queryId)); + idx += sizeof(dimensions->queryId); + } + + if (dimensions_mask & PGWS_DIMENSIONS_ROLE_ID) + { + memcpy(&dimensions->role_id, serialized_item + idx, + sizeof(dimensions->role_id)); + idx += sizeof(dimensions->role_id); + } + + if (dimensions_mask & PGWS_DIMENSIONS_DB_ID) + { + memcpy(&dimensions->database_id, serialized_item + idx, + sizeof(dimensions->database_id)); + idx += sizeof(dimensions->database_id); + } + + if (dimensions_mask & PGWS_DIMENSIONS_PARALLEL_LEADER_PID) + { + memcpy(&dimensions->parallel_leader_pid, serialized_item + idx, + sizeof(dimensions->parallel_leader_pid)); + idx += sizeof(dimensions->parallel_leader_pid); + } + + if (dimensions_mask & PGWS_DIMENSIONS_IS_REGULAR_BE) + { + memcpy(&dimensions->is_regular_backend, serialized_item + idx, + sizeof(dimensions->is_regular_backend)); + idx += sizeof(dimensions->is_regular_backend); + } + + if (dimensions_mask & PGWS_DIMENSIONS_BE_TYPE) + { + memcpy(&dimensions->backend_type, serialized_item + idx, + sizeof(dimensions->backend_type)); + idx += sizeof(dimensions->backend_type); + } + + if (dimensions_mask & PGWS_DIMENSIONS_BE_STATE) + { + memcpy(&dimensions->backend_state, serialized_item + idx, + sizeof(dimensions->backend_state)); + idx += sizeof(dimensions->backend_state); + } + + if (dimensions_mask & PGWS_DIMENSIONS_BE_START_TIME) + { + memcpy(&dimensions->proc_start, serialized_item + idx, + sizeof(dimensions->proc_start)); + idx += sizeof(dimensions->proc_start); + } + + if (dimensions_mask & PGWS_DIMENSIONS_CLIENT_ADDR) + { + memcpy(&dimensions->client_addr, serialized_item + idx, + sizeof(dimensions->client_addr)); + idx += sizeof(dimensions->client_addr); + } + + if (dimensions_mask & 
PGWS_DIMENSIONS_CLIENT_HOSTNAME) + { + memcpy(&dimensions->client_hostname, serialized_item + idx, + sizeof(dimensions->client_hostname)); + idx += sizeof(dimensions->client_hostname); + } + + if (dimensions_mask & PGWS_DIMENSIONS_APPNAME) + { + memcpy(&dimensions->appname, serialized_item + idx, + sizeof(dimensions->appname)); + idx += sizeof(dimensions->appname); + } + + if (ts) + { + memcpy(ts, serialized_item + idx, + sizeof(TimestampTz)); + idx += sizeof(TimestampTz); + } + + if (count) + { + memcpy(count, serialized_item + idx, + sizeof(uint64)); + idx += sizeof(uint64); + } +} + /* * Read current waits from backends and write them to history array * and/or profile hash. @@ -162,44 +577,89 @@ probe_waits(History *observations, HTAB *profile_hash, LWLockAcquire(ProcArrayLock, LW_SHARED); for (i = 0; i < ProcGlobal->allProcCount; i++) { - HistoryItem item, - *observation; PGPROC *proc = &ProcGlobal->allProcs[i]; - - if (!pgws_should_sample_proc(proc, &item.pid, &item.wait_event_info)) + int pid; + uint32 wait_event_info; + SamplingDimensions common_dimensions, + history_dimensions, + profile_dimensions; + int dimensions_mask_common = saved_history_dimensions | + saved_profile_dimensions; + + /* Check if we need to sample this process */ + if (!pgws_should_sample_proc(proc, &pid, &wait_event_info)) continue; - if (pgws_profileQueries) - item.queryId = pgws_proc_queryids[i]; - else - item.queryId = 0; + /* + * We zero dimensions with memset to avoid doing it field-by-field + */ + memset(&history_dimensions, 0, sizeof(SamplingDimensions)); + memset(&profile_dimensions, 0, sizeof(SamplingDimensions)); + memset(&common_dimensions, 0, sizeof(SamplingDimensions)); + + fill_dimensions(&common_dimensions, proc, pid, wait_event_info, + pgws_proc_queryids[i], dimensions_mask_common); - item.ts = ts; + copy_dimensions(&history_dimensions, + &common_dimensions, + saved_history_dimensions); + copy_dimensions(&profile_dimensions, + &common_dimensions, + saved_profile_dimensions); /* Write to the history if needed */ if (write_history) { + char *serialized_key, + *serialized_item, + *observation; + int serialized_size = 0; + observation = get_next_observation(observations); - *observation = item; + serialize_item(history_dimensions, saved_history_dimensions, + &serialized_item, &serialized_key, &serialized_size, + ts, (uint64) 0, true); + memcpy(observation, serialized_item, serialized_size); } /* Write to the profile if needed */ if (write_profile) { - ProfileItem *profileItem; - bool found; + bool found; + int serialized_size = 0; + uint64 count = 1; + char *serialized_key, + *serialized_item, + *stored_item; if (!profile_pid) - item.pid = 0; + profile_dimensions.pid = 0; + + serialize_item(profile_dimensions, saved_profile_dimensions, + &serialized_item, &serialized_key, &serialized_size, + (TimestampTz) 0, count, false); + + stored_item = (char *) hash_search(profile_hash, serialized_key, + HASH_ENTER, &found); - profileItem = (ProfileItem *) hash_search(profile_hash, &item, HASH_ENTER, &found); if (found) - profileItem->count++; + { + memcpy(&count, (stored_item + serialized_size - sizeof(uint64)), + sizeof(uint64)); + count++; + memcpy((stored_item + serialized_size - sizeof(uint64)), &count, + sizeof(uint64)); + } else - profileItem->count = 1; + memcpy(stored_item, serialized_item, serialized_size); } } LWLockRelease(ProcArrayLock); +#if PG_VERSION_NUM >= 140000 + pgstat_clear_backend_activity_snapshot(); +#else + pgstat_clear_snapshot(); +#endif } /* @@ -208,6 +668,7 @@ probe_waits(History 
*observations, HTAB *profile_hash, static void send_history(History *observations, shm_mq_handle *mqh) { + int serialized_size = get_serialized_size(saved_history_dimensions, true); Size count, i; shm_mq_result mq_result; @@ -226,11 +687,20 @@ send_history(History *observations, shm_mq_handle *mqh) "receiver of message queue has been detached"))); return; } + /* Send saved_dimensions next */ + mq_result = shm_mq_send_compat(mqh, sizeof(saved_history_dimensions), &saved_history_dimensions, false, true); + if (mq_result == SHM_MQ_DETACHED) + { + ereport(WARNING, + (errmsg("pg_wait_sampling collector: " + "receiver of message queue has been detached"))); + return; + } for (i = 0; i < count; i++) { mq_result = shm_mq_send_compat(mqh, - sizeof(HistoryItem), - &observations->items[i], + serialized_size, + (observations->serialized_items + i * serialized_size), false, true); if (mq_result == SHM_MQ_DETACHED) @@ -250,7 +720,8 @@ static void send_profile(HTAB *profile_hash, shm_mq_handle *mqh) { HASH_SEQ_STATUS scan_status; - ProfileItem *item; + char *serialized_item; + int serialized_size = get_serialized_size(saved_profile_dimensions, true); Size count = hash_get_num_entries(profile_hash); shm_mq_result mq_result; @@ -263,10 +734,19 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh) "receiver of message queue has been detached"))); return; } + /* Send saved_dimensions next */ + mq_result = shm_mq_send_compat(mqh, sizeof(saved_profile_dimensions), &saved_profile_dimensions, false, true); + if (mq_result == SHM_MQ_DETACHED) + { + ereport(WARNING, + (errmsg("pg_wait_sampling collector: " + "receiver of message queue has been detached"))); + return; + } hash_seq_init(&scan_status, profile_hash); - while ((item = (ProfileItem *) hash_seq_search(&scan_status)) != NULL) + while ((serialized_item = (char *) hash_seq_search(&scan_status)) != NULL) { - mq_result = shm_mq_send_compat(mqh, sizeof(ProfileItem), item, false, + mq_result = shm_mq_send_compat(mqh, serialized_size, serialized_item, false, true); if (mq_result == SHM_MQ_DETACHED) { @@ -287,12 +767,17 @@ make_profile_hash() { HASHCTL hash_ctl; - if (pgws_profileQueries) - hash_ctl.keysize = offsetof(ProfileItem, count); - else - hash_ctl.keysize = offsetof(ProfileItem, queryId); + saved_profile_dimensions = pgws_profile_dimensions; + + /* + * Since adding additional dimensions we use SamplingDimensions as + * hashtable key. 
This is fine for cases when some fields are 0 since + * it doesn't impede our ability to search the hash table for entries + */ + hash_ctl.keysize = get_serialized_size(saved_profile_dimensions, false); + /* entry includes SamplingDimensions and ts/count */ + hash_ctl.entrysize = get_serialized_size(saved_profile_dimensions, true); - hash_ctl.entrysize = sizeof(ProfileItem); return hash_create("Waits profile hash", 1024, &hash_ctl, HASH_ELEM | HASH_BLOBS); } @@ -469,6 +954,12 @@ pgws_collector_main(Datum main_arg) } shm_mq_detach(mqh); } + else if (request == HISTORY_RESET) + { + /* Reset history */ + pfree(observations.items); + alloc_history(&observations, pgws_historySize); + } else if (request == PROFILE_RESET) { /* Reset profile hash */ diff --git a/expected/load.out b/expected/load.out index b7de0ac..94e9075 100644 --- a/expected/load.out +++ b/expected/load.out @@ -1,31 +1,61 @@ CREATE EXTENSION pg_wait_sampling; \d pg_wait_sampling_current -View "public.pg_wait_sampling_current" - Column | Type | Modifiers -------------+---------+----------- - pid | integer | - event_type | text | - event | text | - queryid | bigint | + View "public.pg_wait_sampling_current" + Column | Type | Collation | Nullable | Default +---------------------+--------------------------+-----------+----------+--------- + pid | integer | | | + event_type | text | | | + event | text | | | + queryid | bigint | | | + role_id | bigint | | | + database_id | bigint | | | + parallel_leader_pid | integer | | | + is_regular_backend | boolean | | | + backend_type | text | | | + backend_state | text | | | + proc_start | timestamp with time zone | | | + client_addr | text | | | + client_hostname | text | | | + appname | text | | | \d pg_wait_sampling_history - View "public.pg_wait_sampling_history" - Column | Type | Modifiers -------------+--------------------------+----------- - pid | integer | - ts | timestamp with time zone | - event_type | text | - event | text | - queryid | bigint | + View "public.pg_wait_sampling_history" + Column | Type | Collation | Nullable | Default +---------------------+--------------------------+-----------+----------+--------- + pid | integer | | | + event_type | text | | | + event | text | | | + queryid | bigint | | | + role_id | bigint | | | + database_id | bigint | | | + parallel_leader_pid | integer | | | + is_regular_backend | boolean | | | + backend_type | text | | | + backend_state | text | | | + proc_start | timestamp with time zone | | | + client_addr | text | | | + client_hostname | text | | | + appname | text | | | + ts | timestamp with time zone | | | \d pg_wait_sampling_profile -View "public.pg_wait_sampling_profile" - Column | Type | Modifiers -------------+---------+----------- - pid | integer | - event_type | text | - event | text | - queryid | bigint | - count | bigint | + View "public.pg_wait_sampling_profile" + Column | Type | Collation | Nullable | Default +---------------------+--------------------------+-----------+----------+--------- + pid | integer | | | + event_type | text | | | + event | text | | | + queryid | bigint | | | + role_id | bigint | | | + database_id | bigint | | | + parallel_leader_pid | integer | | | + is_regular_backend | boolean | | | + backend_type | text | | | + backend_state | text | | | + proc_start | timestamp with time zone | | | + client_addr | text | | | + client_hostname | text | | | + appname | text | | | + count | bigint | | | DROP EXTENSION pg_wait_sampling; diff --git a/expected/load_1.out b/expected/load_1.out deleted file mode 100644 index 
1a1358a..0000000 --- a/expected/load_1.out +++ /dev/null @@ -1,31 +0,0 @@ -CREATE EXTENSION pg_wait_sampling; -\d pg_wait_sampling_current - View "public.pg_wait_sampling_current" - Column | Type | Collation | Nullable | Default -------------+---------+-----------+----------+--------- - pid | integer | | | - event_type | text | | | - event | text | | | - queryid | bigint | | | - -\d pg_wait_sampling_history - View "public.pg_wait_sampling_history" - Column | Type | Collation | Nullable | Default -------------+--------------------------+-----------+----------+--------- - pid | integer | | | - ts | timestamp with time zone | | | - event_type | text | | | - event | text | | | - queryid | bigint | | | - -\d pg_wait_sampling_profile - View "public.pg_wait_sampling_profile" - Column | Type | Collation | Nullable | Default -------------+---------+-----------+----------+--------- - pid | integer | | | - event_type | text | | | - event | text | | | - queryid | bigint | | | - count | bigint | | | - -DROP EXTENSION pg_wait_sampling; diff --git a/meson.build b/meson.build index c3c3dc9..162bb0e 100644 --- a/meson.build +++ b/meson.build @@ -24,6 +24,7 @@ install_data( 'pg_wait_sampling.control', 'pg_wait_sampling--1.0--1.1.sql', 'pg_wait_sampling--1.1.sql', + 'pg_wait_sampling--1.1--1.2.sql', kwargs: contrib_data_args, ) diff --git a/pg_wait_sampling--1.1--1.2.sql b/pg_wait_sampling--1.1--1.2.sql new file mode 100644 index 0000000..e9499c5 --- /dev/null +++ b/pg_wait_sampling--1.1--1.2.sql @@ -0,0 +1,111 @@ +/* contrib/pg_wait_sampling/pg_wait_sampling--1.1--1.2.sql */ + +-- complain if script is sourced in psql, rather than via ALTER EXTENSION +\echo Use "ALTER EXTENSION pg_wait_sampling UPDATE TO 1.2" to load this file. \quit + +DROP FUNCTION pg_wait_sampling_get_current ( + pid int4, + OUT pid int4, + OUT event_type text, + OUT event text +) CASCADE; + +DROP FUNCTION pg_wait_sampling_get_history ( + OUT pid int4, + OUT ts timestamptz, + OUT event_type text, + OUT event text +) CASCADE; + +DROP FUNCTION pg_wait_sampling_get_profile ( + OUT pid int4, + OUT event_type text, + OUT event text, + OUT count bigint +) CASCADE; + +CREATE FUNCTION pg_wait_sampling_get_current ( + pid int4, + OUT pid int4, + OUT event_type text, + OUT event text, + OUT queryid int8, + OUT role_id int8, + OUT database_id int8, + OUT parallel_leader_pid int4, + OUT is_regular_backend bool, + OUT backend_type text, + OUT backend_state text, + OUT proc_start timestamptz, + OUT client_addr text, + OUT client_hostname text, + OUT appname text +) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'pg_wait_sampling_get_current_1_2' +LANGUAGE C VOLATILE CALLED ON NULL INPUT; + +CREATE VIEW pg_wait_sampling_current AS + SELECT * FROM pg_wait_sampling_get_current(NULL::integer); + +GRANT SELECT ON pg_wait_sampling_current TO PUBLIC; + +CREATE FUNCTION pg_wait_sampling_get_history ( + OUT pid int4, + OUT event_type text, + OUT event text, + OUT queryid int8, + OUT role_id int8, + OUT database_id int8, + OUT parallel_leader_pid int4, + OUT is_regular_backend bool, + OUT backend_type text, + OUT backend_state text, + OUT proc_start timestamptz, + OUT client_addr text, + OUT client_hostname text, + OUT appname text, + OUT ts timestamptz +) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'pg_wait_sampling_get_history_1_2' +LANGUAGE C VOLATILE STRICT; + +CREATE VIEW pg_wait_sampling_history AS + SELECT * FROM pg_wait_sampling_get_history(); + +GRANT SELECT ON pg_wait_sampling_history TO PUBLIC; + +CREATE FUNCTION pg_wait_sampling_get_profile ( + OUT 
pid int4, + OUT event_type text, + OUT event text, + OUT queryid int8, + OUT role_id int8, + OUT database_id int8, + OUT parallel_leader_pid int4, + OUT is_regular_backend bool, + OUT backend_type text, + OUT backend_state text, + OUT proc_start timestamptz, + OUT client_addr text, + OUT client_hostname text, + OUT appname text, + OUT count int8 +) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'pg_wait_sampling_get_profile_1_2' +LANGUAGE C VOLATILE STRICT; + +CREATE VIEW pg_wait_sampling_profile AS + SELECT * FROM pg_wait_sampling_get_profile(); + +GRANT SELECT ON pg_wait_sampling_profile TO PUBLIC; + +CREATE FUNCTION pg_wait_sampling_reset_history() +RETURNS void +AS 'MODULE_PATHNAME' +LANGUAGE C VOLATILE STRICT; + +-- Don't want this to be available to non-superusers. +REVOKE ALL ON FUNCTION pg_wait_sampling_reset_history() FROM PUBLIC; diff --git a/pg_wait_sampling.c b/pg_wait_sampling.c index e165a6a..6a82bba 100644 --- a/pg_wait_sampling.c +++ b/pg_wait_sampling.c @@ -13,6 +13,7 @@ #include "access/htup_details.h" #include "catalog/pg_type_d.h" +#include "common/ip.h" #include "executor/executor.h" #include "funcapi.h" #include "miscadmin.h" @@ -32,6 +33,7 @@ #include "utils/guc.h" #include "utils/memutils.h" #include "utils/timestamp.h" +#include "utils/varlena.h" #if PG_VERSION_NUM < 150000 #include "postmaster/autovacuum.h" @@ -127,12 +129,30 @@ static const struct config_enum_entry pgws_profile_queries_options[] = {NULL, 0, false} }; +/* like in pg_stat_statements */ +typedef enum pgwsVersion +{ + PGWS_V1_1 = 0, + PGWS_V1_2, +} pgwsVersion; + +Datum pg_wait_sampling_get_current_internal(FunctionCallInfo fcinfo, + pgwsVersion api_version); +Datum pg_wait_sampling_get_profile_internal(FunctionCallInfo fcinfo, + pgwsVersion api_version); +Datum pg_wait_sampling_get_history_internal(FunctionCallInfo fcinfo, + pgwsVersion api_version); + int pgws_historySize = 5000; int pgws_historyPeriod = 10; int pgws_profilePeriod = 10; bool pgws_profilePid = true; int pgws_profileQueries = PGWS_PROFILE_QUERIES_TOP; bool pgws_sampleCpu = true; +static char *pgws_history_dimensions_string = NULL; +static char *pgws_profile_dimensions_string = NULL; +int pgws_history_dimensions; /* bit mask that is derived from GUC */ +int pgws_profile_dimensions; /* bit mask that is derived from GUC */ #define pgws_enabled(level) \ ((pgws_profileQueries == PGWS_PROFILE_QUERIES_ALL) || \ @@ -301,6 +321,117 @@ pgws_cleanup_callback(int code, Datum arg) LockRelease(&queueTag, ExclusiveLock, false); } +/* + * Check tokens of string and fill bitmask accordingly + * Mostly copied from plpgsql_extra_checks_check_hook + */ +static bool +pgws_general_dimensions_check_hook (char **newvalue, void **extra, GucSource source) +{ + char *rawstring; + List *elemlist; + ListCell *l; + int extrachecks = 0; + int *myextra; + + /* Check special case when we turn all dimensions */ + if (pg_strcasecmp(*newvalue, "all") == 0) + extrachecks = PGWS_DIMENSIONS_ALL; + else + { + /* Need a modifiable copy of string */ + rawstring = pstrdup(*newvalue); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawstring, ',', &elemlist)) + { + /* syntax error in list */ + GUC_check_errdetail("List syntax is invalid."); + pfree(rawstring); + list_free(elemlist); + return false; + } + + /* Loop over all recieved options */ + foreach(l, elemlist) + { + char *tok = (char *) lfirst(l); + + /* Process all allowed values */ + if (pg_strcasecmp(tok, "pid") == 0) + extrachecks |= PGWS_DIMENSIONS_PID; + else if (pg_strcasecmp(tok, 
"wait_event_type") == 0) + extrachecks |= PGWS_DIMENSIONS_WAIT_EVENT_TYPE; + else if (pg_strcasecmp(tok, "wait_event") == 0) + extrachecks |= PGWS_DIMENSIONS_WAIT_EVENT; + else if (pg_strcasecmp(tok, "query_id") == 0) + extrachecks |= PGWD_DIMENSIONS_QUERY_ID; + else if (pg_strcasecmp(tok, "role_id") == 0) + extrachecks |= PGWS_DIMENSIONS_ROLE_ID; + else if (pg_strcasecmp(tok, "database_id") == 0) + extrachecks |= PGWS_DIMENSIONS_DB_ID; + else if (pg_strcasecmp(tok, "parallel_leader_pid") == 0) + extrachecks |= PGWS_DIMENSIONS_PARALLEL_LEADER_PID; + else if (pg_strcasecmp(tok, "is_regular_backend") == 0) + extrachecks |= PGWS_DIMENSIONS_IS_REGULAR_BE; + else if (pg_strcasecmp(tok, "backend_type") == 0) + extrachecks |= PGWS_DIMENSIONS_BE_TYPE; + else if (pg_strcasecmp(tok, "backend_state") == 0) + extrachecks |= PGWS_DIMENSIONS_BE_STATE; + else if (pg_strcasecmp(tok, "backend_start_time") == 0) + extrachecks |= PGWS_DIMENSIONS_BE_START_TIME; + else if (pg_strcasecmp(tok, "client_addr") == 0) + extrachecks |= PGWS_DIMENSIONS_CLIENT_ADDR; + else if (pg_strcasecmp(tok, "client_hostname") == 0) + extrachecks |= PGWS_DIMENSIONS_CLIENT_HOSTNAME; + else if (pg_strcasecmp(tok, "appname") == 0) + extrachecks |= PGWS_DIMENSIONS_APPNAME; + else if (pg_strcasecmp(tok, "all") == 0) + { + GUC_check_errdetail("Key word \"%s\" cannot be combined with other key words.", tok); + pfree(rawstring); + list_free(elemlist); + return false; + } + else + { + GUC_check_errdetail("Unrecognized key word: \"%s\".", tok); + pfree(rawstring); + list_free(elemlist); + return false; + } + } + + pfree(rawstring); + list_free(elemlist); + } +#if PG_VERSION_NUM >= 160000 + myextra = (int *) guc_malloc(LOG, sizeof(int)); +#else + myextra = (int *) malloc(sizeof(int)); +#endif + if (!myextra) + return false; + *myextra = extrachecks; + *extra = myextra; + + return true; +} + +/* Assign actual value to dimension bitmask */ +static void +pgws_history_dimensions_assign_hook (const char *newvalue, void *extra) +{ + pgws_history_dimensions = *((int *) extra); +} + +/* Assign actual value to dimension bitmask */ +static void +pgws_profile_dimensions_assign_hook (const char *newvalue, void *extra) +{ + pgws_profile_dimensions = *((int *) extra); +} + /* * Module load callback */ @@ -421,6 +552,28 @@ _PG_init(void) NULL, NULL); + DefineCustomStringVariable("pg_wait_sampling.history_dimensions", + "Sets sampling dimensions for history", + NULL, + &pgws_history_dimensions_string, + "pid, wait_event_type, wait_event, query_id", + PGC_SIGHUP, + GUC_LIST_INPUT, + pgws_general_dimensions_check_hook, + pgws_history_dimensions_assign_hook, + NULL); + + DefineCustomStringVariable("pg_wait_sampling.profile_dimensions", + "Sets sampling dimensions for profile", + NULL, + &pgws_profile_dimensions_string, + "pid, wait_event_type, wait_event, query_id", + PGC_SIGHUP, + GUC_LIST_INPUT, + pgws_general_dimensions_check_hook, + pgws_profile_dimensions_assign_hook, + NULL); + #if PG_VERSION_NUM >= 150000 MarkGUCPrefixReserved("pg_wait_sampling"); #endif @@ -487,15 +640,291 @@ typedef struct TimestampTz ts; } WaitCurrentContext; +static Datum +GetBackendState(BackendState state, bool *is_null) +{ + switch (state) + { +#if PG_VERSION_NUM >= 180000 + case STATE_STARTING: + return CStringGetTextDatum("starting"); +#endif + case STATE_IDLE: + return CStringGetTextDatum("idle"); + case STATE_RUNNING: + return CStringGetTextDatum("active"); + case STATE_IDLEINTRANSACTION: + return CStringGetTextDatum("idle in transaction"); + case STATE_FASTPATH: + return 
CStringGetTextDatum("fastpath function call"); + case STATE_IDLEINTRANSACTION_ABORTED: + return CStringGetTextDatum("idle in transaction (aborted)"); + case STATE_DISABLED: + return CStringGetTextDatum("disabled"); + case STATE_UNDEFINED: + *is_null = true; + } + return (Datum) 0; +} + +/* Copied from pg_stat_get_backend_client_addr */ +static Datum +get_backend_client_addr(SockAddr client_addr, bool *is_null) +{ + char remote_host[NI_MAXHOST]; + int ret; + + /* A zeroed client addr means we don't know */ +#if PG_VERSION_NUM >= 180000 + if (pg_memory_is_all_zeros(&client_addr, + sizeof(client_addr))) +#else + SockAddr zero_clientaddr; + + memset(&zero_clientaddr, 0, sizeof(zero_clientaddr)); + if (memcmp(&client_addr, &zero_clientaddr, + sizeof(zero_clientaddr)) == 0) +#endif + { + *is_null = true; + return (Datum) 0; + } + + switch (client_addr.addr.ss_family) + { + case AF_INET: + case AF_INET6: + break; + default: + *is_null = true; + return (Datum) 0; + } + + remote_host[0] = '\0'; + ret = pg_getnameinfo_all(&client_addr.addr, + client_addr.salen, + remote_host, sizeof(remote_host), + NULL, 0, + NI_NUMERICHOST | NI_NUMERICSERV); + if (ret != 0) + { + *is_null = true; + return (Datum) 0; + } + + clean_ipv6_addr(client_addr.addr.ss_family, remote_host); + + return (DirectFunctionCall1(inet_in, CStringGetDatum(remote_host))); +} + +/* + * Needed for PostgreSQL 16 and earlier since there is no good way to get + * PgBackendStatus when having only PGPROC structure. + * + * pgstat_fetch_stat_beentry (13-15) works with indices of localBackendStatusTable + * pgstat_get_beentry_by_backend_id (16) works with "backend_ids", but we still + * cannot get them without looking into LocalPgBackendStatus, so work with indices + * + * This function is very inefficient + * + * Maybe we should just iterate over localBackendStatusTable and somehow get + * PGPROC entries from there but it is up for discussion + */ +PgBackendStatus * +get_beentry_by_procpid(int pid) +{ + int backend_num = pgstat_fetch_stat_numbackends(), cur_be_idx; + + for (cur_be_idx = 1; cur_be_idx <= backend_num; cur_be_idx++) + { + LocalPgBackendStatus *local_beentry; + +#if PG_VERSION_NUM >= 160000 + local_beentry = pgstat_get_local_beentry_by_index(cur_be_idx); +#else + /* Here beid is just index in localBackendStatusTable */ + local_beentry = pgstat_fetch_stat_local_beentry(cur_be_idx); +#endif +#if defined(PGPRO_EE) || defined(PGPRO_STD) && PG_VERSION_NUM >= 160000 + if (local_beentry->backendStatus->st_procpid == pid) + return local_beentry->backendStatus; +#else + if (local_beentry->backendStatus.st_procpid == pid) + { + PgBackendStatus *result = palloc0(sizeof(PgBackendStatus)); + *result = local_beentry->backendStatus; + return result; + } +#endif + } + return NULL; +} + +/* like in pg_stat_statements */ +#define PG_WAIT_SAMPLING_COLS_V1_1 5 +#define PG_WAIT_SAMPLING_COLS_V1_2 15 +#define PG_WAIT_SAMPLING_COLS 15 /* maximum of above */ + +/* + * Common routine to fill "dimensions" part of tupdesc + */ +static void +fill_tuple_desc (TupleDesc tupdesc, pgwsVersion api_version) +{ + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pid", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "type", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "event", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "queryid", + INT8OID, -1, 0); + if (api_version >= PGWS_V1_2) + { + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "role_id", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, 
"database_id", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "parallel_leader_pid", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "is_regular_backend", + BOOLOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "backend_type", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "backend_state", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "proc_start", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "client_addr", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "client_hostname", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 14, "appname", + TEXTOID, -1, 0); + } +} + +static void +fill_values_and_nulls(Datum *values, bool *nulls, SamplingDimensions dimensions, + int dimensions_mask, pgwsVersion api_version) +{ + const char *event_type, + *event, + *backend_type; + Datum backend_state, proc_start, client_addr; + bool is_null_be_state = false, + is_null_client_addr = false; + + event_type = pgstat_get_wait_event_type(dimensions.wait_event_info); + event = pgstat_get_wait_event(dimensions.wait_event_info); + backend_type = GetBackendTypeDesc(dimensions.backend_type); + backend_state = GetBackendState(dimensions.backend_state, &is_null_be_state); + proc_start = TimestampTzGetDatum(dimensions.proc_start); + client_addr = get_backend_client_addr(dimensions.client_addr, &is_null_client_addr); + + if (dimensions_mask & PGWS_DIMENSIONS_PID) + values[0] = Int32GetDatum(dimensions.pid); + else + values[0] = (Datum) 0; + if (event_type && (dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT_TYPE)) + values[1] = PointerGetDatum(cstring_to_text(event_type)); + else + nulls[1] = true; + if (event && (dimensions_mask & PGWS_DIMENSIONS_WAIT_EVENT)) + values[2] = PointerGetDatum(cstring_to_text(event)); + else + nulls[2] = true; + if (pgws_profileQueries || (dimensions_mask & PGWD_DIMENSIONS_QUERY_ID)) + values[3] = UInt64GetDatum(dimensions.queryId); + else + values[3] = (Datum) 0; + if (api_version >= PGWS_V1_2) + { + if (dimensions_mask & PGWS_DIMENSIONS_ROLE_ID) + values[4] = ObjectIdGetDatum(dimensions.role_id); + else + nulls[4] = true; + if (dimensions_mask & PGWS_DIMENSIONS_DB_ID) + values[5] = ObjectIdGetDatum(dimensions.database_id); + else + nulls[5] = true; + if (dimensions_mask & PGWS_DIMENSIONS_PARALLEL_LEADER_PID) + values[6] = Int32GetDatum(dimensions.parallel_leader_pid); + else + nulls[6] = true; + if (dimensions_mask & PGWS_DIMENSIONS_IS_REGULAR_BE) + values[7] = BoolGetDatum(dimensions.is_regular_backend); + else + nulls[7] = true; + if (backend_type && (dimensions_mask & PGWS_DIMENSIONS_BE_TYPE)) + values[8] = PointerGetDatum(cstring_to_text(backend_type)); + else + nulls[8] = true; + if (!is_null_be_state && (dimensions_mask & PGWS_DIMENSIONS_BE_STATE)) + values[9] = backend_state; + else + nulls[9] = true; + if (dimensions_mask & PGWS_DIMENSIONS_BE_START_TIME) + values[10] = proc_start; + else + nulls[10] = true; + if (!is_null_client_addr && (dimensions_mask & PGWS_DIMENSIONS_CLIENT_ADDR)) + values[11] = client_addr; + else + nulls[11] = true; + if (dimensions_mask & PGWS_DIMENSIONS_CLIENT_HOSTNAME) + values[12] = PointerGetDatum(cstring_to_text(dimensions.client_hostname)); + else + nulls[12] = true; + if (dimensions_mask & PGWS_DIMENSIONS_APPNAME) + values[13] = PointerGetDatum(cstring_to_text(dimensions.appname)); + else + nulls[13] = true; + } +} + PG_FUNCTION_INFO_V1(pg_wait_sampling_get_current); Datum pg_wait_sampling_get_current(PG_FUNCTION_ARGS) 
+{ + return pg_wait_sampling_get_current_internal(fcinfo, PGWS_V1_1); +} + +PG_FUNCTION_INFO_V1(pg_wait_sampling_get_current_1_2); +Datum +pg_wait_sampling_get_current_1_2(PG_FUNCTION_ARGS) +{ + return pg_wait_sampling_get_current_internal(fcinfo, PGWS_V1_2); +} + +Datum +pg_wait_sampling_get_current_internal(FunctionCallInfo fcinfo, + pgwsVersion api_version) { FuncCallContext *funcctx; WaitCurrentContext *params; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; check_shmem(); + /* + * Check we have the expected number of output arguments. Safety check + */ + switch (rsinfo->expectedDesc->natts + 1) + { + case PG_WAIT_SAMPLING_COLS_V1_1: + if (api_version != PGWS_V1_1) + elog(ERROR, "incorrect number of output arguments"); + break; + case PG_WAIT_SAMPLING_COLS_V1_2: + if (api_version != PGWS_V1_2) + elog(ERROR, "incorrect number of output arguments"); + break; + default: + elog(ERROR, "incorrect number of output arguments"); + } + + /* Initialization, done only on the first call */ if (SRF_IS_FIRSTCALL()) { MemoryContext oldcontext; @@ -508,32 +937,29 @@ pg_wait_sampling_get_current(PG_FUNCTION_ARGS) params->ts = GetCurrentTimestamp(); funcctx->user_fctx = params; - tupdesc = CreateTemplateTupleDesc(4); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pid", - INT4OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 2, "type", - TEXTOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "event", - TEXTOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "queryid", - INT8OID, -1, 0); - + /* Setup tuple desc */ + tupdesc = CreateTemplateTupleDesc(rsinfo->expectedDesc->natts); + fill_tuple_desc (tupdesc, api_version); funcctx->tuple_desc = BlessTupleDesc(tupdesc); LWLockAcquire(ProcArrayLock, LW_SHARED); if (!PG_ARGISNULL(0)) { - /* pg_wait_sampling_get_current(pid int4) function */ - HistoryItem *item; - PGPROC *proc; + /* pg_wait_sampling_get_current_extended(pid int4) function */ + HistoryItem *item; + PGPROC *proc; proc = search_proc(PG_GETARG_UINT32(0)); + params->items = (HistoryItem *) palloc0(sizeof(HistoryItem)); item = ¶ms->items[0]; - item->pid = proc->pid; - item->wait_event_info = proc->wait_event_info; - item->queryId = pgws_proc_queryids[proc - ProcGlobal->allProcs]; + + fill_dimensions(&item->dimensions, proc, proc->pid, + proc->wait_event_info, + pgws_proc_queryids[proc - ProcGlobal->allProcs], + PGWS_DIMENSIONS_ALL); + funcctx->max_calls = 1; } else @@ -546,22 +972,29 @@ pg_wait_sampling_get_current(PG_FUNCTION_ARGS) params->items = (HistoryItem *) palloc0(sizeof(HistoryItem) * procCount); for (i = 0; i < procCount; i++) { - PGPROC *proc = &ProcGlobal->allProcs[i]; + PGPROC *proc = &ProcGlobal->allProcs[i]; if (!pgws_should_sample_proc(proc, - ¶ms->items[j].pid, - ¶ms->items[j].wait_event_info)) + ¶ms->items[j].dimensions.pid, + ¶ms->items[j].dimensions.wait_event_info)) continue; - params->items[j].pid = proc->pid; - params->items[j].wait_event_info = proc->wait_event_info; - params->items[j].queryId = pgws_proc_queryids[i]; + fill_dimensions(¶ms->items[j].dimensions, proc, proc->pid, + proc->wait_event_info, + pgws_proc_queryids[proc - ProcGlobal->allProcs], + PGWS_DIMENSIONS_ALL); + j++; } funcctx->max_calls = j; } LWLockRelease(ProcArrayLock); +#if PG_VERSION_NUM >= 140000 + pgstat_clear_backend_activity_snapshot(); +#else + pgstat_clear_snapshot(); +#endif MemoryContextSwitchTo(oldcontext); } @@ -573,10 +1006,8 @@ pg_wait_sampling_get_current(PG_FUNCTION_ARGS) if (funcctx->call_cntr < funcctx->max_calls) { HeapTuple tuple; - Datum values[4]; - 
bool nulls[4]; - const char *event_type, - *event; + Datum values[PG_WAIT_SAMPLING_COLS - 1]; + bool nulls[PG_WAIT_SAMPLING_COLS - 1]; HistoryItem *item; item = ¶ms->items[funcctx->call_cntr]; @@ -585,19 +1016,8 @@ pg_wait_sampling_get_current(PG_FUNCTION_ARGS) MemSet(values, 0, sizeof(values)); MemSet(nulls, 0, sizeof(nulls)); - event_type = pgstat_get_wait_event_type(item->wait_event_info); - event = pgstat_get_wait_event(item->wait_event_info); - values[0] = Int32GetDatum(item->pid); - if (event_type) - values[1] = PointerGetDatum(cstring_to_text(event_type)); - else - nulls[1] = true; - if (event) - values[2] = PointerGetDatum(cstring_to_text(event)); - else - nulls[2] = true; + fill_values_and_nulls(values, nulls, item->dimensions, PGWS_DIMENSIONS_ALL, api_version); - values[3] = UInt64GetDatum(item->queryId); tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); @@ -628,7 +1048,7 @@ pgws_init_lock_tag(LOCKTAG *tag, uint32 lock) /* Get array (history or profile data) from shared memory */ static void * -receive_array(SHMRequest request, Size item_size, Size *count) +receive_array(SHMRequest request, Size *item_size, Size *count, int *dimensions_mask) { LOCKTAG collectorTag; shm_mq_result res; @@ -690,17 +1110,25 @@ receive_array(SHMRequest request, Size item_size, Size *count) memcpy(count, data, sizeof(*count)); - result = palloc(item_size * (*count)); + res = shm_mq_receive(recv_mqh, &len, &data, false); + if (res != SHM_MQ_SUCCESS || len != sizeof(*dimensions_mask)) + elog(ERROR, "error reading mq"); + + memcpy(dimensions_mask, data, sizeof(*dimensions_mask)); + + *item_size = get_serialized_size(*dimensions_mask, true); + + result = palloc(*item_size * (*count)); ptr = result; for (i = 0; i < *count; i++) { res = shm_mq_receive(recv_mqh, &len, &data, false); - if (res != SHM_MQ_SUCCESS || len != item_size) + if (res != SHM_MQ_SUCCESS || len != *item_size) elog(ERROR, "error reading mq"); - memcpy(ptr, data, item_size); - ptr += item_size; + memcpy(ptr, data, *item_size); + ptr += *item_size; } } PG_END_ENSURE_ERROR_CLEANUP(pgws_cleanup_callback, 0); @@ -712,43 +1140,113 @@ receive_array(SHMRequest request, Size item_size, Size *count) return result; } +static void * +deserialize_array(void *tmp_array, int count, bool is_history) +{ + Pointer result, + ptr; + int i; + int dimensions_mask = (is_history ? saved_history_dimensions : saved_profile_dimensions); + int serialized_size = get_serialized_size(dimensions_mask, true); + + result = palloc0((is_history ? sizeof(HistoryItem) : sizeof(ProfileItem)) * count); + ptr = result; + + for (i = 0; i < count; i++) + { + SamplingDimensions tmp_dimensions; + char *cur_item; + TimestampTz *ts; + uint64 *count; + int ts_count_size = sizeof(uint64); /* is 8 bytes anyway */ + + cur_item = (((char *) tmp_array) + i * serialized_size); + ts = (is_history ? palloc0(sizeof(TimestampTz)) : NULL); + count = (is_history ? 
+		count = (is_history ? NULL : palloc0(sizeof(uint64)));
+
+		deserialize_item(&tmp_dimensions, cur_item, dimensions_mask, ts, count);
+
+		memcpy(ptr, &tmp_dimensions, sizeof(SamplingDimensions));
+		ptr += sizeof(SamplingDimensions);
+		if (is_history)
+		{
+			memcpy(ptr, ts, ts_count_size);
+			ptr += sizeof(TimestampTz);
+		}
+		else
+		{
+			memcpy(ptr, count, ts_count_size);
+			ptr += sizeof(uint64);
+		}
+	}
+
+	return result;
+}
PG_FUNCTION_INFO_V1(pg_wait_sampling_get_profile);
Datum
pg_wait_sampling_get_profile(PG_FUNCTION_ARGS)
+{
+	return pg_wait_sampling_get_profile_internal(fcinfo, PGWS_V1_1);
+}
+
+PG_FUNCTION_INFO_V1(pg_wait_sampling_get_profile_1_2);
+Datum
+pg_wait_sampling_get_profile_1_2(PG_FUNCTION_ARGS)
+{
+	return pg_wait_sampling_get_profile_internal(fcinfo, PGWS_V1_2);
+}
+
+Datum
+pg_wait_sampling_get_profile_internal(FunctionCallInfo fcinfo,
+									  pgwsVersion api_version)
{
	Profile    *profile;
	FuncCallContext *funcctx;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

	check_shmem();

+	/*
+	 * Check we have the expected number of output arguments (safety check).
+	 */
+	switch (rsinfo->expectedDesc->natts)
+	{
+		case PG_WAIT_SAMPLING_COLS_V1_1:
+			if (api_version != PGWS_V1_1)
+				elog(ERROR, "incorrect number of output arguments");
+			break;
+		case PG_WAIT_SAMPLING_COLS_V1_2:
+			if (api_version != PGWS_V1_2)
+				elog(ERROR, "incorrect number of output arguments");
+			break;
+		default:
+			elog(ERROR, "incorrect number of output arguments");
+	}
+
	if (SRF_IS_FIRSTCALL())
	{
		MemoryContext oldcontext;
		TupleDesc	tupdesc;
+		void	   *tmp_array;
+		Size		serialized_size;

		funcctx = SRF_FIRSTCALL_INIT();

		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		/* Receive profile from shmq */
		profile = (Profile *) palloc0(sizeof(Profile));
-		profile->items = (ProfileItem *) receive_array(PROFILE_REQUEST,
-						sizeof(ProfileItem), &profile->count);
-
+
+		tmp_array = receive_array(PROFILE_REQUEST, &serialized_size,
+								  &profile->count, &saved_profile_dimensions);
+		profile->items = (ProfileItem *) deserialize_array(tmp_array, profile->count, false);
		funcctx->user_fctx = profile;
		funcctx->max_calls = profile->count;

		/* Make tuple descriptor */
-		tupdesc = CreateTemplateTupleDesc(5);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pid",
-						   INT4OID, -1, 0);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "type",
-						   TEXTOID, -1, 0);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "event",
-						   TEXTOID, -1, 0);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "queryid",
-						   INT8OID, -1, 0);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "count",
+		tupdesc = CreateTemplateTupleDesc(rsinfo->expectedDesc->natts);
+		fill_tuple_desc(tupdesc, api_version);
+		TupleDescInitEntry(tupdesc, (AttrNumber) rsinfo->expectedDesc->natts, "count",
						   INT8OID, -1, 0);
		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
@@ -763,38 +1261,21 @@ pg_wait_sampling_get_profile(PG_FUNCTION_ARGS)
	if (funcctx->call_cntr < funcctx->max_calls)
	{
		/* for each row */
-		Datum		values[5];
-		bool		nulls[5];
+		Datum		values[PG_WAIT_SAMPLING_COLS];
+		bool		nulls[PG_WAIT_SAMPLING_COLS];
		HeapTuple	tuple;
		ProfileItem *item;
-		const char *event_type,
-				   *event;

		item = &profile->items[funcctx->call_cntr];

+		/* Make and return next tuple to caller */
		MemSet(values, 0, sizeof(values));
		MemSet(nulls, 0, sizeof(nulls));

-		/* Make and return next tuple to caller */
-		event_type = pgstat_get_wait_event_type(item->wait_event_info);
-		event = pgstat_get_wait_event(item->wait_event_info);
-		values[0] = Int32GetDatum(item->pid);
-		if (event_type)
-			values[1] = PointerGetDatum(cstring_to_text(event_type));
-		else
-			nulls[1] = true;
-		if (event)
-			values[2] = PointerGetDatum(cstring_to_text(event));
-		else
-			nulls[2] = true;
-
-		if (pgws_profileQueries)
-			values[3] = UInt64GetDatum(item->queryId);
-		else
-			values[3] = (Datum) 0;
-
-		values[4] = UInt64GetDatum(item->count);
-
+		fill_values_and_nulls(values, nulls, item->dimensions,
+							  pgws_profile_dimensions, api_version);
+		values[rsinfo->expectedDesc->natts - 1] = UInt64GetDatum(item->count);
+
		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);

		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
@@ -838,43 +1319,98 @@ pg_wait_sampling_reset_profile(PG_FUNCTION_ARGS)

	PG_RETURN_VOID();
}

+PG_FUNCTION_INFO_V1(pg_wait_sampling_reset_history);
+Datum
+pg_wait_sampling_reset_history(PG_FUNCTION_ARGS)
+{
+	LOCKTAG		collectorTag;
+
+	check_shmem();
+
+	pgws_init_lock_tag(&queueTag, PGWS_QUEUE_LOCK);
+
+	LockAcquire(&queueTag, ExclusiveLock, false, false);
+
+	pgws_init_lock_tag(&collectorTag, PGWS_COLLECTOR_LOCK);
+	LockAcquire(&collectorTag, ExclusiveLock, false, false);
+	pgws_collector_hdr->request = HISTORY_RESET;
+	LockRelease(&collectorTag, ExclusiveLock, false);
+
+	if (!pgws_collector_hdr->latch)
+		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
+						errmsg("pg_wait_sampling collector wasn't started")));
+
+	SetLatch(pgws_collector_hdr->latch);
+
+	LockRelease(&queueTag, ExclusiveLock, false);
+
+	PG_RETURN_VOID();
+}
+
PG_FUNCTION_INFO_V1(pg_wait_sampling_get_history);
Datum
pg_wait_sampling_get_history(PG_FUNCTION_ARGS)
+{
+	return pg_wait_sampling_get_history_internal(fcinfo, PGWS_V1_1);
+}
+
+PG_FUNCTION_INFO_V1(pg_wait_sampling_get_history_1_2);
+Datum
+pg_wait_sampling_get_history_1_2(PG_FUNCTION_ARGS)
+{
+	return pg_wait_sampling_get_history_internal(fcinfo, PGWS_V1_2);
+}
+
+Datum
+pg_wait_sampling_get_history_internal(FunctionCallInfo fcinfo,
+									  pgwsVersion api_version)
{
	History    *history;
	FuncCallContext *funcctx;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

	check_shmem();

+	/*
+	 * Check we have the expected number of output arguments (safety check).
+	 */
+	switch (rsinfo->expectedDesc->natts)
+	{
+		case PG_WAIT_SAMPLING_COLS_V1_1:
+			if (api_version != PGWS_V1_1)
+				elog(ERROR, "incorrect number of output arguments");
+			break;
+		case PG_WAIT_SAMPLING_COLS_V1_2:
+			if (api_version != PGWS_V1_2)
+				elog(ERROR, "incorrect number of output arguments");
+			break;
+		default:
+			elog(ERROR, "incorrect number of output arguments");
+	}
+
	if (SRF_IS_FIRSTCALL())
	{
		MemoryContext oldcontext;
		TupleDesc	tupdesc;
+		void	   *tmp_array;
+		Size		serialized_size;

		funcctx = SRF_FIRSTCALL_INIT();

		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		/* Receive history from shmq */
		history = (History *) palloc0(sizeof(History));
-		history->items = (HistoryItem *) receive_array(HISTORY_REQUEST,
-						sizeof(HistoryItem), &history->count);
-
+		tmp_array = receive_array(HISTORY_REQUEST, &serialized_size,
+								  &history->count, &saved_history_dimensions);
+		history->items = (HistoryItem *) deserialize_array(tmp_array, history->count, true);
		funcctx->user_fctx = history;
		funcctx->max_calls = history->count;

		/* Make tuple descriptor */
-		tupdesc = CreateTemplateTupleDesc(5);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pid",
-						   INT4OID, -1, 0);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "sample_ts",
+		tupdesc = CreateTemplateTupleDesc(rsinfo->expectedDesc->natts);
+		fill_tuple_desc(tupdesc, api_version);
+		TupleDescInitEntry(tupdesc, (AttrNumber) rsinfo->expectedDesc->natts, "sample_ts", //XXX we have moved this to the end to have it more in line with current and profile; debatable; maybe move it to first place?
						   TIMESTAMPTZOID, -1, 0);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "type",
-						   TEXTOID, -1, 0);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "event",
-						   TEXTOID, -1, 0);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "queryid",
-						   INT8OID, -1, 0);
		funcctx->tuple_desc = BlessTupleDesc(tupdesc);

		MemoryContextSwitchTo(oldcontext);
@@ -889,10 +1425,8 @@ pg_wait_sampling_get_history(PG_FUNCTION_ARGS)
	{
		HeapTuple	tuple;
		HistoryItem *item;
-		Datum		values[5];
-		bool		nulls[5];
-		const char *event_type,
-				   *event;
+		Datum		values[PG_WAIT_SAMPLING_COLS];
+		bool		nulls[PG_WAIT_SAMPLING_COLS];

		item = &history->items[history->index];

@@ -900,20 +1434,10 @@ pg_wait_sampling_get_history(PG_FUNCTION_ARGS)
		MemSet(values, 0, sizeof(values));
		MemSet(nulls, 0, sizeof(nulls));

-		event_type = pgstat_get_wait_event_type(item->wait_event_info);
-		event = pgstat_get_wait_event(item->wait_event_info);
-		values[0] = Int32GetDatum(item->pid);
-		values[1] = TimestampTzGetDatum(item->ts);
-		if (event_type)
-			values[2] = PointerGetDatum(cstring_to_text(event_type));
-		else
-			nulls[2] = true;
-		if (event)
-			values[3] = PointerGetDatum(cstring_to_text(event));
-		else
-			nulls[3] = true;
+		fill_values_and_nulls(values, nulls, item->dimensions,
+							  pgws_history_dimensions, api_version);
+		values[rsinfo->expectedDesc->natts - 1] = TimestampTzGetDatum(item->ts); //XXX same as above

-		values[4] = UInt64GetDatum(item->queryId);
		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);

		history->index++;
diff --git a/pg_wait_sampling.control b/pg_wait_sampling.control
index 97d9a34..d2d0ffe 100644
--- a/pg_wait_sampling.control
+++ b/pg_wait_sampling.control
@@ -1,5 +1,5 @@
# pg_wait_sampling extension
comment = 'sampling based statistics of wait events'
-default_version = '1.1'
+default_version = '1.2'
module_pathname = '$libdir/pg_wait_sampling'
relocatable = true
diff --git a/pg_wait_sampling.h b/pg_wait_sampling.h
index dab773c..5f457f4 100644
--- a/pg_wait_sampling.h
+++ b/pg_wait_sampling.h
@@ -15,26 +15,73 @@
#include "storage/lock.h"
#include "storage/shm_mq.h"

+#if PG_VERSION_NUM >= 140000
+#include "utils/backend_status.h"
+#else
+#include "pgstat.h"
+#endif
+
#define PG_WAIT_SAMPLING_MAGIC		0xCA94B107
#define COLLECTOR_QUEUE_SIZE		(16 * 1024)
#define HISTORY_TIME_MULTIPLIER		10
#define PGWS_QUEUE_LOCK				0
#define PGWS_COLLECTOR_LOCK			1

+/* Values for sampling dimensions */
+#define PGWS_DIMENSIONS_PID					(1 << 0)
+#define PGWS_DIMENSIONS_WAIT_EVENT_TYPE		(1 << 1)
+#define PGWS_DIMENSIONS_WAIT_EVENT			(1 << 2)
+#define PGWD_DIMENSIONS_QUERY_ID			(1 << 3)
+#define PGWS_DIMENSIONS_ROLE_ID				(1 << 4)
+#define PGWS_DIMENSIONS_DB_ID				(1 << 5)
+#define PGWS_DIMENSIONS_PARALLEL_LEADER_PID	(1 << 6)
+#define PGWS_DIMENSIONS_IS_REGULAR_BE		(1 << 7)
+#define PGWS_DIMENSIONS_BE_TYPE				(1 << 8)
+#define PGWS_DIMENSIONS_BE_STATE			(1 << 9)
+#define PGWS_DIMENSIONS_BE_START_TIME		(1 << 10)
+#define PGWS_DIMENSIONS_CLIENT_ADDR			(1 << 11)
+#define PGWS_DIMENSIONS_CLIENT_HOSTNAME		(1 << 12)
+#define PGWS_DIMENSIONS_APPNAME				(1 << 13)
+
+#define PGWS_DIMENSIONS_ALL					((int) ~0)
+/* ^ all bits set */
+
+/*
+ * Common data (sampling dimensions) for ProfileItem and HistoryItem
+ */
+typedef struct
+{
+	/* Fields from PGPROC */
+	int			pid;
+	uint32		wait_event_info;
+	uint64		queryId;
+	Oid			role_id;
+	Oid			database_id;
+	int			parallel_leader_pid;
+	bool		is_regular_backend;
+	/* Fields from BackendStatus */
+	BackendType backend_type;
+	BackendState backend_state;
+	TimestampTz proc_start;
+	SockAddr	client_addr;
+	char		client_hostname[NAMEDATALEN];
+	char		appname[NAMEDATALEN];
+} SamplingDimensions;
+
+/*
+ * The next two structures must have identical layout up to the count/ts
+ * field so that make_profile_hash works properly.
+ */
typedef struct
{
-	int			pid;
-	uint32		wait_event_info;
-	uint64		queryId;
-	uint64		count;
+	SamplingDimensions dimensions;
+	uint64		count;
} ProfileItem;

typedef struct
{
-	int			pid;
-	uint32		wait_event_info;
-	uint64		queryId;
-	TimestampTz ts;
+	SamplingDimensions dimensions;
+	TimestampTz ts;
} HistoryItem;

typedef struct
@@ -42,6 +89,8 @@ typedef struct
	bool		wraparound;
	Size		index;
	Size		count;
+	char	   *serialized_items;
+	/* used only in pg_wait_sampling.c */
	HistoryItem *items;
} History;

@@ -49,6 +98,7 @@ typedef enum
{
	NO_REQUEST,
	HISTORY_REQUEST,
+	HISTORY_RESET,
	PROFILE_REQUEST,
	PROFILE_RESET
} SHMRequest;
@@ -66,6 +116,8 @@ extern int pgws_profilePeriod;
extern bool pgws_profilePid;
extern int pgws_profileQueries;
extern bool pgws_sampleCpu;
+extern int	pgws_history_dimensions;
+extern int	pgws_profile_dimensions;

/* pg_wait_sampling.c */
extern CollectorShmqHeader *pgws_collector_hdr;
@@ -73,8 +125,17 @@ extern shm_mq *pgws_collector_mq;
extern uint64 *pgws_proc_queryids;
extern void pgws_init_lock_tag(LOCKTAG *tag, uint32 lock);
extern bool pgws_should_sample_proc(PGPROC *proc, int *pid_p, uint32 *wait_event_info_p);
+extern PgBackendStatus *get_beentry_by_procpid(int pid);

/* collector.c */
+extern int	saved_profile_dimensions;
+extern int	saved_history_dimensions;
+extern void fill_dimensions(SamplingDimensions *dimensions, PGPROC *proc,
+							int pid, uint32 wait_event_info, uint64 queryId,
+							int dimensions_mask);
+extern void deserialize_item(SamplingDimensions *dimensions, char *serialized_item,
+							 int dimensions_mask, TimestampTz *ts, uint64 *count);
+extern int	get_serialized_size(int dimensions_mask, bool need_last_field);
extern void pgws_register_wait_collector(void);
extern PGDLLEXPORT void pgws_collector_main(Datum main_arg);
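
The PGWS_DIMENSIONS_* constants above form a plain bitmask: each bit enables one sampling dimension, and PGWS_DIMENSIONS_ALL turns all of them on. The collector sends items over the shm_mq in a compact form whose size depends on that mask (see receive_array() and deserialize_array() above). Below is a minimal standalone sketch of the idea behind a mask-driven item size; the constants are copied from pg_wait_sampling.h, while sketch_serialized_size() is only an illustration, not the actual get_serialized_size() layout from collector.c.

/*
 * Standalone illustration of the dimensions bitmask.  The PGWS_DIMENSIONS_*
 * values mirror pg_wait_sampling.h; everything else here is hypothetical.
 */
#include <stdint.h>
#include <stdio.h>

#define PGWS_DIMENSIONS_PID				(1 << 0)
#define PGWS_DIMENSIONS_WAIT_EVENT_TYPE	(1 << 1)
#define PGWS_DIMENSIONS_WAIT_EVENT		(1 << 2)
#define PGWS_DIMENSIONS_APPNAME			(1 << 13)
#define PGWS_DIMENSIONS_ALL				((int) ~0)

#define NAMEDATALEN 64					/* PostgreSQL default */

/* Hypothetical helper: bytes needed for the fields enabled in "mask" */
static size_t
sketch_serialized_size(int mask)
{
	size_t		size = 0;

	if (mask & PGWS_DIMENSIONS_PID)
		size += sizeof(int32_t);		/* pid */
	if (mask & (PGWS_DIMENSIONS_WAIT_EVENT_TYPE | PGWS_DIMENSIONS_WAIT_EVENT))
		size += sizeof(uint32_t);		/* wait_event_info */
	if (mask & PGWS_DIMENSIONS_APPNAME)
		size += NAMEDATALEN;			/* appname */

	return size;
}

int
main(void)
{
	/* Sample only pid and wait event, as a restricted dimensions mask might */
	int			mask = PGWS_DIMENSIONS_PID | PGWS_DIMENSIONS_WAIT_EVENT;

	printf("item size for pid + wait event: %zu bytes\n",
		   sketch_serialized_size(mask));
	printf("mask with every dimension enabled: %#x\n",
		   (unsigned) PGWS_DIMENSIONS_ALL);
	return 0;
}

A smaller mask keeps both the shared-memory queue traffic and the profile hash table entries smaller, which is presumably why the serialized size is negotiated per request rather than fixed at sizeof(HistoryItem)/sizeof(ProfileItem).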