From 39e23b34cdfa19c0611e15bf8729086a25437714 Mon Sep 17 00:00:00 2001 From: John Morris Date: Wed, 30 Aug 2023 21:21:06 -0700 Subject: [PATCH] Rebasing memtrack changes onto recent main branch. --- doc/src/sgml/config.sgml | 30 ++ doc/src/sgml/monitoring.sgml | 276 ++++++++++ src/backend/catalog/system_views.sql | 38 +- src/backend/postmaster/fork_process.c | 7 + src/backend/postmaster/postmaster.c | 6 + src/backend/storage/ipc/dsm.c | 10 +- src/backend/storage/ipc/dsm_impl.c | 46 +- src/backend/storage/lmgr/proc.c | 47 ++ src/backend/utils/activity/Makefile | 1 + src/backend/utils/activity/backend_status.c | 73 ++- src/backend/utils/activity/memtrack.c | 142 +++++ src/backend/utils/activity/meson.build | 1 + src/backend/utils/adt/pgstatfuncs.c | 89 ++++ src/backend/utils/misc/guc_tables.c | 12 + src/backend/utils/misc/postgresql.conf.sample | 3 + src/backend/utils/mmgr/aset.c | 24 +- src/backend/utils/mmgr/generation.c | 21 +- src/backend/utils/mmgr/slab.c | 23 +- src/include/catalog/pg_proc.dat | 17 + src/include/storage/proc.h | 4 + src/include/utils/backend_status.h | 8 + src/include/utils/memtrack.h | 288 ++++++++++ src/test/modules/meson.build | 1 + src/test/modules/test_memtrack/.gitignore | 4 + src/test/modules/test_memtrack/Makefile | 25 + src/test/modules/test_memtrack/README | 36 ++ .../test_memtrack/expected/test_memtrack.out | 80 +++ src/test/modules/test_memtrack/meson.build | 38 ++ .../test_memtrack/sql/test_memtrack.sql | 31 ++ .../test_memtrack/test_memtrack--1.0.sql | 20 + .../modules/test_memtrack/test_memtrack.c | 496 ++++++++++++++++++ .../modules/test_memtrack/test_memtrack.conf | 1 + .../test_memtrack/test_memtrack.control | 4 + src/test/modules/test_memtrack/worker_pool.c | 416 +++++++++++++++ src/test/modules/test_memtrack/worker_pool.h | 69 +++ src/test/regress/expected/rules.out | 29 + src/test/regress/expected/stats.out | 36 ++ src/test/regress/sql/stats.sql | 20 + 38 files changed, 2437 insertions(+), 35 deletions(-) create mode 100644 
src/backend/utils/activity/memtrack.c create mode 100644 src/include/utils/memtrack.h create mode 100644 src/test/modules/test_memtrack/.gitignore create mode 100644 src/test/modules/test_memtrack/Makefile create mode 100644 src/test/modules/test_memtrack/README create mode 100644 src/test/modules/test_memtrack/expected/test_memtrack.out create mode 100644 src/test/modules/test_memtrack/meson.build create mode 100644 src/test/modules/test_memtrack/sql/test_memtrack.sql create mode 100644 src/test/modules/test_memtrack/test_memtrack--1.0.sql create mode 100644 src/test/modules/test_memtrack/test_memtrack.c create mode 100644 src/test/modules/test_memtrack/test_memtrack.conf create mode 100644 src/test/modules/test_memtrack/test_memtrack.control create mode 100644 src/test/modules/test_memtrack/worker_pool.c create mode 100644 src/test/modules/test_memtrack/worker_pool.h diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 694d667bf9..0df7016c3f 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2145,6 +2145,36 @@ include_dir 'conf.d' + + max_total_backend_memory (integer) + + max_total_backend_memory configuration parameter + + + + + Specifies a limit to the amount of memory (MB) that may be allocated to + backends in total (i.e. this is not a per user or per backend limit). + If unset, or set to 0 it is disabled. At database startup + max_total_backend_memory is reduced by shared_memory_size_mb + (includes shared buffers and other memory required for initialization). + Each backend process is initialized with a 1MB local allowance which + also reduces total_bkend_mem_bytes_available. Keep this in mind when + setting this value. A backend request that would exhaust the limit will + be denied with an out of memory error causing that backend's current + query/transaction to fail. Further requests will not be allocated until + dropping below the limit. 
This limit does not affect auxiliary backend + processes + or the postmaster process. + Backend memory allocations (allocated_bytes) are + displayed in the + pg_stat_memory_allocation + view. Due to the dynamic nature of memory allocations, this limit is + not exact. + + + + diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 4ff415d6a0..98084a9055 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -4522,6 +4522,282 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + <structname>pg_stat_memory_allocation</structname> + + + pg_stat_memory_allocation + + + + The pg_stat_memory_allocation view will have one + row per server process, showing information related to the current memory + allocation of that process in total and by allocator type. Due to the + dynamic nature of memory allocations the allocated bytes values may not be + exact but should be sufficient for the intended purposes. Dynamic shared + memory allocations are included only in the value displayed for the backend + that created them, they are not included in the value for backends that are + attached to them to avoid double counting. Use + pg_size_pretty described in + to make these values more easily + readable. + + + + <structname>pg_stat_memory_allocation</structname> View + + + + + Column Type + + + Description + + + + + + + + datid oid + + + OID of the database this backend is connected to + + + + + + pid integer + + + Process ID of this backend + + + + + + allocated_bytes bigint + + + Memory currently allocated to this backend in bytes. This is the balance + of bytes allocated and freed by this backend. + + + + + + aset_allocated_bytes bigint + + + Memory currently allocated to this backend in bytes via the allocation + set allocator. + + + + + + dsm_allocated_bytes bigint + + + Memory currently allocated to this backend in bytes via the dynamic + shared memory allocator. 
Upon process exit, dsm allocations that have + not been freed are considered long lived and added to + global_dsm_allocated_bytes found in the + + pg_stat_global_memory_allocation view. + + + + + + generation_allocated_bytes bigint + + + Memory currently allocated to this backend in bytes via the generation + allocator. + + + + + + slab_allocated_bytes bigint + + + Memory currently allocated to this backend in bytes via the slab + allocator. + + + + + +
+ +
+ + + <structname>pg_stat_global_memory_allocation</structname> + + + pg_stat_global_memory_allocation + + + + The pg_stat_global_memory_allocation view will + have one row showing information related to current shared memory + allocations. Due to the dynamic nature of memory allocations the allocated + bytes values may not be exact but should be sufficient for the intended + purposes. Use pg_size_pretty described in + to make the byte populated values + more easily readable. + + + + <structname>pg_stat_global_memory_allocation</structname> View + + + + + Column Type + + + Description + + + + + + + + datid oid + + + OID of the database this backend is connected to + + + + + + shared_memory_size_mb integer + + + Reports the size of the main shared memory area, rounded up to the + nearest megabyte. See . + + + + + + shared_memory_size_in_huge_pages bigint + + + Reports the number of huge pages that are needed for the main shared + memory area based on the specified huge_page_size. If huge pages are not + supported, this will be -1. See + . + + + + + + max_total_backend_memory_bytes bigint + + + Reports the user defined backend maximum allowed shared memory in bytes. + 0 if disabled or not set. See + . + + + + + + total_bkend_mem_bytes_available bigint + + + Tracks max_total_backend_memory (in bytes) available for allocation. At + database startup, total_bkend_mem_bytes_available is reduced by the + byte equivalent of shared_memory_size_mb. Each backend process is + initialized with a 1MB local allowance which also reduces + total_bkend_mem_bytes_available. A process's allocation requests reduce + its local allowance. If a process's allocation request exceeds its + remaining allowance, an attempt is made to refill the local allowance + from total_bkend_mem_bytes_available. If the refill request fails, then + the requesting process will fail with an out of memory error resulting + in the cancellation of that process's active query/transaction. 
The + default refill allocation quantity is 1MB. If a request is greater than + 1MB, an attempt will be made to allocate the full amount. If + max_total_backend_memory is disabled, this will be -1. + . + + + + + + global_dsm_allocated_bytes bigint + + + Long lived dynamically allocated memory currently allocated to the + database. Upon process exit, dsm allocations that have not been freed + are considered long lived and added to + global_dsm_allocated_bytes. + + + + + + total_aset_allocated_bytes bigint + + + Sum total of aset_allocated_bytes for all + backend processes from + + pg_stat_memory_allocation view. + + + + + + total_dsm_allocated_bytes bigint + + + Sum total of dsm_allocated_bytes for all + backend processes from + + pg_stat_memory_allocation view. + + + + + + total_generation_allocated_bytes bigint + + + Sum total of generation_allocated_bytes for + all backend processes from + + pg_stat_memory_allocation view. + + + + + + total_slab_allocated_bytes bigint + + + Sum total of slab_allocated_bytes for all + backend processes from + + pg_stat_memory_allocation view. + + + + + +
+ +
+ Statistics Functions diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 77b06e2a7a..364ead785e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1343,5 +1343,41 @@ CREATE VIEW pg_stat_subscription_stats AS FROM pg_subscription as s, pg_stat_get_subscription_stats(s.oid) as ss; +CREATE VIEW pg_stat_memory_allocation AS + SELECT + S.datid AS datid, + S.pid, + S.allocated_bytes, + S.aset_allocated_bytes, + S.dsm_allocated_bytes, + S.generation_allocated_bytes, + S.slab_allocated_bytes + FROM pg_stat_get_memory_allocation(NULL) AS S + LEFT JOIN pg_database AS D ON (S.datid = D.oid); + CREATE VIEW pg_wait_events AS - SELECT * FROM pg_get_wait_events(); + SELECT * FROM pg_get_wait_events(); + +CREATE VIEW pg_stat_global_memory_allocation AS +WITH sums AS ( + SELECT + SUM(aset_allocated_bytes) AS total_aset_allocated_bytes, + SUM(dsm_allocated_bytes) AS total_dsm_allocated_bytes, + SUM(generation_allocated_bytes) AS total_generation_allocated_bytes, + SUM(slab_allocated_bytes) AS total_slab_allocated_bytes + FROM + pg_stat_memory_allocation +) +SELECT + S.datid AS datid, + current_setting('shared_memory_size', true) as shared_memory_size, + (current_setting('shared_memory_size_in_huge_pages', true))::integer as shared_memory_size_in_huge_pages, + pg_size_bytes(current_setting('max_total_backend_memory', true)) as max_total_backend_memory_bytes, + S.total_bkend_mem_bytes_available, + S.global_dsm_allocated_bytes, + sums.total_aset_allocated_bytes, + sums.total_dsm_allocated_bytes, + sums.total_generation_allocated_bytes, + sums.total_slab_allocated_bytes + FROM sums, pg_stat_get_global_memory_allocation() AS S + LEFT JOIN pg_database AS D ON (S.datid = D.oid); diff --git a/src/backend/postmaster/fork_process.c b/src/backend/postmaster/fork_process.c index 6f9c2765d6..b7fbd31e4f 100644 --- a/src/backend/postmaster/fork_process.c +++ b/src/backend/postmaster/fork_process.c @@ -20,6 
+20,7 @@ #include "libpq/pqsignal.h" #include "postmaster/fork_process.h" +#include "utils/memtrack.h" #ifndef WIN32 /* @@ -111,6 +112,12 @@ fork_process(void) } } + /* + * Reset memory tracking to the initial state. + * (They are correct on exec() but not after fork()) + */ + init_backend_memory(); + /* do post-fork initialization for random number generation */ pg_strong_random_init(); } diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index d7bfb28ff3..794bec2672 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -538,6 +538,7 @@ typedef struct #endif char my_exec_path[MAXPGPATH]; char pkglib_path[MAXPGPATH]; + int max_total_bkend_mem; } BackendParameters; static void read_backend_variables(char *id, Port *port); @@ -6097,6 +6098,8 @@ save_backend_variables(BackendParameters *param, Port *port, strlcpy(param->pkglib_path, pkglib_path, MAXPGPATH); + param->max_total_bkend_mem = max_total_bkend_mem; + return true; } @@ -6325,6 +6328,9 @@ restore_backend_variables(BackendParameters *param, Port *port) strlcpy(pkglib_path, param->pkglib_path, MAXPGPATH); + max_total_bkend_mem = param->max_total_bkend_mem; + max_total_bkend_bytes = (uint64)max_total_bkend_mem * 1024 * 1024; + /* * We need to restore fd.c's counts of externally-opened FDs; to avoid * confusion, be sure to do this after restoring max_safe_fds. (Note: diff --git a/src/backend/storage/ipc/dsm.c b/src/backend/storage/ipc/dsm.c index 10b029bb16..8ef1a5d0c3 100644 --- a/src/backend/storage/ipc/dsm.c +++ b/src/backend/storage/ipc/dsm.c @@ -775,6 +775,14 @@ dsm_detach_all(void) void dsm_detach(dsm_segment *seg) { + /* + * Retain mapped_size to pass into destroy call in cases where the detach + * is the last reference. mapped_size is zeroed as part of the detach + * process, but is needed later in these cases for dsm_allocated_bytes + * accounting. 
+ */ + Size save_mapped_size = seg->mapped_size; + /* * Invoke registered callbacks. Just in case one of those callbacks * throws a further error that brings us back here, pop the callback @@ -855,7 +863,7 @@ dsm_detach(dsm_segment *seg) */ if (is_main_region_dsm_handle(seg->handle) || dsm_impl_op(DSM_OP_DESTROY, seg->handle, 0, &seg->impl_private, - &seg->mapped_address, &seg->mapped_size, WARNING)) + &seg->mapped_address, &save_mapped_size, WARNING)) { LWLockAcquire(DynamicSharedMemoryControlLock, LW_EXCLUSIVE); if (is_main_region_dsm_handle(seg->handle)) diff --git a/src/backend/storage/ipc/dsm_impl.c b/src/backend/storage/ipc/dsm_impl.c index 35fa910d6f..437a9d3d67 100644 --- a/src/backend/storage/ipc/dsm_impl.c +++ b/src/backend/storage/ipc/dsm_impl.c @@ -160,37 +160,67 @@ dsm_impl_op(dsm_op op, dsm_handle handle, Size request_size, void **impl_private, void **mapped_address, Size *mapped_size, int elevel) { + bool success; + int64 save_mapped_size = *mapped_size; + Assert(op == DSM_OP_CREATE || request_size == 0); Assert((op != DSM_OP_CREATE && op != DSM_OP_ATTACH) || (*mapped_address == NULL && *mapped_size == 0)); + /* Try to reserve memory for the backend if we're creating a new segment. 
*/ + if (op == DSM_OP_CREATE && !reserve_backend_memory(request_size, PG_ALLOC_DSM)) + { + ereport(elevel, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("Unable to reserve backend memory for dynamic shared memory segment."))); + return false; + } + switch (dynamic_shared_memory_type) { #ifdef USE_DSM_POSIX case DSM_IMPL_POSIX: - return dsm_impl_posix(op, handle, request_size, impl_private, - mapped_address, mapped_size, elevel); + success = dsm_impl_posix(op, handle, request_size, impl_private, + mapped_address, mapped_size, elevel); + break; #endif #ifdef USE_DSM_SYSV case DSM_IMPL_SYSV: - return dsm_impl_sysv(op, handle, request_size, impl_private, - mapped_address, mapped_size, elevel); + success = dsm_impl_sysv(op, handle, request_size, impl_private, + mapped_address, mapped_size, elevel); + break; #endif #ifdef USE_DSM_WINDOWS case DSM_IMPL_WINDOWS: - return dsm_impl_windows(op, handle, request_size, impl_private, + success = dsm_impl_windows(op, handle, request_size, impl_private, mapped_address, mapped_size, elevel); + break; #endif #ifdef USE_DSM_MMAP case DSM_IMPL_MMAP: - return dsm_impl_mmap(op, handle, request_size, impl_private, - mapped_address, mapped_size, elevel); + success = dsm_impl_mmap(op, handle, request_size, impl_private, + mapped_address, mapped_size, elevel); + break; #endif default: elog(ERROR, "unexpected dynamic shared memory type: %d", dynamic_shared_memory_type); - return false; + success = false; } + + /* CASE: we failed to create a new segment. Release the reservation */ + if ( !success && op == DSM_OP_CREATE ) + release_backend_memory(request_size, PG_ALLOC_DSM); + + /* CASE: we created a different size than expected. Record the change, but don't abort */ + else if (success && op == DSM_OP_CREATE && *mapped_size != request_size) + update_local_allocation(*mapped_size - request_size, PG_ALLOC_DSM); + + /* CASE: we destroyed a segment. 
Release the reservation */ + else if (success && op == DSM_OP_DESTROY) + release_backend_memory(save_mapped_size, PG_ALLOC_DSM); + + return success; } #ifdef USE_DSM_POSIX diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 5b663a2997..d966707792 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -51,6 +51,7 @@ #include "storage/procsignal.h" #include "storage/spin.h" #include "storage/standby.h" +#include "utils/guc.h" #include "utils/timeout.h" #include "utils/timestamp.h" @@ -180,6 +181,52 @@ InitProcGlobal(void) ProcGlobal->checkpointerLatch = NULL; pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PGPROCNO); pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PGPROCNO); + pg_atomic_init_u64(&ProcGlobal->total_bkend_mem_bytes, 0); + pg_atomic_init_u64(&ProcGlobal->global_dsm_allocation, 0); + + /* Setup backend memory limiting if configured */ + if (max_total_bkend_mem > 0) + { + /* + * Convert max_total_bkend_mem to bytes, account for + * shared_memory_size, and initialize total_bkend_mem_bytes. + */ + int result = 0; + + /* Get integer value of shared_memory_size */ + if (parse_int(GetConfigOption("shared_memory_size", true, false), &result, 0, NULL)) + { + /* + * Error on startup if backend memory limit is less than shared + * memory size. Warn on startup if backend memory available is + * less than arbitrarily picked value of 100MB. 
+ */ + + elog(WARNING, "proc init: max_total=%d result=%d\n", max_total_bkend_mem, result); + + if (max_total_bkend_mem <= result) + { + ereport(ERROR, + errmsg("configured max_total_backend_memory %dMB is <= shared_memory_size %dMB", + max_total_bkend_mem, result), + errhint("Disable or increase the configuration parameter \"max_total_backend_memory\".")); + } + else if (max_total_bkend_mem < result + 100) + { + ereport(WARNING, + errmsg("max_total_backend_memory %dMB - shared_memory_size %dMB is <= 100MB", + max_total_bkend_mem, result), + errhint("Consider increasing the configuration parameter \"max_total_backend_memory\".")); + } + + /* + * We would like to use it as bytes rather than MB. + */ + max_total_bkend_bytes = (uint64)max_total_bkend_mem * 1024 * 1024; + } + else + ereport(ERROR, errmsg("max_total_backend_memory initialization is unable to parse shared_memory_size")); + } /* * Create and initialize all the PGPROC structures we'll need. There are diff --git a/src/backend/utils/activity/Makefile b/src/backend/utils/activity/Makefile index f57cf3958c..0e0051e34a 100644 --- a/src/backend/utils/activity/Makefile +++ b/src/backend/utils/activity/Makefile @@ -18,6 +18,7 @@ override CPPFLAGS := -I. 
-I$(srcdir) $(CPPFLAGS) OBJS = \ backend_progress.o \ backend_status.o \ + memtrack.o \ pgstat.o \ pgstat_archiver.o \ pgstat_bgwriter.o \ diff --git a/src/backend/utils/activity/backend_status.c b/src/backend/utils/activity/backend_status.c index 722c5acf38..aa398f3ac0 100644 --- a/src/backend/utils/activity/backend_status.c +++ b/src/backend/utils/activity/backend_status.c @@ -24,6 +24,7 @@ #include "utils/backend_status.h" #include "utils/guc.h" /* for application_name */ #include "utils/memutils.h" +#include "utils/memtrack.h" /* ---------- @@ -49,7 +50,6 @@ int pgstat_track_activity_query_size = 1024; /* exposed so that backend_progress.c can access it */ PgBackendStatus *MyBEEntry = NULL; - static PgBackendStatus *BackendStatusArray = NULL; static char *BackendAppnameBuffer = NULL; static char *BackendClientHostnameBuffer = NULL; @@ -62,7 +62,6 @@ static PgBackendSSLStatus *BackendSslStatusBuffer = NULL; static PgBackendGSSStatus *BackendGssStatusBuffer = NULL; #endif - /* Status for backends including auxiliary */ static LocalPgBackendStatus *localBackendStatusTable = NULL; @@ -71,7 +70,6 @@ static int localNumBackends = 0; static MemoryContext backendStatusSnapContext; - static void pgstat_beshutdown_hook(int code, Datum arg); static void pgstat_read_current_status(void); static void pgstat_setup_backend_status_context(void); @@ -401,6 +399,9 @@ pgstat_bestart(void) lbeentry.st_progress_command_target = InvalidOid; lbeentry.st_query_id = UINT64CONST(0); + /* No allocations have been reported yet */ + lbeentry.st_memory = NO_BACKEND_MEMORY; + /* * we don't zero st_progress_param here to save cycles; nobody should * examine it until st_progress_command has been set to something other @@ -471,6 +472,9 @@ pgstat_beshutdown_hook(int code, Datum arg) PGSTAT_END_WRITE_ACTIVITY(beentry); + /* Stop reporting memory allocation changes to shared memory */ + exit_backend_memory(); + /* so that functions can check if backend_status.c is up via MyBEEntry */ MyBEEntry = 
NULL; } @@ -1153,6 +1157,69 @@ pgstat_get_local_beentry_by_index(int idx) return &localBackendStatusTable[idx - 1]; } +/* ---------- + * pgstat_fetch_stat_beentry() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * our local copy of the current-activity entry for one backend, + * or NULL if the given beid doesn't identify any known session. + * + * The beid argument is the BackendId of the desired session + * (note that this is unlike pgstat_fetch_stat_local_beentry()). + * + * NB: caller is responsible for a check if the user is permitted to see + * this info (especially the querystring). + * ---------- + */ +PgBackendStatus * +pgstat_fetch_stat_beentry(BackendId beid) +{ + LocalPgBackendStatus key; + LocalPgBackendStatus *ret; + + pgstat_read_current_status(); + + /* + * Since the localBackendStatusTable is in order by backend_id, we can use + * bsearch() to search it efficiently. + */ + key.backend_id = beid; + ret = (LocalPgBackendStatus *) bsearch(&key, localBackendStatusTable, + localNumBackends, + sizeof(LocalPgBackendStatus), + cmp_lbestatus); + if (ret) + return &ret->backendStatus; + + return NULL; +} + + +/* ---------- + * pgstat_fetch_stat_local_beentry() - + * + * Like pgstat_fetch_stat_beentry() but with locally computed additions (like + * xid and xmin values of the backend) + * + * The beid argument is a 1-based index in the localBackendStatusTable + * (note that this is unlike pgstat_fetch_stat_beentry()). + * Returns NULL if the argument is out of range (no current caller does that). + * + * NB: caller is responsible for a check if the user is permitted to see + * this info (especially the querystring). 
+ * ---------- + */ +LocalPgBackendStatus * +pgstat_fetch_stat_local_beentry(int beid) +{ + pgstat_read_current_status(); + + if (beid < 1 || beid > localNumBackends) + return NULL; + + return &localBackendStatusTable[beid - 1]; +} + /* ---------- * pgstat_fetch_stat_numbackends() - diff --git a/src/backend/utils/activity/memtrack.c b/src/backend/utils/activity/memtrack.c new file mode 100644 index 0000000000..73df1d916e --- /dev/null +++ b/src/backend/utils/activity/memtrack.c @@ -0,0 +1,142 @@ + +#include + +#include "postgres.h" +#include "miscadmin.h" +#include "utils/backend_status.h" +#include "utils/memtrack.h" +#include "storage/proc.h" + +/* + * Max backend memory allocation allowed (MB). 0 = disabled. + * Max backend bytes is the same but in bytes. + * These default to "0", meaning don't check bounds for total memory. + */ +int max_total_bkend_mem = 0; +int64 max_total_bkend_bytes = 0; + +/* + * Local variables for tracking memory use. + * These values are preset so memory tracking is active on startup. + * After a fork(), they must be reset using 'init_backend_memory()'. + */ +PgBackendMemoryStatus my_memory = INIT_BACKEND_MEMORY; +PgBackendMemoryStatus reported_memory = NO_BACKEND_MEMORY; +int64 allocation_lower_bound = 0; +int64 allocation_upper_bound = 0; + +/* --------- + * init_backend_memory() - + * + * Called immediately after a fork(). + * Resets local memory counters to their initial startup values + */ +void +init_backend_memory(void) +{ + debug("\n"); + /* Start with nothing allocated. */ + my_memory = INIT_BACKEND_MEMORY; + reported_memory = NO_BACKEND_MEMORY; + + /* Force the next allocation to do global bounds checking. */ + allocation_lower_bound = 0; + allocation_upper_bound = 0; +} + +/* + * Clean up memory counters as backend is exiting. + * + * DSM memory is not automatically returned, so it persists in the counters. + * All other memory will disappear, so those counters are set to zero. 
+ * + * Ideally, this function would be called last, but in practice there are some + * late memory releases that happen after it is called. + */ +void +exit_backend_memory(void) +{ + debug("\n"); + /* + * Release non-dsm memory. + * We don't release dsm shared memory since it survives process exit. + */ + for (int type = 0; type < PG_ALLOC_TYPE_MAX; type++) + if (type != PG_ALLOC_DSM) + release_backend_memory(my_memory.allocated_bytes_by_type[type], type); + + /* Force the final values to be posted to shmem */ + update_global_allocation(0, PG_ALLOC_OTHER); + + /* If we get a late request, send it to the long path. */ + allocation_lower_bound = 0; + allocation_upper_bound = 0; +} + + +/* + * Update backend memory allocation for a new request. + * + * There are two versions of this function. This one, which updates + * global values in shared memory, and an optimized update_local_allocation() + * which only updates local values. + * + * This version is the "slow path". We invoke it periodically to update + * global values and pgstat statistics. + */ +bool update_global_allocation(int64 size, pg_allocator_type type) +{ + int64 new_allocated_bytes; + int64 dsm_delta; + + /* If we are still initializing, ignore the request. It should be part of initial allocation. */ + if (ProcGlobal == NULL || MyProcPid == 0) + return update_local_allocation(size, type); + + debug("size=%ld type=%d\n", size, type); + + /* Calculate new number of bytes reflecting the reservation/release */ + new_allocated_bytes = my_memory.allocated_bytes + size; + + /* If reserving new memory and we are limited by max_total_bkend ... */ + if (size > 0 && max_total_bkend_bytes > 0 && MyAuxProcType == NotAnAuxProcess && MyProcPid != PostmasterPid) + { + /* Update the global total memory counter subject to the upper limit. 
*/ + if (!atomic_add_within_bounds_i64(&ProcGlobal->total_bkend_mem_bytes, + new_allocated_bytes - reported_memory.allocated_bytes, + 0, max_total_bkend_bytes)) + return false; + } + + /* Otherwise, update the global counter with no limit checking */ + else + pg_atomic_add_fetch_u64(&ProcGlobal->total_bkend_mem_bytes, + new_allocated_bytes - reported_memory.allocated_bytes); + + /* Update the local memory counters. This must happen after bounds checking */ + update_local_allocation(size, type); + + /* Update the global dsm memory counter to reflect changes since our last report */ + dsm_delta = my_memory.allocated_bytes_by_type[PG_ALLOC_DSM] - reported_memory.allocated_bytes_by_type[PG_ALLOC_DSM]; + pg_atomic_add_fetch_u64(&ProcGlobal->global_dsm_allocation, dsm_delta); + + /* Update pgstat statistics if we are initialized as a backend process. */ + if (MyBEEntry != NULL) + { + PGSTAT_BEGIN_WRITE_ACTIVITY(MyBEEntry); + MyBEEntry->st_memory = my_memory; + PGSTAT_END_WRITE_ACTIVITY(MyBEEntry); + } + + /* Remember the values we just reported to pgstat */ + reported_memory = my_memory; + + /* Update bounds so they bracket our new allocation size. 
*/ + allocation_upper_bound = my_memory.allocated_bytes + allocation_allowance_refill_qty; + allocation_lower_bound = my_memory.allocated_bytes - allocation_allowance_refill_qty; + + Assert((int64)pg_atomic_read_u64(&ProcGlobal->total_bkend_mem_bytes) >= 0); + Assert((int64)pg_atomic_read_u64(&ProcGlobal->global_dsm_allocation) >= 0); + + return true; +} diff --git a/src/backend/utils/activity/meson.build b/src/backend/utils/activity/meson.build index 46a27e7548..d3db919704 100644 --- a/src/backend/utils/activity/meson.build +++ b/src/backend/utils/activity/meson.build @@ -3,6 +3,7 @@ backend_sources += files( 'backend_progress.c', 'backend_status.c', + 'memtrack.c', 'pgstat.c', 'pgstat_archiver.c', 'pgstat_bgwriter.c', diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index dd5094a2d4..1f5aac422b 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2067,3 +2067,92 @@ pg_stat_have_stats(PG_FUNCTION_ARGS) PG_RETURN_BOOL(pgstat_have_entry(kind, dboid, objoid)); } + +/* + * Get the memory allocation of PG backends. + */ +Datum +pg_stat_get_memory_allocation(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_MEMORY_ALLOCATION_COLS 7 + int num_backends = pgstat_fetch_stat_numbackends(); + int curr_backend; + int pid = PG_ARGISNULL(0) ? 
-1 : PG_GETARG_INT32(0); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + + InitMaterializedSRF(fcinfo, 0); + + /* 1-based index */ + for (curr_backend = 1; curr_backend <= num_backends; curr_backend++) + { + /* for each row */ + Datum values[PG_STAT_GET_MEMORY_ALLOCATION_COLS] = {0}; + bool nulls[PG_STAT_GET_MEMORY_ALLOCATION_COLS] = {0}; + LocalPgBackendStatus *local_beentry; + PgBackendStatus *beentry; + + /* Get the next one in the list */ + local_beentry = pgstat_fetch_stat_local_beentry(curr_backend); + beentry = &local_beentry->backendStatus; + + /* If looking for specific PID, ignore all the others */ + if (pid != -1 && beentry->st_procpid != pid) + continue; + + /* Values available to all callers */ + if (beentry->st_databaseid != InvalidOid) + values[0] = ObjectIdGetDatum(beentry->st_databaseid); + else + nulls[0] = true; + + values[1] = Int32GetDatum(beentry->st_procpid); + values[2] = UInt64GetDatum(beentry->st_memory.allocated_bytes); + values[3] = UInt64GetDatum(beentry->st_memory.allocated_bytes_by_type[PG_ALLOC_ASET]); + values[4] = UInt64GetDatum(beentry->st_memory.allocated_bytes_by_type[PG_ALLOC_DSM]); + values[5] = UInt64GetDatum(beentry->st_memory.allocated_bytes_by_type[PG_ALLOC_GENERATION]); + values[6] = UInt64GetDatum(beentry->st_memory.allocated_bytes_by_type[PG_ALLOC_SLAB]); + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + + /* If only a single backend was requested, and we found it, break. */ + if (pid != -1) + break; + } + + return (Datum) 0; +} + +/* + * Get the global memory allocation statistics. 
+ */ +Datum +pg_stat_get_global_memory_allocation(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_GLOBAL_MEMORY_ALLOCATION_COLS 3 + TupleDesc tupdesc; + Datum values[PG_STAT_GET_GLOBAL_MEMORY_ALLOCATION_COLS] = {0}; + bool nulls[PG_STAT_GET_GLOBAL_MEMORY_ALLOCATION_COLS] = {0}; + volatile PROC_HDR *procglobal = ProcGlobal; + + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_GLOBAL_MEMORY_ALLOCATION_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "datid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "total_bkend_mem_bytes_available", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "global_dsm_allocated_bytes", + INT8OID, -1, 0); + BlessTupleDesc(tupdesc); + + /* datid */ + values[0] = ObjectIdGetDatum(MyDatabaseId); + + /* Get total_bkend_mem_bytes */ + values[1] = Int64GetDatum(pg_atomic_read_u64(&procglobal->total_bkend_mem_bytes)); + + /* Get global_dsm_allocated_bytes */ + values[2] = Int64GetDatum(pg_atomic_read_u64(&procglobal->global_dsm_allocation)); + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index e565a3092f..9d2dea3948 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -77,6 +77,7 @@ #include "utils/float.h" #include "utils/guc_hooks.h" #include "utils/guc_tables.h" +#include "utils/memtrack.h" #include "utils/memutils.h" #include "utils/pg_locale.h" #include "utils/portal.h" @@ -3507,6 +3508,17 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_total_backend_memory", PGC_SU_BACKEND, RESOURCES_MEM, + gettext_noop("Restrict total backend memory allocations to this max."), + gettext_noop("0 turns this feature off."), + GUC_UNIT_MB + }, + &max_total_bkend_mem, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 
0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index c768af9a73..bb75e2756a 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -160,6 +160,9 @@ #vacuum_buffer_usage_limit = 256kB # size of vacuum and analyze buffer access strategy ring; # 0 to disable vacuum buffer access strategy; # range 128kB to 16GB +#max_total_backend_memory = 0MB # Restrict total backend memory allocations + # to this max (in MB). 0 turns this feature + # off. # - Disk - diff --git a/src/backend/utils/mmgr/aset.c b/src/backend/utils/mmgr/aset.c index c3affaf5a8..fe8ed8a712 100644 --- a/src/backend/utils/mmgr/aset.c +++ b/src/backend/utils/mmgr/aset.c @@ -48,6 +48,7 @@ #include "port/pg_bitutils.h" #include "utils/memdebug.h" +#include "utils/memtrack.h" #include "utils/memutils.h" #include "utils/memutils_memorychunk.h" #include "utils/memutils_internal.h" @@ -441,7 +442,7 @@ AllocSetContextCreateInternal(MemoryContext parent, * Allocate the initial block. Unlike other aset.c blocks, it starts with * the context header and its block header follows that. 
*/ - set = (AllocSet) malloc(firstBlockSize); + set = (AllocSet) malloc_backend(firstBlockSize, PG_ALLOC_ASET); if (set == NULL) { if (TopMemoryContext) @@ -539,6 +540,7 @@ AllocSetReset(MemoryContext context) AllocSet set = (AllocSet) context; AllocBlock block; Size keepersize PG_USED_FOR_ASSERTS_ONLY; + uint64 deallocation = 0; Assert(AllocSetIsValid(set)); @@ -581,6 +583,7 @@ AllocSetReset(MemoryContext context) { /* Normal case, release the block */ context->mem_allocated -= block->endptr - ((char *) block); + deallocation += block->endptr - ((char *) block); #ifdef CLOBBER_FREED_MEMORY wipe_mem(block, block->freeptr - ((char *) block)); @@ -591,6 +594,7 @@ AllocSetReset(MemoryContext context) } Assert(context->mem_allocated == keepersize); + release_backend_memory(deallocation, PG_ALLOC_ASET); /* Reset block size allocation sequence, too */ set->nextBlockSize = set->initBlockSize; @@ -609,6 +613,7 @@ AllocSetDelete(MemoryContext context) AllocSet set = (AllocSet) context; AllocBlock block = set->blocks; Size keepersize PG_USED_FOR_ASSERTS_ONLY; + uint64 deallocation = 0; Assert(AllocSetIsValid(set)); @@ -647,11 +652,13 @@ AllocSetDelete(MemoryContext context) freelist->first_free = (AllocSetContext *) oldset->header.nextchild; freelist->num_free--; + deallocation += oldset->header.mem_allocated; /* All that remains is to free the header/initial block */ free(oldset); } Assert(freelist->num_free == 0); + release_backend_memory(deallocation, PG_ALLOC_ASET); } /* Now add the just-deleted context to the freelist. 
*/
@@ -668,7 +675,10 @@ AllocSetDelete(MemoryContext context)
 		AllocBlock	next = block->next;
 
 		if (!IsKeeperBlock(set, block))
+		{
 			context->mem_allocated -= block->endptr - ((char *) block);
+			deallocation += block->endptr - ((char *) block);
+		}
 
 #ifdef CLOBBER_FREED_MEMORY
 		wipe_mem(block, block->freeptr - ((char *) block));
@@ -681,6 +691,7 @@ AllocSetDelete(MemoryContext context)
 	}
 
 	Assert(context->mem_allocated == keepersize);
+	release_backend_memory(deallocation + context->mem_allocated, PG_ALLOC_ASET);
 
 	/* Finally, free the context header, including the keeper block */
 	free(set);
@@ -725,7 +736,7 @@ AllocSetAlloc(MemoryContext context, Size size)
 #endif
 
 		blksize = chunk_size + ALLOC_BLOCKHDRSZ + ALLOC_CHUNKHDRSZ;
-		block = (AllocBlock) malloc(blksize);
+		block = (AllocBlock) malloc_backend(blksize, PG_ALLOC_ASET);
 		if (block == NULL)
 			return NULL;
 
@@ -925,7 +936,7 @@ AllocSetAlloc(MemoryContext context, Size size)
 			blksize <<= 1;
 
 		/* Try to allocate it */
-		block = (AllocBlock) malloc(blksize);
+		block = (AllocBlock) malloc_backend(blksize, PG_ALLOC_ASET);
 
 		/*
 		 * We could be asking for pretty big blocks here, so cope if malloc
@@ -936,7 +947,7 @@ AllocSetAlloc(MemoryContext context, Size size)
 			blksize >>= 1;
 			if (blksize < required_size)
 				break;
-			block = (AllocBlock) malloc(blksize);
+			block = (AllocBlock) malloc_backend(blksize, PG_ALLOC_ASET);
 		}
 
 		if (block == NULL)
@@ -1044,7 +1055,13 @@ AllocSetFree(void *pointer)
+		/*
+		 * Release the reservation before the block is wiped: wipe_mem()
+		 * below clobbers the block starting at its header, so block->endptr
+		 * must not be read after it when CLOBBER_FREED_MEMORY is defined.
+		 */
+		release_backend_memory(block->endptr - ((char *) block), PG_ALLOC_ASET);
+
 #ifdef CLOBBER_FREED_MEMORY
 		wipe_mem(block, block->freeptr - ((char *) block));
 #endif
 		free(block);
 	}
 	else
 	{
@@ -1159,8 +1176,7 @@ AllocSetRealloc(void *pointer, Size size)
 		/* Do the realloc */
 		blksize = chksize + ALLOC_BLOCKHDRSZ + ALLOC_CHUNKHDRSZ;
 		oldblksize = block->endptr - ((char *) block);
-
-		block = (AllocBlock) realloc(block, blksize);
+		block = (AllocBlock) realloc_backend(block, blksize, oldblksize, PG_ALLOC_ASET);
 		if (block == NULL)
 		{
 			/* Disallow access to the chunk header.
*/ diff --git a/src/backend/utils/mmgr/generation.c b/src/backend/utils/mmgr/generation.c index 92401ccf73..b2bc39b472 100644 --- a/src/backend/utils/mmgr/generation.c +++ b/src/backend/utils/mmgr/generation.c @@ -37,6 +37,7 @@ #include "lib/ilist.h" #include "port/pg_bitutils.h" +#include "utils/backend_status.h" #include "utils/memdebug.h" #include "utils/memutils.h" #include "utils/memutils_memorychunk.h" @@ -203,7 +204,7 @@ GenerationContextCreate(MemoryContext parent, * Allocate the initial block. Unlike other generation.c blocks, it * starts with the context header and its block header follows that. */ - set = (GenerationContext *) malloc(allocSize); + set = (GenerationContext *) malloc_backend(allocSize, PG_ALLOC_GENERATION); if (set == NULL) { MemoryContextStats(TopMemoryContext); @@ -279,6 +280,7 @@ GenerationReset(MemoryContext context) { GenerationContext *set = (GenerationContext *) context; dlist_mutable_iter miter; + uint64 deallocation = 0; Assert(GenerationIsValid(set)); @@ -301,9 +303,14 @@ GenerationReset(MemoryContext context) if (IsKeeperBlock(set, block)) GenerationBlockMarkEmpty(block); else + { + deallocation += block->blksize; GenerationBlockFree(set, block); + } } + release_backend_memory(deallocation, PG_ALLOC_GENERATION); + /* set it so new allocations to make use of the keeper block */ set->block = KeeperBlock(set); @@ -322,10 +329,12 @@ GenerationReset(MemoryContext context) void GenerationDelete(MemoryContext context) { + /* Reset to release all releasable GenerationBlocks */ GenerationReset(context); + /* And free the context header and keeper block */ - free(context); + free_backend(context, MAXALIGN(sizeof(GenerationContext)) + context->mem_allocated, PG_ALLOC_GENERATION); } /* @@ -365,7 +374,7 @@ GenerationAlloc(MemoryContext context, Size size) { Size blksize = required_size + Generation_BLOCKHDRSZ; - block = (GenerationBlock *) malloc(blksize); + block = (GenerationBlock *) malloc_backend(blksize, PG_ALLOC_GENERATION); if (block 
== NULL) return NULL; @@ -467,8 +476,7 @@ GenerationAlloc(MemoryContext context, Size size) if (blksize < required_size) blksize = pg_nextpower2_size_t(required_size); - block = (GenerationBlock *) malloc(blksize); - + block = (GenerationBlock *) malloc_backend(blksize, PG_ALLOC_GENERATION); if (block == NULL) return NULL; @@ -723,9 +731,8 @@ GenerationFree(void *pointer) * list of blocks, then return it to malloc(). */ dlist_delete(&block->node); - set->header.mem_allocated -= block->blksize; - free(block); + free_backend(block, block->blksize, PG_ALLOC_GENERATION); } /* diff --git a/src/backend/utils/mmgr/slab.c b/src/backend/utils/mmgr/slab.c index 40c1d401c4..436be421fb 100644 --- a/src/backend/utils/mmgr/slab.c +++ b/src/backend/utils/mmgr/slab.c @@ -69,6 +69,7 @@ #include "postgres.h" #include "lib/ilist.h" +#include "utils/backend_status.h" #include "utils/memdebug.h" #include "utils/memutils.h" #include "utils/memutils_memorychunk.h" @@ -359,9 +360,7 @@ SlabContextCreate(MemoryContext parent, elog(ERROR, "block size %zu for slab is too small for %zu-byte chunks", blockSize, chunkSize); - - - slab = (SlabContext *) malloc(Slab_CONTEXT_HDRSZ(chunksPerBlock)); + slab = (SlabContext *) malloc_backend(Slab_CONTEXT_HDRSZ(chunksPerBlock), PG_ALLOC_SLAB); if (slab == NULL) { MemoryContextStats(TopMemoryContext); @@ -433,6 +432,7 @@ SlabReset(MemoryContext context) SlabContext *slab = (SlabContext *) context; dlist_mutable_iter miter; int i; + uint64 deallocation = 0; Assert(SlabIsValid(slab)); @@ -453,6 +453,7 @@ SlabReset(MemoryContext context) #endif free(block); context->mem_allocated -= slab->blockSize; + deallocation += slab->blockSize; } /* walk over blocklist and free the blocks */ @@ -469,9 +470,11 @@ SlabReset(MemoryContext context) #endif free(block); context->mem_allocated -= slab->blockSize; + deallocation += slab->blockSize; } } + release_backend_memory(deallocation, PG_ALLOC_SLAB); slab->curBlocklistIndex = 0; Assert(context->mem_allocated == 0); @@ 
-486,8 +489,13 @@ SlabDelete(MemoryContext context) { /* Reset to release all the SlabBlocks */ SlabReset(context); - /* And free the context header */ - free(context); + + /* + * Until context header allocation is included in context->mem_allocated, + * cast to slab and decrement the header allocation + */ + free_backend(context, Slab_CONTEXT_HDRSZ(((SlabContext *) context)->chunksPerBlock), + PG_ALLOC_SLAB); } /* @@ -543,8 +551,7 @@ SlabAlloc(MemoryContext context, Size size) } else { - block = (SlabBlock *) malloc(slab->blockSize); - + block = (SlabBlock *) malloc_backend(slab->blockSize, PG_ALLOC_SLAB); if (unlikely(block == NULL)) return NULL; @@ -742,7 +749,7 @@ SlabFree(void *pointer) #ifdef CLOBBER_FREED_MEMORY wipe_mem(block, slab->blockSize); #endif - free(block); + free_backend(block, slab->blockSize, PG_ALLOC_SLAB); slab->header.mem_allocated -= slab->blockSize; } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9805bc6118..7a7f9ac1e2 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5420,6 +5420,23 @@ proname => 'pg_stat_get_backend_idset', prorows => '100', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'int4', proargtypes => '', prosrc => 'pg_stat_get_backend_idset' }, +{ oid => '9890', + descr => 'statistics: memory allocation information for backends', + proname => 'pg_stat_get_memory_allocation', prorows => '100', proisstrict => 'f', + proretset => 't', provolatile => 's', proparallel => 'r', + prorettype => 'record', proargtypes => 'int4', + proallargtypes => '{int4,oid,int4,int8,int8,int8,int8,int8}', + proargmodes => '{i,o,o,o,o,o,o,o}', + proargnames => '{pid,datid,pid,allocated_bytes,aset_allocated_bytes,dsm_allocated_bytes,generation_allocated_bytes,slab_allocated_bytes}', + prosrc => 'pg_stat_get_memory_allocation' }, +{ oid => '9891', + descr => 'statistics: global memory allocation information', + proname => 
'pg_stat_get_global_memory_allocation', proisstrict => 'f', + provolatile => 's', proparallel => 'r', prorettype => 'record', + proargtypes => '', proallargtypes => '{oid,int8,int8}', + proargmodes => '{o,o,o}', + proargnames => '{datid,total_bkend_mem_bytes_available,global_dsm_allocated_bytes}', + prosrc =>'pg_stat_get_global_memory_allocation' }, { oid => '2022', descr => 'statistics: information about currently active backends', proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f', diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index ef74f32693..b3a2322328 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -404,6 +404,10 @@ typedef struct PROC_HDR int spins_per_delay; /* Buffer id of the buffer that Startup process waits for pin on, or -1 */ int startupBufferPinWaitBufId; + /* Global dsm allocations */ + pg_atomic_uint64 global_dsm_allocation; + /* Track how much memory the backends have allocated */ + pg_atomic_uint64 total_bkend_mem_bytes; } PROC_HDR; extern PGDLLIMPORT PROC_HDR *ProcGlobal; diff --git a/src/include/utils/backend_status.h b/src/include/utils/backend_status.h index d51c840daf..5ec310c769 100644 --- a/src/include/utils/backend_status.h +++ b/src/include/utils/backend_status.h @@ -10,11 +10,13 @@ #ifndef BACKEND_STATUS_H #define BACKEND_STATUS_H +#include "postgres.h" #include "datatype/timestamp.h" #include "libpq/pqcomm.h" #include "miscadmin.h" /* for BackendType */ #include "storage/backendid.h" #include "utils/backend_progress.h" +#include "utils/memtrack.h" /* ---------- @@ -170,6 +172,10 @@ typedef struct PgBackendStatus /* query identifier, optionally computed using post_parse_analyze_hook */ uint64 st_query_id; + + /* Memory allocated to this backend, both total and subtotals by type. */ + PgBackendMemoryStatus st_memory; + } PgBackendStatus; @@ -332,6 +338,8 @@ extern uint64 pgstat_get_my_query_id(void); * generate the pgstat* views. 
* ----------
  */
