From 2b34be53d5d91920fa8b3366c73a4c624d98391c Mon Sep 17 00:00:00 2001 From: "kuroda.hayato%40jp.fujitsu.com" Date: Tue, 17 May 2022 08:03:31 +0000 Subject: [PATCH 1/2] (PoC) implement LRG --- src/Makefile | 1 + src/backend/catalog/Makefile | 3 +- src/backend/postmaster/bgworker.c | 7 + src/backend/postmaster/postmaster.c | 3 + src/backend/replication/Makefile | 4 +- src/backend/replication/libpqlrg/Makefile | 38 ++ src/backend/replication/libpqlrg/libpqlrg.c | 220 ++++++++ src/backend/replication/lrg/Makefile | 22 + src/backend/replication/lrg/lrg.c | 417 ++++++++++++++ src/backend/replication/lrg/lrg_launcher.c | 323 +++++++++++ src/backend/replication/lrg/lrg_worker.c | 592 ++++++++++++++++++++ src/backend/storage/ipc/ipci.c | 2 + src/include/catalog/pg_lrg_info.h | 47 ++ src/include/catalog/pg_lrg_nodes.h | 53 ++ src/include/catalog/pg_lrg_pub.h | 46 ++ src/include/catalog/pg_lrg_sub.h | 46 ++ src/include/catalog/pg_proc.dat | 25 + src/include/replication/libpqlrg.h | 63 +++ src/include/replication/lrg.h | 67 +++ src/test/regress/expected/oidjoins.out | 6 + 20 files changed, 1983 insertions(+), 2 deletions(-) create mode 100644 src/backend/replication/libpqlrg/Makefile create mode 100644 src/backend/replication/libpqlrg/libpqlrg.c create mode 100644 src/backend/replication/lrg/Makefile create mode 100644 src/backend/replication/lrg/lrg.c create mode 100644 src/backend/replication/lrg/lrg_launcher.c create mode 100644 src/backend/replication/lrg/lrg_worker.c create mode 100644 src/include/catalog/pg_lrg_info.h create mode 100644 src/include/catalog/pg_lrg_nodes.h create mode 100644 src/include/catalog/pg_lrg_pub.h create mode 100644 src/include/catalog/pg_lrg_sub.h create mode 100644 src/include/replication/libpqlrg.h create mode 100644 src/include/replication/lrg.h diff --git a/src/Makefile b/src/Makefile index 79e274a476..75db706762 100644 --- a/src/Makefile +++ b/src/Makefile @@ -23,6 +23,7 @@ SUBDIRS = \ interfaces \ backend/replication/libpqwalreceiver \ 
backend/replication/pgoutput \ + backend/replication/libpqlrg \ fe_utils \ bin \ pl \ diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 89a0221ec9..744fdf4fb8 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -72,7 +72,8 @@ CATALOG_HEADERS := \ pg_collation.h pg_parameter_acl.h pg_partitioned_table.h \ pg_range.h pg_transform.h \ pg_sequence.h pg_publication.h pg_publication_namespace.h \ - pg_publication_rel.h pg_subscription.h pg_subscription_rel.h + pg_publication_rel.h pg_subscription.h pg_subscription_rel.h \ + pg_lrg_info.h pg_lrg_nodes.h pg_lrg_pub.h pg_lrg_sub.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 40601aefd9..49d8ff1878 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -20,6 +20,7 @@ #include "postmaster/bgworker_internals.h" #include "postmaster/interrupt.h" #include "postmaster/postmaster.h" +#include "replication/lrg.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "storage/dsm.h" @@ -128,6 +129,12 @@ static const struct }, { "ApplyWorkerMain", ApplyWorkerMain + }, + { + "lrg_launcher_main", lrg_launcher_main + }, + { + "lrg_worker_main", lrg_worker_main } }; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 3b73e26956..b900008cdd 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -118,6 +118,7 @@ #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "replication/logicallauncher.h" +#include "replication/lrg.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -1020,6 +1021,8 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); + LrgLauncherRegister(); + /* * process any libraries that should be preloaded at postmaster start 
*/
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 3d8fb70c0e..49ffc243f6 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -35,7 +35,9 @@ OBJS = \
 	walreceiverfuncs.o \
 	walsender.o
 
