diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 84341a30e5..50be793e26 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -719,6 +719,123 @@ include_dir 'conf.d'
+
+ max_sessions (integer)
+
+ max_sessions configuration parameter
+
+
+
+
+ The maximum number of client sessions that can be handled by
+ one connection proxy when session pooling is switched on.
+ This parameter does not add any memory or CPU overhead, so
+ specifying a large max_sessions value
+ does not affect performance.
+ If the max_sessions limit is reached, new connections are not accepted.
+
+
+ The default value is 1000. This parameter can only be set at server start.
+
+
+
+
+
+ session_pool_size (integer)
+
+ session_pool_size configuration parameter
+
+
+
+
+ Enables session pooling and defines the maximum number of
+ backends that can be used by client sessions for each database/user combination.
+ Launched non-tainted backends are never terminated even if there are no active sessions.
+ A backend is considered tainted if the client updates GUCs, creates a temporary table or prepared statements.
+ A tainted backend can serve only one client.
+
+
+ The default value is 10, so up to 10 backends will serve each database/user combination.
+
+
+
+
+
+ proxy_port (integer)
+
+ proxy_port configuration parameter
+
+
+
+
+ Sets the TCP port for the connection pooler.
+ Clients connected to the main "port" will be assigned dedicated backends,
+ while clients connected to the proxy port will be connected to backends through a proxy which
+ performs transaction level scheduling.
+
+
+ The default value is 6543.
+
+
+
+
+
+ connection_proxies (integer)
+
+ connection_proxies configuration parameter
+
+
+
+
+ Sets number of connection proxies.
+ Postmaster spawns separate worker process for each proxy. Postmaster scatters connections between proxies using one of scheduling policies (round-robin, random, load-balancing).
+ Each proxy launches its own subset of backends.
+ So maximal number of non-tainted backends is session_pool_size*connection_proxies*databases*roles.
+
+
+ The default value is 0, so session pooling is disabled.
+
+
+
+
+
+ session_schedule (enum)
+
+ session_schedule configuration parameter
+
+
+
+
+ Specifies scheduling policy for assigning session to proxies in case of
+ connection pooling. Default policy is round-robin.
+
+
+ With the round-robin policy the postmaster cyclically scatters sessions between proxies.
+
+
+ With the random policy the postmaster randomly chooses a proxy for each new session.
+
+
+ With the load-balancing policy the postmaster chooses the proxy with the lowest load average.
+ The load average of a proxy is estimated by the number of client connections assigned to it, with extra weight for SSL connections.
+
+
+
+
+
+ restart_pooler_on_reload (boolean)
+
+ restart_pooler_on_reload configuration parameter
+
+
+
+
+ Restart session pool workers once pg_reload_conf() is called.
+ The default value is false.
+
+
+
+
unix_socket_directories (string)
diff --git a/doc/src/sgml/connpool.sgml b/doc/src/sgml/connpool.sgml
new file mode 100644
index 0000000000..8486ce1e8d
--- /dev/null
+++ b/doc/src/sgml/connpool.sgml
@@ -0,0 +1,174 @@
+
+
+
+ Connection pooling
+
+
+ built-in connection pool proxy
+
+
+
+ PostgreSQL spawns a separate process (backend) for each client.
+ For a large number of clients such a model can cause consumption of a large number of system
+ resources and lead to significant performance degradation, especially on computers with a large
+ number of CPU cores. The reason is high contention between backends for postgres resources.
+ Also the size of many Postgres internal data structures is proportional to the number of
+ active backends, as is the complexity of the algorithms for these data structures.
+
+
+
+ This is why most production Postgres installations use some kind of connection pooling:
+ pgbouncer, J2EE, odyssey, etc. But an external connection pooler requires additional effort for installation,
+ configuration and maintenance. Also pgbouncer (the most popular connection pooler for Postgres) is
+ single-threaded and so can be a bottleneck for a highload system, so multiple instances of pgbouncer have to be launched.
+
+
+
+ Starting from version 12 PostgreSQL provides built-in connection pooler.
+ This chapter describes architecture and usage of built-in connection pooler.
+
+
+
+ How Built-in Connection Pooler Works
+
+
+ Built-in connection pooler spawns one or more proxy processes which connect clients and backends.
+ Number of proxy processes is controlled by connection_proxies configuration parameter.
+ To avoid substantial changes in Postgres locking mechanism, only transaction level pooling policy is implemented.
+ It means that pooler is able to reschedule backend to another session only when it completed the current transaction.
+
+
+
+ As far as each Postgres backend is able to work only with single database, each proxy process maintains
+ hash table of connections pools for each pair of dbname,role.
+ Maximal number of backends which can be spawned by connection pool is limited by
+ session_pool_size configuration variable.
+ So maximal number of non-dedicated backends in pooling mode is limited by
+ connection_proxies*session_pool_size*#databases*#roles.
+
+
+
+ To minimize number of changes in Postgres core, built-in connection pooler is not trying to save/restore
+ session context. If session context is modified by client application
+ (changing values of configuration variables (GUCs), creating temporary tables, preparing statements, advisory locking),
+ then backend executing this session is considered to be tainted.
+ It is now dedicated to this session and can not be rescheduled to other session.
+ Once this session is terminated, backend is terminated as well.
+ Non-tainted backends are not terminated even if there are no more connected sessions.
+
+
+
+ The built-in connection pooler accepts connections on a separate port (proxy_port configuration option, default value is 6543).
+ If client is connected to postgres through standard port (port configuration option, default value is 5432), then normal (dedicated) backend is created. It works only
+ with this client and is terminated when client is disconnected. Standard port is also used by proxy itself to
+ launch new worker backends. It means that to enable connection pooler Postgres should be configured
+ to accept local connections (pg_hba.conf file).
+
+
+
+ If client application is connected through proxy port, then its communication with backend is always
+ performed through proxy. Even if it changes session context and backend becomes tainted,
+ still all traffic between this client and backend comes through proxy.
+
+
+
+ Postmaster accepts connections on proxy port and redirects it to one of connection proxies.
+ Right now sessions are bound to a proxy and can not migrate between proxies.
+ To provide uniform load balancing of proxies, postmaster is using one of three scheduling policies:
+ round-robin, random and load-balancing.
+ In the last case postmaster will choose proxy with smallest number of already attached clients, with
+ extra weight added to SSL connections (which consume more CPU).
+
+
+
+
+
+ Built-in Connection Pooler Configuration
+
+
+ There are four main configuration variables controlling connection pooler:
+ session_pool_size, connection_proxies, max_sessions and proxy_port.
+ Connection pooler is enabled if all of them are non-zero.
+
+
+
+ connection_proxies specifies number of connection proxy processes which will be spawned.
+ Default value is zero, so connection pooling is disabled by default.
+
+
+
+ session_pool_size specifies the maximal number of backends per connection pool. The maximal number of launched non-dedicated backends in pooling mode is limited by
+ connection_proxies*session_pool_size*#databases*#roles.
+ If number of backends is too small, then server will not be able to utilize all system resources.
+ But too large value can cause degradation of performance because of large snapshots and lock contention.
+
+
+
+ The max_sessions parameter specifies the maximal number of sessions which can be handled by one connection proxy.
+ Actually it affects only size of wait event set and so can be large enough without any essential negative impact on system resources consumption.
+ Default value is 1000. Since max_sessions is a per-proxy limit, the maximal total number of pooled connections is limited by connection_proxies*max_sessions.
+
+
+
+ Connection proxy accepts connections on special port, defined by proxy_port.
+ Default value is 6543, but it can be changed to standard Postgres 5432, so by default all connections to the databases will be pooled.
+ But it is still necessary to have a port for direct connections to the database (dedicated backends).
+ It is needed for connection pooler itself to launch worker backends.
+
+
+
+ Postmaster scatters sessions between proxies using one of three available scheduling policies:
+ round-robin, random and load-balancing.
+ Policy can be set using session_schedule configuration variable. Default policy is
+ round-robin which cause cyclic distribution of sessions between proxies.
+ It should work well in case of more or less uniform workload.
+ The smartest policy is load-balancing which tries to choose least loaded proxy
+ based on the available statistic. It is possible to monitor proxies state using pg_pooler_state() function, which returns information about number of clients, backends and pools for each proxy as well
+ as some statistic information about number of proceeded transactions and amount of data
+ sent from client to backends (rx_bytes) and from backends to clients (tx_bytes).
+
+
+
+ As far as pooled backends are not terminated on client exit, it will not
+ be possible to drop a database to which they are connected. It can be achieved without server restart using the restart_pooler_on_reload variable. Setting this variable to true causes shutdown of all pooled backends after execution of the pg_reload_conf() function. Then it will be possible to drop the database.
+
+
+
+
+
+ Built-in Connection Pooler Pros and Cons
+
+
+ Unlike pgbouncer and other external connection poolers, built-in connection pooler doesn't require installation and configuration of some other components.
+ Also it doesn't introduce any limitations for clients: existed clients can work through proxy and don't notice any difference.
+ If client application requires session context, then it will be served by dedicated backend. Such connection will not participate in
+ connection pooling but it will correctly work. This is the main difference with pgbouncer,
+ which may cause incorrect behavior of client application in case of using other session level pooling policy.
+ And if application is not changing session context, then it can be implicitly pooled, reducing number of active backends.
+
+
+
+ The main limitation of current built-in connection pooler implementation is that it is not able to save/resume session context.
+ Although it is not so difficult to do, but it requires more changes in Postgres core. Developers of client applications have
+ the choice to either avoid using session-specific operations, or not use built-in pooling. For example, using prepared statements can improve speed of simple queries
+ up to two times. But prepared statements can not be handled by pooled backend, so if all clients are using prepared statements, then there will be no connection pooling
+ even if connection pooling is enabled.
+
+
+
+ Redirecting connections through connection proxy definitely have negative effect on total system performance and especially latency.
+ Overhead of connection proxing depends on too many factors, such as characteristics of external and internal networks, complexity of queries and size of returned result set.
+ Pgbench benchmark in select-only mode shows almost two times worse performance for local connections through the connection pooler compared with direct local connections when
+ the number of connections is small enough (10). For a much larger number of connections (when pooling is actually required), pooling mode outperforms direct connection mode.
+
+
+
+ Another obvious limitation of transaction level pooling is that long living transaction can cause starvation of
+ other clients. It greatly depends on application design. If application opens database transaction and then waits for user input or some other external event, then backend can be in idle-in-transaction
+ state for long enough time. And such backend can not be rescheduled for some another session.
+ The obvious recommendation is to avoid long-living transaction and setup idle_in_transaction_session_timeout to implicitly abort such transactions.
+
+
+
+
+
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index 8960f11278..5b19fef481 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -29,6 +29,7 @@
+
diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml
index 3e115f1c76..029f0dc4e3 100644
--- a/doc/src/sgml/postgres.sgml
+++ b/doc/src/sgml/postgres.sgml
@@ -109,6 +109,7 @@
&mvcc;
&perform;
∥
+ &connpool;
diff --git a/src/Makefile b/src/Makefile
index bcdbd9588a..196ca8c0f0 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -23,6 +23,7 @@ SUBDIRS = \
interfaces \
backend/replication/libpqwalreceiver \
backend/replication/pgoutput \
+ backend/postmaster/libpqconn \
fe_utils \
bin \
pl \
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index c278ee7318..acbaed313a 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -30,6 +30,7 @@
#include "parser/parse_expr.h"
#include "parser/parse_type.h"
#include "rewrite/rewriteHandler.h"
+#include "storage/proc.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
@@ -457,6 +458,7 @@ StorePreparedStatement(const char *stmt_name,
stmt_name,
HASH_ENTER,
&found);
+ MyProc->is_tainted = true;
/* Shouldn't get a duplicate entry */
if (found)
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index fd67d2a841..10a14d0e03 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -590,6 +590,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("ON COMMIT can only be used on temporary tables")));
+ if (stmt->relation->relpersistence != RELPERSISTENCE_TEMP
+ && stmt->oncommit != ONCOMMIT_DROP)
+ MyProc->is_tainted = true;
+
if (stmt->partspec != NULL)
{
if (relkind != RELKIND_RELATION)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 384887e70d..ebff20a61a 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -195,15 +195,13 @@ pq_init(void)
{
/* initialize state variables */
PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
- PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
+ if (!PqSendBuffer)
+ PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
PqCommBusy = false;
PqCommReadingMsg = false;
DoingCopyOut = false;
- /* set up process-exit hook to close the socket */
- on_proc_exit(socket_close, 0);
-
/*
* In backends (as soon as forked) we operate the underlying socket in
* nonblocking mode and use latches to implement blocking semantics if
@@ -220,6 +218,11 @@ pq_init(void)
(errmsg("could not set socket to nonblocking mode: %m")));
#endif
+ if (FeBeWaitSet)
+ FreeWaitEventSet(FeBeWaitSet);
+ else
+ on_proc_exit(socket_close, 0);
+
FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
NULL, NULL);
@@ -227,6 +230,7 @@ pq_init(void)
AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
}
+
/* --------------------------------
* socket_comm_reset - reset libpq during error recovery
*
@@ -329,7 +333,7 @@ socket_close(int code, Datum arg)
int
StreamServerPort(int family, char *hostName, unsigned short portNumber,
char *unixSocketDir,
- pgsocket ListenSocket[], int MaxListen)
+ pgsocket ListenSocket[], int ListenPort[], int MaxListen)
{
pgsocket fd;
int err;
@@ -593,6 +597,7 @@ StreamServerPort(int family, char *hostName, unsigned short portNumber,
familyDesc, addrDesc, (int) portNumber)));
ListenSocket[listen_index] = fd;
+ ListenPort[listen_index] = portNumber;
added++;
}
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index f4120bec55..e0cdd9e8bb 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = atomics.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
ifeq ($(PORTNAME), win32)
SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000000..a76db8d171
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,165 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ * Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#ifdef WIN32
+typedef struct
+{
+ SOCKET origsocket;
+ WSAPROTOCOL_INFO wsainfo;
+} InheritableSocket;
+#endif
+
+/*
+ * Send socket descriptor "sock" to backend process through Unix socket "chan"
+ */
+int
+pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid)
+{
+#ifdef WIN32
+ InheritableSocket dst;
+ size_t rc;
+ dst.origsocket = sock;
+ /*
+  * Windows cannot pass descriptors over a socket directly: duplicate the
+  * socket into the target process "pid" and ship the resulting protocol
+  * info blob, which the receiver converts back into a SOCKET.
+  */
+ if (WSADuplicateSocket(sock, pid, &dst.wsainfo) != 0)
+ {
+ ereport(FATAL,
+ (errmsg("could not duplicate socket %d for use in backend: error code %d",
+ (int)sock, WSAGetLastError())));
+ return -1;
+ }
+ rc = send(chan, (char*)&dst, sizeof(dst), 0);
+ if (rc != sizeof(dst))
+ {
+ ereport(FATAL,
+ (errmsg("Failed to send inheritable socket: rc=%d, error code %d",
+ (int)rc, WSAGetLastError())));
+ return -1;
+ }
+ return 0;
+#else
+ /*
+  * Unix: pass the descriptor as SCM_RIGHTS ancillary data.  "pid" is not
+  * needed here -- the kernel installs the descriptor in the receiver.
+  *
+  * NOTE(review): on failure this branch returns PGINVALID_SOCKET while
+  * the Windows branch returns -1; confirm callers treat both the same.
+  */
+ struct msghdr msg = { 0 };
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ char buf[CMSG_SPACE(sizeof(sock))];
+ memset(buf, '\0', sizeof(buf));
+
+ /* On Mac OS X, the struct iovec is needed, even if it points to minimal data */
+ io.iov_base = "";
+ io.iov_len = 1;
+
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ if (!cmsg)
+ return PGINVALID_SOCKET;
+
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+
+ memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+ msg.msg_controllen = cmsg->cmsg_len;
+
+ /* Retry on EINTR; any other sendmsg() error is reported to the caller. */
+ while (sendmsg(chan, &msg, 0) < 0)
+ {
+ if (errno != EINTR)
+ return PGINVALID_SOCKET;
+ }
+ return 0;
+#endif
+}
+
+
+/*
+ * Receive socket descriptor from postmaster process through Unix socket "chan"
+ */
+pgsocket
+pg_recv_sock(pgsocket chan)
+{
+#ifdef WIN32
+ InheritableSocket src;
+ SOCKET s;
+ size_t rc = recv(chan, (char*)&src, sizeof(src), 0);
+ if (rc != sizeof(src))
+ {
+ ereport(FATAL,
+ (errmsg("Failed to receive inheritable socket: rc=%d, error code %d",
+ (int)rc, WSAGetLastError())));
+ }
+ /*
+  * Reconstruct a usable SOCKET in this process from the duplicated
+  * protocol info produced by WSADuplicateSocket() on the sender side.
+  */
+ s = WSASocket(FROM_PROTOCOL_INFO,
+ FROM_PROTOCOL_INFO,
+ FROM_PROTOCOL_INFO,
+ &src.wsainfo,
+ 0,
+ 0);
+ if (s == INVALID_SOCKET)
+ {
+ ereport(FATAL,
+ (errmsg("could not create inherited socket: error code %d\n",
+ WSAGetLastError())));
+ }
+
+ /*
+ * To make sure we don't get two references to the same socket, close
+ * the original one. (This would happen when inheritance actually
+ * works.)
+ */
+ closesocket(src.origsocket);
+ return s;
+#else
+ struct msghdr msg = {0};
+ char c_buffer[256];
+ char m_buffer[256];
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ pgsocket sock;
+
+ /*
+  * Set up data and control buffers for recvmsg(): the actual payload is
+  * a single dummy byte; the descriptor arrives as SCM_RIGHTS ancillary
+  * data in the control buffer.
+  */
+ io.iov_base = m_buffer;
+ io.iov_len = sizeof(m_buffer);
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+
+ msg.msg_control = c_buffer;
+ msg.msg_controllen = sizeof(c_buffer);
+
+ /* Retry on EINTR; any other recvmsg() error is reported to the caller. */
+ while (recvmsg(chan, &msg, 0) < 0)
+ {
+ if (errno != EINTR)
+ return PGINVALID_SOCKET;
+ }
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ if (!cmsg)
+ {
+ elog(WARNING, "Failed to transfer socket");
+ return PGINVALID_SOCKET;
+ }
+
+ memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));
+ /* NOTE(review): pg_set_noblock() result is ignored -- confirm a failure
+  * to switch to nonblocking mode is acceptable here. */
+ pg_set_noblock(sock);
+
+ return sock;
+#endif
+}
diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c
index d5b5e771e9..53eece6422 100644
--- a/src/backend/port/win32/socket.c
+++ b/src/backend/port/win32/socket.c
@@ -690,3 +690,65 @@ pgwin32_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, c
memcpy(writefds, &outwritefds, sizeof(fd_set));
return nummatches;
}
+
+int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2])
+{
+ /*
+  * Emulate Unix socketpair() on Windows: create a loopback TCP listener on
+  * an ephemeral port, connect to it, and accept the connection, yielding
+  * two connected stream sockets in socks[0]/socks[1].
+  *
+  * Returns 0 on success, SOCKET_ERROR on failure (socks[] reset to -1 and
+  * WSAGetLastError() preserved across cleanup).
+  *
+  * NOTE(review): the domain/type/protocol arguments are ignored;
+  * AF_INET/SOCK_STREAM/IPPROTO_TCP is always used -- confirm intended.
+  */
+ union {
+ struct sockaddr_in inaddr;
+ struct sockaddr addr;
+ } a;
+ SOCKET listener;
+ int e;
+ socklen_t addrlen = sizeof(a.inaddr);
+ DWORD flags = 0;
+ int reuse = 1;
+
+ socks[0] = socks[1] = -1;
+
+ listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (listener == -1)
+ return SOCKET_ERROR;
+
+ memset(&a, 0, sizeof(a));
+ a.inaddr.sin_family = AF_INET;
+ a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ a.inaddr.sin_port = 0;
+
+ /* The for(;;) is only a goto-free way to fall through to the common
+  * error cleanup below on the first failing step. */
+ for (;;) {
+ if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,
+ (char*) &reuse, (socklen_t) sizeof(reuse)) == -1)
+ break;
+ if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
+ break;
+
+ /* Re-read the address to learn the kernel-assigned ephemeral port. */
+ memset(&a, 0, sizeof(a));
+ if (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR)
+ break;
+ a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ a.inaddr.sin_family = AF_INET;
+
+ if (listen(listener, 1) == SOCKET_ERROR)
+ break;
+
+ socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, flags);
+ if (socks[0] == -1)
+ break;
+ if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
+ break;
+
+ socks[1] = accept(listener, NULL, NULL);
+ if (socks[1] == -1)
+ break;
+
+ /* Success: the listener is no longer needed. */
+ closesocket(listener);
+ return 0;
+ }
+
+ /* Failure: preserve the original Winsock error across the cleanup calls.
+  * NOTE(review): closesocket() may be invoked with -1 here; presumably
+  * harmless, but confirm. */
+ e = WSAGetLastError();
+ closesocket(listener);
+ closesocket(socks[0]);
+ closesocket(socks[1]);
+ WSASetLastError(e);
+ socks[0] = socks[1] = -1;
+ return SOCKET_ERROR;
+}
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 71c23211b2..5d8b65c50a 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -12,7 +12,9 @@ subdir = src/backend/postmaster
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
+override CPPFLAGS := $(CPPFLAGS) -I$(top_builddir)/src/port -I$(top_srcdir)/src/port
+
OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \
- pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o
+ pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o proxy.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/postmaster/libpqconn/Makefile b/src/backend/postmaster/libpqconn/Makefile
new file mode 100644
index 0000000000..f05b72758e
--- /dev/null
+++ b/src/backend/postmaster/libpqconn/Makefile
@@ -0,0 +1,35 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for src/backend/postmaster/libpqconn
+#
+# IDENTIFICATION
+# src/backend/postmaster/libpqconn/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/postmaster/libpqconn
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS = libpqconn.o $(WIN32RES)
+SHLIB_LINK_INTERNAL = $(libpq)
+SHLIB_LINK = $(filter -lintl, $(LIBS))
+SHLIB_PREREQS = submake-libpq
+PGFILEDESC = "libpqconn - open libpq connection"
+NAME = libpqconn
+
+all: all-shared-lib
+
+include $(top_srcdir)/src/Makefile.shlib
+
+install: all installdirs install-lib
+
+installdirs: installdirs-lib
+
+uninstall: uninstall-lib
+
+clean distclean maintainer-clean: clean-lib
+ rm -f $(OBJS)
diff --git a/src/backend/postmaster/libpqconn/libpqconn.c b/src/backend/postmaster/libpqconn/libpqconn.c
new file mode 100644
index 0000000000..bdba0f6e2c
--- /dev/null
+++ b/src/backend/postmaster/libpqconn/libpqconn.c
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpqconn.c
+ *
+ * This file provides a way to establish a connection to a postgres instance from a backend.
+ *
+ * Portions Copyright (c) 2010-2018, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/libpqconn/libpqconn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include
+#include
+
+#include "fmgr.h"
+#include "libpq-fe.h"
+#include "postmaster/postmaster.h"
+
+PG_MODULE_MAGIC;
+
+void _PG_init(void);
+
+/*
+ * Establish a libpq connection using the given parameter arrays.
+ *
+ * keywords/values: NULL-terminated parallel arrays as accepted by
+ * PQconnectdbParams (expand_dbname = false).
+ *
+ * Returns the PGconn pointer (as void*) on success, or NULL after
+ * emitting a WARNING on failure.  The failed connection object is
+ * released with PQfinish() so it is not leaked.
+ */
+static void*
+libpq_connectdb(char const* keywords[], char const* values[])
+{
+ PGconn* conn = PQconnectdbParams(keywords, values, false);
+ if (!conn || PQstatus(conn) != CONNECTION_OK)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+ errmsg("could not setup local connect to server"),
+ errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
+ /* Release the failed connection object to avoid leaking it.
+  * (PQerrorMessage() above is NULL-safe, so report first, then free.) */
+ if (conn)
+ PQfinish(conn);
+ return NULL;
+ }
+ return conn;
+}
+
+void _PG_init(void)
+{
+ /* Install the libpq-based connect hook so the connection proxy can open
+  * local backend connections through this loadable module. */
+ LibpqConnectdbParams = libpq_connectdb;
+}
+
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 688ad439ed..73a695b5ee 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -114,6 +114,7 @@
#include "postmaster/fork_process.h"
#include "postmaster/pgarch.h"
#include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
#include "postmaster/syslogger.h"
#include "replication/logicallauncher.h"
#include "replication/walsender.h"
@@ -196,6 +197,9 @@ BackgroundWorker *MyBgworkerEntry = NULL;
/* The socket number we are listening for connections on */
int PostPortNumber;
+/* The socket number we are listening for pooled connections on */
+int ProxyPortNumber;
+
/* The directory names for Unix socket(s) */
char *Unix_socket_directories;
@@ -216,6 +220,7 @@ int ReservedBackends;
/* The socket(s) we're listening to. */
#define MAXLISTEN 64
static pgsocket ListenSocket[MAXLISTEN];
+static int ListenPort[MAXLISTEN];
/*
* Set by the -o option
@@ -246,6 +251,18 @@ bool enable_bonjour = false;
char *bonjour_name;
bool restart_after_crash = true;
+typedef struct ConnectionProxy
+{
+ int pid;
+ pgsocket socks[2];
+} ConnectionProxy;
+
+ConnectionProxy* ConnectionProxies;
+static bool ConnectionProxiesStarted;
+static int CurrentConnectionProxy; /* index used for round-robin distribution of connections between proxies */
+
+void* (*LibpqConnectdbParams)(char const* keywords[], char const* values[]);
+
/* PIDs of special child processes; 0 when not running */
static pid_t StartupPID = 0,
BgWriterPID = 0,
@@ -403,7 +420,6 @@ static void BackendInitialize(Port *port);
static void BackendRun(Port *port) pg_attribute_noreturn();
static void ExitPostmaster(int status) pg_attribute_noreturn();
static int ServerLoop(void);
-static int BackendStartup(Port *port);
static int ProcessStartupPacket(Port *port, bool secure_done);
static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
static void processCancelRequest(Port *port, void *pkt);
@@ -425,6 +441,7 @@ static pid_t StartChildProcess(AuxProcType type);
static void StartAutovacuumWorker(void);
static void MaybeStartWalReceiver(void);
static void InitPostmasterDeathWatchHandle(void);
+static void StartProxyWorker(int id);
/*
* Archiver is allowed to start up at the current postmaster state?
@@ -477,6 +494,8 @@ typedef struct
{
Port port;
InheritableSocket portsocket;
+ InheritableSocket proxySocket;
+ int proxyId;
char DataDir[MAXPGPATH];
pgsocket ListenSocket[MAXLISTEN];
int32 MyCancelKey;
@@ -560,6 +579,48 @@ int postmaster_alive_fds[2] = {-1, -1};
HANDLE PostmasterHandle;
#endif
+/*
+ * Launch one worker process per configured connection proxy.
+ *
+ * Pooling is active only when both session_pool_size and
+ * connection_proxies are positive.  The socketpairs used to hand client
+ * sockets over to the proxies are created once, on first start, and are
+ * reused if the proxies are restarted later.
+ */
+static void
+StartConnectionProxies(void)
+{
+ if (SessionPoolSize > 0 && ConnectionProxiesNumber > 0 && !ConnectionProxiesStarted)
+ {
+ int i;
+ if (ConnectionProxies == NULL)
+ {
+ ConnectionProxies = malloc(sizeof(ConnectionProxy)*ConnectionProxiesNumber);
+ /* Check the allocation before the loop below dereferences it. */
+ if (ConnectionProxies == NULL)
+ ereport(FATAL,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ for (i = 0; i < ConnectionProxiesNumber; i++)
+ {
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, ConnectionProxies[i].socks) < 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg_internal("could not create socket pair for launching sessions: %m")));
+ }
+ }
+ for (i = 0; i < ConnectionProxiesNumber; i++)
+ {
+ StartProxyWorker(i);
+ }
+ ConnectionProxiesStarted = true;
+ }
+}
+
+/*
+ * Send signal to connection proxies
+ *
+ * Forwards "signal" to every proxy worker and clears the started flag so
+ * StartConnectionProxies() can relaunch them later.  No-op if the proxies
+ * were never started.
+ */
+static void
+StopConnectionProxies(int signal)
+{
+ if (ConnectionProxiesStarted)
+ {
+ int i;
+ for (i = 0; i < ConnectionProxiesNumber; i++)
+ {
+ signal_child(ConnectionProxies[i].pid, signal);
+ }
+ ConnectionProxiesStarted = false;
+ }
+}
+
/*
* Postmaster main entry point
*/
@@ -572,6 +633,9 @@ PostmasterMain(int argc, char *argv[])
bool listen_addr_saved = false;
int i;
char *output_config_variable = NULL;
+ bool contains_localhost = false;
+ int ports[2];
+ int n_ports = 0;
InitProcessGlobals();
@@ -1008,6 +1072,11 @@ PostmasterMain(int argc, char *argv[])
on_proc_exit(CloseServerPorts, 0);
+ /* Listen on proxy socket only if session pooling is enabled */
+ if (ProxyPortNumber > 0 && ConnectionProxiesNumber > 0 && SessionPoolSize > 0)
+ ports[n_ports++] = ProxyPortNumber;
+ ports[n_ports++] = PostPortNumber;
+
if (ListenAddresses)
{
char *rawstring;
@@ -1031,32 +1100,36 @@ PostmasterMain(int argc, char *argv[])
foreach(l, elemlist)
{
char *curhost = (char *) lfirst(l);
-
- if (strcmp(curhost, "*") == 0)
- status = StreamServerPort(AF_UNSPEC, NULL,
- (unsigned short) PostPortNumber,
- NULL,
- ListenSocket, MAXLISTEN);
- else
- status = StreamServerPort(AF_UNSPEC, curhost,
- (unsigned short) PostPortNumber,
- NULL,
- ListenSocket, MAXLISTEN);
-
- if (status == STATUS_OK)
+ for (i = 0; i < n_ports; i++)
{
- success++;
- /* record the first successful host addr in lockfile */
- if (!listen_addr_saved)
+ int port = ports[i];
+ if (strcmp(curhost, "*") == 0)
+ status = StreamServerPort(AF_UNSPEC, NULL,
+ (unsigned short) port,
+ NULL,
+ ListenSocket, ListenPort, MAXLISTEN);
+ else
+ status = StreamServerPort(AF_UNSPEC, curhost,
+ (unsigned short) port,
+ NULL,
+ ListenSocket, ListenPort, MAXLISTEN);
+ contains_localhost |= strcmp(curhost, "localhost") == 0;
+
+ if (status == STATUS_OK)
{
- AddToDataDirLockFile(LOCK_FILE_LINE_LISTEN_ADDR, curhost);
- listen_addr_saved = true;
+ success++;
+ /* record the first successful host addr in lockfile */
+ if (!listen_addr_saved)
+ {
+ AddToDataDirLockFile(LOCK_FILE_LINE_LISTEN_ADDR, curhost);
+ listen_addr_saved = true;
+ }
}
+ else
+ ereport(WARNING,
+ (errmsg("could not create listen socket for \"%s\"",
+ curhost)));
}
- else
- ereport(WARNING,
- (errmsg("could not create listen socket for \"%s\"",
- curhost)));
}
if (!success && elemlist != NIL)
@@ -1125,29 +1198,32 @@ PostmasterMain(int argc, char *argv[])
errmsg("invalid list syntax in parameter \"%s\"",
"unix_socket_directories")));
}
-
+ contains_localhost = true;
foreach(l, elemlist)
{
char *socketdir = (char *) lfirst(l);
+ for (i = 0; i < n_ports; i++)
+ {
+ int port = ports[i];
- status = StreamServerPort(AF_UNIX, NULL,
- (unsigned short) PostPortNumber,
- socketdir,
- ListenSocket, MAXLISTEN);
+ status = StreamServerPort(AF_UNIX, NULL,
+ (unsigned short) port,
+ socketdir,
+ ListenSocket, ListenPort, MAXLISTEN);
- if (status == STATUS_OK)
- {
- success++;
- /* record the first successful Unix socket in lockfile */
- if (success == 1)
- AddToDataDirLockFile(LOCK_FILE_LINE_SOCKET_DIR, socketdir);
+ if (status == STATUS_OK)
+ {
+ success++;
+ /* record the first successful Unix socket in lockfile */
+ if (success == 1)
+ AddToDataDirLockFile(LOCK_FILE_LINE_SOCKET_DIR, socketdir);
+ }
+ else
+ ereport(WARNING,
+ (errmsg("could not create Unix-domain socket in directory \"%s\"",
+ socketdir)));
}
- else
- ereport(WARNING,
- (errmsg("could not create Unix-domain socket in directory \"%s\"",
- socketdir)));
}
-
if (!success && elemlist != NIL)
ereport(FATAL,
(errmsg("could not create any Unix-domain sockets")));
@@ -1157,6 +1233,20 @@ PostmasterMain(int argc, char *argv[])
}
#endif
+ if (!contains_localhost && ProxyPortNumber > 0)
+ {
+ /* we need to accept local connections from proxy */
+ status = StreamServerPort(AF_UNSPEC, "localhost",
+ (unsigned short) PostPortNumber,
+ NULL,
+ ListenSocket, ListenPort, MAXLISTEN);
+ if (status != STATUS_OK)
+ {
+ ereport(WARNING,
+ (errmsg("could not create listen socket for localhost")));
+ }
+ }
+
/*
* check that we have some socket to listen on
*/
@@ -1374,6 +1464,8 @@ PostmasterMain(int argc, char *argv[])
/* Some workers may be scheduled to start now */
maybe_start_bgworkers();
+ StartConnectionProxies();
+
status = ServerLoop();
/*
@@ -1611,6 +1703,57 @@ DetermineSleepTime(struct timeval *timeout)
}
}
+/**
+ * This function tries to estimate workload of proxy.
+ * We have a lot of information about proxy state in ProxyState array:
+ * total number of clients, SSL clients, backends, traffic, number of transactions,...
+ * So in principle it is possible to implement much more sophisticated evaluation function,
+ * but right now we take into account only the number of clients and SSL connections (which require much more CPU)
+ */
+static uint64
+GetConnectionProxyWorkload(int id)
+{
+ return ProxyState[id].n_clients + ProxyState[id].n_ssl_clients*3;
+}
+
+/**
+ * Choose connection pool for this session.
+ * Right now sessions can not be moved between pools (in principle it is not so difficult to implement it),
+ * so to support load balancing we should do some smart work here.
+ */
+static ConnectionProxy*
+SelectConnectionProxy(void)
+{
+ int i;
+ uint64 min_workload;
+ int least_loaded_proxy;
+
+ switch (SessionSchedule)
+ {
+ case SESSION_SCHED_ROUND_ROBIN:
+ return &ConnectionProxies[CurrentConnectionProxy++ % ConnectionProxiesNumber];
+ case SESSION_SCHED_RANDOM:
+ return &ConnectionProxies[random() % ConnectionProxiesNumber];
+ case SESSION_SCHED_LOAD_BALANCING:
+ min_workload = GetConnectionProxyWorkload(0);
+ least_loaded_proxy = 0;
+ for (i = 1; i < ConnectionProxiesNumber; i++)
+ {
+ int workload = GetConnectionProxyWorkload(i);
+ if (workload < min_workload)
+ {
+ min_workload = workload;
+ least_loaded_proxy = i;
+ }
+ }
+ return &ConnectionProxies[least_loaded_proxy];
+ default:
+ Assert(false);
+ }
+ return NULL;
+}
+
+
/*
* Main idle loop of postmaster
*
@@ -1701,8 +1844,18 @@ ServerLoop(void)
port = ConnCreate(ListenSocket[i]);
if (port)
{
- BackendStartup(port);
-
+ if (ConnectionProxies && ListenPort[i] == ProxyPortNumber)
+ {
+ ConnectionProxy* proxy = SelectConnectionProxy();
+ if (pg_send_sock(proxy->socks[0], port->sock, proxy->pid) < 0)
+ {
+ elog(LOG, "could not send socket to connection pool: %m");
+ }
+ }
+ else
+ {
+ BackendStartup(port, NULL);
+ }
/*
* We no longer need the open socket or port structure
* in this process
@@ -1899,8 +2052,6 @@ ProcessStartupPacket(Port *port, bool secure_done)
{
int32 len;
void *buf;
- ProtocolVersion proto;
- MemoryContext oldcontext;
pq_startmsgread();
@@ -1967,6 +2118,18 @@ ProcessStartupPacket(Port *port, bool secure_done)
}
pq_endmsgread();
+ return ParseStartupPacket(port, TopMemoryContext, buf, len, secure_done);
+}
+
+int
+ParseStartupPacket(Port *port, MemoryContext memctx, void* buf, int len, bool secure_done)
+{
+ ProtocolVersion proto;
+ MemoryContext oldcontext;
+
+ am_walsender = false;
+ am_db_walsender = false;
+
/*
* The first field is either a protocol version number or a special
* request code.
@@ -2067,7 +2230,7 @@ retry1:
* not worry about leaking this storage on failure, since we aren't in the
* postmaster process anymore.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ oldcontext = MemoryContextSwitchTo(memctx);
if (PG_PROTOCOL_MAJOR(proto) >= 3)
{
@@ -2739,6 +2902,8 @@ pmdie(SIGNAL_ARGS)
if (WalWriterPID != 0)
signal_child(WalWriterPID, SIGTERM);
+ StopConnectionProxies(SIGTERM);
+
/*
* If we're in recovery, we can't kill the startup process
* right away, because at present doing so does not release
@@ -2816,6 +2981,9 @@ pmdie(SIGNAL_ARGS)
/* and the walwriter too */
if (WalWriterPID != 0)
signal_child(WalWriterPID, SIGTERM);
+
+ StopConnectionProxies(SIGTERM);
+
pmState = PM_WAIT_BACKENDS;
}
@@ -4041,6 +4209,7 @@ TerminateChildren(int signal)
signal_child(PgArchPID, signal);
if (PgStatPID != 0)
signal_child(PgStatPID, signal);
+ StopConnectionProxies(signal);
}
/*
@@ -4050,8 +4219,8 @@ TerminateChildren(int signal)
*
* Note: if you change this code, also consider StartAutovacuumWorker.
*/
-static int
-BackendStartup(Port *port)
+int
+BackendStartup(Port *port, int* backend_pid)
{
Backend *bn; /* for backend cleanup */
pid_t pid;
@@ -4155,6 +4324,8 @@ BackendStartup(Port *port)
if (!bn->dead_end)
ShmemBackendArrayAdd(bn);
#endif
+ if (backend_pid)
+ *backend_pid = pid;
return STATUS_OK;
}
@@ -4851,6 +5022,7 @@ SubPostmasterMain(int argc, char *argv[])
if (strcmp(argv[1], "--forkbackend") == 0 ||
strcmp(argv[1], "--forkavlauncher") == 0 ||
strcmp(argv[1], "--forkavworker") == 0 ||
+ strcmp(argv[1], "--forkproxy") == 0 ||
strcmp(argv[1], "--forkboot") == 0 ||
strncmp(argv[1], "--forkbgworker=", 15) == 0)
PGSharedMemoryReAttach();
@@ -4991,6 +5163,19 @@ SubPostmasterMain(int argc, char *argv[])
AutoVacWorkerMain(argc - 2, argv + 2); /* does not return */
}
+ if (strcmp(argv[1], "--forkproxy") == 0)
+ {
+ /* Restore basic shared memory pointers */
+ InitShmemAccess(UsedShmemSegAddr);
+
+ /* Need a PGPROC to run CreateSharedMemoryAndSemaphores */
+ InitProcess();
+
+ /* Attach process to shared data structures */
+ CreateSharedMemoryAndSemaphores(0);
+
+ ConnectionProxyMain(argc - 2, argv + 2); /* does not return */
+ }
if (strncmp(argv[1], "--forkbgworker=", 15) == 0)
{
int shmem_slot;
@@ -5059,7 +5244,6 @@ ExitPostmaster(int status)
errmsg_internal("postmaster became multithreaded"),
errdetail("Please report this to .")));
#endif
-
/* should cleanup shared memory and kill all backends */
/*
@@ -5525,6 +5709,74 @@ StartAutovacuumWorker(void)
}
}
+/*
+ * StartProxyWorker
+ * Start a proxy worker process.
+ *
+ * This function is here because it enters the resulting PID into the
+ * postmaster's private backends list.
+ *
+ * NB -- this code very roughly matches BackendStartup.
+ */
+static void
+StartProxyWorker(int id)
+{
+ Backend *bn;
+ int pid;
+
+ /*
+ * Compute the cancel key that will be assigned to this session. We
+ * probably don't need cancel keys for proxy workers, but we'd
+ * better have something random in the field to prevent unfriendly
+ * people from sending cancels to them.
+ */
+ if (!RandomCancelKey(&MyCancelKey))
+ {
+ ereport(LOG,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("could not generate random cancel key")));
+ return ;
+ }
+ bn = (Backend *) malloc(sizeof(Backend));
+ if (bn)
+ {
+ bn->cancel_key = MyCancelKey;
+
+ /* Proxy workers are not dead_end and need a child slot */
+ bn->dead_end = false;
+ bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot();
+ bn->bgworker_notify = false;
+
+ MyProxyId = id;
+ MyProxySocket = ConnectionProxies[id].socks[1];
+ pid = ConnectionProxyStart();
+ if (pid > 0)
+ {
+ bn->pid = pid;
+ bn->bkend_type = BACKEND_TYPE_BGWORKER;
+ dlist_push_head(&BackendList, &bn->elem);
+#ifdef EXEC_BACKEND
+ ShmemBackendArrayAdd(bn);
+#endif
+ /* all OK */
+ ConnectionProxies[id].pid = pid;
+ ProxyState[id].pid = pid;
+ return;
+ }
+
+ /*
+ * fork failed, fall through to report -- actual error message was
+ * logged by ConnectionProxyStart
+ */
+ (void) ReleasePostmasterChildSlot(bn->child_slot);
+ free(bn);
+ }
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+}
+
/*
* MaybeStartWalReceiver
* Start the WAL receiver process, if not running and our state allows.
@@ -6116,6 +6368,10 @@ save_backend_variables(BackendParameters *param, Port *port,
strlcpy(param->ExtraOptions, ExtraOptions, MAXPGPATH);
+ if (!write_inheritable_socket(¶m->proxySocket, MyProxySocket, childPid))
+ return false;
+ param->proxyId = MyProxyId;
+
return true;
}
@@ -6347,6 +6603,9 @@ restore_backend_variables(BackendParameters *param, Port *port)
strlcpy(pkglib_path, param->pkglib_path, MAXPGPATH);
strlcpy(ExtraOptions, param->ExtraOptions, MAXPGPATH);
+
+ read_inheritable_socket(&MyProxySocket, ¶m->proxySocket);
+ MyProxyId = param->proxyId;
}
diff --git a/src/backend/postmaster/proxy.c b/src/backend/postmaster/proxy.c
new file mode 100644
index 0000000000..1531bd7554
--- /dev/null
+++ b/src/backend/postmaster/proxy.c
@@ -0,0 +1,1061 @@
+#include
+#include
+
+#include "postgres.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
+#include "postmaster/fork_process.h"
+#include "access/htup_details.h"
+#include "replication/walsender.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "libpq/libpq.h"
+#include "libpq/libpq-be.h"
+#include "libpq/pqsignal.h"
+#include "tcop/tcopprot.h"
+#include "utils/timeout.h"
+#include "utils/ps_status.h"
+#include "../interfaces/libpq/libpq-fe.h"
+#include "../interfaces/libpq/libpq-int.h"
+
+#define INIT_BUF_SIZE (64*1024)
+#define MAX_READY_EVENTS 128
+#define DB_HASH_SIZE 101
+#define PROXY_WAIT_TIMEOUT 1000 /* 1 second */
+
+struct SessionPool;
+struct Proxy;
+
+typedef struct
+{
+ char database[NAMEDATALEN];
+ char username[NAMEDATALEN];
+}
+SessionPoolKey;
+
+/*
+ * Channels represent both clients and backends
+ */
+typedef struct Channel
+{
+ char* buf;
+ int rx_pos;
+ int tx_pos;
+ int tx_size;
+ int buf_size;
+ int event_pos; /* Position of wait event returned by AddWaitEventToSet */
+
+ Port* client_port; /* Not null for client, null for server */
+
+ pgsocket backend_socket;
+ PGPROC* backend_proc;
+ int backend_pid;
+ bool backend_is_tainted; /* client changes session context */
+ bool backend_is_ready; /* ready for query */
+ bool is_interrupted; /* client interrupts query execution */
+ bool is_disconnected; /* connection is lost */
+ bool write_pending; /* write request is pending: emulate epoll EPOLLET (edge-triggered) flag */
+ bool read_pending; /* read request is pending: emulate epoll EPOLLET (edge-triggered) flag */
+ /* We need to save startup packet response to be able to send it to new connection */
+ int handshake_response_size;
+ char* handshake_response;
+
+ struct Channel* peer;
+ struct Channel* next;
+ struct Proxy* proxy;
+ struct SessionPool* pool;
+}
+Channel;
+
+/*
+ * Control structure for connection proxies (several proxy workers can be launched and each has its own proxy instance).
+ * Proxy contains a hash of session pools for each role/dbname combination.
+ */
+typedef struct Proxy
+{
+ MemoryContext memctx; /* Memory context for this proxy (used only in single thread) */
+ MemoryContext tmpctx; /* Temporary memory context used for parsing startup packet */
+ WaitEventSet* wait_events; /* Set of socket descriptors of backends and clients socket descriptors */
+ HTAB* pools; /* Session pool map with dbname/role used as a key */
+ int n_accepted_connections; /* Number of accepted, but not yet established connections
+ * (startup packet is not received and db/role are not known) */
+ int max_backends; /* Maximal number of backends per database */
+ bool shutdown; /* Shutdown flag */
+ Channel* hangout; /* List of disconnected backends */
+ ConnectionProxyState* state; /* State of proxy */
+} Proxy;
+
+/*
+ * Connection pool to particular role/dbname
+ */
+typedef struct SessionPool
+{
+ SessionPoolKey key;
+ Channel* idle_backends; /* List of idle backends */
+ Channel* pending_clients; /* List of clients waiting for free backend */
+ Proxy* proxy; /* Owner of this pool */
+ int n_launched_backends; /* Total number of launched backends */
+ int n_idle_backends; /* Number of backends in idle state */
+ int n_connected_clients; /* Total number of connected clients */
+ int n_idle_clients; /* Number of clients in idle state */
+ int n_pending_clients; /* Number of clients waiting for free backend */
+}
+SessionPool;
+
+static void channel_remove(Channel* chan);
+static Channel* backend_start(SessionPool* pool, Port* client_port);
+static bool channel_read(Channel* chan);
+static bool channel_write(Channel* chan, bool synchronous);
+
+/*
+ * #define ELOG(severity, fmt,...) elog(severity, "PROXY: " fmt, ## __VA_ARGS__)
+ */
+#define ELOG(severity,fmt,...)
+//#define ELOG(severity, fmt,...) elog(severity, "PROXY: " fmt, ## __VA_ARGS__)
+
+static Proxy* proxy;
+int MyProxyId;
+pgsocket MyProxySocket;
+ConnectionProxyState* ProxyState;
+
+/**
+ * Backend is ready for next command outside transaction block (idle state).
+ * Now if backend is not tainted it is possible to schedule some other client to this backend.
+ */
+static bool
+backend_reschedule(Channel* chan)
+{
+ chan->backend_is_ready = false;
+ if (chan->backend_proc == NULL) /* Lazy resolving of PGPROC entry */
+ {
+ Assert(chan->backend_pid != 0);
+ chan->backend_proc = BackendPidGetProc(chan->backend_pid);
+ Assert(chan->backend_proc); /* If backend completes execution of some query, then it has definitely registered itself in procarray */
+ }
+ if (!chan->backend_proc->is_tainted) /* If backend is not storing some session context */
+ {
+ Channel* pending = chan->pool->pending_clients;
+ Assert(!chan->backend_is_tainted);
+ if (chan->peer)
+ chan->peer->peer = NULL;
+ chan->pool->n_idle_clients += 1;
+ if (pending)
+ {
+ /* Has pending clients: serve one of them */
+ ELOG(LOG, "Backend %d is reassigned to client %p", chan->backend_pid, pending);
+ chan->pool->pending_clients = pending->next;
+ Assert(chan != pending);
+ chan->peer = pending;
+ pending->peer = chan;
+ chan->pool->n_pending_clients -= 1;
+ if (pending->tx_size == 0) /* new client has sent startup packet and we now need to send handshake response */
+ {
+ Assert(chan->handshake_response != NULL); /* backend already sent handshake response */
+ Assert(chan->handshake_response_size < chan->buf_size);
+ memcpy(chan->buf, chan->handshake_response, chan->handshake_response_size);
+ chan->rx_pos = chan->tx_size = chan->handshake_response_size;
+ ELOG(LOG, "Simulate response for startup packet to client %p", pending);
+ chan->backend_is_ready = true;
+ return channel_write(pending, false);
+ }
+ else
+ {
+ ELOG(LOG, "Try to send pending request from client %p to backend %p (pid %d)", pending, chan, chan->backend_pid);
+ Assert(pending->tx_pos == 0 && pending->rx_pos >= pending->tx_size);
+ return channel_write(chan, false); /* Send pending request to backend */
+ }
+ }
+ else /* return backend to the list of idle backends */
+ {
+ ELOG(LOG, "Backend %d is idle", chan->backend_pid);
+ Assert(!chan->client_port);
+ chan->next = chan->pool->idle_backends;
+ chan->pool->idle_backends = chan;
+ chan->pool->n_idle_backends += 1;
+ chan->peer = NULL;
+ }
+ }
+ else if (!chan->backend_is_tainted) /* if it was not marked as tainted before... */
+ {
+ chan->backend_is_tainted = true;
+ chan->proxy->state->n_dedicated_backends += 1;
+ }
+ return true;
+}
+
+/**
+ * Parse client's startup packet and assign client to proper connection pool based on dbname/role
+ */
+static bool
+client_connect(Channel* chan, int startup_packet_size)
+{
+ bool found;
+ SessionPoolKey key;
+ char* startup_packet = chan->buf;
+
+ Assert(chan->client_port);
+
+ /* parse startup packet in tmpctx memory context and reset it when it is not needed any more */
+ MemoryContextReset(chan->proxy->tmpctx);
+ MemoryContextSwitchTo(chan->proxy->tmpctx);
+
+ /* Associate libpq with client's port */
+ MyProcPort = chan->client_port;
+ pq_init();
+
+ if (ParseStartupPacket(chan->client_port, chan->proxy->tmpctx, startup_packet+4, startup_packet_size-4, false) != STATUS_OK) /* skip packet size */
+ {
+ MyProcPort = NULL;
+ elog(WARNING, "Failed to parse startup packet for client %p", chan);
+ return false;
+ }
+ MyProcPort = NULL;
+ if (am_walsender)
+ {
+ elog(WARNING, "WAL sender should not be connected through proxy");
+ return false;
+ }
+
+ chan->proxy->state->n_ssl_clients += chan->client_port->ssl_in_use;
+ pg_set_noblock(chan->client_port->sock); /* SSL handshake may switch socket to blocking mode */
+ memset(&key, 0, sizeof(key));
+ strlcpy(key.database, chan->client_port->database_name, NAMEDATALEN);
+ strlcpy(key.username, chan->client_port->user_name, NAMEDATALEN);
+
+ ELOG(LOG, "Client %p connects to %s/%s", chan, key.database, key.username);
+
+ chan->pool = (SessionPool*)hash_search(chan->proxy->pools, &key, HASH_ENTER, &found);
+ if (!found)
+ {
+ /* First connection to this role/dbname */
+ chan->proxy->state->n_pools += 1;
+ memset((char*)chan->pool + sizeof(SessionPoolKey), 0, sizeof(SessionPool) - sizeof(SessionPoolKey));
+ }
+ chan->pool->proxy = chan->proxy;
+ chan->pool->n_connected_clients += 1;
+ chan->pool->n_idle_clients += 1;
+ chan->proxy->n_accepted_connections -= 1;
+ return true;
+}
+
+/*
+ * Attach client to backend. Return true if new backend is attached, false otherwise.
+ */
+static bool
+client_attach(Channel* chan)
+{
+ Channel* idle_backend = chan->pool->idle_backends;
+ chan->pool->n_idle_clients -= 1;
+ if (idle_backend)
+ {
+ /* has some idle backend */
+ Assert(!idle_backend->backend_is_tainted && !idle_backend->client_port);
+ Assert(chan != idle_backend);
+ chan->peer = idle_backend;
+ idle_backend->peer = chan;
+ chan->pool->idle_backends = idle_backend->next;
+ chan->pool->n_idle_backends -= 1;
+ ELOG(LOG, "Attach client %p to backend %p (pid %d)", chan, idle_backend, idle_backend->backend_pid);
+ }
+ else /* all backends are busy */
+ {
+ if (chan->pool->n_launched_backends < chan->proxy->max_backends)
+ {
+ /* Try to start new backend */
+ idle_backend = backend_start(chan->pool, chan->client_port);
+ if (idle_backend != NULL)
+ {
+ ELOG(LOG, "Start new backend %p (pid %d) for client %p",
+ idle_backend, idle_backend->backend_pid, chan);
+ Assert(chan != idle_backend);
+ chan->peer = idle_backend;
+ idle_backend->peer = chan;
+ return true;
+ }
+ }
+ /* Postpone handshake until some backend is available */
+ ELOG(LOG, "Client %p is waiting for available backends", chan);
+ chan->next = chan->pool->pending_clients;
+ chan->pool->pending_clients = chan;
+ chan->pool->n_pending_clients += 1;
+ }
+ return false;
+}
+
+/*
+ * Handle communication failure for this channel.
+ * It is not possible to remove channel immediately because it can be triggered by other epoll events.
+ * So we link all such channels into a list for pending deletion.
+ */
+static void
+channel_hangout(Channel* chan, char const* op)
+{
+ Channel** ipp;
+ Channel* peer = chan->peer;
+ if (chan->is_disconnected)
+ return;
+
+ if (chan->client_port) {
+ ELOG(LOG, "Hangout client %p due to %s error: %m", chan, op);
+ for (ipp = &chan->pool->pending_clients; *ipp != NULL; ipp = &(*ipp)->next) {
+ if (*ipp == chan) {
+ *ipp = chan->next;
+ chan->pool->n_pending_clients -= 1;
+ break;
+ }
+ }
+ } else {
+ ELOG(LOG, "Hangout backend %p (pid %d) due to %s error: %m", chan, chan->backend_pid, op);
+ for (ipp = &chan->pool->idle_backends; *ipp != NULL; ipp = &(*ipp)->next) {
+ if (*ipp == chan) {
+ *ipp = chan->next;
+ chan->pool->n_idle_backends -= 1;
+ break;
+ }
+ }
+ }
+ if (peer)
+ {
+ peer->peer = NULL;
+ chan->peer = NULL;
+ }
+ chan->backend_is_ready = false;
+
+ if (chan->client_port && peer) /* If it is client connected to backend. */
+ {
+ if (!chan->is_interrupted) /* Client didn't send 'X' command, so do it on its behalf. */
+ {
+ ELOG(LOG, "Send terminate command to backend %p (pid %d)", peer, peer->backend_pid);
+ peer->is_interrupted = true; /* interrupted flags makes channel_write to send 'X' message */
+ channel_write(peer, false);
+ return;
+ }
+ else if (!peer->is_interrupted)
+ {
+ /* Client already sent 'X' command, so we can safely reschedule backend to some other client session */
+ backend_reschedule(peer);
+ }
+ }
+ chan->next = chan->proxy->hangout;
+ chan->proxy->hangout = chan;
+ chan->is_disconnected = true;
+}
+
+/*
+ * Try to write data to the socket.
+ */
+static ssize_t
+socket_write(Channel* chan, char const* buf, size_t size)
+{
+ ssize_t rc;
+#ifdef USE_SSL
+ int waitfor = 0;
+ if (chan->client_port && chan->client_port->ssl_in_use)
+ rc = be_tls_write(chan->client_port, (char*)buf, size, &waitfor);
+ else
+#endif
+ rc = chan->client_port
+ ? secure_raw_write(chan->client_port, buf, size)
+ : send(chan->backend_socket, buf, size, 0);
+ if (rc == 0 || (rc < 0 && (errno != EAGAIN && errno != EWOULDBLOCK)))
+ {
+ channel_hangout(chan, "write");
+ }
+ else if (rc < 0)
+ {
+ /* do not accept more read events while write request is pending */
+ ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_WRITEABLE|WL_SOCKET_EDGE, NULL);
+ chan->write_pending = true;
+ }
+ else if (chan->write_pending)
+ {
+ /* resume accepting read events */
+ ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_READABLE|WL_SOCKET_EDGE, NULL);
+ chan->write_pending = false;
+ }
+ return rc;
+}
+
+
+/*
+ * Try to send some data to the channel.
+ * Data is located in the peer buffer. Because we use edge-triggered mode we have to use non-blocking IO
+ * and try to write all available data. Once write is completed we should try to read more data from source socket.
+ * The "synchronous" flag is used to avoid infinite recursion between readers and writers.
+ * Returns true if there is nothing to do or operation is successfully completed, false in case of error
+ * or socket buffer is full.
+ */
+static bool
+channel_write(Channel* chan, bool synchronous)
+{
+ Channel* peer = chan->peer;
+ if (!chan->client_port && chan->is_interrupted)
+ {
+ /* Send terminate command to the backend. */
+ char const terminate[] = {'X', 0, 0, 0, 4};
+ if (socket_write(chan, terminate, sizeof(terminate)) <= 0)
+ return false;
+ channel_hangout(chan, "terminate");
+ return true;
+ }
+ if (peer == NULL)
+ return false;
+
+ while (peer->tx_pos < peer->tx_size) /* has something to write */
+ {
+ ssize_t rc = socket_write(chan, peer->buf + peer->tx_pos, peer->tx_size - peer->tx_pos);
+ ELOG(LOG, "%p: write %d tx_pos=%d, tx_size=%d: %m", chan, (int)rc, peer->tx_pos, peer->tx_size);
+ if (rc <= 0)
+ return false;
+ if (chan->client_port)
+ chan->proxy->state->tx_bytes += rc;
+ else
+ chan->proxy->state->rx_bytes += rc;
+ peer->tx_pos += rc;
+ }
+ if (peer->tx_size != 0)
+ {
+ /* Copy rest of received data to the beginning of the buffer */
+ chan->backend_is_ready = false;
+ Assert(peer->rx_pos >= peer->tx_size);
+ memmove(peer->buf, peer->buf + peer->tx_size, peer->rx_pos - peer->tx_size);
+ peer->rx_pos -= peer->tx_size;
+ peer->tx_pos = peer->tx_size = 0;
+ if (peer->backend_is_ready) {
+ Assert(peer->rx_pos == 0);
+ backend_reschedule(peer);
+ return true;
+ }
+ }
+ return synchronous || channel_read(peer); /* write is not invoked from read */
+}
+
+/*
+ * Try to read more data from the channel and send it to the peer.
+ */
+static bool
+channel_read(Channel* chan)
+{
+ int msg_start;
+ while (chan->tx_size == 0) /* there is no pending write op */
+ {
+ ssize_t rc;
+#ifdef USE_SSL
+ int waitfor = 0;
+ if (chan->client_port && chan->client_port->ssl_in_use)
+ rc = be_tls_read(chan->client_port, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos, &waitfor);
+ else
+#endif
+ rc = chan->client_port
+ ? secure_raw_read(chan->client_port, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos)
+ : recv(chan->backend_socket, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos, 0);
+
+ ELOG(LOG, "%p: read %d: %m", chan, (int)rc);
+ if (rc <= 0)
+ {
+ if (rc == 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
+ channel_hangout(chan, "read");
+ else
+ {
+ /* do not accept more write events while read request is pending */
+ ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_READABLE|WL_SOCKET_EDGE, NULL);
+ chan->read_pending = true;
+ }
+ return false; /* wait for more data */
+ }
+ else if (chan->read_pending)
+ {
+ /* resume accepting all events */
+ ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE|WL_SOCKET_EDGE, NULL);
+ chan->read_pending = false;
+ }
+ chan->rx_pos += rc;
+ msg_start = 0;
+
+ /* Loop through all received messages */
+ while (chan->rx_pos - msg_start >= 5) /* has message code + length */
+ {
+ int msg_len;
+ bool handshake = false;
+ if (chan->pool == NULL) /* process startup packet */
+ {
+ Assert(msg_start == 0);
+ memcpy(&msg_len, chan->buf + msg_start, sizeof(msg_len));
+ msg_len = ntohl(msg_len);
+ handshake = true;
+ }
+ else
+ {
+ ELOG(LOG, "%p receive message %c", chan, chan->buf[msg_start]);
+ memcpy(&msg_len, chan->buf + msg_start + 1, sizeof(msg_len));
+ msg_len = ntohl(msg_len) + 1;
+ }
+ if (msg_start + msg_len > chan->buf_size)
+ {
+ /* Reallocate buffer to fit complete message body */
+ chan->buf_size = msg_start + msg_len;
+ chan->buf = realloc(chan->buf, chan->buf_size);
+ }
+ if (chan->rx_pos - msg_start >= msg_len) /* Message is completely fetched */
+ {
+ int response_size = msg_start + msg_len;
+ if (chan->pool == NULL) /* receive startup packet */
+ {
+ Assert(chan->client_port);
+ if (!client_connect(chan, msg_len))
+ {
+ /* Some trouble with processing startup packet */
+ chan->is_disconnected = true;
+ channel_remove(chan);
+ return false;
+ }
+ }
+ else if (!chan->client_port /* Message from backend */
+ && chan->buf[msg_start] == 'Z' /* Ready for query */
+ && chan->buf[msg_start+5] == 'I') /* Transaction block status is idle */
+ {
+ Assert(chan->rx_pos - msg_start == msg_len); /* Should be last message */
+ chan->backend_is_ready = true; /* Backend is ready for query */
+ chan->proxy->state->n_transactions += 1;
+ }
+ else if (chan->client_port /* Message from client */
+ && chan->buf[msg_start] == 'X') /* Terminate message */
+ {
+ chan->is_interrupted = true;
+ if (chan->peer == NULL || !chan->peer->backend_is_tainted)
+ {
+ /* Skip terminate message to idle and non-tainted backends */
+ channel_hangout(chan, "terminate");
+ return false;
+ }
+ }
+ if (chan->peer == NULL) /* client is not yet connected to backend */
+ {
+ if (!chan->client_port)
+ {
+ /* We are not expecting messages from an idle backend. Assume that it is some error or shutdown. */
+ channel_hangout(chan, "idle");
+ return false;
+ }
+ client_attach(chan);
+ if (handshake) /* Send handshake response to the client */
+ {
+ /* If we attach a new client to an existing backend, then we need to send the handshake response to the client */
+ Channel* backend = chan->peer;
+ Assert(chan->rx_pos == msg_len && msg_start == 0);
+ chan->rx_pos = 0; /* Skip startup packet */
+ if (backend != NULL) /* Backend was assigned */
+ {
+ Assert(backend->handshake_response != NULL); /* backend has already sent handshake responses */
+ Assert(backend->handshake_response_size < backend->buf_size);
+ memcpy(backend->buf, backend->handshake_response, backend->handshake_response_size);
+ backend->rx_pos = backend->tx_size = backend->handshake_response_size;
+ backend->backend_is_ready = true;
+ return channel_write(chan, false);
+ }
+ else
+ {
+ /* Handshake response will be sent to the client later when a backend is assigned */
+ return false;
+ }
+ }
+ else if (chan->peer == NULL) /* Backend was not assigned */
+ {
+ chan->tx_size = response_size; /* query will be sent later once a backend is assigned */
+ return false;
+ }
+ }
+ msg_start += msg_len;
+ }
+ else break; /* Incomplete message. */
+ }
+ if (msg_start != 0)
+ {
+ /* Has some complete messages to send to peer */
+ Assert(chan->tx_pos == 0);
+ Assert(chan->rx_pos >= msg_start);
+ chan->tx_size = msg_start;
+ if (!channel_write(chan->peer, true))
+ return false;
+ }
+ /* If backend is out of transaction, then reschedule it */
+ if (chan->backend_is_ready)
+ return backend_reschedule(chan);
+ }
+ return true;
+}
+
+/*
+ * Create new channel.
+ */
+static Channel*
+channel_create(Proxy* proxy)
+{
+ Channel* chan = (Channel*)calloc(1, sizeof(Channel));
+ chan->proxy = proxy;
+ chan->buf = malloc(INIT_BUF_SIZE);
+ chan->buf_size = INIT_BUF_SIZE;
+ chan->tx_pos = chan->rx_pos = chan->tx_size = 0;
+ return chan;
+}
+
+/*
+ * Register new channel in wait event set.
+ */
+static bool
+channel_register(Proxy* proxy, Channel* chan)
+{
+ pgsocket sock = chan->client_port ? chan->client_port->sock : chan->backend_socket;
+ /* Using edge epoll mode requires non-blocking sockets */
+ pg_set_noblock(sock);
+ chan->event_pos =
+ AddWaitEventToSet(proxy->wait_events, WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE|WL_SOCKET_EDGE,
+ sock, NULL, chan);
+ if (chan->event_pos < 0)
+ {
+ elog(WARNING, "PROXY: Failed to add new client - too many sessions: %d clients, %d backends. "
+ "Try to increase 'max_sessions' configuration parameter.",
+ proxy->state->n_clients, proxy->state->n_backends);
+ return false;
+ }
+ return true;
+}
+
+/*
+ * Start new backend for particular pool associated with dbname/role combination.
+ * Backend is forked using BackendStartup function.
+ */
+static Channel*
+backend_start(SessionPool* pool, Port* client_port)
+{
+ Channel* chan;
+ char postmaster_port[8];
+ char const* keywords[] = {"port","dbname","user","sslmode","application_name",NULL};
+ char const* values[] = {postmaster_port,pool->key.database,pool->key.username,"disable","pool_worker",NULL};
+ PGconn* conn;
+ char* msg;
+ int int32_buf;
+ int msg_len;
+ static bool libpqconn_loaded;
+
+ if (!libpqconn_loaded)
+ {
+ /* We need libpq library to be able to establish connections to pool workers.
+ * This library can not be linked statically, so load it on demand. */
+ load_file("libpqconn", false);
+ libpqconn_loaded = true;
+ }
+ pg_itoa(PostPortNumber, postmaster_port);
+ conn = LibpqConnectdbParams(keywords, values);
+ if (!conn)
+ return NULL;
+
+ chan = channel_create(pool->proxy);
+ chan->pool = pool;
+ chan->backend_socket = conn->sock;
+ /* Using edge epoll mode requires non-blocking sockets */
+ pg_set_noblock(conn->sock);
+
+ /* Save handshake response */
+ chan->handshake_response_size = conn->inEnd;
+ chan->handshake_response = malloc(chan->handshake_response_size);
+ memcpy(chan->handshake_response, conn->inBuffer, chan->handshake_response_size);
+
+ /* Extract backend pid */
+ msg = chan->handshake_response;
+ while (*msg != 'K') /* Scan handshake response until we reach PID message */
+ {
+ memcpy(&int32_buf, ++msg, sizeof(int32_buf));
+ msg_len = ntohl(int32_buf);
+ msg += msg_len;
+ Assert(msg < chan->handshake_response + chan->handshake_response_size);
+ }
+ memcpy(&int32_buf, msg+5, sizeof(int32_buf));
+ chan->backend_pid = ntohl(int32_buf);
+
+ if (channel_register(pool->proxy, chan))
+ {
+ pool->proxy->state->n_backends += 1;
+ pool->n_launched_backends += 1;
+ }
+ else
+ {
+ /* Too many sessions; error report was already logged */
+ close(chan->backend_socket);
+ free(chan->buf);
+ free(chan);
+ chan = NULL;
+ }
+ return chan;
+}
+
+/*
+ * Add new client accepted by postmaster. This client will be assigned to concrete session pool
+ * when its startup packet is received.
+ */
+static void
+proxy_add_client(Proxy* proxy, Port* port)
+{
+ Channel* chan = channel_create(proxy);
+ chan->client_port = port;
+ chan->backend_socket = PGINVALID_SOCKET;
+ if (channel_register(proxy, chan))
+ {
+ ELOG(LOG, "Add new client %p", chan);
+ proxy->n_accepted_connections += 1;
+ proxy->state->n_clients += 1;
+ }
+ else
+ {
+ /* Too many sessions; error report was already logged */
+ close(port->sock);
+#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
+ free(port->gss);
+#endif
+ free(port);
+ free(chan->buf);
+ free(chan);
+ }
+}
+
+/*
+ * Perform delayed deletion of channel
+ */
+static void
+channel_remove(Channel* chan)
+{
+ Assert(chan->is_disconnected); /* should be marked as disconnected by channel_hangout */
+ DeleteWaitEventFromSet(chan->proxy->wait_events, chan->event_pos);
+ if (chan->client_port)
+ {
+ if (chan->pool)
+ chan->pool->n_connected_clients -= 1;
+ else
+ chan->proxy->n_accepted_connections -= 1;
+ chan->proxy->state->n_clients -= 1;
+ chan->proxy->state->n_ssl_clients -= chan->client_port->ssl_in_use;
+ close(chan->client_port->sock);
+ free(chan->client_port);
+ }
+ else
+ {
+ chan->proxy->state->n_backends -= 1;
+ chan->proxy->state->n_dedicated_backends -= chan->backend_is_tainted;
+ chan->pool->n_launched_backends -= 1;
+ close(chan->backend_socket);
+ free(chan->handshake_response);
+ }
+ free(chan->buf);
+ free(chan);
+}
+
+
+
+/*
+ * Create new proxy.
+ */
+static Proxy*
+proxy_create(pgsocket postmaster_socket, ConnectionProxyState* state, int max_backends)
+{
+ HASHCTL ctl;
+ Proxy* proxy = calloc(1, sizeof(Proxy));
+ proxy->memctx = AllocSetContextCreate(TopMemoryContext,
+ "Proxy",
+ ALLOCSET_DEFAULT_SIZES);
+ proxy->tmpctx = AllocSetContextCreate(proxy->memctx,
+ "Startup packet parsing context",
+ ALLOCSET_DEFAULT_SIZES);
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(SessionPoolKey);
+ ctl.entrysize = sizeof(SessionPool);
+ ctl.hcxt = proxy->memctx;
+ proxy->pools = hash_create("Pool by database and user", DB_HASH_SIZE,
+ &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ /* We need events both for clients and backends so multiply MaxSessions by two */
+ proxy->wait_events = CreateWaitEventSet(TopMemoryContext, MaxSessions*2);
+ AddWaitEventToSet(proxy->wait_events, WL_SOCKET_READABLE,
+ postmaster_socket, NULL, NULL);
+ proxy->max_backends = max_backends;
+ proxy->state = state;
+ return proxy;
+}
+
+/*
+ * Main proxy loop
+ */
+static void
+proxy_loop(Proxy* proxy)
+{
+ int i, n_ready;
+ WaitEvent ready[MAX_READY_EVENTS];
+ Channel *chan, *next;
+
+ /* Main loop */
+ while (!proxy->shutdown)
+ {
+ /* Use timeout to allow normal proxy shutdown */
+ n_ready = WaitEventSetWait(proxy->wait_events, PROXY_WAIT_TIMEOUT, ready, MAX_READY_EVENTS, PG_WAIT_CLIENT);
+ for (i = 0; i < n_ready; i++) {
+ chan = (Channel*)ready[i].user_data;
+ if (chan == NULL) /* new connection from postmaster */
+ {
+ Port* port = (Port*)calloc(1, sizeof(Port));
+ port->sock = pg_recv_sock(ready[i].fd);
+ if (port->sock == PGINVALID_SOCKET)
+ {
+ elog(WARNING, "Failed to receive session socket: %m");
+ free(port);
+ }
+ else
+ {
+#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
+ port->gss = (pg_gssinfo *) calloc(1, sizeof(pg_gssinfo));
+ if (!port->gss)
+ ereport(FATAL,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+#endif
+ proxy_add_client(proxy, port);
+ }
+ }
+ else
+ {
+ if (ready[i].events & WL_SOCKET_WRITEABLE) {
+ ELOG(LOG, "Channel %p is writable", chan);
+ channel_write(chan, false);
+ }
+ if (ready[i].events & WL_SOCKET_READABLE) {
+ ELOG(LOG, "Channel %p is readable", chan);
+ channel_read(chan);
+ }
+ }
+ }
+ /*
+ * Delayed deallocation of disconnected channels.
+ * We can not delete channels immediately because of presence of peer events.
+ */
+ for (chan = proxy->hangout; chan != NULL; chan = next)
+ {
+ next = chan->next;
+ channel_remove(chan);
+ }
+ proxy->hangout = NULL;
+ }
+}
+
+/*
+ * Handle normal shutdown of Postgres instance
+ */
+static void
+proxy_handle_sigterm(SIGNAL_ARGS)
+{
+ if (proxy)
+ proxy->shutdown = true;
+}
+
+#ifdef EXEC_BACKEND
+static pid_t
+proxy_forkexec(void)
+{
+ char *av[10];
+ int ac = 0;
+
+ av[ac++] = "postgres";
+ av[ac++] = "--forkproxy";
+ av[ac++] = NULL; /* filled in by postmaster_forkexec */
+ av[ac] = NULL;
+
+ Assert(ac < lengthof(av));
+
+ return postmaster_forkexec(ac, av);
+}
+#endif
+
+NON_EXEC_STATIC void
+ConnectionProxyMain(int argc, char *argv[])
+{
+ sigjmp_buf local_sigjmp_buf;
+
+ /* Identify myself via ps */
+ init_ps_display("connection proxy", "", "", "");
+
+ SetProcessingMode(InitProcessing);
+
+ pqsignal(SIGTERM, proxy_handle_sigterm);
+ pqsignal(SIGQUIT, quickdie);
+ InitializeTimeouts(); /* establishes SIGALRM handler */
+
+ /* Early initialization */
+ BaseInit();
+
+ /*
+ * Create a per-backend PGPROC struct in shared memory, except in the
+ * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
+ * this before we can use LWLocks (and in the EXEC_BACKEND case we already
+ * had to do some stuff with LWLocks).
+ */
+#ifndef EXEC_BACKEND
+ InitProcess();
+#endif
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * See notes in postgres.c about the design of this coding.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* Prevents interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ /*
+ * We can now go away. Note that because we called InitProcess, a
+ * callback was registered to do ProcKill, which will clean up
+ * necessary state.
+ */
+ proc_exit(0);
+ }
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ PG_SETMASK(&UnBlockSig);
+
+ proxy = proxy_create(MyProxySocket, &ProxyState[MyProxyId], SessionPoolSize);
+ proxy_loop(proxy);
+
+ proc_exit(0);
+}
+
+/*
+ * Function for launching proxy by postmaster.
+ * This "boilerplate" code is taken from other auxiliary workers.
+ * In future it may be replaced with background worker.
+ * The main problem with background worker is how to pass a socket to it and obtain its PID.
+ */
+int
+ConnectionProxyStart()
+{
+ pid_t worker_pid;
+
+#ifdef EXEC_BACKEND
+ switch ((worker_pid = proxy_forkexec()))
+#else
+ switch ((worker_pid = fork_process()))
+#endif
+ {
+ case -1:
+ ereport(LOG,
+ (errmsg("could not fork proxy worker process: %m")));
+ return 0;
+
+#ifndef EXEC_BACKEND
+ case 0:
+ /* in postmaster child ... */
+ InitPostmasterChild();
+
+ ConnectionProxyMain(0, NULL);
+ break;
+#endif
+ default:
+ elog(LOG, "Start proxy process %d", (int) worker_pid);
+ return (int) worker_pid;
+ }
+
+ /* shouldn't get here */
+ return 0;
+}
+
+/*
+ * We need some place in shared memory to provide information about proxies' state.
+ */
+int ConnectionProxyShmemSize(void)
+{
+ return ConnectionProxiesNumber*sizeof(ConnectionProxyState);
+}
+
+void ConnectionProxyShmemInit(void)
+{
+ bool found;
+ ProxyState = (ConnectionProxyState*)ShmemInitStruct("connection proxy contexts",
+ ConnectionProxyShmemSize(), &found);
+ if (!found)
+ memset(ProxyState, 0, ConnectionProxyShmemSize());
+}
+
+PG_FUNCTION_INFO_V1(pg_pooler_state);
+
+typedef struct
+{
+ int proxy_id;
+ TupleDesc ret_desc;
+} PoolerStateContext;
+
+/**
+ * Return information about proxies state.
+ * This set-returning functions returns the following columns:
+ *
+ * pid - proxy process identifier
+ * n_clients - number of clients connected to proxy
+ * n_ssl_clients - number of clients using SSL protocol
+ * n_pools - number of pools (role/dbname combinations) maintained by proxy
+ * n_backends - total number of backends spawned by this proxy (including tainted)
+ * n_dedicated_backends - number of tainted backends
+ * tx_bytes - amount of data sent from backends to clients
+ * rx_bytes - amount of data sent from client to backends
+ * n_transactions - number of transactions processed by all backends of this proxy
+ */
+Datum pg_pooler_state(PG_FUNCTION_ARGS)
+{
+ FuncCallContext* srf_ctx;
+ MemoryContext old_context;
+ PoolerStateContext* ps_ctx;
+ HeapTuple tuple;
+ Datum values[9];
+ bool nulls[9];
+ int id;
+ int i;
+
+ if (SRF_IS_FIRSTCALL())
+ {
+ srf_ctx = SRF_FIRSTCALL_INIT();
+ old_context = MemoryContextSwitchTo(srf_ctx->multi_call_memory_ctx);
+ ps_ctx = (PoolerStateContext*)palloc(sizeof(PoolerStateContext));
+ get_call_result_type(fcinfo, NULL, &ps_ctx->ret_desc);
+ ps_ctx->proxy_id = 0;
+ srf_ctx->user_fctx = ps_ctx;
+ MemoryContextSwitchTo(old_context);
+ }
+ srf_ctx = SRF_PERCALL_SETUP();
+ ps_ctx = srf_ctx->user_fctx;
+ id = ps_ctx->proxy_id;
+ if (id == ConnectionProxiesNumber)
+ SRF_RETURN_DONE(srf_ctx);
+
+ values[0] = Int32GetDatum(ProxyState[id].pid);
+ values[1] = Int32GetDatum(ProxyState[id].n_clients);
+ values[2] = Int32GetDatum(ProxyState[id].n_ssl_clients);
+ values[3] = Int32GetDatum(ProxyState[id].n_pools);
+ values[4] = Int32GetDatum(ProxyState[id].n_backends);
+ values[5] = Int32GetDatum(ProxyState[id].n_dedicated_backends);
+ values[6] = Int64GetDatum(ProxyState[id].tx_bytes);
+ values[7] = Int64GetDatum(ProxyState[id].rx_bytes);
+ values[8] = Int64GetDatum(ProxyState[id].n_transactions);
+
+ for (i = 0; i <= 8; i++)
+ nulls[i] = false;
+
+ ps_ctx->proxy_id += 1;
+ tuple = heap_form_tuple(ps_ctx->ret_desc, values, nulls);
+ SRF_RETURN_NEXT(srf_ctx, HeapTupleGetDatum(tuple));
+}
+
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index d7d733530f..6d32d8fe8d 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -28,6 +28,7 @@
#include "postmaster/bgworker_internals.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
#include "replication/logicallauncher.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
@@ -150,6 +151,7 @@ CreateSharedMemoryAndSemaphores(int port)
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
+ size = add_size(size, ConnectionProxyShmemSize());
/* freeze the addin request size and include it */
addin_request_allowed = false;
@@ -255,6 +257,7 @@ CreateSharedMemoryAndSemaphores(int port)
WalSndShmemInit();
WalRcvShmemInit();
ApplyLauncherShmemInit();
+ ConnectionProxyShmemInit();
/*
* Set up other modules that need some shared memory space
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 2426cbcf8e..d2806b7399 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -77,6 +77,7 @@ struct WaitEventSet
{
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ int free_events; /* head of singly-linked list of free events, linked by "pos" and terminated by -1 */
/*
* Array, of nevents_space length, storing the definition of events this
@@ -137,9 +138,9 @@ static void drainSelfPipe(void);
#if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
#elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
#elif defined(WAIT_USE_WIN32)
-static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove);
#endif
static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
@@ -585,6 +586,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
set->latch = NULL;
set->nevents_space = nevents;
set->exit_on_postmaster_death = false;
+ set->free_events = -1;
#if defined(WAIT_USE_EPOLL)
#ifdef EPOLL_CLOEXEC
@@ -691,9 +693,11 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
void *user_data)
{
WaitEvent *event;
+ int free_event;
/* not enough space */
- Assert(set->nevents < set->nevents_space);
+ if (set->nevents == set->nevents_space)
+ return -1;
if (events == WL_EXIT_ON_PM_DEATH)
{
@@ -720,8 +724,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
- event = &set->events[set->nevents];
- event->pos = set->nevents++;
+ free_event = set->free_events;
+ if (free_event >= 0)
+ {
+ event = &set->events[free_event];
+ set->free_events = event->pos;
+ event->pos = free_event;
+ }
+ else
+ {
+ event = &set->events[set->nevents];
+ event->pos = set->nevents;
+ }
+ set->nevents += 1;
event->fd = fd;
event->events = events;
event->user_data = user_data;
@@ -748,14 +763,29 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
- WaitEventAdjustWin32(set, event);
+ WaitEventAdjustWin32(set, event, false);
#endif
return event->pos;
}
+/*
+ * Remove the event at the specified position from the set
+ */
+void DeleteWaitEventFromSet(WaitEventSet *set, int event_pos)
+{
+ WaitEvent *event = &set->events[event_pos];
+#if defined(WAIT_USE_EPOLL)
+ WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+ WaitEventAdjustPoll(set, event, true);
+#elif defined(WAIT_USE_WIN32)
+ WaitEventAdjustWin32(set, event, true);
+#endif
+}
+
/*
* Change the event mask and, in the WL_LATCH_SET case, the latch associated
* with the WaitEvent.
@@ -767,10 +797,16 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
{
WaitEvent *event;
- Assert(pos < set->nevents);
+ Assert(pos < set->nevents_space);
event = &set->events[pos];
+#if defined(WAIT_USE_EPOLL)
+ /* ModifyWaitEvent is used to emulate epoll EPOLLET (edge-triggered) flag */
+ if (events & WL_SOCKET_EDGE)
+ return;
+#endif
+
/*
* If neither the event mask nor the associated latch changes, return
* early. That's an important optimization for some sockets, where
@@ -804,9 +840,9 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
- WaitEventAdjustWin32(set, event);
+ WaitEventAdjustWin32(set, event, false);
#endif
}
@@ -844,6 +880,8 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
epoll_ev.events |= EPOLLIN;
if (event->events & WL_SOCKET_WRITEABLE)
epoll_ev.events |= EPOLLOUT;
+ if (event->events & WL_SOCKET_EDGE)
+ epoll_ev.events |= EPOLLET;
}
/*
@@ -852,21 +890,39 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
* requiring that, and actually it makes the code simpler...
*/
rc = epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev);
-
if (rc < 0)
ereport(ERROR,
(errcode_for_socket_access(),
- /* translator: %s is a syscall name, such as "poll()" */
+ /* translator: %s is a syscall name, such as "poll()" */
errmsg("%s failed: %m",
"epoll_ctl()")));
+
+ if (action == EPOLL_CTL_DEL)
+ {
+ int pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ set->nevents -= 1;
+ event->pos = set->free_events;
+ set->free_events = pos;
+ }
}
#endif
#if defined(WAIT_USE_POLL)
static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
{
- struct pollfd *pollfd = &set->pollfds[event->pos];
+ int pos = event->pos;
+ struct pollfd *pollfd = &set->pollfds[pos];
+
+ if (remove)
+ {
+ set->nevents -= 1;
+ *pollfd = set->pollfds[set->nevents];
+ set->events[pos] = set->events[set->nevents];
+ event->pos = pos;
+ return;
+ }
pollfd->revents = 0;
pollfd->fd = event->fd;
@@ -897,9 +953,25 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
#if defined(WAIT_USE_WIN32)
static void
-WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove)
{
- HANDLE *handle = &set->handles[event->pos + 1];
+ int pos = event->pos;
+ HANDLE *handle = &set->handles[pos + 1];
+
+ if (remove)
+ {
+ Assert(event->fd != PGINVALID_SOCKET);
+
+ if (*handle != WSA_INVALID_EVENT)
+ WSACloseEvent(*handle);
+
+ set->nevents -= 1;
+ set->events[pos] = set->events[set->nevents];
+ *handle = set->handles[set->nevents + 1];
+ set->handles[set->nevents + 1] = WSA_INVALID_EVENT;
+ event->pos = pos;
+ return;
+ }
if (event->events == WL_LATCH_SET)
{
@@ -912,7 +984,7 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
}
else
{
- int flags = FD_CLOSE; /* always check for errors/EOF */
+ int flags = FD_CLOSE; /* always check for errors/EOF */
if (event->events & WL_SOCKET_READABLE)
flags |= FD_READ;
@@ -929,8 +1001,8 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
WSAGetLastError());
}
if (WSAEventSelect(event->fd, *handle, flags) != 0)
- elog(ERROR, "failed to set up event for socket: error code %u",
- WSAGetLastError());
+ elog(ERROR, "failed to set up event for socket %p: error code %u",
+ event->fd, WSAGetLastError());
Assert(event->fd != PGINVALID_SOCKET);
}
@@ -1336,7 +1408,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
{
if (cur_event->reset)
{
- WaitEventAdjustWin32(set, cur_event);
+ WaitEventAdjustWin32(set, cur_event, false);
cur_event->reset = false;
}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 44a59e1d4f..62ec2afd2e 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4217,6 +4217,8 @@ PostgresMain(int argc, char *argv[],
*/
if (ConfigReloadPending)
{
+ if (RestartPoolerOnReload && strcmp(application_name, "pool_worker") == 0)
+ proc_exit(0);
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c
index ffd1970f58..16ca58d9d0 100644
--- a/src/backend/utils/adt/lockfuncs.c
+++ b/src/backend/utils/adt/lockfuncs.c
@@ -18,6 +18,7 @@
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/predicate_internals.h"
+#include "storage/proc.h"
#include "utils/array.h"
#include "utils/builtins.h"
@@ -658,6 +659,7 @@ pg_isolation_test_session_is_blocked(PG_FUNCTION_ARGS)
static void
PreventAdvisoryLocksInParallelMode(void)
{
+ MyProc->is_tainted = true;
if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 3bf96de256..79001ccf91 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -130,9 +130,14 @@ int max_parallel_maintenance_workers = 2;
*/
int NBuffers = 1000;
int MaxConnections = 90;
+int SessionPoolSize = 0;
+int ConnectionProxiesNumber = 1;
+int SessionSchedule = SESSION_SCHED_ROUND_ROBIN;
+
int max_worker_processes = 8;
int max_parallel_workers = 8;
int MaxBackends = 0;
+int MaxSessions = 1000;
int VacuumCostPageHit = 1; /* GUC parameters for vacuum */
int VacuumCostPageMiss = 10;
@@ -148,3 +153,4 @@ int VacuumCostBalance = 0; /* working state for vacuum */
bool VacuumCostActive = false;
double vacuum_cleanup_index_scale_factor;
+bool RestartPoolerOnReload = false;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 92c4fee8f8..65f66db8e9 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -457,6 +457,14 @@ const struct config_enum_entry ssl_protocol_versions_info[] = {
{NULL, 0, false}
};
+static const struct config_enum_entry session_schedule_options[] = {
+ {"round-robin", SESSION_SCHED_ROUND_ROBIN, false},
+ {"random", SESSION_SCHED_RANDOM, false},
+ {"load-balancing", SESSION_SCHED_LOAD_BALANCING, false},
+ {NULL, 0, false}
+};
+
+
static struct config_enum_entry shared_memory_options[] = {
#ifndef WIN32
{"sysv", SHMEM_TYPE_SYSV, false},
@@ -1285,6 +1293,16 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"restart_pooler_on_reload", PGC_SIGHUP, CONN_POOLING,
+ gettext_noop("Restart session pool workers on pg_reload_conf()."),
+ NULL,
+ },
+ &RestartPoolerOnReload,
+ false,
+ NULL, NULL, NULL
+ },
+
{
{"log_duration", PGC_SUSET, LOGGING_WHAT,
gettext_noop("Logs the duration of each completed SQL statement."),
@@ -2137,6 +2155,42 @@ static struct config_int ConfigureNamesInt[] =
check_maxconnections, NULL, NULL
},
+ {
+ /* see max_connections and max_wal_senders */
+ {"session_pool_size", PGC_POSTMASTER, CONN_POOLING,
+ gettext_noop("Sets number of backends serving client sessions."),
+ gettext_noop("If non-zero then session pooling will be used: "
+ "client connections will be redirected to one of the backends and maximal number of backends is determined by this parameter. "
+ "Launched backends are never terminated even in case of no active sessions.")
+ },
+ &SessionPoolSize,
+ 10, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"connection_proxies", PGC_POSTMASTER, CONN_POOLING,
+ gettext_noop("Sets number of connection proxies."),
+ gettext_noop("Postmaster spawns separate worker process for each proxy. Postmaster scatters connections between proxies using one of scheduling policies (round-robin, random, load-balancing). "
+ "Each proxy launches its own subset of backends. So maximal number of non-tainted backends is "
+ "session_pool_size*connection_proxies*databases*roles.")
+ },
+ &ConnectionProxiesNumber,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"max_sessions", PGC_POSTMASTER, CONN_POOLING,
+ gettext_noop("Sets the maximum number of client sessions."),
+ gettext_noop("Maximal number of client sessions which can be handled by one connection proxy. "
+ "It can be greater than max_connections and actually be arbitrarily large.")
+ },
+ &MaxSessions,
+ 1000, 1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
{
/* see max_connections */
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
@@ -2184,6 +2238,16 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"proxy_port", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets the TCP port for the connection pooler."),
+ NULL
+ },
+ &ProxyPortNumber,
+ 6543, 1, 65535,
+ NULL, NULL, NULL
+ },
+
{
{"unix_socket_permissions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the access permissions of the Unix-domain socket."),
@@ -4550,6 +4614,16 @@ static struct config_enum ConfigureNamesEnum[] =
NULL, NULL, NULL
},
+ {
+ {"session_schedule", PGC_POSTMASTER, RESOURCES_MEM,
+ gettext_noop("Session schedule policy for connection pool."),
+ NULL
+ },
+ &SessionSchedule,
+ SESSION_SCHED_ROUND_ROBIN, session_schedule_options,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
@@ -8145,6 +8219,7 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot set parameters during a parallel operation")));
+ MyProc->is_tainted = true;
switch (stmt->kind)
{
diff --git a/src/backend/utils/mmgr/mcxt.c b/src/backend/utils/mmgr/mcxt.c
index b07be12236..dac74a272d 100644
--- a/src/backend/utils/mmgr/mcxt.c
+++ b/src/backend/utils/mmgr/mcxt.c
@@ -506,7 +506,7 @@ MemoryContextStatsDetail(MemoryContext context, int max_children)
* *totals (if given).
*/
static void
-MemoryContextStatsInternal(MemoryContext context, int level,
+MemoryContextStatsInternal(MemoryContext context, int level,
bool print, int max_children,
MemoryContextCounters *totals)
{
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 87335248a0..5f528c1d72 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10677,4 +10677,11 @@
proname => 'pg_partition_root', prorettype => 'regclass',
proargtypes => 'regclass', prosrc => 'pg_partition_root' },
+# builtin connection pool
+{ oid => '3435', descr => 'information about connection pooler proxies workload',
+ proname => 'pg_pooler_state', prorows => '1000', proretset => 't',
+ provolatile => 'v', prorettype => 'record', proargtypes => '',
+ proallargtypes => '{int4,int4,int4,int4,int4,int4,int8,int8,int8}', proargmodes => '{o,o,o,o,o,o,o,o,o}',
+ proargnames => '{pid,n_clients,n_ssl_clients,n_pools,n_backends,n_dedicated_backends,tx_bytes,rx_bytes,n_transactions}', prosrc => 'pg_pooler_state' },
+
]
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 08a257616d..1e12ee1884 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -54,10 +54,9 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
* prototypes for functions in pqcomm.c
*/
extern WaitEventSet *FeBeWaitSet;
-
-extern int StreamServerPort(int family, char *hostName,
- unsigned short portNumber, char *unixSocketDir,
- pgsocket ListenSocket[], int MaxListen);
+extern int StreamServerPort(int family, char *hostName,
+ unsigned short portNumber, char *unixSocketDir,
+ pgsocket ListenSocket[], int ListenPort[], int MaxListen);
extern int StreamConnection(pgsocket server_fd, Port *port);
extern void StreamClose(pgsocket sock);
extern void TouchSocketFiles(void);
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 61a24c2e3c..86c0ef84e5 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -159,6 +159,19 @@ extern PGDLLIMPORT int data_directory_mode;
extern PGDLLIMPORT int NBuffers;
extern PGDLLIMPORT int MaxBackends;
extern PGDLLIMPORT int MaxConnections;
+
+enum SessionSchedulePolicy
+{
+ SESSION_SCHED_ROUND_ROBIN,
+ SESSION_SCHED_RANDOM,
+ SESSION_SCHED_LOAD_BALANCING
+};
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
+extern PGDLLIMPORT int ConnectionProxiesNumber;
+extern PGDLLIMPORT int SessionSchedule;
+extern PGDLLIMPORT bool RestartPoolerOnReload;
+
extern PGDLLIMPORT int max_worker_processes;
extern PGDLLIMPORT int max_parallel_workers;
diff --git a/src/include/port.h b/src/include/port.h
index b5c03d912b..3ea24a3b70 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
extern bool pg_set_noblock(pgsocket sock);
extern bool pg_set_block(pgsocket sock);
+/* send/receive socket descriptor */
+extern int pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
/* Portable path handling for Unix/Win32 (in path.c) */
extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/port/win32_port.h b/src/include/port/win32_port.h
index f4841fb397..e101df179f 100644
--- a/src/include/port/win32_port.h
+++ b/src/include/port/win32_port.h
@@ -445,6 +445,7 @@ extern int pgkill(int pid, int sig);
#define select(n, r, w, e, timeout) pgwin32_select(n, r, w, e, timeout)
#define recv(s, buf, len, flags) pgwin32_recv(s, buf, len, flags)
#define send(s, buf, len, flags) pgwin32_send(s, buf, len, flags)
+#define socketpair(af, type, protocol, socks) pgwin32_socketpair(af, type, protocol, socks)
SOCKET pgwin32_socket(int af, int type, int protocol);
int pgwin32_bind(SOCKET s, struct sockaddr *addr, int addrlen);
@@ -455,7 +456,8 @@ int pgwin32_select(int nfds, fd_set *readfs, fd_set *writefds, fd_set *exceptf
int pgwin32_recv(SOCKET s, char *buf, int len, int flags);
int pgwin32_send(SOCKET s, const void *buf, int len, int flags);
int pgwin32_waitforsinglesocket(SOCKET s, int what, int timeout);
-
+int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2]);
+
extern int pgwin32_noblock;
#endif /* FRONTEND */
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 8ccd2afce5..05906e94a0 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -17,6 +17,7 @@
extern bool EnableSSL;
extern int ReservedBackends;
extern PGDLLIMPORT int PostPortNumber;
+extern PGDLLIMPORT int ProxyPortNumber;
extern int Unix_socket_permissions;
extern char *Unix_socket_group;
extern char *Unix_socket_directories;
@@ -46,6 +47,11 @@ extern int postmaster_alive_fds[2];
extern PGDLLIMPORT const char *progname;
+extern PGDLLIMPORT void* (*LibpqConnectdbParams)(char const* keywords[], char const* values[]);
+
+struct Proxy;
+struct Port;
+
extern void PostmasterMain(int argc, char *argv[]) pg_attribute_noreturn();
extern void ClosePostmasterPorts(bool am_syslogger);
extern void InitProcessGlobals(void);
@@ -63,6 +69,9 @@ extern Size ShmemBackendArraySize(void);
extern void ShmemBackendArrayAllocation(void);
#endif
+extern int ParseStartupPacket(struct Port* port, MemoryContext memctx, void* pkg_body, int pkg_size, bool SSLdone);
+extern int BackendStartup(struct Port* port, int* backend_pid);
+
/*
* Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
* for buffer references in buf_internals.h. This limitation could be lifted
diff --git a/src/include/postmaster/proxy.h b/src/include/postmaster/proxy.h
new file mode 100644
index 0000000000..7f7a92a56a
--- /dev/null
+++ b/src/include/postmaster/proxy.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * proxy.h
+ * Exports from postmaster/proxy.c.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/postmaster/proxy.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _PROXY_H
+#define _PROXY_H
+
+/*
+ * Information in shared memory about connection proxy state (used for session scheduling and monitoring)
+ */
+typedef struct ConnectionProxyState
+{
+ int pid; /* proxy worker pid */
+ int n_clients; /* total number of clients */
+ int n_ssl_clients; /* number of clients using SSL connection */
+ int n_pools; /* number of dbname/role combinations */
+ int n_backends; /* total number of launched backends */
+ int n_dedicated_backends; /* number of tainted backends */
+ uint64 tx_bytes; /* amount of data sent to client */
+ uint64 rx_bytes; /* amount of data send to server */
+ uint64 n_transactions; /* total number of processed transactions */
+} ConnectionProxyState;
+
+extern ConnectionProxyState* ProxyState;
+extern PGDLLIMPORT int MyProxyId;
+extern PGDLLIMPORT pgsocket MyProxySocket;
+
+extern int ConnectionProxyStart(void);
+extern int ConnectionProxyShmemSize(void);
+extern void ConnectionProxyShmemInit(void);
+#ifdef EXEC_BACKEND
+extern void ConnectionProxyMain(int argc, char *argv[]);
+#endif
+
+#endif
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index bd7af11a8a..680eb5ee10 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -133,9 +133,11 @@ typedef struct Latch
/* avoid having to deal with case on platforms not requiring it */
#define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE
#endif
+#define WL_SOCKET_EDGE (1 << 7)
#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \
WL_SOCKET_WRITEABLE | \
+ WL_SOCKET_EDGE | \
WL_SOCKET_CONNECTED)
typedef struct WaitEvent
@@ -177,6 +179,8 @@ extern int WaitLatch(Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
+extern void DeleteWaitEventFromSet(WaitEventSet *set, int event_pos);
+
/*
* Unix implementation uses SIGUSR1 for inter-process signaling.
* Win32 doesn't need this.
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index ac7ee72952..e7207e2d9a 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -203,6 +203,8 @@ struct PGPROC
PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */
dlist_head lockGroupMembers; /* list of members, if I'm a leader */
dlist_node lockGroupLink; /* my member link, if I'm a member */
+
+ bool is_tainted; /* backend has modified session GUCs, use temporary tables, prepare statements, ... */
};
/* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h
index d68976fafa..9ff45b190a 100644
--- a/src/include/utils/guc_tables.h
+++ b/src/include/utils/guc_tables.h
@@ -58,6 +58,7 @@ enum config_group
CONN_AUTH_SETTINGS,
CONN_AUTH_AUTH,
CONN_AUTH_SSL,
+ CONN_POOLING,
RESOURCES,
RESOURCES_MEM,
RESOURCES_DISK,
diff --git a/src/makefiles/Makefile.cygwin b/src/makefiles/Makefile.cygwin
index f274d802b1..fdf53e9a8d 100644
--- a/src/makefiles/Makefile.cygwin
+++ b/src/makefiles/Makefile.cygwin
@@ -19,6 +19,7 @@ override CPPFLAGS += -DWIN32_STACK_RLIMIT=$(WIN32_STACK_RLIMIT)
ifneq (,$(findstring backend,$(subdir)))
ifeq (,$(findstring conversion_procs,$(subdir)))
ifeq (,$(findstring libpqwalreceiver,$(subdir)))
+ifeq (,$(findstring libpqconn,$(subdir)))
ifeq (,$(findstring replication/pgoutput,$(subdir)))
ifeq (,$(findstring snowball,$(subdir)))
override CPPFLAGS+= -DBUILDING_DLL
diff --git a/src/makefiles/Makefile.win32 b/src/makefiles/Makefile.win32
index 3dea11e5c2..39bd2de85e 100644
--- a/src/makefiles/Makefile.win32
+++ b/src/makefiles/Makefile.win32
@@ -17,6 +17,7 @@ CFLAGS_SL =
ifneq (,$(findstring backend,$(subdir)))
ifeq (,$(findstring conversion_procs,$(subdir)))
ifeq (,$(findstring libpqwalreceiver,$(subdir)))
+ifeq (,$(findstring libpqconn,$(subdir)))
ifeq (,$(findstring replication/pgoutput,$(subdir)))
ifeq (,$(findstring snowball,$(subdir)))
override CPPFLAGS+= -DBUILDING_DLL