+extern LocalPgBackendStatus *pgstat_fetch_stat_local_beentry(int beid);
+extern PgBackendStatus *pgstat_fetch_stat_beentry(int beid);
 extern int	pgstat_fetch_stat_numbackends(void);
 extern PgBackendStatus *pgstat_get_beentry_by_backend_id(BackendId beid);
 extern LocalPgBackendStatus *pgstat_get_local_beentry_by_backend_id(BackendId beid);
diff --git a/src/include/utils/memtrack.h b/src/include/utils/memtrack.h
new file mode 100644
index 0000000000..12b56cbf58
--- /dev/null
+++ b/src/include/utils/memtrack.h
@@ -0,0 +1,288 @@
+
+#ifndef MEMTRACK_H
+#define MEMTRACK_H
+
+/* ----------
+ * Backend memory accounting functions.
+ *   Track how much memory each backend is using
+ *   and place a cluster-wide limit on the total amount of backend memory.
+ *
+ * The main functions are:
+ *     init_backend_memory()
+ *     reserve_backend_memory()
+ *     release_backend_memory()
+ *     exit_backend_memory()
+ *
+ * All local variables are properly initialized at startup, so init_backend_memory()
+ * only needs to be called after a fork() system call.
+ *
+ * The reserve/release functions implement both a "fast path" and a "slow path".
+ * The fast path is used for most allocations, and it only references
+ * local variables. The slow path is invoked periodically; it updates
+ * shared memory and checks for limits on total backend memory.
+ *
+ * The following local variables represent the "TRUTH" of this backend's memory allocations.
+ *   my_memory.allocated_bytes:               total amount of memory allocated by this backend.
+ *   my_memory.allocated_bytes_by_type[type]: subtotals by allocator type.
+ *
+ * The local values are periodically reported to pgstat.
+ * The following variables hold the last reported values:
+ *    reported_memory.allocated_bytes
+ *    reported_memory.allocated_bytes_by_type[type]
+ *
+ * The "slow path" is invoked when my_memory.allocated_bytes moves outside these bounds.
+ * Once invoked, it updates the reported values and sets new bounds.
+ *   allocation_upper_bound:          update when my_memory.allocated_bytes exceeds this
+ *   allocation_lower_bound:          update when my_memory.allocated_bytes drops below this
+ *   allocation_allowance_refill_qty: amount of memory to allocate or release before updating again.
+ *
+ * These counters are the values seen by pgstat. They are a copy of reported_memory.
+ *   proc->st_memory.allocated_bytes:               last total reported to pgstat
+ *   proc->st_memory.allocated_bytes_by_type[type]: last subtotals reported to pgstat
+ *
+ * Limits on total backend memory. If max_total_bkend_bytes is zero, there is no limit.
+ *   ProcGlobal->total_bkend_mem_bytes:       total amount of memory reserved by all backends, including shared memory
+ *   max_total_bkend_bytes:                   maximum amount of memory allowed to be reserved by all backends.
+ *
+ * And finally,
+ *   initial_allocation_allowance:            each backend consumes this much memory simply by existing.
+ *   ProcGlobal->global_dsm_allocation:       total amount of shared memory allocated by all backends.
+ * ----------
+ */
+
+#include
+#include "postgres.h"
+#include "port/atomics.h"
+
+/*
+ * Define a debug macro which becomes a no-op when debug is disabled.
+ */
+#define debug(...)  ((void) 0)
+
+
+/* Various types of memory allocators we are tracking. */
+typedef enum pg_allocator_type
+{
+	PG_ALLOC_OTHER = 0,		/* Not tracked, but part of total memory */
+	PG_ALLOC_ASET,			/* Allocation Set */
+	PG_ALLOC_DSM,			/* Dynamic shared memory */
+	PG_ALLOC_GENERATION,	/* Generation Context (all freed at once) */
+	PG_ALLOC_SLAB,			/* Slab allocator */
+	PG_ALLOC_TYPE_MAX,		/* (Last, for array sizing) */
+} pg_allocator_type;
+
+
+/*
+ * PgBackendMemoryStatus
+ *
+ * For each backend, track how much memory has been allocated.
+ * Note it may be possible to have negative values, say if one backend
+ * creates DSM segments and another backend destroys them.
+ */
+typedef struct PgBackendMemoryStatus
+{
+	int64		allocated_bytes;
+	int64		allocated_bytes_by_type[PG_ALLOC_TYPE_MAX];
+} PgBackendMemoryStatus;
+
+
+/* These values are candidates for GUC variables.  We chose 1MB arbitrarily. */
+static const int64 initial_allocation_allowance = 1024 * 1024;	/* 1MB */
+static const int64 allocation_allowance_refill_qty = 1024 * 1024;	/* 1MB */
+
+/* Compile time initialization constants; the allowance is booked under index 0 (PG_ALLOC_OTHER) */
+#define INIT_BACKEND_MEMORY (PgBackendMemoryStatus) \
+	{initial_allocation_allowance, {initial_allocation_allowance}}
+#define NO_BACKEND_MEMORY (PgBackendMemoryStatus) \
+	{0, {0}}
+
+/* Manage memory allocation for backends. */
+extern PGDLLIMPORT PgBackendMemoryStatus my_memory;
+extern PGDLLIMPORT PgBackendMemoryStatus reported_memory;
+extern PGDLLIMPORT int64 allocation_upper_bound;
+extern PGDLLIMPORT int64 allocation_lower_bound;
+
+extern PGDLLIMPORT int64 max_total_bkend_bytes;
+extern PGDLLIMPORT int32 max_total_bkend_mem;
+
+/* These are the main entry points for backend memory accounting */
+extern void init_backend_memory(void);
+static inline bool reserve_backend_memory(int64 size, pg_allocator_type type);
+static inline void release_backend_memory(int64 size, pg_allocator_type type);
+extern void exit_backend_memory(void);
+
+/* Helper functions for backend memory accounting */
+static inline bool update_local_allocation(int64 size, pg_allocator_type type);
+extern bool update_global_allocation(int64 size, pg_allocator_type type);
+
+/* ----------
+ * reserve_backend_memory() -
+ *  Called to report a desired increase of "size" bytes (>= 0) for this backend.
+ *  Returns true if successful; the fast path below always succeeds.
+ */
+static inline bool
+reserve_backend_memory(int64 size, pg_allocator_type type)
+{
+	Assert(size >= 0);
+
+	/* quick optimization */
+	if (size == 0)
+		return true;
+
+	/* CASE: the new allocation is within bounds. Take the fast path.
+	 */
+	else if (my_memory.allocated_bytes + size <= allocation_upper_bound)
+		return update_local_allocation(size, type);
+
+	/* CASE: out of bounds. Update pgstat and check memory limits */
+	else
+		return update_global_allocation(size, type);
+}
+
+/* ----------
+ * release_backend_memory() -
+ *  Called to report a decrease of "size" bytes (>= 0) allocated for this backend.
+ *  Note we should have already called "reserve_backend_memory"
+ *  so we should never end up with a negative total allocation.
+ */
+static inline void
+release_backend_memory(int64 size, pg_allocator_type type)
+{
+	Assert(size >= 0);
+
+	/* quick optimization */
+	if (size == 0)
+		return;
+
+	/* CASE: In bounds, take the fast path */
+	else if (my_memory.allocated_bytes - size >= allocation_lower_bound)
+		update_local_allocation(-size, type);
+
+	/* CASE: Out of bounds. Update pgstat and memory totals */
+	else
+		update_global_allocation(-size, type);
+}
+
+
+/*
+* Fast path for reserving and releasing memory.
+* This version is used for most allocations, and it
+* is stripped down to the bare minimum to reduce impact
+* on performance. It only updates local variables; "size" may be negative.
+*/
+static inline bool
+update_local_allocation(int64 size, pg_allocator_type type)
+{
+	/* Update our local memory counters. */
+	my_memory.allocated_bytes += size;
+	my_memory.allocated_bytes_by_type[type] += size;
+
+	return true;
+}
+
+
+/*--------------------------------------------
+ * Convenience functions based on malloc/free
+ *------------------------------------------*/
+
+/*
+ * Reserve "size" bytes, then malloc() them.  Returns NULL if either step fails.
+ */
+static inline void *
+malloc_backend(int64 size, pg_allocator_type type)
+{
+	void *ptr;
+
+	/* reserve the memory if able to */
+	if (!reserve_backend_memory(size, type))
+		return NULL;
+
+	/* Allocate the memory, releasing the reservation if failed */
+	ptr = malloc(size);
+	if (ptr == NULL)
+		release_backend_memory(size, type);
+
+	return ptr;
+}
+
+/*
+ * Free memory which was allocated with malloc_backend.
+ * Note: most mallocs have a non-portable method to
+ * get the size of a block of memory. Dropping the "size" parameter
+ * would greatly simplify the calling code.
+ */
+static inline void
+free_backend(void *ptr, int64 size, pg_allocator_type type)
+{
+	release_backend_memory(size, type);
+	free(ptr);
+}
+
+
+/*
+ * Resize a block obtained from malloc_backend(), adjusting the reservation
+ * by the difference between old_size and new_size.
+ *
+ * Same contract as realloc(): on failure NULL is returned and the old block
+ * stays allocated and accounted for, so the caller still owns it.
+ */
+static inline void *
+realloc_backend(void *block, int64 new_size, int64 old_size, pg_allocator_type type)
+{
+	void	   *ptr;
+	int64		delta = new_size - old_size;
+
+	/* Growing: reserve the additional bytes before resizing */
+	if (delta > 0 && !reserve_backend_memory(delta, type))
+		return NULL;
+
+	ptr = realloc(block, new_size);
+	if (ptr == NULL)
+	{
+		/* The old block is untouched; undo only the extra reservation */
+		if (delta > 0)
+			release_backend_memory(delta, type);
+		return NULL;
+	}
+
+	/* Shrinking: hand back the surplus after a successful resize */
+	if (delta < 0)
+		release_backend_memory(-delta, type);
+
+	return ptr;
+}
+
+
+/* True if adding a and b would overflow; tested without performing the addition */
+static inline bool addition_overflow(int64 a, int64 b)
+{
+	return (b > 0 && a > PG_INT64_MAX - b) ||
+		(b < 0 && a < PG_INT64_MIN - b);
+}
+
+/*
+ * Helper function to add to an atomic sum, as long as the result is within bounds.
+ * TODO: consider moving to atomics.h
+ */
+static inline bool
+atomic_add_within_bounds_i64(pg_atomic_uint64 *ptr, int64 add, int64 lower_bound, int64 upper_bound)
+{
+	uint64		current = pg_atomic_read_u64(ptr);
+	int64		newval;
+
+	for (;;)
+	{
+		/* check for overflow first so the addition below is well defined */
+		if (addition_overflow((int64) current, add))
+			return false;
+		newval = (int64) current + add;
+		/* check if we are out of bounds */
+		if (newval < lower_bound || newval > upper_bound)
+			return false;
+
+		/* a failed CAS stores the latest value in "current"; retry with it */
+		if (pg_atomic_compare_exchange_u64(ptr, &current, (uint64) newval))
+			return true;
+	}
+}
+
+#endif							/* MEMTRACK_H */
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 17d369e378..6bdcc6dac4 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -19,6 +19,7 @@ subdir('test_extensions')
 subdir('test_ginpostinglist')
 subdir('test_integerset')
 subdir('test_lfind')
