diff --git a/contrib/test_group_locking/Makefile b/contrib/test_group_locking/Makefile new file mode 100644 index 0000000..2d09341 --- /dev/null +++ b/contrib/test_group_locking/Makefile @@ -0,0 +1,21 @@ +# contrib/test_group_locking/Makefile + +MODULE_big = test_group_locking +OBJS = test_group_locking.o $(WIN32RES) +PGFILEDESC = "test_group_locking - test harness for group locking" + +EXTENSION = test_group_locking +DATA = test_group_locking--1.0.sql + +REGRESS = test_group_locking + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/test_group_locking +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/test_group_locking/test_group_locking--1.0.sql b/contrib/test_group_locking/test_group_locking--1.0.sql new file mode 100644 index 0000000..adb2be5 --- /dev/null +++ b/contrib/test_group_locking/test_group_locking--1.0.sql @@ -0,0 +1,8 @@ +/* contrib/test_group_locking/test_group_locking--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_group_locking" to load this file. \quit + +CREATE FUNCTION test_group_locking(spec pg_catalog.text) + RETURNS pg_catalog.void STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/contrib/test_group_locking/test_group_locking.c b/contrib/test_group_locking/test_group_locking.c new file mode 100644 index 0000000..904beff --- /dev/null +++ b/contrib/test_group_locking/test_group_locking.c @@ -0,0 +1,1071 @@ +/*-------------------------------------------------------------------------- + * + * test_group_locking.c + * Test harness code for group locking. + * + * Copyright (C) 2013, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/test_shm_mq/test.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "catalog/namespace.h" +#include "commands/dbcommands.h" +#include "fmgr.h" +#include "lib/ilist.h" +#include "lib/stringinfo.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "libpq/pqmq.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "parser/scansup.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/procarray.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "utils/builtins.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(test_group_locking); + +void test_group_locking_worker_main(Datum main_arg); + +/* Names of lock modes, for debug printouts */ +static const char *const lock_mode_names[] = +{ + "INVALID", + "AccessShareLock", + "RowShareLock", + "RowExclusiveLock", + "ShareUpdateExclusiveLock", + "ShareLock", + "ShareRowExclusiveLock", + "ExclusiveLock", + "AccessExclusiveLock" +}; + +typedef enum +{ + TGL_START, + TGL_STOP, + TGL_LOCK, + TGL_UNLOCK +} TestGroupLockOp; + +typedef struct +{ + TestGroupLockOp op; + LOCKMODE lockmode; + Oid relid; +} TestGroupLockCommand; + +typedef struct +{ + dlist_node node; + bool verified; + int group_id; + int task_id; + TestGroupLockCommand command; +} TestGroupLockStep; + +typedef struct +{ + int group_id; + int task_id; +} worker_key; + +typedef struct +{ + worker_key key; + dsm_segment *seg; + BackgroundWorkerHandle *handle; + shm_mq_handle *requesth; + shm_mq_handle *responseh; + bool awaiting_response; +} worker_info; + +typedef struct +{ + int group_id; + int leader_task_id; + bool has_followers; +} leader_info; + +/* Fixed-size data passed via our dynamic shared memory segment. */ +typedef struct worker_fixed_data +{ + Oid database_id; + Oid authenticated_user_id; + NameData database; + NameData authenticated_user; + bool use_group_locking; + pid_t leader_pid; +} worker_fixed_data; + +#define SHM_QUEUE_SIZE 32768 +#define TEST_GROUP_LOCKING_MAGIC 0x4c616e65 + +static void check_for_messages(HTAB *worker_hash); +static void determine_leader_info(dlist_head *plan, HTAB *leader_hash); +static void handle_sigterm(SIGNAL_ARGS); +static void process_message(HTAB *worker_hash, worker_info *info, + char *message, Size message_bytes); +static void rationalize_steps(dlist_head *plan); +static void rationalize_steps_for_task(dlist_head *plan, int group_id, + int task_id); +static void report_syntax_error(StringInfo buf); +static bool scan_character(StringInfo buf, char c); +static bool scan_eof(StringInfo buf); +static char *scan_identifier(StringInfo buf); +static bool scan_integer(StringInfo buf, int *result); +static LOCKMODE scan_lockmode(StringInfo buf); +static TestGroupLockOp scan_op(StringInfo buf); +static RangeVar *scan_qualified_identifier(StringInfo buf); +static char *scan_quoted_identifier(StringInfo buf); +static void send_command(HTAB *worker_hash, TestGroupLockStep *step); +static void start_worker(HTAB *worker_hash, int group_id, int task_id, + int leader_task_id); + +/*-------------------------------------------------------------------------- + * Main entrypoint. + * + * Start background workers and have them issue lock requests against + * specified relations. We use a little mini-language to control this: + * + * N[.M]:start + * N[.M]:stop + * N[.M]:lock:lockmode:relation + * N[.M]:unlock:lockmode:relation + * + * N and M should be integers. M can be omitted, in which case it defaults + * to 0. Each (N, M) pair identifies a separate worker; those with the + * same value of N are in the same lock group. All workers not started + * explicitly are started before any other actions are taken; and all + * workers not terminated explicitly are terminated after all other actions + * are taken. + *-------------------------------------------------------------------------- + */ +Datum +test_group_locking(PG_FUNCTION_ARGS) +{ + text *spec = PG_GETARG_TEXT_PP(0); + StringInfo buf = makeStringInfo(); + dlist_head plan; + dlist_iter iter; + HASHCTL hashctl; + HTAB *worker_hash; + HTAB *leader_hash; + + appendBinaryStringInfo(buf, VARDATA_ANY(spec), VARSIZE_ANY_EXHDR(spec)); + dlist_init(&plan); + + /* Parse the user-provided specification. */ + for (;;) + { + TestGroupLockStep *step; + + step = (TestGroupLockStep *) palloc0(sizeof(TestGroupLockStep)); + if (!scan_integer(buf, &step->group_id)) + report_syntax_error(buf); + if (scan_character(buf, '.') && !scan_integer(buf, &step->task_id)) + report_syntax_error(buf); + if (!scan_character(buf, ':')) + report_syntax_error(buf); + step->command.op = scan_op(buf); + + if (step->command.op == TGL_LOCK || step->command.op == TGL_UNLOCK) + { + RangeVar *rv; + if (!scan_character(buf, ':')) + report_syntax_error(buf); + step->command.lockmode = scan_lockmode(buf); + if (!scan_character(buf, ':')) + report_syntax_error(buf); + rv = scan_qualified_identifier(buf); + + /* + * Since we're trying to test locking here, don't take a lock + * when locking the relation. That's unsafe in the presence of + * concurrent DDL, but since this is just test code, we don't + * care. + */ + step->command.relid = RangeVarGetRelid(rv, NoLock, false); + } + + dlist_push_tail(&plan, &step->node); + + if (scan_eof(buf)) + break; + if (!scan_character(buf, ',')) + report_syntax_error(buf); + } + + /* Make sure the series of steps looks sensible. */ + rationalize_steps(&plan); + + /* Initialize worker hash table. */ + memset(&hashctl, 0, sizeof(HASHCTL)); + hashctl.keysize = sizeof(worker_key); + hashctl.entrysize = sizeof(worker_info); + hashctl.hcxt = CurrentMemoryContext; + hashctl.hash = tag_hash; + worker_hash = hash_create("test_group_locking workers", 16, &hashctl, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + /* Initialize leader hash table. */ + memset(&hashctl, 0, sizeof(HASHCTL)); + hashctl.keysize = sizeof(int); + hashctl.entrysize = sizeof(leader_info); + hashctl.hcxt = CurrentMemoryContext; + hashctl.hash = tag_hash; + leader_hash = hash_create("test_group_locking leaders", 16, &hashctl, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + /* Determine group leadership information. */ + determine_leader_info(&plan, leader_hash); + + /* Execute the plan. */ + dlist_foreach(iter, &plan) + { + TestGroupLockStep *step; + + step = dlist_container(TestGroupLockStep, node, iter.cur); + + if (step->command.op == TGL_START) + { + leader_info *li; + + li = hash_search(leader_hash, &step->group_id, HASH_FIND, NULL); + Assert(li != NULL); + start_worker(worker_hash, step->group_id, step->task_id, + li->has_followers ? li->leader_task_id : -1); + continue; + } + + send_command(worker_hash, step); + } + + PG_RETURN_VOID(); +} + +/* Check for messages from our workers. */ +static void +check_for_messages(HTAB *worker_hash) +{ + bool progress = true; + + while (progress) + { + HASH_SEQ_STATUS hash_seq; + worker_info *info; + + progress = false; + hash_seq_init(&hash_seq, worker_hash); + while ((info = hash_seq_search(&hash_seq)) != NULL) + { + shm_mq_result result; + Size nbytes; + void *data; + + result = shm_mq_receive(info->responseh, &nbytes, &data, true); + if (result == SHM_MQ_DETACHED) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection to background worker %d.%d lost", + info->key.group_id, info->key.task_id))); + if (result == SHM_MQ_SUCCESS) + { + progress = true; + process_message(worker_hash, info, data, nbytes); + } + } + } +} + +/* Determine leadership information for each group. */ +static void +determine_leader_info(dlist_head *plan, HTAB *leader_hash) +{ + dlist_iter iter; + + dlist_foreach(iter, plan) + { + TestGroupLockStep *step; + leader_info *li; + bool found; + + step = dlist_container(TestGroupLockStep, node, iter.cur); + li = hash_search(leader_hash, &step->group_id, HASH_ENTER, &found); + if (!found) + { + li->leader_task_id = step->task_id; + li->has_followers = false; + } + else if (step->task_id != li->leader_task_id) + li->has_followers = true; + } +} + +/* Error context callback. */ +static void +error_callback(void *arg) +{ + worker_info *info = arg; + + errcontext("background worker, group %d, task %d", info->key.group_id, + info->key.task_id); +} + +/* Handle SIGTERM. */ +static void +handle_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + + if (MyProc) + SetLatch(&MyProc->procLatch); + + if (!proc_exit_inprogress) + { + InterruptPending = true; + ProcDiePending = true; + } + + errno = save_errno; +} + +/* + * Make sure we have a rational series of steps, and add missing start and + * stop steps as needed. + */ +static void +rationalize_steps(dlist_head *plan) +{ + bool progress = true; + + while (progress) + { + dlist_iter iter; + progress = false; + + dlist_foreach(iter, plan) + { + TestGroupLockStep *step; + + step = dlist_container(TestGroupLockStep, node, iter.cur); + if (!step->verified) + { + rationalize_steps_for_task(plan, step->group_id, + step->task_id); + progress = true; + break; + } + } + } +} + +/* + * Linear search through the provided list of steps. Figure out whether any + * start action is unique and precedes all other actions for this task, and + * whether any stop action is unique and follow all other such actions. If + * the steps are out of order, error; if they are missing, add them at the + * beginning and end as appropriate. + */ +static void +rationalize_steps_for_task(dlist_head *plan, int group_id, int task_id) +{ + dlist_iter iter; + bool saw_start = false; + bool saw_stop = false; + bool saw_other = false; + + dlist_foreach(iter, plan) + { + TestGroupLockStep *step; + + step = dlist_container(TestGroupLockStep, node, iter.cur); + if (step->group_id != group_id || step->task_id != task_id) + continue; + step->verified = true; + + switch (step->command.op) + { + case TGL_START: + if (saw_start) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("can't start same worker more than once"))); + if (saw_stop || saw_other) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("can't start worker after stopping it"))); + if (saw_other) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("worker can't perform actions before being started"))); + saw_start = true; + break; + case TGL_STOP: + if (saw_stop) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("can't stop same worker more than once"))); + saw_stop = true; + break; + default: + if (saw_stop) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("worker can't perform actions after being stopped"))); + saw_other = true; + break; + } + } + + if (!saw_start) + { + TestGroupLockStep *step; + + step = (TestGroupLockStep *) palloc0(sizeof(TestGroupLockStep)); + step->group_id = group_id; + step->task_id = task_id; + step->command.op = TGL_START; + step->verified = true; + dlist_push_head(plan, &step->node); + } + + if (!saw_stop) + { + TestGroupLockStep *step; + + step = (TestGroupLockStep *) palloc0(sizeof(TestGroupLockStep)); + step->group_id = group_id; + step->task_id = task_id; + step->command.op = TGL_STOP; + step->verified = true; + dlist_push_tail(plan, &step->node); + } +} + +/* Report a syntax error. */ +static void +report_syntax_error(StringInfo buf) +{ + char badchar[MAX_MULTIBYTE_CHAR_LEN + 1]; + int badpos = pg_mbstrlen_with_len(buf->data, buf->cursor) + 1; + int badcharlen; + + if (buf->cursor >= buf->len) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unexpected end of string at position %d", badpos))); + + badcharlen = pg_mblen(&buf->data[buf->cursor]); + memcpy(badchar, &buf->data[buf->cursor], badcharlen); + badchar[badcharlen] = '\0'; + + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unexpected character \"%s\" at position %d", + badchar, badpos))); +} + +/* Scan a given character. */ +static bool +scan_character(StringInfo buf, char c) +{ + if (buf->cursor < buf->len && buf->data[buf->cursor] == c) + { + ++buf->cursor; + return true; + } + return false; +} + +/* Scan end-of-buffer. */ +static bool +scan_eof(StringInfo buf) +{ + return buf->cursor >= buf->len; +} + +/* Scan and return a single-part PostgreSQL identifier. */ +static char * +scan_identifier(StringInfo buf) +{ + int start = buf->cursor; + + if (buf->data[start] == '"') + return scan_quoted_identifier(buf); + + while (buf->cursor < buf->len) + { + int len = pg_mblen(&buf->data[buf->cursor]); + char c; + + /* Multibyte characters are allowed. */ + if (len != 1) + { + Assert(len > 0); + buf->cursor += len; + continue; + } + + /* Alphabetic characters, and underscore, are allowed. */ + c = buf->data[buf->cursor]; + if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_') + { + buf->cursor++; + continue; + } + + /* Numeric digits, and $, are allowed but not at character 1. */ + if (start != buf->cursor && ((c >= '0' && c <= '9') || c == '$')) + { + buf->cursor++; + continue; + } + + /* Anything else is not allowed. */ + break; + } + + /* Return NULL if we didn't find an identifier. */ + if (buf->cursor == start) + return NULL; + + /* Return a copy of the identifier with appropriate case-folding. */ + return downcase_truncate_identifier(&buf->data[start], buf->cursor - start, + false); +} + +/* Scan an integer. */ +static bool +scan_integer(StringInfo buf, int *result) +{ + int start = buf->cursor; + int val = 0; + + while (buf->cursor < buf->len) + { + char c = buf->data[buf->cursor]; + + if (c < '0' || c > '9') + break; + val = val * 10 + (c - '0'); + ++buf->cursor; + } + + if (buf->cursor == start) + return false; + *result = val; + return true; +} + +/* Scan and return a lock mode. */ +static LOCKMODE +scan_lockmode(StringInfo buf) +{ + char *mode = scan_identifier(buf); + + if (mode == NULL) + report_syntax_error(buf); + + if (pg_strcasecmp(mode, "AccessShareLock") == 0) + return AccessShareLock; + else if (pg_strcasecmp(mode, "RowShareLock") == 0) + return RowShareLock; + else if (pg_strcasecmp(mode, "RowExclusiveLock") == 0) + return RowExclusiveLock; + else if (pg_strcasecmp(mode, "ShareUpdateExclusiveLock") == 0) + return ShareUpdateExclusiveLock; + else if (pg_strcasecmp(mode, "ShareLock") == 0) + return ShareLock; + else if (pg_strcasecmp(mode, "ShareRowExclusiveLock") == 0) + return ShareRowExclusiveLock; + else if (pg_strcasecmp(mode, "ExclusiveLock") == 0) + return ExclusiveLock; + else if (pg_strcasecmp(mode, "AccessExclusiveLock") == 0) + return AccessExclusiveLock; + + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid lock mode: \"%s\"", mode))); +} + +/* Scan and return an operation name. */ +static TestGroupLockOp +scan_op(StringInfo buf) +{ + char *opname = scan_identifier(buf); + + if (opname == NULL) + report_syntax_error(buf); + + if (pg_strcasecmp(opname, "start") == 0) + return TGL_START; + else if (pg_strcasecmp(opname, "stop") == 0) + return TGL_STOP; + else if (pg_strcasecmp(opname, "lock") == 0) + return TGL_LOCK; + else if (pg_strcasecmp(opname, "unlock") == 0) + return TGL_UNLOCK; + + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid operation name: \"%s\"", opname))); +} + +/* Scan and return a possibly schema-qualified identifier. */ +static RangeVar * +scan_qualified_identifier(StringInfo buf) +{ + char *name1; + char *name2; + + name1 = scan_identifier(buf); + if (name1 == NULL) + report_syntax_error(buf); + if (buf->data[buf->cursor] != '.') + return makeRangeVar(NULL, name1, -1); + buf->cursor++; + name2 = scan_identifier(buf); + if (name2 == NULL) + report_syntax_error(buf); + return makeRangeVar(name1, name2, -1); +} + +/* Scan and return a quoted single-part identifier. */ +static char * +scan_quoted_identifier(StringInfo buf) +{ + StringInfoData result; + + initStringInfo(&result); + + if (buf->data[buf->cursor] != '"') + return NULL; + + while (++buf->cursor < buf->len) + { + char *s; + + /* If we see a byte that is not a quote, append to result. */ + s = &buf->data[buf->cursor]; + if (s[0] != '"') + { + appendStringInfoChar(&result, s[0]); + continue; + } + + /* If we see a byte that is a quote, check for a following quote. */ + if (++buf->cursor < buf->len && s[1] == '"') + { + appendStringInfoChar(&result, s[0]); + continue; + } + + /* We've found the terminating quote, so stop here. */ + return result.data; + } + + /* We ran off the end of the buffer with no close-quote. Oops. */ + return NULL; +} + +/* Process a message from a background worker. */ +static void +process_message(HTAB *worker_hash, worker_info *info, char *message, + Size message_bytes) +{ + StringInfoData msg; + char msgtype; + const char *tag; + + initStringInfo(&msg); + enlargeStringInfo(&msg, message_bytes); + appendBinaryStringInfo(&msg, message, message_bytes); + msgtype = pq_getmsgbyte(&msg); + + if (msgtype == 'E' || msgtype == 'N') + { + ErrorData edata; + ErrorContextCallback context; + + pq_parse_errornotice(&msg, &edata); + edata.elevel = Min(edata.elevel, ERROR); + context.callback = error_callback; + context.arg = info; + context.previous = error_context_stack; + error_context_stack = &context; + ThrowErrorData(&edata); + error_context_stack = context.previous; + return; + } + + /* Not error or notice, so must be command complete. */ + if (msgtype != 'C') + elog(ERROR, "unknown message type: %c (%zu bytes)", + msg.data[0], message_bytes); + tag = pq_getmsgstring(&msg); + + /* Hopefully we were waiting for a response... */ + if (!info->awaiting_response) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unexpected acknowledgement from worker %d.%d: \"%s\"", + info->key.group_id, info->key.task_id, tag))); + info->awaiting_response = false; + + /* If the client indicates that it will stop, detach from it. */ + if (strcmp(tag, "STOP") == 0) + { + dsm_detach(info->seg); + hash_search(worker_hash, &info->key, HASH_REMOVE, NULL); + } +} + +/* Send a command to a background worker. */ +static void +send_command(HTAB *worker_hash, TestGroupLockStep *step) +{ + worker_key key; + bool found; + worker_info *info; + shm_mq_result result; + + key.group_id = step->group_id; + key.task_id = step->task_id; + info = hash_search(worker_hash, &key, HASH_FIND, &found); + Assert(found); + + /* Display progress report. */ + switch (step->command.op) + { + case TGL_STOP: + ereport(NOTICE, + (errmsg("stopping worker %d.%d", + step->group_id, step->task_id))); + break; + + case TGL_LOCK: + ereport(NOTICE, + (errmsg("instructing worker %d.%d to acquire %s on relation with OID %u", + step->group_id, step->task_id, + lock_mode_names[step->command.lockmode], + step->command.relid))); + break; + + case TGL_UNLOCK: + ereport(NOTICE, + (errmsg("instructing worker %d.%d to release %s on relation with OID %u", + step->group_id, step->task_id, + lock_mode_names[step->command.lockmode], + step->command.relid))); + break; + default: + elog(ERROR, "bad operation code: %d", (int) step->command.op); + } + + /* Transmit command to worker. */ + result = shm_mq_send(info->requesth, sizeof(TestGroupLockCommand), + &step->command, false); + if (result != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection to background worker lost"))); + info->awaiting_response = true; + for (;;) + { + check_for_messages(worker_hash); + if (!info->awaiting_response) + break; + WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0); + CHECK_FOR_INTERRUPTS(); + ResetLatch(&MyProc->procLatch); + } + +} + +/* Start a background worker. */ +static void +start_worker(HTAB *worker_hash, int group_id, int task_id, int leader_task_id) +{ + worker_key key; + worker_info *info; + bool found; + shm_toc_estimator e; + Size segsize; + shm_toc *toc; + worker_fixed_data *fdata; + shm_mq *requestq; + shm_mq *responseq; + BackgroundWorker worker; + + /* Set up entry in hash table. */ + key.group_id = group_id; + key.task_id = task_id; + info = hash_search(worker_hash, &key, HASH_ENTER, &found); + Assert(!found); + memset(((char *) info) + sizeof(worker_key), 0, + sizeof(worker_info) - sizeof(worker_key)); + + /* Log a message explaining what we're going to do. */ + if (leader_task_id < 0) + ereport(NOTICE, + (errmsg("starting worker %d.%d", group_id, task_id))); + else if (task_id == leader_task_id) + ereport(NOTICE, + (errmsg("starting worker %d.%d as group leader", + group_id, task_id))); + else + ereport(NOTICE, + (errmsg("starting worker %d.%d with group leader %d.%d", + group_id, task_id, group_id, leader_task_id))); + + /* Create dynamic shared memory segment and table of contents. */ + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, sizeof(worker_fixed_data)); + shm_toc_estimate_chunk(&e, SHM_QUEUE_SIZE); + shm_toc_estimate_chunk(&e, SHM_QUEUE_SIZE); + shm_toc_estimate_keys(&e, 3); + segsize = shm_toc_estimate(&e); + info->seg = dsm_create(segsize); + toc = shm_toc_create(TEST_GROUP_LOCKING_MAGIC, + dsm_segment_address(info->seg), segsize); + + /* Store fixed-size data in dynamic shared memory. */ + fdata = shm_toc_allocate(toc, sizeof(worker_fixed_data)); + fdata->database_id = MyDatabaseId; + fdata->authenticated_user_id = GetAuthenticatedUserId(); + namestrcpy(&fdata->database, get_database_name(MyDatabaseId)); + namestrcpy(&fdata->authenticated_user, + GetUserNameFromId(fdata->authenticated_user_id)); + shm_toc_insert(toc, 0, fdata); + if (leader_task_id >= 0) + { + fdata->use_group_locking = true; + if (task_id == leader_task_id) + fdata->leader_pid = 0; + else + { + worker_key lkey; + worker_info *leader_info; + + lkey.group_id = group_id; + lkey.task_id = leader_task_id; + leader_info = hash_search(worker_hash, &lkey, HASH_ENTER, &found); + Assert(found); + if (GetBackgroundWorkerPid(leader_info->handle, &fdata->leader_pid) + != BGWH_STARTED) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not determine PID of leader %d.%d", + group_id, leader_task_id))); + } + } + + /* Establish message queues in dynamic shared memory. */ + requestq = shm_mq_create(shm_toc_allocate(toc, SHM_QUEUE_SIZE), + SHM_QUEUE_SIZE); + shm_toc_insert(toc, 1, requestq); + shm_mq_set_sender(requestq, MyProc); + info->requesth = shm_mq_attach(requestq, info->seg, NULL); + responseq = shm_mq_create(shm_toc_allocate(toc, SHM_QUEUE_SIZE), + SHM_QUEUE_SIZE); + shm_toc_insert(toc, 2, responseq); + shm_mq_set_receiver(responseq, MyProc); + info->responseh = shm_mq_attach(responseq, info->seg, NULL); + + /* Configure a worker. */ + worker.bgw_flags = + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = NULL; /* new worker might not have library loaded */ + sprintf(worker.bgw_library_name, "test_group_locking"); + sprintf(worker.bgw_function_name, "test_group_locking_worker_main"); + snprintf(worker.bgw_name, BGW_MAXLEN, + "test_group_locking %d/%d by PID %d", group_id, task_id, MyProcPid); + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(info->seg)); + /* set bgw_notify_pid, so we can detect if the worker stops */ + worker.bgw_notify_pid = MyProcPid; + + /* Register the worker. */ + if (!RegisterDynamicBackgroundWorker(&worker, &info->handle)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not register background process"), + errhint("You may need to increase max_worker_processes."))); + shm_mq_set_handle(info->requesth, info->handle); + shm_mq_set_handle(info->responseh, info->handle); + + /* Wait for the worker to come online. */ + info->awaiting_response = true; + for (;;) + { + check_for_messages(worker_hash); + if (!info->awaiting_response) + break; + WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0); + CHECK_FOR_INTERRUPTS(); + ResetLatch(&MyProc->procLatch); + } +} + +/* Background worker entrypoint. */ +void +test_group_locking_worker_main(Datum main_arg) +{ + dsm_segment *seg; + shm_toc *toc; + shm_mq *requestq; + shm_mq *responseq; + shm_mq_handle *requesth; + shm_mq_handle *responseh; + worker_fixed_data *fdata; + + /* Establish signal handlers. */ + pqsignal(SIGTERM, handle_sigterm); + BackgroundWorkerUnblockSignals(); + + /* Set up a memory context and resource owner. */ + Assert(CurrentResourceOwner == NULL); + CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_group_locking"); + CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext, + "test_group_locking", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* Connect to the dynamic shared memory segment. */ + seg = dsm_attach(DatumGetInt32(main_arg)); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + toc = shm_toc_attach(TEST_GROUP_LOCKING_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* Find shared memory queues and attach to them. */ + requestq = shm_toc_lookup(toc, 1); + shm_mq_set_receiver(requestq, MyProc); + requesth = shm_mq_attach(requestq, seg, NULL); + responseq = shm_toc_lookup(toc, 2); + shm_mq_set_sender(responseq, MyProc); + responseh = shm_mq_attach(responseq, seg, NULL); + pq_redirect_to_shm_mq(responseq, responseh); + + /* Connect to database. */ + fdata = shm_toc_lookup(toc, 0); + BackgroundWorkerInitializeConnection(NameStr(fdata->database), + NameStr(fdata->authenticated_user)); + if (fdata->database_id != MyDatabaseId || + fdata->authenticated_user_id != GetAuthenticatedUserId()) + ereport(ERROR, + (errmsg("user or database renamed during worker startup"))); + + /* Activate group locking, if appropriate. */ + if (fdata->use_group_locking) + { + if (fdata->leader_pid == 0) + BecomeLockGroupLeader(); + else + { + PGPROC *proc; + + /* + * This is a cheesy hack that I'm going with for the sake of + * getting this test code running. Don't really do it this way! + * + * In a real parallel computation, all of the workers in a lock + * group would be started by the same process, which should pass + * its own value of MyProc and its pid to those followers. That + * way, if the leader exits before the children are up and running, + * they'll fail to join the lock group unless (a) the same PID + * is again running and (b) it is a PostgreSQL process and (c) it + * it using the same PGPROC as before and (d) it is again a lock + * group leader. Looking up the proc using the PID, as we're doing + * here, loses the third of those guarantees - which is not a + * catastrophe, but best avoided. + */ + + proc = BackendPidGetProc(fdata->leader_pid); + if (proc == NULL + || !BecomeLockGroupFollower(proc, fdata->leader_pid)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not join lock group for leader PID %d", + fdata->leader_pid))); + } + } + + /* Inform the worker who started us that we're up and running. */ + pq_putmessage('C', "START", 6); + + /* Begin a transaction. */ + StartTransactionCommand(); + + /* Main loop: read and process messages. */ + for (;;) + { + Size nbytes; + void *data; + shm_mq_result result; + TestGroupLockCommand *command; + + result = shm_mq_receive(requesth, &nbytes, &data, false); + if (result != SHM_MQ_SUCCESS) + ereport(FATAL, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection to user backend lost"))); + if (nbytes != sizeof(TestGroupLockCommand)) + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid command message from user backend"))); + command = data; + + switch (command->op) + { + case TGL_STOP: + CommitTransactionCommand(); + pq_putmessage('C', "STOP", 5); + exit(0); + break; + + case TGL_LOCK: + if (!ConditionalLockRelationOid(command->relid, + command->lockmode)) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain %s on relation with OID %u", + lock_mode_names[command->lockmode], + command->relid))); + pq_putmessage('C', "LOCK", 5); + break; + + case TGL_UNLOCK: + UnlockRelationOid(command->relid, command->lockmode); + pq_putmessage('C', "UNLOCK", 7); + break; + + default: + elog(ERROR, "unknown operation: %d", (int) command->op); + } + } +} diff --git a/contrib/test_group_locking/test_group_locking.control b/contrib/test_group_locking/test_group_locking.control new file mode 100644 index 0000000..3b69359 --- /dev/null +++ b/contrib/test_group_locking/test_group_locking.control @@ -0,0 +1,4 @@ +comment = 'Test code for group locking' +default_version = '1.0' +module_pathname = '$libdir/test_group_locking' +relocatable = true