diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index f6be98b..6291d66 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -96,8 +96,6 @@ int max_stack_depth = 100; /* wait N seconds to allow attach from a debugger */ int PostAuthDelay = 0; - - /* ---------------- * private variables * ---------------- @@ -188,6 +186,7 @@ static bool IsTransactionStmtList(List *parseTrees); static void drop_unnamed_stmt(void); static void SigHupHandler(SIGNAL_ARGS); static void log_disconnections(int code, Datum arg); +static bool exec_cached_query(const char *query_string); /* ---------------------------------------------------------------- @@ -916,6 +915,14 @@ exec_simple_query(const char *query_string) drop_unnamed_stmt(); /* + * Try to find cached plan + */ + if (autoprepare_threshold != 0 && exec_cached_query(query_string)) + { + return; + } + + /* * Switch to appropriate context for constructing parsetrees. */ oldcontext = MemoryContextSwitchTo(MessageContext); @@ -4500,3 +4509,566 @@ log_disconnections(int code, Datum arg) port->user_name, port->database_name, port->remote_host, port->remote_port[0] ? " port=" : "", port->remote_port))); } + + + +typedef struct { + char const* query; + int64 exec_count; + CachedPlanSource* plan; + int n_params; + int16 format; +} plan_cache_entry; + + +/* + * Replace string literals with parameters. We do not consider integer or real literals to avoid problems with + * negative number, user defined operators, ... For example it is not easy to distinguish cases (-1), (1-1), (1-1)-1 + */ +static void generalize_statement(const char *query_string, char** gen_query, char** query_params, int* n_params) +{ + size_t query_len = strlen(query_string); + char const* src = query_string; + char* dst; + char* params; + unsigned char ch; + + *n_params = 0; + + *gen_query = (char*)palloc(query_len*2); /* assume that we have less than 1000 parameters, the worst case is replacing '' with $999 */ + *query_params = (char*)palloc(query_len + 1); + dst = *gen_query; + params = *query_params; + + while ((ch = *src++) != '\0') { + if (isspace(ch)) { + /* Replace sequence of whitespaces with just one space */ + while (*src && isspace(*(unsigned char*)src)) { + src += 1; + } + *dst++ = ' '; + } else if (ch == '\'') { + while (true) { + ch = *src++; + if (ch == '\'') { + if (*src != '\'') { + break; + } else { + /* escaped quote */ + *params++ = '\''; + src += 1; + } + } else { + *params++ = ch; + } + } + *params++ = '\0'; + dst += sprintf(dst, "$%d", ++*n_params); + } else { + *dst++ = ch; + } + } + Assert(dst <= *gen_query + query_len); + Assert(params <= *query_params + query_len*2); + *dst = '\0'; +} + +static uint32 plan_cache_hash_fn(const void *key, Size keysize) +{ + return string_hash(((plan_cache_entry*)key)->query, 0); +} + +static int plan_cache_match_fn(const void *key1, const void *key2, Size keysize) +{ + return strcmp(((plan_cache_entry*)key1)->query, ((plan_cache_entry*)key2)->query); +} + +static void* plan_cache_keycopy_fn(void *dest, const void *src, Size keysize) +{ + ((plan_cache_entry*)dest)->query = pstrdup(((plan_cache_entry*)src)->query); + return dest; +} + +#define PLAN_CACHE_SIZE 113 + +/* + * Try to generalize query, find cached plan for it and execute + */ +static bool exec_cached_query(const char *query_string) +{ + CommandDest dest = whereToSendOutput; + DestReceiver *receiver; + char *gen_query; + char *query_params; + int n_params; + plan_cache_entry *entry; + bool found; + MemoryContext old_context; + CachedPlanSource *psrc; + ParamListInfo params; + int paramno; + CachedPlan *cplan; + Portal portal; + bool was_logged = false; + bool is_xact_command; + bool execute_is_fetch; + char completion_tag[COMPLETION_TAG_BUFSIZE]; + bool save_log_statement_stats = log_statement_stats; + ParamListInfo portal_params; + const char *source_text; + char msec_str[32]; + bool snapshot_set = false; + + static HTAB* plan_cache; + static MemoryContext plan_cache_context; + + /* + * Extract literals from query + */ + generalize_statement(query_string, &gen_query, &query_params, &n_params); + + if (plan_cache_context == NULL) { + plan_cache_context = AllocSetContextCreate(TopMemoryContext, + "plan cache context", + ALLOCSET_DEFAULT_SIZES); + } + old_context = MemoryContextSwitchTo(plan_cache_context); + + /* + * Initialize hash table if not initialized yet + */ + if (plan_cache == NULL) + { + static HASHCTL info; + info.keysize = sizeof(char*); + info.entrysize = sizeof(plan_cache_entry); + info.hash = plan_cache_hash_fn; + info.match = plan_cache_match_fn; + info.keycopy = plan_cache_keycopy_fn; + plan_cache = hash_create("plan_cache", PLAN_CACHE_SIZE, &info, HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_KEYCOPY); + } + + /* + * Lookup generalized query + */ + entry = (plan_cache_entry*)hash_search(plan_cache, &gen_query, HASH_ENTER, &found); + if (!found) { + entry->exec_count = 0; + entry->plan = NULL; + entry->n_params = n_params; + } else { + Assert(entry->n_params == n_params); + } + MemoryContextSwitchTo(old_context); + + /* + * Prepare query only when it is executed more than autoprepare_threshold times + */ + if (entry->exec_count++ < autoprepare_threshold) { + return false; + } + if (entry->plan == NULL) { + List *parsetree_list; + Node *raw_parse_tree; + const char *command_tag; + Query *query; + List *querytree_list; + Oid *param_types = NULL; + int num_params = 0; + + old_context = MemoryContextSwitchTo(MessageContext); + + parsetree_list = pg_parse_query(gen_query); + + /* + * Only single user statement are allowed in a prepared statement. + */ + if (list_length(parsetree_list) != 1) { + MemoryContextSwitchTo(old_context); + return false; + } + + raw_parse_tree = (Node *) linitial(parsetree_list); + + /* + * Get the command name for possible use in status display. + */ + command_tag = CreateCommandTag(raw_parse_tree); + + /* + * If we are in an aborted transaction, reject all commands except + * COMMIT/ROLLBACK. It is important that this test occur before we + * try to do parse analysis, rewrite, or planning, since all those + * phases try to do database accesses, which may fail in abort state. + * (It might be safe to allow some additional utility commands in this + * state, but not many...) + */ + if (IsAbortedTransactionBlockState() && + !IsTransactionExitStmt(raw_parse_tree)) + ereport(ERROR, + (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), + errmsg("current transaction is aborted, " + "commands ignored until end of transaction block"), + errdetail_abort())); + + /* + * Create the CachedPlanSource before we do parse analysis, since it + * needs to see the unmodified raw parse tree. + */ + psrc = CreateCachedPlan(raw_parse_tree, gen_query, command_tag); + + /* + * Set up a snapshot if parse analysis will need one. + */ + if (analyze_requires_snapshot(raw_parse_tree)) + { + PushActiveSnapshot(GetTransactionSnapshot()); + snapshot_set = true; + } + + /* + * Analyze and rewrite the query. Note that the originally specified + * parameter set is not required to be complete, so we have to use + * parse_analyze_varparams(). + */ + if (log_parser_stats) { + ResetUsage(); + } + + query = parse_analyze_varparams(raw_parse_tree, + gen_query, + ¶m_types, + &num_params); + Assert(num_params == entry->n_params); + + /* + * Check all parameter types got determined. + */ + for (paramno = 0; paramno < n_params; paramno++) + { + Oid ptype = param_types[paramno]; + + if (ptype == InvalidOid || ptype == UNKNOWNOID) { + /* Type of parameter can not be determined */ + MemoryContextSwitchTo(old_context); + return false; + } + } + + if (log_parser_stats) { + ShowUsage("PARSE ANALYSIS STATISTICS"); + } + + querytree_list = pg_rewrite_query(query); + + /* Done with the snapshot used for parsing */ + if (snapshot_set) { + PopActiveSnapshot(); + } + + CompleteCachedPlan(psrc, + querytree_list, + NULL, + param_types, + n_params, + NULL, + NULL, + CURSOR_OPT_PARALLEL_OK, /* allow parallel mode */ + true); /* fixed result */ + + /* If we got a cancel signal during analysis, quit */ + CHECK_FOR_INTERRUPTS(); + + entry->format = 0; /* TEXT is default */ + if (IsA(raw_parse_tree, FetchStmt)) + { + FetchStmt *stmt = (FetchStmt *)raw_parse_tree; + + if (!stmt->ismove) + { + Portal fportal = GetPortalByName(stmt->portalname); + + if (PortalIsValid(fportal) && + (fportal->cursorOptions & CURSOR_OPT_BINARY)) + entry->format = 1; /* BINARY */ + } + } + + SaveCachedPlan(psrc); + entry->plan = psrc; + MemoryContextSwitchTo(old_context); + + /* + * We do NOT close the open transaction command here; that only happens + * when the client sends Sync. Instead, do CommandCounterIncrement just + * in case something happened during parse/plan. + */ + CommandCounterIncrement(); + } + psrc = entry->plan; + + /* + * If we are in aborted transaction state, the only portals we can + * actually run are those containing COMMIT or ROLLBACK commands. We + * disallow binding anything else to avoid problems with infrastructure + * that expects to run inside a valid transaction. We also disallow + * binding any parameters, since we can't risk calling user-defined I/O + * functions. + */ + if (IsAbortedTransactionBlockState() && + (!IsTransactionExitStmt(psrc->raw_parse_tree) || + n_params != 0)) + ereport(ERROR, + (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), + errmsg("current transaction is aborted, " + "commands ignored until end of transaction block"), + errdetail_abort())); + + /* + * Create unnamed portal to run the query or queries in. If there + * already is one, silently drop it. + */ + portal = CreatePortal("", true, true); + /* Don't display the portal in pg_cursors */ + portal->visible = false; + + /* + * Prepare to copy stuff into the portal's memory context. We do all this + * copying first, because it could possibly fail (out-of-memory) and we + * don't want a failure to occur between GetCachedPlan and + * PortalDefineQuery; that would result in leaking our plancache refcount. + */ + old_context = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); + + /* Copy the plan's query string into the portal */ + query_string = pstrdup(psrc->query_string); + + /* + * Set a snapshot if we have parameters to fetch (since the input + * functions might need it) or the query isn't a utility command (and + * hence could require redoing parse analysis and planning). We keep the + * snapshot active till we're done, so that plancache.c doesn't have to + * take new ones. + */ + if (n_params > 0 || + (psrc->raw_parse_tree && + analyze_requires_snapshot(psrc->raw_parse_tree))) + { + PushActiveSnapshot(GetTransactionSnapshot()); + snapshot_set = true; + } else { + snapshot_set = false; + } + + /* + * Fetch parameters, if any, and store in the portal's memory context. + */ + if (n_params > 0) + { + params = (ParamListInfo) palloc(offsetof(ParamListInfoData, params) + + n_params * sizeof(ParamExternData)); + params->paramFetch = NULL; + params->paramFetchArg = NULL; + params->parserSetup = NULL; + params->parserSetupArg = NULL; + params->numParams = n_params; + params->paramMask = NULL; + + for (paramno = 0; paramno < n_params; paramno++) + { + Oid ptype = psrc->param_types[paramno]; + Oid typinput; + Oid typioparam; + + getTypeInputInfo(ptype, &typinput, &typioparam); + + params->params[paramno].value = OidInputFunctionCall(typinput, query_params, typioparam, -1); + params->params[paramno].isnull = false; + + /* + * We mark the params as CONST. This ensures that any custom plan + * makes full use of the parameter values. + */ + params->params[paramno].pflags = PARAM_FLAG_CONST; + params->params[paramno].ptype = ptype; + + query_params += strlen(query_params) + 1; + } + } else { + params = NULL; + } + + /* Done storing stuff in portal's context */ + MemoryContextSwitchTo(old_context); + + /* + * Obtain a plan from the CachedPlanSource. Any cruft from (re)planning + * will be generated in MessageContext. The plan refcount will be + * assigned to the Portal, so it will be released at portal destruction. + */ + cplan = GetCachedPlan(psrc, params, false); + + /* + * Now we can define the portal. + * + * DO NOT put any code that could possibly throw an error between the + * above GetCachedPlan call and here. + */ + PortalDefineQuery(portal, + NULL, + gen_query, + psrc->commandTag, + cplan->stmt_list, + cplan); + + /* Done with the snapshot used for parameter I/O and parsing/planning */ + if (snapshot_set) { + PopActiveSnapshot(); + } + + /* + * And we're ready to start portal execution. + */ + PortalStart(portal, params, 0, InvalidSnapshot); + + /* + * Apply the result format requests to the portal. + */ + PortalSetResultFormat(portal, 1, &entry->format); + + /* Does the portal contain a transaction command? */ + is_xact_command = IsTransactionStmtList(portal->stmts); + + /* + * We must copy the sourceText into MessageContext in + * case the portal is destroyed during finish_xact_command. Can avoid the + * copy if it's not an xact command, though. + */ + if (is_xact_command) + { + source_text = pstrdup(portal->sourceText); + /* + * An xact command shouldn't have any parameters, which is a good + * thing because they wouldn't be around after finish_xact_command. + */ + portal_params = NULL; + } + else + { + source_text = portal->sourceText; + portal_params = portal->portalParams; + } + + /* + * Report query to various monitoring facilities. + */ + debug_query_string = source_text; + + pgstat_report_activity(STATE_RUNNING, source_text); + + set_ps_display(portal->commandTag, false); + + if (save_log_statement_stats) { + ResetUsage(); + } + + BeginCommand(portal->commandTag, dest); + + PortalSetResultFormat(portal, 1, &entry->format); + + + /* + * Create dest receiver in MessageContext (we don't want it in transaction + * context, because that may get deleted if portal contains VACUUM). + */ + receiver = CreateDestReceiver(dest); + SetRemoteDestReceiverParams(receiver, portal); + + /* + * If we re-issue an Execute protocol request against an existing portal, + * then we are only fetching more rows rather than completely re-executing + * the query from the start. atStart is never reset for a v3 portal, so we + * are safe to use this check. + */ + execute_is_fetch = !portal->atStart; + + /* Log immediately if dictated by log_statement */ + if (check_log_statement(portal->stmts)) + { + ereport(LOG, + (errmsg("%s %s%s%s: %s", + execute_is_fetch ? + _("execute fetch from") : + _("execute"), + "", + "", + "", + source_text), + errhidestmt(true), + errdetail_params(portal_params))); + was_logged = true; + } + + /* Check for cancel signal before we start execution */ + CHECK_FOR_INTERRUPTS(); + + /* + * Run the portal to completion, and then drop it (and the receiver). + */ + (void) PortalRun(portal, + FETCH_ALL, + true, + receiver, + receiver, + completion_tag); + + (*receiver->rDestroy) (receiver); + + PortalDrop(portal, false); + + /* + * Tell client that we're done with this query. Note we emit exactly + * one EndCommand report for each raw parsetree, thus one for each SQL + * command the client sent, regardless of rewriting. (But a command + * aborted by error will not send an EndCommand report at all.) + */ + EndCommand(completion_tag, dest); + + /* + * Close down transaction statement, if one is open. + */ + finish_xact_command(); + + /* + * Emit duration logging if appropriate. + */ + switch (check_log_duration(msec_str, was_logged)) + { + case 1: + ereport(LOG, + (errmsg("duration: %s ms", msec_str), + errhidestmt(true))); + break; + case 2: + ereport(LOG, + (errmsg("duration: %s ms %s %s%s%s: %s", + msec_str, + execute_is_fetch ? + _("execute fetch from") : + _("execute"), + "", + "", + "", + source_text), + errhidestmt(true), + errdetail_params(portal_params))); + break; + } + + if (save_log_statement_stats) { + ShowUsage("EXECUTE MESSAGE STATISTICS"); + } + debug_query_string = NULL; + + return true; +} diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 4f1891f..d3acf5b 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -450,6 +450,9 @@ int tcp_keepalives_idle; int tcp_keepalives_interval; int tcp_keepalives_count; + +int autoprepare_threshold; + /* * SSL renegotiation was been removed in PostgreSQL 9.5, but we tolerate it * being set to zero (meaning never renegotiate) for backward compatibility. @@ -1949,6 +1952,19 @@ static struct config_int ConfigureNamesInt[] = check_max_stack_depth, assign_max_stack_depth, NULL }, + /* + * Threshold for implicit preparing of frequently executed queries + */ + { + {"autoprepare_threshold", PGC_USERSET, QUERY_TUNING_OTHER, + gettext_noop("Threshold for autopreparing query."), + NULL, + }, + &autoprepare_threshold, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + { {"temp_file_limit", PGC_SUSET, RESOURCES_DISK, gettext_noop("Limits the total size of all temporary files used by each process."), diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index 0bf9f21..ef23569 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -252,6 +252,8 @@ extern int client_min_messages; extern int log_min_duration_statement; extern int log_temp_files; +extern int autoprepare_threshold; + extern int temp_file_limit; extern int num_temp_buffers;