diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 84341a30e5..50be793e26 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -719,6 +719,123 @@ include_dir 'conf.d'
+
+     max_sessions (integer)
+      max_sessions configuration parameter
+
+       The maximum number of client sessions that can be handled by
+       one connection proxy when session pooling is switched on.
+       This parameter does not add any memory or CPU overhead, so
+       specifying a large max_sessions value
+       does not affect performance.
+       If the max_sessions limit is reached, new connections are not accepted.
+
+       The default value is 1000. This parameter can only be set at server start.
+
+     session_pool_size (integer)
+      session_pool_size configuration parameter
+
+       Enables session pooling and defines the maximum number of
+       backends that can be used by client sessions for each database/user combination.
+       Launched non-tainted backends are never terminated, even if there are no active sessions.
+       A backend is considered tainted if the client updates GUCs, creates a temporary
+       table, or prepares statements.  A tainted backend can serve only one client.
+
+       The default value is 10, so up to 10 backends will serve each
+       database/user combination.
+
+     proxy_port (integer)
+      proxy_port configuration parameter
+
+       Sets the TCP port for the connection pooler.
+       Clients connected to the main "port" are assigned dedicated backends,
+       while clients connected to the proxy port are connected to backends through
+       a proxy, which performs transaction-level scheduling.
+
+       The default value is 6543.
+
+     connection_proxies (integer)
+      connection_proxies configuration parameter
+
+       Sets the number of connection proxies.
+       The postmaster spawns a separate worker process for each proxy and scatters
+       connections between the proxies using one of the scheduling policies
+       (round-robin, random, load-balancing).
+       Each proxy launches its own subset of backends, so the maximal number of
+       non-tainted backends is session_pool_size*connection_proxies*databases*roles.
+
+       The default value is 0, so session pooling is disabled.
+
+     session_schedule (enum)
+      session_schedule configuration parameter
+
+       Specifies the scheduling policy for assigning sessions to proxies when
+       connection pooling is enabled.  The default policy is round-robin.
+
+       With the round-robin policy, the postmaster cyclically scatters sessions
+       between proxies.
+
+       With the random policy, the postmaster chooses a proxy for each new session
+       at random.
+
+       With the load-balancing policy, the postmaster chooses the proxy with the
+       lowest load average.  The load average of a proxy is estimated by the number
+       of client connections assigned to this proxy, with extra weight for SSL
+       connections.
+
+     restart_pooler_on_reload (boolean)
+      restart_pooler_on_reload configuration parameter
+
+       Restart session pool workers once pg_reload_conf() is called.
+       The default value is false.
+
  unix_socket_directories (string)
diff --git a/doc/src/sgml/connpool.sgml b/doc/src/sgml/connpool.sgml
new file mode 100644
index 0000000000..8486ce1e8d
--- /dev/null
+++ b/doc/src/sgml/connpool.sgml
@@ -0,0 +1,174 @@
+
+ Connection pooling
+
+  built-in connection pool proxy
+
+  PostgreSQL spawns a separate process (a backend) for each client.
+  For a large number of clients, this model can consume a large amount of system
+  resources and lead to significant performance degradation, especially on computers
+  with a large number of CPU cores.
+
+  The main reason is high contention between backends for PostgreSQL resources.
+  Also, the size of many internal data structures, as well as the complexity of
+  the algorithms that operate on them, grows with the number of active backends.
+
+  This is why most production PostgreSQL installations use some kind of connection
+  pooling: pgbouncer, J2EE, odyssey, ...  But an external connection pooler requires
+  additional effort for installation, configuration and maintenance.  Also, pgbouncer
+  (the most popular connection pooler for PostgreSQL) is single-threaded and so can
+  become a bottleneck for a high-load system, in which case multiple instances of
+  pgbouncer have to be launched.
+
+  Starting with version 12, PostgreSQL provides a built-in connection pooler.
+  This chapter describes the architecture and usage of the built-in connection pooler.
+
+ How Built-in Connection Pooler Works
+
+  The built-in connection pooler spawns one or more proxy processes which connect
+  clients with backends.  The number of proxy processes is controlled by the
+  connection_proxies configuration parameter.
+  To avoid substantial changes in the PostgreSQL locking mechanism, only a
+  transaction-level pooling policy is implemented.  This means the pooler can
+  reschedule a backend to another session only once the backend has completed its
+  current transaction.
+
+  Since each PostgreSQL backend works with only a single database, each proxy process
+  maintains a hash table of connection pools, one for each dbname/role pair.
+  The maximal number of backends which can be spawned by a connection pool is limited
+  by the session_pool_size configuration variable,
+  so the maximal number of non-dedicated backends in pooling mode is limited by
+  connection_proxies*session_pool_size*#databases*#roles.
+
+  To minimize the number of changes in the PostgreSQL core, the built-in connection
+  pooler does not try to save and restore session context.  If the session context is
+  modified by the client application
+  (by changing values of configuration variables (GUCs), creating temporary tables,
+  preparing statements, or taking advisory locks),
+  then the backend executing this session is considered tainted.
+  It is then dedicated to this session and cannot be rescheduled to another session.
+  Once this session is terminated, the backend is terminated as well.
+  Non-tainted backends are not terminated, even if there are no more connected sessions.
+
+  The built-in connection pooler accepts connections on a separate port
+  (the proxy_port configuration option, default value 6543).
+  If a client connects to PostgreSQL through the standard port (the port configuration
+  option, default value 5432), then a normal (dedicated) backend is created.  It works
+  only with this client and is terminated when the client disconnects.  The standard
+  port is also used by the proxy itself to launch new worker backends.  This means
+  that to enable the connection pooler, PostgreSQL should be configured to accept
+  local connections (in the pg_hba.conf file).
+
+  If a client application connects through the proxy port, then its communication
+  with the backend is always performed through the proxy.  Even if it changes the
+  session context and the backend becomes tainted, all traffic between this client
+  and the backend still goes through the proxy.
+
+  The postmaster accepts connections on the proxy port and redirects them to one of
+  the connection proxies.  Right now, sessions are bound to a proxy and cannot
+  migrate between proxies.  To provide uniform load balancing of proxies, the
+  postmaster uses one of three scheduling policies: round-robin, random and
+  load-balancing.  In the last case, the postmaster chooses the proxy with the
+  smallest number of already attached clients, with extra weight added for SSL
+  connections (which consume more CPU).
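+
+  For illustration, whether a session is pooled is decided purely by the port the
+  client connects to; no client-side changes are needed.  A minimal sketch, assuming
+  the default port settings and a local server:
+
+    $ psql "host=localhost port=6543 dbname=postgres"   # pooled: handled by a connection proxy
+    $ psql "host=localhost port=5432 dbname=postgres"   # direct: served by a dedicated backend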
+
+ Built-in Connection Pooler Configuration
+
+  There are four main configuration variables controlling the connection pooler:
+  session_pool_size, connection_proxies, max_sessions and proxy_port.
+  The connection pooler is enabled only if all of them are non-zero.
+
+  connection_proxies specifies the number of connection proxy processes to spawn.
+  The default value is zero, so connection pooling is disabled by default.
+
+  session_pool_size specifies the maximal number of backends per connection pool.
+  The maximal number of launched non-dedicated backends in pooling mode is limited by
+  connection_proxies*session_pool_size*#databases*#roles.
+  If the number of backends is too small, the server will not be able to utilize all
+  system resources.  But too large a value can degrade performance because of large
+  snapshots and lock contention.
+
+  The max_sessions parameter specifies the maximal number of sessions which can be
+  handled by one connection proxy.
+  It actually affects only the size of the wait event set, and so can be large
+  without any essential negative impact on system resource consumption.
+  The default value is 1000, so the maximal number of pooled client connections is
+  limited by connection_proxies*max_sessions.
+
+  The connection proxy accepts connections on a special port, defined by proxy_port.
+  The default value is 6543, but it can be swapped with the standard PostgreSQL
+  port 5432, so that all client connections are pooled by default.
+  It is still necessary to have a port for direct connections to the database
+  (dedicated backends); the connection pooler itself needs it to launch worker
+  backends.
+
+  The postmaster scatters sessions between proxies using one of three available
+  scheduling policies: round-robin, random and load-balancing.
+  The policy can be set using the session_schedule configuration variable.  The
+  default policy is round-robin, which distributes sessions cyclically between
+  proxies.  It should work well for a more or less uniform workload.
+  The smartest policy is load-balancing, which tries to choose the least loaded
+  proxy based on the available statistics.  It is possible to monitor proxy state
+  using the pg_pooler_state() function, which returns information about the number
+  of clients, backends and pools for each proxy, as well as some statistics about
+  the number of processed transactions and the amount of data
+  sent from clients to backends (rx_bytes) and from backends to clients (tx_bytes).
+
+  Since pooled backends are not terminated on client exit, it would otherwise not be
+  possible to drop a database to which they are connected.  This can be achieved
+  without server restart using the restart_pooler_on_reload variable.
+  Setting this variable to true causes shutdown of all pooled backends after
+  execution of the pg_reload_conf() function.  After that it is possible to drop
+  the database.
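+
+  As a minimal illustration, session pooling could be enabled with settings such as
+  the following (the values are examples, not recommendations):
+
+    connection_proxies = 2
+    session_pool_size = 10
+    max_sessions = 1000
+    proxy_port = 6543
+
+  Proxy activity can then be inspected with the monitoring function described above:
+
+    SELECT * FROM pg_pooler_state();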
+
+ Built-in Connection Pooler Pros and Cons
+
+  Unlike pgbouncer and other external connection poolers, the built-in connection
+  pooler doesn't require installation and configuration of any extra components.
+  It also doesn't introduce any limitations for clients: existing clients can work
+  through the proxy without noticing any difference.
+  If a client application requires session context, it will be served by a dedicated
+  backend.  Such a connection will not participate in
+  connection pooling, but it will work correctly.  This is the main difference from
+  pgbouncer, which may cause incorrect behavior of client applications when any
+  pooling policy other than session-level is used.
+  And if an application does not change session context, it is pooled implicitly,
+  reducing the number of active backends.
+
+  The main limitation of the current built-in connection pooler implementation is
+  that it is not able to save and resume session context.
+  Although that is not especially difficult to do, it requires more changes in the
+  PostgreSQL core.  Developers of client applications have
+  the choice to either avoid session-specific operations, or not use built-in
+  pooling.  For example, using prepared statements can improve the speed of simple
+  queries by up to two times, but prepared statements cannot be handled by a pooled
+  backend; if all clients use prepared statements, there will be no connection
+  pooling even if connection pooling is enabled.
+
+  Redirecting connections through a connection proxy definitely has a negative
+  effect on total system performance, and especially on latency.
+  The overhead of connection proxying depends on many factors, such as the
+  characteristics of the external and internal networks, the complexity of queries
+  and the size of the returned result set.
+  A pgbench benchmark in select-only mode shows almost two times worse performance
+  for local connections through the connection pooler compared with direct local
+  connections when the number of connections is small enough (around 10).  For a
+  much larger number of connections (when pooling is actually required), pooling
+  mode outperforms direct connection mode.
+
+  Another obvious limitation of transaction-level pooling is that a long-lived
+  transaction can cause starvation of
+  other clients.  This greatly depends on application design.  If an application
+  opens a database transaction and then waits for user input or some other external
+  event, the backend can stay in the idle-in-transaction
+  state for a long time, and such a backend cannot be rescheduled to another
+  session.  The obvious recommendation is to avoid long-lived transactions and to
+  set idle_in_transaction_session_timeout to implicitly abort such transactions, as
+  in the example below.
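+
+  A sketch of such a safety net (the threshold is an arbitrary example):
+
+    idle_in_transaction_session_timeout = '30s'
+
+  With this setting, the server terminates any session that stays idle inside an
+  open transaction for more than 30 seconds, preventing it from holding a pooled
+  backend indefinitely.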
+ + + + + diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml index 8960f11278..5b19fef481 100644 --- a/doc/src/sgml/filelist.sgml +++ b/doc/src/sgml/filelist.sgml @@ -29,6 +29,7 @@ + diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml index 3e115f1c76..029f0dc4e3 100644 --- a/doc/src/sgml/postgres.sgml +++ b/doc/src/sgml/postgres.sgml @@ -109,6 +109,7 @@ &mvcc; &perform; ∥ + &connpool; diff --git a/src/Makefile b/src/Makefile index bcdbd9588a..196ca8c0f0 100644 --- a/src/Makefile +++ b/src/Makefile @@ -23,6 +23,7 @@ SUBDIRS = \ interfaces \ backend/replication/libpqwalreceiver \ backend/replication/pgoutput \ + backend/postmaster/libpqconn \ fe_utils \ bin \ pl \ diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index c278ee7318..acbaed313a 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -30,6 +30,7 @@ #include "parser/parse_expr.h" #include "parser/parse_type.h" #include "rewrite/rewriteHandler.h" +#include "storage/proc.h" #include "tcop/pquery.h" #include "tcop/utility.h" #include "utils/builtins.h" @@ -457,6 +458,7 @@ StorePreparedStatement(const char *stmt_name, stmt_name, HASH_ENTER, &found); + MyProc->is_tainted = true; /* Shouldn't get a duplicate entry */ if (found) diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index fd67d2a841..10a14d0e03 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -590,6 +590,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("ON COMMIT can only be used on temporary tables"))); + if (stmt->relation->relpersistence != RELPERSISTENCE_TEMP + && stmt->oncommit != ONCOMMIT_DROP) + MyProc->is_tainted = true; + if (stmt->partspec != NULL) { if (relkind != RELKIND_RELATION) diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 384887e70d..ebff20a61a 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -195,15 +195,13 @@ pq_init(void) { /* initialize state variables */ PqSendBufferSize = PQ_SEND_BUFFER_SIZE; - PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize); + if (!PqSendBuffer) + PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize); PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0; PqCommBusy = false; PqCommReadingMsg = false; DoingCopyOut = false; - /* set up process-exit hook to close the socket */ - on_proc_exit(socket_close, 0); - /* * In backends (as soon as forked) we operate the underlying socket in * nonblocking mode and use latches to implement blocking semantics if @@ -220,6 +218,11 @@ pq_init(void) (errmsg("could not set socket to nonblocking mode: %m"))); #endif + if (FeBeWaitSet) + FreeWaitEventSet(FeBeWaitSet); + else + on_proc_exit(socket_close, 0); + FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3); AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, NULL, NULL); @@ -227,6 +230,7 @@ pq_init(void) AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL); } + /* -------------------------------- * socket_comm_reset - reset libpq during error recovery * @@ -329,7 +333,7 @@ socket_close(int code, Datum arg) int StreamServerPort(int family, char *hostName, unsigned short portNumber, char *unixSocketDir, - pgsocket ListenSocket[], int MaxListen) + pgsocket ListenSocket[], int ListenPort[], int MaxListen) { pgsocket fd; int err; @@ -593,6 +597,7 @@ StreamServerPort(int family, char *hostName, 
 			 unsigned short portNumber,
 						familyDesc, addrDesc,
 						(int) portNumber)));
 		ListenSocket[listen_index] = fd;
+		ListenPort[listen_index] = portNumber;
 		added++;
 	}
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index f4120bec55..e0cdd9e8bb 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = atomics.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
 
 ifeq ($(PORTNAME), win32)
 SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000000..a76db8d171
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,165 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ *	  Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+/*
+ * System headers; the original header names were lost in this copy of the
+ * patch, so the set below is assumed from what the code uses
+ * (sendmsg/recvmsg, SCM_RIGHTS, pid_t, errno).
+ */
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#ifdef WIN32
+typedef struct
+{
+    SOCKET      origsocket;
+    WSAPROTOCOL_INFO wsainfo;
+} InheritableSocket;
+#endif
+
+/*
+ * Send socket descriptor "sock" to backend process through Unix socket "chan"
+ */
+int
+pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid)
+{
+#ifdef WIN32
+    InheritableSocket dst;
+    size_t rc;
+    dst.origsocket = sock;
+    if (WSADuplicateSocket(sock, pid, &dst.wsainfo) != 0)
+    {
+        ereport(FATAL,
+                (errmsg("could not duplicate socket %d for use in backend: error code %d",
+                        (int) sock, WSAGetLastError())));
+        return -1;
+    }
+    rc = send(chan, (char*) &dst, sizeof(dst), 0);
+    if (rc != sizeof(dst))
+    {
+        ereport(FATAL,
+                (errmsg("failed to send inheritable socket: rc=%d, error code %d",
+                        (int) rc, WSAGetLastError())));
+        return -1;
+    }
+    return 0;
+#else
+    struct msghdr msg = { 0 };
+    struct iovec io;
+    struct cmsghdr* cmsg;
+    char buf[CMSG_SPACE(sizeof(sock))];
+    memset(buf, '\0', sizeof(buf));
+
+    /* On Mac OS X, the struct iovec is needed, even if it points to minimal data */
+    io.iov_base = "";
+    io.iov_len = 1;
+
+    msg.msg_iov = &io;
+    msg.msg_iovlen = 1;
+    msg.msg_control = buf;
+    msg.msg_controllen = sizeof(buf);
+
+    cmsg = CMSG_FIRSTHDR(&msg);
+    if (!cmsg)
+        return PGINVALID_SOCKET;
+
+    cmsg->cmsg_level = SOL_SOCKET;
+    cmsg->cmsg_type = SCM_RIGHTS;
+    cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+
+    memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+    msg.msg_controllen = cmsg->cmsg_len;
+
+    while (sendmsg(chan, &msg, 0) < 0)
+    {
+        if (errno != EINTR)
+            return PGINVALID_SOCKET;
+    }
+    return 0;
+#endif
+}
+
+
+/*
+ * Receive socket descriptor from postmaster process through Unix socket "chan"
+ */
+pgsocket
+pg_recv_sock(pgsocket chan)
+{
+#ifdef WIN32
+    InheritableSocket src;
+    SOCKET s;
+    size_t rc = recv(chan, (char*) &src, sizeof(src), 0);
+    if (rc != sizeof(src))
+    {
+        ereport(FATAL,
+                (errmsg("failed to receive inheritable socket: rc=%d, error code %d",
+                        (int) rc, WSAGetLastError())));
+    }
+    s = WSASocket(FROM_PROTOCOL_INFO,
+                  FROM_PROTOCOL_INFO,
+                  FROM_PROTOCOL_INFO,
+                  &src.wsainfo,
+                  0,
+                  0);
+    if (s == INVALID_SOCKET)
+    {
+        ereport(FATAL,
+                (errmsg("could not create inherited socket: error code %d\n",
+                        WSAGetLastError())));
+    }
+
+    /*
+     * To make sure we don't get two references to the same
socket, close + * the original one. (This would happen when inheritance actually + * works.. + */ + closesocket(src.origsocket); + return s; +#else + struct msghdr msg = {0}; + char c_buffer[256]; + char m_buffer[256]; + struct iovec io; + struct cmsghdr * cmsg; + pgsocket sock; + + io.iov_base = m_buffer; + io.iov_len = sizeof(m_buffer); + msg.msg_iov = &io; + msg.msg_iovlen = 1; + + msg.msg_control = c_buffer; + msg.msg_controllen = sizeof(c_buffer); + + while (recvmsg(chan, &msg, 0) < 0) + { + if (errno != EINTR) + return PGINVALID_SOCKET; + } + + cmsg = CMSG_FIRSTHDR(&msg); + if (!cmsg) + { + elog(WARNING, "Failed to transfer socket"); + return PGINVALID_SOCKET; + } + + memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock)); + pg_set_noblock(sock); + + return sock; +#endif +} diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c index d5b5e771e9..53eece6422 100644 --- a/src/backend/port/win32/socket.c +++ b/src/backend/port/win32/socket.c @@ -690,3 +690,65 @@ pgwin32_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, c memcpy(writefds, &outwritefds, sizeof(fd_set)); return nummatches; } + +int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2]) +{ + union { + struct sockaddr_in inaddr; + struct sockaddr addr; + } a; + SOCKET listener; + int e; + socklen_t addrlen = sizeof(a.inaddr); + DWORD flags = 0; + int reuse = 1; + + socks[0] = socks[1] = -1; + + listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (listener == -1) + return SOCKET_ERROR; + + memset(&a, 0, sizeof(a)); + a.inaddr.sin_family = AF_INET; + a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + a.inaddr.sin_port = 0; + + for (;;) { + if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, + (char*) &reuse, (socklen_t) sizeof(reuse)) == -1) + break; + if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) + break; + + memset(&a, 0, sizeof(a)); + if (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR) + break; + a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + a.inaddr.sin_family = AF_INET; + + if (listen(listener, 1) == SOCKET_ERROR) + break; + + socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, flags); + if (socks[0] == -1) + break; + if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) + break; + + socks[1] = accept(listener, NULL, NULL); + if (socks[1] == -1) + break; + + closesocket(listener); + return 0; + } + + e = WSAGetLastError(); + closesocket(listener); + closesocket(socks[0]); + closesocket(socks[1]); + WSASetLastError(e); + socks[0] = socks[1] = -1; + return SOCKET_ERROR; +} diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 71c23211b2..5d8b65c50a 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -12,7 +12,9 @@ subdir = src/backend/postmaster top_builddir = ../../.. 
 include $(top_builddir)/src/Makefile.global
 
+override CPPFLAGS := $(CPPFLAGS) -I$(top_builddir)/src/port -I$(top_srcdir)/src/port
+
 OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \
-	pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o
+	pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o proxy.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/postmaster/libpqconn/Makefile b/src/backend/postmaster/libpqconn/Makefile
new file mode 100644
index 0000000000..f05b72758e
--- /dev/null
+++ b/src/backend/postmaster/libpqconn/Makefile
@@ -0,0 +1,35 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/postmaster/libpqconn
+#
+# IDENTIFICATION
+#    src/backend/postmaster/libpqconn/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/postmaster/libpqconn
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS = libpqconn.o $(WIN32RES)
+SHLIB_LINK_INTERNAL = $(libpq)
+SHLIB_LINK = $(filter -lintl, $(LIBS))
+SHLIB_PREREQS = submake-libpq
+PGFILEDESC = "libpqconn - open libpq connection"
+NAME = libpqconn
+
+all: all-shared-lib
+
+include $(top_srcdir)/src/Makefile.shlib
+
+install: all installdirs install-lib
+
+installdirs: installdirs-lib
+
+uninstall: uninstall-lib
+
+clean distclean maintainer-clean: clean-lib
+	rm -f $(OBJS)
diff --git a/src/backend/postmaster/libpqconn/libpqconn.c b/src/backend/postmaster/libpqconn/libpqconn.c
new file mode 100644
index 0000000000..bdba0f6e2c
--- /dev/null
+++ b/src/backend/postmaster/libpqconn/libpqconn.c
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpqconn.c
+ *
+ * This file provides a way to establish a connection to a postgres instance
+ * from a backend.
+ *
+ * Portions Copyright (c) 2010-2018, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/postmaster/libpqconn/libpqconn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+/* system headers (assumed; the original names were lost in this copy of the patch) */
+#include <unistd.h>
+#include <errno.h>
+
+#include "fmgr.h"
+#include "libpq-fe.h"
+#include "postmaster/postmaster.h"
+
+PG_MODULE_MAGIC;
+
+void _PG_init(void);
+
+static void*
+libpq_connectdb(char const* keywords[], char const* values[])
+{
+    PGconn* conn = PQconnectdbParams(keywords, values, false);
+    if (!conn || PQstatus(conn) != CONNECTION_OK)
+    {
+        ereport(WARNING,
+                (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+                 errmsg("could not establish local connection to server"),
+                 errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
+        return NULL;
+    }
+    return conn;
+}
+
+void _PG_init(void)
+{
+    LibpqConnectdbParams = libpq_connectdb;
+}
+
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 688ad439ed..73a695b5ee 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -114,6 +114,7 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/pgarch.h"
 #include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
 #include "postmaster/syslogger.h"
 #include "replication/logicallauncher.h"
 #include "replication/walsender.h"
@@ -196,6 +197,9 @@ BackgroundWorker *MyBgworkerEntry = NULL;
 /* The socket number we are listening for connections on */
 int			PostPortNumber;
 
+/* The socket number we are listening for pooled connections on */
+int			ProxyPortNumber;
+
 /* The directory names for Unix socket(s) */
 char	   *Unix_socket_directories;
 
@@ -216,6 +220,7 @@ int			ReservedBackends;
 /* The socket(s) we're listening to. */
 #define MAXLISTEN	64
 static pgsocket ListenSocket[MAXLISTEN];
+static int	ListenPort[MAXLISTEN];
 
 /*
  * Set by the -o option
@@ -246,6 +251,18 @@ bool		enable_bonjour = false;
 char	   *bonjour_name;
 bool		restart_after_crash = true;
 
+typedef struct ConnectionProxy
+{
+    int			pid;
+    pgsocket	socks[2];
+} ConnectionProxy;
+
+ConnectionProxy* ConnectionProxies;
+static bool ConnectionProxiesStarted;
+static int	CurrentConnectionProxy; /* index used for round-robin distribution of connections between proxies */
+
+void* (*LibpqConnectdbParams)(char const* keywords[], char const* values[]);
+
 /* PIDs of special child processes; 0 when not running */
 static pid_t StartupPID = 0,
 			BgWriterPID = 0,
@@ -403,7 +420,6 @@ static void BackendInitialize(Port *port);
 static void BackendRun(Port *port) pg_attribute_noreturn();
 static void ExitPostmaster(int status) pg_attribute_noreturn();
 static int	ServerLoop(void);
-static int	BackendStartup(Port *port);
 static int	ProcessStartupPacket(Port *port, bool secure_done);
 static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
 static void processCancelRequest(Port *port, void *pkt);
@@ -425,6 +441,7 @@ static pid_t StartChildProcess(AuxProcType type);
 static void StartAutovacuumWorker(void);
 static void MaybeStartWalReceiver(void);
 static void InitPostmasterDeathWatchHandle(void);
+static void StartProxyWorker(int id);
 
 /*
  * Archiver is allowed to start up at the current postmaster state?
@@ -477,6 +494,8 @@ typedef struct { Port port; InheritableSocket portsocket; + InheritableSocket proxySocket; + int proxyId; char DataDir[MAXPGPATH]; pgsocket ListenSocket[MAXLISTEN]; int32 MyCancelKey; @@ -560,6 +579,48 @@ int postmaster_alive_fds[2] = {-1, -1}; HANDLE PostmasterHandle; #endif +static void +StartConnectionProxies(void) +{ + if (SessionPoolSize > 0 && ConnectionProxiesNumber > 0 && !ConnectionProxiesStarted) + { + int i; + if (ConnectionProxies == NULL) + { + ConnectionProxies = malloc(sizeof(ConnectionProxy)*ConnectionProxiesNumber); + for (i = 0; i < ConnectionProxiesNumber; i++) + { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, ConnectionProxies[i].socks) < 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg_internal("could not create socket pair for launching sessions: %m"))); + } + } + for (i = 0; i < ConnectionProxiesNumber; i++) + { + StartProxyWorker(i); + } + ConnectionProxiesStarted = true; + } +} + +/* + * Send signal to connection proxies + */ +static void +StopConnectionProxies(int signal) +{ + if (ConnectionProxiesStarted) + { + int i; + for (i = 0; i < ConnectionProxiesNumber; i++) + { + signal_child(ConnectionProxies[i].pid, signal); + } + ConnectionProxiesStarted = false; + } +} + /* * Postmaster main entry point */ @@ -572,6 +633,9 @@ PostmasterMain(int argc, char *argv[]) bool listen_addr_saved = false; int i; char *output_config_variable = NULL; + bool contains_localhost = false; + int ports[2]; + int n_ports = 0; InitProcessGlobals(); @@ -1008,6 +1072,11 @@ PostmasterMain(int argc, char *argv[]) on_proc_exit(CloseServerPorts, 0); + /* Listen on proxy socket only if session pooling is enabled */ + if (ProxyPortNumber > 0 && ConnectionProxiesNumber > 0 && SessionPoolSize > 0) + ports[n_ports++] = ProxyPortNumber; + ports[n_ports++] = PostPortNumber; + if (ListenAddresses) { char *rawstring; @@ -1031,32 +1100,36 @@ PostmasterMain(int argc, char *argv[]) foreach(l, elemlist) { char *curhost = (char *) lfirst(l); - - if (strcmp(curhost, "*") == 0) - status = StreamServerPort(AF_UNSPEC, NULL, - (unsigned short) PostPortNumber, - NULL, - ListenSocket, MAXLISTEN); - else - status = StreamServerPort(AF_UNSPEC, curhost, - (unsigned short) PostPortNumber, - NULL, - ListenSocket, MAXLISTEN); - - if (status == STATUS_OK) + for (i = 0; i < n_ports; i++) { - success++; - /* record the first successful host addr in lockfile */ - if (!listen_addr_saved) + int port = ports[i]; + if (strcmp(curhost, "*") == 0) + status = StreamServerPort(AF_UNSPEC, NULL, + (unsigned short) port, + NULL, + ListenSocket, ListenPort, MAXLISTEN); + else + status = StreamServerPort(AF_UNSPEC, curhost, + (unsigned short) port, + NULL, + ListenSocket, ListenPort, MAXLISTEN); + contains_localhost |= strcmp(curhost, "localhost") == 0; + + if (status == STATUS_OK) { - AddToDataDirLockFile(LOCK_FILE_LINE_LISTEN_ADDR, curhost); - listen_addr_saved = true; + success++; + /* record the first successful host addr in lockfile */ + if (!listen_addr_saved) + { + AddToDataDirLockFile(LOCK_FILE_LINE_LISTEN_ADDR, curhost); + listen_addr_saved = true; + } } + else + ereport(WARNING, + (errmsg("could not create listen socket for \"%s\"", + curhost))); } - else - ereport(WARNING, - (errmsg("could not create listen socket for \"%s\"", - curhost))); } if (!success && elemlist != NIL) @@ -1125,29 +1198,32 @@ PostmasterMain(int argc, char *argv[]) errmsg("invalid list syntax in parameter \"%s\"", "unix_socket_directories"))); } - + contains_localhost = true; foreach(l, elemlist) { char *socketdir = (char *) 
lfirst(l);
+		for (i = 0; i < n_ports; i++)
+		{
+			int port = ports[i];
 
-		status = StreamServerPort(AF_UNIX, NULL,
-								  (unsigned short) PostPortNumber,
-								  socketdir,
-								  ListenSocket, MAXLISTEN);
+			status = StreamServerPort(AF_UNIX, NULL,
+									  (unsigned short) port,
+									  socketdir,
+									  ListenSocket, ListenPort, MAXLISTEN);
 
-		if (status == STATUS_OK)
-		{
-			success++;
-			/* record the first successful Unix socket in lockfile */
-			if (success == 1)
-				AddToDataDirLockFile(LOCK_FILE_LINE_SOCKET_DIR, socketdir);
+			if (status == STATUS_OK)
+			{
+				success++;
+				/* record the first successful Unix socket in lockfile */
+				if (success == 1)
+					AddToDataDirLockFile(LOCK_FILE_LINE_SOCKET_DIR, socketdir);
+			}
+			else
+				ereport(WARNING,
+						(errmsg("could not create Unix-domain socket in directory \"%s\"",
+								socketdir)));
 		}
-		else
-			ereport(WARNING,
-					(errmsg("could not create Unix-domain socket in directory \"%s\"",
-							socketdir)));
 	}
-
 	if (!success && elemlist != NIL)
 		ereport(FATAL,
 				(errmsg("could not create any Unix-domain sockets")));
@@ -1157,6 +1233,20 @@
 	}
 #endif
 
+	if (!contains_localhost && ProxyPortNumber > 0)
+	{
+		/* we need to accept local connections from proxy */
+		status = StreamServerPort(AF_UNSPEC, "localhost",
+								  (unsigned short) PostPortNumber,
+								  NULL,
+								  ListenSocket, ListenPort, MAXLISTEN);
+		if (status != STATUS_OK)
+		{
+			ereport(WARNING,
+					(errmsg("could not create listen socket for localhost")));
+		}
+	}
+
 	/*
 	 * check that we have some socket to listen on
 	 */
@@ -1374,6 +1464,8 @@ PostmasterMain(int argc, char *argv[])
 	/* Some workers may be scheduled to start now */
 	maybe_start_bgworkers();
 
+	StartConnectionProxies();
+
 	status = ServerLoop();
 
 	/*
@@ -1611,6 +1703,57 @@ DetermineSleepTime(struct timeval *timeout)
 	}
 }
 
+/**
+ * This function tries to estimate the workload of a proxy.
+ * We have a lot of information about the proxy state in the ProxyState array:
+ * the total number of clients, SSL clients, backends, traffic, number of transactions,...
+ * So in principle it is possible to implement a much more sophisticated evaluation
+ * function, but right now we take into account only the number of clients and SSL
+ * connections (which require much more CPU).
+ */
+static uint64
+GetConnectionProxyWorkload(int id)
+{
+    return ProxyState[id].n_clients + ProxyState[id].n_ssl_clients*3;
+}
+
+/**
+ * Choose a connection proxy for this session.
+ * Right now sessions cannot be moved between proxies (in principle it is not so
+ * difficult to implement), so to support load balancing we should do some smart work here.
+ */ +static ConnectionProxy* +SelectConnectionProxy(void) +{ + int i; + uint64 min_workload; + int least_loaded_proxy; + + switch (SessionSchedule) + { + case SESSION_SCHED_ROUND_ROBIN: + return &ConnectionProxies[CurrentConnectionProxy++ % ConnectionProxiesNumber]; + case SESSION_SCHED_RANDOM: + return &ConnectionProxies[random() % ConnectionProxiesNumber]; + case SESSION_SCHED_LOAD_BALANCING: + min_workload = GetConnectionProxyWorkload(0); + least_loaded_proxy = 0; + for (i = 1; i < ConnectionProxiesNumber; i++) + { + int workload = GetConnectionProxyWorkload(i); + if (workload < min_workload) + { + min_workload = workload; + least_loaded_proxy = i; + } + } + return &ConnectionProxies[least_loaded_proxy]; + default: + Assert(false); + } + return NULL; +} + + /* * Main idle loop of postmaster * @@ -1701,8 +1844,18 @@ ServerLoop(void) port = ConnCreate(ListenSocket[i]); if (port) { - BackendStartup(port); - + if (ConnectionProxies && ListenPort[i] == ProxyPortNumber) + { + ConnectionProxy* proxy = SelectConnectionProxy(); + if (pg_send_sock(proxy->socks[0], port->sock, proxy->pid) < 0) + { + elog(LOG, "could not send socket to connection pool: %m"); + } + } + else + { + BackendStartup(port, NULL); + } /* * We no longer need the open socket or port structure * in this process @@ -1899,8 +2052,6 @@ ProcessStartupPacket(Port *port, bool secure_done) { int32 len; void *buf; - ProtocolVersion proto; - MemoryContext oldcontext; pq_startmsgread(); @@ -1967,6 +2118,18 @@ ProcessStartupPacket(Port *port, bool secure_done) } pq_endmsgread(); + return ParseStartupPacket(port, TopMemoryContext, buf, len, secure_done); +} + +int +ParseStartupPacket(Port *port, MemoryContext memctx, void* buf, int len, bool secure_done) +{ + ProtocolVersion proto; + MemoryContext oldcontext; + + am_walsender = false; + am_db_walsender = false; + /* * The first field is either a protocol version number or a special * request code. @@ -2067,7 +2230,7 @@ retry1: * not worry about leaking this storage on failure, since we aren't in the * postmaster process anymore. */ - oldcontext = MemoryContextSwitchTo(TopMemoryContext); + oldcontext = MemoryContextSwitchTo(memctx); if (PG_PROTOCOL_MAJOR(proto) >= 3) { @@ -2739,6 +2902,8 @@ pmdie(SIGNAL_ARGS) if (WalWriterPID != 0) signal_child(WalWriterPID, SIGTERM); + StopConnectionProxies(SIGTERM); + /* * If we're in recovery, we can't kill the startup process * right away, because at present doing so does not release @@ -2816,6 +2981,9 @@ pmdie(SIGNAL_ARGS) /* and the walwriter too */ if (WalWriterPID != 0) signal_child(WalWriterPID, SIGTERM); + + StopConnectionProxies(SIGTERM); + pmState = PM_WAIT_BACKENDS; } @@ -4041,6 +4209,7 @@ TerminateChildren(int signal) signal_child(PgArchPID, signal); if (PgStatPID != 0) signal_child(PgStatPID, signal); + StopConnectionProxies(signal); } /* @@ -4050,8 +4219,8 @@ TerminateChildren(int signal) * * Note: if you change this code, also consider StartAutovacuumWorker. 
 */
-static int
-BackendStartup(Port *port)
+int
+BackendStartup(Port *port, int* backend_pid)
 {
 	Backend    *bn;				/* for backend cleanup */
 	pid_t		pid;
@@ -4155,6 +4324,8 @@ BackendStartup(Port *port)
 	if (!bn->dead_end)
 		ShmemBackendArrayAdd(bn);
 #endif
+	if (backend_pid)
+		*backend_pid = pid;
 
 	return STATUS_OK;
 }
@@ -4851,6 +5022,7 @@ SubPostmasterMain(int argc, char *argv[])
 	if (strcmp(argv[1], "--forkbackend") == 0 ||
 		strcmp(argv[1], "--forkavlauncher") == 0 ||
 		strcmp(argv[1], "--forkavworker") == 0 ||
+		strcmp(argv[1], "--forkproxy") == 0 ||
 		strcmp(argv[1], "--forkboot") == 0 ||
 		strncmp(argv[1], "--forkbgworker=", 15) == 0)
 		PGSharedMemoryReAttach();
@@ -4991,6 +5163,19 @@ SubPostmasterMain(int argc, char *argv[])
 		AutoVacWorkerMain(argc - 2, argv + 2);	/* does not return */
 	}
+	if (strcmp(argv[1], "--forkproxy") == 0)
+	{
+		/* Restore basic shared memory pointers */
+		InitShmemAccess(UsedShmemSegAddr);
+
+		/* Need a PGPROC to run CreateSharedMemoryAndSemaphores */
+		InitProcess();
+
+		/* Attach process to shared data structures */
+		CreateSharedMemoryAndSemaphores(0);
+
+		ConnectionProxyMain(argc - 2, argv + 2);	/* does not return */
+	}
 	if (strncmp(argv[1], "--forkbgworker=", 15) == 0)
 	{
 		int			shmem_slot;
@@ -5059,7 +5244,6 @@ ExitPostmaster(int status)
 				 errmsg_internal("postmaster became multithreaded"),
 				 errdetail("Please report this to <pgsql-bugs@lists.postgresql.org>.")));
 #endif
-
 	/* should cleanup shared memory and kill all backends */
 
 	/*
@@ -5525,6 +5709,74 @@ StartAutovacuumWorker(void)
 	}
 }
 
+/*
+ * StartProxyWorker
+ *		Start a proxy worker process.
+ *
+ * This function is here because it enters the resulting PID into the
+ * postmaster's private backends list.
+ *
+ * NB -- this code very roughly matches BackendStartup.
+ */
+static void
+StartProxyWorker(int id)
+{
+    Backend    *bn;
+    int			pid;
+
+    /*
+     * Compute the cancel key that will be assigned to this session. We
+     * probably don't need cancel keys for proxy workers, but we'd
+     * better have something random in the field to prevent unfriendly
+     * people from sending cancels to them.
+     */
+    if (!RandomCancelKey(&MyCancelKey))
+    {
+        ereport(LOG,
+                (errcode(ERRCODE_INTERNAL_ERROR),
+                 errmsg("could not generate random cancel key")));
+        return;
+    }
+    bn = (Backend *) malloc(sizeof(Backend));
+    if (bn)
+    {
+        bn->cancel_key = MyCancelKey;
+
+        /* Proxy workers are not dead_end and need a child slot */
+        bn->dead_end = false;
+        bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot();
+        bn->bgworker_notify = false;
+
+        MyProxyId = id;
+        MyProxySocket = ConnectionProxies[id].socks[1];
+        pid = ConnectionProxyStart();
+        if (pid > 0)
+        {
+            bn->pid = pid;
+            bn->bkend_type = BACKEND_TYPE_BGWORKER;
+            dlist_push_head(&BackendList, &bn->elem);
+#ifdef EXEC_BACKEND
+            ShmemBackendArrayAdd(bn);
+#endif
+            /* all OK */
+            ConnectionProxies[id].pid = pid;
+            ProxyState[id].pid = pid;
+            return;
+        }
+
+        /*
+         * fork failed, fall through to report -- actual error message was
+         * logged by ConnectionProxyStart
+         */
+        (void) ReleasePostmasterChildSlot(bn->child_slot);
+        free(bn);
+    }
+    else
+        ereport(LOG,
+                (errcode(ERRCODE_OUT_OF_MEMORY),
+                 errmsg("out of memory")));
+}
+
 /*
  * MaybeStartWalReceiver
  *		Start the WAL receiver process, if not running and our state allows.
@@ -6116,6 +6368,10 @@ save_backend_variables(BackendParameters *param, Port *port,
 
 	strlcpy(param->ExtraOptions, ExtraOptions, MAXPGPATH);
 
+	if (!write_inheritable_socket(&param->proxySocket, MyProxySocket, childPid))
+		return false;
+	param->proxyId = MyProxyId;
+
 	return true;
 }
@@ -6347,6 +6603,9 @@ restore_backend_variables(BackendParameters *param, Port *port)
 	strlcpy(pkglib_path, param->pkglib_path, MAXPGPATH);
 
 	strlcpy(ExtraOptions, param->ExtraOptions, MAXPGPATH);
+
+	read_inheritable_socket(&MyProxySocket, &param->proxySocket);
+	MyProxyId = param->proxyId;
 }
diff --git a/src/backend/postmaster/proxy.c b/src/backend/postmaster/proxy.c
new file mode 100644
index 0000000000..1531bd7554
--- /dev/null
+++ b/src/backend/postmaster/proxy.c
@@ -0,0 +1,1061 @@
+/* system headers (assumed; the original names were lost in this copy of the patch) */
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include "postgres.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
+#include "postmaster/fork_process.h"
+#include "access/htup_details.h"
+#include "replication/walsender.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "libpq/libpq.h"
+#include "libpq/libpq-be.h"
+#include "libpq/pqsignal.h"
+#include "tcop/tcopprot.h"
+#include "utils/timeout.h"
+#include "utils/ps_status.h"
+#include "../interfaces/libpq/libpq-fe.h"
+#include "../interfaces/libpq/libpq-int.h"
+
+#define INIT_BUF_SIZE (64*1024)
+#define MAX_READY_EVENTS 128
+#define DB_HASH_SIZE 101
+#define PROXY_WAIT_TIMEOUT 1000 /* 1 second */
+
+struct SessionPool;
+struct Proxy;
+
+typedef struct
+{
+    char database[NAMEDATALEN];
+    char username[NAMEDATALEN];
+}
+SessionPoolKey;
+
+/*
+ * Channels represent both clients and backends
+ */
+typedef struct Channel
+{
+    char*    buf;
+    int      rx_pos;
+    int      tx_pos;
+    int      tx_size;
+    int      buf_size;
+    int      event_pos;         /* Position of wait event returned by AddWaitEventToSet */
+
+    Port*    client_port;       /* Not null for client, null for server */
+
+    pgsocket backend_socket;
+    PGPROC*  backend_proc;
+    int      backend_pid;
+    bool     backend_is_tainted; /* client changes session context */
+    bool     backend_is_ready;  /* ready for query */
+    bool     is_interrupted;    /* client interrupts query execution */
+    bool     is_disconnected;   /* connection is lost */
+    bool     write_pending;     /* write request is pending: emulate epoll EPOLLET (edge-triggered) flag */
+    bool     read_pending;      /* read request is pending: emulate epoll EPOLLET (edge-triggered) flag */
+    /* We need to save the startup packet response to be able to send it to a new connection */
+    int      handshake_response_size;
+    char*    handshake_response;
+
+    struct Channel*     peer;
+    struct Channel*     next;
+    struct Proxy*       proxy;
+    struct SessionPool* pool;
+}
+Channel;
+
+/*
+ * Control structure for connection proxies (several proxy workers can be
+ * launched and each has its own proxy instance).
+ * A proxy contains a hash of session pools, one for each role/dbname combination.
+ */
+typedef struct Proxy
+{
+    MemoryContext memctx;        /* Memory context for this proxy (used only in single thread) */
+    MemoryContext tmpctx;        /* Temporary memory context used for parsing startup packet */
+    WaitEventSet* wait_events;   /* Set of socket descriptors of backends and clients */
+    HTAB*    pools;              /* Session pool map with dbname/role used as a key */
+    int      n_accepted_connections; /* Number of accepted, but not yet established connections
+                                      * (startup packet is not received and db/role are not known) */
+    int      max_backends;       /* Maximal number of backends per database */
+    bool     shutdown;           /* Shutdown flag */
+    Channel* hangout;            /* List of disconnected backends */
+    ConnectionProxyState* state; /* State of proxy */
+} Proxy;
+
+/*
+ * Connection pool for a particular role/dbname
+ */
+typedef struct SessionPool
+{
+    SessionPoolKey key;
+    Channel* idle_backends;      /* List of idle backends */
+    Channel* pending_clients;    /* List of clients waiting for a free backend */
+    Proxy*   proxy;              /* Owner of this pool */
+    int      n_launched_backends; /* Total number of launched backends */
+    int      n_idle_backends;    /* Number of backends in idle state */
+    int      n_connected_clients; /* Total number of connected clients */
+    int      n_idle_clients;     /* Number of clients in idle state */
+    int      n_pending_clients;  /* Number of clients waiting for a free backend */
+}
+SessionPool;
+
+static void channel_remove(Channel* chan);
+static Channel* backend_start(SessionPool* pool, Port* client_port);
+static bool channel_read(Channel* chan);
+static bool channel_write(Channel* chan, bool synchronous);
+
+/*
+ * Proxy tracing is disabled by default; redefine ELOG as
+ * elog(severity, "PROXY: " fmt, ## __VA_ARGS__) to enable it.
+ */
+#define ELOG(severity, fmt, ...)
+
+static Proxy* proxy;
+int MyProxyId;
+pgsocket MyProxySocket;
+ConnectionProxyState* ProxyState;
+
+/**
+ * Backend is ready for the next command outside a transaction block (idle state).
+ * Now, if the backend is not tainted, it is possible to schedule some other client to this backend.
+ */
+static bool
+backend_reschedule(Channel* chan)
+{
+    chan->backend_is_ready = false;
+    if (chan->backend_proc == NULL) /* Lazy resolving of PGPROC entry */
+    {
+        Assert(chan->backend_pid != 0);
+        chan->backend_proc = BackendPidGetProc(chan->backend_pid);
+        Assert(chan->backend_proc); /* If a backend completes execution of some query, then it has definitely registered itself in the procarray */
+    }
+    if (!chan->backend_proc->is_tainted) /* If backend is not storing some session context */
+    {
+        Channel* pending = chan->pool->pending_clients;
+        Assert(!chan->backend_is_tainted);
+        if (chan->peer)
+            chan->peer->peer = NULL;
+        chan->pool->n_idle_clients += 1;
+        if (pending)
+        {
+            /* Has pending clients: serve one of them */
+            ELOG(LOG, "Backend %d is reassigned to client %p", chan->backend_pid, pending);
+            chan->pool->pending_clients = pending->next;
+            Assert(chan != pending);
+            chan->peer = pending;
+            pending->peer = chan;
+            chan->pool->n_pending_clients -= 1;
+            if (pending->tx_size == 0) /* new client has sent startup packet and we now need to send handshake response */
+            {
+                Assert(chan->handshake_response != NULL); /* backend already sent handshake response */
+                Assert(chan->handshake_response_size < chan->buf_size);
+                memcpy(chan->buf, chan->handshake_response, chan->handshake_response_size);
+                chan->rx_pos = chan->tx_size = chan->handshake_response_size;
+                ELOG(LOG, "Simulate response for startup packet to client %p", pending);
+                chan->backend_is_ready = true;
+                return channel_write(pending, false);
+            }
+            else
+            {
+                ELOG(LOG, "Try to send pending request from client %p to backend %p (pid %d)", pending, chan, chan->backend_pid);
+                Assert(pending->tx_pos == 0 && pending->rx_pos >= pending->tx_size);
+                return channel_write(chan, false); /* Send pending request to backend */
+            }
+        }
+        else /* return backend to the list of idle backends */
+        {
+            ELOG(LOG, "Backend %d is idle", chan->backend_pid);
+            Assert(!chan->client_port);
+            chan->next = chan->pool->idle_backends;
+            chan->pool->idle_backends = chan;
+            chan->pool->n_idle_backends += 1;
+            chan->peer = NULL;
+        }
+    }
+    else if (!chan->backend_is_tainted) /* if it was not marked as tainted before...
*/ + { + chan->backend_is_tainted = true; + chan->proxy->state->n_dedicated_backends += 1; + } + return true; +} + +/** + * Parse client's startup packet and assign client to proper connection pool based on dbname/role + */ +static bool +client_connect(Channel* chan, int startup_packet_size) +{ + bool found; + SessionPoolKey key; + char* startup_packet = chan->buf; + + Assert(chan->client_port); + + /* parse startup packet in tmpctx memory context and reset it when it is not needed any more */ + MemoryContextReset(chan->proxy->tmpctx); + MemoryContextSwitchTo(chan->proxy->tmpctx); + + /* Associate libpq with client's port */ + MyProcPort = chan->client_port; + pq_init(); + + if (ParseStartupPacket(chan->client_port, chan->proxy->tmpctx, startup_packet+4, startup_packet_size-4, false) != STATUS_OK) /* skip packet size */ + { + MyProcPort = NULL; + elog(WARNING, "Failed to parse startup packet for client %p", chan); + return false; + } + MyProcPort = NULL; + if (am_walsender) + { + elog(WARNING, "WAL sender should not be connected through proxy"); + return false; + } + + chan->proxy->state->n_ssl_clients += chan->client_port->ssl_in_use; + pg_set_noblock(chan->client_port->sock); /* SSL handshake may switch socket to blocking mode */ + memset(&key, 0, sizeof(key)); + strlcpy(key.database, chan->client_port->database_name, NAMEDATALEN); + strlcpy(key.username, chan->client_port->user_name, NAMEDATALEN); + + ELOG(LOG, "Client %p connects to %s/%s", chan, key.database, key.username); + + chan->pool = (SessionPool*)hash_search(chan->proxy->pools, &key, HASH_ENTER, &found); + if (!found) + { + /* First connection to this role/dbname */ + chan->proxy->state->n_pools += 1; + memset((char*)chan->pool + sizeof(SessionPoolKey), 0, sizeof(SessionPool) - sizeof(SessionPoolKey)); + } + chan->pool->proxy = chan->proxy; + chan->pool->n_connected_clients += 1; + chan->pool->n_idle_clients += 1; + chan->proxy->n_accepted_connections -= 1; + return true; +} + +/* + * Attach client to backend. Return true if new backend is attached, false otherwise. + */ +static bool +client_attach(Channel* chan) +{ + Channel* idle_backend = chan->pool->idle_backends; + chan->pool->n_idle_clients -= 1; + if (idle_backend) + { + /* has some idle backend */ + Assert(!idle_backend->backend_is_tainted && !idle_backend->client_port); + Assert(chan != idle_backend); + chan->peer = idle_backend; + idle_backend->peer = chan; + chan->pool->idle_backends = idle_backend->next; + chan->pool->n_idle_backends -= 1; + ELOG(LOG, "Attach client %p to backend %p (pid %d)", chan, idle_backend, idle_backend->backend_pid); + } + else /* all backends are busy */ + { + if (chan->pool->n_launched_backends < chan->proxy->max_backends) + { + /* Try to start new backend */ + idle_backend = backend_start(chan->pool, chan->client_port); + if (idle_backend != NULL) + { + ELOG(LOG, "Start new backend %p (pid %d) for client %p", + idle_backend, idle_backend->backend_pid, chan); + Assert(chan != idle_backend); + chan->peer = idle_backend; + idle_backend->peer = chan; + return true; + } + } + /* Postpone handshake until some backend is available */ + ELOG(LOG, "Client %p is waiting for available backends", chan); + chan->next = chan->pool->pending_clients; + chan->pool->pending_clients = chan; + chan->pool->n_pending_clients += 1; + } + return false; +} + +/* + * Handle communication failure for this channel. + * It is not possible to remove channel immediately because it can be triggered by other epoll events. 
+ * So link all channels into a list for pending delete.
+ */
+static void
+channel_hangout(Channel* chan, char const* op)
+{
+    Channel** ipp;
+    Channel* peer = chan->peer;
+    if (chan->is_disconnected)
+        return;
+
+    if (chan->client_port) {
+        ELOG(LOG, "Hangout client %p due to %s error: %m", chan, op);
+        for (ipp = &chan->pool->pending_clients; *ipp != NULL; ipp = &(*ipp)->next) {
+            if (*ipp == chan) {
+                *ipp = chan->next;
+                chan->pool->n_pending_clients -= 1;
+                break;
+            }
+        }
+    } else {
+        ELOG(LOG, "Hangout backend %p (pid %d) due to %s error: %m", chan, chan->backend_pid, op);
+        for (ipp = &chan->pool->idle_backends; *ipp != NULL; ipp = &(*ipp)->next) {
+            if (*ipp == chan) {
+                *ipp = chan->next;
+                chan->pool->n_idle_backends -= 1;
+                break;
+            }
+        }
+    }
+    if (peer)
+    {
+        peer->peer = NULL;
+        chan->peer = NULL;
+    }
+    chan->backend_is_ready = false;
+
+    if (chan->client_port && peer) /* If it is a client connected to a backend. */
+    {
+        if (!chan->is_interrupted) /* Client didn't send an 'X' command, so do it on its behalf. */
+        {
+            ELOG(LOG, "Send terminate command to backend %p (pid %d)", peer, peer->backend_pid);
+            peer->is_interrupted = true; /* the interrupted flag makes channel_write send an 'X' message */
+            channel_write(peer, false);
+            return;
+        }
+        else if (!peer->is_interrupted)
+        {
+            /* Client already sent 'X' command, so we can safely reschedule backend to some other client session */
+            backend_reschedule(peer);
+        }
+    }
+    chan->next = chan->proxy->hangout;
+    chan->proxy->hangout = chan;
+    chan->is_disconnected = true;
+}
+
+/*
+ * Try to write data to the socket.
+ */
+static ssize_t
+socket_write(Channel* chan, char const* buf, size_t size)
+{
+    ssize_t rc;
+#ifdef USE_SSL
+    int waitfor = 0;
+    if (chan->client_port && chan->client_port->ssl_in_use)
+        rc = be_tls_write(chan->client_port, (char*)buf, size, &waitfor);
+    else
+#endif
+        rc = chan->client_port
+            ? secure_raw_write(chan->client_port, buf, size)
+            : send(chan->backend_socket, buf, size, 0);
+    if (rc == 0 || (rc < 0 && (errno != EAGAIN && errno != EWOULDBLOCK)))
+    {
+        channel_hangout(chan, "write");
+    }
+    else if (rc < 0)
+    {
+        /* do not accept more read events while write request is pending */
+        ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_WRITEABLE|WL_SOCKET_EDGE, NULL);
+        chan->write_pending = true;
+    }
+    else if (chan->write_pending)
+    {
+        /* resume accepting read events */
+        ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_READABLE|WL_SOCKET_EDGE, NULL);
+        chan->write_pending = false;
+    }
+    return rc;
+}
+
+
+/*
+ * Try to send some data to the channel.
+ * The data is located in the peer's buffer. Because edge-triggered mode is used,
+ * we have to use non-blocking I/O and try to write all available data. Once a write
+ * is completed, we should try to read more data from the source socket.
+ * The "synchronous" flag is used to avoid infinite recursion between reads and writes.
+ * Returns true if there is nothing to do or the operation completed successfully,
+ * false in case of error or when the socket buffer is full.
+ */
+static bool
+channel_write(Channel* chan, bool synchronous)
+{
+    Channel* peer = chan->peer;
+    if (!chan->client_port && chan->is_interrupted)
+    {
+        /* Send terminate command to the backend.
*/ + char const terminate[] = {'X', 0, 0, 0, 4}; + if (socket_write(chan, terminate, sizeof(terminate)) <= 0) + return false; + channel_hangout(chan, "terminate"); + return true; + } + if (peer == NULL) + return false; + + while (peer->tx_pos < peer->tx_size) /* has something to write */ + { + ssize_t rc = socket_write(chan, peer->buf + peer->tx_pos, peer->tx_size - peer->tx_pos); + ELOG(LOG, "%p: write %d tx_pos=%d, tx_size=%d: %m", chan, (int)rc, peer->tx_pos, peer->tx_size); + if (rc <= 0) + return false; + if (chan->client_port) + chan->proxy->state->tx_bytes += rc; + else + chan->proxy->state->rx_bytes += rc; + peer->tx_pos += rc; + } + if (peer->tx_size != 0) + { + /* Copy rest of received data to the beginning of the buffer */ + chan->backend_is_ready = false; + Assert(peer->rx_pos >= peer->tx_size); + memmove(peer->buf, peer->buf + peer->tx_size, peer->rx_pos - peer->tx_size); + peer->rx_pos -= peer->tx_size; + peer->tx_pos = peer->tx_size = 0; + if (peer->backend_is_ready) { + Assert(peer->rx_pos == 0); + backend_reschedule(peer); + return true; + } + } + return synchronous || channel_read(peer); /* write is not invoked from read */ +} + +/* + * Try to read more data from the channel and send it to the peer. + */ +static bool +channel_read(Channel* chan) +{ + int msg_start; + while (chan->tx_size == 0) /* there is no pending write op */ + { + ssize_t rc; +#ifdef USE_SSL + int waitfor = 0; + if (chan->client_port && chan->client_port->ssl_in_use) + rc = be_tls_read(chan->client_port, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos, &waitfor); + else +#endif + rc = chan->client_port + ? secure_raw_read(chan->client_port, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos) + : recv(chan->backend_socket, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos, 0); + + ELOG(LOG, "%p: read %d: %m", chan, (int)rc); + if (rc <= 0) + { + if (rc == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) + channel_hangout(chan, "read"); + else + { + /* do not accept more write events while read request is pending */ + ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_READABLE|WL_SOCKET_EDGE, NULL); + chan->read_pending = true; + } + return false; /* wait for more data */ + } + else if (chan->read_pending) + { + /* resume accepting all events */ + ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE|WL_SOCKET_EDGE, NULL); + chan->read_pending = false; + } + chan->rx_pos += rc; + msg_start = 0; + + /* Loop through all received messages */ + while (chan->rx_pos - msg_start >= 5) /* has message code + length */ + { + int msg_len; + bool handshake = false; + if (chan->pool == NULL) /* process startup packet */ + { + Assert(msg_start == 0); + memcpy(&msg_len, chan->buf + msg_start, sizeof(msg_len)); + msg_len = ntohl(msg_len); + handshake = true; + } + else + { + ELOG(LOG, "%p receive message %c", chan, chan->buf[msg_start]); + memcpy(&msg_len, chan->buf + msg_start + 1, sizeof(msg_len)); + msg_len = ntohl(msg_len) + 1; + } + if (msg_start + msg_len > chan->buf_size) + { + /* Reallocate buffer to fit complete message body */ + chan->buf_size = msg_start + msg_len; + chan->buf = realloc(chan->buf, chan->buf_size); + } + if (chan->rx_pos - msg_start >= msg_len) /* Message is completely fetched */ + { + int response_size = msg_start + msg_len; + if (chan->pool == NULL) /* receive startup packet */ + { + Assert(chan->client_port); + if (!client_connect(chan, msg_len)) + { + /* Some trouble with processing startup packet */ + 
+/*
+ * Try to read more data from the channel and send it to the peer.
+ */
+static bool
+channel_read(Channel* chan)
+{
+	int	msg_start;
+	while (chan->tx_size == 0) /* there is no pending write op */
+	{
+		ssize_t rc;
+#ifdef USE_SSL
+		int waitfor = 0;
+		if (chan->client_port && chan->client_port->ssl_in_use)
+			rc = be_tls_read(chan->client_port, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos, &waitfor);
+		else
+#endif
+			rc = chan->client_port
+				? secure_raw_read(chan->client_port, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos)
+				: recv(chan->backend_socket, chan->buf + chan->rx_pos, chan->buf_size - chan->rx_pos, 0);
+
+		ELOG(LOG, "%p: read %d: %m", chan, (int)rc);
+		if (rc <= 0)
+		{
+			if (rc == 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
+				channel_hangout(chan, "read");
+			else
+			{
+				/* do not accept more write events while the read request is pending */
+				ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_READABLE|WL_SOCKET_EDGE, NULL);
+				chan->read_pending = true;
+			}
+			return false; /* wait for more data */
+		}
+		else if (chan->read_pending)
+		{
+			/* resume accepting all events */
+			ModifyWaitEvent(chan->proxy->wait_events, chan->event_pos, WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE|WL_SOCKET_EDGE, NULL);
+			chan->read_pending = false;
+		}
+		chan->rx_pos += rc;
+		msg_start = 0;
+
+		/* Loop through all received messages */
+		while (chan->rx_pos - msg_start >= 5) /* has message code + length */
+		{
+			int msg_len;
+			bool handshake = false;
+			if (chan->pool == NULL) /* process startup packet */
+			{
+				Assert(msg_start == 0);
+				memcpy(&msg_len, chan->buf + msg_start, sizeof(msg_len));
+				msg_len = ntohl(msg_len);
+				handshake = true;
+			}
+			else
+			{
+				ELOG(LOG, "%p receive message %c", chan, chan->buf[msg_start]);
+				memcpy(&msg_len, chan->buf + msg_start + 1, sizeof(msg_len));
+				msg_len = ntohl(msg_len) + 1;
+			}
+			if (msg_start + msg_len > chan->buf_size)
+			{
+				/* Reallocate the buffer to fit the complete message body */
+				chan->buf_size = msg_start + msg_len;
+				chan->buf = realloc(chan->buf, chan->buf_size);
+			}
+			if (chan->rx_pos - msg_start >= msg_len) /* Message is completely fetched */
+			{
+				int response_size = msg_start + msg_len;
+				if (chan->pool == NULL) /* receive startup packet */
+				{
+					Assert(chan->client_port);
+					if (!client_connect(chan, msg_len))
+					{
+						/* Some trouble with processing the startup packet */
+						chan->is_disconnected = true;
+						channel_remove(chan);
+						return false;
+					}
+				}
+				else if (!chan->client_port /* Message from backend */
+					&& chan->buf[msg_start] == 'Z' /* Ready for query */
+					&& chan->buf[msg_start+5] == 'I') /* Transaction block status is idle */
+				{
+					Assert(chan->rx_pos - msg_start == msg_len); /* Should be the last message */
+					chan->backend_is_ready = true; /* Backend is ready for query */
+					chan->proxy->state->n_transactions += 1;
+				}
+				else if (chan->client_port /* Message from client */
+					&& chan->buf[msg_start] == 'X') /* Terminate message */
+				{
+					chan->is_interrupted = true;
+					if (chan->peer == NULL || !chan->peer->backend_is_tainted)
+					{
+						/* Do not forward the terminate message to idle and non-tainted backends */
+						channel_hangout(chan, "terminate");
+						return false;
+					}
+				}
+				if (chan->peer == NULL) /* client is not yet connected to a backend */
+				{
+					if (!chan->client_port)
+					{
+						/* We are not expecting messages from an idle backend. Assume that it is some error or shutdown. */
+						channel_hangout(chan, "idle");
+						return false;
+					}
+					client_attach(chan);
+					if (handshake) /* Send the handshake response to the client */
+					{
+						/* If we attach a new client to an existing backend, then we need to send the handshake response to the client */
+						Channel* backend = chan->peer;
+						Assert(chan->rx_pos == msg_len && msg_start == 0);
+						chan->rx_pos = 0; /* Skip startup packet */
+						if (backend != NULL) /* Backend was assigned */
+						{
+							Assert(backend->handshake_response != NULL); /* backend has already sent handshake responses */
+							Assert(backend->handshake_response_size < backend->buf_size);
+							memcpy(backend->buf, backend->handshake_response, backend->handshake_response_size);
+							backend->rx_pos = backend->tx_size = backend->handshake_response_size;
+							backend->backend_is_ready = true;
+							return channel_write(chan, false);
+						}
+						else
+						{
+							/* Handshake response will be sent to the client later, when a backend is assigned */
+							return false;
+						}
+					}
+					else if (chan->peer == NULL) /* Backend was not assigned */
+					{
+						chan->tx_size = response_size; /* query will be sent later, once a backend is assigned */
+						return false;
+					}
+				}
+				msg_start += msg_len;
+			}
+			else break; /* Incomplete message. */
+		}
+		if (msg_start != 0)
+		{
+			/* Have some complete messages to send to the peer */
+			Assert(chan->tx_pos == 0);
+			Assert(chan->rx_pos >= msg_start);
+			chan->tx_size = msg_start;
+			if (!channel_write(chan->peer, true))
+				return false;
+		}
+		/* If the backend is out of transaction, then reschedule it */
+		if (chan->backend_is_ready)
+			return backend_reschedule(chan);
+	}
+	return true;
+}
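channel_read relies on the protocol framing visible above: a regular message carries a type byte plus a self-inclusive big-endian length (hence the ntohl(...) + 1), while the startup packet carries a bare self-inclusive length with no type byte. Reduced to a standalone function, the header arithmetic is (msg_total_size is a hypothetical helper, shown for illustration only):

    #include <arpa/inet.h> /* ntohl */
    #include <stdbool.h>
    #include <stdint.h>
    #include <string.h>

    /* Compute the total on-wire size of the next message, or 0 if the 5-byte
     * header has not arrived yet. "is_startup" distinguishes the startup
     * packet, which carries no type byte. */
    static int
    msg_total_size(const char* buf, int avail, bool is_startup)
    {
        uint32_t net_len;

        if (avail < 5)
            return 0;
        if (is_startup)
        {
            /* Startup packet: int32 length (self-inclusive), no type byte. */
            memcpy(&net_len, buf, 4);
            return (int) ntohl(net_len);
        }
        /* Regular message: type byte + int32 length that counts itself
         * but not the type byte. */
        memcpy(&net_len, buf + 1, 4);
        return (int) ntohl(net_len) + 1;
    }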
" + "Try to increase 'max_sessions' configuration parameter.", + proxy->state->n_clients, proxy->state->n_backends); + return false; + } + return true; +} + +/* + * Start new backend for particular pool associated with dbname/role combination. + * Backend is forked using BackendStartup function. + */ +static Channel* +backend_start(SessionPool* pool, Port* client_port) +{ + Channel* chan; + char postmaster_port[8]; + char const* keywords[] = {"port","dbname","user","sslmode","application_name",NULL}; + char const* values[] = {postmaster_port,pool->key.database,pool->key.username,"disable","pool_worker",NULL}; + PGconn* conn; + char* msg; + int int32_buf; + int msg_len; + static bool libpqconn_loaded; + + if (!libpqconn_loaded) + { + /* We need libpq library to be able to establish connections to pool workers. + * This library can not be linked statically, so load it on demand. */ + load_file("libpqconn", false); + libpqconn_loaded = true; + } + pg_itoa(PostPortNumber, postmaster_port); + conn = LibpqConnectdbParams(keywords, values); + if (!conn) + return NULL; + + chan = channel_create(pool->proxy); + chan->pool = pool; + chan->backend_socket = conn->sock; + /* Using edge epoll mode requires non-blocking sockets */ + pg_set_noblock(conn->sock); + + /* Save handshake response */ + chan->handshake_response_size = conn->inEnd; + chan->handshake_response = malloc(chan->handshake_response_size); + memcpy(chan->handshake_response, conn->inBuffer, chan->handshake_response_size); + + /* Extract backend pid */ + msg = chan->handshake_response; + while (*msg != 'K') /* Scan handshake response until we reach PID message */ + { + memcpy(&int32_buf, ++msg, sizeof(int32_buf)); + msg_len = ntohl(int32_buf); + msg += msg_len; + Assert(msg < chan->handshake_response + chan->handshake_response_size); + } + memcpy(&int32_buf, msg+5, sizeof(int32_buf)); + chan->backend_pid = ntohl(int32_buf); + + if (channel_register(pool->proxy, chan)) + { + pool->proxy->state->n_backends += 1; + pool->n_launched_backends += 1; + } + else + { + /* Too much sessions, error report was already logged */ + close(chan->backend_socket); + free(chan->buf); + free(chan); + chan = NULL; + } + return chan; +} + +/* + * Add new client accepted by postmaster. This client will be assigned to concrete session pool + * when it's startup packet is received. 
+/*
+ * Add a new client accepted by the postmaster. This client will be assigned to a concrete
+ * session pool when its startup packet is received.
+ */
+static void
+proxy_add_client(Proxy* proxy, Port* port)
+{
+	Channel* chan = channel_create(proxy);
+	chan->client_port = port;
+	chan->backend_socket = PGINVALID_SOCKET;
+	if (channel_register(proxy, chan))
+	{
+		ELOG(LOG, "Add new client %p", chan);
+		proxy->n_accepted_connections += 1;
+		proxy->state->n_clients += 1;
+	}
+	else
+	{
+		/* Too many sessions; the error was already logged */
+		close(port->sock);
+#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
+		free(port->gss);
+#endif
+		free(port);
+		free(chan->buf);
+		free(chan);
+	}
+}
+
+/*
+ * Perform delayed deletion of a channel
+ */
+static void
+channel_remove(Channel* chan)
+{
+	Assert(chan->is_disconnected); /* should be marked as disconnected by channel_hangout */
+	DeleteWaitEventFromSet(chan->proxy->wait_events, chan->event_pos);
+	if (chan->client_port)
+	{
+		if (chan->pool)
+			chan->pool->n_connected_clients -= 1;
+		else
+			chan->proxy->n_accepted_connections -= 1;
+		chan->proxy->state->n_clients -= 1;
+		chan->proxy->state->n_ssl_clients -= chan->client_port->ssl_in_use;
+		close(chan->client_port->sock);
+		free(chan->client_port);
+	}
+	else
+	{
+		chan->proxy->state->n_backends -= 1;
+		chan->proxy->state->n_dedicated_backends -= chan->backend_is_tainted;
+		chan->pool->n_launched_backends -= 1;
+		close(chan->backend_socket);
+		free(chan->handshake_response);
+	}
+	free(chan->buf);
+	free(chan);
+}
+
+/*
+ * Create a new proxy.
+ */
+static Proxy*
+proxy_create(pgsocket postmaster_socket, ConnectionProxyState* state, int max_backends)
+{
+	HASHCTL ctl;
+	Proxy*  proxy = calloc(1, sizeof(Proxy));
+	proxy->memctx = AllocSetContextCreate(TopMemoryContext,
+										  "Proxy",
+										  ALLOCSET_DEFAULT_SIZES);
+	proxy->tmpctx = AllocSetContextCreate(proxy->memctx,
+										  "Startup packet parsing context",
+										  ALLOCSET_DEFAULT_SIZES);
+	MemSet(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(SessionPoolKey);
+	ctl.entrysize = sizeof(SessionPool);
+	ctl.hcxt = proxy->memctx;
+	proxy->pools = hash_create("Pool by database and user", DB_HASH_SIZE,
+							   &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	/* We need events both for clients and backends, so multiply MaxSessions by two */
+	proxy->wait_events = CreateWaitEventSet(TopMemoryContext, MaxSessions*2);
+	AddWaitEventToSet(proxy->wait_events, WL_SOCKET_READABLE,
+					  postmaster_socket, NULL, NULL);
+	proxy->max_backends = max_backends;
+	proxy->state = state;
+	return proxy;
+}
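proxy->pools maps a (database, user) pair to its SessionPool through PostgreSQL's dynahash API with HASH_BLOBS, i.e. bytewise key comparison. A lookup-or-create against such a table follows the standard dynahash pattern sketched below (SessionPoolKey/SessionPool here are simplified stand-ins for the patch's structs, and pool_lookup is a hypothetical helper; dynahash does require the key to be the first field of the entry):

    #include "postgres.h"
    #include "utils/hsearch.h"

    typedef struct SessionPoolKey
    {
        char database[NAMEDATALEN];
        char username[NAMEDATALEN];
    } SessionPoolKey;

    typedef struct SessionPool
    {
        SessionPoolKey key;              /* hash key: must come first */
        int            n_launched_backends;
        /* ... other per-pool state elided ... */
    } SessionPool;

    static SessionPool*
    pool_lookup(HTAB* pools, const char* dbname, const char* role)
    {
        SessionPoolKey key;
        bool           found;
        SessionPool*   pool;

        /* HASH_BLOBS compares keys bytewise, so zero the padding first */
        MemSet(&key, 0, sizeof(key));
        strlcpy(key.database, dbname, sizeof(key.database));
        strlcpy(key.username, role, sizeof(key.username));

        pool = (SessionPool*) hash_search(pools, &key, HASH_ENTER, &found);
        if (!found)
        {
            /* dynahash initializes only the key of a new entry */
            pool->n_launched_backends = 0;
        }
        return pool;
    }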
+/*
+ * Main proxy loop
+ */
+static void
+proxy_loop(Proxy* proxy)
+{
+	int i, n_ready;
+	WaitEvent ready[MAX_READY_EVENTS];
+	Channel *chan, *next;
+
+	/* Main loop */
+	while (!proxy->shutdown)
+	{
+		/* Use a timeout to allow normal proxy shutdown */
+		n_ready = WaitEventSetWait(proxy->wait_events, PROXY_WAIT_TIMEOUT, ready, MAX_READY_EVENTS, PG_WAIT_CLIENT);
+		for (i = 0; i < n_ready; i++) {
+			chan = (Channel*)ready[i].user_data;
+			if (chan == NULL) /* new connection from the postmaster */
+			{
+				Port* port = (Port*)calloc(1, sizeof(Port));
+				port->sock = pg_recv_sock(ready[i].fd);
+				if (port->sock == PGINVALID_SOCKET)
+				{
+					elog(WARNING, "Failed to receive session socket: %m");
+					free(port);
+				}
+				else
+				{
+#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
+					port->gss = (pg_gssinfo *) calloc(1, sizeof(pg_gssinfo));
+					if (!port->gss)
+						ereport(FATAL,
+								(errcode(ERRCODE_OUT_OF_MEMORY),
+								 errmsg("out of memory")));
+#endif
+					proxy_add_client(proxy, port);
+				}
+			}
+			else
+			{
+				if (ready[i].events & WL_SOCKET_WRITEABLE) {
+					ELOG(LOG, "Channel %p is writable", chan);
+					channel_write(chan, false);
+				}
+				if (ready[i].events & WL_SOCKET_READABLE) {
+					ELOG(LOG, "Channel %p is readable", chan);
+					channel_read(chan);
+				}
+			}
+		}
+		/*
+		 * Delayed deallocation of disconnected channels.
+		 * We cannot delete channels immediately, because the ready-event array may still reference their peers.
+		 */
+		for (chan = proxy->hangout; chan != NULL; chan = next)
+		{
+			next = chan->next;
+			channel_remove(chan);
+		}
+		proxy->hangout = NULL;
+	}
+}
+
+/*
+ * Handle normal shutdown of the Postgres instance
+ */
+static void
+proxy_handle_sigterm(SIGNAL_ARGS)
+{
+	if (proxy)
+		proxy->shutdown = true;
+}
+
+#ifdef EXEC_BACKEND
+static pid_t
+proxy_forkexec(void)
+{
+	char	   *av[10];
+	int			ac = 0;
+
+	av[ac++] = "postgres";
+	av[ac++] = "--forkproxy";
+	av[ac++] = NULL;			/* filled in by postmaster_forkexec */
+	av[ac] = NULL;
+
+	Assert(ac < lengthof(av));
+
+	return postmaster_forkexec(ac, av);
+}
+#endif
+
+NON_EXEC_STATIC void
+ConnectionProxyMain(int argc, char *argv[])
+{
+	sigjmp_buf	local_sigjmp_buf;
+
+	/* Identify myself via ps */
+	init_ps_display("connection proxy", "", "", "");
+
+	SetProcessingMode(InitProcessing);
+
+	pqsignal(SIGTERM, proxy_handle_sigterm);
+	pqsignal(SIGQUIT, quickdie);
+	InitializeTimeouts();		/* establishes SIGALRM handler */
+
+	/* Early initialization */
+	BaseInit();
+
+	/*
+	 * Create a per-backend PGPROC struct in shared memory, except in the
+	 * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
+	 * this before we can use LWLocks (and in the EXEC_BACKEND case we already
+	 * had to do some stuff with LWLocks).
+	 */
+#ifndef EXEC_BACKEND
+	InitProcess();
+#endif
+
+	/*
+	 * If an exception is encountered, processing resumes here.
+	 *
+	 * See notes in postgres.c about the design of this coding.
+	 */
+	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+	{
+		/* Prevents interrupts while cleaning up */
+		HOLD_INTERRUPTS();
+
+		/* Report the error to the server log */
+		EmitErrorReport();
+
+		/*
+		 * We can now go away.  Note that because we called InitProcess, a
+		 * callback was registered to do ProcKill, which will clean up
+		 * necessary state.
+		 */
+		proc_exit(0);
+	}
+	/* We can now handle ereport(ERROR) */
+	PG_exception_stack = &local_sigjmp_buf;
+
+	PG_SETMASK(&UnBlockSig);
+
+	proxy = proxy_create(MyProxySocket, &ProxyState[MyProxyId], SessionPoolSize);
+	proxy_loop(proxy);
+
+	proc_exit(0);
+}
+
+/*
+ * Function for launching a proxy from the postmaster.
+ * This "boilerplate" code is taken from other auxiliary workers.
+ * In the future it may be replaced with a background worker; the main problem
+ * with a background worker is how to pass a socket to it and obtain its PID.
+ */
+int
+ConnectionProxyStart()
+{
+	pid_t		worker_pid;
+
+#ifdef EXEC_BACKEND
+	switch ((worker_pid = proxy_forkexec()))
+#else
+	switch ((worker_pid = fork_process()))
+#endif
+	{
+		case -1:
+			ereport(LOG,
+					(errmsg("could not fork proxy worker process: %m")));
+			return 0;
+
+#ifndef EXEC_BACKEND
+		case 0:
+			/* in postmaster child ... */
+			InitPostmasterChild();
+
+			ConnectionProxyMain(0, NULL);
+			break;
+#endif
+		default:
+			elog(LOG, "Start proxy process %d", (int) worker_pid);
+			return (int) worker_pid;
+	}
+
+	/* shouldn't get here */
+	return 0;
+}
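proxy_loop is a standard WaitEventSet consumer: a bounded wait, a dispatch pass over the returned events, then deferred channel cleanup. The core API usage, reduced to a minimal latch-plus-socket example, is sketched below (handle_readable and my_user_data are hypothetical; the socket is assumed to be open and non-blocking):

    #include "postgres.h"
    #include "miscadmin.h"          /* MyLatch */
    #include "pgstat.h"             /* PG_WAIT_CLIENT */
    #include "storage/latch.h"
    #include "utils/memutils.h"     /* TopMemoryContext */

    extern void handle_readable(void* user_data); /* hypothetical callback */

    static void
    wait_loop(pgsocket sock, void* my_user_data)
    {
        WaitEventSet *set = CreateWaitEventSet(TopMemoryContext, 2);
        WaitEvent     ready[2];

        AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
        AddWaitEventToSet(set, WL_SOCKET_READABLE, sock, NULL, my_user_data);

        for (;;)
        {
            int n = WaitEventSetWait(set, 1000 /* ms timeout */, ready,
                                     lengthof(ready), PG_WAIT_CLIENT);
            int i;

            for (i = 0; i < n; i++)
            {
                if (ready[i].events & WL_LATCH_SET)
                    ResetLatch(MyLatch);              /* latches must be reset by hand */
                if (ready[i].events & WL_SOCKET_READABLE)
                    handle_readable(ready[i].user_data);
            }
        }
    }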
+/*
+ * We need some place in shared memory to publish information about the proxies' state.
+ */
+int ConnectionProxyShmemSize(void)
+{
+	return ConnectionProxiesNumber*sizeof(ConnectionProxyState);
+}
+
+void ConnectionProxyShmemInit(void)
+{
+	bool found;
+	ProxyState = (ConnectionProxyState*)ShmemInitStruct("connection proxy contexts",
+														ConnectionProxyShmemSize(), &found);
+	if (!found)
+		memset(ProxyState, 0, ConnectionProxyShmemSize());
+}
+
+PG_FUNCTION_INFO_V1(pg_pooler_state);
+
+typedef struct
+{
+	int proxy_id;
+	TupleDesc ret_desc;
+} PoolerStateContext;
+
+/*
+ * Return information about the proxies' state.
+ * This set-returning function returns the following columns:
+ *
+ * pid            - proxy process identifier
+ * n_clients      - number of clients connected to the proxy
+ * n_ssl_clients  - number of clients using the SSL protocol
+ * n_pools        - number of pools (role/dbname combinations) maintained by the proxy
+ * n_backends     - total number of backends spawned by this proxy (including tainted)
+ * n_dedicated_backends - number of tainted (dedicated) backends
+ * tx_bytes       - amount of data sent from backends to clients
+ * rx_bytes       - amount of data sent from clients to backends
+ * n_transactions - number of transactions processed by all backends of this proxy
+ */
+Datum pg_pooler_state(PG_FUNCTION_ARGS)
+{
+	FuncCallContext* srf_ctx;
+	MemoryContext old_context;
+	PoolerStateContext* ps_ctx;
+	HeapTuple tuple;
+	Datum values[9];
+	bool  nulls[9];
+	int id;
+	int i;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		srf_ctx = SRF_FIRSTCALL_INIT();
+		old_context = MemoryContextSwitchTo(srf_ctx->multi_call_memory_ctx);
+		ps_ctx = (PoolerStateContext*)palloc(sizeof(PoolerStateContext));
+		get_call_result_type(fcinfo, NULL, &ps_ctx->ret_desc);
+		ps_ctx->proxy_id = 0;
+		srf_ctx->user_fctx = ps_ctx;
+		MemoryContextSwitchTo(old_context);
+	}
+	srf_ctx = SRF_PERCALL_SETUP();
+	ps_ctx = srf_ctx->user_fctx;
+	id = ps_ctx->proxy_id;
+	if (id == ConnectionProxiesNumber)
+		SRF_RETURN_DONE(srf_ctx);
+
+	values[0] = Int32GetDatum(ProxyState[id].pid);
+	values[1] = Int32GetDatum(ProxyState[id].n_clients);
+	values[2] = Int32GetDatum(ProxyState[id].n_ssl_clients);
+	values[3] = Int32GetDatum(ProxyState[id].n_pools);
+	values[4] = Int32GetDatum(ProxyState[id].n_backends);
+	values[5] = Int32GetDatum(ProxyState[id].n_dedicated_backends);
+	values[6] = Int64GetDatum(ProxyState[id].tx_bytes);
+	values[7] = Int64GetDatum(ProxyState[id].rx_bytes);
+	values[8] = Int64GetDatum(ProxyState[id].n_transactions);
+
+	for (i = 0; i <= 8; i++)
+		nulls[i] = false;
+
+	ps_ctx->proxy_id += 1;
+	tuple = heap_form_tuple(ps_ctx->ret_desc, values, nulls);
+	SRF_RETURN_NEXT(srf_ctx, HeapTupleGetDatum(tuple));
+}
+
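In a running instance these counters can be examined with SELECT * FROM pg_pooler_state(), which emits one row per proxy worker; instance-wide totals are obtained by summing across the rows.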
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index d7d733530f..6d32d8fe8d 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -28,6 +28,7 @@
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
+#include "postmaster/proxy.h"
 #include "replication/logicallauncher.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -150,6 +151,7 @@ CreateSharedMemoryAndSemaphores(int port)
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
+		size = add_size(size, ConnectionProxyShmemSize());
 
 		/* freeze the addin request size and include it */
 		addin_request_allowed = false;
@@ -255,6 +257,7 @@ CreateSharedMemoryAndSemaphores(int port)
 	WalSndShmemInit();
 	WalRcvShmemInit();
 	ApplyLauncherShmemInit();
+	ConnectionProxyShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 2426cbcf8e..d2806b7399 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -77,6 +77,7 @@ struct WaitEventSet
 {
 	int			nevents;		/* number of registered events */
 	int			nevents_space;	/* maximum number of events in this set */
+	int			free_events;	/* single-linked list of free events, linked by "pos" and terminated by -1 */
 
 	/*
 	 * Array, of nevents_space length, storing the definition of events this
@@ -137,9 +138,9 @@ static void drainSelfPipe(void);
 #if defined(WAIT_USE_EPOLL)
 static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
 #elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
 #elif defined(WAIT_USE_WIN32)
-static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove);
 #endif
 
 static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
@@ -585,6 +586,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 	set->latch = NULL;
 	set->nevents_space = nevents;
 	set->exit_on_postmaster_death = false;
+	set->free_events = -1;
 
 #if defined(WAIT_USE_EPOLL)
 #ifdef EPOLL_CLOEXEC
@@ -691,9 +693,11 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 				  void *user_data)
 {
 	WaitEvent  *event;
+	int			free_event;
 
 	/* not enough space */
-	Assert(set->nevents < set->nevents_space);
+	if (set->nevents == set->nevents_space)
+		return -1;
 
 	if (events == WL_EXIT_ON_PM_DEATH)
 	{
@@ -720,8 +724,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 	if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
 		elog(ERROR, "cannot wait on socket event without a socket");
 
-	event = &set->events[set->nevents];
-	event->pos = set->nevents++;
+	free_event = set->free_events;
+	if (free_event >= 0)
+	{
+		event = &set->events[free_event];
+		set->free_events = event->pos;
+		event->pos = free_event;
+	}
+	else
+	{
+		event = &set->events[set->nevents];
+		event->pos = set->nevents;
+	}
+	set->nevents += 1;
 	event->fd = fd;
 	event->events = events;
 	event->user_data = user_data;
@@ -748,14 +763,29 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 #if defined(WAIT_USE_EPOLL)
 	WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
 #elif defined(WAIT_USE_POLL)
-	WaitEventAdjustPoll(set, event);
+	WaitEventAdjustPoll(set, event, false);
 #elif defined(WAIT_USE_WIN32)
-	WaitEventAdjustWin32(set, event);
+	WaitEventAdjustWin32(set, event, false);
 #endif
 
 	return event->pos;
 }
 
+/*
+ * Remove the event at the specified position from the set.
+ */
+void DeleteWaitEventFromSet(WaitEventSet *set, int event_pos)
+{
+	WaitEvent  *event = &set->events[event_pos];
+#if defined(WAIT_USE_EPOLL)
+	WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+	WaitEventAdjustPoll(set, event, true);
+#elif defined(WAIT_USE_WIN32)
+	WaitEventAdjustWin32(set, event, true);
+#endif
+}
+
 /*
  * Change the event mask and, in the WL_LATCH_SET case, the latch associated
 * with the WaitEvent.
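DeleteWaitEventFromSet recycles events[] slots through an index-based free list: a vacant slot's pos field stores the index of the next vacant slot, with -1 as the terminator, so allocation pops the head and deletion pushes onto it. In isolation the pattern is (a generic sketch, not the patched latch.c; when the free list is empty, the live count equals the array's high-water mark, so the next fresh slot is slots[n_live]):

    #include <assert.h>

    #define NSLOTS        8
    #define FREE_LIST_END (-1)

    /* A live slot stores its own index in "pos"; a vacant slot stores the
     * index of the next vacant slot (or FREE_LIST_END). */
    typedef struct { int pos; } Slot;

    static Slot slots[NSLOTS];
    static int  free_head = FREE_LIST_END;
    static int  n_live = 0;

    static int
    slot_alloc(void)
    {
        int idx;

        if (free_head != FREE_LIST_END)
        {
            idx = free_head;
            free_head = slots[idx].pos;   /* pop the free list */
        }
        else
        {
            assert(n_live < NSLOTS);      /* empty free list: n_live is the high-water mark */
            idx = n_live;
        }
        slots[idx].pos = idx;             /* mark as live */
        n_live++;
        return idx;
    }

    static void
    slot_free(int idx)
    {
        slots[idx].pos = free_head;       /* push onto the free list */
        free_head = idx;
        n_live--;
    }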
@@ -767,10 +797,16 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) { WaitEvent *event; - Assert(pos < set->nevents); + Assert(pos < set->nevents_space); event = &set->events[pos]; +#if defined(WAIT_USE_EPOLL) + /* ModifyWaitEvent is used to emulate epoll EPOLLET (edge-triggered) flag */ + if (events & WL_SOCKET_EDGE) + return; +#endif + /* * If neither the event mask nor the associated latch changes, return * early. That's an important optimization for some sockets, where @@ -804,9 +840,9 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) #if defined(WAIT_USE_EPOLL) WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD); #elif defined(WAIT_USE_POLL) - WaitEventAdjustPoll(set, event); + WaitEventAdjustPoll(set, event, false); #elif defined(WAIT_USE_WIN32) - WaitEventAdjustWin32(set, event); + WaitEventAdjustWin32(set, event, false); #endif } @@ -844,6 +880,8 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action) epoll_ev.events |= EPOLLIN; if (event->events & WL_SOCKET_WRITEABLE) epoll_ev.events |= EPOLLOUT; + if (event->events & WL_SOCKET_EDGE) + epoll_ev.events |= EPOLLET; } /* @@ -852,21 +890,39 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action) * requiring that, and actually it makes the code simpler... */ rc = epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev); - if (rc < 0) ereport(ERROR, (errcode_for_socket_access(), - /* translator: %s is a syscall name, such as "poll()" */ + /* translator: %s is a syscall name, such as "poll()" */ errmsg("%s failed: %m", "epoll_ctl()"))); + + if (action == EPOLL_CTL_DEL) + { + int pos = event->pos; + event->fd = PGINVALID_SOCKET; + set->nevents -= 1; + event->pos = set->free_events; + set->free_events = pos; + } } #endif #if defined(WAIT_USE_POLL) static void -WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) +WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove) { - struct pollfd *pollfd = &set->pollfds[event->pos]; + int pos = event->pos; + struct pollfd *pollfd = &set->pollfds[pos]; + + if (remove) + { + set->nevents -= 1; + *pollfd = set->pollfds[set->nevents]; + set->events[pos] = set->events[set->nevents]; + event->pos = pos; + return; + } pollfd->revents = 0; pollfd->fd = event->fd; @@ -897,9 +953,25 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) #if defined(WAIT_USE_WIN32) static void -WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) +WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove) { - HANDLE *handle = &set->handles[event->pos + 1]; + int pos = event->pos; + HANDLE *handle = &set->handles[pos + 1]; + + if (remove) + { + Assert(event->fd != PGINVALID_SOCKET); + + if (*handle != WSA_INVALID_EVENT) + WSACloseEvent(*handle); + + set->nevents -= 1; + set->events[pos] = set->events[set->nevents]; + *handle = set->handles[set->nevents + 1]; + set->handles[set->nevents + 1] = WSA_INVALID_EVENT; + event->pos = pos; + return; + } if (event->events == WL_LATCH_SET) { @@ -912,7 +984,7 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) } else { - int flags = FD_CLOSE; /* always check for errors/EOF */ + int flags = FD_CLOSE; /* always check for errors/EOF */ if (event->events & WL_SOCKET_READABLE) flags |= FD_READ; @@ -929,8 +1001,8 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) WSAGetLastError()); } if (WSAEventSelect(event->fd, *handle, flags) != 0) - elog(ERROR, "failed to set up event for socket: error code %u", - WSAGetLastError()); + elog(ERROR, "failed to set up event 
for socket %p: error code %u", + event->fd, WSAGetLastError()); Assert(event->fd != PGINVALID_SOCKET); } @@ -1336,7 +1408,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, { if (cur_event->reset) { - WaitEventAdjustWin32(set, cur_event); + WaitEventAdjustWin32(set, cur_event, false); cur_event->reset = false; } diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 44a59e1d4f..62ec2afd2e 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -4217,6 +4217,8 @@ PostgresMain(int argc, char *argv[], */ if (ConfigReloadPending) { + if (RestartPoolerOnReload && strcmp(application_name, "pool_worker") == 0) + proc_exit(0); ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); } diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c index ffd1970f58..16ca58d9d0 100644 --- a/src/backend/utils/adt/lockfuncs.c +++ b/src/backend/utils/adt/lockfuncs.c @@ -18,6 +18,7 @@ #include "funcapi.h" #include "miscadmin.h" #include "storage/predicate_internals.h" +#include "storage/proc.h" #include "utils/array.h" #include "utils/builtins.h" @@ -658,6 +659,7 @@ pg_isolation_test_session_is_blocked(PG_FUNCTION_ARGS) static void PreventAdvisoryLocksInParallelMode(void) { + MyProc->is_tainted = true; if (IsInParallelMode()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 3bf96de256..79001ccf91 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -130,9 +130,14 @@ int max_parallel_maintenance_workers = 2; */ int NBuffers = 1000; int MaxConnections = 90; +int SessionPoolSize = 0; +int ConnectionProxiesNumber = 1; +int SessionSchedule = SESSION_SCHED_ROUND_ROBIN; + int max_worker_processes = 8; int max_parallel_workers = 8; int MaxBackends = 0; +int MaxSessions = 1000; int VacuumCostPageHit = 1; /* GUC parameters for vacuum */ int VacuumCostPageMiss = 10; @@ -148,3 +153,4 @@ int VacuumCostBalance = 0; /* working state for vacuum */ bool VacuumCostActive = false; double vacuum_cleanup_index_scale_factor; +bool RestartPoolerOnReload = false; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 92c4fee8f8..65f66db8e9 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -457,6 +457,14 @@ const struct config_enum_entry ssl_protocol_versions_info[] = { {NULL, 0, false} }; +static const struct config_enum_entry session_schedule_options[] = { + {"round-robin", SESSION_SCHED_ROUND_ROBIN, false}, + {"random", SESSION_SCHED_RANDOM, false}, + {"load-balancing", SESSION_SCHED_LOAD_BALANCING, false}, + {NULL, 0, false} +}; + + static struct config_enum_entry shared_memory_options[] = { #ifndef WIN32 {"sysv", SHMEM_TYPE_SYSV, false}, @@ -1285,6 +1293,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"restart_pooler_on_reload", PGC_SIGHUP, CONN_POOLING, + gettext_noop("Restart session pool workers on pg_reload_conf()."), + NULL, + }, + &RestartPoolerOnReload, + false, + NULL, NULL, NULL + }, + { {"log_duration", PGC_SUSET, LOGGING_WHAT, gettext_noop("Logs the duration of each completed SQL statement."), @@ -2137,6 +2155,42 @@ static struct config_int ConfigureNamesInt[] = check_maxconnections, NULL, NULL }, + { + /* see max_connections and max_wal_senders */ + {"session_pool_size", PGC_POSTMASTER, CONN_POOLING, + gettext_noop("Sets number of backends serving client sessions."), + gettext_noop("If non-zero then session pooling 
will be used: "
+						 "client connections will be redirected to one of the backends and the maximal number of backends is determined by this parameter. "
+						 "Launched backends are never terminated, even if there are no active sessions.")
+		},
+		&SessionPoolSize,
+		10, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"connection_proxies", PGC_POSTMASTER, CONN_POOLING,
+			gettext_noop("Sets the number of connection proxies."),
+			gettext_noop("The postmaster spawns a separate worker process for each proxy and scatters connections between the proxies using one of the scheduling policies (round-robin, random, load-balancing). "
+						 "Each proxy launches its own subset of backends, so the maximal number of non-tainted backends is "
+						 "session_pool_size*connection_proxies*databases*roles.")
+		},
+		&ConnectionProxiesNumber,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"max_sessions", PGC_POSTMASTER, CONN_POOLING,
+			gettext_noop("Sets the maximum number of client sessions."),
+			gettext_noop("Maximal number of client sessions which can be handled by one connection proxy. "
+						 "It can be greater than max_connections and actually be arbitrarily large.")
+		},
+		&MaxSessions,
+		1000, 1, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		/* see max_connections */
 		{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
@@ -2184,6 +2238,16 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"proxy_port", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+			gettext_noop("Sets the TCP port for the connection pooler."),
+			NULL
+		},
+		&ProxyPortNumber,
+		6543, 1, 65535,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"unix_socket_permissions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the access permissions of the Unix-domain socket."),
@@ -4550,6 +4614,16 @@ static struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"session_schedule", PGC_POSTMASTER, CONN_POOLING,
+			gettext_noop("Session schedule policy for the connection pool."),
+			NULL
+		},
+		&SessionSchedule,
+		SESSION_SCHED_ROUND_ROBIN, session_schedule_options,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL},
 		NULL, 0, NULL, NULL, NULL, NULL
@@ -8145,6 +8219,7 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
 				 errmsg("cannot set parameters during a parallel operation")));
+	MyProc->is_tainted = true;
 
 	switch (stmt->kind)
 	{
diff --git a/src/backend/utils/mmgr/mcxt.c b/src/backend/utils/mmgr/mcxt.c
index b07be12236..dac74a272d 100644
--- a/src/backend/utils/mmgr/mcxt.c
+++ b/src/backend/utils/mmgr/mcxt.c
@@ -506,7 +506,7 @@ MemoryContextStatsDetail(MemoryContext context, int max_children)
 * *totals (if given).
*/
 static void
-MemoryContextStatsInternal(MemoryContext context, int level,
+ MemoryContextStatsInternal(MemoryContext context, int level,
 						   bool print, int max_children,
 						   MemoryContextCounters *totals)
 {
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 87335248a0..5f528c1d72 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10677,4 +10677,11 @@
   proname => 'pg_partition_root', prorettype => 'regclass',
   proargtypes => 'regclass', prosrc => 'pg_partition_root' },
 
+# built-in connection pool
+{ oid => '3435', descr => 'information about connection pooler proxies workload',
+  proname => 'pg_pooler_state', prorows => '1000', proretset => 't',
+  provolatile => 'v', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{int4,int4,int4,int4,int4,int4,int8,int8,int8}', proargmodes => '{o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,n_clients,n_ssl_clients,n_pools,n_backends,n_dedicated_backends,tx_bytes,rx_bytes,n_transactions}', prosrc => 'pg_pooler_state' },
+
 ]
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 08a257616d..1e12ee1884 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -54,10 +54,9 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
  * prototypes for functions in pqcomm.c
  */
 extern WaitEventSet *FeBeWaitSet;
-
-extern int StreamServerPort(int family, char *hostName,
-				 unsigned short portNumber, char *unixSocketDir,
-				 pgsocket ListenSocket[], int MaxListen);
+extern int StreamServerPort(int family, char *hostName,
+				 unsigned short portNumber, char *unixSocketDir,
+				 pgsocket ListenSocket[], int ListenPort[], int MaxListen);
 extern int	StreamConnection(pgsocket server_fd, Port *port);
 extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 61a24c2e3c..86c0ef84e5 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -159,6 +159,19 @@ extern PGDLLIMPORT int data_directory_mode;
 extern PGDLLIMPORT int NBuffers;
 extern PGDLLIMPORT int MaxBackends;
 extern PGDLLIMPORT int MaxConnections;
+
+enum SessionSchedulePolicy
+{
+	SESSION_SCHED_ROUND_ROBIN,
+	SESSION_SCHED_RANDOM,
+	SESSION_SCHED_LOAD_BALANCING
+};
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
+extern PGDLLIMPORT int ConnectionProxiesNumber;
+extern PGDLLIMPORT int SessionSchedule;
+extern PGDLLIMPORT bool RestartPoolerOnReload;
+
 extern PGDLLIMPORT int max_worker_processes;
 extern PGDLLIMPORT int max_parallel_workers;
 
diff --git a/src/include/port.h b/src/include/port.h
index b5c03d912b..3ea24a3b70 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
 extern bool pg_set_noblock(pgsocket sock);
 extern bool pg_set_block(pgsocket sock);
 
+/* send/receive socket descriptor */
+extern int pg_send_sock(pgsocket chan, pgsocket sock, pid_t pid);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
 /* Portable path handling for Unix/Win32 (in path.c) */
 
 extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/port/win32_port.h b/src/include/port/win32_port.h
index f4841fb397..e101df179f 100644
--- a/src/include/port/win32_port.h
+++ b/src/include/port/win32_port.h
@@ -445,6 +445,7 @@ extern int	pgkill(int pid, int sig);
 #define select(n, r, w, e, timeout) pgwin32_select(n, r, w, e, timeout)
 #define recv(s, buf, len, flags) pgwin32_recv(s, buf, len, flags)
 #define send(s, buf, len, flags) pgwin32_send(s, buf, len, flags)
+#define socketpair(af, type, protocol, socks) pgwin32_socketpair(af, type, protocol, socks) SOCKET pgwin32_socket(int af, int type, int protocol); int pgwin32_bind(SOCKET s, struct sockaddr *addr, int addrlen); @@ -455,7 +456,8 @@ int pgwin32_select(int nfds, fd_set *readfs, fd_set *writefds, fd_set *exceptf int pgwin32_recv(SOCKET s, char *buf, int len, int flags); int pgwin32_send(SOCKET s, const void *buf, int len, int flags); int pgwin32_waitforsinglesocket(SOCKET s, int what, int timeout); - +int pgwin32_socketpair(int domain, int type, int protocol, SOCKET socks[2]); + extern int pgwin32_noblock; #endif /* FRONTEND */ diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 8ccd2afce5..05906e94a0 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -17,6 +17,7 @@ extern bool EnableSSL; extern int ReservedBackends; extern PGDLLIMPORT int PostPortNumber; +extern PGDLLIMPORT int ProxyPortNumber; extern int Unix_socket_permissions; extern char *Unix_socket_group; extern char *Unix_socket_directories; @@ -46,6 +47,11 @@ extern int postmaster_alive_fds[2]; extern PGDLLIMPORT const char *progname; +extern PGDLLIMPORT void* (*LibpqConnectdbParams)(char const* keywords[], char const* values[]); + +struct Proxy; +struct Port; + extern void PostmasterMain(int argc, char *argv[]) pg_attribute_noreturn(); extern void ClosePostmasterPorts(bool am_syslogger); extern void InitProcessGlobals(void); @@ -63,6 +69,9 @@ extern Size ShmemBackendArraySize(void); extern void ShmemBackendArrayAllocation(void); #endif +extern int ParseStartupPacket(struct Port* port, MemoryContext memctx, void* pkg_body, int pkg_size, bool SSLdone); +extern int BackendStartup(struct Port* port, int* backend_pid); + /* * Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved * for buffer references in buf_internals.h. This limitation could be lifted diff --git a/src/include/postmaster/proxy.h b/src/include/postmaster/proxy.h new file mode 100644 index 0000000000..7f7a92a56a --- /dev/null +++ b/src/include/postmaster/proxy.h @@ -0,0 +1,43 @@ +/*------------------------------------------------------------------------- + * + * proxy.h + * Exports from postmaster/proxy.c. 
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/postmaster/proxy.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _PROXY_H
+#define _PROXY_H
+
+/*
+ * Information in shared memory about connection proxy state (used for session scheduling and monitoring)
+ */
+typedef struct ConnectionProxyState
+{
+	int pid;                  /* proxy worker pid */
+	int n_clients;            /* total number of clients */
+	int n_ssl_clients;        /* number of clients using SSL connections */
+	int n_pools;              /* number of dbname/role combinations */
+	int n_backends;           /* total number of launched backends */
+	int n_dedicated_backends; /* number of tainted backends */
+	uint64 tx_bytes;          /* amount of data sent to clients */
+	uint64 rx_bytes;          /* amount of data sent to the server */
+	uint64 n_transactions;    /* total number of processed transactions */
+} ConnectionProxyState;
+
+extern ConnectionProxyState* ProxyState;
+extern PGDLLIMPORT int MyProxyId;
+extern PGDLLIMPORT pgsocket MyProxySocket;
+
+extern int ConnectionProxyStart(void);
+extern int ConnectionProxyShmemSize(void);
+extern void ConnectionProxyShmemInit(void);
+#ifdef EXEC_BACKEND
+extern void ConnectionProxyMain(int argc, char *argv[]);
+#endif
+
+#endif
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index bd7af11a8a..680eb5ee10 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -133,9 +133,11 @@ typedef struct Latch
 /* avoid having to deal with case on platforms not requiring it */
 #define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE
 #endif
+#define WL_SOCKET_EDGE		 (1 << 7)
 
 #define WL_SOCKET_MASK		(WL_SOCKET_READABLE | \
 							 WL_SOCKET_WRITEABLE | \
+							 WL_SOCKET_EDGE | \
 							 WL_SOCKET_CONNECTED)
 
 typedef struct WaitEvent
@@ -177,6 +179,8 @@ extern int WaitLatch(Latch *latch, int wakeEvents, long timeout,
 extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
 				 pgsocket sock, long timeout, uint32 wait_event_info);
 
+extern void DeleteWaitEventFromSet(WaitEventSet *set, int event_pos);
+
 /*
  * Unix implementation uses SIGUSR1 for inter-process signaling.
  * Win32 doesn't need this.
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index ac7ee72952..e7207e2d9a 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -203,6 +203,8 @@ struct PGPROC
 	PGPROC	   *lockGroupLeader;	/* lock group leader, if I'm a member */
 	dlist_head	lockGroupMembers;	/* list of members, if I'm a leader */
 	dlist_node	lockGroupLink;		/* my member link, if I'm a member */
+
+	bool		is_tainted;			/* backend has modified session GUCs, used temporary tables, prepared statements, ... */
 };
 
 /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h.
*/ diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h index d68976fafa..9ff45b190a 100644 --- a/src/include/utils/guc_tables.h +++ b/src/include/utils/guc_tables.h @@ -58,6 +58,7 @@ enum config_group CONN_AUTH_SETTINGS, CONN_AUTH_AUTH, CONN_AUTH_SSL, + CONN_POOLING, RESOURCES, RESOURCES_MEM, RESOURCES_DISK, diff --git a/src/makefiles/Makefile.cygwin b/src/makefiles/Makefile.cygwin index f274d802b1..fdf53e9a8d 100644 --- a/src/makefiles/Makefile.cygwin +++ b/src/makefiles/Makefile.cygwin @@ -19,6 +19,7 @@ override CPPFLAGS += -DWIN32_STACK_RLIMIT=$(WIN32_STACK_RLIMIT) ifneq (,$(findstring backend,$(subdir))) ifeq (,$(findstring conversion_procs,$(subdir))) ifeq (,$(findstring libpqwalreceiver,$(subdir))) +ifeq (,$(findstring libpqconn,$(subdir))) ifeq (,$(findstring replication/pgoutput,$(subdir))) ifeq (,$(findstring snowball,$(subdir))) override CPPFLAGS+= -DBUILDING_DLL diff --git a/src/makefiles/Makefile.win32 b/src/makefiles/Makefile.win32 index 3dea11e5c2..39bd2de85e 100644 --- a/src/makefiles/Makefile.win32 +++ b/src/makefiles/Makefile.win32 @@ -17,6 +17,7 @@ CFLAGS_SL = ifneq (,$(findstring backend,$(subdir))) ifeq (,$(findstring conversion_procs,$(subdir))) ifeq (,$(findstring libpqwalreceiver,$(subdir))) +ifeq (,$(findstring libpqconn,$(subdir))) ifeq (,$(findstring replication/pgoutput,$(subdir))) ifeq (,$(findstring snowball,$(subdir))) override CPPFLAGS+= -DBUILDING_DLL