diff --git a/doc/src/sgml/ref/psql-ref.sgml b/doc/src/sgml/ref/psql-ref.sgml index f5c9552..0ed2e3d 100644 --- a/doc/src/sgml/ref/psql-ref.sgml +++ b/doc/src/sgml/ref/psql-ref.sgml @@ -2445,6 +2445,96 @@ lo_import 152801 + + \rotate [ colV [-]colH] + + + Execute the current query buffer (like \g) and shows the results + inside a crosstab grid. The contents at + output column colV are transformed into a + vertical header and the contents at + output column colH into a + horizontal header. The results for the other output columns are projected inside the grid. + + + + colV + and colH can indicate a + column position (starting at 1), or a column name. Normal case folding + and quoting rules apply on column names. By default, + colV designates column 1 + and colH column 2. + A query having less than two output columns cannot be rotated, and + colH must differ from + colV. + + + + The vertical header, displayed as the leftmost column in the output, + contains the set of all distinct values found in + column colV, in the order + of their first appearance in the results. + + + The horizontal header, displayed as the first row in the output, + contains the set of all distinct non-null values found in + column colH. It is + sorted in ascending order of values, unless a minus (-) sign + precedes colH, in which + case this order is reversed. + + + + The query results being tuples of N columns + (including colV and + colH), + for each distinct value x of + colH + and each distinct value y of + colV, + a cell is output at the intersection (x,y) in the grid, + and its contents are determined by these rules: + + + + if there is no corresponding row in the results such that the value + for colH + is x and the value + for colV + is y, the cell is empty. + + + + + + if there is exactly one row such that the value + for colH + is x and the value + for colV + is y, then the N-2 other + columns are displayed in the cell, separated between each other by + a space character if needed. + + If N=2, the letter X is displayed in the cell as + if a virtual third column contained that character. + + + + + + if there are several corresponding rows, the behavior is identical to one row + except that the values coming from different rows are stacked + vertically, rows being separated by newline characters inside + the same cell. + + + + + + + + + \s [ filename ] diff --git a/src/bin/psql/Makefile b/src/bin/psql/Makefile index f1336d5..92f9f44 100644 --- a/src/bin/psql/Makefile +++ b/src/bin/psql/Makefile @@ -23,7 +23,7 @@ override CPPFLAGS := -I. -I$(srcdir) -I$(libpq_srcdir) -I$(top_srcdir)/src/bin/p OBJS= command.o common.o help.o input.o stringutils.o mainloop.o copy.o \ startup.o prompt.o variables.o large_obj.o print.o describe.o \ tab-complete.o mbprint.o dumputils.o keywords.o kwlookup.o \ - sql_help.o \ + sql_help.o rotate.o \ $(WIN32RES) diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c index 8156b76..3550d6c 100644 --- a/src/bin/psql/command.c +++ b/src/bin/psql/command.c @@ -46,6 +46,7 @@ #include "mainloop.h" #include "print.h" #include "psqlscan.h" +#include "rotate.h" #include "settings.h" #include "variables.h" @@ -1083,6 +1084,34 @@ exec_command(const char *cmd, free(pw2); } + /* \rotate -- execute a query and show results rotated along axis */ + else if (strcmp(cmd, "rotate") == 0) + { + char *opt1, + *opt2; + + opt1 = psql_scan_slash_option(scan_state, + OT_NORMAL, NULL, false); + opt2 = psql_scan_slash_option(scan_state, + OT_NORMAL, NULL, false); + + if (opt1 && !opt2) + { + psql_error(_("\\%s: missing second argument\n"), cmd); + success = false; + } + else + { + pset.rotate_col_V = opt1 ? pg_strdup(opt1): NULL; + pset.rotate_col_H = opt2 ? pg_strdup(opt2): NULL; + pset.rotate_output = true; + status = PSQL_CMD_SEND; + } + + free(opt1); + free(opt2); + } + /* \prompt -- prompt and set variable */ else if (strcmp(cmd, "prompt") == 0) { diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index 0e266a3..032db38 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -23,6 +23,7 @@ #include "command.h" #include "copy.h" #include "mbprint.h" +#include "rotate.h" @@ -1061,7 +1062,12 @@ SendQuery(const char *query) /* but printing results isn't: */ if (OK && results) - OK = PrintQueryResults(results); + { + if (pset.rotate_output) + OK = PrintRotate(results, pset.rotate_col_V, pset.rotate_col_H); + else + OK = PrintQueryResults(results); + } } else { @@ -1177,6 +1183,17 @@ sendquery_cleanup: pset.gset_prefix = NULL; } + pset.rotate_output = false; + if (pset.rotate_col_V) + { + free(pset.rotate_col_V); + pset.rotate_col_V = NULL; + } + if (pset.rotate_col_H) + { + free(pset.rotate_col_H); + pset.rotate_col_H = NULL; + } return OK; } diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c index 5b63e76..3dde055 100644 --- a/src/bin/psql/help.c +++ b/src/bin/psql/help.c @@ -175,6 +175,7 @@ slashUsage(unsigned short int pager) fprintf(output, _(" \\g [FILE] or ; execute query (and send results to file or |pipe)\n")); fprintf(output, _(" \\gset [PREFIX] execute query and store results in psql variables\n")); fprintf(output, _(" \\q quit psql\n")); + fprintf(output, _(" \\rotate [COLV COLH] execute query and rotate results into crosstab view\n")); fprintf(output, _(" \\watch [SEC] execute query every SEC seconds\n")); fprintf(output, "\n"); diff --git a/src/bin/psql/rotate.c b/src/bin/psql/rotate.c new file mode 100644 index 0000000..7dcc3e9 --- /dev/null +++ b/src/bin/psql/rotate.c @@ -0,0 +1,566 @@ +/* + * psql - the PostgreSQL interactive terminal + * + * Copyright (c) 2000-2015, PostgreSQL Global Development Group + * + * src/bin/psql/rotate.c + */ + +#include "common.h" +#include "pqexpbuffer.h" +#include "rotate.h" +#include "settings.h" + +#include + +static int +headerCompare(const void *a, const void *b) +{ + return strcmp( ((struct pivot_field*)a)->name, + ((struct pivot_field*)b)->name); +} + +static void +accumHeader(char* name, int* count, struct pivot_field **sorted_tab, int row_number) +{ + struct pivot_field *p; + + /* + * Search for name in sorted_tab. If it doesn't exist, insert it, + * otherwise do nothing. + */ + + if (*count >= 1) + { + p = (struct pivot_field*) bsearch(&name, + *sorted_tab, + *count, + sizeof(struct pivot_field), + headerCompare); + } + else + p=NULL; + + if (!p) + { + *sorted_tab = pg_realloc(*sorted_tab, sizeof(struct pivot_field) * (1+*count)); + (*sorted_tab)[*count].name = name; + (*sorted_tab)[*count].rank = *count; + (*count)++; + + qsort(*sorted_tab, + *count, + sizeof(struct pivot_field), + headerCompare); + } +} + +/* + * Send a query to sort all column values cast to the Oid passed in a VALUES clause + */ +static bool +sortColumns(Oid coltype, struct pivot_field *columns, int nb_cols, int direction) +{ + bool retval = false; + PGresult *res = NULL; + PQExpBufferData query; + int i; + Oid *param_types; + const char** param_values; + int* param_lengths; + int* param_formats; + + if (nb_cols < 2) + return true; /* nothing to sort */ + + param_types = (Oid*) pg_malloc(nb_cols*sizeof(Oid)); + param_values = (const char**) pg_malloc(nb_cols*sizeof(char*)); + param_lengths = (int*) pg_malloc(nb_cols*sizeof(int)); + param_formats = (int*) pg_malloc(nb_cols*sizeof(int)); + + initPQExpBuffer(&query); + + /* + * The query returns the original position of each value in our list, + * ordered by its new position. The value itself is not returned. + */ + appendPQExpBuffer(&query, "SELECT n FROM (VALUES"); + + for (i=1; i <= nb_cols; i++) + { + if (i < nb_cols) + appendPQExpBuffer(&query, "($%d,%d),", i, i); + else + { + appendPQExpBuffer(&query, "($%d,%d)) AS l(x,n) ORDER BY x", i, i); + if (direction < 0) + appendPQExpBuffer(&query, " DESC"); + } + + param_types[i-1] = coltype; + param_values[i-1] = columns[i-1].name; + param_lengths[i-1] = strlen(columns[i-1].name); + param_formats[i-1] = 0; + } + + res = PQexecParams(pset.db, + query.data, + nb_cols, + param_types, + param_values, + param_lengths, + param_formats, + 0); + + if (res) + { + ExecStatusType status = PQresultStatus(res); + if (status == PGRES_TUPLES_OK) + { + for (i=0; i < PQntuples(res); i++) + { + int old_pos = atoi(PQgetvalue(res, i, 0)); + + if (old_pos < 1 || old_pos > nb_cols || i >= nb_cols) + { + /* + * A position outside of the range is normally impossible. + * If this happens, we're facing a malfunctioning or hostile + * server or middleware. + */ + psql_error(_("Unexpected value when sorting horizontal headers")); + goto cleanup; + } + else + { + columns[old_pos-1].rank = i; + } + } + } + else + { + psql_error(_("Query error when sorting horizontal headers: %s"), + PQerrorMessage(pset.db)); + goto cleanup; + } + } + + retval = true; + +cleanup: + termPQExpBuffer(&query); + if (res) + PQclear(res); + pg_free(param_types); + pg_free(param_values); + pg_free(param_lengths); + pg_free(param_formats); + return retval; +} + +static void +printRotation(const PGresult *results, + int num_columns, + struct pivot_field *piv_columns, + int field_for_columns, + int num_rows, + struct pivot_field *piv_rows, + int field_for_rows) +{ + printQueryOpt popt = pset.popt; + printTableContent cont; + int i, j, rn; + int* horiz_map; /* map indices from sorted horizontal headers to piv_columns */ + char** allocated_cells; /* Pointers for cell contents that are allocated + * in this function, when cells cannot simply point to + * PQgetvalue(results, ...) */ + + printTableInit(&cont, &popt.topt, popt.title, num_columns+1, num_rows); + + /* Step 1: set target column names (horizontal header) */ + + /* The name of the first column is kept unchanged by the rotation */ + printTableAddHeader(&cont, PQfname(results, 0), false, 'l'); + + /* + * To iterate over piv_columns[] by piv_columns[].rank, create a reverse map + * associating each piv_columns[].rank to its index in piv_columns. + * This avoids an O(N^2) loop later + */ + horiz_map = (int*) pg_malloc(sizeof(int) * num_columns); + for (i = 0; i < num_columns; i++) + { + horiz_map[piv_columns[i].rank] = i; + } + + for (i = 0; i < num_columns; i++) + { + printTableAddHeader(&cont, piv_columns[horiz_map[i]].name, false, 'l'); + } + pg_free(horiz_map); + + /* Step 2: set row names in the first output column (vertical header) */ + for (i = 0; i < num_rows; i++) + { + int k = piv_rows[i].rank; + cont.cells[k*(num_columns+1)] = piv_rows[i].name; + /* Initialize all cells inside the grid to an empty value */ + for (j = 0; j < num_columns; j++) + cont.cells[k*(num_columns+1)+j+1] = ""; + } + cont.cellsadded = num_rows * (num_columns+1); + + allocated_cells = (char**) pg_malloc0(num_rows * num_columns * sizeof(char*)); + + /* Step 3: set all the cells "inside the grid" */ + for (rn = 0; rn < PQntuples(results); rn++) + { + char* row_name; + char* col_name; + int row_number; + int col_number; + struct pivot_field *p; + + row_number = col_number = -1; + /* Find target row */ + if (!PQgetisnull(results, rn, field_for_rows)) + { + row_name = PQgetvalue(results, rn, field_for_rows); + p = (struct pivot_field*) bsearch(&row_name, + piv_rows, + num_rows, + sizeof(struct pivot_field), + headerCompare); + if (p) + row_number = p->rank; + } + + /* Find target column */ + if (!PQgetisnull(results, rn, field_for_columns)) + { + col_name = PQgetvalue(results, rn, field_for_columns); + p = (struct pivot_field*) bsearch(&col_name, + piv_columns, + num_columns, + sizeof(struct pivot_field), + headerCompare); + if (p) + col_number = p->rank; + } + + /* Place value into cell */ + if (col_number>=0 && row_number>=0) + { + int idx = 1 + col_number + row_number*(num_columns+1); + int src_col = 0; /* column number in source result */ + int k = 0; + + do { + char *content; + + if (PQnfields(results) == 2) + { + /* + special case: when the source has only 2 columns, use a + X (cross/checkmark) for the cell content, and set + src_col to a virtual additional column. + */ + content = "X"; + src_col = 3; + } + else if (src_col == field_for_rows || src_col == field_for_columns) + { + /* + The source values that produce headers are not processed + in this loop, only the values that end up inside the grid. + */ + src_col++; + continue; + } + else + { + content = (!PQgetisnull(results, rn, src_col)) ? + PQgetvalue(results, rn, src_col) : + (popt.nullPrint ? popt.nullPrint : ""); + } + + if (cont.cells[idx] != NULL && cont.cells[idx][0] != '\0') + { + /* + * Multiple values for the same (row,col) are projected + * into the same cell. When this happens, separate the + * previous content of the cell from the new value by a + * newline. + */ + int content_size = + strlen(cont.cells[idx]) + + 2 /* room for [CR],LF or space */ + + strlen(content) + + 1; /* '\0' */ + char *new_content; + + /* + * idx2 is an index into allocated_cells. It differs from + * idx (index into cont.cells), because vertical and + * horizontal headers are included in `cont.cells` but + * excluded from allocated_cells. + */ + int idx2 = (row_number * num_columns) + col_number; + + if (allocated_cells[idx2] != NULL) + { + new_content = pg_realloc(allocated_cells[idx2], content_size); + } + else + { + /* + * At this point, cont.cells[idx] still contains a + * PQgetvalue() pointer. Just after, it will contain + * a new pointer maintained in allocated_cells[], and + * freed at the end of this function. + */ + new_content = pg_malloc(content_size); + strcpy(new_content, cont.cells[idx]); + } + cont.cells[idx] = new_content; + allocated_cells[idx2] = new_content; + + /* + * Contents that are on adjacent columns in the source results get + * separated by one space in the target. + * Contents that are on different rows in the source get + * separated by newlines in the target. + */ + if (k==0) + strcat(new_content, "\n"); + else + strcat(new_content, " "); + strcat(new_content, content); + } + else + { + cont.cells[idx] = content; + } + k++; + src_col++; + } while (src_col < PQnfields(results)); + } + } + + printTable(&cont, pset.queryFout, pset.logfile); + printTableCleanup(&cont); + + + for (i=0; i < num_rows * num_columns; i++) + { + if (allocated_cells[i] != NULL) + pg_free(allocated_cells[i]); + } + + pg_free(allocated_cells); +} + +/* + * Compare a user-supplied argument against a field name obtained by PQfname(), + * which is already case-folded. + * If arg is not enclosed in double quotes, pg_strcasecmp applies, otherwise + * do a case-sensitive comparison with these rules: + * - double quotes enclosing 'arg' are filtered out + * - double quotes inside 'arg' are expected to be doubled + */ +static int +fieldnameCmp(const char* arg, const char* fieldname) +{ + const unsigned char* p = (const unsigned char*) arg; + const unsigned char* f = (const unsigned char*) fieldname; + unsigned char c; + + if (*p++ != '"') + return pg_strcasecmp(arg, fieldname); + + while ((c=*p++)) + { + if (c=='"') + { + if (*p=='"') + p++; /* skip second quote and continue */ + else if (*p=='\0') + return *p-*f; /* p finishes before f or is identical */ + + } + if (*f=='\0') + return 1; /* f finishes before p */ + if (c!=*f) + return c-*f; + f++; + } + return (*f=='\0') ? 0 : 1; +} + + +/* + * arg can be a number or a column name, possibly quoted (like in an ORDER BY clause) + * Returns: + * on success, the 0-based index of the column + * or -1 if the column number or name cannot correspond to the PGresult, + * or if it's ambiguous (arg corresponding to several columns) + */ +static int +indexOfColumn(const char* arg, PGresult* res) +{ + int idx; + + if (strspn(arg, "0123456789") == strlen(arg)) + { + /* if arg contains only digits, it's a column number */ + idx = atoi(arg) - 1; + if (idx < 0 || idx >= PQnfields(res)) + { + psql_error(_("Invalid column number: %s\n"), arg); + return -1; + } + } + else + { + int i; + idx = -1; + for (i=0; i < PQnfields(res); i++) + { + if (fieldnameCmp(arg, PQfname(res, i)) == 0) + { + if (idx>=0) + { + /* if another idx was already found for the same name */ + psql_error(_("Ambiguous column name: %s\n"), arg); + return -1; + } + idx = i; + } + } + if (idx == -1) + { + psql_error(_("Invalid column name: %s\n"), arg); + return -1; + } + } + return idx; +} + +bool +PrintRotate(PGresult* res, + const char* opt_field_for_rows, /* COLV or null */ + const char* opt_field_for_columns) /* [-]COLH or null */ +{ + int rn; + struct pivot_field *piv_columns = NULL; + struct pivot_field *piv_rows = NULL; + int num_columns = 0; + int num_rows = 0; + bool retval = false; + int columns_sort_direction = 1; /* 1:ascending, -1:descending */ + + /* 0-based index of the field whose distinct values will become COLUMN headers */ + int field_for_columns; + + /* 0-based index of the field whose distinct values will become ROW headers */ + int field_for_rows; + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + goto error_return; + + if (PQnfields(res) < 2) + { + psql_error(_("A query to rotate must return at least two columns\n")); + goto error_return; + } + + field_for_rows = (opt_field_for_rows != NULL) + ? indexOfColumn(opt_field_for_rows, res) + : 0; + + if (field_for_rows < 0) + goto error_return; + + if (opt_field_for_columns == NULL) + field_for_columns = 1; + else + { + /* + * descending sort is requested if the column reference is + * preceded with a minus sign + */ + if (*opt_field_for_columns == '-') + { + columns_sort_direction = -1; + opt_field_for_columns++; + } + field_for_columns = indexOfColumn(opt_field_for_columns, res); + if (field_for_columns < 0) + goto error_return; + } + + + if (field_for_columns == field_for_rows) + { + psql_error(_("The same column cannot be used for both vertical and horizontal headers\n")); + goto error_return; + } + + /* + * First pass: accumulate row names and column names, each into their + * array. Use client-side sort but only to build the set of DISTINCT + * values. The final order displayed depends only on server-side + * sorts. + */ + for (rn = 0; rn < PQntuples(res); rn++) + { + if (!PQgetisnull(res, rn, field_for_rows)) + { + accumHeader(PQgetvalue(res, rn, field_for_rows), + &num_rows, + &piv_rows, + rn); + } + + if (!PQgetisnull(res, rn, field_for_columns)) + { + accumHeader(PQgetvalue(res, rn, field_for_columns), + &num_columns, + &piv_columns, + rn); + if (num_columns > 1600) + { + psql_error(_("Maximum number of columns (1600) exceeded\n")); + goto error_return; + } + } + } + + /* + * Second pass: sort the list of target columns on the server. + */ + if (!sortColumns(PQftype(res, field_for_columns), + piv_columns, + num_columns, + columns_sort_direction)) + goto error_return; + + /* + * Third pass: print the rotated results. + */ + printRotation(res, + num_columns, + piv_columns, + field_for_columns, + num_rows, + piv_rows, + field_for_rows); + + retval = true; + +error_return: + pg_free(piv_columns); + pg_free(piv_rows); + + return retval; +} diff --git a/src/bin/psql/rotate.h b/src/bin/psql/rotate.h new file mode 100644 index 0000000..ba027b6 --- /dev/null +++ b/src/bin/psql/rotate.h @@ -0,0 +1,33 @@ +/* + * psql - the PostgreSQL interactive terminal + * + * Copyright (c) 2000-2015, PostgreSQL Global Development Group + * + * src/bin/psql/rotate.h + */ + +#ifndef ROTATE_H +#define ROTATE_H + +struct pivot_field +{ + /* Pointer obtained from PGgetvalue() for colV or colH */ + char* name; + + /* Rank of the field in its list, starting at 0. + * - For headers stacked vertically, rank=N means it's the + * Nth distinct field encountered when looping through rows + * in their initial order. + * - For headers stacked horizontally, rank is obtained + * by server-side sorting in sortColumns() + */ + int rank; +}; + +/* prototypes */ +extern bool +PrintRotate(PGresult* res, + const char* opt_field_for_rows, + const char* opt_field_for_columns); + +#endif /* ROTATE_H */ diff --git a/src/bin/psql/settings.h b/src/bin/psql/settings.h index 1885bb1..2164928 100644 --- a/src/bin/psql/settings.h +++ b/src/bin/psql/settings.h @@ -90,6 +90,9 @@ typedef struct _psqlSettings char *gfname; /* one-shot file output argument for \g */ char *gset_prefix; /* one-shot prefix argument for \gset */ + bool rotate_output; /* one-shot request to print rotated results */ + char *rotate_col_V; /* one-shot \rotate 1st argument */ + char *rotate_col_H; /* one-shot \rotate 2nd argument */ bool notty; /* stdin or stdout is not a tty (as determined * on startup) */