+subdir('test_memtrack')
 subdir('test_misc')
 subdir('test_oat_hooks')
 subdir('test_parser')
diff --git a/src/test/modules/test_memtrack/.gitignore b/src/test/modules/test_memtrack/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/test_memtrack/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_memtrack/Makefile b/src/test/modules/test_memtrack/Makefile
new file mode 100644
index 0000000000..db44ca0575
--- /dev/null
+++ b/src/test/modules/test_memtrack/Makefile
@@ -0,0 +1,25 @@
+# src/test/modules/test_memtrack/Makefile
+
+MODULE_big = test_memtrack
+OBJS = \
+	$(WIN32RES) \
+	test_memtrack.o \
+	worker_pool.o
+PGFILEDESC = "test_memtrack - test memory tracking"
+
+EXTENSION = test_memtrack
+DATA = test_memtrack--1.0.sql
+
+REGRESS = test_memtrack
+REGRESS_OPTS = --temp-config=$(top_srcdir)/src/test/modules/test_memtrack/test_memtrack.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_memtrack
+top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_memtrack/README b/src/test/modules/test_memtrack/README new file mode 100644 index 0000000000..75a6c1e84a --- /dev/null +++ b/src/test/modules/test_memtrack/README @@ -0,0 +1,36 @@ +test_memtrack tests the memory tracking facilities of PostgreSQL. + + +Functions +========= +test_allocation(# workers, type of memory, # blocks, block size) + + This function controls the test. It creates a pool of workers and passes them messages + asking them to allocate or free memory. It verifies the memory is allocated or freed, + and it confirms the total memory has been freed at the end of the test. + + Like test_shm_mq, this function spawns worker backends which communicate + through message queues. + + Unlike test_shm_mq, each worker is associated with two queues, one for requests + and another for responses. All messages are exchanged between the main test backend + and the workers. The workers do not communicate with each other. + +test_memtrack(# workers, type of memory, # blocks, block size) + + This function is like test_allocation, but it only updates the memory tracking counters + and does not actually allocate or free memory. + + +Messages +======== +The message sent to the workers + request type (ALLOCATE or RELEASE) + type of memory + number of blocks to allocate or free + size of each block + +The response sent back. + error code (0 if no error) + starting memory usage totals and subtotals. + ending memory usage totals and subtotals. 
diff --git a/src/test/modules/test_memtrack/expected/test_memtrack.out b/src/test/modules/test_memtrack/expected/test_memtrack.out new file mode 100644 index 0000000000..23e51b4e80 --- /dev/null +++ b/src/test/modules/test_memtrack/expected/test_memtrack.out @@ -0,0 +1,80 @@ +CREATE EXTENSION test_memtrack; +-- Make sure we can track memory usage of a single task +-- Since logic is the same for all memory managers, only test one. +SELECT test_memtrack(1, 1, 0, 1024); + test_memtrack +--------------- + +(1 row) + +-- Make sure we can track memory usage of multiple tasks. +-- By default we are limited to 8 tasks, so stay below the limit. +SELECT test_memtrack(5, 2, 1024, 5*1024); + test_memtrack +--------------- + +(1 row) + +-- Do it again. We had a bug where the second call would fail. +SELECT test_memtrack(5, 3, 1024, 5*1024); + test_memtrack +--------------- + +(1 row) + +-- Now we're going to actualy do memory allocations. +-- We'll test each type of memory manager, except DSM. +-- Verify we can create and destroy contexts. +SELECT test_allocation(1,1,0,1024); + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(1,2,0, 1024); + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(1,3, 0,1024); + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(1,4,0,1024); + test_allocation +----------------- + +(1 row) + +-- Create and free blocks of memory. +SELECT test_allocation(5,1,5*1024,1024); + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(5,2,5,1024*1024); /* Fewer, don't exceed shmem limit */ + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(5,3,5*1024,1024); + test_allocation +----------------- + +(1 row) + +SELECT test_allocation(5,4,5*1024,1024); + test_allocation +----------------- + +(1 row) + +-- Allocate more memory than we have available. 
+-- (this should fail because we configured max_backend_memory to 1024 Mb) +SELECT test_memtrack(5, 2, 1024, 1024*1024*1024); +ERROR: worker ran out of memory diff --git a/src/test/modules/test_memtrack/meson.build b/src/test/modules/test_memtrack/meson.build new file mode 100644 index 0000000000..4b861ab62f --- /dev/null +++ b/src/test/modules/test_memtrack/meson.build @@ -0,0 +1,38 @@ +# Copyright (c) 2022-2023, PostgreSQL Global Development Group + +test_memtrack_sources = files( + 'test_memtrack.c', + 'worker_pool.c', +) + +if host_system == 'windows' + test_memtrack_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_memtrack', + '--FILEDESC', 'test_memtrack - test memory tracking',]) +endif + +test_memtrack = shared_module('test_memtrack', + test_memtrack_sources, + kwargs: pg_test_mod_args, +) +test_install_libs += test_memtrack + +test_install_data += files( + 'test_memtrack.control', + 'test_memtrack--1.0.sql', +) + +tests += { + 'name': 'test_memtrack', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_memtrack', + ], + 'regress_args': [ + '--temp-config', files('test_memtrack.conf'), + ], + + }, +} diff --git a/src/test/modules/test_memtrack/sql/test_memtrack.sql b/src/test/modules/test_memtrack/sql/test_memtrack.sql new file mode 100644 index 0000000000..39318276ec --- /dev/null +++ b/src/test/modules/test_memtrack/sql/test_memtrack.sql @@ -0,0 +1,31 @@ +CREATE EXTENSION test_memtrack; + +-- Make sure we can track memory usage of a single task +-- Since logic is the same for all memory managers, only test one. +SELECT test_memtrack(1, 1, 0, 1024); + +-- Make sure we can track memory usage of multiple tasks. +-- By default we are limited to 8 tasks, so stay below the limit. +SELECT test_memtrack(5, 2, 1024, 5*1024); + +-- Do it again. We had a bug where the second call would fail. +SELECT test_memtrack(5, 3, 1024, 5*1024); + +-- Now we're going to actualy do memory allocations. 
+-- We'll test each type of memory manager, except DSM. + +-- Verify we can create and destroy contexts. +SELECT test_allocation(1,1,0,1024); +SELECT test_allocation(1,2,0, 1024); +SELECT test_allocation(1,3, 0,1024); +SELECT test_allocation(1,4,0,1024); + +-- Create and free blocks of memory. +SELECT test_allocation(5,1,5*1024,1024); +SELECT test_allocation(5,2,5,1024*1024); /* Fewer, don't exceed shmem limit */ +SELECT test_allocation(5,3,5*1024,1024); +SELECT test_allocation(5,4,5*1024,1024); + +-- Allocate more memory than we have available. +-- (this should fail because we configured max_backend_memory to 1024 Mb) +SELECT test_memtrack(5, 2, 1024, 1024*1024*1024); diff --git a/src/test/modules/test_memtrack/test_memtrack--1.0.sql b/src/test/modules/test_memtrack/test_memtrack--1.0.sql new file mode 100644 index 0000000000..9a39f24ee4 --- /dev/null +++ b/src/test/modules/test_memtrack/test_memtrack--1.0.sql @@ -0,0 +1,20 @@ +/* src/test/modules/test_memtrack/test_memtrack--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_memtrack" to load this file. 
\quit + +CREATE FUNCTION test_memtrack( + num_workers pg_catalog.int4 default 1, + type pg_catalog.int4 default 1, + num_blocks pg_catalog.int4 default 1, + block_size pg_catalog.int4 default 1024) + RETURNS pg_catalog.void STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION test_allocation( + num_workers pg_catalog.int4 default 1, + type pg_catalog.int4 default 1, + num_blocks pg_catalog.int4 default 1, + block_size pg_catalog.int4 default 1024) + RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_memtrack/test_memtrack.c b/src/test/modules/test_memtrack/test_memtrack.c new file mode 100644 index 0000000000..e9a8a7844f --- /dev/null +++ b/src/test/modules/test_memtrack/test_memtrack.c @@ -0,0 +1,496 @@ +/*-------------------------------------------------------------------------- + * + * test_memtrack.c + * Test harness code + * + * Copyright (c) 2013-2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_memtrack/test_memtrack.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "fmgr.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "varatt.h" + +#include "storage/dsm.h" +#include "utils/backend_status.h" +#include "utils/memutils_internal.h" + +#include "worker_pool.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(test_memtrack); +PG_FUNCTION_INFO_V1(test_allocation); + +typedef enum {ALLOCATE, RELEASE} Action; +typedef struct AllocateRequest +{ + Action action; + pg_allocator_type type; + int32 nWorkers; + int32 nBlocks; + int64 blockSize; +} AllocateRequest; + +typedef struct ResponseData +{ + int32 errorCode; + PgBackendMemoryStatus memory; + PgBackendMemoryStatus startingMemory; +} ResponseData; + +/* Forward references */ +static bool reserveBlocks(pg_allocator_type type, int nBlocks, int blockSize); +static bool releaseBlocks(pg_allocator_type type, int nBlocks, int blockSize); +static 
void validateArgs(int nWorkers, pg_allocator_type type, int nBlocks, int blockSize); +static void sendRequest(WorkerPool *pool, int worker, Action action, pg_allocator_type type, int nBlocks, int blockSize); +static void processReply(WorkerPool *pool, int worker, Action actions, pg_allocator_type type, int nBlocks, int blockSize); +static Datum test_nonDSM(FunctionCallInfo fcinfo, char *workerFunction); + +/* Test the memory tracking features standalone */ +PGDLLEXPORT Datum test_memtrack(PG_FUNCTION_ARGS); +PGDLLEXPORT void test_memtrack_worker(Datum arg); + +/* Test the memory tracking features with actual allocations */ +PGDLLEXPORT Datum test_allocation(PG_FUNCTION_ARGS); +PGDLLEXPORT void test_allocation_worker(Datum arg); +/* + * Test the memory tracking features. + */ +Datum +test_memtrack(PG_FUNCTION_ARGS) +{ + return test_nonDSM(fcinfo, "test_memtrack_worker"); +} + + +Datum +test_allocation(PG_FUNCTION_ARGS) +{ + return test_nonDSM(fcinfo, "test_allocation_worker"); +} + + + +/**************** + * Parent task to test the memory tracking features. + * Schedules a pool of workers and verifies the results. + * This version tests non-shared memory allocations. 
+ */ +static +Datum test_nonDSM(FunctionCallInfo fcinfo, char *workerFunction) +{ + WorkerPool *pool; + int64 delta, starting_bkend_bytes; + int64 expected, fudge; + + /* Get the SQL function arguments */ + int32 nWorkers = PG_GETARG_INT32(0); + pg_allocator_type type = PG_GETARG_INT32(1); + int32 nBlocks = PG_GETARG_INT32(2); + int32 blockSize = PG_GETARG_INT32(3); + + debug("nWorkers=%d type=%d nBlocks=%d blockSize=%d\n", nWorkers, type, nBlocks, blockSize); + validateArgs(nWorkers, type, nBlocks, blockSize); + + /* Our global totals may be off by an allocation allowance per worker */ + fudge = nWorkers * allocation_allowance_refill_qty * nWorkers; + expected = nWorkers * nBlocks * blockSize; + + /* Set up pool of background workers */ + pool = createWorkerPool(nWorkers, sizeof(AllocateRequest) * 10, sizeof(ResponseData) * 10, + "test_memtrack", workerFunction); + + /* Remember the total memory before we start allocations */ + starting_bkend_bytes = pg_atomic_read_u32((void *)&ProcGlobal->total_bkend_mem_bytes); + + /* Tell the workers to start their first batch of allocations */ + for (int w = 0; w < nWorkers; w++) + sendRequest(pool, w, ALLOCATE, type, nBlocks, blockSize); + + /* Get the workers response and verify all is good */ + for (int w = 0; w < nWorkers; w++) + processReply(pool, w, ALLOCATE, type, nBlocks, blockSize); + + /* Confirm the total backend memory is greater than what we just allocated */ + delta = pg_atomic_read_u32((void *)&ProcGlobal->total_bkend_mem_bytes) - starting_bkend_bytes; + debug("starting_bkend_bytes=%zd delta=%zd\n", starting_bkend_bytes, delta); + if (delta < expected - fudge || delta > expected + fudge) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Total allocated memory %zd doesn't match expected %zd fudge=%zd\n", delta, expected, fudge))); + + /* Tell the workers to release their memory */ + for (int w=0; w < nWorkers; w++) + sendRequest(pool, w, RELEASE, type, nBlocks, blockSize); + + /* 
Get the workers response and verify all is good */ + for (int w = 0; w < nWorkers; w++) + processReply(pool, w, RELEASE, type, nBlocks, blockSize); + + /* Verify the new total is reasonable */ + delta = pg_atomic_read_u32((void *)&ProcGlobal->total_bkend_mem_bytes) - starting_bkend_bytes; + debug("After release: delta=%zd expected=%d\n", delta, 0); + if (delta < -fudge || delta > fudge) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Total allocated memory %zd is less than expected %zd\n", delta, 0l))); + + /* Clean up */ + freeWorkerPool(pool); + debug("done\n"); + PG_RETURN_VOID(); +} + + +static +void validateArgs(int nWorkers, pg_allocator_type type, int nBlocks, int blockSize) +{ + /* A negative blockcount is nonsensical. */ + if (nBlocks < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("repeat count size must be an integer value greater than or equal to zero"))); + + /* a minimum of 1 worker is required. */ + if (nWorkers <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("number of workers must be an integer value greater than zero"))); + + /* block size must be > 0 */ + if (blockSize <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("block size must be an integer value greater than zero"))); + + /* Valid type? 
*/ + if (type < 0 || type >= PG_ALLOC_TYPE_MAX) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid allocation type"))); +} + +static +void sendRequest(WorkerPool *pool, int worker, Action action, pg_allocator_type type, int nBlocks, int blockSize) +{ + AllocateRequest req; + int result; + debug("worker=%d action=%d type=%d nBlocks=%d blockSize=%d\n", worker, action, type, nBlocks, blockSize); + + req = (AllocateRequest) {.nBlocks = nBlocks, .action = action, .type = type, .blockSize = blockSize}; + + result = sendToWorker(pool, worker, &req, sizeof(req)); + if (result != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send message"))); +} + + +void processReply(WorkerPool *pool, int worker, Action action, pg_allocator_type type, int nBlocks, int blockSize) +{ + int64 delta; + ResponseData *resp; + Size len; + int result; + debug("worker=%d action=%d type=%d nBlocks=%d blockSize=%d\n", worker, action, type, nBlocks, blockSize); + + /* Receive a message. 
Returns a pointer to the message and a length */ + result = recvFromWorker(pool, worker, (void *)&resp, &len); + if (result != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not receive message"))); + + if (resp->errorCode != 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("worker ran out of memory"))); + + /* Verify the totals */ + delta = resp->memory.allocated_bytes_by_type[type] - resp->startingMemory.allocated_bytes_by_type[type]; + debug("delta=%ld expected=%ld\n", delta, (int64)nBlocks*blockSize); + if (action == ALLOCATE && delta < nBlocks * blockSize) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("worker reported %ld bytes, expected %d", resp->memory.allocated_bytes_by_type[type], nBlocks * blockSize))); + + if (action == RELEASE && delta != 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("worker reported %ld bytes, expected 0", resp->memory.allocated_bytes_by_type[type]))); +} + + + +void test_memtrack_worker(Datum arg) +{ + AllocateRequest *req; + Size reqSize; + + ResponseData resp[1]; + shm_mq_result result; + + PgBackendMemoryStatus startingMemory; + + debug(""); + workerInit(arg); + + /* Now that we're running, make note of how much memory has been already allocated */ + startingMemory = my_memory; + + do + { + result = workerRecv((void *)&req, &reqSize); + debug("Received result=%d action=%d type=%d nBlocks=%d blockSize=%ld\n", result, req->action, req->type, req->nBlocks, req->blockSize); + if (result != SHM_MQ_SUCCESS) + break; + + /* Allocate or release the blocks */ + if (req->action == ALLOCATE && reserveBlocks(req->type, req->nBlocks, req->blockSize)) + resp->errorCode = 0; + else if (req->action == RELEASE && releaseBlocks(req->type, req->nBlocks, req->blockSize)) + resp->errorCode = 0; + else + resp->errorCode = 1; + + /* Get the current memory totals */ + resp->memory = my_memory; + 
resp->startingMemory = startingMemory; + debug("MyBEEntry=%p allocated_bytes=%ld type=%d\n", MyBEEntry, my_memory.allocated_bytes_by_type[req->type], req->type); + + /* Send the response */ + result = workerSend(resp, sizeof(resp[1])); + debug("Send errorCode=%d result=%d\n", resp->errorCode, result); + + } while (result == SHM_MQ_SUCCESS); + + workerExit(0); +} + + +bool reserveBlocks(pg_allocator_type type, int nBlocks, int blockSize) +{ + debug("type=%d nBlocks=%d blockSize=%d prev=%zd\n", type, nBlocks, blockSize, my_memory.allocated_bytes_by_type[type]); + /* Allocate the requested number of blocks */ + for (int i = 0; i < nBlocks; i++) + if (!reserve_backend_memory(blockSize, type)) + return false; + + debug("success: allocated_bytes=%zd\n", my_memory.allocated_bytes_by_type[type]); + return true; +} + + +bool releaseBlocks(pg_allocator_type type, int nBlocks, int blockSize) +{ + debug("type=%d nBlocks=%d blockSize=%d\n", type, nBlocks, blockSize); + for (int i = 0; i < nBlocks; i++) + release_backend_memory(blockSize, type); + + return true; +} + + + +/*************** + * Worker which allocates and frees blocks, + * but this one does actual allocations. 
+ */ + +/* Forward references for the allocation worker */ +static bool freeBlocks(pg_allocator_type type, int nBlocks, int blockSize); +static MemoryContext createTestContext(pg_allocator_type type); +static bool allocateBlocks(pg_allocator_type type, int nBlocks, int blockSize); +static bool allocateDSMBlocks(int nBlocks, int blockSize); +static bool freeDSMBlocks(int nBlocks, int blockSize); +static bool allocateContextBlocks(pg_allocator_type type, int nBlocks, int blockSize); +static bool freeContextBlocks(pg_allocator_type type, int nBlocks, int blockSize); + + +/* An array of pointers to the allocated blocks */ +static void **allocations; +static MemoryContext testContext; + + +void test_allocation_worker(Datum arg) +{ + AllocateRequest *req; + Size reqSize; + + ResponseData resp[1]; + shm_mq_result result; + + PgBackendMemoryStatus startingMemory; + + debug("\n"); + workerInit(arg); + + /* Now that we're running, make note of how much memory has been already allocated */ + startingMemory = my_memory; + + do + { + result = workerRecv((void *)&req, &reqSize); + debug("Received result=%d action=%d type=%d nBlocks=%d blockSize=%ld\n", result, req->action, req->type, req->nBlocks, req->blockSize); + if (result != SHM_MQ_SUCCESS) + break; + + /* Allocate or release the blocks */ + if (req->action == ALLOCATE && allocateBlocks(req->type, req->nBlocks, req->blockSize)) + resp->errorCode = 0; + else if (req->action == RELEASE && freeBlocks(req->type, req->nBlocks, req->blockSize)) + resp->errorCode = 0; + else + resp->errorCode = 1; + + /* Get the current memory totals */ + resp->memory = my_memory; + resp->startingMemory = startingMemory; + debug("MyBEEntry=%p allocated_bytes=%ld type=%d\n", MyBEEntry, my_memory.allocated_bytes_by_type[req->type], req->type); + + /* Send the response */ + result = workerSend(resp, sizeof(resp[1])); + + } while (result == SHM_MQ_SUCCESS); + + workerExit(0); +} + + + +static bool allocateBlocks(pg_allocator_type type, int nBlocks, 
int blockSize) +{ + debug("type=%d nBlocks=%d blockSize=%d prev=%zd\n", type, nBlocks, blockSize, my_memory.allocated_bytes_by_type[type]); + if (type == PG_ALLOC_DSM) + return allocateDSMBlocks(nBlocks, blockSize); + else + return allocateContextBlocks(type, nBlocks, blockSize); +} + +static bool freeBlocks(pg_allocator_type type, int nBlocks, int blockSize) +{ + debug("type=%d nBlocks=%d blockSize=%d\n", type, nBlocks, blockSize); + if (type == PG_ALLOC_DSM) + return freeDSMBlocks(nBlocks, blockSize); + else + return freeContextBlocks(type, nBlocks, blockSize); +} + + + + +static +MemoryContext createTestContext(pg_allocator_type type) +{ + switch (type) + { + case PG_ALLOC_ASET: + return AllocSetContextCreate(TopMemoryContext, "test_aset", 64, 1024, 16 * 1024); + case PG_ALLOC_SLAB: + return SlabContextCreate(TopMemoryContext, "test_slab", 16 * 1024, 1024); + case PG_ALLOC_GENERATION: + return GenerationContextCreate(TopMemoryContext, "test_gen", 64, 1024, 16 * 1024); + default: return NULL; + + } +} + + +static +bool allocateContextBlocks(pg_allocator_type type, int nBlocks, int blockSize) +{ + MemoryContext old; + + debug("type=%d nBlocks=%d blockSize=%d prev=%zd\n", type, nBlocks, blockSize, my_memory.allocated_bytes_by_type[type]); + if (type == PG_ALLOC_DSM || type == PG_ALLOC_OTHER) + return false; + + /* Create a memory context for the allocations */ + testContext = createTestContext(type); + if (testContext == NULL) + return false; + + /* Switch to the new context */ + old = MemoryContextSwitchTo(testContext); + + /* Create a list of block pointers - not in the context */ + allocations = malloc(sizeof(void *) * nBlocks); + if (allocations == NULL) + return false; + + /* Allocate the requested number of blocks */ + for (int i = 0; i < nBlocks; i++) + { + allocations[i] = palloc(blockSize); + if (allocations[i] == NULL) + return false; + } + + /* Switch back to the old context */ + MemoryContextSwitchTo(old); + + debug("success: allocated_bytes=%zd\n", 
my_memory.allocated_bytes_by_type[type]); + return true; + } + + +static +bool freeContextBlocks(pg_allocator_type type, int nBlocks, int blockSize) +{ + debug("type=%d nBlocks=%d blockSize=%d\n", type, nBlocks, blockSize); + for (int i = 0; i < nBlocks; i++) + pfree(allocations[i]); + + MemoryContextDelete(testContext); + free(allocations); + allocations = NULL; + + return true; +} + + + + +static bool allocateDSMBlocks(int nBlocks, int blockSize) +{ + debug("nBlocks=%d blockSize=%d\n", nBlocks, blockSize); + + /* Create a list of block pointers - not in the context */ + allocations = malloc(sizeof(void *) * nBlocks); + if (allocations == NULL) + return false; + + for (int i = 0; i < nBlocks; i++) + { + allocations[i] = dsm_create(blockSize, 0); + if (allocations[i] == NULL) + return false; + debug("segment=%p size=%ld\n", allocations[i], dsm_segment_map_length(allocations[i])); + } + + return true; +} + + +static bool freeDSMBlocks(int nBlocks, int blockSize) +{ + debug("nBlocks=%d blockSize=%d\n", nBlocks, blockSize); + + for (int i = 0; i < nBlocks; i++) + dsm_detach(allocations[i]); + + free(allocations); + allocations = NULL; + + + return true; +} diff --git a/src/test/modules/test_memtrack/test_memtrack.conf b/src/test/modules/test_memtrack/test_memtrack.conf new file mode 100644 index 0000000000..256347e92c --- /dev/null +++ b/src/test/modules/test_memtrack/test_memtrack.conf @@ -0,0 +1 @@ +max_total_backend_memory = 8192 diff --git a/src/test/modules/test_memtrack/test_memtrack.control b/src/test/modules/test_memtrack/test_memtrack.control new file mode 100644 index 0000000000..548109e313 --- /dev/null +++ b/src/test/modules/test_memtrack/test_memtrack.control @@ -0,0 +1,4 @@ +comment = 'Test code for PostgreSQL memory tracking' +default_version = '1.0' +module_pathname = '$libdir/test_memtrack' +relocatable = true diff --git a/src/test/modules/test_memtrack/worker_pool.c b/src/test/modules/test_memtrack/worker_pool.c new file mode 100644 index 
0000000000..8e2810a3a4 --- /dev/null +++ b/src/test/modules/test_memtrack/worker_pool.c @@ -0,0 +1,416 @@ +// +// Created by John Morris on 8/18/23. +// + + +#include "postgres.h" +#include "storage/s_lock.h" +#include "postmaster/bgworker.h" +#include "storage/dsm.h" +#include "storage/shm_toc.h" +#include "tcop/tcopprot.h" +#include "storage/spin.h" +#include "storage/procarray.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "worker_pool.h" +#include "utils/memutils.h" +#include "miscadmin.h" +#include "utils/memtrack.h" /* for debug() */ +#include "utils/wait_event_types.h" +#include "storage/barrier.h" + + +#define WORKER_POOL_MAGIC 0x7843732 + +typedef struct WorkerPoolStartup WorkerPoolStartup; + + +struct WorkerPool +{ + dsm_segment *seg; + shm_toc *toc; + int nWorkers; + MemQue *inQ; + MemQue *outQ; + BackgroundWorkerHandle **handle; + WorkerPoolStartup *hdr; +}; + + +/* + * This structure is stored in the dynamic shared memory segment at toc(0). + * We use it to assign worker numbers to each worker. + */ +struct WorkerPoolStartup +{ + uint32 nWorkers; + pg_atomic_uint32 nextWorker; + Barrier barrier[1]; +}; + +/* Forward references */ +static int64 estimateDsmSize(int nWorkers, int inSize, int outSize); +static void cleanupWorkers(dsm_segment *seg, Datum arg); +static MemQue attachToQueue(dsm_segment *seg, shm_toc *toc, int workerIdx, int queueIdx, bool isSender); + +/* + * Create a new pool of backend workers. + */ +WorkerPool *createWorkerPool(int nWorkers, int inSize, int outSize, char *libName, char *procName) +{ + WorkerPool *pool; + int64 dsmSize; + MemoryContext oldcontext; + BackgroundWorker worker; + pid_t pid; + + debug("createWorkerPool: nWorkers=%d, inSize=%d, outSize=%d\n", nWorkers, inSize, outSize); + + /* + * We need the worker pool objects to be allocated in CurTransactionContext + * rather than ExprContext; otherwise, they'll be destroyed before the on_dsm_detach + * hooks run. 
+ */ + oldcontext = MemoryContextSwitchTo(CurTransactionContext); + + /* Create local worker pool object and allocate arrays for the headers and queues. */ + pool = MemoryContextAlloc(TopTransactionContext, sizeof(WorkerPool)); + pool->nWorkers = nWorkers; + pool->inQ = MemoryContextAlloc(TopTransactionContext, sizeof(MemQue) * nWorkers); + pool->outQ = MemoryContextAlloc(TopTransactionContext, sizeof(MemQue) * nWorkers); + pool->handle = MemoryContextAlloc(TopTransactionContext, sizeof(BackgroundWorkerHandle *) * nWorkers); + + /* Estimate the size of the shared memory and allocate shared memory */ + dsmSize = estimateDsmSize(nWorkers, inSize, outSize); + pool->seg = dsm_create(dsmSize, 0); + + /* Create table of contents in dsm so we can access contents */ + pool->toc = shm_toc_create(WORKER_POOL_MAGIC, dsm_segment_address(pool->seg), dsmSize); + + /* Set up the startup header as region 0 */ + pool->hdr = shm_toc_allocate(pool->toc, sizeof(WorkerPoolStartup)); + shm_toc_insert(pool->toc, 0, pool->hdr); + pool->hdr->nWorkers = nWorkers; + pg_atomic_init_u32(&pool->hdr->nextWorker, 0); + BarrierInit(pool->hdr->barrier, nWorkers+1); + + /* Create memory queues for each worker */ + for (int w = 0; w < nWorkers; w++) + { + shm_mq *mqIn, *mqOut; + + /* Allocate the "In" queue */ + mqIn = shm_toc_allocate(pool->toc, shm_mq_minimum_size + inSize); + shm_toc_insert(pool->toc, 1 + 2 * w, mqIn); + mqIn = shm_mq_create(mqIn, shm_mq_minimum_size + inSize); + shm_mq_set_sender(mqIn, MyProc); + + /* Allocate the "Out" queue */ + mqOut = shm_toc_allocate(pool->toc, shm_mq_minimum_size + outSize); + shm_toc_insert(pool->toc, 2 + 2 * w, mqOut); + mqOut = shm_mq_create(mqOut, shm_mq_minimum_size + outSize); + shm_mq_set_receiver(mqOut, MyProc); + } + + /* + * Arrange to kill all the workers if we abort before all workers are + * finished hooking themselves up to the dynamic shared memory segment. 
+ * + * If we die after all the workers have finished hooking themselves up to + * the dynamic shared memory segment, we'll mark the two queues to which + * we're directly connected as detached, and the worker(s) connected to + * those queues will exit, marking any other queues to which they are + * connected as detached. This will cause any as-yet-unaware workers + * connected to those queues to exit in their turn, and so on, until + * everybody exits. + * + * But suppose the workers which are supposed to connect to the queues to + * which we're directly attached exit due to some error before they + * actually attach the queues. The remaining workers will have no way of + * knowing this. From their perspective, they're still waiting for those + * workers to start, when in fact they've already died. + */ + debug("configuring on_detach: seg=%p fn=%p pool=%ld\n", pool->seg, cleanupWorkers, PointerGetDatum(pool)); + on_dsm_detach(pool->seg, cleanupWorkers, PointerGetDatum(pool)); + + + /* Configure a prototypical worker. 
*/ + worker = (BackgroundWorker) + { + .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_CLASS_PARALLEL, + .bgw_start_time = BgWorkerStart_ConsistentState, + .bgw_restart_time = BGW_NEVER_RESTART, + .bgw_notify_pid = MyProcPid, + .bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pool->seg)), + }; + strlcpy(worker.bgw_library_name, libName, sizeof(worker.bgw_library_name)); + strlcpy(worker.bgw_function_name, procName, sizeof(worker.bgw_function_name)); + snprintf(worker.bgw_type, sizeof(worker.bgw_type), "%s worker", libName); + snprintf(worker.bgw_name, sizeof(worker.bgw_name), "%s/%s worker for [%d]", libName, procName, MyProcPid); + debug("lib=%s proc=%s .bgw_main_arg=%d\n", libName, procName, DatumGetInt32(worker.bgw_main_arg)); + + /* Do for each worker */ + for (int w = 0; w < nWorkers; w++) + { + /* Create the worker */ + if (!RegisterDynamicBackgroundWorker(&worker, &pool->handle[w])) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not register background process"), + errhint("You may need to increase max_worker_processes."))); + + /* Attach the worker's memory queues */ + pool->inQ[w] = attachToQueue(pool->seg, pool->toc, w, 0, true); + pool->outQ[w] = attachToQueue(pool->seg, pool->toc, w, 1, false); + } + + /* + * Wait for workers to become ready. + * We could just wait on the barrier, but if a worker fails to reach + * the barrier, we would end up waiting forever. + */ + for (int w = 0; w < nWorkers; w++) + if (WaitForBackgroundWorkerStartup(pool->handle[w], &pid) != BGWH_STARTED) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not start background process"), + errhint("You may need to increase max_worker_processes."))); + + /* Wait for workers to attach to the shared memory segment */ + debug("Leader waiting on barrier\n"); + BarrierArriveAndWait(pool->hdr->barrier, 0); + + /* + * Once we reach this point, all workers are ready. 
We no longer need to + * kill them if we die; they'll die on their own as the message queues + * shut down. For now, kill workers on detach anyway, just to be safe. + */ + //cancel_on_dsm_detach(pool->seg, cleanupWorkers, PointerGetDatum(pool)); + + /* Resume using the original memory context */ + MemoryContextSwitchTo(oldcontext); + + return pool; +} + + +shm_mq_result sendToWorker(WorkerPool *pool, int workerIdx, void *msg, Size len) +{ + shm_mq_result result; + debug("workerIdx=%d, len=%zd\n", workerIdx, len); + + result = shm_mq_send(pool->inQ[workerIdx], len, msg, true, true); + + return result; +} + +shm_mq_result recvFromWorker(WorkerPool *pool, int workerIdx, void**msg, Size *len) +{ + shm_mq_result result; + + debug("waiting workerIdx=%d\n", workerIdx); + result = shm_mq_receive(pool->outQ[workerIdx], len, msg, false); + debug("received result=%d workerIdx=%d, len=%zd\n", result, workerIdx, *len); + + return result; +} + +void freeWorkerPool(WorkerPool *pool) +{ + /* Only free the pool once. 
(possibly reentrant) */ + dsm_segment *seg = pool->seg; + pool->seg = NULL; + debug("pool=%p seg=%p\n", pool, seg); + if (seg == NULL) + return; + + /* Terminate the background workers */ + cleanupWorkers(pool->seg, PointerGetDatum(pool)); + + /* Detach from the message queues */ + for (int w = 0; w < pool->nWorkers; w++) + { + shm_mq_detach(pool->inQ[w]); + shm_mq_detach(pool->outQ[w]); + } + + /* Detach and destroy the shared memory segment (if we haven't already) */ + dsm_detach(seg); + + /* Free the pool object */ + pfree(pool->inQ); + pfree(pool->outQ); + pfree(pool->handle); + pfree(pool); +} + + +/* Wrapper to remove workers when detaching from dsm */ +static void cleanupWorkers(dsm_segment *seg, Datum arg) +{ + WorkerPool *pool = (WorkerPool *)DatumGetPointer(arg); + debug("seg=%p pool=%p\n", seg, pool); + for (int w = 0; w < pool->nWorkers; w++) + TerminateBackgroundWorker(pool->handle[w]); + for (int w = 0; w < pool->nWorkers; w++) + WaitForBackgroundWorkerShutdown(pool->handle[w]); +} + + +/* + * Estimate how much shared memory we need for the pool of workers. + * + * Because the TOC machinery may choose to insert padding of oddly-sized + * requests, we must estimate each chunk separately. + * + * We need one key to register the location of the header, and we need + * nworkers * 2 keys to track the locations of the message queues. + */ +static +int64 estimateDsmSize(int nWorkers, int inSize, int outSize) +{ + shm_toc_estimator e[1]; + + shm_toc_initialize_estimator(e); + + shm_toc_estimate_keys(e, 1 + 2 * nWorkers); + + shm_toc_estimate_chunk(e, sizeof(WorkerPoolStartup)); + + for (int w = 0; w < nWorkers; w++) + { + shm_toc_estimate_chunk(e, shm_mq_minimum_size + inSize); + shm_toc_estimate_chunk(e, shm_mq_minimum_size + outSize); + } + + return shm_toc_estimate(e); +} + + + +/* + * ---------------------------------------------------------------- + * Worker process code. 
+ * ---------------------------------------------------------------- + */ + +static dsm_segment *seg; +static shm_toc *toc; + +static int myWorkerNumber; +static MemQue inQ; +static MemQue outQ; +static WorkerPoolStartup *hdr; + +shm_mq_result workerRecv(void **msg, Size *msgSize) +{ + debug("MyWorkerNumber=%d\n", myWorkerNumber); + return shm_mq_receive(inQ, msgSize, msg, false); +} + +shm_mq_result workerSend(void *msg, Size msgSize) +{ + debug("MyWorkerNumber=%d msgSize=%zd\n", myWorkerNumber, msgSize); + return shm_mq_send(outQ, msgSize, msg, false, true); +} + + +/* + * This must be called from the worker process's main function. + */ +void workerInit(Datum arg) +{ + dsm_handle handle; + + /* We are passed the dsm handle of the worker pool */ + handle = DatumGetInt32(arg); + debug("handle=%d\n", handle); + + /* + * Establish signal handlers. + * + * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as + * it would a normal user backend. To make that happen, we use die(). + */ + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * Connect to the dynamic shared memory segment. + * + * The backend that registered this worker passed us the ID of a shared + * memory segment to which we must attach for further instructions. Once + * we've mapped the segment in our address space, attach to the table of + * contents so we can locate the various data structures we'll need to + * find within the segment. + * + * Note: at this point, we have not created any ResourceOwner in this + * process. This will result in our DSM mapping surviving until process + * exit, which is fine. If there were a ResourceOwner, it would acquire + * ownership of the mapping, but we have no need for that. 
+ */ + seg = dsm_attach(handle); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + toc = shm_toc_attach(WORKER_POOL_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* Attach to the startup header and get our worker idx */ + hdr = shm_toc_lookup(toc, 0, false); + myWorkerNumber = pg_atomic_fetch_add_u32(&hdr->nextWorker, 1); + debug("myWorkerNumber=%d\n", myWorkerNumber); + if (myWorkerNumber >= hdr->nWorkers) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("too many workers created in pool. Max=%d", hdr->nWorkers))); + + + /* Attach to the in and out message queues. */ + inQ = attachToQueue(seg, toc, myWorkerNumber, 0, false); + outQ = attachToQueue(seg, toc, myWorkerNumber, 1, true); + + /* Wait for everybody else to become ready */ + debug("Waiting on barrier\n"); + BarrierArriveAndWait(hdr->barrier, 0); + debug("Barrier passed\n"); +} + + +void workerExit(int code) +{ + debug("code=%d\n", code); + /* + * We're done. For cleanliness, explicitly detach from the shared memory + * segment (that would happen anyway during process exit, though). + */ + //dsm_detach(seg); + proc_exit(code); +} + + +static MemQue attachToQueue(dsm_segment *seg, shm_toc *toc, int workerIdx, int queueIdx, bool isSender) +{ + MemQue que; + shm_mq *mq; + + /* Attach to the appropriate message queues. 
*/ + mq = shm_toc_lookup(toc, 1 + 2 * workerIdx + queueIdx, false); + + /* Make note whether we are sending or receiving */ + if (isSender) + shm_mq_set_sender(mq, MyProc); + else + shm_mq_set_receiver(mq, MyProc); + + /* Attach to the queue */ + que = shm_mq_attach(mq, seg, NULL); + + debug("workerIdx=%d, queueIdx=%d handle=%p\n", workerIdx, queueIdx, que); + return que; +} diff --git a/src/test/modules/test_memtrack/worker_pool.h b/src/test/modules/test_memtrack/worker_pool.h new file mode 100644 index 0000000000..b4c2a84939 --- /dev/null +++ b/src/test/modules/test_memtrack/worker_pool.h @@ -0,0 +1,69 @@ +/* + * Implement a pool of workers that can be used to perform work in parallel. + * This version is derived from the postgres test_shm_mq module, but it is + * intended to be more general purpose. Besides being useful for running + * tests, it could eventually replace the current implementation of parallel query. + * For now, it is just a piece of infrastructure for running parallel tests. + * + * The worker pool is created by the owner process. It creates a shared memory + * segment and a set of background workers. Each background worker is provided + * with three message queues: + * - inQ: used to send messages to the worker + * - outQ: used to reply to the owner + * - errQ: used to send error exceptions back to the owner. + * Note: errQ is not yet implemented. + * + * The flow of control for the owner is: + * pool = createWorkerPool(nWorkers, inSize, outSize, libName, procName) + * repeat + * sendToWorker(pool, workerIdx, msg, len) + * recvFromWorker(pool, workerIdx, &msg, &len) + * until done + * freeWorkerPool(pool) + * + * The flow of control for the worker is: + * workerInit(arg) + * repeat + * workerRecv(&msg, &len) + * workerSend(msg, len) + * until done + * workerExit() + * + * If the workers don't exit on their own, they will be terminated when + * the owner calls freeWorkerPool. 
+ * + * Currently, the worker entry point is passed as the name of a procedure + * and the name of a shared library. + * Future updates may allow the worker to be specified as function pointer + * in the main postgres executable. + */ + +#ifndef WORKER_POOL_H +#define WORKER_POOL_H +#include +#include "postgres.h" +#include "storage/dsm.h" +#include "storage/shm_toc.h" +#include "storage/shm_mq.h" +#include "storage/s_lock.h" + +typedef struct WorkerPool WorkerPool; +typedef shm_mq_handle *MemQue; + +/* + * Procedures called from the worker pool owner + */ +WorkerPool *createWorkerPool(int nWorkers, int inSize, int outSize, char *libName, char *procName); +shm_mq_result sendToWorker(WorkerPool *pool, int workerIdx, void *msg, Size len); +shm_mq_result recvFromWorker(WorkerPool *pool, int workerIdx, void **msg, Size *len); +void freeWorkerPool(WorkerPool *pool); + +/* + * Procedures called from the worker + */ +void workerInit(Datum arg); +shm_mq_result workerRecv(void **msg, Size *msgSize); +shm_mq_result workerSend(void *msg, Size msgSize); +void workerExit(int code); + +#endif //WORKER_POOL_H diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 5058be5411..171dffa456 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1873,6 +1873,26 @@ pg_stat_database_conflicts| SELECT oid AS datid, pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock, pg_stat_get_db_conflict_logicalslot(oid) AS confl_active_logicalslot FROM pg_database d; +pg_stat_global_memory_allocation| WITH sums AS ( + SELECT sum(pg_stat_memory_allocation.aset_allocated_bytes) AS total_aset_allocated_bytes, + sum(pg_stat_memory_allocation.dsm_allocated_bytes) AS total_dsm_allocated_bytes, + sum(pg_stat_memory_allocation.generation_allocated_bytes) AS total_generation_allocated_bytes, + sum(pg_stat_memory_allocation.slab_allocated_bytes) AS total_slab_allocated_bytes + FROM pg_stat_memory_allocation + ) + SELECT 
s.datid, + current_setting('shared_memory_size'::text, true) AS shared_memory_size, + (current_setting('shared_memory_size_in_huge_pages'::text, true))::integer AS shared_memory_size_in_huge_pages, + pg_size_bytes(current_setting('max_total_backend_memory'::text, true)) AS max_total_backend_memory_bytes, + s.total_bkend_mem_bytes_available, + s.global_dsm_allocated_bytes, + sums.total_aset_allocated_bytes, + sums.total_dsm_allocated_bytes, + sums.total_generation_allocated_bytes, + sums.total_slab_allocated_bytes + FROM sums, + (pg_stat_get_global_memory_allocation() s(datid, total_bkend_mem_bytes_available, global_dsm_allocated_bytes) + LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_gssapi| SELECT pid, gss_auth AS gss_authenticated, gss_princ AS principal, @@ -1899,6 +1919,15 @@ pg_stat_io| SELECT backend_type, fsync_time, stats_reset FROM pg_stat_get_io() b(backend_type, object, context, reads, read_time, writes, write_time, writebacks, writeback_time, extends, extend_time, op_bytes, hits, evictions, reuses, fsyncs, fsync_time, stats_reset); +pg_stat_memory_allocation| SELECT s.datid, + s.pid, + s.allocated_bytes, + s.aset_allocated_bytes, + s.dsm_allocated_bytes, + s.generation_allocated_bytes, + s.slab_allocated_bytes + FROM (pg_stat_get_memory_allocation(NULL::integer) s(datid, pid, allocated_bytes, aset_allocated_bytes, dsm_allocated_bytes, generation_allocated_bytes, slab_allocated_bytes) + LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_progress_analyze| SELECT s.pid, s.datid, d.datname, diff --git a/src/test/regress/expected/stats.out b/src/test/regress/expected/stats.out index 94187e59cf..8e70c836cd 100644 --- a/src/test/regress/expected/stats.out +++ b/src/test/regress/expected/stats.out @@ -1628,4 +1628,40 @@ SELECT COUNT(*) FROM brin_hot_3 WHERE a = 2; DROP TABLE brin_hot_3; SET enable_seqscan = on; +-- ensure that allocated_bytes exist for backends +SELECT + allocated_bytes >= 0 AS result +FROM + pg_stat_activity ps + JOIN 
pg_stat_memory_allocation pa ON (pa.pid = ps.pid) +WHERE + backend_type IN ('checkpointer', 'background writer', 'walwriter', 'autovacuum launcher'); + result +-------- + t + t + t + t +(4 rows) + +-- ensure that pg_stat_global_memory_allocation view exists +SELECT + datid > 0, pg_size_bytes(shared_memory_size) >= 0, shared_memory_size_in_huge_pages >= -1, global_dsm_allocated_bytes >= 0 +FROM + pg_stat_global_memory_allocation; + ?column? | ?column? | ?column? | ?column? +----------+----------+----------+---------- + t | t | t | t +(1 row) + +-- ensure that pg_stat_memory_allocation view exists +SELECT + pid > 0, allocated_bytes >= 0, aset_allocated_bytes >= 0, dsm_allocated_bytes >= 0, generation_allocated_bytes >= 0, slab_allocated_bytes >= 0 +FROM + pg_stat_memory_allocation limit 1; + ?column? | ?column? | ?column? | ?column? | ?column? | ?column? +----------+----------+----------+----------+----------+---------- + t | t | t | t | t | t +(1 row) + -- End of Stats Test diff --git a/src/test/regress/sql/stats.sql b/src/test/regress/sql/stats.sql index 1e21e55c6d..cdc8508309 100644 --- a/src/test/regress/sql/stats.sql +++ b/src/test/regress/sql/stats.sql @@ -840,4 +840,24 @@ DROP TABLE brin_hot_3; SET enable_seqscan = on; +-- ensure that allocated_bytes exist for backends +SELECT + allocated_bytes >= 0 AS result +FROM + pg_stat_activity ps + JOIN pg_stat_memory_allocation pa ON (pa.pid = ps.pid) +WHERE + backend_type IN ('checkpointer', 'background writer', 'walwriter', 'autovacuum launcher'); + +-- ensure that pg_stat_global_memory_allocation view exists +SELECT + datid > 0, pg_size_bytes(shared_memory_size) >= 0, shared_memory_size_in_huge_pages >= -1, global_dsm_allocated_bytes >= 0 +FROM + pg_stat_global_memory_allocation; + +-- ensure that pg_stat_memory_allocation view exists +SELECT + pid > 0, allocated_bytes >= 0, aset_allocated_bytes >= 0, dsm_allocated_bytes >= 0, generation_allocated_bytes >= 0, slab_allocated_bytes >= 0 +FROM + 
pg_stat_memory_allocation limit 1; -- End of Stats Test -- 2.33.0