From 9822841ce1cbaa214869568b9dd3edbd35efb0d7 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 14 Mar 2017 15:26:55 +0300 Subject: [PATCH 7/8] Reversed hashed Agg implementation. Only AGG_PLAIN and AGG_HASHED are reversed. The part relating to putting tuples to hashtable is practically the same with hashtable lookups inlined. To iterate over the hashtable effectively, method foreach was added to simplehash.h. As in SeqScan or Hashjoin, the goal is to have only one loop iterating over the tuples and sending them. We can have only one 'foreach' type per hashtable in current implementation, this obviously should be changed if needed. --- src/backend/executor/execGrouping.c | 75 ---- src/backend/executor/execProcnode.c | 17 + src/backend/executor/nodeAgg.c | 793 ++++++++++++++++++++++++++---------- src/include/executor/executor.h | 98 ++++- src/include/executor/nodeAgg.h | 6 +- src/include/lib/simplehash.h | 60 +++ 6 files changed, 760 insertions(+), 289 deletions(-) diff --git a/src/backend/executor/execGrouping.c b/src/backend/executor/execGrouping.c index 4b1f634e21..7d5ae4aa04 100644 --- a/src/backend/executor/execGrouping.c +++ b/src/backend/executor/execGrouping.c @@ -51,81 +51,6 @@ static int TupleHashTableMatch(struct tuplehash_hash *tb, const MinimalTuple tup *****************************************************************************/ /* - * execTuplesMatch - * Return true if two tuples match in all the indicated fields. - * - * This actually implements SQL's notion of "not distinct". Two nulls - * match, a null and a not-null don't match. - * - * slot1, slot2: the tuples to compare (must have same columns!) - * numCols: the number of attributes to be examined - * matchColIdx: array of attribute column numbers - * eqFunctions: array of fmgr lookup info for the equality functions to use - * evalContext: short-term memory context for executing the functions - * - * NB: evalContext is reset each time! - */ -bool -execTuplesMatch(TupleTableSlot *slot1, - TupleTableSlot *slot2, - int numCols, - AttrNumber *matchColIdx, - FmgrInfo *eqfunctions, - MemoryContext evalContext) -{ - MemoryContext oldContext; - bool result; - int i; - - /* Reset and switch into the temp context. */ - MemoryContextReset(evalContext); - oldContext = MemoryContextSwitchTo(evalContext); - - /* - * We cannot report a match without checking all the fields, but we can - * report a non-match as soon as we find unequal fields. So, start - * comparing at the last field (least significant sort key). That's the - * most likely to be different if we are dealing with sorted input. - */ - result = true; - - for (i = numCols; --i >= 0;) - { - AttrNumber att = matchColIdx[i]; - Datum attr1, - attr2; - bool isNull1, - isNull2; - - attr1 = slot_getattr(slot1, att, &isNull1); - - attr2 = slot_getattr(slot2, att, &isNull2); - - if (isNull1 != isNull2) - { - result = false; /* one null and one not; they aren't equal */ - break; - } - - if (isNull1) - continue; /* both are null, treat as equal */ - - /* Apply the type-specific equality function */ - - if (!DatumGetBool(FunctionCall2(&eqfunctions[i], - attr1, attr2))) - { - result = false; /* they aren't equal */ - break; - } - } - - MemoryContextSwitchTo(oldContext); - - return result; -} - -/* * execTuplesUnequal * Return true if two tuples are definitely unequal in the indicated * fields. diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 1ebb0da36f..ab0312a4bf 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -193,6 +193,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags, PlanState *parent) /* * materialization nodes */ + case T_Agg: + result = (PlanState *) ExecInitAgg((Agg *) node, + estate, eflags, parent); + break; + case T_Hash: result = (PlanState *) ExecInitHash((Hash *) node, estate, eflags, parent); @@ -282,6 +287,10 @@ ExecPushTuple(TupleTableSlot *slot, PlanState *pusher) if (nodeTag(receiver) == T_LimitState) return ExecPushTupleToLimit(slot, (LimitState *) receiver); + + else if (nodeTag(receiver) == T_AggState) + return ExecPushTupleToAgg(slot, (AggState *) receiver); + else if (nodeTag(receiver) == T_HashState) return ExecPushTupleToHash(slot, (HashState *) receiver); @@ -324,6 +333,10 @@ ExecPushNull(TupleTableSlot *slot, PlanState *pusher) if (nodeTag(receiver) == T_LimitState) return ExecPushNullToLimit(slot, (LimitState *) receiver); + + else if (nodeTag(receiver) == T_AggState) + return ExecPushNullToAgg(slot, (AggState *) receiver); + else if (nodeTag(receiver) == T_HashState) return ExecPushNullToHash(slot, (HashState *) receiver); @@ -397,6 +410,10 @@ ExecEndNode(PlanState *node) /* * materialization nodes */ + case T_AggState: + ExecEndAgg((AggState *) node); + break; + case T_HashState: ExecEndHash((HashState *) node); break; diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index fa19358d19..8de9c0af53 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -153,6 +153,8 @@ #include "postgres.h" #include "access/htup_details.h" +#include "access/parallel.h" +#include "access/hash.h" #include "catalog/objectaccess.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_proc.h" @@ -440,7 +442,6 @@ typedef struct AggStatePerPhaseData Sort *sortnode; /* Sort node for input ordering for phase */ } AggStatePerPhaseData; - static void initialize_phase(AggState *aggstate, int newphase); static TupleTableSlot *fetch_input_tuple(AggState *aggstate); static void initialize_aggregates(AggState *aggstate, @@ -460,10 +461,10 @@ static void process_ordered_aggregate_single(AggState *aggstate, static void process_ordered_aggregate_multi(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate); -static void finalize_aggregate(AggState *aggstate, - AggStatePerAgg peragg, - AggStatePerGroup pergroupstate, - Datum *resultVal, bool *resultIsNull); +static inline void finalize_aggregate(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroupstate, + Datum *resultVal, bool *resultIsNull); static void finalize_partialaggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, @@ -471,19 +472,22 @@ static void finalize_partialaggregate(AggState *aggstate, static void prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet); -static void finalize_aggregates(AggState *aggstate, - AggStatePerAgg peragg, - AggStatePerGroup pergroup, - int currentSet); +static inline void finalize_aggregates(AggState *aggstate, + AggStatePerAgg peraggs, + AggStatePerGroup pergroup, + int currentSet); static TupleTableSlot *project_aggregates(AggState *aggstate); +static inline bool project_aggregates_and_push(AggState *aggstate); +static inline bool AggPushHashEntry(TupleHashEntry entry, void *astate); static Bitmapset *find_unaggregated_cols(AggState *aggstate); static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos); static void build_hash_table(AggState *aggstate); -static TupleHashEntryData *lookup_hash_entry(AggState *aggstate, - TupleTableSlot *inputslot); +static inline TupleHashEntryData *lookup_hash_entry(AggState *aggstate, + TupleTableSlot *inputslot); static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); -static void agg_fill_hash_table(AggState *aggstate); -static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); +static void agg_puttup_plain(AggState *aggstate, TupleTableSlot *outerslot); +static bool agg_finalize_plain(AggState *aggstate); +static void agg_puttup_hash_table(AggState *aggstate, TupleTableSlot *outerslot); static Datum GetAggInitVal(Datum textInitVal, Oid transtype); static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggstate, EState *estate, @@ -498,6 +502,39 @@ static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, List *transnos); +/* + * We use our own hash table instead of defined in execGrouping.c, see notes + * below. + */ +/* define parameters necessary to generate the tuple hash table interface */ +#define SH_PREFIX aggtuplehash +#define SH_ELEMENT_TYPE TupleHashEntryData +#define SH_KEY_TYPE MinimalTuple +#define SH_SCOPE static inline +#define SH_FOREACH_ON +#define SH_FOREACH_ACC_TYPE bool +#define SH_DECLARE +#include "lib/simplehash.h" +static inline bool inline_and(bool old, bool new); + +/* + * And our own copies of funcs from execGrouping.c + */ +static TupleHashTable BuildAggTupleHashTable(int numCols, AttrNumber *keyColIdx, + FmgrInfo *eqfunctions, + FmgrInfo *hashfunctions, + long nbuckets, Size additionalsize, + MemoryContext tablecxt, + MemoryContext tempcxt, + bool use_variable_hash_iv); +static inline TupleHashEntry LookupAggTupleHashEntry(TupleHashTable hashtable, + TupleTableSlot *slot, + bool *isnew); +static inline uint32 AggTupleHashTableHash(struct aggtuplehash_hash *tb, + const MinimalTuple tuple); +static inline int AggTupleHashTableMatch(struct aggtuplehash_hash *tb, + const MinimalTuple tuple1, + const MinimalTuple tuple2); /* @@ -1573,7 +1610,6 @@ finalize_aggregates(AggState *aggstate, Datum *aggvalues = econtext->ecxt_aggvalues; bool *aggnulls = econtext->ecxt_aggnulls; int aggno; - int transno; Assert(currentSet == 0 || ((Agg *) aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); @@ -1581,32 +1617,6 @@ finalize_aggregates(AggState *aggstate, aggstate->current_set = currentSet; /* - * If there were any DISTINCT and/or ORDER BY aggregates, sort their - * inputs and run the transition functions. - */ - for (transno = 0; transno < aggstate->numtrans; transno++) - { - AggStatePerTrans pertrans = &aggstate->pertrans[transno]; - AggStatePerGroup pergroupstate; - - pergroupstate = &pergroup[transno + (currentSet * (aggstate->numtrans))]; - - if (pertrans->numSortCols > 0) - { - Assert(((Agg *) aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); - - if (pertrans->numInputs == 1) - process_ordered_aggregate_single(aggstate, - pertrans, - pergroupstate); - else - process_ordered_aggregate_multi(aggstate, - pertrans, - pergroupstate); - } - } - - /* * Run the final functions. */ for (aggno = 0; aggno < aggstate->numaggs; aggno++) @@ -1617,12 +1627,8 @@ finalize_aggregates(AggState *aggstate, pergroupstate = &pergroup[transno + (currentSet * (aggstate->numtrans))]; - if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) - finalize_partialaggregate(aggstate, peragg, pergroupstate, - &aggvalues[aggno], &aggnulls[aggno]); - else - finalize_aggregate(aggstate, peragg, pergroupstate, - &aggvalues[aggno], &aggnulls[aggno]); + finalize_aggregate(aggstate, peragg, pergroupstate, + &aggvalues[aggno], &aggnulls[aggno]); } } @@ -1654,6 +1660,105 @@ project_aggregates(AggState *aggstate) } /* + * Project the result of a group (whose aggs have already been calculated by + * finalize_aggregates), and push all tuples. Returns true if all tuples were + * pushed, false if the parent doesn't want to accept tuples anymore. + */ +static inline bool +project_aggregates_and_push(AggState *aggstate) +{ + ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; + + /* + * Check the qual (HAVING clause); if the group does not match, ignore it. + */ + if (ExecQual(aggstate->ss.ps.qual, econtext, false)) + { + /* + * Form and return or store a projection tuple using the aggregate + * results and the representative input tuple. + */ + TupleTableSlot *slot; + + slot = ExecProject(aggstate->ss.ps.ps_ProjInfo); + return ExecPushTuple(slot, (PlanState *) aggstate); + + } + else + InstrCountFiltered1(aggstate, 1); + + return true; +} + +/* + * Finalize one TupleHashEntry, project the result and push it. Returns true + * if all tuples are were pushed, false if the parent doesn't want to accept + * tuples anymore. + */ +static inline bool +AggPushHashEntry(TupleHashEntry entry, void *astate) +{ + AggState *aggstate = (AggState *) astate; + ExprContext *econtext; + AggStatePerAgg peragg; + AggStatePerGroup pergroup; + TupleTableSlot *firstSlot; + TupleTableSlot *hashslot; + int i; + + /* + * get state info from node + */ + /* econtext is the per-output-tuple expression context */ + econtext = aggstate->ss.ps.ps_ExprContext; + peragg = aggstate->peragg; + firstSlot = aggstate->ss.ss_ScanTupleSlot; + hashslot = aggstate->hashslot; + + /* + * Clear the per-output-tuple context for each group + * + * We intentionally don't use ReScanExprContext here; if any aggs have + * registered shutdown callbacks, they mustn't be called yet, since we + * might not be done with that agg. + */ + ResetExprContext(econtext); + + /* + * Store the copied first input tuple in the tuple table slot reserved + * for it, so that it can be used in ExecProject. + */ + ExecStoreMinimalTuple(entry->firstTuple, hashslot, false); + slot_getallattrs(hashslot); + + ExecClearTuple(firstSlot); + memset(firstSlot->tts_isnull, true, + firstSlot->tts_tupleDescriptor->natts * sizeof(bool)); + + for (i = 0; i < aggstate->numhashGrpCols; i++) + { + int varNumber = aggstate->hashGrpColIdxInput[i] - 1; + + firstSlot->tts_values[varNumber] = hashslot->tts_values[i]; + firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i]; + } + ExecStoreVirtualTuple(firstSlot); + + pergroup = (AggStatePerGroup) entry->additional; + + finalize_aggregates(aggstate, peragg, pergroup, 0); + + /* + * Use the representative input tuple for any references to + * non-aggregated input columns in the qual and tlist. + */ + econtext->ecxt_outertuple = firstSlot; + + return project_aggregates_and_push(aggstate); +} + + +/* * find_unaggregated_cols * Construct a bitmapset of the column numbers of un-aggregated Vars * appearing in our targetlist and qual (HAVING clause) @@ -1719,12 +1824,12 @@ build_hash_table(AggState *aggstate) additionalsize = aggstate->numaggs * sizeof(AggStatePerGroupData); - aggstate->hashtable = BuildTupleHashTable(node->numCols, - aggstate->hashGrpColIdxHash, - aggstate->phase->eqfunctions, - aggstate->hashfunctions, - node->numGroups, - additionalsize, + aggstate->hashtable = BuildAggTupleHashTable(node->numCols, + aggstate->hashGrpColIdxHash, + aggstate->phase->eqfunctions, + aggstate->hashfunctions, + node->numGroups, + additionalsize, aggstate->aggcontexts[0]->ecxt_per_tuple_memory, tmpmem, DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)); @@ -1845,7 +1950,7 @@ hash_agg_entry_size(int numAggs) * * When called, CurrentMemoryContext should be the per-query context. */ -static TupleHashEntryData * +static inline TupleHashEntryData * lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) { TupleTableSlot *hashslot = aggstate->hashslot; @@ -1867,7 +1972,7 @@ lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) ExecStoreVirtualTuple(hashslot); /* find or create the hashtable entry using the filtered tuple */ - entry = LookupTupleHashEntry(aggstate->hashtable, hashslot, &isnew); + entry = LookupAggTupleHashEntry(aggstate->hashtable, hashslot, &isnew); if (isnew) { @@ -1883,43 +1988,121 @@ lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) } /* - * ExecAgg - - * - * ExecAgg receives tuples from its outer subplan and aggregates over - * the appropriate attribute for each aggregate function use (Aggref - * node) appearing in the targetlist or qual of the node. The number - * of tuples to aggregate over depends on whether grouped or plain - * aggregation is selected. In grouped aggregation, we produce a result - * row for each group; in plain aggregation there's a single result row - * for the whole query. In either case, the value of each aggregate is - * stored in the expression context to be used when ExecProject evaluates - * the result tuple. + * ExecPushTupleToAgg - + * + * pushTupleToAgg receives a tuple from its outer subplan and aggregates it + * over the appropriate attribute for each aggregate function use (Aggref + * node) appearing in the targetlist or qual of the node. The number of + * tuples to aggregate over depends on whether grouped or plain aggregation + * is selected. In grouped aggregation, we produce a result row for each + * group; in plain aggregation there's a single result row for the whole + * query. In either case, the value of each aggregate is stored in the + * expression context to be used when ExecProject evaluates the result + * tuple. */ -TupleTableSlot * -ExecAgg(AggState *node) +bool +ExecPushTupleToAgg(TupleTableSlot *slot, AggState *node) { - TupleTableSlot *result; + AggStrategy strategy = node->phase->aggnode->aggstrategy; + /* Only AGG_HASHED and AGG_PLAIN is supported at the moment */ + Assert(strategy == AGG_HASHED || strategy == AGG_PLAIN); + /* AGGSPLIT is not supported at the moment */ + Assert(node->aggsplit == AGGSPLIT_SIMPLE); + /* neither AGG_HASHED nor AGG_PLAIN support multiple grouping sets */ + Assert(node->phase->numsets == 0); + Assert(!node->agg_done); + + if (strategy == AGG_PLAIN) + agg_puttup_plain(node, slot); + else + agg_puttup_hash_table(node, slot); + + return true; +} + +/* NULL tuple arrived, finalize aggregation and push tuples */ +void +ExecPushNullToAgg(TupleTableSlot *slot, AggState *node) +{ + AggStrategy strategy = node->phase->aggnode->aggstrategy; + bool parent_accepts_tuples; - if (!node->agg_done) + if (strategy == AGG_PLAIN) { - /* Dispatch based on strategy */ - switch (node->phase->aggnode->aggstrategy) - { - case AGG_HASHED: - if (!node->table_filled) - agg_fill_hash_table(node); - result = agg_retrieve_hash_table(node); - break; - default: - result = agg_retrieve_direct(node); - break; - } + parent_accepts_tuples = agg_finalize_plain(node); + } + else + { + node->table_filled = true; + /* For each tuple in hashtable, push it */ + parent_accepts_tuples = aggtuplehash_foreach( + (aggtuplehash_hash *) node->hashtable->hashtab, node); + } - if (!TupIsNull(result)) - return result; + if (parent_accepts_tuples) + /* If parent still waits for tuples, let it know we are done */ + ExecPushNull(NULL, (PlanState *) node); + + node->agg_done = true; +} + +/* advance aggregates for arrived tuple in AGG_PLAIN */ +static void +agg_puttup_plain(AggState *aggstate, TupleTableSlot *slot) +{ + AggStatePerGroup pergroup = aggstate->pergroup; + ExprContext *tmpcontext = aggstate->tmpcontext; + TupleTableSlot *firstSlot = aggstate->ss.ss_ScanTupleSlot; + + if (TupIsNull(firstSlot)) + { + /* + * First call of agg_puttup_plain + */ + int numReset = 1; /*always one grouping set in AGG_PLAIN */ + + /* always first and only grouping set in AGG_PLAIN */ + aggstate->projected_set = 0; + + /* + * Initialize working state for a new input tuple group. + */ + initialize_aggregates(aggstate, pergroup, numReset); + + /* + * Store the copied first input tuple in the tuple table slot + * reserved for it. The tuple will be deleted when it is + * cleared from the slot. + */ + ExecStoreTuple(ExecCopySlotTuple(slot), + firstSlot, + InvalidBuffer, + true); } - return NULL; + /* set up for advance_aggregates call */ + tmpcontext->ecxt_outertuple = slot; + + advance_aggregates(aggstate, pergroup); +} + +/* finalize and push AGG_PLAIN */ +static bool +agg_finalize_plain(AggState *aggstate) +{ + AggStatePerGroup pergroup = aggstate->pergroup; + AggStatePerAgg peragg = aggstate->peragg; + TupleTableSlot *firstSlot = aggstate->ss.ss_ScanTupleSlot; + int currentSet; + + if (TupIsNull(firstSlot)) + /* agg_puttup_plain was never called */ + return true; + + currentSet = aggstate->projected_set; + prepare_projection_slot(aggstate, firstSlot, currentSet); + finalize_aggregates(aggstate, peragg, pergroup, currentSet); + return project_aggregates_and_push(aggstate); } /* @@ -2247,141 +2430,31 @@ agg_retrieve_direct(AggState *aggstate) } /* - * ExecAgg for hashed case: phase 1, read input and build hash table + * ExecPushTupleToAgg for hashed case, add one tuple to the hashtable */ static void -agg_fill_hash_table(AggState *aggstate) +agg_puttup_hash_table(AggState *aggstate, TupleTableSlot *outerslot) { ExprContext *tmpcontext; TupleHashEntryData *entry; - TupleTableSlot *outerslot; /* - * get state info from node - * * tmpcontext is the per-input-tuple expression context */ tmpcontext = aggstate->tmpcontext; - /* - * Process each outer-plan tuple, and then fetch the next one, until we - * exhaust the outer plan. - */ - for (;;) - { - outerslot = fetch_input_tuple(aggstate); - if (TupIsNull(outerslot)) - break; - /* set up for advance_aggregates call */ - tmpcontext->ecxt_outertuple = outerslot; - - /* Find or build hashtable entry for this tuple's group */ - entry = lookup_hash_entry(aggstate, outerslot); - - /* Advance the aggregates */ - if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) - combine_aggregates(aggstate, (AggStatePerGroup) entry->additional); - else - advance_aggregates(aggstate, (AggStatePerGroup) entry->additional); - - /* Reset per-input-tuple context after each tuple */ - ResetExprContext(tmpcontext); - } - - aggstate->table_filled = true; - /* Initialize to walk the hash table */ - ResetTupleHashIterator(aggstate->hashtable, &aggstate->hashiter); -} - -/* - * ExecAgg for hashed case: phase 2, retrieving groups from hash table - */ -static TupleTableSlot * -agg_retrieve_hash_table(AggState *aggstate) -{ - ExprContext *econtext; - AggStatePerAgg peragg; - AggStatePerGroup pergroup; - TupleHashEntryData *entry; - TupleTableSlot *firstSlot; - TupleTableSlot *result; - TupleTableSlot *hashslot; - - /* - * get state info from node - */ - /* econtext is the per-output-tuple expression context */ - econtext = aggstate->ss.ps.ps_ExprContext; - peragg = aggstate->peragg; - firstSlot = aggstate->ss.ss_ScanTupleSlot; - hashslot = aggstate->hashslot; - - /* - * We loop retrieving groups until we find one satisfying - * aggstate->ss.ps.qual - */ - while (!aggstate->agg_done) - { - int i; + /* set up for advance_aggregates call */ + tmpcontext->ecxt_outertuple = outerslot; - /* - * Find the next entry in the hash table - */ - entry = ScanTupleHashTable(aggstate->hashtable, &aggstate->hashiter); - if (entry == NULL) - { - /* No more entries in hashtable, so done */ - aggstate->agg_done = TRUE; - return NULL; - } - - /* - * Clear the per-output-tuple context for each group - * - * We intentionally don't use ReScanExprContext here; if any aggs have - * registered shutdown callbacks, they mustn't be called yet, since we - * might not be done with that agg. - */ - ResetExprContext(econtext); - - /* - * Transform representative tuple back into one with the right - * columns. - */ - ExecStoreMinimalTuple(entry->firstTuple, hashslot, false); - slot_getallattrs(hashslot); - - ExecClearTuple(firstSlot); - memset(firstSlot->tts_isnull, true, - firstSlot->tts_tupleDescriptor->natts * sizeof(bool)); - - for (i = 0; i < aggstate->numhashGrpCols; i++) - { - int varNumber = aggstate->hashGrpColIdxInput[i] - 1; - - firstSlot->tts_values[varNumber] = hashslot->tts_values[i]; - firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i]; - } - ExecStoreVirtualTuple(firstSlot); - - pergroup = (AggStatePerGroup) entry->additional; - - finalize_aggregates(aggstate, peragg, pergroup, 0); - - /* - * Use the representative input tuple for any references to - * non-aggregated input columns in the qual and tlist. - */ - econtext->ecxt_outertuple = firstSlot; + /* Find or build hashtable entry for this tuple's group */ + entry = lookup_hash_entry(aggstate, outerslot); - result = project_aggregates(aggstate); - if (result) - return result; - } + /* Advance the aggregates */ + advance_aggregates(aggstate, (AggStatePerGroup) entry->additional); - /* No more groups */ - return NULL; + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(tmpcontext); } /* ----------------- @@ -2392,7 +2465,7 @@ agg_retrieve_hash_table(AggState *aggstate) * ----------------- */ AggState * -ExecInitAgg(Agg *node, EState *estate, int eflags) +ExecInitAgg(Agg *node, EState *estate, int eflags, PlanState *parent) { AggState *aggstate; AggStatePerAgg peraggs; @@ -2421,6 +2494,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate = makeNode(AggState); aggstate->ss.ps.plan = (Plan *) node; aggstate->ss.ps.state = estate; + aggstate->ss.ps.parent = parent; aggstate->aggs = NIL; aggstate->numaggs = 0; @@ -2523,7 +2597,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) if (node->aggstrategy == AGG_HASHED) eflags &= ~EXEC_FLAG_REWIND; outerPlan = outerPlan(node); - outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags, NULL); + outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags, + (PlanState *) aggstate); /* * initialize source tuple type. @@ -3780,3 +3855,309 @@ aggregate_dummy(PG_FUNCTION_ARGS) fcinfo->flinfo->fn_oid); return (Datum) 0; /* keep compiler quiet */ } + +/* + * We want to use our own hashtable instead of defined in execGrouping.c, + * because + * - we want to inline its interface functions + * - we want 'foreach' method with inlined action + * + * While we need new hashtable, stored type (TupleHashEntry) is exactly the + * same. Because of that types (tuplehash_hash *) and (aggtuplehash_hash *) + * are fully compatible. So, we won't change type of aggstate->hashtable to + * copy-pasted TupleHashTableData with the only field hashtab changed to + * aggtuplehas_hash *; instead, we will use casts where needed. + * + * Since functions in execGrouping.c are hard-linked with `tuplehash` + * hashtable defined there, we can't use them and need our own versions too, + * so they will be basically copypasted with changed hashtable name. Of + * course, it is no good, but again, our goal for now is to estimate + * perfomance benefits. Later, if needed, execGrouping may be generalized to + * handle any hashtable. + * + */ + +/* + * Define parameters for tuple hash table code generation. + */ +#define SH_PREFIX aggtuplehash +#define SH_ELEMENT_TYPE TupleHashEntryData +#define SH_KEY_TYPE MinimalTuple +#define SH_KEY firstTuple +#define SH_HASH_KEY(tb, key) AggTupleHashTableHash(tb, key) +#define SH_EQUAL(tb, a, b) AggTupleHashTableMatch(tb, a, b) == 0 +#define SH_SCOPE static inline +#define SH_STORE_HASH +#define SH_GET_HASH(tb, a) a->hash +#define SH_FOREACH_ON +#define SH_FOREACH_ACC_TYPE bool +#define SH_FOREACH_ACC_INIT true +#define SH_FOREACH_FUNC AggPushHashEntry +#define SH_FOREACH_ACC_FUNC inline_and +#define SH_DEFINE +#include "lib/simplehash.h" + +static inline bool inline_and(bool old, bool new) +{ + return old && new; +} + +/* + * Funcs from execGrouping.c + */ + +/* + * Construct an empty TupleHashTable + * + * numCols, keyColIdx: identify the tuple fields to use as lookup key + * eqfunctions: equality comparison functions to use + * hashfunctions: datatype-specific hashing functions to use + * nbuckets: initial estimate of hashtable size + * additionalsize: size of data stored in ->additional + * tablecxt: memory context in which to store table and table entries + * tempcxt: short-lived context for evaluation hash and comparison functions + * + * The function arrays may be made with execTuplesHashPrepare(). Note they + * are not cross-type functions, but expect to see the table datatype(s) + * on both sides. + * + * Note that keyColIdx, eqfunctions, and hashfunctions must be allocated in + * storage that will live as long as the hashtable does. + */ +static TupleHashTable +BuildAggTupleHashTable(int numCols, AttrNumber *keyColIdx, + FmgrInfo *eqfunctions, + FmgrInfo *hashfunctions, + long nbuckets, Size additionalsize, + MemoryContext tablecxt, MemoryContext tempcxt, + bool use_variable_hash_iv) +{ + TupleHashTable hashtable; + Size entrysize = sizeof(TupleHashEntryData) + additionalsize; + + Assert(nbuckets > 0); + + /* Limit initial table size request to not more than work_mem */ + nbuckets = Min(nbuckets, (long) ((work_mem * 1024L) / entrysize)); + + hashtable = (TupleHashTable) + MemoryContextAlloc(tablecxt, sizeof(TupleHashTableData)); + + hashtable->numCols = numCols; + hashtable->keyColIdx = keyColIdx; + hashtable->tab_hash_funcs = hashfunctions; + hashtable->tab_eq_funcs = eqfunctions; + hashtable->tablecxt = tablecxt; + hashtable->tempcxt = tempcxt; + hashtable->entrysize = entrysize; + hashtable->tableslot = NULL; /* will be made on first lookup */ + hashtable->inputslot = NULL; + hashtable->in_hash_funcs = NULL; + hashtable->cur_eq_funcs = NULL; + + /* + * If parallelism is in use, even if the master backend is performing the + * scan itself, we don't want to create the hashtable exactly the same way + * in all workers. As hashtables are iterated over in keyspace-order, + * doing so in all processes in the same way is likely to lead to + * "unbalanced" hashtables when the table size initially is + * underestimated. + */ + if (use_variable_hash_iv) + hashtable->hash_iv = hash_uint32(ParallelWorkerNumber); + else + hashtable->hash_iv = 0; + + hashtable->hashtab = (tuplehash_hash*) aggtuplehash_create(tablecxt, + nbuckets, + hashtable); + + return hashtable; +} + +/* + * Find or create a hashtable entry for the tuple group containing the + * given tuple. The tuple must be the same type as the hashtable entries. + * + * If isnew is NULL, we do not create new entries; we return NULL if no + * match is found. + * + * If isnew isn't NULL, then a new entry is created if no existing entry + * matches. On return, *isnew is true if the entry is newly created, + * false if it existed already. ->additional_data in the new entry has + * been zeroed. + */ +static inline TupleHashEntry +LookupAggTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot, + bool *isnew) +{ + TupleHashEntryData *entry; + MemoryContext oldContext; + bool found; + MinimalTuple key; + + /* If first time through, clone the input slot to make table slot */ + if (hashtable->tableslot == NULL) + { + TupleDesc tupdesc; + + oldContext = MemoryContextSwitchTo(hashtable->tablecxt); + + /* + * We copy the input tuple descriptor just for safety --- we assume + * all input tuples will have equivalent descriptors. + */ + tupdesc = CreateTupleDescCopy(slot->tts_tupleDescriptor); + hashtable->tableslot = MakeSingleTupleTableSlot(tupdesc); + MemoryContextSwitchTo(oldContext); + } + + /* Need to run the hash functions in short-lived context */ + oldContext = MemoryContextSwitchTo(hashtable->tempcxt); + + /* set up data needed by hash and match functions */ + hashtable->inputslot = slot; + hashtable->in_hash_funcs = hashtable->tab_hash_funcs; + hashtable->cur_eq_funcs = hashtable->tab_eq_funcs; + + key = NULL; /* flag to reference inputslot */ + + if (isnew) + { + entry = aggtuplehash_insert((aggtuplehash_hash *) hashtable->hashtab, + key, &found); + + if (found) + { + /* found pre-existing entry */ + *isnew = false; + } + else + { + /* created new entry */ + *isnew = true; + /* zero caller data */ + entry->additional = NULL; + MemoryContextSwitchTo(hashtable->tablecxt); + /* Copy the first tuple into the table context */ + entry->firstTuple = ExecCopySlotMinimalTuple(slot); + } + } + else + { + entry = aggtuplehash_lookup((aggtuplehash_hash *) hashtable->hashtab, + key); + } + + MemoryContextSwitchTo(oldContext); + + return entry; +} + +/* + * Compute the hash value for a tuple + * + * The passed-in key is a pointer to TupleHashEntryData. In an actual hash + * table entry, the firstTuple field points to a tuple (in MinimalTuple + * format). LookupTupleHashEntry sets up a dummy TupleHashEntryData with a + * NULL firstTuple field --- that cues us to look at the inputslot instead. + * This convention avoids the need to materialize virtual input tuples unless + * they actually need to get copied into the table. + * + * Also, the caller must select an appropriate memory context for running + * the hash functions. + */ +static inline uint32 +AggTupleHashTableHash(struct aggtuplehash_hash *tb, const MinimalTuple tuple) +{ + TupleHashTable hashtable = (TupleHashTable) tb->private_data; + int numCols = hashtable->numCols; + AttrNumber *keyColIdx = hashtable->keyColIdx; + uint32 hashkey = hashtable->hash_iv; + TupleTableSlot *slot; + FmgrInfo *hashfunctions; + int i; + + if (tuple == NULL) + { + /* Process the current input tuple for the table */ + slot = hashtable->inputslot; + hashfunctions = hashtable->in_hash_funcs; + } + else + { + /* + * Process a tuple already stored in the table. + * + * (this case never actually occurs due to the way simplehash.h is + * used, as the hash-value is stored in the entries) + */ + slot = hashtable->tableslot; + ExecStoreMinimalTuple(tuple, slot, false); + hashfunctions = hashtable->tab_hash_funcs; + } + + for (i = 0; i < numCols; i++) + { + AttrNumber att = keyColIdx[i]; + Datum attr; + bool isNull; + + /* rotate hashkey left 1 bit at each step */ + hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0); + + attr = slot_getattr(slot, att, &isNull); + + if (!isNull) /* treat nulls as having hash key 0 */ + { + uint32 hkey; + + hkey = DatumGetUInt32(FunctionCall1(&hashfunctions[i], + attr)); + hashkey ^= hkey; + } + } + + return hashkey; +} + +/* + * See whether two tuples (presumably of the same hash value) match + * + * As above, the passed pointers are pointers to TupleHashEntryData. + * + * Also, the caller must select an appropriate memory context for running + * the compare functions. + */ +static inline int +AggTupleHashTableMatch(struct aggtuplehash_hash *tb, + const MinimalTuple tuple1, + const MinimalTuple tuple2) +{ + TupleTableSlot *slot1; + TupleTableSlot *slot2; + TupleHashTable hashtable = (TupleHashTable) tb->private_data; + + /* + * We assume that simplehash.h will only ever call us with the first + * argument being an actual table entry, and the second argument being + * LookupTupleHashEntry's dummy TupleHashEntryData. The other direction + * could be supported too, but is not currently required. + */ + Assert(tuple1 != NULL); + slot1 = hashtable->tableslot; + ExecStoreMinimalTuple(tuple1, slot1, false); + Assert(tuple2 == NULL); + slot2 = hashtable->inputslot; + + /* For crosstype comparisons, the inputslot must be first */ + if (execTuplesMatch(slot2, + slot1, + hashtable->numCols, + hashtable->keyColIdx, + hashtable->cur_eq_funcs, + hashtable->tempcxt)) + return 0; + else + return 1; +} diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 386fcb4c8b..af8e98f2b4 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -17,6 +17,7 @@ #include "catalog/partition.h" #include "executor/execdesc.h" #include "nodes/parsenodes.h" +#include "utils/memutils.h" /* @@ -121,12 +122,6 @@ extern bool execCurrentOf(CurrentOfExpr *cexpr, /* * prototypes from functions in execGrouping.c */ -extern bool execTuplesMatch(TupleTableSlot *slot1, - TupleTableSlot *slot2, - int numCols, - AttrNumber *matchColIdx, - FmgrInfo *eqfunctions, - MemoryContext evalContext); extern bool execTuplesUnequal(TupleTableSlot *slot1, TupleTableSlot *slot2, int numCols, @@ -417,4 +412,95 @@ extern void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd); +/* + * Below goes static inlined function moved from execGrouping.c: since + * we have inlined all hashtable interface functions in nodeAgg.c, why not + * inline execTuplesMatch too? + * Obviously this is not a good place for it, it should be moved to + * something like execGrouping.h and all calls updated. + */ + +static inline bool execTuplesMatch(TupleTableSlot *slot1, + TupleTableSlot *slot2, + int numCols, + AttrNumber *matchColIdx, + FmgrInfo *eqfunctions, + MemoryContext evalContext); + +/* + * execTuplesMatch + * Return true if two tuples match in all the indicated fields. + * + * This actually implements SQL's notion of "not distinct". Two nulls + * match, a null and a not-null don't match. + * + * slot1, slot2: the tuples to compare (must have same columns!) + * numCols: the number of attributes to be examined + * matchColIdx: array of attribute column numbers + * eqFunctions: array of fmgr lookup info for the equality functions to use + * evalContext: short-term memory context for executing the functions + * + * NB: evalContext is reset each time! + */ +static inline bool +execTuplesMatch(TupleTableSlot *slot1, + TupleTableSlot *slot2, + int numCols, + AttrNumber *matchColIdx, + FmgrInfo *eqfunctions, + MemoryContext evalContext) +{ + MemoryContext oldContext; + bool result; + int i; + + /* Reset and switch into the temp context. */ + MemoryContextReset(evalContext); + oldContext = MemoryContextSwitchTo(evalContext); + + /* + * We cannot report a match without checking all the fields, but we can + * report a non-match as soon as we find unequal fields. So, start + * comparing at the last field (least significant sort key). That's the + * most likely to be different if we are dealing with sorted input. + */ + result = true; + + for (i = numCols; --i >= 0;) + { + AttrNumber att = matchColIdx[i]; + Datum attr1, + attr2; + bool isNull1, + isNull2; + + attr1 = slot_getattr(slot1, att, &isNull1); + + attr2 = slot_getattr(slot2, att, &isNull2); + + if (isNull1 != isNull2) + { + result = false; /* one null and one not; they aren't equal */ + break; + } + + if (isNull1) + continue; /* both are null, treat as equal */ + + /* Apply the type-specific equality function */ + + if (!DatumGetBool(FunctionCall2(&eqfunctions[i], + attr1, attr2))) + { + result = false; /* they aren't equal */ + break; + } + } + + MemoryContextSwitchTo(oldContext); + + return result; +} + + #endif /* EXECUTOR_H */ diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h index d2fee52e12..0808bfc75a 100644 --- a/src/include/executor/nodeAgg.h +++ b/src/include/executor/nodeAgg.h @@ -16,8 +16,10 @@ #include "nodes/execnodes.h" -extern AggState *ExecInitAgg(Agg *node, EState *estate, int eflags); -extern TupleTableSlot *ExecAgg(AggState *node); +extern AggState *ExecInitAgg(Agg *node, EState *estate, int eflags, + PlanState *parent); +extern bool ExecPushTupleToAgg(TupleTableSlot *slot, AggState *node); +extern void ExecPushNullToAgg(TupleTableSlot *slot, AggState *node); extern void ExecEndAgg(AggState *node); extern void ExecReScanAgg(AggState *node); diff --git a/src/include/lib/simplehash.h b/src/include/lib/simplehash.h index 6c6c3ee0d0..e865b87298 100644 --- a/src/include/lib/simplehash.h +++ b/src/include/lib/simplehash.h @@ -25,12 +25,33 @@ * declarations reside * - SH_USE_NONDEFAULT_ALLOCATOR - if defined no element allocator functions * are defined, so you can supply your own + * - SH_FOREACH_ON -- if defined, SH_TYPE_foreach function for iterating + * over the hashtable will be generated. This function works as follows: + * SH_FOREACH_ACC_TYPE SH_TYPE_foreach(hashtable, void *direct_arg) + * { + * accum = accum_init_val + * for each element in hashtable + * accum = accum_func(accum, foreach_func(element, direct_arg)) + * return accum + * } + * So, you have to specify the following macros if you use this: + * - SH_FOREACH_ACC_TYPE -- type of accum + * - Some more if SH_DEFINE is defined + * The following parameters are only relevant when SH_DEFINE is defined: * - SH_KEY - name of the element in SH_ELEMENT_TYPE containing the hash key * - SH_EQUAL(table, a, b) - compare two table keys * - SH_HASH_KEY(table, key) - generate hash for the key * - SH_STORE_HASH - if defined the hash is stored in the elements * - SH_GET_HASH(tb, a) - return the field to store the hash in + * Macros for foreach: + * - SH_FOREACH_ACC_INIT -- initial value of accum + * - SH_FOREACH_FUNC -- name of foreach_func, it's prototype is + * SH_FOREACH_ACC_TYPE SH_TYPE_foreach(SH_ELEMENT_TYPE el, + * void *direct agg) + * - SH_FOREACH_ACC_FUNC -- name of accum_func, it's prototype is + * SH_FOREACH_ACC_TYPE accum_func(SH_FOREACH_ACC_TYPE old, + * SH_FOREACH_ACC_TYPE new) * * For examples of usage look at simplehash.c (file local definition) and * execnodes.h/execGrouping.c (exposed declaration, file local @@ -75,6 +96,7 @@ #define SH_INSERT SH_MAKE_NAME(insert) #define SH_DELETE SH_MAKE_NAME(delete) #define SH_LOOKUP SH_MAKE_NAME(lookup) +#define SH_FOREACH SH_MAKE_NAME(foreach) #define SH_GROW SH_MAKE_NAME(grow) #define SH_START_ITERATE SH_MAKE_NAME(start_iterate) #define SH_START_ITERATE_AT SH_MAKE_NAME(start_iterate_at) @@ -147,6 +169,9 @@ SH_SCOPE bool SH_DELETE(SH_TYPE *tb, SH_KEY_TYPE key); SH_SCOPE void SH_START_ITERATE(SH_TYPE *tb, SH_ITERATOR *iter); SH_SCOPE void SH_START_ITERATE_AT(SH_TYPE *tb, SH_ITERATOR *iter, uint32 at); SH_SCOPE SH_ELEMENT_TYPE *SH_ITERATE(SH_TYPE *tb, SH_ITERATOR *iter); +#ifdef SH_FOREACH_ON +SH_SCOPE SH_FOREACH_ACC_TYPE SH_FOREACH(SH_TYPE *tb, void *direct_arg); +#endif /* SH_FOREACH_ON */ SH_SCOPE void SH_STAT(SH_TYPE *tb); #endif /* SH_DECLARE */ @@ -827,6 +852,35 @@ SH_ITERATE(SH_TYPE *tb, SH_ITERATOR *iter) } /* + * Iterate over the hashtable, doing something with each value and accumulating + * the result. + */ +#ifdef SH_FOREACH_ON +SH_SCOPE SH_FOREACH_ACC_TYPE SH_FOREACH(SH_TYPE *tb, void *direct_arg) +{ + uint32 cur = 0; + SH_FOREACH_ACC_TYPE accum = SH_FOREACH_ACC_INIT; + SH_FOREACH_ACC_TYPE new_accum; + SH_ELEMENT_TYPE *elem; + + do + { + elem = &tb->data[cur]; + if (elem->status == SH_STATUS_IN_USE) + { + new_accum = SH_FOREACH_FUNC(elem, direct_arg); + accum = SH_FOREACH_ACC_FUNC(accum, new_accum); + } + /* next element in forward direction */ + cur = (cur + 1) & tb->sizemask; + } while (cur != 0); + + return accum; +} +#endif /* SH_FOREACH_ON */ + + +/* * Report some statistics about the state of the hashtable. For * debugging/profiling purposes only. */ @@ -914,6 +968,11 @@ SH_STAT(SH_TYPE *tb) #undef SH_GET_HASH #undef SH_STORE_HASH #undef SH_USE_NONDEFAULT_ALLOCATOR +#undef SH_FOREACH_ON +#undef SH_FOREACH_ACC_TYPE +#undef SH_FOREACH_ACC_INIT +#undef SH_FOREACH_FUNC +#undef SH_FOREACH_ACC_FUNC /* undefine locally declared macros */ #undef SH_MAKE_PREFIX @@ -942,6 +1001,7 @@ SH_STAT(SH_TYPE *tb) #undef SH_START_ITERATE #undef SH_START_ITERATE_AT #undef SH_ITERATE +#undef SH_FOREACH #undef SH_ALLOCATE #undef SH_FREE #undef SH_STAT -- 2.11.0