diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index b945b15..a73d584 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -813,3 +813,30 @@ build_regtype_array(Oid *param_types, int num_params) result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i'); return PointerGetDatum(result); } + + +void +DropSessionPreparedStatements(char const* sessionId) +{ + HASH_SEQ_STATUS seq; + PreparedStatement *entry; + size_t idLen = strlen(sessionId); + + /* nothing cached */ + if (!prepared_queries) + return; + + /* walk over cache */ + hash_seq_init(&seq, prepared_queries); + while ((entry = hash_seq_search(&seq)) != NULL) + { + if (strncmp(entry->stmt_name, sessionId, idLen) == 0 && entry->stmt_name[idLen] == '.') + { + /* Release the plancache entry */ + DropCachedPlan(entry->plansource); + + /* Now we can remove the hash table entry */ + hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL); + } + } +} diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index a4f6d4d..5b07a88 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -1029,6 +1029,18 @@ pq_peekbyte(void) } /* -------------------------------- + * pq_peekbyte - peek at next byte from connection + * + * Same as pq_getbyte() except we don't advance the pointer. + * -------------------------------- + */ +int +pq_available_bytes(void) +{ + return PqRecvLength - PqRecvPointer; +} + +/* -------------------------------- * pq_getbyte_if_available - get a single byte from connection, * if available * diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile index aba1e92..56ec998 100644 --- a/src/backend/port/Makefile +++ b/src/backend/port/Makefile @@ -21,7 +21,7 @@ subdir = src/backend/port top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o $(TAS) +OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o send_sock.o $(TAS) ifeq ($(PORTNAME), win32) SUBDIRS += win32 diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c new file mode 100644 index 0000000..7b36923 --- /dev/null +++ b/src/backend/port/send_sock.c @@ -0,0 +1,89 @@ +/*------------------------------------------------------------------------- + * + * send_sock.c + * Send socket descriptor to another process + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/port/send_sock.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +/* + * Send socket descriptor "sock" to backend process through Unix socket "chan" + */ +int pg_send_sock(pgsocket chan, pgsocket sock) +{ + struct msghdr msg = { 0 }; + struct iovec io; + struct cmsghdr * cmsg; + char buf[CMSG_SPACE(sizeof(sock))]; + memset(buf, '\0', sizeof(buf)); + + /* On Mac OS X, the struct iovec is needed, even if it points to minimal data */ + io.iov_base = ""; + io.iov_len = 1; + + msg.msg_iov = &io; + msg.msg_iovlen = 1; + msg.msg_control = buf; + msg.msg_controllen = sizeof(buf); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(sock)); + + memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock)); + msg.msg_controllen = cmsg->cmsg_len; + + if (sendmsg(chan, &msg, 0) < 0) + { + return -1; + } + return 0; +} + + +/* + * Receive socket descriptor from postmaster process through Unix socket "chan" + */ +pgsocket pg_recv_sock(pgsocket chan) +{ + struct msghdr msg = {0}; + char c_buffer[256]; + char m_buffer[256]; + struct iovec io; + struct cmsghdr * cmsg; + pgsocket sock; + + io.iov_base = m_buffer; + io.iov_len = sizeof(m_buffer); + msg.msg_iov = &io; + msg.msg_iovlen = 1; + + msg.msg_control = c_buffer; + msg.msg_controllen = sizeof(c_buffer); + + if (recvmsg(chan, &msg, 0) < 0) + { + return -1; + } + + cmsg = CMSG_FIRSTHDR(&msg); + memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock)); + + return sock; +} diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index f3ddf82..4586b57 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -169,6 +169,7 @@ typedef struct bkend pid_t pid; /* process id of backend */ int32 cancel_key; /* cancel key for cancels for this backend */ int child_slot; /* PMChildSlot for this backend, if any */ + pgsocket session_send_sock; /* Write end of socket pipe to this backend used to send session socket descriptor to the backend process */ /* * Flavor of backend or auxiliary process. Note that BACKEND_TYPE_WALSND @@ -182,6 +183,15 @@ typedef struct bkend } Backend; static dlist_head BackendList = DLIST_STATIC_INIT(BackendList); +/* + * Pointer in backend list used to implement round-robin distribution of sessions through backends. + * This variable either NULL, either points to the normal backend. + */ +static Backend* BackendListClockPtr; +/* + * Number of active normal backends + */ +static int nNormalBackends; #ifdef EXEC_BACKEND static Backend *ShmemBackendArray; @@ -412,7 +422,6 @@ static void BackendRun(Port *port) pg_attribute_noreturn(); static void ExitPostmaster(int status) pg_attribute_noreturn(); static int ServerLoop(void); static int BackendStartup(Port *port); -static int ProcessStartupPacket(Port *port, bool SSLdone); static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options); static void processCancelRequest(Port *port, void *pkt); static int initMasks(fd_set *rmask); @@ -568,6 +577,22 @@ HANDLE PostmasterHandle; #endif /* + * Move current backend pointer to the next normal backend. + * This function is called either when new session is started to implement round-robin policy, either when backend pointer by BackendListClockPtr is terminated + */ +static void AdvanceBackendListClockPtr(void) +{ + Backend* b = BackendListClockPtr; + do { + dlist_node* node = &b->elem; + node = node->next ? node->next : BackendList.head.next; + b = dlist_container(Backend, elem, node); + } while (b->bkend_type != BACKEND_TYPE_NORMAL && b != BackendListClockPtr); + + BackendListClockPtr = b; +} + +/* * Postmaster main entry point */ void @@ -1944,8 +1969,8 @@ initMasks(fd_set *rmask) * send anything to the client, which would typically be appropriate * if we detect a communications failure.) */ -static int -ProcessStartupPacket(Port *port, bool SSLdone) +int +ProcessStartupPacket(Port *port, bool SSLdone, MemoryContext memctx) { int32 len; void *buf; @@ -2043,7 +2068,7 @@ retry1: #endif /* regular startup packet, cancel, etc packet should follow... */ /* but not another SSL negotiation request */ - return ProcessStartupPacket(port, true); + return ProcessStartupPacket(port, true, memctx); } /* Could add additional special packet types here */ @@ -2073,7 +2098,7 @@ retry1: * not worry about leaking this storage on failure, since we aren't in the * postmaster process anymore. */ - oldcontext = MemoryContextSwitchTo(TopMemoryContext); + oldcontext = MemoryContextSwitchTo(memctx); if (PG_PROTOCOL_MAJOR(proto) >= 3) { @@ -2449,7 +2474,7 @@ ConnCreate(int serverFd) ConnFree(port); return NULL; } - + SessionPoolSock = PGINVALID_SOCKET; /* * Allocate GSSAPI specific state struct */ @@ -3236,6 +3261,24 @@ CleanupBackgroundWorker(int pid, } /* + * Unlink backend from backend's list and free memory + */ +static void UnlinkBackend(Backend* bp) +{ + if (bp->bkend_type == BACKEND_TYPE_NORMAL) + { + if (bp == BackendListClockPtr) + AdvanceBackendListClockPtr(); + if (bp->session_send_sock != PGINVALID_SOCKET) + close(bp->session_send_sock); + elog(DEBUG2, "Cleanup backend %d", bp->pid); + nNormalBackends -= 1; + } + dlist_delete(&bp->elem); + free(bp); +} + +/* * CleanupBackend -- cleanup after terminated backend. * * Remove all local state associated with backend. @@ -3312,8 +3355,7 @@ CleanupBackend(int pid, */ BackgroundWorkerStopNotifications(bp->pid); } - dlist_delete(iter.cur); - free(bp); + UnlinkBackend(bp); break; } } @@ -3415,8 +3457,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) ShmemBackendArrayRemove(bp); #endif } - dlist_delete(iter.cur); - free(bp); + UnlinkBackend(bp); /* Keep looping so we can signal remaining backends */ } else @@ -4017,6 +4058,19 @@ BackendStartup(Port *port) { Backend *bn; /* for backend cleanup */ pid_t pid; + int session_pipe[2]; + + if (SessionPoolSize != 0 && nNormalBackends >= SessionPoolSize) + { + /* Instead of spawning new backend open new session at one of the existed backends. */ + Assert(BackendListClockPtr && BackendListClockPtr->session_send_sock != PGINVALID_SOCKET); + elog(DEBUG2, "Start new session for socket %d at backend %d total %d", port->sock, BackendListClockPtr->pid, nNormalBackends); + if (pg_send_sock(BackendListClockPtr->session_send_sock, port->sock) < 0) + elog(FATAL, "Failed to send session socket: %m"); + AdvanceBackendListClockPtr(); /* round-robin */ + return STATUS_OK; + } + /* * Create backend data structure. Better before the fork() so we can @@ -4030,7 +4084,6 @@ BackendStartup(Port *port) errmsg("out of memory"))); return STATUS_ERROR; } - /* * Compute the cancel key that will be assigned to this backend. The * backend will have its own copy in the forked-off process' value of @@ -4063,12 +4116,23 @@ BackendStartup(Port *port) /* Hasn't asked to be notified about any bgworkers yet */ bn->bgworker_notify = false; + if (SessionPoolSize != 0) + if (socketpair(AF_UNIX, SOCK_DGRAM, 0, session_pipe) < 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg_internal("could not create socket pair for launching sessions: %m"))); + #ifdef EXEC_BACKEND pid = backend_forkexec(port); #else /* !EXEC_BACKEND */ pid = fork_process(); if (pid == 0) /* child */ { + if (SessionPoolSize != 0) + { + SessionPoolSock = session_pipe[0]; + close(session_pipe[1]); + } free(bn); /* Detangle from postmaster */ @@ -4110,9 +4174,19 @@ BackendStartup(Port *port) * of backends. */ bn->pid = pid; + if (SessionPoolSize != 0) + { + bn->session_send_sock = session_pipe[1]; + close(session_pipe[0]); + } + else + bn->session_send_sock = PGINVALID_SOCKET; bn->bkend_type = BACKEND_TYPE_NORMAL; /* Can change later to WALSND */ dlist_push_head(&BackendList, &bn->elem); - + if (BackendListClockPtr == NULL) + BackendListClockPtr = bn; + nNormalBackends += 1; + elog(DEBUG2, "Start backend %d total %d", pid, nNormalBackends); #ifdef EXEC_BACKEND if (!bn->dead_end) ShmemBackendArrayAdd(bn); @@ -4299,7 +4373,7 @@ BackendInitialize(Port *port) * Receive the startup packet (which might turn out to be a cancel request * packet). */ - status = ProcessStartupPacket(port, false); + status = ProcessStartupPacket(port, false, TopMemoryContext); /* * Stop here if it was bad or a cancel packet. ProcessStartupPacket diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index e6706f7..9c42fab 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -76,6 +76,7 @@ struct WaitEventSet { int nevents; /* number of registered events */ int nevents_space; /* maximum number of events in this set */ + int free_events; /* L1-list of free events linked by "pos" and terminated by -1*/ /* * Array, of nevents_space length, storing the definition of events this @@ -129,7 +130,7 @@ static void drainSelfPipe(void); #if defined(WAIT_USE_EPOLL) static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action); #elif defined(WAIT_USE_POLL) -static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event); +static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove); #elif defined(WAIT_USE_WIN32) static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event); #endif @@ -562,6 +563,7 @@ CreateWaitEventSet(MemoryContext context, int nevents) set->latch = NULL; set->nevents_space = nevents; + set->free_events = -1; #if defined(WAIT_USE_EPOLL) #ifdef EPOLL_CLOEXEC @@ -667,6 +669,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data) { WaitEvent *event; + int free_event; /* not enough space */ Assert(set->nevents < set->nevents_space); @@ -690,8 +693,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK)) elog(ERROR, "cannot wait on socket event without a socket"); - event = &set->events[set->nevents]; - event->pos = set->nevents++; + free_event = set->free_events; + if (free_event >= 0) + { + event = &set->events[free_event]; + set->free_events = event->pos; + event->pos = free_event; + } + else + { + event = &set->events[set->nevents]; + event->pos = set->nevents; + } + set->nevents += 1; event->fd = fd; event->events = events; event->user_data = user_data; @@ -718,7 +732,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, #if defined(WAIT_USE_EPOLL) WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD); #elif defined(WAIT_USE_POLL) - WaitEventAdjustPoll(set, event); + WaitEventAdjustPoll(set, event, false); #elif defined(WAIT_USE_WIN32) WaitEventAdjustWin32(set, event); #endif @@ -727,6 +741,27 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, } /* + * Remove event with specified socket descriptor + */ +void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd) +{ + int i, n = set->nevents; + for (i = 0; i < n; i++) + { + WaitEvent *event = &set->events[i]; + if (event->fd == fd) + { +#if defined(WAIT_USE_EPOLL) + WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL); +#elif defined(WAIT_USE_POLL) + WaitEventAdjustPoll(set, event, true); +#endif + break; + } + } +} + +/* * Change the event mask and, in the WL_LATCH_SET case, the latch associated * with the WaitEvent. * @@ -774,7 +809,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) #if defined(WAIT_USE_EPOLL) WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD); #elif defined(WAIT_USE_POLL) - WaitEventAdjustPoll(set, event); + WaitEventAdjustPoll(set, event, false); #elif defined(WAIT_USE_WIN32) WaitEventAdjustWin32(set, event); #endif @@ -827,14 +862,33 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action) ereport(ERROR, (errcode_for_socket_access(), errmsg("epoll_ctl() failed: %m"))); + + if (action == EPOLL_CTL_DEL) + { + int pos = event->pos; + event->fd = PGINVALID_SOCKET; + set->nevents -= 1; + event->pos = set->free_events; + set->free_events = pos; + } } #endif #if defined(WAIT_USE_POLL) static void -WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) +WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove) { - struct pollfd *pollfd = &set->pollfds[event->pos]; + int pos = event->pos; + struct pollfd *pollfd = &set->pollfds[pos]; + + if (remove) + { + set->nevents -= 1; + *pollfd = set->pollfds[set->nevents]; + set->events[pos] = set->events[set->nevents]; + event->pos = pos; + return; + } pollfd->revents = 0; pollfd->fd = event->fd; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index ddc3ec8..f8abfd0 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -75,9 +75,17 @@ #include "utils/snapmgr.h" #include "utils/timeout.h" #include "utils/timestamp.h" +#include "utils/builtins.h" #include "mb/pg_wchar.h" +typedef struct SessionContext +{ + MemoryContext memory; + Port* port; + char* id; +} SessionContext; + /* ---------------- * global variables * ---------------- @@ -98,6 +106,8 @@ int max_stack_depth = 100; /* wait N seconds to allow attach from a debugger */ int PostAuthDelay = 0; +/* Local socket for redirecting sessions to the backends */ +pgsocket SessionPoolSock = PGINVALID_SOCKET; /* ---------------- @@ -169,6 +179,11 @@ static ProcSignalReason RecoveryConflictReason; static MemoryContext row_description_context = NULL; static StringInfoData row_description_buf; +static WaitEventSet* SessionPool; +static int64 SessionCount; +static SessionContext* CurrentSession; +static Port* BackendPort; + /* ---------------------------------------------------------------- * decls for routines only used in this file * ---------------------------------------------------------------- @@ -194,6 +209,22 @@ static void log_disconnections(int code, Datum arg); static void enable_statement_timeout(void); static void disable_statement_timeout(void); +/* + * Generate session ID unique within this backend + */ +static char* CreateSessionId(void) +{ + char buf[64]; + pg_lltoa(++SessionCount, buf); + return pstrdup(buf); +} + +static void DeleteSession(SessionContext* session) +{ + elog(LOG, "Delete session %p, id=%s, memory context=%p", session, session->id, session->memory); + MemoryContextDelete(session->memory); + free(session); +} /* ---------------------------------------------------------------- * routines to obtain user input @@ -1232,6 +1263,12 @@ exec_parse_message(const char *query_string, /* string to execute */ bool save_log_statement_stats = log_statement_stats; char msec_str[32]; + if (CurrentSession && stmt_name[0] != '\0') + { + /* Make names of prepared statements unique for session in case of using internal session pool */ + stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name); + } + /* * Report query to various monitoring facilities. */ @@ -1503,6 +1540,12 @@ exec_bind_message(StringInfo input_message) portal_name = pq_getmsgstring(input_message); stmt_name = pq_getmsgstring(input_message); + if (CurrentSession && stmt_name[0] != '\0') + { + /* Make names of prepared statements unique for session in case of using internal session pool */ + stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name); + } + ereport(DEBUG2, (errmsg("bind %s to %s", *portal_name ? portal_name : "", @@ -2325,6 +2368,12 @@ exec_describe_statement_message(const char *stmt_name) CachedPlanSource *psrc; int i; + if (CurrentSession && stmt_name[0] != '\0') + { + /* Make names of prepared statements unique for session in case of using internal session pool */ + stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name); + } + /* * Start up a transaction command. (Note that this will normally change * current memory context.) Nothing happens if we are already in one. @@ -3603,7 +3652,6 @@ process_postgres_switches(int argc, char *argv[], GucContext ctx, #endif } - /* ---------------------------------------------------------------- * PostgresMain * postgres main loop -- all backends, interactive or otherwise start here @@ -3654,6 +3702,21 @@ PostgresMain(int argc, char *argv[], progname))); } + /* Assign session ID if use session pooling */ + if (SessionPoolSize != 0) + { + MemoryContext oldcontext; + CurrentSession = (SessionContext*)malloc(sizeof(SessionContext)); + CurrentSession->memory = AllocSetContextCreate(TopMemoryContext, + "SessionMemoryContext", + ALLOCSET_DEFAULT_SIZES); + oldcontext = MemoryContextSwitchTo(CurrentSession->memory); + CurrentSession->id = CreateSessionId(); + CurrentSession->port = MyProcPort; + BackendPort = MyProcPort; + MemoryContextSwitchTo(oldcontext); + } + /* Acquire configuration parameters, unless inherited from postmaster */ if (!IsUnderPostmaster) { @@ -3783,7 +3846,7 @@ PostgresMain(int argc, char *argv[], * ... else we'd need to copy the Port data first. Also, subsidiary data * such as the username isn't lost either; see ProcessStartupPacket(). */ - if (PostmasterContext) + if (PostmasterContext && SessionPoolSize == 0) { MemoryContextDelete(PostmasterContext); PostmasterContext = NULL; @@ -4069,6 +4132,120 @@ PostgresMain(int argc, char *argv[], ReadyForQuery(whereToSendOutput); send_ready_for_query = false; + + if (SessionPoolSock != PGINVALID_SOCKET && !IsTransactionState() && pq_available_bytes() == 0) + { + WaitEvent ready_client; + if (SessionPool == NULL) + { + SessionPool = CreateWaitEventSet(TopMemoryContext, MaxSessions); + AddWaitEventToSet(SessionPool, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, CurrentSession); + AddWaitEventToSet(SessionPool, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, CurrentSession); + AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, SessionPoolSock, NULL, CurrentSession); + AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->sock, NULL, CurrentSession); + } + ChooseSession: + DoingCommandRead = true; + if (WaitEventSetWait(SessionPool, -1, &ready_client, 1, PG_WAIT_CLIENT) != 1) + { + /* TODO: do some error recovery here */ + elog(FATAL, "Failed to poll client sessions"); + } + CHECK_FOR_INTERRUPTS(); + DoingCommandRead = false; + + if (ready_client.events & WL_POSTMASTER_DEATH) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating connection due to unexpected postmaster exit"))); + + if (ready_client.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + ProcessClientReadInterrupt(true); + goto ChooseSession; + } + + if (ready_client.fd == SessionPoolSock) + { + int status; + SessionContext* session; + StringInfoData buf; + Port* port; + pgsocket sock; + MemoryContext oldcontext; + + sock = pg_recv_sock(SessionPoolSock); + if (sock < 0) + elog(FATAL, "Failed to receive session socket: %m"); + + session = (SessionContext*)malloc(sizeof(SessionContext)); + session->memory = AllocSetContextCreate(TopMemoryContext, + "SessionMemoryContext", + ALLOCSET_DEFAULT_SIZES); + oldcontext = MemoryContextSwitchTo(session->memory); + port = palloc(sizeof(Port)); + memcpy(port, BackendPort, sizeof(Port)); + + /* + * Receive the startup packet (which might turn out to be a cancel request + * packet). + */ + port->sock = sock; + session->port = port; + session->id = CreateSessionId(); + + MyProcPort = port; + status = ProcessStartupPacket(port, false, session->memory); + MemoryContextSwitchTo(oldcontext); + + if (strcmp(port->database_name, MyProcPort->database_name) || + strcmp(port->user_name, MyProcPort->user_name)) + { + elog(FATAL, "Failed to open session (dbname=%s user=%s) in backend %d (dbname=%s user=%s)", + port->database_name, port->user_name, + MyProcPid, MyProcPort->database_name, MyProcPort->user_name); + } + else if (status == STATUS_OK) + { + elog(DEBUG2, "Start new session %d in backend %d for database %s user %s", + sock, MyProcPid, port->database_name, port->user_name); + CurrentSession = session; + AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, sock, NULL, session); + + SetCurrentStatementStartTimestamp(); + StartTransactionCommand(); + PerformAuthentication(MyProcPort); + CommitTransactionCommand(); + + BeginReportingGUCOptions(); + /* + * Send this backend's cancellation info to the frontend. + */ + pq_beginmessage(&buf, 'K'); + pq_sendint32(&buf, (int32) MyProcPid); + pq_sendint32(&buf, (int32) MyCancelKey); + pq_endmessage(&buf); + + /* Need not flush since ReadyForQuery will do it. */ + send_ready_for_query = true; + continue; + } + else + { + DeleteSession(session); + elog(LOG, "Session startup failed"); + close(sock); + goto ChooseSession; + } + } + else + { + elog(DEBUG2, "Switch to session %d in backend %d", ready_client.fd, MyProcPid); + CurrentSession = (SessionContext*)ready_client.user_data; + MyProcPort = CurrentSession->port; + } + } } /* @@ -4350,6 +4527,29 @@ PostgresMain(int argc, char *argv[], * it will fail to be called during other backend-shutdown * scenarios. */ + if (SessionPool) + { + DeleteWaitEventFromSet(SessionPool, MyProcPort->sock); + elog(DEBUG1, "Close session %d in backend %d", MyProcPort->sock, MyProcPid); + + pq_getmsgend(&input_message); + if (pq_is_reading_msg()) + pq_endmsgread(); + + close(MyProcPort->sock); + MyProcPort->sock = PGINVALID_SOCKET; + MyProcPort = NULL; + + if (CurrentSession) + { + DropSessionPreparedStatements(CurrentSession->id); + DeleteSession(CurrentSession); + CurrentSession = NULL; + } + whereToSendOutput = DestRemote; + goto ChooseSession; + } + elog(DEBUG1, "Terminate backend %d", MyProcPid); proc_exit(0); case 'd': /* copy data */ diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 54fa4a3..b2f43a8 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -120,7 +120,9 @@ int maintenance_work_mem = 16384; * register background workers. */ int NBuffers = 1000; +int SessionPoolSize = 0; int MaxConnections = 90; +int MaxSessions = 1000; int max_worker_processes = 8; int max_parallel_workers = 8; int MaxBackends = 0; diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index f9b3309..571c80f 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -65,7 +65,7 @@ static HeapTuple GetDatabaseTuple(const char *dbname); static HeapTuple GetDatabaseTupleByOid(Oid dboid); -static void PerformAuthentication(Port *port); +void PerformAuthentication(Port *port); static void CheckMyDatabase(const char *name, bool am_superuser); static void InitCommunication(void); static void ShutdownPostgres(int code, Datum arg); @@ -180,7 +180,7 @@ GetDatabaseTupleByOid(Oid dboid) * * returns: nothing. Will not return at all if there's any failure. */ -static void +void PerformAuthentication(Port *port) { /* This should be set already, but let's make sure */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 72f6be3..02373a3 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1871,6 +1871,26 @@ static struct config_int ConfigureNamesInt[] = }, { + {"max_sessions", PGC_POSTMASTER, CONN_AUTH_SETTINGS, + gettext_noop("Sets the maximum number of client session."), + NULL + }, + &MaxSessions, + 1000, 1, INT_MAX, + NULL, NULL, NULL + }, + + { + {"session_pool_size", PGC_POSTMASTER, CONN_AUTH_SETTINGS, + gettext_noop("Sets number of backends serving client sessions."), + NULL + }, + &SessionPoolSize, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + + { {"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the number of connection slots reserved for superusers."), NULL diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h index ffec029..cb5f8d4 100644 --- a/src/include/commands/prepare.h +++ b/src/include/commands/prepare.h @@ -56,5 +56,6 @@ extern TupleDesc FetchPreparedStatementResultDesc(PreparedStatement *stmt); extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt); extern void DropAllPreparedStatements(void); +extern void DropSessionPreparedStatements(char const* sessionId); #endif /* PREPARE_H */ diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 2e7725d..9169b21 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -71,6 +71,7 @@ extern int pq_getbyte(void); extern int pq_peekbyte(void); extern int pq_getbyte_if_available(unsigned char *c); extern int pq_putbytes(const char *s, size_t len); +extern int pq_available_bytes(void); /* * prototypes for functions in be-secure.c diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 54ee273..a9f9228 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -157,6 +157,8 @@ extern PGDLLIMPORT char *DataDir; extern PGDLLIMPORT int NBuffers; extern PGDLLIMPORT int MaxBackends; extern PGDLLIMPORT int MaxConnections; +extern PGDLLIMPORT int MaxSessions; +extern PGDLLIMPORT int SessionPoolSize; extern PGDLLIMPORT int max_worker_processes; extern int max_parallel_workers; @@ -420,6 +422,7 @@ extern void InitializeMaxBackends(void); extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username, Oid useroid, char *out_dbname); extern void BaseInit(void); +extern void PerformAuthentication(struct Port *port); /* in utils/init/miscinit.c */ extern bool IgnoreSystemIndexes; diff --git a/src/include/port.h b/src/include/port.h index 3e528fa..c14a20d 100644 --- a/src/include/port.h +++ b/src/include/port.h @@ -41,6 +41,10 @@ typedef SOCKET pgsocket; extern bool pg_set_noblock(pgsocket sock); extern bool pg_set_block(pgsocket sock); +/* send/receive socket descriptor */ +extern int pg_send_sock(pgsocket chan, pgsocket sock); +extern pgsocket pg_recv_sock(pgsocket chan); + /* Portable path handling for Unix/Win32 (in path.c) */ extern bool has_drive_prefix(const char *filename); diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 1877eef..c9527c9 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -62,6 +62,9 @@ extern Size ShmemBackendArraySize(void); extern void ShmemBackendArrayAllocation(void); #endif +struct Port; +extern int ProcessStartupPacket(struct Port *port, bool SSLdone, MemoryContext memctx); + /* * Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved * for buffer references in buf_internals.h. This limitation could be lifted diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index a4bcb48..10f30d1 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -176,6 +176,8 @@ extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info); +extern void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd); + /* * Unix implementation uses SIGUSR1 for inter-process signaling. * Win32 doesn't need this. diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 63b4e48..191eeaa 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -34,6 +34,7 @@ extern CommandDest whereToSendOutput; extern PGDLLIMPORT const char *debug_query_string; extern int max_stack_depth; extern int PostAuthDelay; +extern pgsocket SessionPoolSock; /* GUC-configurable parameters */