diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index eff512d4..33f385cb 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -149,12 +149,39 @@ typedef enum IVM_SUB } IvmOp; +typedef enum +{ + BAD_AGGFUNC, + IVM_COUNT, + IVM_SUM, + IVM_AVG, + IVM_MIN, + IVM_MAX +} IvmAggType; + +typedef struct aggname_entry +{ + const char *agg_name; + int val; +} aggname_entry; + +/* map aggregate name with enum */ +static aggname_entry ivm_agg_lookup[] = +{ + { "count",IVM_COUNT}, + { "sum", IVM_SUM}, + { "avg", IVM_AVG}, + { "min",IVM_MIN}, + { "max",IVM_MAX} +}; + /* ENR name for materialized view delta */ #define NEW_DELTA_ENRNAME "new_delta" #define OLD_DELTA_ENRNAME "old_delta" static int matview_maintenance_depth = 0; +static int get_ivm_aggfunc(const char *agg_name); static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self); static void transientrel_shutdown(DestReceiver *self); @@ -191,16 +218,7 @@ static Query *rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable * static void apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores, TupleDesc tupdesc_old, TupleDesc tupdesc_new, Query *query, bool use_count, char *count_colname); -static void append_set_clause_for_count(const char *resname, StringInfo buf_old, - StringInfo buf_new,StringInfo aggs_list); -static void append_set_clause_for_sum(const char *resname, StringInfo buf_old, - StringInfo buf_new, StringInfo aggs_list); -static void append_set_clause_for_avg(const char *resname, StringInfo buf_old, - StringInfo buf_new, StringInfo aggs_list, - const char *aggtype); -static void append_set_clause_for_minmax(const char *resname, StringInfo buf_old, - StringInfo buf_new, StringInfo aggs_list, - bool is_min); + static char *get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2, const char* count_col, const char *castType); static char *get_null_condition_string(IvmOp op, const char *arg1, const char *arg2, @@ -235,6 +253,25 @@ static void mv_HashPreparedPlan(MV_QueryKey *key, SPIPlanPtr plan); static void mv_BuildQueryKey(MV_QueryKey *key, Oid matview_id, int32 query_type); static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort); +static void append_update_set_caluse(const char *aggname, char *resname, StringInfo buf_old, + StringInfo buf_new,StringInfo aggs_list, + char *aggtype); + +int +get_ivm_aggfunc(const char *agg_name) +{ +#define NKEYS (sizeof(ivm_agg_lookup)/sizeof(aggname_entry)) + + int i; + for (i=0; i < NKEYS; i++) + { + aggname_entry *entry = &ivm_agg_lookup[i]; + if (strcmp(entry->agg_name, agg_name) == 0) + return entry->val; + } + return BAD_AGGFUNC; +} + /* * SetMatViewPopulatedState * Mark a materialized view as populated, or not. @@ -2210,38 +2247,23 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n { Aggref *aggref = (Aggref *) tle->expr; const char *aggname = get_func_name(aggref->aggfnoid); + char *aggtype = format_type_be(aggref->aggtype); /* * We can use function names here because it is already checked if these * can be used in IMMV by its OID at the definition time. */ - - /* count */ - if (!strcmp(aggname, "count")) - append_set_clause_for_count(resname, aggs_set_old, aggs_set_new, aggs_list_buf); - - /* sum */ - else if (!strcmp(aggname, "sum")) - append_set_clause_for_sum(resname, aggs_set_old, aggs_set_new, aggs_list_buf); - - /* avg */ - else if (!strcmp(aggname, "avg")) - append_set_clause_for_avg(resname, aggs_set_old, aggs_set_new, aggs_list_buf, - format_type_be(aggref->aggtype)); - - /* min/max */ - else if (!strcmp(aggname, "min") || !strcmp(aggname, "max")) + (void) append_update_set_caluse(aggname, resname, aggs_set_old, + aggs_set_new, aggs_list_buf, aggtype); + + if (!strcmp(aggname, "min") || !strcmp(aggname, "max")) { bool is_min = (!strcmp(aggname, "min")); - append_set_clause_for_minmax(resname, aggs_set_old, aggs_set_new, aggs_list_buf, is_min); - /* make a resname list of min and max aggregates */ minmax_list = lappend(minmax_list, resname); is_min_list = lappend_int(is_min_list, is_min); } - else - elog(ERROR, "unsupported aggregate function: %s", aggname); } } @@ -2338,224 +2360,6 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n elog(ERROR, "SPI_finish failed"); } -/* - * append_set_clause_for_count - * - * Append SET clause string for count aggregation to given buffers. - * Also, append resnames required for calculating the aggregate value. - */ -static void -append_set_clause_for_count(const char *resname, StringInfo buf_old, - StringInfo buf_new,StringInfo aggs_list) -{ - /* For tuple deletion */ - if (buf_old) - { - /* resname = mv.resname - t.resname */ - appendStringInfo(buf_old, - ", %s = %s", - quote_qualified_identifier(NULL, resname), - get_operation_string(IVM_SUB, resname, "mv", "t", NULL, NULL)); - } - /* For tuple insertion */ - if (buf_new) - { - /* resname = mv.resname + diff.resname */ - appendStringInfo(buf_new, - ", %s = %s", - quote_qualified_identifier(NULL, resname), - get_operation_string(IVM_ADD, resname, "mv", "diff", NULL, NULL)); - } - - appendStringInfo(aggs_list, ", %s", - quote_qualified_identifier("diff", resname) - ); -} - -/* - * append_set_clause_for_sum - * - * Append SET clause string for sum aggregation to given buffers. - * Also, append resnames required for calculating the aggregate value. - */ -static void -append_set_clause_for_sum(const char *resname, StringInfo buf_old, - StringInfo buf_new, StringInfo aggs_list) -{ - char *count_col = IVM_colname("count", resname); - - /* For tuple deletion */ - if (buf_old) - { - /* sum = mv.sum - t.sum */ - appendStringInfo(buf_old, - ", %s = %s", - quote_qualified_identifier(NULL, resname), - get_operation_string(IVM_SUB, resname, "mv", "t", count_col, NULL) - ); - /* count = mv.count - t.count */ - appendStringInfo(buf_old, - ", %s = %s", - quote_qualified_identifier(NULL, count_col), - get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) - ); - } - /* For tuple insertion */ - if (buf_new) - { - /* sum = mv.sum + diff.sum */ - appendStringInfo(buf_new, - ", %s = %s", - quote_qualified_identifier(NULL, resname), - get_operation_string(IVM_ADD, resname, "mv", "diff", count_col, NULL) - ); - /* count = mv.count + diff.count */ - appendStringInfo(buf_new, - ", %s = %s", - quote_qualified_identifier(NULL, count_col), - get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) - ); - } - - appendStringInfo(aggs_list, ", %s, %s", - quote_qualified_identifier("diff", resname), - quote_qualified_identifier("diff", IVM_colname("count", resname)) - ); -} - -/* - * append_set_clause_for_avg - * - * Append SET clause string for avg aggregation to given buffers. - * Also, append resnames required for calculating the aggregate value. - */ -static void -append_set_clause_for_avg(const char *resname, StringInfo buf_old, - StringInfo buf_new, StringInfo aggs_list, - const char *aggtype) -{ - char *sum_col = IVM_colname("sum", resname); - char *count_col = IVM_colname("count", resname); - - /* For tuple deletion */ - if (buf_old) - { - /* avg = (mv.sum - t.sum)::aggtype / (mv.count - t.count) */ - appendStringInfo(buf_old, - ", %s = %s OPERATOR(pg_catalog./) %s", - quote_qualified_identifier(NULL, resname), - get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, aggtype), - get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) - ); - /* sum = mv.sum - t.sum */ - appendStringInfo(buf_old, - ", %s = %s", - quote_qualified_identifier(NULL, sum_col), - get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, NULL) - ); - /* count = mv.count - t.count */ - appendStringInfo(buf_old, - ", %s = %s", - quote_qualified_identifier(NULL, count_col), - get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) - ); - - } - /* For tuple insertion */ - if (buf_new) - { - /* avg = (mv.sum + diff.sum)::aggtype / (mv.count + diff.count) */ - appendStringInfo(buf_new, - ", %s = %s OPERATOR(pg_catalog./) %s", - quote_qualified_identifier(NULL, resname), - get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, aggtype), - get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) - ); - /* sum = mv.sum + diff.sum */ - appendStringInfo(buf_new, - ", %s = %s", - quote_qualified_identifier(NULL, sum_col), - get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, NULL) - ); - /* count = mv.count + diff.count */ - appendStringInfo(buf_new, - ", %s = %s", - quote_qualified_identifier(NULL, count_col), - get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) - ); - } - - appendStringInfo(aggs_list, ", %s, %s, %s", - quote_qualified_identifier("diff", resname), - quote_qualified_identifier("diff", IVM_colname("sum", resname)), - quote_qualified_identifier("diff", IVM_colname("count", resname)) - ); -} - -/* - * append_set_clause_for_minmax - * - * Append SET clause string for min or max aggregation to given buffers. - * Also, append resnames required for calculating the aggregate value. - * is_min is true if this is min, false if not. - */ -static void -append_set_clause_for_minmax(const char *resname, StringInfo buf_old, - StringInfo buf_new, StringInfo aggs_list, - bool is_min) -{ - char *count_col = IVM_colname("count", resname); - - /* For tuple deletion */ - if (buf_old) - { - /* - * If the new value doesn't became NULL then use the value remaining - * in the view although this will be recomputated afterwords. - */ - appendStringInfo(buf_old, - ", %s = CASE WHEN %s THEN NULL ELSE %s END", - quote_qualified_identifier(NULL, resname), - get_null_condition_string(IVM_SUB, "mv", "t", count_col), - quote_qualified_identifier("mv", resname) - ); - /* count = mv.count - t.count */ - appendStringInfo(buf_old, - ", %s = %s", - quote_qualified_identifier(NULL, count_col), - get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) - ); - } - /* For tuple insertion */ - if (buf_new) - { - /* - * min = LEAST(mv.min, diff.min) - * max = GREATEST(mv.max, diff.max) - */ - appendStringInfo(buf_new, - ", %s = CASE WHEN %s THEN NULL ELSE %s(%s,%s) END", - quote_qualified_identifier(NULL, resname), - get_null_condition_string(IVM_ADD, "mv", "diff", count_col), - - is_min ? "LEAST" : "GREATEST", - quote_qualified_identifier("mv", resname), - quote_qualified_identifier("diff", resname) - ); - /* count = mv.count + diff.count */ - appendStringInfo(buf_new, - ", %s = %s", - quote_qualified_identifier(NULL, count_col), - get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) - ); - } - - appendStringInfo(aggs_list, ", %s, %s", - quote_qualified_identifier("diff", resname), - quote_qualified_identifier("diff", IVM_colname("count", resname)) - ); -} - /* * get_operation_string * @@ -3482,3 +3286,228 @@ isIvmName(const char *s) return (strncmp(s, "__ivm_", 6) == 0); return false; } + +void +append_update_set_caluse(const char *aggname, char *resname, StringInfo buf_old, + StringInfo buf_new, StringInfo aggs_list, + char *aggtype) +{ + char *count_col = IVM_colname("count", resname); + char *sum_col = IVM_colname("sum", resname); + + switch (get_ivm_aggfunc(aggname)) + { + case IVM_COUNT: + if (buf_old) + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_SUB, resname, "mv", "t", NULL, NULL)); + + if (buf_new) + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_ADD, resname, "mv", "diff", NULL, NULL)); + + appendStringInfo(aggs_list, ", %s", + quote_qualified_identifier("diff", resname)); + break; + + case IVM_SUM: + if (buf_old) + { + /* sum = mv.sum - t.sum */ + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_SUB, resname, "mv", "t", count_col, NULL) + ); + + /* count = mv.count - t.count */ + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) + ); + } + + if (buf_new) + { + /* sum = mv.sum + diff.sum */ + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_ADD, resname, "mv", "diff", count_col, NULL) + ); + + /* count = mv.count + diff.count */ + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) + ); + } + + appendStringInfo(aggs_list, ", %s, %s", + quote_qualified_identifier("diff", resname), + quote_qualified_identifier("diff", IVM_colname("count", resname))); + break; + + case IVM_AVG: + if (buf_old) + { + /* avg = (mv.sum - t.sum)::aggtype / (mv.count - t.count) */ + appendStringInfo(buf_old, + ", %s = %s OPERATOR(pg_catalog./) %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, aggtype), + get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) + ); + + /* sum = mv.sum - t.sum */ + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, sum_col), + get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, NULL) + ); + + /* count = mv.count - t.count */ + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) + ); + } + + if (buf_new) + { + /* avg = (mv.sum + diff.sum)::aggtype / (mv.count + diff.count) */ + appendStringInfo(buf_new, + ", %s = %s OPERATOR(pg_catalog./) %s", + quote_qualified_identifier(NULL, resname), + get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, aggtype), + get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) + ); + /* sum = mv.sum + diff.sum */ + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, sum_col), + get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, NULL) + ); + /* count = mv.count + diff.count */ + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) + ); + } + + appendStringInfo(aggs_list, ", %s, %s, %s", + quote_qualified_identifier("diff", resname), + quote_qualified_identifier("diff", IVM_colname("sum", resname)), + quote_qualified_identifier("diff", IVM_colname("count", resname))); + + break; + + case IVM_MIN: + if (buf_old) + { + /* + * If the new value doesn't became NULL then use the value remaining + * in the view although this will be recomputated afterwords. + */ + appendStringInfo(buf_old, + ", %s = CASE WHEN %s THEN NULL ELSE %s END", + quote_qualified_identifier(NULL, resname), + get_null_condition_string(IVM_SUB, "mv", "t", count_col), + quote_qualified_identifier("mv", resname) + ); + /* count = mv.count - t.count */ + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) + ); + } + + if (buf_new) + { + /* min = LEAST(mv.min, diff.min) */ + appendStringInfo(buf_new, + ", %s = CASE WHEN %s THEN NULL ELSE %s(%s,%s) END", + quote_qualified_identifier(NULL, resname), + get_null_condition_string(IVM_ADD, "mv", "diff", count_col), + "LEAST", + quote_qualified_identifier("mv", resname), + quote_qualified_identifier("diff", resname) + ); + + /* count = mv.count + diff.count */ + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) + ); + } + appendStringInfo(aggs_list, ", %s, %s", + quote_qualified_identifier("diff", resname), + quote_qualified_identifier("diff", IVM_colname("count", resname))); + break; + + case IVM_MAX: + if (buf_old) + { + /* + * If the new value doesn't became NULL then use the value remaining + * in the view although this will be recomputated afterwords. + */ + appendStringInfo(buf_old, + ", %s = CASE WHEN %s THEN NULL ELSE %s END", + quote_qualified_identifier(NULL, resname), + get_null_condition_string(IVM_SUB, "mv", "t", count_col), + quote_qualified_identifier("mv", resname) + ); + /* count = mv.count - t.count */ + appendStringInfo(buf_old, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL) + ); + } + + if (buf_new) + { + /* max = GREATEST(mv.max, diff.max) */ + appendStringInfo(buf_new, + ", %s = CASE WHEN %s THEN NULL ELSE %s(%s,%s) END", + quote_qualified_identifier(NULL, resname), + get_null_condition_string(IVM_ADD, "mv", "diff", count_col), + "GREATEST", + quote_qualified_identifier("mv", resname), + quote_qualified_identifier("diff", resname) + ); + + /* count = mv.count + diff.count */ + appendStringInfo(buf_new, + ", %s = %s", + quote_qualified_identifier(NULL, count_col), + get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL) + ); + } + + appendStringInfo(aggs_list, ", %s, %s", + quote_qualified_identifier("diff", resname), + quote_qualified_identifier("diff", IVM_colname("count", resname))); + break; + + case BAD_AGGFUNC: + elog(ERROR, "unsupported aggregate function: %s", aggname); + break; + + default: + elog(ERROR, "unsupported aggregate function: %s", aggname); + break; + } + return; +} \ No newline at end of file