-SUBDIRS = logical
+SUBDIRS = \
+	logical \
+	lrg
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/replication/libpqlrg/Makefile b/src/backend/replication/libpqlrg/Makefile
new file mode 100644
index 0000000000..72d911a918
--- /dev/null
+++ b/src/backend/replication/libpqlrg/Makefile
@@ -0,0 +1,38 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/replication/libpqlrg
+#
+# Builds the libpqlrg shared library, which is linked against libpq.
+# "subdir" must name the directory this file lives in (four levels below
+# the top of the tree, matching top_builddir below).
+#
+# IDENTIFICATION
+#    src/backend/replication/libpqlrg/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/libpqlrg
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS = \
+	$(WIN32RES) \
+	libpqlrg.o
+
+SHLIB_LINK_INTERNAL = $(libpq)
+SHLIB_LINK = $(filter -lintl, $(LIBS))
+SHLIB_PREREQS = submake-libpq
+PGFILEDESC = "libpqlrg"
+NAME = libpqlrg
+
+all: all-shared-lib
+
+include $(top_srcdir)/src/Makefile.shlib
+
+install: all installdirs install-lib
+
+installdirs: installdirs-lib
+
+uninstall: uninstall-lib
+
+clean distclean maintainer-clean: clean-lib
+	rm -f $(OBJS)
diff --git a/src/backend/replication/libpqlrg/libpqlrg.c b/src/backend/replication/libpqlrg/libpqlrg.c
new file mode 100644
index 0000000000..4bd8375be7
--- /dev/null
+++ b/src/backend/replication/libpqlrg/libpqlrg.c
@@ -0,0 +1,220 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpqlrg.c
+ *		functions for lrg worker
+ *
+ *-------------------------------------------------------------------------
+ */
+
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include
"funcapi.h" +#include "libpq-fe.h" +#include "lib/stringinfo.h" +#include "replication/libpqlrg.h" +#include "replication/lrg.h" +#include "utils/snapmgr.h" + +PG_MODULE_MAGIC; + +void _PG_init(void); + +static void libpqlrg_connect(const char *connstring, PGconn **conn); +static bool libpqlrg_check_group(PGconn *conn, const char *group_name); +static void libpqlrg_copy_lrg_nodes(PGconn *remoteconn, PGconn *localconn); +static void libpqlrg_insert_into_lrg_nodes(PGconn *remoteconn, + const char *node_id, LRG_NODE_STATE status, + const char *node_name, const char *local_connstring, + const char *upstream_connstring); + +static void libpqlrg_create_subscription(const char *group_name, const char *publisher_connstring, + const char *publisher_node_id, const char *subscriber_node_id, + PGconn *subscriberconn, const char *options); +static void libpqlrg_disconnect(PGconn *conn); + +static lrg_function_types PQLrgFunctionTypes = +{ + libpqlrg_connect, + libpqlrg_check_group, + libpqlrg_copy_lrg_nodes, + libpqlrg_insert_into_lrg_nodes, + libpqlrg_create_subscription, + libpqlrg_disconnect +}; + +/* + * Just a wrapper for PQconnectdb() and PQstatus(). 
+ */
+static void
+libpqlrg_connect(const char *connstring, PGconn **conn)
+{
+	/*
+	 * The connection string is deliberately not written to the server log:
+	 * it may carry a password.
+	 */
+	*conn = PQconnectdb(connstring);
+	if (PQstatus(*conn) != CONNECTION_OK)
+	{
+		/*
+		 * Copy libpq's message into palloc'd memory first; the PGconn (and
+		 * the message buffer it owns) is released before we throw so that a
+		 * failed attempt does not leak the connection object.
+		 */
+		char	   *msg = pchomp(PQerrorMessage(*conn));
+
+		PQfinish(*conn);
+		*conn = NULL;
+		elog(ERROR, "failed to connect: %s", msg);
+	}
+}
+
+/*
+ * Check whether a group called group_name is registered on the remote node.
+ *
+ * Runs a COUNT(*) against pg_lrg_info over "conn" and returns true when at
+ * least one row matches.  The group name is sent as a bind parameter rather
+ * than pasted into the SQL text, so quotes in the name cannot alter the
+ * statement.  Raises ERROR (after releasing the PGresult) if the query fails.
+ */
+static bool
+libpqlrg_check_group(PGconn *conn, const char *group_name)
+{
+	PGresult   *result;
+	const char *params[1];
+	bool		found;
+
+	Assert(PQstatus(conn) == CONNECTION_OK);
+
+	params[0] = group_name;
+	result = PQexecParams(conn,
+						  "SELECT COUNT(*) FROM pg_lrg_info WHERE groupname = $1",
+						  1, NULL, params, NULL, NULL, 0);
+	if (PQresultStatus(result) != PGRES_TUPLES_OK)
+	{
+		char	   *msg = pchomp(PQerrorMessage(conn));
+
+		/* PGresult is malloc'ed by libpq, so free it before leaving via ERROR */
+		PQclear(result);
+		elog(ERROR, "failed to read pg_lrg_info: %s", msg);
+	}
+
+	/* COUNT(*) always yields exactly one row; anything but "0" is a match */
+	found = (strcmp(PQgetvalue(result, 0, 0), "0") != 0);
+	PQclear(result);
+
+	return found;
+}
+
+/*
+ * Copy pg_lrg_nodes from remoteconn
+ */
+static void
+libpqlrg_copy_lrg_nodes(PGconn *remoteconn, PGconn *localconn)
+{
+	PGresult *result;
+	StringInfoData query;
+	int i, num_tuples;
+
+	Assert(PQstatus(remoteconn) == CONNECTION_OK
+		   && PQstatus(localconn) == CONNECTION_OK);
+	initStringInfo(&query);
+
+
+	/*
+	 * Note that COPY command cannot be used here because group_oid
+	 * might be different between remote and local.
+	 */
+	appendStringInfo(&query, "SELECT nodeid, status, nodename, "
+							 "localconn, upstreamconn FROM pg_lrg_nodes");
+	result = PQexec(remoteconn, query.data);
+	if (PQresultStatus(result) != PGRES_TUPLES_OK)
+	{
+		/* PGresult is malloc'ed by libpq; release it before throwing */
+		PQclear(result);
+		elog(ERROR, "failed to read pg_lrg_nodes");
+	}
+
+	/* the query text is not needed once the result has arrived */
+	pfree(query.data);
+
+	num_tuples = PQntuples(result);
+
+	for (i = 0; i < num_tuples; i++)
+	{
+		char	   *node_id = PQgetvalue(result, i, 0);
+		char	   *status = PQgetvalue(result, i, 1);
+		char	   *nodename = PQgetvalue(result, i, 2);
+		/* named *_str so they do not shadow the PGconn parameter "localconn" */
+		char	   *local_str = PQgetvalue(result, i, 3);
+		char	   *upstream_str;
+
+		/*
+		 * PQgetvalue() reports SQL NULL as an empty string; keep a remote
+		 * NULL upstreamconn as NULL locally (lrg_add_nodes stores NULL for a
+		 * NULL pointer).
+		 */
+		upstream_str = PQgetisnull(result, i, 4) ? NULL : PQgetvalue(result, i, 4);
+
+		StartTransactionCommand();
+		(void) GetTransactionSnapshot();
+		/*
+		 * group_oid is adjusted to local value
+		 */
+		lrg_add_nodes(node_id, get_group_oid(), atoi(status), nodename, local_str, upstream_str);
+		CommitTransactionCommand();
+	}
+
+	/* every value above pointed into "result", so free it only now */
+	PQclear(result);
+}
+
+/*
+ * Register a node in pg_lrg_nodes on the remote side.
+ *
+ * Calls the SQL function lrg_insert_into_nodes() over remoteconn.  All
+ * values travel as bind parameters, so quotes inside a node name or a
+ * connection string cannot change the statement.  Raises ERROR (after
+ * releasing the PGresult) when the remote call fails.
+ */
+static void
+libpqlrg_insert_into_lrg_nodes(PGconn *remoteconn,
+							   const char *node_id, LRG_NODE_STATE status,
+							   const char *node_name, const char *local_connstring,
+							   const char *upstream_connstring)
+{
+	const char *params[5];
+	char		status_str[32];
+	PGresult   *result;
+
+	Assert(PQstatus(remoteconn) == CONNECTION_OK
+		   && node_id != NULL
+		   && node_name != NULL
+		   && local_connstring != NULL
+		   && upstream_connstring != NULL);
+
+	snprintf(status_str, sizeof(status_str), "%d", (int) status);
+	params[0] = node_id;
+	params[1] = status_str;
+	params[2] = node_name;
+	params[3] = local_connstring;
+	params[4] = upstream_connstring;
+
+	/* $2 is cast so it is typed like the integer literal used previously */
+	result = PQexecParams(remoteconn,
+						  "SELECT lrg_insert_into_nodes($1, $2::int, $3, $4, $5)",
+						  5, NULL, params, NULL, NULL, 0);
+	if (PQresultStatus(result) != PGRES_TUPLES_OK)
+	{
+		char	   *msg = pchomp(PQerrorMessage(remoteconn));
+
+		PQclear(result);
+		elog(ERROR, "failed to execute lrg_insert_into_nodes: %s", msg);
+	}
+	PQclear(result);
+}
+
+
+static void
+libpqlrg_create_subscription(const char *group_name, const char *publisher_connstring,
+							 const char *publisher_node_id, const char *subscriber_node_id,
+							 PGconn *subscriberconn, const char *options)
+{
+	StringInfoData query, sub_name;
+	PGresult *result;
+
+
Assert(publisher_connstring != NULL && subscriberconn != NULL);
+
+	/*
+	 * the name of subscriber is just concat of two node_id.
+	 */
+	initStringInfo(&query);
+	initStringInfo(&sub_name);
+
+	/*
+	 * construct the name of subscription and query.
+	 *
+	 * CREATE SUBSCRIPTION is a utility statement and cannot take bind
+	 * parameters, so the connection string is quoted with PQescapeLiteral()
+	 * (which also supplies the surrounding quotes).
+	 */
+	appendStringInfo(&sub_name, "sub_%s_%s", subscriber_node_id, publisher_node_id);
+	{
+		char	   *conn_literal;
+
+		conn_literal = PQescapeLiteral(subscriberconn, publisher_connstring,
+									   strlen(publisher_connstring));
+		if (conn_literal == NULL)
+			elog(ERROR, "failed to quote connection string: %s",
+				 pchomp(PQerrorMessage(subscriberconn)));
+
+		appendStringInfo(&query, "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION pub_for_%s",
+						 sub_name.data, conn_literal, group_name);
+		PQfreemem(conn_literal);
+	}
+
+	if (options)
+		appendStringInfo(&query, " WITH (%s)", options);
+
+	result = PQexec(subscriberconn, query.data);
+	if (PQresultStatus(result) != PGRES_COMMAND_OK)
+	{
+		/*
+		 * Report libpq's message instead of the statement text: the
+		 * statement embeds the publisher's connection string.
+		 */
+		char	   *msg = pchomp(PQerrorMessage(subscriberconn));
+
+		PQclear(result);
+		elog(ERROR, "failed to create subscription \"%s\": %s", sub_name.data, msg);
+	}
+	PQclear(result);
+
+	/* record the new subscription in pg_lrg_sub on the subscriber */
+	{
+		const char *params[1];
+
+		params[0] = sub_name.data;
+		result = PQexecParams(subscriberconn,
+							  "SELECT lrg_insert_into_sub($1)",
+							  1, NULL, params, NULL, NULL, 0);
+	}
+	if (PQresultStatus(result) != PGRES_TUPLES_OK)
+	{
+		char	   *msg = pchomp(PQerrorMessage(subscriberconn));
+
+		PQclear(result);
+		elog(ERROR, "failed to execute lrg_insert_into_sub: %s", msg);
+	}
+	PQclear(result);
+
+	pfree(sub_name.data);
+	pfree(query.data);
+}
+
+
+/*
+ * Just a wrapper for PQfinish()
+ */
+static void
+libpqlrg_disconnect(PGconn *conn)
+{
+	PQfinish(conn);
+}
+
+/*
+ * Module initialization function
+ */
+void
+_PG_init(void)
+{
+	if (LrgFunctionTypes != NULL)
+		elog(ERROR, "libpqlrg already loaded");
+	LrgFunctionTypes = &PQLrgFunctionTypes;
+}
diff --git a/src/backend/replication/lrg/Makefile b/src/backend/replication/lrg/Makefile
new file mode 100644
index 0000000000..4ce929b6a4
--- /dev/null
+++ b/src/backend/replication/lrg/Makefile
@@ -0,0 +1,22 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/replication/lrg
+#
+# IDENTIFICATION
+#    src/backend/replication/lrg/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/lrg
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS = \
+	lrg.o \
+	lrg_launcher.o \
+	lrg_worker.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/lrg/lrg.c b/src/backend/replication/lrg/lrg.c
new file mode 100644
index 0000000000..1580b9283f
--- /dev/null
+++ b/src/backend/replication/lrg/lrg.c
@@ -0,0 +1,417 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrg.c
+ *		Constructs a logical replication group
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/relscan.h"
+#include "access/table.h"
+#include "access/xlog.h"
+#include "catalog/catalog.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_lrg_info.h"
+#include "catalog/pg_lrg_nodes.h"
+#include "catalog/pg_lrg_sub.h"
+#include "catalog/pg_subscription.h"
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "replication/libpqlrg.h"
+#include "replication/logicallauncher.h"
+#include "replication/lrg.h"
+#include "storage/lock.h"
+#include "utils/builtins.h"
+#include "utils/fmgrprotos.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+
+LrgPerdbCtxStruct *LrgPerdbCtx;
+
+static Size lrg_worker_array_size(void);
+static Oid lrg_add_info(char *group_name, bool puballtables);
+static Oid find_subscription(const char *subname);
+
+/*
+ * Helper function for LrgLauncherShmemInit.
+ *
+ * Returns the number of bytes to reserve in shared memory: one MAXALIGN'ed
+ * LrgPerdbCtxStruct header plus max_worker_processes further entries.
+ *
+ * NOTE(review): each entry is sized as sizeof(LrgPerdbCtxStruct), while the
+ * launcher indexes LrgPerdbCtx->workers[] as LrgPerdbWorker.  Presumably
+ * sizeof(LrgPerdbWorker) was intended; the current form only over-allocates.
+ * Confirm against the struct definition in replication/lrg.h.
+ */
+static Size
+lrg_worker_array_size(void)
+{
+	Size size;
+
+	size = sizeof(LrgPerdbCtxStruct);
+	size = MAXALIGN(size);
+	/* XXX: for simplicity the array length is set to max_worker_processes */
+	size = add_size(size, mul_size(max_worker_processes, sizeof(LrgPerdbCtxStruct)));
+
+	return size;
+}
+
+/*
+ * Allocate LrgPerdbCtxStruct to the shared memory.
+ */
+void
+LrgLauncherShmemInit(void)
+{
+	bool found;
+
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+	LrgPerdbCtx = (LrgPerdbCtxStruct *)
+		ShmemInitStruct("Lrg Launcher Data",
+						lrg_worker_array_size(),
+						&found);
+	if (!found)	/* area was not set up before: zero it and create its lock */
+	{
+		MemSet(LrgPerdbCtx, 0, lrg_worker_array_size());
+		LWLockInitialize(&(LrgPerdbCtx->lock), LWLockNewTrancheId());
+	}
+	LWLockRelease(AddinShmemInitLock);
+	LWLockRegisterTranche(LrgPerdbCtx->lock.tranche, "lrg");	/* name the tranche in this process */
+}
+
+void
+LrgLauncherRegister(void)
+{
+	BackgroundWorker worker;
+
+	if (max_logical_replication_workers == 0)	/* no launcher is registered in this configuration */
+		return;
+
+	/*
+	 * Build struct BackgroundWorker for launcher.
+	 *
+	 * The launcher starts once recovery has finished, is never restarted
+	 * (BGW_NEVER_RESTART), and enters through lrg_launcher_main in the core
+	 * "postgres" binary.
+	 */
+	MemSet(&worker, 0, sizeof(BackgroundWorker));
+
+	snprintf(worker.bgw_name, BGW_MAXLEN, "lrg launcher");
+	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	snprintf(worker.bgw_library_name, BGW_MAXLEN, "postgres");
+	snprintf(worker.bgw_function_name, BGW_MAXLEN, "lrg_launcher_main");
+	RegisterBackgroundWorker(&worker);
+}
+
+/*
+ * construct node_id.
+ *
+ * Writes at most "size" bytes (including the terminator) to out_node_id.
+ *
+ * TODO: construct proper node_id. Currently it is just the system
+ * identifier immediately followed by the database OID, both in decimal and
+ * with no separator between them.
+ */
+void
+construct_node_id(char *out_node_id, int size)
+{
+	snprintf(out_node_id, size, UINT64_FORMAT "%u", GetSystemIdentifier(), MyDatabaseId);
+}
+
+/*
+ * Actual work for adding a tuple to pg_lrg_nodes.
+ */
+void
+lrg_add_nodes(char *node_id, Oid group_id, LRG_NODE_STATE status, char *node_name, char *local_connstring, char *upstream_connstring)
+{
+	Relation rel;
+	bool nulls[Natts_pg_lrg_nodes];
+	Datum values[Natts_pg_lrg_nodes];
+	HeapTuple tup;
+
+	Oid lrgnodesoid;
+
+	rel = table_open(LrgNodesRelationId, ExclusiveLock);
+
+	memset(values, 0, sizeof(values));
+	memset(nulls, 0, sizeof(nulls));
+
+	lrgnodesoid = GetNewOidWithIndex(rel, LrgNodesRelationIndexId, Anum_pg_lrg_nodes_oid);
+	values[Anum_pg_lrg_nodes_oid - 1] = ObjectIdGetDatum(lrgnodesoid);
+	values[Anum_pg_lrg_nodes_nodeid - 1] = CStringGetDatum(node_id);
+	values[Anum_pg_lrg_nodes_groupid - 1] = ObjectIdGetDatum(group_id);
+	values[Anum_pg_lrg_nodes_status - 1] = Int32GetDatum(status);
+	values[Anum_pg_lrg_nodes_dbid - 1] = ObjectIdGetDatum(MyDatabaseId);	/* always the current database */
+	values[Anum_pg_lrg_nodes_nodename - 1] = CStringGetDatum(node_name);
+	values[Anum_pg_lrg_nodes_localconn - 1] = CStringGetDatum(local_connstring);
+
+	if (upstream_connstring != NULL)
+		values[Anum_pg_lrg_nodes_upstreamconn - 1] = CStringGetDatum(upstream_connstring);
+	else
+		nulls[Anum_pg_lrg_nodes_upstreamconn - 1] = true;	/* a NULL pointer is stored as SQL NULL */
+
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+	/* Insert tuple into catalog, then drop our copy and the table lock. */
+	CatalogTupleInsert(rel, tup);
+	heap_freetuple(tup);
+	table_close(rel, ExclusiveLock);
+}
+
+/*
+ * read pg_lrg_info and get oid.
+ *
+ * XXX: This function assumes that there is only one tuple
+ * in the pg_lrg_info.
+ */
+Oid
+get_group_oid(void)
+{
+	Relation rel;
+	HeapTuple tup;
+	TableScanDesc scan;
+	Oid group_oid = InvalidOid;
+	Form_pg_lrg_info infoform;
+	bool is_opened = false;
+
+	/*
+	 * Catalog access needs a transaction.  Start one only if the caller is
+	 * not already inside one, and remember that so we commit only what we
+	 * started ourselves.
+	 */
+	if (!IsTransactionState())
+	{
+		is_opened = true;
+		StartTransactionCommand();
+		(void) GetTransactionSnapshot();
+	}
+
+	/*
+	 * Scan pg_lrg_info itself.  The tuple is decoded as Form_pg_lrg_info
+	 * below, so scanning pg_lrg_nodes here would hand back the OID of a
+	 * node row instead of the group's OID.
+	 */
+	rel = table_open(LrgInfoRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+	tup = heap_getnext(scan, ForwardScanDirection);
+
+	/* Only the first tuple is looked at; InvalidOid means "no group yet". */
+	if (tup != NULL)
+	{
+		infoform = (Form_pg_lrg_info) GETSTRUCT(tup);
+		group_oid = infoform->oid;
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	if (is_opened)
+		CommitTransactionCommand();
+
+	return group_oid;
+}
+
+/*
+ * Actual work for adding a tuple to pg_lrg_info.
+ */
+static Oid
+lrg_add_info(char *group_name, bool puballtables)
+{
+	Relation rel;
+	bool nulls[Natts_pg_lrg_info];
+	Datum values[Natts_pg_lrg_info];
+	HeapTuple tup;
+	Oid lrgoid;
+
+	rel = table_open(LrgInfoRelationId, ExclusiveLock);
+
+	memset(values, 0, sizeof(values));
+	memset(nulls, 0, sizeof(nulls));
+
+	lrgoid = GetNewOidWithIndex(rel, LrgInfoRelationIndexId, Anum_pg_lrg_info_oid);
+	values[Anum_pg_lrg_info_oid - 1] = ObjectIdGetDatum(lrgoid);
+
+	values[Anum_pg_lrg_info_groupname - 1] = CStringGetDatum(group_name);
+
+	values[Anum_pg_lrg_info_puballtables - 1] = BoolGetDatum(puballtables);
+
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+	/* Insert tuple into catalog.
*/
+	CatalogTupleInsert(rel, tup);
+	heap_freetuple(tup);
+	table_close(rel, ExclusiveLock);
+
+	return lrgoid;
+}
+
+/*
+ * helper function for lrg_insert_into_sub
+ *
+ * Returns the OID of the subscription called subname in the current
+ * database, or InvalidOid if there is none.
+ *
+ * pg_subscription is locked in AccessShareLock mode and the lock is kept
+ * until end of transaction.  A plain lookup has no need to block other
+ * readers and writers with AccessExclusiveLock.
+ * NOTE(review): this assumes DROP SUBSCRIPTION locks pg_subscription in
+ * AccessExclusiveLock mode, so that it is still held off -- confirm.
+ */
+static Oid
+find_subscription(const char *subname)
+{
+	Relation rel;
+	HeapTuple tup;
+	Oid subid = InvalidOid;
+
+	rel = table_open(SubscriptionRelationId, AccessShareLock);
+	tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
+						  CStringGetDatum(subname));
+
+	if (HeapTupleIsValid(tup))
+	{
+		subid = ((Form_pg_subscription) GETSTRUCT(tup))->oid;
+		/* read straight from the cache entry; nothing is copied or leaked */
+		ReleaseSysCache(tup);
+	}
+
+	table_close(rel, NoLock);
+
+	return subid;
+}
+
+/*
+ * ================================
+ * Public APIs
+ * ================================
+ */
+
+/*
+ * SQL function for creating a new logical replication group.
+ *
+ * Arguments: group name, publication type, local connection string and
+ * node name, all text.  Only the publication type 'FOR ALL TABLES'
+ * (case-insensitive) is accepted.
+ *
+ * This function adds a tuple to pg_lrg_info and pg_lrg_nodes,
+ * and after that kick lrg launcher.
+ */
+Datum
+lrg_create(PG_FUNCTION_ARGS)
+{
+	Oid lrgoid;
+	char *group_name;
+	char *pub_type;
+	char *local_connstring;
+	char *node_name;
+
+	/* XXX: for simplicity a fixed-size array is used */
+	char node_id[64];
+
+	group_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	pub_type = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(1)));
+
+	if (pg_strcasecmp(pub_type, "FOR ALL TABLES") != 0)
+		elog(ERROR, "only 'FOR ALL TABLES' is supported");
+
+	lrgoid = lrg_add_info(group_name, true);
+
+	construct_node_id(node_id, sizeof(node_id));
+	local_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(2)));
+	node_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(3)));
+	/* the node that creates a group has no upstream, hence the NULL */
+	lrg_add_nodes(node_id, lrgoid, LRG_STATE_INIT, node_name, local_connstring, NULL);
+
+	lrg_launcher_wakeup();
+	PG_RETURN_NULL();
+}
+
+
+/*
+ * SQL function for attaching to a specified group
+ *
+ * This function adds a tuple to pg_lrg_info and pg_lrg_nodes,
+ * and after that kick lrg launcher.
+ */
+Datum
+lrg_node_attach(PG_FUNCTION_ARGS)
+{
+	Oid lrgoid;
+	char *group_name;
+	char *local_connstring;
+	char *upstream_connstring;
+	char *node_name;
+	PGconn *upstreamconn = NULL;
+	bool group_exists = false;
+
+	/* XXX: for simplicity a fixed-size array is used */
+	char node_id[64];
+
+	group_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	local_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(1)));
+	upstream_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(2)));
+	node_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(3)));
+
+	/*
+	 * For sanity check the backend process must connect to the upstream node.
+	 * libpqlrg shared library will be used for that.
+	 *
+	 * The connection object is allocated by libpq outside any memory
+	 * context, so it must be closed explicitly on every path.  PG_FINALLY
+	 * covers both a normal return and an ERROR raised by the group check.
+	 */
+	load_file("libpqlrg", false);
+	lrg_connect(upstream_connstring, &upstreamconn);
+	PG_TRY();
+	{
+		group_exists = lrg_check_group(upstreamconn, group_name);
+	}
+	PG_FINALLY();
+	{
+		lrg_disconnect(upstreamconn);
+	}
+	PG_END_TRY();
+
+	if (!group_exists)
+		elog(ERROR, "specified group is not exist");
+
+	lrgoid = lrg_add_info(group_name, true);
+	construct_node_id(node_id, sizeof(node_id));
+	lrg_add_nodes(node_id, lrgoid, LRG_STATE_INIT, node_name, local_connstring, upstream_connstring);
+
+	lrg_launcher_wakeup();
+	PG_RETURN_NULL();
+}
+
+/*
+ * SQL function for detaching from a group
+ *
+ * Not implemented yet: does nothing and returns NULL.
+ */
+Datum
+lrg_node_detach(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_NULL();
+}
+
+/*
+ * SQL function for dropping a group
+ *
+ * Not implemented yet: does nothing and returns NULL.
+ */
+Datum
+lrg_drop(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_NULL();
+}
+
+/*
+ * This function is used internally: wrapper for adding a tuple into pg_lrg_sub
+ */
+Datum
+lrg_insert_into_sub(PG_FUNCTION_ARGS)
+{
+	char *sub_name;
+	Oid group_oid, sub_oid, lrgsub_oid;
+	Relation rel;
+	bool nulls[Natts_pg_lrg_sub];
+	Datum values[Natts_pg_lrg_sub];
+	HeapTuple tup;
+
+	sub_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+
+	group_oid = get_group_oid();
+	sub_oid = find_subscription(sub_name);
+
+	rel = table_open(LrgSubscriptionId, ExclusiveLock);
+
+	memset(values, 0, sizeof(values));
+	memset(nulls, 0, sizeof(nulls));
+
+
lrgsub_oid = GetNewOidWithIndex(rel, LrgSubscriptionOidIndexId, Anum_pg_lrg_sub_oid);
+
+	/*
+	 * find_subscription() reports an unknown name as InvalidOid.  Refuse to
+	 * record a pg_lrg_sub row that points at no subscription.
+	 */
+	if (!OidIsValid(sub_oid))
+		elog(ERROR, "subscription \"%s\" does not exist", sub_name);
+
+	values[Anum_pg_lrg_sub_oid - 1] = ObjectIdGetDatum(lrgsub_oid);
+	values[Anum_pg_lrg_sub_groupid - 1] = ObjectIdGetDatum(group_oid);
+	values[Anum_pg_lrg_sub_subid - 1] = ObjectIdGetDatum(sub_oid);
+
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+	/* Insert tuple into catalog. */
+	CatalogTupleInsert(rel, tup);
+	heap_freetuple(tup);
+	table_close(rel, ExclusiveLock);
+
+	PG_RETURN_NULL();
+}
+
+/*
+ * This function is used internally: wrapper for adding a tuple into pg_lrg_nodes
+ *
+ * Arguments: node id (text), status (int), node name, local connection
+ * string and upstream connection string (text).  The row is attached to the
+ * local group as reported by get_group_oid().
+ */
+Datum
+lrg_insert_into_nodes(PG_FUNCTION_ARGS)
+{
+	char *node_id;
+	LRG_NODE_STATE status;
+	char *node_name;
+	char *local_connstring;
+	char *upstream_connstring = NULL;
+	Oid group_oid;
+
+	node_id = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+	status = DatumGetInt32(PG_GETARG_DATUM(1));
+	node_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(2)));
+	local_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(3)));
+
+	/*
+	 * The upstream connection string is the one nullable column (see
+	 * lrg_add_nodes), so do not dereference a NULL argument.
+	 */
+	if (!PG_ARGISNULL(4))
+		upstream_connstring = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(4)));
+
+	group_oid = get_group_oid();
+
+	lrg_add_nodes(node_id, group_oid, status, node_name, local_connstring, upstream_connstring);
+
+	PG_RETURN_NULL();
+}
diff --git a/src/backend/replication/lrg/lrg_launcher.c b/src/backend/replication/lrg/lrg_launcher.c
new file mode 100644
index 0000000000..d0cbe36515
--- /dev/null
+++ b/src/backend/replication/lrg/lrg_launcher.c
@@ -0,0 +1,323 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrg_launcher.c
+ *		functions for lrg launcher
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/heapam.h"
+#include "access/relscan.h"
+#include "access/table.h"
+#include "catalog/pg_database.h"
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/interrupt.h"
+#include "replication/logicallauncher.h"
+#include "replication/lrg.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "tcop/tcopprot.h"
+#include "utils/memutils.h"
+#include "utils/snapmgr.h"
+
+static void launch_lrg_worker(Oid dbid);
+static LrgPerdbWorker* find_perdb_worker(Oid dbid);
+static List* get_db_list(void);
+static void scan_and_launch(void);
+static void lrglauncher_worker_onexit(int code, Datum arg);
+
+static bool ishook_registered = false;
+static bool isworker_needed = false;
+
+typedef struct db_list_cell
+{
+	Oid dbid;
+	char *dbname;
+} db_list_cell;
+
+/*
+ * Number of entries of LrgPerdbCtx->workers[] that may be used.
+ *
+ * The shared array is sized from max_worker_processes, so an index must
+ * never reach that value even when max_logical_replication_workers is
+ * configured larger.  Both the slot search and the lookup use this bound so
+ * that they always agree on which slots exist.
+ */
+static int
+lrg_worker_slot_count(void)
+{
+	return Min(max_logical_replication_workers, max_worker_processes);
+}
+
+/*
+ * Launch a per-db worker that related with the given database
+ *
+ * Claims a free slot in the shared worker array for dbid and registers a
+ * dynamic background worker whose main argument is the slot index.  Raises
+ * ERROR when no slot is free or the worker cannot be registered; in the
+ * latter case the claimed slot is released first.
+ */
+static void
+launch_lrg_worker(Oid dbid)
+{
+	BackgroundWorker bgw;
+	LrgPerdbWorker *worker = NULL;
+	int slot = 0;
+	int nslots = lrg_worker_slot_count();
+
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+
+	/*
+	 * Find a free worker slot.
+	 */
+	for (int i = 0; i < nslots; i++)
+	{
+		LrgPerdbWorker *pw = &LrgPerdbCtx->workers[i];
+
+		if (pw->dbid == InvalidOid)
+		{
+			worker = pw;
+			slot = i;
+			break;
+		}
+	}
+
+	/*
+	 * If there are no more free worker slots, raise an ERROR now.
+	 *
+	 * TODO: cleanup the array?
+	 */
+	if (worker == NULL)
+	{
+		LWLockRelease(&LrgPerdbCtx->lock);
+		ereport(ERROR,
+				errmsg("out of worker slots"));
+	}
+
+
+	/* Prepare the worker slot. */
+	worker->dbid = dbid;
+
+	LWLockRelease(&LrgPerdbCtx->lock);
+
+	MemSet(&bgw, 0, sizeof(BackgroundWorker));
+
+	snprintf(bgw.bgw_name, BGW_MAXLEN, "lrg worker for database %u", dbid);
+	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "lrg_worker_main");
+	bgw.bgw_main_arg = UInt32GetDatum(slot);
+
+	if (!RegisterDynamicBackgroundWorker(&bgw, NULL))
+	{
+		/* Failed to start worker, so clean up the worker slot. */
+		LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+		lrg_worker_cleanup(worker);
+		LWLockRelease(&LrgPerdbCtx->lock);
+		ereport(ERROR,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("out of worker slots")));
+	}
+}
+
+/*
+ * Find a per-db worker that related with the given database
+ *
+ * Returns the slot whose dbid equals the argument, or NULL if there is
+ * none.  The caller must hold LrgPerdbCtx->lock.
+ */
+static LrgPerdbWorker*
+find_perdb_worker(Oid dbid)
+{
+	int i;
+	int nslots = lrg_worker_slot_count();
+
+	Assert(LWLockHeldByMe(&LrgPerdbCtx->lock));
+
+	for (i = 0; i < nslots; i++)
+	{
+		LrgPerdbWorker *worker = &LrgPerdbCtx->workers[i];
+		if (worker->dbid == dbid)
+			return worker;
+	}
+	return NULL;
+}
+
+/*
+ * Load the list of databases.
+ */
+static List*
+get_db_list(void)
+{
+	List *res = NIL;
+	Relation rel;
+	TableScanDesc scan;
+	HeapTuple tup;
+	/* We will allocate the output data in the current memory context */
+	MemoryContext resultcxt = CurrentMemoryContext;
+
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	rel = table_open(DatabaseRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tup);
+		db_list_cell *cell;
+		MemoryContext oldcxt;
+
+		/* skip if connection is not allowed */
+		if (!dbform->datallowconn)
+			continue;
+
+		/*
+		 * Allocate our results in the caller's context, not the transaction's.
+		 */
+		oldcxt = MemoryContextSwitchTo(resultcxt);
+
+		cell = (db_list_cell *) palloc0(sizeof(db_list_cell));
+		cell->dbid = dbform->oid;
+		cell->dbname = pstrdup(NameStr(dbform->datname));
+		res = lappend(res, cell);
+
+		MemoryContextSwitchTo(oldcxt);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+	CommitTransactionCommand();
+
+	/*
+	 * Starting and committing a transaction changes CurrentMemoryContext.
+	 * Put the caller's context back so that its later allocations land where
+	 * it expects them (scan_and_launch deletes that context when done).
+	 */
+	MemoryContextSwitchTo(resultcxt);
+
+	return res;
+}
+
+/*
+ * Scan pg_database and launch a per-db worker for every connectable
+ * database that does not have one yet.
+ *
+ * NOTE(review): pg_lrg_nodes is not consulted here, so a worker is started
+ * for each connectable database whether or not it belongs to a group.
+ */
+static void
+scan_and_launch(void)
+{
+	List *list;
+	ListCell *lc;
+	MemoryContext subctx;
+	MemoryContext oldctx;
+
+	/* the database list lives in a private context that is dropped below */
+	subctx = AllocSetContextCreate(TopMemoryContext,
+								   "Lrg Launcher list",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(subctx);
+
+	/* search for lrg nodes to start */
+	list = get_db_list();
+
+	foreach(lc, list)
+	{
+		db_list_cell *cell = (db_list_cell *)lfirst(lc);
+		LrgPerdbWorker *worker;
+
+		LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+		worker = find_perdb_worker(cell->dbid);
+		LWLockRelease(&LrgPerdbCtx->lock);
+
+		/* only tested against NULL once the lock is gone, never dereferenced */
+		if (worker != NULL)
+			continue;
+
+		launch_lrg_worker(cell->dbid);
+	}
+
+	/* Switch back to original memory context. */
+	MemoryContextSwitchTo(oldctx);
+	/* Clean the temporary memory.
*/
+	MemoryContextDelete(subctx);
+}
+
+
+/*
+ * Callback for process exit. cleanup the controller
+ *
+ * Clears the launcher's pid and latch in shared memory so that backends no
+ * longer see a launcher to signal.
+ */
+static void
+lrglauncher_worker_onexit(int code, Datum arg)
+{
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+	LrgPerdbCtx->launcher_pid = InvalidPid;
+	LrgPerdbCtx->launcher_latch = NULL;
+	LWLockRelease(&LrgPerdbCtx->lock);
+}
+
+/*
+ * Entry point for lrg launcher
+ *
+ * Publishes this process's pid and latch in shared memory, then sleeps on
+ * the latch.  Each time the latch is set the database list is scanned and
+ * missing per-db workers are launched.  A plain timeout does not trigger a
+ * scan.
+ */
+void
+lrg_launcher_main(Datum arg)
+{
+	/* Establish signal handlers. */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Register my latch to the controller
+	 * for receiving notifications from per-db background worker.
+	 *
+	 * launcher_pid is shared state, so it is checked and written only while
+	 * holding the lock, together with the latch.
+	 */
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+	Assert(LrgPerdbCtx->launcher_pid == 0);
+	LrgPerdbCtx->launcher_latch = &MyProc->procLatch;
+	LrgPerdbCtx->launcher_pid = MyProcPid;
+	LWLockRelease(&LrgPerdbCtx->lock);
+	before_shmem_exit(lrglauncher_worker_onexit, (Datum) 0);
+	ResetLatch(&MyProc->procLatch);
+
+	/*
+	 * we did not connect specific database, because this
+	 * will read only pg_database
+	 */
+	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+
+	/*
+	 * main loop
+	 */
+	for (;;)
+	{
+		int rc = 0;
+
+		CHECK_FOR_INTERRUPTS();
+#define TEMPORARY_NAP_TIME 180000L
+
+		rc = WaitLatch(&MyProc->procLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   TEMPORARY_NAP_TIME, 0);
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(&MyProc->procLatch);
+			CHECK_FOR_INTERRUPTS();
+			scan_and_launch();
+		}
+
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+	}
+	/* Not reachable */
+}
+
+/*
+ * Launches per-db worker if needed.
+ */
+static void
+lrg_perdb_wakeup_callback(XactEvent event, void *arg)
+{
+	switch (event)
+	{
+		case XACT_EVENT_COMMIT:
+			if (isworker_needed)
+			{
+				/*
+				 * launcher_latch is NULL while no launcher is running (it is
+				 * cleared by the launcher's exit callback), so test it
+				 * before signalling.
+				 */
+				LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+				if (LrgPerdbCtx->launcher_latch != NULL)
+					SetLatch(LrgPerdbCtx->launcher_latch);
+				LWLockRelease(&LrgPerdbCtx->lock);
+			}
+			isworker_needed = false;
+			break;
+		case XACT_EVENT_ABORT:
+			/*
+			 * The catalog changes were rolled back, so drop the pending
+			 * request instead of letting a later, unrelated commit wake the
+			 * launcher.
+			 */
+			isworker_needed = false;
+			break;
+		default:
+			break;
+	}
+}
+
+/*
+ * Register a callback for notifying to launcher.
+ *
+ * The callback is registered once per backend.  The launcher's latch is set
+ * only when the current transaction commits.
+ */
+void
+lrg_launcher_wakeup(void)
+{
+	if (!ishook_registered)
+	{
+		RegisterXactCallback(lrg_perdb_wakeup_callback, NULL);
+		ishook_registered = true;
+	}
+	isworker_needed = true;
+}
diff --git a/src/backend/replication/lrg/lrg_worker.c b/src/backend/replication/lrg/lrg_worker.c
new file mode 100644
index 0000000000..785ab851c8
--- /dev/null
+++ b/src/backend/replication/lrg/lrg_worker.c
@@ -0,0 +1,592 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrg_worker.c
+ *		functions for lrg worker
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/relscan.h"
+#include "access/table.h"
+#include "catalog/catalog.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_lrg_info.h"
+#include "catalog/pg_lrg_nodes.h"
+#include "catalog/pg_lrg_pub.h"
+#include "catalog/pg_publication.h"
+#include "executor/spi.h"
+#include "libpq-fe.h"
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/interrupt.h"
+#include "replication/libpqlrg.h"
+#include "replication/lrg.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "tcop/tcopprot.h"
+#include "utils/fmgroids.h"
+#include "utils/memutils.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+
+typedef struct LrgNode {
+	Oid group_oid;
+	char *node_id;
+	char *node_name;
+	char *local_connstring;
+	char *upstream_connstring;
+} LrgNode;
+
+lrg_function_types *LrgFunctionTypes =
NULL; + +static LrgPerdbWorker* my_lrg_worker = NULL; + +static void lrg_worker_onexit(int code, Datum arg); +static void do_node_management(void); + +static void get_node_information(LrgNode *node, LRG_NODE_STATE *status); +static void advance_state_machine(LrgNode *node, LRG_NODE_STATE initial_status); + +static void create_publication(const char* group_name, const char* node_id, Oid group_oid); +static Oid find_publication(const char *pubname); + +static List* get_lrg_nodes_list(const char *local_nodeid); + +static void synchronise_system_tables(PGconn *localconn, PGconn *upstreamconn, char *local_connstring); +static void get_group_name(char **group_name, Oid group_oid); +static void update_mynode(LRG_NODE_STATE state); + +void +lrg_worker_cleanup(LrgPerdbWorker *worker) +{ + Assert(LWLockHeldByMeInMode(&LrgPerdbCtx->lock, LW_EXCLUSIVE)); + + worker->dbid = InvalidOid; + worker->worker_pid = InvalidPid; + worker->worker_latch = NULL; +} + +/* + * Callback for process exit. cleanup the array. + */ +static void +lrg_worker_onexit(int code, Datum arg) +{ + LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE); + lrg_worker_cleanup(my_lrg_worker); + LWLockRelease(&LrgPerdbCtx->lock); +} + +/* + * Synchronise system tables from upstream node. + * + * Currently it will read and insert pg_lrg_nodes only. + */ +static void +synchronise_system_tables(PGconn *localconn, PGconn *upstreamconn, char *local_connstring) +{ + lrg_copy_lrg_nodes(upstreamconn, localconn); +} + +/* + * Load the list of lrg_nodes. 
+ *
+ * Returns a List of LrgNode, one per pg_lrg_nodes row whose nodeid differs
+ * from local_nodeid.  The list and every string in it are allocated in the
+ * memory context that is current on entry, so they stay valid after the
+ * scan and the transaction have ended.
+ */
+static List*
+get_lrg_nodes_list(const char *local_nodeid)
+{
+	List	   *res = NIL;
+	Relation	rel;
+	TableScanDesc scan;
+	HeapTuple	tup;
+	/* We will allocate the output data in the current memory context */
+	MemoryContext resultcxt = CurrentMemoryContext;
+
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	rel = table_open(LrgNodesRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_lrg_nodes nodesform = (Form_pg_lrg_nodes) GETSTRUCT(tup);
+		LrgNode    *node;
+		MemoryContext oldcxt;
+		Datum		upstream;
+		bool		isnull;
+
+		if (strcmp(NameStr(nodesform->nodeid), local_nodeid) == 0)
+			continue;
+
+		/*
+		 * upstreamconn is declared BKI_FORCE_NULL, so it may be absent from
+		 * the tuple and must not be read through the C struct.
+		 */
+		upstream = heap_getattr(tup, Anum_pg_lrg_nodes_upstreamconn,
+								RelationGetDescr(rel), &isnull);
+
+		/*
+		 * Allocate our results in the caller's context, not the transaction's.
+		 * The strings must be copied: the tuple they live in goes away with
+		 * the scan.
+		 */
+		oldcxt = MemoryContextSwitchTo(resultcxt);
+
+		node = (LrgNode *) palloc0(sizeof(LrgNode));
+		node->group_oid = nodesform->groupid;
+		node->node_id = pstrdup(NameStr(nodesform->nodeid));
+		node->node_name = pstrdup(NameStr(nodesform->nodename));
+		node->local_connstring = pstrdup(NameStr(nodesform->localconn));
+		if (!isnull && strlen(NameStr(*DatumGetName(upstream))) != 0)
+			node->upstream_connstring = pstrdup(NameStr(*DatumGetName(upstream)));
+		else
+			node->upstream_connstring = NULL;
+		res = lappend(res, node);
+
+		MemoryContextSwitchTo(oldcxt);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+	CommitTransactionCommand();
+
+	return res;
+}
+
+/*
+ * get group_name from local pg_lrg_info.
+ * The second argument is used for the key.
+ *
+ * On return *group_name points to a copy allocated in TopMemoryContext;
+ * the caller is responsible for pfree'ing it.  Raises an error if no row
+ * with the given oid exists.
+ *
+ * XXX: In this version this function may be not needed
+ * because one node can join only one group.
+ */
+static void
+get_group_name(char **group_name, Oid group_oid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	SysScanDesc scandesc;
+	ScanKeyData entry[1];
+	Form_pg_lrg_info infoform;
+	MemoryContext old;
+
+	Assert(group_name != NULL);
+
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	/*
+	 * Read the related tuple from pg_lrg_info through its oid index.
+	 */
+	rel = table_open(LrgInfoRelationId, AccessShareLock);
+	ScanKeyInit(&entry[0],
+				Anum_pg_lrg_info_oid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(group_oid));
+
+	scandesc = systable_beginscan(rel, LrgInfoRelationIndexId, true,
+								  NULL, 1, entry);
+	tup = systable_getnext(scandesc);
+
+	/* Must be a real check: an Assert vanishes in production builds. */
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "logical replication group %u not found", group_oid);
+
+	infoform = (Form_pg_lrg_info) GETSTRUCT(tup);
+	old = MemoryContextSwitchTo(TopMemoryContext);
+	*group_name = pstrdup(NameStr(infoform->groupname));
+	MemoryContextSwitchTo(old);
+
+	systable_endscan(scandesc);
+	table_close(rel, AccessShareLock);
+
+	CommitTransactionCommand();
+}
+
+/*
+ * Update the node information myself to specified state
+ *
+ * Only CREATE_PUBLICATION, CREATE_SUBSCRIPTION and READY are supported;
+ * any other state raises an error.  The update runs through SPI in its own
+ * transaction.
+ *
+ * NOTE(review): rows are selected by dbid only.  After pg_lrg_nodes has
+ * been copied from the upstream node this may also match rows of other
+ * nodes whose database OID happens to be the same; filtering by nodeid
+ * looks safer -- confirm.
+ */
+static void
+update_mynode(LRG_NODE_STATE state)
+{
+	StringInfoData query;
+	int			ret;
+
+	switch (state)
+	{
+		case LRG_STATE_CREATE_PUBLICATION:
+		case LRG_STATE_CREATE_SUBSCRIPTION:
+		case LRG_STATE_READY:
+			break;
+		default:
+			elog(ERROR, "not implemented yet");
+	}
+
+	initStringInfo(&query);
+	/* dbid is an Oid, i.e. unsigned: format it with %u */
+	appendStringInfo(&query,
+					 "UPDATE pg_lrg_nodes SET status = %d WHERE dbid = %u",
+					 (int) state, my_lrg_worker->dbid);
+
+	StartTransactionCommand();
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI error while updating a table");
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+	ret = SPI_execute(query.data, false, 0);
+	if (ret != SPI_OK_UPDATE)
+		elog(ERROR, "SPI error while updating a table");
+
+	PopActiveSnapshot();
+	SPI_finish();
+	CommitTransactionCommand();
+
+	pfree(query.data);
+}
+
+/*
+ * Look up a publication by name.
+ *
+ * Returns its OID, or InvalidOid if there is no such publication.  Must be
+ * called inside a transaction.
+ */
+static Oid
+find_publication(const char *pubname)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			pub_oid = InvalidOid;
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	/* Check if name is used */
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(pubname));
+
+	if (HeapTupleIsValid(tup))
+	{
+		pub_oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
+		/* the syscache lookup returned a private copy; release it */
+		heap_freetuple(tup);
+	}
+
+	/* keep the lock until the end of the transaction */
+	table_close(rel, NoLock);
+
+	return pub_oid;
+}
+
+/*
+ * Create publication via SPI interface.
+ *
+ * Runs "CREATE PUBLICATION pub_for_<group_name> FOR ALL TABLES" in one
+ * transaction and then, in a second transaction, records the OID of the new
+ * publication together with group_oid in pg_lrg_pub.  node_id is currently
+ * unused.
+ *
+ * NOTE(review): group_name is interpolated into the statement unquoted; it
+ * should be passed through quote_identifier() (utils/builtins.h).
+ */
+static void
+create_publication(const char* group_name, const char* node_id, Oid group_oid)
+{
+	int			ret;
+	StringInfoData query,
+				pub_name;
+	Oid			pub_oid;
+	Oid			lrgpub_oid;
+	Relation	rel;
+	bool		nulls[Natts_pg_lrg_pub];
+	Datum		values[Natts_pg_lrg_pub];
+	HeapTuple	tup;
+
+	initStringInfo(&query);
+	initStringInfo(&pub_name);
+
+	appendStringInfo(&pub_name, "pub_for_%s", group_name);
+	appendStringInfo(&query, "CREATE PUBLICATION %s %s", pub_name.data, "FOR ALL TABLES");
+
+	/* First transaction: create the publication itself. */
+	StartTransactionCommand();
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI error while creating publication");
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+	ret = SPI_execute(query.data, false, 0);
+	if (ret != SPI_OK_UTILITY)
+		elog(ERROR, "SPI error while creating publication");
+
+	PopActiveSnapshot();
+	SPI_finish();
+	CommitTransactionCommand();
+
+	/* Second transaction: remember the publication in pg_lrg_pub. */
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	pub_oid = find_publication(pub_name.data);
+	if (pub_oid == InvalidOid)
+		elog(ERROR, "publication is not found");
+
+	rel = table_open(LrgPublicationId, ExclusiveLock);
+
+	memset(nulls, 0, sizeof(nulls));
+	memset(values, 0, sizeof(values));
+
+	lrgpub_oid = GetNewOidWithIndex(rel, LrgPublicationOidIndexId, Anum_pg_lrg_pub_oid);
+
+	values[Anum_pg_lrg_pub_oid - 1] = ObjectIdGetDatum(lrgpub_oid);
+	values[Anum_pg_lrg_pub_groupid - 1] = ObjectIdGetDatum(group_oid);
+	values[Anum_pg_lrg_pub_pubid - 1] = ObjectIdGetDatum(pub_oid);
+
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+	/* Insert tuple into catalog. */
+	CatalogTupleInsert(rel, tup);
+	heap_freetuple(tup);
+	table_close(rel, ExclusiveLock);
+
+	CommitTransactionCommand();
+
+	pfree(pub_name.data);
+	pfree(query.data);
+}
+
+/*
+ * Make sure the libpqlrg function table is available before any lrg_*
+ * macro dereferences LrgFunctionTypes.
+ */
+static void
+ensure_libpqlrg_loaded(void)
+{
+	if (LrgFunctionTypes == NULL)
+		load_file("libpqlrg", false);
+	if (LrgFunctionTypes == NULL)
+		elog(ERROR, "libpqlrg didn't initialize correctly");
+}
+
+/*
+ * advance the state machine
+ *
+ * Starting from initial_status, performs the remaining steps
+ * INIT -> CREATE_PUBLICATION -> CREATE_SUBSCRIPTION -> READY and stores
+ * each reached state with update_mynode().  A node with a non-NULL
+ * upstream_connstring is attaching to an existing group; otherwise it is
+ * creating one and the remote steps are skipped.
+ *
+ * The function may be entered in any state (for example after the worker
+ * was restarted), so each step sets up what it needs by itself.  States
+ * this function does not handle (READY, TO_BE_DETACHED) are left alone.
+ */
+static void
+advance_state_machine(LrgNode *local_node, LRG_NODE_STATE initial_status)
+{
+	PGconn	   *localconn = NULL;
+	PGconn	   *upstreamconn = NULL;
+	char	   *group_name = NULL;
+	bool		attaching = (local_node->upstream_connstring != NULL);
+	LRG_NODE_STATE state = initial_status;
+
+	if (state == LRG_STATE_INIT)
+	{
+		/* Establish connection if we are in the attaching case */
+		if (attaching)
+		{
+			ensure_libpqlrg_loaded();
+			lrg_connect(local_node->upstream_connstring, &upstreamconn);
+			lrg_connect(local_node->local_connstring, &localconn);
+			synchronise_system_tables(localconn, upstreamconn, local_node->local_connstring);
+		}
+
+		get_group_name(&group_name, local_node->group_oid);
+		elog(LOG, "set_name: %s", group_name);
+
+		create_publication(group_name, local_node->node_id, local_node->group_oid);
+
+		state = LRG_STATE_CREATE_PUBLICATION;
+		update_mynode(LRG_STATE_CREATE_PUBLICATION);
+	}
+
+	if (state == LRG_STATE_CREATE_PUBLICATION)
+	{
+		if (attaching)
+		{
+			List	   *list;
+			ListCell   *lc;
+			MemoryContext subctx;
+			MemoryContext oldctx;
+
+			/*
+			 * When we resume in this state, the INIT step above did not
+			 * run, so the library, the group name and the local connection
+			 * have not been set up yet.
+			 */
+			ensure_libpqlrg_loaded();
+			if (group_name == NULL)
+				get_group_name(&group_name, local_node->group_oid);
+			if (localconn == NULL)
+				lrg_connect(local_node->local_connstring, &localconn);
+
+			subctx = AllocSetContextCreate(TopMemoryContext,
+										   "Lrg Launcher list",
+										   ALLOCSET_DEFAULT_SIZES);
+			oldctx = MemoryContextSwitchTo(subctx);
+
+			list = get_lrg_nodes_list(local_node->node_id);
+
+			foreach(lc, list)
+			{
+				LrgNode    *other_node = (LrgNode *) lfirst(lc);
+				PGconn	   *otherconn = NULL;
+
+				lrg_connect(other_node->local_connstring, &otherconn);
+
+				/* the other node subscribes to us ... */
+				lrg_create_subscription(group_name, local_node->local_connstring,
+										local_node->node_id, other_node->node_id,
+										otherconn, "local_only = true, copy_data = false");
+				/* ... and we subscribe to the other node */
+				lrg_create_subscription(group_name, other_node->local_connstring,
+										other_node->node_id, local_node->node_id,
+										localconn, "local_only = true, copy_data = false");
+
+				/*
+				 * XXX: adding a tuple into remote's pg_lrg_nodes here,
+				 * but it is bad. it should be end of this function.
+				 */
+				lrg_insert_into_lrg_nodes(otherconn, local_node->node_id,
+										  LRG_STATE_READY, local_node->node_name,
+										  local_node->local_connstring, local_node->upstream_connstring);
+				lrg_disconnect(otherconn);
+			}
+			MemoryContextSwitchTo(oldctx);
+			MemoryContextDelete(subctx);
+		}
+
+		state = LRG_STATE_CREATE_SUBSCRIPTION;
+		update_mynode(LRG_STATE_CREATE_SUBSCRIPTION);
+	}
+
+	/*
+	 * Only finish what we started: do not overwrite a state this function
+	 * does not know how to handle (e.g. a pending detach request).
+	 */
+	if (state == LRG_STATE_CREATE_SUBSCRIPTION)
+	{
+		state = LRG_STATE_READY;
+		update_mynode(LRG_STATE_READY);
+	}
+
+	/*
+	 * clean up phase
+	 */
+	if (localconn != NULL)
+		lrg_disconnect(localconn);
+	if (upstreamconn != NULL)
+		lrg_disconnect(upstreamconn);
+	if (group_name != NULL)
+		pfree(group_name);
+}
+
+/*
+ * Get node-specific information from pg_lrg_nodes.
+ *
+ * Looks up the row whose nodeid equals the id built by construct_node_id()
+ * and copies it into *node (strings are allocated in TopMemoryContext) and
+ * *status.  upstream_connstring is set to NULL when the column is NULL or
+ * empty.  Raises an error if no row is found.
+ */
+static void
+get_node_information(LrgNode *node, LRG_NODE_STATE *status)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		found = false;
+	char		local_node_id[64];
+	SysScanDesc scandesc;
+	ScanKeyData entry[1];
+
+	construct_node_id(local_node_id, sizeof(local_node_id));
+
+	/*
+	 * Read the related tuple from pg_lrg_nodes through its nodeid index.
+	 */
+	StartTransactionCommand();
+	(void) GetTransactionSnapshot();
+
+	rel = table_open(LrgNodesRelationId, AccessShareLock);
+
+	ScanKeyInit(&entry[0],
+				Anum_pg_lrg_nodes_nodeid,
+				BTEqualStrategyNumber, F_NAMEEQ,
+				CStringGetDatum(local_node_id));
+
+	scandesc = systable_beginscan(rel, LrgNodeIdIndexId, true,
+								  NULL, 1, entry);
+
+	tup = systable_getnext(scandesc);
+
+	/* We assume that there can be at most one matching tuple */
+	if (HeapTupleIsValid(tup))
+	{
+		MemoryContext old;
+		Form_pg_lrg_nodes nodesform = (Form_pg_lrg_nodes) GETSTRUCT(tup);
+		Datum		upstream;
+		bool		isnull;
+
+		/*
+		 * upstreamconn is declared BKI_FORCE_NULL, so it may be absent from
+		 * the tuple and must not be read through the C struct.
+		 */
+		upstream = heap_getattr(tup, Anum_pg_lrg_nodes_upstreamconn,
+								RelationGetDescr(rel), &isnull);
+
+		old = MemoryContextSwitchTo(TopMemoryContext);
+		node->group_oid = nodesform->groupid;
+		node->node_id = pstrdup(NameStr(nodesform->nodeid));
+		node->node_name = pstrdup(NameStr(nodesform->nodename));
+		node->local_connstring = pstrdup(NameStr(nodesform->localconn));
+		if (!isnull && strlen(NameStr(*DatumGetName(upstream))) != 0)
+			node->upstream_connstring = pstrdup(NameStr(*DatumGetName(upstream)));
+		else
+			node->upstream_connstring = NULL;
+		*status = nodesform->status;
+		found = true;
+		MemoryContextSwitchTo(old);
+	}
+
+	systable_endscan(scandesc);
+
+	table_close(rel, AccessShareLock);
+	CommitTransactionCommand();
+
+	if (!found)
+		elog(ERROR, "no tuples found");
+
+}
+
+/*
+ * Read this node's row from pg_lrg_nodes and carry out whatever steps of
+ * the state machine are still pending.
+ */
+static void
+do_node_management(void)
+{
+	LrgNode		node;
+	LRG_NODE_STATE status;
+
+	/*
+	 * read information from pg_lrg_nodes
+	 */
+	get_node_information(&node, &status);
+	elog(DEBUG3, "initial status of %u: %d", my_lrg_worker->dbid, status);
+
+	/*
+	 * advance the state machine for creating or attaching.
+	 *
+	 * TODO: consider detaching case
+	 */
+	advance_state_machine(&node, status);
+
+	pfree(node.node_id);
+	pfree(node.node_name);
+	pfree(node.local_connstring);
+	if (node.upstream_connstring != NULL)
+		pfree(node.upstream_connstring);
+}
+
+/* Timeout, in milliseconds, passed to WaitLatch() by the worker loop. */
+#define TEMPORARY_NAP_TIME 180000L
+
+/*
+ * Entry point for lrg worker
+ *
+ * arg is the index of this worker's slot in the controller's workers[]
+ * array.  The worker connects to the database recorded in that slot, exits
+ * at once if the database is not a member of a group, and otherwise runs
+ * the node management once at startup and again every time its latch is
+ * set.
+ */
+void
+lrg_worker_main(Datum arg)
+{
+	int			slot = DatumGetInt32(arg);
+
+	/* Establish signal handlers. */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Get information from the controller. The index
+	 * is given as the argument.  We modify the slot, so the lock must be
+	 * taken in exclusive mode.
+	 */
+	LWLockAcquire(&LrgPerdbCtx->lock, LW_EXCLUSIVE);
+	my_lrg_worker = &LrgPerdbCtx->workers[slot];
+	my_lrg_worker->worker_pid = MyProcPid;
+	my_lrg_worker->worker_latch = &MyProc->procLatch;
+	LWLockRelease(&LrgPerdbCtx->lock);
+
+	before_shmem_exit(lrg_worker_onexit, (Datum) 0);
+
+	BackgroundWorkerInitializeConnectionByOid(my_lrg_worker->dbid, 0, 0);
+
+	elog(DEBUG3, "per-db worker for %u was launched", my_lrg_worker->dbid);
+
+	/*
+	 * The launcher launches the worker without considering
+	 * the existence of lrg related data.
+	 * So firstly workers must check their catalogs, and exit
+	 * if there is no data.
+	 * In any cases pg_lrg_info will have tuples if
+	 * this node is in a node group, so we reads it.
+	 */
+	if (!get_group_oid())
+	{
+		elog(DEBUG3, "This database %u is not a member of lrg", MyDatabaseId);
+		proc_exit(0);
+	}
+
+	do_node_management();
+
+	ResetLatch(&MyProc->procLatch);
+
+	/*
+	 * Wait for detaching or removing.
+	 */
+	for (;;)
+	{
+		int			rc;
+		bool		is_latch_set = false;
+
+		CHECK_FOR_INTERRUPTS();
+
+		rc = WaitLatch(&MyProc->procLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   TEMPORARY_NAP_TIME, 0);
+
+		if (rc & WL_LATCH_SET)
+		{
+			is_latch_set = true;
+			ResetLatch(&MyProc->procLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		/* A timeout alone does not trigger any work. */
+		if (is_latch_set)
+			do_node_management();
+	}
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 26372d95b3..15b77405bc 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -32,6 +32,7 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
 #include "replication/logicallauncher.h"
+#include "replication/lrg.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -284,6 +285,7 @@ CreateSharedMemoryAndSemaphores(void)
 	WalRcvShmemInit();
 	PgArchShmemInit();
 	ApplyLauncherShmemInit();
+	LrgLauncherShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/include/catalog/pg_lrg_info.h b/src/include/catalog/pg_lrg_info.h
new file mode 100644
index 0000000000..0067aac389
--- /dev/null
+++ b/src/include/catalog/pg_lrg_info.h
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_info.h
+ *	  definition of the "logical replication group information" system
+ *	  catalog (pg_lrg_info)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_info.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_INFO_H
+#define PG_LRG_INFO_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_info_d.h"
+
+/* ----------------
+ *		pg_lrg_info definition.  cpp turns this into
+ *		typedef struct FormData_pg_lrg_info
+ * ----------------
+ */
+CATALOG(pg_lrg_info,8337,LrgInfoRelationId)
+{
+	Oid			oid;			/* oid */
+
+	NameData	groupname;		/* name of the logical replication group */
+	bool		puballtables;
+} FormData_pg_lrg_info;
+
+/* ----------------
+ *		Form_pg_lrg_info corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_info relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_info *Form_pg_lrg_info;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_info_oid_index, 8338, LrgInfoRelationIndexId, on pg_lrg_info using btree(oid oid_ops));
+
+#endif							/* PG_LRG_INFO_H */
diff --git a/src/include/catalog/pg_lrg_nodes.h b/src/include/catalog/pg_lrg_nodes.h
new file mode 100644
index 0000000000..b4e4b290dc
--- /dev/null
+++ b/src/include/catalog/pg_lrg_nodes.h
@@ -0,0 +1,53 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_nodes.h
+ *	  definition of the "logical replication nodes" system
+ *	  catalog (pg_lrg_nodes)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_nodes.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_NODES_H
+#define PG_LRG_NODES_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_nodes_d.h"
+
+/* ----------------
+ *		pg_lrg_nodes definition.  cpp turns this into
+ *		typedef struct FormData_pg_lrg_nodes
+ * ----------------
+ */
+CATALOG(pg_lrg_nodes,8339,LrgNodesRelationId)
+{
+	Oid			oid;			/* oid */
+
+	NameData	nodeid;			/* identifier of the node */
+	Oid			groupid BKI_LOOKUP(pg_lrg_info);
+	Oid			dbid BKI_LOOKUP(pg_database);
+	int32		status;
+	NameData	nodename;
+	NameData	localconn;
+	NameData	upstreamconn BKI_FORCE_NULL;
+} FormData_pg_lrg_nodes;
+
+/* ----------------
+ *		Form_pg_lrg_nodes corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_nodes relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_nodes *Form_pg_lrg_nodes;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_nodes_oid_index, 8340, LrgNodesRelationIndexId, on pg_lrg_nodes using btree(oid oid_ops));
+DECLARE_UNIQUE_INDEX(pg_lrg_nodes_name_index, 8346, LrgNodeIdIndexId, on pg_lrg_nodes using btree(nodeid name_ops));
+
+#endif							/* PG_LRG_NODES_H */
diff --git a/src/include/catalog/pg_lrg_pub.h b/src/include/catalog/pg_lrg_pub.h
new file mode 100644
index 0000000000..d65dc51d4d
--- /dev/null
+++ b/src/include/catalog/pg_lrg_pub.h
@@ -0,0 +1,46 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_pub.h
+ *	  definition of the "logical replication group publication" system
+ *	  catalog (pg_lrg_pub)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_pub.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_PUB_H
+#define PG_LRG_PUB_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_pub_d.h"
+
+/* ----------------
+ *		pg_lrg_pub definition.  cpp turns this into
+ *		typedef struct FormData_pg_lrg_pub
+ * ----------------
+ */
+CATALOG(pg_lrg_pub,8341,LrgPublicationId)
+{
+	Oid			oid;
+	Oid			groupid BKI_LOOKUP(pg_lrg_info);
+	Oid			pubid BKI_LOOKUP(pg_publication);
+} FormData_pg_lrg_pub;
+
+/* ----------------
+ *		Form_pg_lrg_pub corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_pub relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_pub *Form_pg_lrg_pub;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_pub_oid_index, 8344, LrgPublicationOidIndexId, on pg_lrg_pub using btree(oid oid_ops));
+
+#endif							/* PG_LRG_PUB_H */
diff --git a/src/include/catalog/pg_lrg_sub.h b/src/include/catalog/pg_lrg_sub.h
new file mode 100644
index 0000000000..398c8e8971
--- /dev/null
+++ b/src/include/catalog/pg_lrg_sub.h
@@ -0,0 +1,46 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_lrg_sub.h
+ *	  definition of the "logical replication group subscription" system
+ *	  catalog (pg_lrg_sub)
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_lrg_sub.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LRG_SUB_H
+#define PG_LRG_SUB_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_lrg_sub_d.h"
+
+/* ----------------
+ *		pg_lrg_sub definition.  cpp turns this into
+ *		typedef struct FormData_pg_lrg_sub
+ * ----------------
+ */
+CATALOG(pg_lrg_sub,8343,LrgSubscriptionId)
+{
+	Oid			oid;
+	Oid			groupid BKI_LOOKUP(pg_lrg_info);
+	Oid			subid BKI_LOOKUP(pg_subscription);
+} FormData_pg_lrg_sub;
+
+/* ----------------
+ *		Form_pg_lrg_sub corresponds to a pointer to a tuple with
+ *		the format of pg_lrg_sub relation.
+ * ----------------
+ */
+typedef FormData_pg_lrg_sub *Form_pg_lrg_sub;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_lrg_sub_oid_index, 8345, LrgSubscriptionOidIndexId, on pg_lrg_sub using btree(oid oid_ops));
+
+#endif							/* PG_LRG_SUB_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index babe16f00a..8350934a77 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11885,4 +11885,29 @@
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
 
+# lrg
+{ oid => '8143', descr => 'create logical replication group',
+  proname => 'lrg_create', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text text text text',
+  prosrc => 'lrg_create' },
+{ oid => '8144', descr => 'attach to logical replication group',
+  proname => 'lrg_node_attach', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text text text text',
+  prosrc => 'lrg_node_attach' },
+{ oid => '8145', descr => 'detach from logical replication group',
+  proname => 'lrg_node_detach', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text text text text',
+  prosrc => 'lrg_node_detach' },
+{ oid => '8146', descr => 'delete logical replication group',
+  proname => 'lrg_drop', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text text text text',
+  prosrc => 'lrg_drop' },
+{ oid => '8147', descr => 'insert a tuple to pg_lrg_sub',
+  proname => 'lrg_insert_into_sub', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text',
+  prosrc => 'lrg_insert_into_sub' },
+{ oid => '8148', descr => 'insert a tuple to pg_lrg_nodes',
+  proname => 'lrg_insert_into_nodes', proparallel => 'r',
+  prorettype => 'void', proargtypes => 'text int4 text text text',
+  prosrc => 'lrg_insert_into_nodes' },
 ]
diff --git a/src/include/replication/libpqlrg.h b/src/include/replication/libpqlrg.h
new file mode 100644
index 0000000000..650715d40d
--- /dev/null
+++ b/src/include/replication/libpqlrg.h
@@ -0,0 +1,63 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpqlrg.h
+ *		  Constructs a logical replication group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LIBPQLRG_H
+#define LIBPQLRG_H
+
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "replication/lrg.h"
+
+typedef void (*libpqlrg_connect_fn) (const char *connstring, PGconn **conn);
+typedef bool (*libpqlrg_check_group_fn) (PGconn *conn, const char *group_name);
+typedef void (*libpqlrg_copy_lrg_nodes_fn) (PGconn *remoteconn, PGconn *localconn);
+typedef void (*libpqlrg_insert_into_lrg_nodes_fn) (PGconn *remoteconn,
+												   const char *node_id, LRG_NODE_STATE status,
+												   const char *node_name, const char *local_connstring,
+												   const char *upstream_connstring);
+typedef void (*libpqlrg_create_subscription_fn) (const char *group_name, const char *publisher_connstring,
+												 const char *publisher_node_id, const char *subscriber_node_id,
+												 PGconn *subscriberconn, const char *options);
+typedef void (*libpqlrg_disconnect_fn) (PGconn *conn);
+
+typedef struct lrg_function_types
+{
+	libpqlrg_connect_fn libpqlrg_connect;
+	libpqlrg_check_group_fn libpqlrg_check_group;
+	libpqlrg_copy_lrg_nodes_fn libpqlrg_copy_lrg_nodes;
+	libpqlrg_insert_into_lrg_nodes_fn libpqlrg_insert_into_lrg_nodes;
+	libpqlrg_create_subscription_fn libpqlrg_create_subscription;
+	libpqlrg_disconnect_fn libpqlrg_disconnect;
+} lrg_function_types;
+
+extern PGDLLIMPORT lrg_function_types *LrgFunctionTypes;
+
+#define lrg_connect(connstring, conn) \
+	LrgFunctionTypes->libpqlrg_connect(connstring, conn)
+#define lrg_check_group(conn, group_name) \
+	LrgFunctionTypes->libpqlrg_check_group(conn, group_name)
+#define lrg_copy_lrg_nodes(remoteconn, localconn) \
+	LrgFunctionTypes->libpqlrg_copy_lrg_nodes(remoteconn, localconn)
+
+#define lrg_insert_into_lrg_nodes(remoteconn, \
+								  node_id, status, \
+								  node_name, local_connstring, \
+								  upstream_connstring) \
+	LrgFunctionTypes->libpqlrg_insert_into_lrg_nodes(remoteconn, \
+													 node_id, status, \
+													 node_name, local_connstring, \
+													 upstream_connstring)
+#define lrg_create_subscription(group_name, publisher_connstring, \
+								publisher_node_id, subscriber_node_id, \
+								subscriberconn, options) \
+	LrgFunctionTypes->libpqlrg_create_subscription(group_name, publisher_connstring, \
+												   publisher_node_id, subscriber_node_id, \
+												   subscriberconn, options)
+#define lrg_disconnect(conn) \
+	LrgFunctionTypes->libpqlrg_disconnect(conn)
+
+#endif							/* LIBPQLRG_H */
diff --git a/src/include/replication/lrg.h b/src/include/replication/lrg.h
new file mode 100644
index 0000000000..874cfe6477
--- /dev/null
+++ b/src/include/replication/lrg.h
@@ -0,0 +1,67 @@
+/*-------------------------------------------------------------------------
+ *
+ * lrg.h
+ *		  Constructs a logical replication group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LRG_H
+#define LRG_H
+
+#include "postgres.h"
+
+#include "storage/latch.h"
+#include "storage/lock.h"
+#include "storage/lwlock.h"
+
+/*
+ * enumeration that represents the status of a node
+ */
+typedef enum
+{
+	LRG_STATE_INIT = 0,
+	LRG_STATE_CREATE_PUBLICATION,
+	LRG_STATE_CREATE_SUBSCRIPTION,
+	LRG_STATE_READY,
+	LRG_STATE_TO_BE_DETACHED,
+} LRG_NODE_STATE;
+
+/*
+ * working space for each lrg per-db worker.
+ */
+typedef struct LrgPerdbWorker {
+	pid_t	worker_pid;
+	Oid		dbid;
+	Latch  *worker_latch;
+} LrgPerdbWorker;
+
+/*
+ * controller for lrg per-db worker.
+ * This will be held by launcher.
+ */
+typedef struct LrgPerdbCtxStruct {
+	LWLock	lock;
+	pid_t	launcher_pid;
+	Latch  *launcher_latch;
+	LrgPerdbWorker workers[FLEXIBLE_ARRAY_MEMBER];
+} LrgPerdbCtxStruct;
+
+extern LrgPerdbCtxStruct *LrgPerdbCtx;
+
+/* lrg.c */
+extern void LrgLauncherShmemInit(void);
+extern void LrgLauncherRegister(void);
+extern void lrg_add_nodes(char *node_id, Oid group_id, LRG_NODE_STATE status, char *node_name, char *local_connstring, char *upstream_connstring);
+extern Oid get_group_oid(void);
+extern void construct_node_id(char *out_node_id, int size);
+
+
+/* lrg_launcher.c */
+extern void lrg_launcher_main(Datum arg) pg_attribute_noreturn();
+extern void lrg_launcher_wakeup(void);
+
+/* lrg_worker.c */
+extern void lrg_worker_main(Datum arg) pg_attribute_noreturn();
+extern void lrg_worker_cleanup(LrgPerdbWorker *worker);
+
+#endif							/* LRG_H */
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 215eb899be..da0ef150a2 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -266,3 +266,9 @@ NOTICE:  checking pg_subscription {subdbid} => pg_database {oid}
 NOTICE:  checking pg_subscription {subowner} => pg_authid {oid}
 NOTICE:  checking pg_subscription_rel {srsubid} => pg_subscription {oid}
 NOTICE:  checking pg_subscription_rel {srrelid} => pg_class {oid}
+NOTICE:  checking pg_lrg_nodes {groupid} => pg_lrg_info {oid}
+NOTICE:  checking pg_lrg_nodes {dbid} => pg_database {oid}
+NOTICE:  checking pg_lrg_pub {groupid} => pg_lrg_info {oid}
+NOTICE:  checking pg_lrg_pub {pubid} => pg_publication {oid}
+NOTICE:  checking pg_lrg_sub {groupid} => pg_lrg_info {oid}
+NOTICE:  checking pg_lrg_sub {subid} => pg_subscription {oid}
-- 
2.27.0