From 4d458d8fe9c22740525a1878a9cc718b78ed9951 Mon Sep 17 00:00:00 2001 From: "kuroda.hayato%40jp.fujitsu.com" Date: Mon, 27 Jun 2022 04:19:08 +0000 Subject: [PATCH v4 1/3] (PoC) implement LRG Logical Replication Group (LRG) is a way to create a node group that replicates data objects and their changes to each other. All nodes in the group can execute Read-Write queries, and their changes will "eventually" be sent to other nodes. For managing a node group, some APIs have been defined: lrg_create(), lrg_node_attach(), lrg_node_detach(), and lrg_drop(). Each API is called to create a group, attach to it, detach from it, and drop it, respectively. Usage: Suppose that there are two nodes node1 and node2, and a DBA wants to construct a bi-directional logical replication between them. Before doing LRG operations, the same tables must be defined in both nodes: node1=# CREATE TABLE tbl (a INT PRIMARY KEY); node2=# CREATE TABLE tbl (a INT PRIMARY KEY); Note that both nodes must not have pre-existing data. In order to construct a 2-way replication, the following two steps are required in this case: step 1: node1=# SELECT lrg_create('testgroup', 'FOR ALL TABLES', node1_connstr, 'testnode1'); step 2: node2=# SELECT lrg_node_attach('testgroup', node2_connstr, node1_connstr, 'testnode2'); After that new data in node1 will be sent to node2, and vice versa. Internal: Internally, two processes "LRG launcher" and "LRG worker" have been introduced. The LRG launcher process will boot when the server promotes, and it is responsible for starting LRG worker processes. The LRG worker process is responsible for connecting to other nodes, creating publications/subscriptions, updating system catalogs, and so on. Note that for using libpq functions in LRG worker processes, a new library libpqlrg has also been introduced. Restriction: Currently this patch is work-in-progress. 
At least the following functionalities are still needed: - Allow initial data synchronisation when data is present on the existing nodes In this version all subscriptions are defined with option copy_data=false, which means that data already present on the pre-existing node will not be sent to the attached nodes. For that, two functionalities are needed: * Create subscriptions with copy_data = {on|force} when data is already present in the old node. * Ensure all changes have been sent to the node specified in the attaching API. - Prevent concurrent lrg operations If DBAs try to attach different nodes to the same group concurrently, synchronization of LRG system catalogs will fail and the group may be left in an inconsistent state. To avoid that, node management must be done one by one. - Prevent concurrent DML executions during node management Considering the above, it might be good to implement per-database locking across nodes. --- src/Makefile | 1 + src/backend/catalog/Makefile | 3 +- src/backend/catalog/system_functions.sql | 16 + src/backend/catalog/system_views.sql | 5 + src/backend/postmaster/bgworker.c | 7 + src/backend/postmaster/postmaster.c | 6 + src/backend/replication/Makefile | 4 +- src/backend/replication/libpqlrg/Makefile | 38 + src/backend/replication/libpqlrg/libpqlrg.c | 444 ++++++++++++ src/backend/replication/lrg/Makefile | 22 + src/backend/replication/lrg/lrg.c | 570 +++++++++++++++ src/backend/replication/lrg/lrg_launcher.c | 359 ++++++++++ src/backend/replication/lrg/lrg_worker.c | 749 ++++++++++++++++++++ src/backend/storage/ipc/ipci.c | 2 + src/backend/utils/cache/syscache.c | 23 + src/include/catalog/pg_lrg_info.h | 49 ++ src/include/catalog/pg_lrg_nodes.h | 56 ++ src/include/catalog/pg_lrg_pub.h | 46 ++ src/include/catalog/pg_lrg_sub.h | 46 ++ src/include/catalog/pg_proc.dat | 30 + src/include/replication/libpqlrg.h | 99 +++ src/include/replication/lrg.h | 71 ++ src/include/utils/syscache.h | 2 + 23 files changed, 2646 insertions(+), 2 deletions(-) 
create mode 100644 src/backend/replication/libpqlrg/Makefile create mode 100644 src/backend/replication/libpqlrg/libpqlrg.c create mode 100644 src/backend/replication/lrg/Makefile create mode 100644 src/backend/replication/lrg/lrg.c create mode 100644 src/backend/replication/lrg/lrg_launcher.c create mode 100644 src/backend/replication/lrg/lrg_worker.c create mode 100644 src/include/catalog/pg_lrg_info.h create mode 100644 src/include/catalog/pg_lrg_nodes.h create mode 100644 src/include/catalog/pg_lrg_pub.h create mode 100644 src/include/catalog/pg_lrg_sub.h create mode 100644 src/include/replication/libpqlrg.h create mode 100644 src/include/replication/lrg.h diff --git a/src/Makefile b/src/Makefile index 79e274a476..75db706762 100644 --- a/src/Makefile +++ b/src/Makefile @@ -23,6 +23,7 @@ SUBDIRS = \ interfaces \ backend/replication/libpqwalreceiver \ backend/replication/pgoutput \ + backend/replication/libpqlrg \ fe_utils \ bin \ pl \ diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 89a0221ec9..744fdf4fb8 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -72,7 +72,8 @@ CATALOG_HEADERS := \ pg_collation.h pg_parameter_acl.h pg_partitioned_table.h \ pg_range.h pg_transform.h \ pg_sequence.h pg_publication.h pg_publication_namespace.h \ - pg_publication_rel.h pg_subscription.h pg_subscription_rel.h + pg_publication_rel.h pg_subscription.h pg_subscription_rel.h \ + pg_lrg_info.h pg_lrg_nodes.h pg_lrg_pub.h pg_lrg_sub.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 73da687d5d..2a5fc1fc07 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -594,6 +594,10 @@ LANGUAGE internal STRICT IMMUTABLE PARALLEL SAFE AS 'unicode_is_normalized'; +CREATE OR REPLACE FUNCTION + lrg_node_detach(text, text, boolean DEFAULT false) +RETURNS VOID 
LANGUAGE internal AS 'lrg_node_detach' +PARALLEL RESTRICTED; -- -- The default permissions for functions mean that anyone can execute them. -- A number of functions shouldn't be executable by just anyone, but rather @@ -709,6 +713,18 @@ REVOKE EXECUTE ON FUNCTION pg_ls_logicalmapdir() FROM PUBLIC; REVOKE EXECUTE ON FUNCTION pg_ls_replslotdir(text) FROM PUBLIC; +REVOKE EXECUTE ON FUNCTION lrg_create(text, text, text, text) FROM PUBLIC; + +REVOKE EXECUTE ON FUNCTION lrg_node_attach(text, text, text, text) FROM PUBLIC; + +REVOKE EXECUTE ON FUNCTION lrg_node_detach(text, text, boolean) FROM PUBLIC; + +REVOKE EXECUTE ON FUNCTION lrg_drop(text) FROM PUBLIC; + +REVOKE EXECUTE ON FUNCTION lrg_insert_into_sub(text) FROM PUBLIC; + +REVOKE EXECUTE ON FUNCTION lrg_insert_into_nodes(text, int4, text, text, text) FROM PUBLIC; + -- -- We also set up some things as accessible to standard roles. -- diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fedaed533b..9d1f45fd65 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1311,3 +1311,8 @@ CREATE VIEW pg_stat_subscription_stats AS ss.stats_reset FROM pg_subscription as s, pg_stat_get_subscription_stats(s.oid) as ss; + +-- All columns of pg_lrg_nodes except subconninfo are publicly readable. 
+REVOKE ALL ON pg_lrg_nodes FROM public; +GRANT SELECT (oid, groupid, dbid, status, nodename, nodeid) + ON pg_lrg_nodes TO public; diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 40601aefd9..49d8ff1878 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -20,6 +20,7 @@ #include "postmaster/bgworker_internals.h" #include "postmaster/interrupt.h" #include "postmaster/postmaster.h" +#include "replication/lrg.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "storage/dsm.h" @@ -128,6 +129,12 @@ static const struct }, { "ApplyWorkerMain", ApplyWorkerMain + }, + { + "lrg_launcher_main", lrg_launcher_main + }, + { + "lrg_worker_main", lrg_worker_main } }; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index dde4bc25b1..98468f9451 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -118,6 +118,7 @@ #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "replication/logicallauncher.h" +#include "replication/lrg.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -1020,6 +1021,11 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); + /* + * Register the LRG launcher. 
+ */ + LrgLauncherRegister(); + /* * process any libraries that should be preloaded at postmaster start */ diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 3d8fb70c0e..49ffc243f6 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -35,7 +35,9 @@ OBJS = \ walreceiverfuncs.o \ walsender.o -SUBDIRS = logical +SUBDIRS = \ + logical \ + lrg include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/libpqlrg/Makefile b/src/backend/replication/libpqlrg/Makefile new file mode 100644 index 0000000000..72d911a918 --- /dev/null +++ b/src/backend/replication/libpqlrg/Makefile @@ -0,0 +1,38 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication/lrg/libpqlrg +# +# IDENTIFICATION +# src/backend/replication/lrg/libpqlrg/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/replication/lrg/libpqlrg +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) + +OBJS = \ + $(WIN32RES) \ + libpqlrg.o + +SHLIB_LINK_INTERNAL = $(libpq) +SHLIB_LINK = $(filter -lintl, $(LIBS)) +SHLIB_PREREQS = submake-libpq +PGFILEDESC = "libpqlrg" +NAME = libpqlrg + +all: all-shared-lib + +include $(top_srcdir)/src/Makefile.shlib + +install: all installdirs install-lib + +installdirs: installdirs-lib + +uninstall: uninstall-lib + +clean distclean maintainer-clean: clean-lib + rm -f $(OBJS) diff --git a/src/backend/replication/libpqlrg/libpqlrg.c b/src/backend/replication/libpqlrg/libpqlrg.c new file mode 100644 index 0000000000..bad72e6159 --- /dev/null +++ b/src/backend/replication/libpqlrg/libpqlrg.c @@ -0,0 +1,444 @@ +/*------------------------------------------------------------------------- + * + * libpqlrg.c + * + * This file contains the libpq-specific parts of lrg feature. 
It's + * loaded as a dynamic module to avoid linking the main server binary with + * libpq. + *------------------------------------------------------------------------- + */ + + +#include "postgres.h" + +#include "access/heapam.h" +#include "funcapi.h" +#include "libpq-fe.h" +#include "lib/stringinfo.h" +#include "replication/libpqlrg.h" +#include "replication/lrg.h" +#include "utils/snapmgr.h" + +PG_MODULE_MAGIC; + +void _PG_init(void); + +/* Prototypes for interface functions */ +static bool libpqlrg_connect(const char *connstring, PGconn **conn, bool should_throw_error); +static bool libpqlrg_check_group(PGconn *conn, const char *group_name); +static void libpqlrg_copy_lrg_nodes(PGconn *remoteconn, PGconn *localconn); +static void libpqlrg_insert_into_lrg_nodes(PGconn *remoteconn, + const char *node_id, LRG_NODE_STATE status, + const char *node_name, const char *local_connstring, + const char *upstream_connstring); + +static void libpqlrg_create_subscription(const char *group_name, const char *publisher_connstring, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn, const char *options); + +static void libpqlrg_drop_publication(const char *group_name, + PGconn *publisherconn); + +static void libpqlrg_drop_subscription(const char *group_name, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn, bool need_to_alter); + +static void libpqlrg_delete_from_nodes(PGconn *conn, const char *node_id); + +static void libpqlrg_cleanup(PGconn *conn); + +static void libpqlrg_disconnect(PGconn *conn); + +static lrg_function_types PQLrgFunctionTypes = +{ + libpqlrg_connect, + libpqlrg_check_group, + libpqlrg_copy_lrg_nodes, + libpqlrg_insert_into_lrg_nodes, + libpqlrg_create_subscription, + libpqlrg_drop_publication, + libpqlrg_drop_subscription, + libpqlrg_delete_from_nodes, + libpqlrg_cleanup, + libpqlrg_disconnect +}; + +/* + * Executes PQconnectdb() and PQstatus(), and check privilege + */ +static 
bool +libpqlrg_connect(const char *connstring, PGconn **conn, bool should_throw_error) +{ + PGresult *result; + + *conn = PQconnectdb(connstring); + + /* + * If the remote host goes down, throws an ERROR or return immediately. + */ + if (PQstatus(*conn) != CONNECTION_OK) + { + if (should_throw_error) + ereport(ERROR, + errmsg("could not connect to the server"), + errhint("Please check the connection string and health of destination.")); + else + return false; + } + + /* + * Ensure the the connection is established as superuser. + * Throws an ERROR if not. + */ + result = PQexec(*conn, "SELECT usesuper FROM pg_user WHERE usename = current_user"); + if (strcmp(PQgetvalue(result, 0, 0), "t") != 0) + { + PQfinish(*conn); + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser for LRG works"))); + } + PQclear(result); + + return true; +} + + +/* + * Check whether the node is in the specified group or not. + */ +static bool +libpqlrg_check_group(PGconn *conn, const char *group_name) +{ + PGresult *result; + StringInfoData query; + bool ret; + + Assert(PQstatus(conn) == CONNECTION_OK); + initStringInfo(&query); + appendStringInfo(&query, "SELECT COUNT(*) FROM pg_lrg_info WHERE groupname = '%s'", group_name); + + result = PQexec(conn, query.data); + + ret = atoi(PQgetvalue(result, 0, 0)); + pfree(query.data); + + return ret != 0; +} + +/* + * Copy pg_lrg_nodes from remoteconn. + */ +static void +libpqlrg_copy_lrg_nodes(PGconn *remoteconn, PGconn *localconn) +{ + PGresult *result; + StringInfoData query; + int i, num_tuples; + + Assert(PQstatus(remoteconn) == CONNECTION_OK + && PQstatus(localconn) == CONNECTION_OK); + initStringInfo(&query); + + + /* + * Note that COPY command cannot be used here because group_oid + * might be different between remote and local. 
+ */ + appendStringInfo(&query, "SELECT nodeid, status, nodename, " + "localconn, upstreamconn FROM pg_lrg_nodes"); + result = PQexec(remoteconn, query.data); + if (PQresultStatus(result) != PGRES_TUPLES_OK) + ereport(ERROR, + errmsg("could not read pg_lrg_nodes from the upstream node"), + errhint("Is the server really running?")); + + resetStringInfo(&query); + + num_tuples = PQntuples(result); + + for(i = 0; i < num_tuples; i++) + { + char *node_id; + char *status; + char *nodename; + char *localconn; + char *upstreamconn; + + node_id = PQgetvalue(result, i, 0); + status = PQgetvalue(result, i, 1); + nodename = PQgetvalue(result, i, 2); + localconn = PQgetvalue(result, i, 3); + upstreamconn = PQgetvalue(result, i, 4); + + StartTransactionCommand(); + (void) GetTransactionSnapshot(); + /* + * group_oid is adjusted to local value + */ + lrg_add_nodes(node_id, get_group_info(NULL), atoi(status), nodename, localconn, upstreamconn); + CommitTransactionCommand(); + } +} + +/* + * Insert data to remote's pg_lrg_nodes. It will be done + * via internal SQL function. 
+ */ +static void +libpqlrg_insert_into_lrg_nodes(PGconn *remoteconn, + const char *node_id, LRG_NODE_STATE status, + const char *node_name, const char *local_connstring, + const char *upstream_connstring) +{ + StringInfoData query; + PGresult *result; + + Assert(PQstatus(remoteconn) == CONNECTION_OK + && node_id != NULL + && node_name != NULL + && local_connstring != NULL + && upstream_connstring != NULL); + + initStringInfo(&query); + appendStringInfo(&query, "SELECT lrg_insert_into_nodes('%s', %d, '%s', '%s', '%s')", + node_id, status, node_name, local_connstring, upstream_connstring); + + result = PQexec(remoteconn, query.data); + if (PQresultStatus(result) != PGRES_TUPLES_OK) + ereport(ERROR, + errmsg("could not execute lrg_insert_into_nodes on the remote node"), + errhint("Is the server really running?")); + + PQclear(result); + pfree(query.data); +} + +/* + * Create a subscription with given name and parameters, and + * add a tuple to remote's pg_lrg_sub. + * + * Note that both of this and libpqlrg_insert_into_lrg_nodes() + * must be called during attaching a node. + */ +static void +libpqlrg_create_subscription(const char *group_name, const char *publisher_connstring, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn, const char *options) +{ + StringInfoData query, sub_name; + PGresult *result; + + Assert(publisher_connstring != NULL && subscriberconn != NULL); + + /* + * the name of subscriber is just concat of two node_id. + */ + initStringInfo(&query); + initStringInfo(&sub_name); + + /* + * construct the name of subscription and query. 
+ */ + appendStringInfo(&sub_name, "sub_%s_%s", subscriber_node_id, publisher_node_id); + appendStringInfo(&query, "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION pub_for_%s", + sub_name.data, publisher_connstring, group_name); + + if (options) + appendStringInfo(&query, " WITH (%s)", options); + + result = PQexec(subscriberconn, query.data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + ereport(ERROR, + errmsg("could not create a subscription : %s", sub_name.data), + errhint("Is the server really running?")); + PQclear(result); + + resetStringInfo(&query); + appendStringInfo(&query, "SELECT lrg_insert_into_sub('%s')", sub_name.data); + result = PQexec(subscriberconn, query.data); + if (PQresultStatus(result) != PGRES_TUPLES_OK) + + PQclear(result); + + pfree(sub_name.data); + pfree(query.data); +} + +/* + * Drop a given publication and delete a tuple + * from remote's pg_lrg_pub. + */ +static void +libpqlrg_drop_publication(const char *group_name, + PGconn *publisherconn) +{ + StringInfoData query, pub_name; + PGresult *result; + + Assert(PQstatus(publisherconn) == CONNECTION_OK); + + initStringInfo(&query); + initStringInfo(&pub_name); + + appendStringInfo(&pub_name, "pub_for_%s", group_name); + appendStringInfo(&query, "DROP PUBLICATION %s", pub_name.data); + + result = PQexec(publisherconn, query.data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + ereport(ERROR, + errmsg("could not drop the publication : %s", pub_name.data), + errhint("Is the server really running?")); + PQclear(result); + pfree(pub_name.data); + pfree(query.data); +} + +/* + * same as above, but for subscription. + */ +static void +libpqlrg_drop_subscription(const char *group_name, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn, bool need_to_alter) +{ + StringInfoData query, sub_name; + PGresult *result; + + Assert(PQstatus(subscriberconn) == CONNECTION_OK); + + /* + * the name of subscriber is just concat of two node_id. 
+ */ + initStringInfo(&query); + initStringInfo(&sub_name); + + /* + * construct the name of subscription. + */ + appendStringInfo(&sub_name, "sub_%s_%s", subscriber_node_id, publisher_node_id); + + /* + * If the publisher node is not reachable, the subscription cannot be dropped + * easily. Followings are needed: + * + * - disable the subscription + * - disassociate the subscription from the replication slot + */ + if (need_to_alter) + { + appendStringInfo(&query, "ALTER SUBSCRIPTION %s DISABLE", sub_name.data); + result = PQexec(subscriberconn, query.data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + ereport(ERROR, + errmsg("could not alter the subscription : %s", query.data), + errhint("Is the server really running?")); + PQclear(result); + resetStringInfo(&query); + + appendStringInfo(&query, "ALTER SUBSCRIPTION %s SET (slot_name = NONE)", sub_name.data); + result = PQexec(subscriberconn, query.data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + ereport(ERROR, + errmsg("could not alter the subscription : %s", query.data), + errhint("Is the server really running?")); + PQclear(result); + resetStringInfo(&query); + } + + appendStringInfo(&query, "DROP SUBSCRIPTION %s", sub_name.data); + + result = PQexec(subscriberconn, query.data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + ereport(ERROR, + errmsg("could not drop the subscription : %s", query.data), + errhint("Is the server really running?")); + PQclear(result); + + /* + * ...Moreover, we must delete the remaining replication slot. 
+ */ + if (need_to_alter) + { + resetStringInfo(&sub_name); + resetStringInfo(&query); + + appendStringInfo(&sub_name, "sub_%s_%s", publisher_node_id, subscriber_node_id); + + appendStringInfo(&query, "SELECT pg_drop_replication_slot('%s')", sub_name.data); + result = PQexec(subscriberconn, query.data); + if (PQresultStatus(result) != PGRES_TUPLES_OK) + ereport(ERROR, + errmsg("could not drop replication slot : %s", query.data), + errhint("Is the server really running?")); + PQclear(result); + } + + pfree(sub_name.data); + pfree(query.data); +} + +/* + * Delete data to remote's pg_lrg_nodes. It will be done + * via internal SQL function. + */ +static void +libpqlrg_delete_from_nodes(PGconn *conn, const char *node_id) +{ + StringInfoData query; + PGresult *result; + + Assert(PQstatus(conn) == CONNECTION_OK); + + initStringInfo(&query); + appendStringInfo(&query, "DELETE FROM pg_lrg_nodes WHERE nodeid = '%s'", node_id); + + result = PQexec(conn, query.data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + ereport(ERROR, + errmsg("could not delete a tuple from remote's pg_lrg_nodes"), + errhint("Is the server really running?")); + PQclear(result); + pfree(query.data); +} + +/* + * Delete all data from LRG catalogs + */ +static void +libpqlrg_cleanup(PGconn *conn) +{ + PGresult *result; + Assert(PQstatus(conn) == CONNECTION_OK); + + result = PQexec(conn, "DELETE FROM pg_lrg_pub;" + "DELETE FROM pg_lrg_sub;" + "DELETE FROM pg_lrg_nodes;" + "DELETE FROM pg_lrg_info;"); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + ereport(ERROR, + errmsg("could not delete data from remote's system catalogs"), + errhint("Is the server really running?")); + + PQclear(result); +} + +/* + * Just a wrapper for PQfinish() + */ +static void +libpqlrg_disconnect(PGconn *conn) +{ + PQfinish(conn); +} + +/* + * Module initialization function + */ +void +_PG_init(void) +{ + if (LrgFunctionTypes != NULL) + elog(ERROR, "libpqlrg already loaded"); + LrgFunctionTypes = &PQLrgFunctionTypes; +} 
diff --git a/src/backend/replication/lrg/Makefile b/src/backend/replication/lrg/Makefile new file mode 100644 index 0000000000..4ce929b6a4 --- /dev/null +++ b/src/backend/replication/lrg/Makefile @@ -0,0 +1,22 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication/lrg +# +# IDENTIFICATION +# src/backend/replication/lrg/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/replication/lrg +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) + +OBJS = \ + lrg.o \ + lrg_launcher.o \ + lrg_worker.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/lrg/lrg.c b/src/backend/replication/lrg/lrg.c new file mode 100644 index 0000000000..bd628d8f1c --- /dev/null +++ b/src/backend/replication/lrg/lrg.c @@ -0,0 +1,570 @@ +/*------------------------------------------------------------------------- + * + * lrg.c + * Constructs a logical replication group + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/relscan.h" +#include "access/table.h" +#include "access/xlog.h" +#include "catalog/catalog.h" +#include "catalog/indexing.h" +#include "catalog/pg_lrg_info.h" +#include "catalog/pg_lrg_nodes.h" +#include "catalog/pg_lrg_sub.h" +#include "catalog/pg_subscription.h" +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "replication/libpqlrg.h" +#include "replication/logicallauncher.h" +#include "replication/lrg.h" +#include "storage/lock.h" +#include "storage/proc.h" +#include "utils/builtins.h" +#include "utils/fmgrprotos.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" +#include 
"utils/syscache.h" + +LrgWorkerCtxStruct *LrgWorkerCtx; + +static Size lrg_worker_array_size(void); +static Oid lrg_add_info(const char *group_name, const char *publication_type); +static Oid find_subscription(const char *subname); + +/* + * Helpler function for LrgLauncherShmemInit. + */ +static Size +lrg_worker_array_size(void) +{ + Size size; + + size = sizeof(LrgWorkerCtxStruct); + size = MAXALIGN(size); + /* XXX: Which value is appropriate for the array size? */ + size = add_size(size, mul_size(max_worker_processes, sizeof(LrgWorkerCtxStruct))); + + return size; +} + +/* + * Allocate LrgWorkerCtxStruct to the shared memory. + */ +void +LrgLauncherShmemInit(void) +{ + bool found; + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + LrgWorkerCtx = (LrgWorkerCtxStruct *) + ShmemInitStruct("Lrg Launcher Data", + lrg_worker_array_size(), + &found); + if (!found) + { + MemSet(LrgWorkerCtx, 0, lrg_worker_array_size()); + LWLockInitialize(&(LrgWorkerCtx->lock), LWLockNewTrancheId()); + } + LWLockRelease(AddinShmemInitLock); + LWLockRegisterTranche(LrgWorkerCtx->lock.tranche, "lrg"); +} + +/* + * Register the LRG launcher. This will be called during postmaster startup. + */ +void +LrgLauncherRegister(void) +{ + BackgroundWorker worker; + + /* + * LRG deeply depends on the logical replication mechanism, so + * skip registering the LRG launcher if logical replication + * cannot be used. + */ + if (max_logical_replication_workers == 0) + return; + + /* + * Build struct BackgroundWorker for launcher. 
+ */ + MemSet(&worker, 0, sizeof(BackgroundWorker)); + + snprintf(worker.bgw_name, BGW_MAXLEN, "lrg launcher"); + worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_restart_time = BGW_NEVER_RESTART; + snprintf(worker.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(worker.bgw_function_name, BGW_MAXLEN, "lrg_launcher_main"); + RegisterBackgroundWorker(&worker); +} + +/* + * construct node_id. + * + * TODO: construct proper node_id. Currently it is just concat of + * sytem identifier and dbid. + */ +void +construct_node_id(char *out_node_id, int size) +{ + snprintf(out_node_id, size, UINT64_FORMAT "%u", GetSystemIdentifier(), MyDatabaseId); +} + +/* + * Actual work for adding a tuple to pg_lrg_nodes. + */ +void +lrg_add_nodes(const char *node_id, Oid group_id, LRG_NODE_STATE status, + const char *node_name, const char *local_connstring, + const char *upstream_connstring) +{ + Relation rel; + bool nulls[Natts_pg_lrg_nodes]; + Datum values[Natts_pg_lrg_nodes]; + HeapTuple tup; + + Oid lrgnodesoid; + + rel = table_open(LrgNodesRelationId, ExclusiveLock); + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + lrgnodesoid = GetNewOidWithIndex(rel, LrgNodesRelationIndexId, Anum_pg_lrg_nodes_oid); + values[Anum_pg_lrg_nodes_oid - 1] = ObjectIdGetDatum(lrgnodesoid); + values[Anum_pg_lrg_nodes_nodeid - 1] = CStringGetTextDatum(node_id); + values[Anum_pg_lrg_nodes_groupid - 1] = ObjectIdGetDatum(group_id); + values[Anum_pg_lrg_nodes_status - 1] = Int32GetDatum(status); + values[Anum_pg_lrg_nodes_dbid - 1] = ObjectIdGetDatum(MyDatabaseId); + values[Anum_pg_lrg_nodes_nodename - 1] = CStringGetDatum(node_name); + values[Anum_pg_lrg_nodes_localconn - 1] = CStringGetTextDatum(local_connstring); + + if (upstream_connstring != NULL) + values[Anum_pg_lrg_nodes_upstreamconn - 1] = CStringGetTextDatum(upstream_connstring); + else + 
nulls[Anum_pg_lrg_nodes_upstreamconn - 1] = true; + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + /* Insert tuple into catalog. */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + table_close(rel, ExclusiveLock); +} + +/* + * read pg_lrg_info and get oid. + * + * XXX: This function assumes that there is only one tuple + * in the pg_lrg_info. + */ +Oid +get_group_info(char **group_name) +{ + Relation rel; + HeapTuple tup; + TableScanDesc scan; + Oid group_oid = InvalidOid; + Form_pg_lrg_info infoform; + bool is_opened = false; + + if (!IsTransactionState()) + { + is_opened = true; + StartTransactionCommand(); + (void) GetTransactionSnapshot(); + } + + rel = table_open(LrgInfoRelationId, AccessShareLock); + scan = table_beginscan_catalog(rel, 0, NULL); + tup = heap_getnext(scan, ForwardScanDirection); + + if (tup != NULL) + { + infoform = (Form_pg_lrg_info) GETSTRUCT(tup); + group_oid = infoform->oid; + if (group_name != NULL) + { + MemoryContext old; + old = MemoryContextSwitchTo(TopMemoryContext); + *group_name = pstrdup(NameStr(infoform->groupname)); + MemoryContextSwitchTo(old); + } + } + + table_endscan(scan); + table_close(rel, AccessShareLock); + + if (is_opened) + CommitTransactionCommand(); + + return group_oid; +} + +/* + * Actual work for adding a tuple to pg_lrg_info. 
+ */ +static Oid +lrg_add_info(const char *group_name, const char *publication_type) +{ + Relation rel; + bool nulls[Natts_pg_lrg_info]; + Datum values[Natts_pg_lrg_info]; + HeapTuple tup; + Oid lrgoid; + + rel = table_open(LrgInfoRelationId, ExclusiveLock); + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + lrgoid = GetNewOidWithIndex(rel, LrgInfoRelationIndexId, Anum_pg_lrg_info_oid); + values[Anum_pg_lrg_info_oid - 1] = ObjectIdGetDatum(lrgoid); + values[Anum_pg_lrg_info_groupname - 1] = CStringGetDatum(group_name); + values[Anum_pg_lrg_info_pub_type - 1] = CStringGetTextDatum(publication_type); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + /* Insert tuple into catalog. */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + table_close(rel, ExclusiveLock); + + return lrgoid; +} + +/* + * helper function for lrg_insert_into_sub + */ +static Oid +find_subscription(const char *subname) +{ + /* for scannning */ + Relation rel; + HeapTuple tup; + Form_pg_subscription form; + + rel = table_open(SubscriptionRelationId, AccessExclusiveLock); + tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId, + CStringGetDatum(subname)); + + if (!HeapTupleIsValid(tup)) + { + table_close(rel, NoLock); + return InvalidOid; + } + + form = (Form_pg_subscription) GETSTRUCT(tup); + table_close(rel, NoLock); + + return form->oid; +} + +/* + * ================================ + * Public APIs + * ================================ + */ + +/* + * SQL function for creating a new logical replication group. + * + * This function adds a tuple to pg_lrg_info and pg_lrg_nodes, + * and after that kick lrg launcher. 
+ */ +Datum +lrg_create(PG_FUNCTION_ARGS) +{ + Oid lrgoid; + char *group_name; + char *pub_type; + char *local_connstring; + char *node_name; + char *group_name_from_catalog = NULL; + + /* XXX: for simplify the fixed array is used */ + char node_id[64]; + + if (get_group_info(&group_name_from_catalog) != InvalidOid) + ereport(ERROR, + errmsg("could not create a node group"), + errdetail("This node was already a member of %s.", group_name_from_catalog), + errhint("You need to detach from or drop the group.")); + + group_name = text_to_cstring(PG_GETARG_TEXT_PP(0)); + pub_type = text_to_cstring(PG_GETARG_TEXT_PP(1)); + + if (pg_strcasecmp(pub_type, "FOR ALL TABLES") != 0) + ereport(ERROR, + errmsg("cannot create a node group"), + errdetail("Only 'FOR ALL TABLES' is supported as publication type.")); + + lrgoid = lrg_add_info(group_name, pub_type); + + construct_node_id(node_id, sizeof(node_id)); + local_connstring = text_to_cstring(PG_GETARG_TEXT_PP(2)); + node_name = text_to_cstring(PG_GETARG_TEXT_PP(3)); + lrg_add_nodes(node_id, lrgoid, LRG_STATE_INIT, node_name, local_connstring, NULL); + + lrg_launcher_wakeup(); + PG_RETURN_VOID(); +} + + +/* + * SQL function for attaching to a specified group + * + * This function adds a tuple to pg_lrg_info and pg_lrg_nodes, + * and after that kicks lrg launcher. 
+ */ +Datum +lrg_node_attach(PG_FUNCTION_ARGS) +{ + Oid lrgoid; + char *group_name; + char *local_connstring; + char *upstream_connstring; + char *node_name; + PGconn *upstreamconn = NULL; + char *group_name_from_catalog = NULL; + + /* XXX: for simplify the fixed array is used */ + char node_id[64]; + + if (get_group_info(&group_name_from_catalog) != InvalidOid) + ereport(ERROR, + errmsg("could not attach to a node group"), + errdetail("This node was already a member of %s.", group_name_from_catalog), + errhint("You need to detach from or drop the group.")); + + group_name = text_to_cstring(PG_GETARG_TEXT_PP(0)); + local_connstring = text_to_cstring(PG_GETARG_TEXT_PP(1)); + upstream_connstring = text_to_cstring(PG_GETARG_TEXT_PP(2)); + node_name = text_to_cstring(PG_GETARG_TEXT_PP(3)); + + /* + * For sanity check the backend process must connect to the upstream node. + * libpqlrg shared library will be used for that. + */ + load_file("libpqlrg", false); + lrg_connect(upstream_connstring, &upstreamconn, true); + + if (!lrg_check_group(upstreamconn, group_name)) + ereport(ERROR, + errmsg("could not attach to the node group"), + errdetail("Upstream node is not a member of specified group.")); + + lrg_disconnect(upstreamconn); + + lrgoid = lrg_add_info(group_name, "FOR ALL TABLES"); + + construct_node_id(node_id, sizeof(node_id)); + lrg_add_nodes(node_id, lrgoid, LRG_STATE_INIT, node_name, local_connstring, upstream_connstring); + + lrg_launcher_wakeup(); + PG_RETURN_VOID(); +} + +/* + * SQL function for detaching from a group + */ +Datum +lrg_node_detach(PG_FUNCTION_ARGS) +{ + char *node_name; + char *given_group_name; + char *group_name_from_catalog = NULL; + bool force_detach; + + given_group_name = text_to_cstring(PG_GETARG_TEXT_PP(0)); + node_name = text_to_cstring(PG_GETARG_TEXT_PP(1)); + force_detach = PG_GETARG_BOOL(2); + + (void) get_group_info(&group_name_from_catalog); + if (group_name_from_catalog == NULL) + ereport(ERROR, + errmsg("could not detach from 
the node group"), + errdetail("This node was in any node groups.")); + else if (strcmp(given_group_name, group_name_from_catalog) != 0) + ereport(ERROR, + errmsg("could not detach from the node group"), + errdetail("This node was in %s, but %s is specified.", + group_name_from_catalog, given_group_name)); + + update_node_status_by_nodename(node_name, force_detach ? LRG_STATE_FORCE_DETACH : LRG_STATE_TO_BE_DETACHED, true); + lrg_launcher_wakeup(); + PG_RETURN_VOID(); +} + +/* + * SQL function for dropping a group. + */ +Datum +lrg_drop(PG_FUNCTION_ARGS) +{ + char node_id[64]; + char *given_group_name; + char *group_name_from_catalog = NULL; + + construct_node_id(node_id, sizeof(node_id)); + + given_group_name = text_to_cstring(PG_GETARG_TEXT_PP(0)); + + (void) get_group_info(&group_name_from_catalog); + if (group_name_from_catalog == NULL) + ereport(ERROR, + errmsg("could not drop the node group"), + errdetail("This node was in any node groups.")); + else if (strcmp(given_group_name, group_name_from_catalog) != 0) + ereport(ERROR, + errmsg("could not drop the node group"), + errdetail("This node was in %s, but %s is specified.", + group_name_from_catalog, given_group_name)); + + /* TODO: add a check whether there are not other members in the group or not */ + update_node_status_by_nodeid(node_id, LRG_STATE_TO_BE_DETACHED, true); + lrg_launcher_wakeup(); + PG_RETURN_VOID(); +} + +/* + * Wait until lrg related functions are done + * + * Note that this function registers/unregisters a latest snapshot within a + * loop This may be not consistent with the isolation level set by user. + * + * XXX: Should we add a timeout parameter? 
+ */ +Datum +lrg_wait(PG_FUNCTION_ARGS) +{ + if (get_group_info(NULL) == InvalidOid) + PG_RETURN_NULL(); + + for (;;) + { + Relation rel; + HeapTuple tup; + SysScanDesc scan; + /* Get latest snapshot in the every loop */ + Snapshot latest = RegisterSnapshot(GetLatestSnapshot()); + bool need_more_loop = false; + + CHECK_FOR_INTERRUPTS(); + + rel = table_open(LrgNodesRelationId, AccessShareLock); + scan = systable_beginscan(rel, InvalidOid, false, latest, 0, NULL); + + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_lrg_nodes nodesform = (Form_pg_lrg_nodes) GETSTRUCT(tup); + + /* + * Set a flag if we must wait more. + */ + if (nodesform->status != LRG_STATE_READY) + need_more_loop = true; + } + + systable_endscan(scan); + table_close(rel, NoLock); + + UnregisterSnapshot(latest); + + if (!need_more_loop) + break; + + /* + * wait very short time... + */ +#define TEMPORARY_NAP_TIME 500L + WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + TEMPORARY_NAP_TIME, 0); + } + + PG_RETURN_VOID(); +} + +/* + * ================================ + * Internal SQL functions + * ================================ + */ + +/* + * Wrapper for adding a tuple into pg_lrg_sub + * + * This function will be called by LRG worker. 
+ */ +Datum +lrg_insert_into_sub(PG_FUNCTION_ARGS) +{ + char *sub_name; + Oid group_oid, sub_oid, lrgsub_oid; + Relation rel; + bool nulls[Natts_pg_lrg_sub]; + Datum values[Natts_pg_lrg_sub]; + HeapTuple tup; + + sub_name = text_to_cstring(PG_GETARG_TEXT_PP(0)); + + group_oid = get_group_info(NULL); + sub_oid = find_subscription(sub_name); + + rel = table_open(LrgSubscriptionId, ExclusiveLock); + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + lrgsub_oid = GetNewOidWithIndex(rel, LrgSubscriptionOidIndexId, Anum_pg_lrg_sub_oid); + + values[Anum_pg_lrg_sub_oid - 1] = ObjectIdGetDatum(lrgsub_oid); + values[Anum_pg_lrg_sub_groupid - 1] = ObjectIdGetDatum(group_oid); + values[Anum_pg_lrg_sub_subid - 1] = ObjectIdGetDatum(sub_oid); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + /* Insert tuple into catalog. */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + table_close(rel, ExclusiveLock); + + PG_RETURN_VOID(); +} + +/* + * Wrapper for adding a tuple into pg_lrg_nodes + * + * This function will be called by LRG worker. 
 */
Datum
lrg_insert_into_nodes(PG_FUNCTION_ARGS)
{
	char	   *node_id;
	LRG_NODE_STATE status;
	char	   *node_name;
	char	   *local_connstring;
	char	   *upstream_connstring;
	Oid			group_oid;

	node_id = text_to_cstring(PG_GETARG_TEXT_PP(0));
	/* the status argument arrives as int32; cast back to the state enum */
	status = DatumGetInt32(PG_GETARG_DATUM(1));
	node_name = text_to_cstring(PG_GETARG_TEXT_PP(2));
	local_connstring = text_to_cstring(PG_GETARG_TEXT_PP(3));
	upstream_connstring = text_to_cstring(PG_GETARG_TEXT_PP(4));

	group_oid = get_group_info(NULL);

	lrg_add_nodes(node_id, group_oid, status, node_name, local_connstring, upstream_connstring);

	PG_RETURN_VOID();
}
diff --git a/src/backend/replication/lrg/lrg_launcher.c b/src/backend/replication/lrg/lrg_launcher.c
new file mode 100644
index 0000000000..5d633ec2cf
--- /dev/null
+++ b/src/backend/replication/lrg/lrg_launcher.c
@@ -0,0 +1,359 @@
/*-------------------------------------------------------------------------
 *
 * lrg_launcher.c
 *		functions for lrg launcher
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/htup_details.h"
#include "access/heapam.h"
#include "access/relscan.h"
#include "access/table.h"
#include "catalog/pg_database.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/lrg.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"

static void launch_lrg_worker(Oid dbid);
static LrgWorker *find_perdb_worker(Oid dbid);
static List *get_db_list(void);
static void scan_and_launch(void);
static void lrglauncher_worker_onexit(int code, Datum arg);

/* Has lrg_perdb_wakeup_callback been registered in this backend? */
static bool ishook_registered = false;
/* Set when a commit should notify the launcher/worker; see lrg_launcher_wakeup(). */
static bool isworker_needed = false;

/*
 * Helper struct used by get_db_list()
 */
typedef struct db_list_cell
{
	Oid			dbid;			/* database OID */
	char	   *dbname;			/* database name */
} db_list_cell;

/*
 * Launch
a lrg worker related with the given database + */ +static void +launch_lrg_worker(Oid dbid) +{ + BackgroundWorker bgw; + LrgWorker *worker = NULL; + int slot = 0; + + LWLockAcquire(&LrgWorkerCtx->lock, LW_EXCLUSIVE); + + /* + * Find a free worker slot. + */ + for (int i = 0; i < max_logical_replication_workers; i++) + { + LrgWorker *pw = &LrgWorkerCtx->workers[i]; + + if (pw->dbid == InvalidOid) + { + worker = pw; + slot = i; + break; + } + } + + /* + * If there are no more free worker slots, raise an ERROR now. + * + * TODO: cleanup the array at that time? + */ + if (worker == NULL) + { + LWLockRelease(&LrgWorkerCtx->lock); + ereport(ERROR, + errmsg("out of lrg worker slots")); + } + + + /* Prepare the worker slot. */ + worker->dbid = dbid; + + LWLockRelease(&LrgWorkerCtx->lock); + + MemSet(&bgw, 0, sizeof(BackgroundWorker)); + + snprintf(bgw.bgw_name, BGW_MAXLEN, "lrg worker for database %u", dbid); + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_restart_time = BGW_NEVER_RESTART; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "lrg_worker_main"); + bgw.bgw_main_arg = UInt32GetDatum(slot); + + if (!RegisterDynamicBackgroundWorker(&bgw, NULL)) + { + /* Failed to start worker, so clean up the worker slot. */ + LWLockAcquire(&LrgWorkerCtx->lock, LW_EXCLUSIVE); + lrg_worker_cleanup(worker); + LWLockRelease(&LrgWorkerCtx->lock); + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of worker slots"))); + } +} + +/* + * Find a launched lrg worker that related with the given database. + * This returns NUL if not exist. 
+ */ +static LrgWorker* +find_perdb_worker(Oid dbid) +{ + int i; + + Assert(LWLockHeldByMe(&LrgWorkerCtx->lock)); + + for (i = 0; i < max_logical_replication_workers; i++) + { + LrgWorker *worker = &LrgWorkerCtx->workers[i]; + if (worker->dbid == dbid) + return worker; + } + return NULL; +} + +/* + * Load the list of databases in this server. + */ +static List* +get_db_list() +{ + List *res = NIL; + Relation rel; + TableScanDesc scan; + HeapTuple tup; + /* We will allocate the output data in the current memory context */ + MemoryContext resultcxt = CurrentMemoryContext; + + StartTransactionCommand(); + (void) GetTransactionSnapshot(); + + rel = table_open(DatabaseRelationId, AccessShareLock); + scan = table_beginscan_catalog(rel, 0, NULL); + + while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) + { + Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tup); + db_list_cell *cell; + MemoryContext oldcxt; + + /* skip if connection is not allowed */ + if (!dbform->datallowconn) + continue; + + /* + * Allocate our results in the caller's context + */ + oldcxt = MemoryContextSwitchTo(resultcxt); + + cell = (db_list_cell *) palloc0(sizeof(db_list_cell)); + cell->dbid = dbform->oid; + cell->dbname = pstrdup(NameStr(dbform->datname)); + res = lappend(res, cell); + + MemoryContextSwitchTo(oldcxt); + } + + table_endscan(scan); + table_close(rel, AccessShareLock); + CommitTransactionCommand(); + + return res; +} + +/* + * Scan pg_lrg_nodes and launch if needed. + */ +static void +scan_and_launch(void) +{ + List *list; + ListCell *lc; + MemoryContext subctx; + MemoryContext oldctx; + + subctx = AllocSetContextCreate(TopMemoryContext, + "Lrg Launcher list", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + list = get_db_list(); + + /* + * Start per-db loop + */ + foreach(lc, list) + { + db_list_cell *cell = (db_list_cell *)lfirst(lc); + LrgWorker *worker; + + /* + * Have the worker already started? 
+ */ + LWLockAcquire(&LrgWorkerCtx->lock, LW_EXCLUSIVE); + worker = find_perdb_worker(cell->dbid); + LWLockRelease(&LrgWorkerCtx->lock); + + /* + * If so, launcher should not do anything + */ + if (worker != NULL) + continue; + + launch_lrg_worker(cell->dbid); + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); +} + + +/* + * Callback for process exit; cleanup the controller + */ +static void +lrglauncher_worker_onexit(int code, Datum arg) +{ + LWLockAcquire(&LrgWorkerCtx->lock, LW_EXCLUSIVE); + LrgWorkerCtx->launcher_pid = InvalidPid; + LrgWorkerCtx->launcher_latch = NULL; + LWLockRelease(&LrgWorkerCtx->lock); +} + +/* + * Entry point for lrg launcher + */ +void +lrg_launcher_main(Datum arg) +{ + Assert(LrgWorkerCtx->launcher_pid == 0); + LrgWorkerCtx->launcher_pid = MyProcPid; + + /* Establish signal handlers. */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * Register my latch to the controller + * for receiving notifications from lrg background worker. + */ + LWLockAcquire(&LrgWorkerCtx->lock, LW_EXCLUSIVE); + LrgWorkerCtx->launcher_latch = &MyProc->procLatch; + LrgWorkerCtx->launcher_pid = MyProcPid; + LWLockRelease(&LrgWorkerCtx->lock); + before_shmem_exit(lrglauncher_worker_onexit, (Datum) 0); + ResetLatch(&MyProc->procLatch); + + /* + * we did not connect specific database, because launcher + * will read only pg_database. + */ + BackgroundWorkerInitializeConnection(NULL, NULL, 0); + + /* + * main loop + */ + for (;;) + { + int rc = 0; + + CHECK_FOR_INTERRUPTS(); + + /* + * XXX: for simplify laucnher will start a loop at fixed intervals, + * but it will be no-op if no one sets a latch. 
+ */ +#define TEMPORARY_NAP_TIME 180000L + + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + TEMPORARY_NAP_TIME, 0); + if (rc & WL_LATCH_SET) + { + ResetLatch(&MyProc->procLatch); + CHECK_FOR_INTERRUPTS(); + scan_and_launch(); + } + + /* + * XXX: Reload configuration file, but is it really needed for launcher? + */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + } + /* Not reachable */ +} + +/* + * xact callback for launcher/worker. + */ +static void +lrg_perdb_wakeup_callback(XactEvent event, void *arg) +{ + switch (event) + { + case XACT_EVENT_COMMIT: + if (isworker_needed) + { + LrgWorker *worker; + LWLockAcquire(&LrgWorkerCtx->lock, LW_EXCLUSIVE); + worker = find_perdb_worker(MyDatabaseId); + + /* + * If lrg worker related with this db has been + * launched, notify to the worker. + * If not, maybe it means that someone has called lrg_create()/lrg_node_attach(), + * notify to the launcher. + */ + if (worker != NULL) + SetLatch(worker->worker_latch); + else + SetLatch(LrgWorkerCtx->launcher_latch); + + LWLockRelease(&LrgWorkerCtx->lock); + } + isworker_needed = false; + break; + /* + * XXX: Do we have to wake up LRG launcher and worker processes when PREPARE event? 
			 */
		default:
			break;
	}
}

/*
 * Register a callback for notifying to launcher, and set a flag
 */
void
lrg_launcher_wakeup(void)
{
	/* Register the xact callback at most once per backend. */
	if (!ishook_registered)
	{
		RegisterXactCallback(lrg_perdb_wakeup_callback, NULL);
		ishook_registered = true;
	}
	/* The callback fires at commit and notifies the launcher/worker. */
	isworker_needed = true;
}
diff --git a/src/backend/replication/lrg/lrg_worker.c b/src/backend/replication/lrg/lrg_worker.c
new file mode 100644
index 0000000000..dba0ad6d2c
--- /dev/null
+++ b/src/backend/replication/lrg/lrg_worker.c
@@ -0,0 +1,749 @@
/*-------------------------------------------------------------------------
 *
 * lrg_worker.c
 *		functions for lrg worker
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/relscan.h"
#include "access/table.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_lrg_info.h"
#include "catalog/pg_lrg_nodes.h"
#include "catalog/pg_lrg_pub.h"
#include "catalog/pg_publication.h"
#include "executor/spi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "replication/libpqlrg.h"
#include "replication/lrg.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"

/*
 * Data structure for one node.
 */
typedef struct LrgNode
{
	Oid			group_oid;		/* OID of the group in pg_lrg_info */
	char	   *node_id;		/* unique node identifier */
	char	   *node_name;		/* human-readable node name */
	char	   *local_connstring;	/* connection string to the node itself */
	char	   *upstream_connstring;	/* connection string to the upstream
										 * node, or NULL (group creator) */
} LrgNode;

/* Function table of libpqlrg; set when the shared library is loaded. */
lrg_function_types *LrgFunctionTypes = NULL;

/* This worker's slot in the shared-memory worker array. */
static LrgWorker *my_lrg_worker = NULL;

static void lrg_worker_onexit(int code, Datum arg);
static void do_node_management(void);
static void get_node_information(LrgNode **output_node, LRG_NODE_STATE *status);
static void advance_state_machine(LrgNode *node, LRG_NODE_STATE initial_status);
static void update_node_status_internal(const char *node_id, const char *node_name, LRG_NODE_STATE state, bool is_in_txn);
static void detach_node(LrgNode *node, bool force_detach);
static void create_publication(const char *group_name, const char *node_id, Oid group_oid);
static Oid	find_publication(const char *pubname);
static List *get_lrg_nodes_list(const char *local_nodeid);
static void synchronise_system_tables(PGconn *localconn, PGconn *upstreamconn);

/*
 * Reset a worker slot.  Caller must hold LrgWorkerCtx->lock exclusively.
 */
void
lrg_worker_cleanup(LrgWorker *worker)
{
	Assert(LWLockHeldByMeInMode(&LrgWorkerCtx->lock, LW_EXCLUSIVE));

	worker->dbid = InvalidOid;
	worker->worker_pid = InvalidPid;
	worker->worker_latch = NULL;
}

/*
 * Callback for process exit. cleanup the array.
 */
static void
lrg_worker_onexit(int code, Datum arg)
{
	LWLockAcquire(&LrgWorkerCtx->lock, LW_EXCLUSIVE);
	lrg_worker_cleanup(my_lrg_worker);
	LWLockRelease(&LrgWorkerCtx->lock);
}

/*
 * Synchronise system tables from upstream node.
 *
 * Currently it will read and insert pg_lrg_nodes only.
 */
static void
synchronise_system_tables(PGconn *localconn, PGconn *upstreamconn)
{
	/* Copy pg_lrg_nodes rows from the upstream node into the local node. */
	lrg_copy_lrg_nodes(upstreamconn, localconn);
}

/*
 * Load the list of lrg_nodes, except the given node
 *
 * The LrgNode cells are allocated in the caller's memory context.
 */
static List *
get_lrg_nodes_list(const char *excepted_node)
{
	List	   *res = NIL;
	Relation	rel;
	SysScanDesc scan;
	HeapTuple	tup;
	Snapshot	current;
	/* We will allocate the output data in the current memory context */
	MemoryContext resultcxt = CurrentMemoryContext;

	StartTransactionCommand();
	current = GetTransactionSnapshot();

	rel = table_open(LrgNodesRelationId, AccessShareLock);
	scan = systable_beginscan(rel, InvalidOid, false, current, 0, NULL);

	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		LrgNode    *node;
		MemoryContext oldcxt;
		bool		isnull;
		Datum		tmp_upstream_connstring;

		/* Skip the row of the excluded node, if one was given. */
		if (excepted_node != NULL &&
			strcmp(TextDatumGetCString(heap_getattr(tup, Anum_pg_lrg_nodes_nodeid, RelationGetDescr(rel), &isnull)), excepted_node) == 0)
			continue;

		/*
		 * Allocate our results in the caller's context, not the
		 * transaction's.
		 */
		oldcxt = MemoryContextSwitchTo(resultcxt);

		node = (LrgNode *) palloc0(sizeof(LrgNode));

		node->group_oid = heap_getattr(tup,
									   Anum_pg_lrg_nodes_groupid,
									   RelationGetDescr(rel),
									   &isnull);

		node->node_id = pstrdup(TextDatumGetCString(heap_getattr(tup,
																 Anum_pg_lrg_nodes_nodeid,
																 RelationGetDescr(rel),
																 &isnull)));
		node->node_name = pstrdup(DatumGetCString(heap_getattr(tup,
															   Anum_pg_lrg_nodes_nodename,
															   RelationGetDescr(rel),
															   &isnull)));
		node->local_connstring = pstrdup(TextDatumGetCString(heap_getattr(tup,
																		  Anum_pg_lrg_nodes_localconn,
																		  RelationGetDescr(rel),
																		  &isnull)));

		/*
		 * Unlike the above attributes, upstreamconn might be NULL, so it
		 * must be fetched into a temporary variable first and isnull checked
		 * before converting the datum.
+ */ + tmp_upstream_connstring = heap_getattr(tup, + Anum_pg_lrg_nodes_upstreamconn, + RelationGetDescr(rel), + &isnull); + + if (!isnull) + node->upstream_connstring = pstrdup(TextDatumGetCString(tmp_upstream_connstring)); + else + node->upstream_connstring = NULL; + + res = lappend(res, node); + + MemoryContextSwitchTo(oldcxt); + } + + systable_endscan(scan); + table_close(rel, AccessShareLock); + CommitTransactionCommand(); + + return res; +} + +/* + * Internal routine for updaing the status of the node. + */ +static void +update_node_status_internal(const char *node_id, const char *node_name, LRG_NODE_STATE state, bool is_in_txn) +{ + Relation rel; + bool nulls[Natts_pg_lrg_nodes]; + bool replaces[Natts_pg_lrg_nodes]; + Datum values[Natts_pg_lrg_nodes]; + HeapTuple tup; + + Assert(!(node_id == NULL && node_name == NULL) + && !(node_id != NULL && node_name != NULL)); + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + values[Anum_pg_lrg_nodes_status - 1] = Int32GetDatum(state); + replaces[Anum_pg_lrg_nodes_status - 1] = true; + + if (!is_in_txn) + StartTransactionCommand(); + + rel = table_open(LrgNodesRelationId, RowExclusiveLock); + + if (node_id != NULL) + tup = SearchSysCacheCopy1(LRGNODEID, CStringGetTextDatum(node_id)); + else + tup = SearchSysCacheCopy1(LRGNODENAME, CStringGetDatum(node_name)); + + Assert(HeapTupleIsValid(tup)); + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + heap_freetuple(tup); + + table_close(rel, NoLock); + + if (!is_in_txn) + CommitTransactionCommand(); +} + +/* + * Update the status of node, that is speciefied by the name + */ +void +update_node_status_by_nodename(const char *node_name, LRG_NODE_STATE state, bool is_in_txn) +{ + update_node_status_internal(NULL, node_name, state, is_in_txn); +} + +/* + * Same as above, but node_id is used for the key + */ +void 
+update_node_status_by_nodeid(const char *node_id, LRG_NODE_STATE state, bool is_in_txn) +{ + update_node_status_internal(node_id, NULL, state, is_in_txn); +} + +/* + * Helper function for create_publication() + */ +static Oid +find_publication(const char *pubname) +{ + Relation rel; + HeapTuple tup; + Form_pg_publication pubform; + + rel = table_open(PublicationRelationId, RowExclusiveLock); + + /* Check if name is used */ + tup = SearchSysCacheCopy1(PUBLICATIONNAME, + CStringGetDatum(pubname)); + + if (!HeapTupleIsValid(tup)) + { + table_close(rel, NoLock); + return InvalidOid; + } + + pubform = (Form_pg_publication) GETSTRUCT(tup); + table_close(rel, NoLock); + + return pubform->oid; +} + +/* + * Create publication via SPI interface, and insert its oid + * to the system catalog pg_lrg_pub. + */ +static void +create_publication(const char* group_name, const char* node_id, Oid group_oid) +{ + int ret; + StringInfoData query, pub_name; + Oid pub_oid; + Oid lrgpub_oid; + Relation rel; + bool nulls[Natts_pg_lrg_pub]; + Datum values[Natts_pg_lrg_pub]; + HeapTuple tup; + + initStringInfo(&query); + initStringInfo(&pub_name); + + /* Firstly do CREATE PUBLICATION */ + StartTransactionCommand(); + SPI_connect(); + PushActiveSnapshot(GetTransactionSnapshot()); + + appendStringInfo(&pub_name, "pub_for_%s", group_name); + appendStringInfo(&query, "CREATE PUBLICATION %s %s", pub_name.data, "FOR ALL TABLES"); + + ret = SPI_execute(query.data, false, 0); + if (ret != SPI_OK_UTILITY) + ereport(ERROR, + errmsg("could not create a publication"), + errdetail("Query: %s", query.data)); + + pub_oid = find_publication(pub_name.data); + if (pub_oid == InvalidOid) + ereport(ERROR, + errmsg("could not find a publication: %s", pub_name.data)); + + rel = table_open(LrgPublicationId, ExclusiveLock); + + memset(nulls, 0, sizeof(nulls)); + memset(values, 0, sizeof(values)); + + lrgpub_oid = GetNewOidWithIndex(rel, LrgPublicationOidIndexId, Anum_pg_lrg_pub_oid); + + values[Anum_pg_lrg_pub_oid 
		   - 1] = ObjectIdGetDatum(lrgpub_oid);
	values[Anum_pg_lrg_pub_groupid - 1] = ObjectIdGetDatum(group_oid);
	values[Anum_pg_lrg_pub_pubid - 1] = ObjectIdGetDatum(pub_oid);

	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
	/* Insert tuple into catalog. */
	CatalogTupleInsert(rel, tup);
	heap_freetuple(tup);
	table_close(rel, ExclusiveLock);

	/* End the SPI session before committing the transaction. */
	SPI_finish();
	PopActiveSnapshot();
	CommitTransactionCommand();

	pfree(pub_name.data);
	pfree(query.data);
}

/*
 * Some work for detaching and dropping
 */
static void
detach_node(LrgNode *tobedetached, bool force_detach)
{
	PGconn	   *tobedetachedconn = NULL;
	List	   *list;
	ListCell   *lc;
	MemoryContext subctx;
	MemoryContext oldctx;
	char	   *group_name = NULL;
	bool		could_connect;

	get_group_info(&group_name);

	/*
	 * load a library if LRG worker has not used yet
	 */
	if (LrgFunctionTypes == NULL)
		load_file("libpqlrg", false);

	/*
	 * Try to connect to the to-be-detached node, and throw an ERROR if the
	 * "force" option is not specified.
	 *
	 * Information about the health of the node must be kept because we must
	 * do some special things if it dies.
+ */ + could_connect = lrg_connect(tobedetached->local_connstring, &tobedetachedconn, false); + if (!force_detach && !could_connect) + ereport(ERROR, + errmsg("could not connect to the to-be-detached node")); + + subctx = AllocSetContextCreate(TopMemoryContext, + "Lrg Launcher list", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + list = get_lrg_nodes_list(tobedetached->node_id); + + if (list != NIL) + { + foreach(lc, list) + { + LrgNode *other_node = (LrgNode *)lfirst(lc); + PGconn *otherconn = NULL; + lrg_connect(other_node->local_connstring, &otherconn, true); + + lrg_drop_subscription(group_name, tobedetached->node_id, other_node->node_id, + otherconn, !could_connect); + /* + * DROP SUBSCRIPTION in to-be-detached node if possible + */ + if (could_connect) + lrg_drop_subscription(group_name, other_node->node_id, tobedetached->node_id, + tobedetachedconn, false); + + lrg_delete_from_nodes(otherconn, tobedetached->node_id); + lrg_disconnect(otherconn); + } + } + else + { + /* + * lrg_drop() case. Just delete all tuples from LRG catalogs. + */ + lrg_delete_from_nodes(tobedetachedconn, tobedetached->node_id); + } + + MemoryContextSwitchTo(oldctx); + MemoryContextDelete(subctx); + + + /* + * Do garbage collection if we can connect to + */ + if (could_connect) + { + lrg_drop_publication(group_name, tobedetachedconn); + lrg_cleanup(tobedetachedconn); + lrg_disconnect(tobedetachedconn); + } + pfree(group_name); +} + +/* + * advance the state machine for creating/attaching + */ +static void +advance_state_machine(LrgNode *local_node, LRG_NODE_STATE initial_status) +{ + PGconn *localconn = NULL; + PGconn *upstreamconn = NULL; + char *group_name = NULL; + LRG_NODE_STATE state = initial_status; + char node_id[64]; + + /* + * Assuming that the specified node is local + */ + construct_node_id(node_id, sizeof(node_id)); + Assert(strcmp(node_id, local_node->node_id) == 0); + + /* + * XXX: Global lock should be acquired around here. 
	 */

	if (state == LRG_STATE_INIT)
	{
		/* Establish connection if we are in the attaching case */
		if (local_node->upstream_connstring != NULL)
		{
			load_file("libpqlrg", false);
			lrg_connect(local_node->upstream_connstring, &upstreamconn, true);
			lrg_connect(local_node->local_connstring, &localconn, true);

			/* and get pg_lrg_nodes from upstream */
			synchronise_system_tables(localconn, upstreamconn);
		}
		get_group_info(&group_name);

		create_publication(group_name, local_node->node_id, local_node->group_oid);

		state = LRG_STATE_CREATE_PUBLICATION;
		update_node_status_by_nodename(local_node->node_name, LRG_STATE_CREATE_PUBLICATION, false);
	}

	/*
	 * XXX: We should ensure all changes have been sent to nodes here.
	 */

	if (state == LRG_STATE_CREATE_PUBLICATION)
	{
		/* Only the attach case has other members to subscribe to. */
		if (local_node->upstream_connstring != NULL)
		{
			List	   *list;
			ListCell   *lc;
			MemoryContext subctx;
			MemoryContext oldctx;

			subctx = AllocSetContextCreate(TopMemoryContext,
										   "Lrg Launcher list",
										   ALLOCSET_DEFAULT_SIZES);
			oldctx = MemoryContextSwitchTo(subctx);

			/* Get a node list that belong to the group */
			list = get_lrg_nodes_list(local_node->node_id);

			/* and do CREATE SUBSCRIPTION on all nodes! */
			foreach(lc, list)
			{
				LrgNode    *other_node = (LrgNode *) lfirst(lc);
				PGconn	   *otherconn = NULL;

				lrg_connect(other_node->local_connstring, &otherconn, true);

				/*
				 * XXX: Initial data should be synchronized from the upstream
				 * node, so the subscription that subscribes to the upstream
				 * node should be set as copy_data = force.
				 */
				lrg_create_subscription(group_name, local_node->local_connstring,
										local_node->node_id, other_node->node_id,
										otherconn, "origin = local, copy_data = false");
				lrg_create_subscription(group_name, other_node->local_connstring,
										other_node->node_id, local_node->node_id,
										localconn, "origin = local, copy_data = false");

				/*
				 * XXX: adding a tuple into remote's pg_lrg_nodes here, but
				 * it is bad.
				 * It should be done at the end of this function.
				 */
				if (local_node->upstream_connstring != NULL)
					lrg_insert_into_lrg_nodes(otherconn, local_node->node_id,
											  LRG_STATE_READY, local_node->node_name,
											  local_node->local_connstring, local_node->upstream_connstring);
				lrg_disconnect(otherconn);
			}
			MemoryContextSwitchTo(oldctx);
			MemoryContextDelete(subctx);
		}

		/*
		 * XXX: Global lock should be released here
		 */

		state = LRG_STATE_CREATE_SUBSCRIPTION;
		update_node_status_by_nodename(local_node->node_name, LRG_STATE_CREATE_SUBSCRIPTION, false);
	}

	state = LRG_STATE_READY;
	update_node_status_by_nodename(local_node->node_name, LRG_STATE_READY, false);

	/*
	 * clean up phase
	 */
	if (localconn != NULL)
		lrg_disconnect(localconn);
	if (upstreamconn != NULL)
		lrg_disconnect(upstreamconn);
	if (group_name != NULL)
		pfree(group_name);
}

/*
 * Get node-specific information for a node whose status is not READY.
 */
static void
get_node_information(LrgNode **output_node, LRG_NODE_STATE *status)
{
	Relation	rel;
	HeapTuple	tup;
	SysScanDesc scan;
	Snapshot	current;

	StartTransactionCommand();
	current = GetTransactionSnapshot();

	rel = table_open(LrgNodesRelationId, AccessShareLock);

	scan = systable_beginscan(rel, InvalidOid, false, current, 0, NULL);

	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		MemoryContext oldcxt;
		LrgNode    *tmp;
		LRG_NODE_STATE node_status;
		bool		isnull;

		Datum		tmp_upstream_connstring;

		node_status = DatumGetInt32(heap_getattr(tup,
												 Anum_pg_lrg_nodes_status,
												 RelationGetDescr(rel),
												 &isnull));

		/*
		 * If the status is ready, we skip it.
		 */
		if (node_status == LRG_STATE_READY)
			continue;

		/* Allocate the result in TopMemoryContext so it outlives the txn. */
		oldcxt = MemoryContextSwitchTo(TopMemoryContext);
		tmp = (LrgNode *) palloc0(sizeof(LrgNode));

		tmp->group_oid = heap_getattr(tup,
									  Anum_pg_lrg_nodes_groupid,
									  RelationGetDescr(rel),
									  &isnull);

		tmp->node_id = pstrdup(TextDatumGetCString(heap_getattr(tup,
																Anum_pg_lrg_nodes_nodeid,
																RelationGetDescr(rel),
																&isnull)));
		tmp->node_name = pstrdup(DatumGetCString(heap_getattr(tup,
															  Anum_pg_lrg_nodes_nodename,
															  RelationGetDescr(rel),
															  &isnull)));
		tmp->local_connstring = pstrdup(TextDatumGetCString(heap_getattr(tup,
																		 Anum_pg_lrg_nodes_localconn,
																		 RelationGetDescr(rel),
																		 &isnull)));


		/*
		 * the temporary variable is used in order to do null checking
		 */
		tmp_upstream_connstring = heap_getattr(tup,
											   Anum_pg_lrg_nodes_upstreamconn,
											   RelationGetDescr(rel),
											   &isnull);

		if (!isnull)
			tmp->upstream_connstring = pstrdup(TextDatumGetCString(tmp_upstream_connstring));
		else
			tmp->upstream_connstring = NULL;


		*output_node = tmp;
		*status = node_status;

		MemoryContextSwitchTo(oldcxt);
		/* only the first not-READY node is handled per call */
		break;
	}

	systable_endscan(scan);
	table_close(rel, NoLock);
	CommitTransactionCommand();
}

/*
 * Read pg_lrg_nodes and run the detach or create/attach path for the first
 * not-READY node, if any.
 */
static void
do_node_management(void)
{
	LrgNode    *node = NULL;
	LRG_NODE_STATE status;

	/*
	 * read information from pg_lrg_nodes
	 */
	get_node_information(&node, &status);

	if (node == NULL)
	{
		/*
		 * If we reach here all nodes are READY; it means that no operations
		 * are needed.
		 */
		return;
	}

	/*
	 * XXX: for simplicity the case for detaching/dropping is completely
	 * separated from the creating/attaching.
	 */
	if (status == LRG_STATE_TO_BE_DETACHED
		|| status == LRG_STATE_FORCE_DETACH)
		detach_node(node, status == LRG_STATE_FORCE_DETACH);
	else
	{
		/*
		 * advance the state machine for creating or attaching.
+ */ + advance_state_machine(node, status); + } + + pfree(node->node_id); + pfree(node->node_name); + pfree(node->local_connstring); + if (node->upstream_connstring != NULL) + pfree(node->upstream_connstring); + pfree(node); +} + +/* + * Entry point for lrg worker + */ +void +lrg_worker_main(Datum arg) +{ + int slot = DatumGetInt32(arg); + + /* Establish signal handlers. */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * Get information from the controller. The idex + * is given as the argument + */ + LWLockAcquire(&LrgWorkerCtx->lock, LW_SHARED); + my_lrg_worker = &LrgWorkerCtx->workers[slot]; + my_lrg_worker->worker_pid = MyProcPid; + my_lrg_worker->worker_latch = &MyProc->procLatch; + LWLockRelease(&LrgWorkerCtx->lock); + + before_shmem_exit(lrg_worker_onexit, (Datum) 0); + + /* + * Connect to the "allocated" database as superuser. + */ + BackgroundWorkerInitializeConnectionByOid(my_lrg_worker->dbid, 0, 0); + + elog(DEBUG3, "per-db worker for %u was launched", my_lrg_worker->dbid); + + /* + * The launcher launches the worker without considering + * the existence of lrg related data. + * So firstly workers must check their catalogs, and exit + * if there is no data. + * In any cases pg_lrg_info will have tuples if + * this node is in a node group, so we reads it. + */ + if (get_group_info(NULL) == InvalidOid) + { + elog(DEBUG3, "This database %u is not a member of lrg", MyDatabaseId); + proc_exit(0); + } + + do_node_management(); + + ResetLatch(&MyProc->procLatch); + + /* + * Wait for detaching or dropping. 
+ */ + for (;;) + { + int rc; + bool is_latch_set = false; + + CHECK_FOR_INTERRUPTS(); + +#define TEMPORARY_NAP_TIME 180000L + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + TEMPORARY_NAP_TIME, 0); + + if (rc & WL_LATCH_SET) + { + is_latch_set = true; + ResetLatch(&MyProc->procLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + if (is_latch_set) + { + do_node_management(); + is_latch_set = false; + } + } +} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 1a6f527051..48e9a422a0 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -32,6 +32,7 @@ #include "postmaster/bgwriter.h" #include "postmaster/postmaster.h" #include "replication/logicallauncher.h" +#include "replication/lrg.h" #include "replication/origin.h" #include "replication/slot.h" #include "replication/walreceiver.h" @@ -284,6 +285,7 @@ CreateSharedMemoryAndSemaphores(void) WalRcvShmemInit(); PgArchShmemInit(); ApplyLauncherShmemInit(); + LrgLauncherShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index 1912b12146..8e00fde1cc 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -43,6 +43,7 @@ #include "catalog/pg_foreign_server.h" #include "catalog/pg_foreign_table.h" #include "catalog/pg_language.h" +#include "catalog/pg_lrg_nodes.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" #include "catalog/pg_operator.h" @@ -509,6 +510,28 @@ static const struct cachedesc cacheinfo[] = { }, 4 }, + {LrgNodesRelationId, /* LRGNODEID */ + LrgNodeIdIndexId, + 1, + { + Anum_pg_lrg_nodes_nodeid, + 0, + 0, + 0 + }, + 4 + }, + {LrgNodesRelationId, /* LRGNODENAME */ + LrgNodeNameIndexId, + 1, + { + Anum_pg_lrg_nodes_nodename, + 0, + 0, + 0 + }, + 4 + }, 
{NamespaceRelationId, /* NAMESPACENAME */ NamespaceNameIndexId, 1, diff --git a/src/include/catalog/pg_lrg_info.h b/src/include/catalog/pg_lrg_info.h new file mode 100644 index 0000000000..26c20a70ef --- /dev/null +++ b/src/include/catalog/pg_lrg_info.h @@ -0,0 +1,49 @@ +/*------------------------------------------------------------------------- + * + * pg_lrg_info.h + * definition of the "logical replication group information" system + * catalog (pg_lrg_info) + * + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_lrg_info.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_LRG_INFO_H +#define PG_LRG_INFO_H + +#include "catalog/genbki.h" +#include "catalog/pg_lrg_info_d.h" + +/* ---------------- + * pg_lrg_info definition. cpp turns this into + * typedef struct FormData_pg_lrg_info + * ---------------- + */ +CATALOG(pg_lrg_info,8337,LrgInfoRelationId) +{ + Oid oid; /* oid */ + + NameData groupname; /* name of the logical replication group */ +#ifdef CATALOG_VARLEN /* variable-length fields start here */ + text pub_type BKI_FORCE_NOT_NULL; +#endif +} FormData_pg_lrg_info; + +/* ---------------- + * Form_pg_lrg_info corresponds to a pointer to a tuple with + * the format of pg_lrg_info relation. 
+ * ---------------- + */ +typedef FormData_pg_lrg_info *Form_pg_lrg_info; + +DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_info_oid_index, 8338, LrgInfoRelationIndexId, on pg_lrg_info using btree(oid oid_ops)); + +#endif /* PG_LRG_INFO_H */ diff --git a/src/include/catalog/pg_lrg_nodes.h b/src/include/catalog/pg_lrg_nodes.h new file mode 100644 index 0000000000..99534fb96c --- /dev/null +++ b/src/include/catalog/pg_lrg_nodes.h @@ -0,0 +1,56 @@ +/*------------------------------------------------------------------------- + * + * pg_lrg_nodes.h + * definition of the "logical replication nodes" system + * catalog (pg_lrg_nodes) + * + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_lrg_nodes.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_LRG_NODES_H +#define PG_LRG_NODES_H + +#include "catalog/genbki.h" +#include "catalog/pg_lrg_nodes_d.h" + +/* ---------------- + * pg_lrg_nodes definition. cpp turns this into + * typedef struct FormData_pg_lrg_nodes + * ---------------- + */ +CATALOG(pg_lrg_nodes,8339,LrgNodesRelationId) +{ + Oid oid; /* oid */ + + Oid groupid BKI_LOOKUP(pg_lrg_info); + Oid dbid BKI_LOOKUP(pg_database); + int32 status; + NameData nodename BKI_FORCE_NOT_NULL; +#ifdef CATALOG_VARLEN /* variable-length fields start here */ + text nodeid BKI_FORCE_NOT_NULL; /* name of the logical replication group */ + text localconn BKI_FORCE_NOT_NULL; + text upstreamconn BKI_FORCE_NULL; +#endif +} FormData_pg_lrg_nodes; + +/* ---------------- + * Form_pg_lrg_nodes corresponds to a pointer to a tuple with + * the format of pg_lrg_nodes relation. 
+ * ---------------- + */ +typedef FormData_pg_lrg_nodes *Form_pg_lrg_nodes; + +DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_nodes_oid_index, 8340, LrgNodesRelationIndexId, on pg_lrg_nodes using btree(oid oid_ops)); +DECLARE_UNIQUE_INDEX(pg_lrg_node_id_index, 8346, LrgNodeIdIndexId, on pg_lrg_nodes using btree(nodeid text_ops)); +DECLARE_UNIQUE_INDEX(pg_lrg_nodes_name_index, 8347, LrgNodeNameIndexId, on pg_lrg_nodes using btree(nodename name_ops)); + +#endif /* PG_LRG_NODES_H */ diff --git a/src/include/catalog/pg_lrg_pub.h b/src/include/catalog/pg_lrg_pub.h new file mode 100644 index 0000000000..d65dc51d4d --- /dev/null +++ b/src/include/catalog/pg_lrg_pub.h @@ -0,0 +1,46 @@ +/*------------------------------------------------------------------------- + * + * pg_lrg_info.h + * definition of the "logical replication group publication" system + * catalog (pg_lrg_pub) + * + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_lrg_pub.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_LRG_PUB_H +#define PG_LRG_PUB_H + +#include "catalog/genbki.h" +#include "catalog/pg_lrg_pub_d.h" + +/* ---------------- + * pg_lrg_pub definition. cpp turns this into + * typedef struct FormData_pg_lrg_pub + * ---------------- + */ +CATALOG(pg_lrg_pub,8341,LrgPublicationId) +{ + Oid oid; + Oid groupid BKI_LOOKUP(pg_lrg_info); + Oid pubid BKI_LOOKUP(pg_publication); +} FormData_pg_lrg_pub; + +/* ---------------- + * Form_pg_lrg_pub corresponds to a pointer to a tuple with + * the format of pg_lrg_pub relation. 
+ * ---------------- + */ +typedef FormData_pg_lrg_pub *Form_pg_lrg_pub; + +DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_pub_oid_index, 8344, LrgPublicationOidIndexId, on pg_lrg_pub using btree(oid oid_ops)); + +#endif /* PG_LRG_PUB_H */ diff --git a/src/include/catalog/pg_lrg_sub.h b/src/include/catalog/pg_lrg_sub.h new file mode 100644 index 0000000000..398c8e8971 --- /dev/null +++ b/src/include/catalog/pg_lrg_sub.h @@ -0,0 +1,46 @@ +/*------------------------------------------------------------------------- + * + * pg_lrg_sub.h + * definition of the "logical replication group subscription" system + * catalog (pg_lrg_sub) + * + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_lrg_sub.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_LRG_SUB_H +#define PG_LRG_SUB_H + +#include "catalog/genbki.h" +#include "catalog/pg_lrg_sub_d.h" + +/* ---------------- + * pg_lrg_sub definition. cpp turns this into + * typedef struct FormData_pg_lrg_sub + * ---------------- + */ +CATALOG(pg_lrg_sub,8343,LrgSubscriptionId) +{ + Oid oid; + Oid groupid BKI_LOOKUP(pg_lrg_info);; + Oid subid BKI_LOOKUP(pg_subscription); +} FormData_pg_lrg_sub; + +/* ---------------- + * Form_pg_lrg_sub corresponds to a pointer to a tuple with + * the format of pg_lrg_sub relation. 
+ * ---------------- + */ +typedef FormData_pg_lrg_sub *Form_pg_lrg_sub; + +DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_sub_oid_index, 8345, LrgSubscriptionOidIndexId, on pg_lrg_sub using btree(oid oid_ops)); + +#endif /* PG_LRG_SUB_H */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 87aa571a33..1f6065d29c 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11885,4 +11885,34 @@ prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary', prosrc => 'brin_minmax_multi_summary_send' }, +# lrg +{ oid => '8143', descr => 'create logical replication group', + proname => 'lrg_create', proparallel => 'r', + prorettype => 'void', proargtypes => 'text text text text', + prosrc => 'lrg_create' }, +{ oid => '8144', descr => 'attach to logical replication group', + proname => 'lrg_node_attach', proparallel => 'r', + prorettype => 'void', proargtypes => 'text text text text', + prosrc => 'lrg_node_attach' }, +{ oid => '8145', descr => 'detach from logical replication group', + proname => 'lrg_node_detach', proparallel => 'r', + prorettype => 'void', proargtypes => 'text text bool', + prosrc => 'lrg_node_detach' }, +{ oid => '8146', descr => 'delete logical replication group', + proname => 'lrg_drop', proparallel => 'r', + prorettype => 'void', proargtypes => 'text', + prosrc => 'lrg_drop' }, +{ oid => '8147', descr => 'insert a tuple to pg_lrg_sub', + proname => 'lrg_insert_into_sub', proparallel => 'r', + prorettype => 'void', proargtypes => 'text', + prosrc => 'lrg_insert_into_sub' }, +{ oid => '8148', descr => 'insert a tuple to pg_lrg_nodes', + proname => 'lrg_insert_into_nodes', proparallel => 'r', + prorettype => 'void', proargtypes => 'text int4 text text text', + prosrc => 'lrg_insert_into_nodes' }, +{ oid => '8149', descr => 'wait until node operations are done.', + proname => 'lrg_wait', proparallel => 'r', + prorettype => 'void', proargtypes => '', + prosrc => 'lrg_wait' }, + ] diff --git 
a/src/include/replication/libpqlrg.h b/src/include/replication/libpqlrg.h new file mode 100644 index 0000000000..120e709364 --- /dev/null +++ b/src/include/replication/libpqlrg.h @@ -0,0 +1,99 @@ +/*------------------------------------------------------------------------- + * + * libpqlrg.h + * Constructs a logical replication group + * + *------------------------------------------------------------------------- + */ +#ifndef LIBPQLIG_H +#define LIBPQLIG_H + +#include "postgres.h" +#include "libpq-fe.h" +#include "replication/lrg.h" + +/* function pointers for libpqlrg */ + +typedef bool (*libpqlrg_connect_fn) (const char *connstring, PGconn **conn, bool should_throw_error); +typedef bool (*libpqlrg_check_group_fn) (PGconn *conn, const char *group_name); +typedef void (*libpqlrg_copy_lrg_nodes_fn) (PGconn *remoteconn, PGconn *localconn); +typedef void (*libpqlrg_insert_into_lrg_nodes_fn) (PGconn *remoteconn, + const char *node_id, LRG_NODE_STATE status, + const char *node_name, const char *local_connstring, + const char *upstream_connstring); +typedef void (*libpqlrg_create_subscription_fn) (const char *group_name, const char *publisher_connstring, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn, const char *options); + +typedef void (*libpqlrg_drop_publication_fn) (const char *group_name, + PGconn *publisherconn); + +typedef void (*libpqlrg_drop_subscription_fn) (const char *group_name, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn, bool need_to_alter); + +typedef void (*libpqlrg_delete_from_nodes_fn) (PGconn *conn, const char *node_id); +typedef void (*libpqlrg_cleanup_fn) (PGconn *conn); + +typedef void (*libpqlrg_disconnect_fn) (PGconn *conn); + +typedef struct lrg_function_types +{ + libpqlrg_connect_fn libpqlrg_connect; + libpqlrg_check_group_fn libpqlrg_check_group; + libpqlrg_copy_lrg_nodes_fn libpqlrg_copy_lrg_nodes; + libpqlrg_insert_into_lrg_nodes_fn 
libpqlrg_insert_into_lrg_nodes; + libpqlrg_create_subscription_fn libpqlrg_create_subscription; + libpqlrg_drop_publication_fn libpqlrg_drop_publication; + libpqlrg_drop_subscription_fn libpqlrg_drop_subscription; + libpqlrg_delete_from_nodes_fn libpqlrg_delete_from_nodes; + libpqlrg_cleanup_fn libpqlrg_cleanup; + libpqlrg_disconnect_fn libpqlrg_disconnect; +} lrg_function_types; + +extern PGDLLIMPORT lrg_function_types *LrgFunctionTypes; + +#define lrg_connect(connstring, conn, should_throw_error) \ + LrgFunctionTypes->libpqlrg_connect(connstring, conn, should_throw_error) +#define lrg_check_group(conn, group_name) \ + LrgFunctionTypes->libpqlrg_check_group(conn, group_name) +#define lrg_copy_lrg_nodes(remoteconn, localconn) \ + LrgFunctionTypes->libpqlrg_copy_lrg_nodes(remoteconn, localconn) + +#define lrg_insert_into_lrg_nodes(remoteconn, \ + node_id, status, \ + node_name, local_connstring, \ + upstream_connstring) \ + LrgFunctionTypes->libpqlrg_insert_into_lrg_nodes(remoteconn, \ + node_id, status, \ + node_name, local_connstring, \ + upstream_connstring) +#define lrg_create_subscription(group_name, publisher_connstring, \ + publisher_node_id, subscriber_node_id, \ + subscriberconn, options) \ + LrgFunctionTypes->libpqlrg_create_subscription(group_name, publisher_connstring, \ + publisher_node_id, subscriber_node_id, \ + subscriberconn, options) + +#define lrg_drop_publication(group_name, \ + publisherconn) \ + LrgFunctionTypes->libpqlrg_drop_publication(group_name, \ + publisherconn) + +#define lrg_drop_subscription(group_name, \ + publisher_node_id, subscriber_node_id, \ + subscriberconn, need_to_alter) \ + LrgFunctionTypes->libpqlrg_drop_subscription(group_name, \ + publisher_node_id, subscriber_node_id, \ + subscriberconn, need_to_alter) + +#define lrg_delete_from_nodes(conn, node_id) \ + LrgFunctionTypes->libpqlrg_delete_from_nodes(conn, node_id) + +#define lrg_cleanup(conn) \ + LrgFunctionTypes->libpqlrg_cleanup(conn) + +#define lrg_disconnect(conn) \ + 
LrgFunctionTypes->libpqlrg_disconnect(conn) + +#endif /* LIBPQLIG_H */ \ No newline at end of file diff --git a/src/include/replication/lrg.h b/src/include/replication/lrg.h new file mode 100644 index 0000000000..6f44948c95 --- /dev/null +++ b/src/include/replication/lrg.h @@ -0,0 +1,71 @@ +/*------------------------------------------------------------------------- + * + * lrg.h + * Constructs a logical replication group + * + *------------------------------------------------------------------------- + */ +#ifndef LRG_H +#define LRG_H + +#include "postgres.h" + +#include "storage/latch.h" +#include "storage/lock.h" +#include "storage/lwlock.h" + +/* + * enumeration for represents its status + */ +typedef enum +{ + LRG_STATE_INIT = 0, + LRG_STATE_CREATE_PUBLICATION, + LRG_STATE_CREATE_SUBSCRIPTION, + LRG_STATE_READY, + LRG_STATE_TO_BE_DETACHED, + LRG_STATE_FORCE_DETACH, +} LRG_NODE_STATE; + +/* + * working space for each LRG worker. + */ +typedef struct LrgWorker { + pid_t worker_pid; + Oid dbid; + Latch *worker_latch; +} LrgWorker; + +/* + * controller for lrg worker. + * This will be hold by launcher. 
+ */ +typedef struct LrgWorkerCtxStruct { + LWLock lock; + pid_t launcher_pid; + Latch *launcher_latch; + LrgWorker workers[FLEXIBLE_ARRAY_MEMBER]; +} LrgWorkerCtxStruct; + +extern LrgWorkerCtxStruct *LrgWorkerCtx; + +/* lrg.c */ +extern void LrgLauncherShmemInit(void); +extern void LrgLauncherRegister(void); +extern void lrg_add_nodes(const char *node_id, Oid group_id, LRG_NODE_STATE status, + const char *node_name, const char *local_connstring, + const char *upstream_connstring); +extern Oid get_group_info(char **group_name); +extern void construct_node_id(char *out_node_id, int size); +extern void update_node_status_by_nodename(const char *node_name, LRG_NODE_STATE state, bool is_in_txn); +extern void update_node_status_by_nodeid(const char *node_id, LRG_NODE_STATE state, bool is_in_txn); + +/* lrg_launcher.c */ +extern void lrg_launcher_main(Datum arg) pg_attribute_noreturn(); +extern void lrg_launcher_wakeup(void); + +/* lrg_worker.c */ +extern void lrg_worker_main(Datum arg) pg_attribute_noreturn(); +extern void lrg_worker_cleanup(LrgWorker *worker); + +#endif /* LRG_H */ diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index 4463ea66be..8305fca4bf 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -66,6 +66,8 @@ enum SysCacheIdentifier INDEXRELID, LANGNAME, LANGOID, + LRGNODEID, + LRGNODENAME, NAMESPACENAME, NAMESPACEOID, OPERNAMENSP, -- 2.27.0