From d5a97f9db3f98ba18aefeaaf70076f474edb50c3 Mon Sep 17 00:00:00 2001 From: usernamedt Date: Tue, 30 Mar 2021 20:47:12 +0500 Subject: [PATCH 2/2] Implement libpq compression --- .../postgres_fdw/expected/postgres_fdw.out | 2 +- doc/src/sgml/config.sgml | 16 + doc/src/sgml/libpq.sgml | 33 + doc/src/sgml/protocol.sgml | 93 +++ src/backend/catalog/system_views.sql | 9 + src/backend/libpq/pqcomm.c | 246 +++++-- src/backend/postmaster/postmaster.c | 10 + src/backend/utils/activity/backend_status.c | 30 + src/backend/utils/adt/pgstatfuncs.c | 50 +- src/backend/utils/misc/guc.c | 10 + src/bin/pgbench/pgbench.c | 17 +- src/bin/psql/command.c | 23 + src/common/Makefile | 3 +- src/common/zpq_stream.c | 614 ++++++++++++++++++ src/include/catalog/pg_proc.dat | 18 +- src/include/common/zpq_stream.h | 86 +++ src/include/libpq/libpq-be.h | 3 + src/include/libpq/libpq.h | 1 + src/include/libpq/pqcomm.h | 1 + src/include/utils/backend_status.h | 7 + src/interfaces/libpq/exports.txt | 3 + src/interfaces/libpq/fe-connect.c | 109 +++- src/interfaces/libpq/fe-exec.c | 10 +- src/interfaces/libpq/fe-misc.c | 90 ++- src/interfaces/libpq/fe-protocol3.c | 171 +++++ src/interfaces/libpq/libpq-fe.h | 5 + src/interfaces/libpq/libpq-int.h | 23 + src/test/regress/expected/rules.out | 14 +- src/tools/msvc/Mkvcbuild.pm | 2 +- 29 files changed, 1610 insertions(+), 89 deletions(-) create mode 100644 src/common/zpq_stream.c create mode 100644 src/include/common/zpq_stream.h diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 5070d93239..b262f4a1f9 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9116,7 +9116,7 @@ DO $d$ END; $d$; ERROR: invalid option "password" -HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, keep_connections +HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, compression, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, keep_connections CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')" PL/pgSQL function inline_code_block line 3 at EXECUTE -- If we add a password for our user mapping instead, we should get a different diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index cf75d913ce..0b7a495a97 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1035,6 +1035,22 @@ include_dir 'conf.d' + + libpq_compression (boolean) + + libpq_compression configuration parameter + + + + + This parameter enables compression of libpq traffic between client and server. + The default is on. + This option allows rejecting compression requests even if it is supported by server + (for example, due to security, or CPU consumption). + + + + diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 7bcb7504a6..ae184396c0 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -1260,6 +1260,39 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname + + compression + + + Request compression of libpq traffic. The client sends a request with + a list of compression algorithms. Compression can be requested by a client + by including the "compression" option in its connection string. + This can either be a boolean value to enable or disable compression + ("true"/"false", + "on"/"off", + "yes"/"no", + "1"/"0"), + "any", + or an explicit list of comma-separated compression algorithms + which can optionally include compression level ("zlib,zstd:5"). + If compression is enabled but an algorithm is not explicitly specified, + the client library sends its full list of + supported algorithms and the server chooses a preferred algorithm. + + If the server accepts one of the algorithms, it replies with an acknowledgment + and all future libpq messages between client and server will be compressed. + If the server rejects the compression request, it is up to the client whether + to continue without compression or to report an error. + Support for compression algorithms must be enabled when the server is compiled. + Currently, two libraries are supported: zlib (default) and zstd (if Postgres was + configured with --with-zstd option). In both cases, streaming mode is used. + By default, compression is not requested by the client. + Please note that using compression together with SSL may expose extra vulnerabilities: + CRIME + + + + client_encoding diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 2f4dde3beb..d4929f4313 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -92,6 +92,15 @@ such as COPY. + + It is possible to compress protocol data to reduce traffic and speed-up client-server interaction. + Compression is especially useful for importing/exporting data to/from the database using the COPY command + and for replication (both physical and logical). Compression can also reduce the server's response time + for queries returning a large amount of data (for example, JSON, BLOBs, text, ...). + Currently, two libraries are supported: zlib (default) and zstd (if Postgres was + configured with --with-zstd option). + + Messaging Overview @@ -262,6 +271,22 @@ + + CompressionAck + + + The server accepts the client's compression request. + Compression is requested when a client connection includes the "compression" option, which includes + a list of requested compression algorithms. + If the server accepts one of these algorithms, it acknowledges use of compression and + all subsequent libpq messages between the client and server will be compressed. + The server chooses an algorithm from the list specified by client and responds with the index of the chosen algorithm from the client-supplied list. + If the server does not accept any of the requested algorithms, then it replies with an index of -1 + and it is up to the client whether to continue without compression or to report an error. + + + + AuthenticationOk @@ -3423,6 +3448,59 @@ following: + + +CompressionAck (B) + + + + + + + + Byte1('z') + + + + Acknowledge use of compression for protocol data. The client sends to the server a list of requested compression algorithms. + If the server supports any of these algorithms, it acknowledges use of this algorithm and all subsequent libpq messages between client and server + will be compressed. + The server selects the preferred algorithm from the list specified by client and responds with the + index of the chosen algorithm in this list. + If the server does not support any of the requested algorithms, it replies with -1 + and it is up to the client whether to continue without compression or to report an error. + After receiving this message with algorithm index other than -1, both server and client switch to compressed mode + and exchange compressed messages. + + + + + + Int32 + + + + Length of message contents in bytes, including self. + + + + + + Byte1 + + + + Index of algorithm in the list of supported algorithms specified by client or -1 if none of them are supported. + + + + + + + + + + @@ -5989,6 +6067,21 @@ StartupMessage (F) + + + _pq_.compression + + + + Request compression of libpq traffic. The value is a list of compression algorithms requested by the client with an optional + specification of compression level: "zlib,zstd:5". + If the server does not accept compression, the backend will ignore the _pq_.compression + parameter and will not send the CompressionAck message to the frontend. + By default, compression is disabled. Please note that using compression together with SSL may expose extra vulnerabilities: + CRIME. + + + In addition to the above, other parameters may be listed. diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 70e578894f..d8f8e59b42 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -840,6 +840,15 @@ CREATE VIEW pg_stat_activity AS LEFT JOIN pg_database AS D ON (S.datid = D.oid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); +CREATE VIEW pg_stat_network_traffic AS + SELECT + S.pid, + S.rx_raw_bytes, + S.tx_raw_bytes, + S.rx_compressed_bytes, + S.tx_compressed_bytes + FROM pg_stat_get_activity(NULL) AS S; + CREATE VIEW pg_stat_replication AS SELECT S.pid, diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 4cd6d6dfbb..8da3084130 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -60,6 +60,7 @@ #include #include #include +#include #include #include #include @@ -77,11 +78,14 @@ #include "common/ip.h" #include "libpq/libpq.h" +#include "libpq/pqformat.h" #include "miscadmin.h" #include "port/pg_bswap.h" #include "storage/ipc.h" #include "utils/guc.h" #include "utils/memutils.h" +#include "utils/builtins.h" +#include "common/zpq_stream.h" /* * Cope with the various platform-specific ways to spell TCP keepalive socket @@ -107,6 +111,7 @@ */ int Unix_socket_permissions; char *Unix_socket_group; +bool libpq_compression; /* Where the Unix socket files are (list of palloc'd strings) */ static List *sock_paths = NIL; @@ -130,6 +135,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE]; static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ static int PqRecvLength; /* End of data available in PqRecvBuffer */ +static ZpqStream * PqStream; + + /* * Message status */ @@ -167,6 +175,116 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods; WaitEventSet *FeBeWaitSet; +static ssize_t +write_compressed(void *arg, void const *data, size_t size) +{ + ssize_t rc = secure_write((Port *) arg, (void *) data, size); + + if (rc > 0) + pgstat_report_network_traffic(0, 0, 0, rc); + return rc; +} + +static ssize_t +read_compressed(void *arg, void *data, size_t size) +{ + ssize_t rc = secure_read((Port *) arg, data, size); + + if (rc > 0) + pgstat_report_network_traffic(0, 0, rc, 0); + return rc; +} + +/* + * Send to the client index of chosen compression algorithm + */ +static void +SendCompressionACK(int algorithm) +{ + StringInfoData buf; + + pq_beginmessage(&buf, 'z'); + pq_sendbyte(&buf, (uint8) algorithm); + pq_endmessage(&buf); + pq_flush(); +} + +/* -------------------------------- + * pq_configure - configure connection using port settings + * + * Right now only compression is toggled in the configure. + * Function returns 0 in case of success, non-null in case of error + * -------------------------------- + */ +int +pq_configure(Port *port) +{ + char *client_compression_algorithms = port->compression_algorithms; + + /* + * If client request compression, it sends list of supported compression + * algorithms separated by comma. + */ + if (client_compression_algorithms && libpq_compression) + { + int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; + int impl = -1; + char **server_compression_algorithms = zs_get_supported_algorithms(); + int index = -1; + char *protocol_extension = strchr(client_compression_algorithms, ';'); + + /* No protocol extension are currently supported */ + if (protocol_extension) + *protocol_extension = '\0'; + + for (int i = 0; *client_compression_algorithms; i++) + { + char *sep = strchr(client_compression_algorithms, ','); + char *level; + + if (sep != NULL) + *sep = '\0'; + + level = strchr(client_compression_algorithms, ':'); + if (level != NULL) + { + *level = '\0'; /* compression level is ignored now */ + if (sscanf(level + 1, "%d", &compression_level) != 1) + ereport(LOG, + (errmsg("Invalid compression level: %s", level + 1))); + } + for (impl = 0; server_compression_algorithms[impl] != NULL; impl++) + { + if (pg_strcasecmp(client_compression_algorithms, server_compression_algorithms[impl]) == 0) + { + index = i; + goto SendCompressionAck; + } + } + + if (sep != NULL) + client_compression_algorithms = sep + 1; + else + break; + } +SendCompressionAck: + free(server_compression_algorithms); + SendCompressionACK(index); + + if (index >= 0) /* Use compression */ + { + PqStream = zpq_create(impl, compression_level, impl, write_compressed, read_compressed, MyProcPort, + NULL, 0); + if (!PqStream) + { + ereport(LOG, + (errmsg("Failed to initialize compressor %s", server_compression_algorithms[impl]))); + return -1; + } + } + } + return 0; +} /* -------------------------------- * pq_init - initialize libpq at backend startup @@ -270,6 +388,9 @@ socket_close(int code, Datum arg) } #endif /* ENABLE_GSS */ + /* Release compression streams */ + zpq_free(PqStream); + /* * Cleanly shut down SSL layer. Nowhere else does a postmaster child * call this, so this is safe when interrupting BackendInitialize(). @@ -924,12 +1045,15 @@ socket_set_nonblocking(bool nonblocking) /* -------------------------------- * pq_recvbuf - load some bytes into the input buffer * - * returns 0 if OK, EOF if trouble + * nowait parameter toggles non-blocking mode. + * returns number of read bytes, EOF if trouble * -------------------------------- */ static int -pq_recvbuf(void) +pq_recvbuf(bool nowait) { + int r; + if (PqRecvPointer > 0) { if (PqRecvLength > PqRecvPointer) @@ -945,21 +1069,40 @@ pq_recvbuf(void) } /* Ensure that we're in blocking mode */ - socket_set_nonblocking(false); + socket_set_nonblocking(nowait); /* Can fill buffer from PqRecvLength and upwards */ for (;;) { - int r; - - r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, - PQ_RECV_BUFFER_SIZE - PqRecvLength); + /* + * If streaming compression is enabled then use correspondent + * compression read function. + */ + r = PqStream + ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength, + PQ_RECV_BUFFER_SIZE - PqRecvLength) + : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, + PQ_RECV_BUFFER_SIZE - PqRecvLength); if (r < 0) { + if (r == ZS_DECOMPRESS_ERROR) + { + char const *msg = zpq_decompress_error(PqStream); + + if (msg == NULL) + msg = "end of stream"; + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("failed to decompress data: %s", msg))); + return EOF; + } if (errno == EINTR) continue; /* Ok if interrupted */ + if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK)) + return 0; + /* * Careful: an ereport() that tries to write to the client would * cause recursion to here, leading to stack overflow and core @@ -980,7 +1123,8 @@ pq_recvbuf(void) } /* r contains number of bytes read, so just incr length */ PqRecvLength += r; - return 0; + pgstat_report_network_traffic(r, 0, 0, 0); + return r; } } @@ -995,7 +1139,8 @@ pq_getbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer++]; @@ -1014,7 +1159,8 @@ pq_peekbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer]; @@ -1031,48 +1177,15 @@ pq_peekbyte(void) int pq_getbyte_if_available(unsigned char *c) { - int r; + int r = 0; Assert(PqCommReadingMsg); - if (PqRecvPointer < PqRecvLength) + if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0) { *c = PqRecvBuffer[PqRecvPointer++]; return 1; } - - /* Put the socket into non-blocking mode */ - socket_set_nonblocking(true); - - r = secure_read(MyProcPort, c, 1); - if (r < 0) - { - /* - * Ok if no data available without blocking or interrupted (though - * EINTR really shouldn't happen with a non-blocking socket). Report - * other errors. - */ - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) - r = 0; - else - { - /* - * Careful: an ereport() that tries to write to the client would - * cause recursion to here, leading to stack overflow and core - * dump! This message must go *only* to the postmaster log. - */ - ereport(COMMERROR, - (errcode_for_socket_access(), - errmsg("could not receive data from client: %m"))); - r = EOF; - } - } - else if (r == 0) - { - /* EOF detected */ - r = EOF; - } - return r; } @@ -1093,7 +1206,8 @@ pq_getbytes(char *s, size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1127,7 +1241,8 @@ pq_discardbytes(size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1139,7 +1254,6 @@ pq_discardbytes(size_t len) return 0; } - /* -------------------------------- * pq_startmsgread - begin reading a message from the client. * @@ -1344,13 +1458,24 @@ internal_flush(void) char *bufptr = PqSendBuffer + PqSendStart; char *bufend = PqSendBuffer + PqSendPointer; - while (bufptr < bufend) + while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0) + + /* + * has more data to flush or unsent data in internal compression + * buffer + */ { int r; + size_t processed = 0; + size_t available = bufend - bufptr; - r = secure_write(MyProcPort, bufptr, bufend - bufptr); + r = PqStream + ? zpq_write(PqStream, bufptr, available, &processed) + : secure_write(MyProcPort, bufptr, available); + bufptr += processed; + PqSendStart += processed; - if (r <= 0) + if (r < 0 || (r == 0 && available)) { if (errno == EINTR) continue; /* Ok if we were interrupted */ @@ -1393,12 +1518,12 @@ internal_flush(void) InterruptPending = 1; return EOF; } + pgstat_report_network_traffic(0, r, 0, 0); last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; PqSendStart += r; } - PqSendStart = PqSendPointer = 0; return 0; } @@ -1415,7 +1540,7 @@ socket_flush_if_writable(void) int res; /* Quick exit if nothing to do */ - if (PqSendPointer == PqSendStart) + if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0)) return 0; /* No-op if reentrant call */ @@ -1438,7 +1563,7 @@ socket_flush_if_writable(void) static bool socket_is_send_pending(void) { - return (PqSendStart < PqSendPointer); + return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 0)); } /* -------------------------------- @@ -1961,3 +2086,16 @@ pq_check_connection(void) return true; } + +PG_FUNCTION_INFO_V1(pg_compression_algorithm); + +Datum +pg_compression_algorithm(PG_FUNCTION_ARGS) +{ + char const *algorithm_name = PqStream ? zpq_compress_algorithm_name(PqStream) : NULL; + + if (algorithm_name) + PG_RETURN_TEXT_P(cstring_to_text(algorithm_name)); + else + PG_RETURN_NULL(); +} diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index b05db5a473..93a8321e95 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -2149,6 +2149,8 @@ retry1: port->database_name = pstrdup(valptr); else if (strcmp(nameptr, "user") == 0) port->user_name = pstrdup(valptr); + else if (strcmp(nameptr, "_pq_.compression") == 0) + port->compression_algorithms = pstrdup(valptr); else if (strcmp(nameptr, "options") == 0) port->cmdline_options = pstrdup(valptr); else if (strcmp(nameptr, "replication") == 0) @@ -4443,6 +4445,14 @@ BackendInitialize(Port *port) if (status != STATUS_OK) proc_exit(0); + if (pq_configure(port)) + { + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("failed to send compression message: %m"))); + proc_exit(0); + } + /* * Now that we have the user and database name, we can set the process * title for ps. It's good to do this as early as possible in startup. diff --git a/src/backend/utils/activity/backend_status.c b/src/backend/utils/activity/backend_status.c index a368101103..3c35a65ba8 100644 --- a/src/backend/utils/activity/backend_status.c +++ b/src/backend/utils/activity/backend_status.c @@ -338,6 +338,9 @@ pgstat_bestart(void) lbeentry.st_xact_start_timestamp = 0; lbeentry.st_databaseid = MyDatabaseId; + lbeentry.st_tx_raw_bytes = lbeentry.st_rx_raw_bytes = + lbeentry.st_tx_compressed_bytes = lbeentry.st_rx_compressed_bytes = 0; + /* We have userid for client-backends, wal-sender and bgworker processes */ if (lbeentry.st_backendType == B_BACKEND || lbeentry.st_backendType == B_WAL_SENDER @@ -713,6 +716,33 @@ pgstat_report_xact_timestamp(TimestampTz tstamp) PGSTAT_END_WRITE_ACTIVITY(beentry); } +/* + * Report current transaction start timestamp as the specified value. + * Zero means there is no active transaction. + */ +void +pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes) +{ + volatile PgBackendStatus *beentry = MyBEEntry; + + if (!pgstat_track_activities || !beentry) + return; + + /* + * Update my status entry, following the protocol of bumping + * st_changecount before and after. We use a volatile pointer here to + * ensure the compiler doesn't try to get cute. + */ + PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); + + beentry->st_rx_raw_bytes += rx_raw_bytes; + beentry->st_tx_raw_bytes += tx_raw_bytes; + beentry->st_rx_compressed_bytes += rx_compressed_bytes; + beentry->st_tx_compressed_bytes += tx_compressed_bytes; + + PGSTAT_END_WRITE_ACTIVITY(beentry); +} + /* ---------- * pgstat_read_current_status() - * diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 87f02d572e..af52c70080 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -569,7 +569,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) Datum pg_stat_get_activity(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_ACTIVITY_COLS 30 +#define PG_STAT_GET_ACTIVITY_COLS 34 int num_backends = pgstat_fetch_stat_numbackends(); int curr_backend; int pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0); @@ -918,6 +918,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS) nulls[29] = true; else values[29] = UInt64GetDatum(beentry->st_query_id); + values[30] = beentry->st_rx_raw_bytes; + values[31] = beentry->st_tx_raw_bytes; + values[32] = beentry->st_rx_compressed_bytes; + values[33] = beentry->st_tx_compressed_bytes; } else { @@ -946,6 +950,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS) nulls[27] = true; nulls[28] = true; nulls[29] = true; + nulls[30] = true; + nulls[31] = true; + nulls[32] = true; + nulls[33] = true; } tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -1145,6 +1153,46 @@ pg_stat_get_backend_start(PG_FUNCTION_ARGS) PG_RETURN_TIMESTAMPTZ(result); } +Datum +pg_stat_get_network_traffic(PG_FUNCTION_ARGS) +{ +#define PG_STAT_NETWORK_TRAFFIC_COLS 4 + TupleDesc tupdesc; + Datum values[PG_STAT_NETWORK_TRAFFIC_COLS]; + bool nulls[PG_STAT_NETWORK_TRAFFIC_COLS]; + int32 beid = PG_GETARG_INT32(0); + PgBackendStatus *beentry; + + if ((beentry = pgstat_fetch_stat_beentry(beid)) == NULL) + PG_RETURN_NULL(); + else if (!HAS_PGSTAT_PERMISSIONS(beentry->st_userid)) + PG_RETURN_NULL(); + + /* Initialise values and NULL flags arrays */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(PG_STAT_NETWORK_TRAFFIC_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "rx_raw_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "tx_raw_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "rx_compressed_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "tx_compressed_bytes", + INT8OID, -1, 0); + BlessTupleDesc(tupdesc); + + /* Fill values and NULLs */ + values[0] = beentry->st_rx_raw_bytes; + values[1] = beentry->st_tx_raw_bytes; + values[2] = beentry->st_rx_compressed_bytes; + values[3] = beentry->st_tx_compressed_bytes; + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} Datum pg_stat_get_backend_client_addr(PG_FUNCTION_ARGS) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index b130874bdc..ab614d1c6c 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1358,6 +1358,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"libpq_compression", PGC_SIGHUP, CLIENT_CONN_OTHER, + gettext_noop("Compress client-server traffic."), + NULL + }, + &libpq_compression, + true, + NULL, NULL, NULL + }, + { {"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT, gettext_noop("Logs each checkpoint."), diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index da1d9ec535..118be0cefa 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -6610,6 +6610,9 @@ threadRun(void *arg) int nsocks; /* number of sockets to be waited for */ pg_time_usec_t min_usec; pg_time_usec_t now = 0; /* set this only if needed */ + bool buffered_rx = false; /* true if some of the clients has + * data left in SSL/ZPQ read + * buffers */ /* * identify which client sockets should be checked for input, and @@ -6644,6 +6647,9 @@ threadRun(void *arg) */ int sock = PQsocket(st->con); + /* check if conn has buffered SSL / ZPQ read data */ + buffered_rx = buffered_rx || PQreadPending(st->con); + if (sock < 0) { pg_log_error("invalid socket: %s", PQerrorMessage(st->con)); @@ -6687,7 +6693,7 @@ threadRun(void *arg) { if (nsocks > 0) { - rc = wait_on_socket_set(sockets, min_usec); + rc = buffered_rx ? 1 : wait_on_socket_set(sockets, min_usec); } else /* nothing active, simple sleep */ { @@ -6696,7 +6702,7 @@ threadRun(void *arg) } else /* no explicit delay, wait without timeout */ { - rc = wait_on_socket_set(sockets, 0); + rc = buffered_rx ? 1 : wait_on_socket_set(sockets, 0); } if (rc < 0) @@ -6735,8 +6741,11 @@ threadRun(void *arg) pg_log_error("invalid socket: %s", PQerrorMessage(st->con)); goto done; } - - if (!socket_has_input(sockets, sock, nsocks++)) + if (PQreadPending(st->con)) + { + nsocks++; + } + else if (!socket_has_input(sockets, sock, nsocks++)) continue; } else if (st->state == CSTATE_FINISHED || diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c index 543401c6d6..b621a4a09c 100644 --- a/src/bin/psql/command.c +++ b/src/bin/psql/command.c @@ -168,6 +168,7 @@ static void print_with_linenumbers(FILE *output, char *lines, const char *header_keyword); static void minimal_error_message(PGresult *res); +static void printCompressionInfo(void); static void printSSLInfo(void); static void printGSSInfo(void); static bool printPsetInfo(const char *param, printQueryOpt *popt); @@ -628,6 +629,7 @@ exec_command_conninfo(PsqlScanState scan_state, bool active_branch) printf(_("You are connected to database \"%s\" as user \"%s\" on host \"%s\" at port \"%s\".\n"), db, PQuser(pset.db), host, PQport(pset.db)); } + printCompressionInfo(); printSSLInfo(); printGSSInfo(); } @@ -3581,6 +3583,27 @@ connection_warnings(bool in_startup) } } +/* + * printCompressionInfo + * + * Print information about used compressor/decompressor + */ +static void +printCompressionInfo(void) +{ + char *compressor = PQcompressor(pset.db); + char *decompressor = PQdecompressor(pset.db); + + if (compressor != NULL) + { + printf(_("Compressor %s\n"), compressor); + } + + if (decompressor != NULL) + { + printf(_("Decompressor %s\n"), decompressor); + } +} /* * printSSLInfo diff --git a/src/common/Makefile b/src/common/Makefile index abc5a72325..c2720d4191 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -79,7 +79,8 @@ OBJS_COMMON = \ username.o \ wait_error.o \ wchar.o \ - z_stream.o + z_stream.o \ + zpq_stream.o ifeq ($(with_ssl),openssl) OBJS_COMMON += \ diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c new file mode 100644 index 0000000000..6534075516 --- /dev/null +++ b/src/common/zpq_stream.c @@ -0,0 +1,614 @@ +#include "postgres_fe.h" +#include "common/zpq_stream.h" +#include "c.h" +#include "pg_config.h" +#include "port/pg_bswap.h" +#include "common/z_stream.h" + +#define ZPQ_BUFFER_SIZE 819200 /* We have to flush stream after each + * protocol command and command is mostly + * limited by record length, which in turn + * is usually less than page size (except + * TOAST) */ +#define ZPQ_COMPRESSED_MSG_TYPE 'm' +#define ZPQ_COMPRESS_THRESHOLD 60 + +typedef struct ZpqBuffer ZpqBuffer; + +struct ZpqBuffer +{ + char buf[ZPQ_BUFFER_SIZE]; + size_t size; + size_t pos; +}; + +static inline void +zpq_buf_init(ZpqBuffer * zb) +{ + zb->size = 0; + zb->pos = 0; +} + +static inline size_t +zpq_buf_left(ZpqBuffer * zb) +{ + Assert(zb->buf); + return ZPQ_BUFFER_SIZE - zb->size; +} + +static inline size_t +zpq_buf_unread(ZpqBuffer * zb) +{ + return zb->size - zb->pos; +} + +static inline char * +zpq_buf_size(ZpqBuffer * zb) +{ + return (char *) (zb->buf) + zb->size; +} + +static inline char * +zpq_buf_pos(ZpqBuffer * zb) +{ + return (char *) (zb->buf) + zb->pos; +} + +static inline void +zpq_buf_size_advance(ZpqBuffer * zb, size_t value) +{ + zb->size += value; +} + +static inline void +zpq_buf_pos_advance(ZpqBuffer * zb, size_t value) +{ + zb->pos += value; +} + +static inline void +zpq_buf_reuse(ZpqBuffer * zb) +{ + size_t unread = zpq_buf_unread(zb); + + if (unread > 5) /* can read message header, don't do anything */ + return; + if (unread == 0) + { + zb->size = 0; + zb->pos = 0; + return; + } + memmove(zb->buf, zb->buf + zb->pos, unread); + zb->size = unread; + zb->pos = 0; +} + +struct ZpqStream +{ + ZStream *z_stream; /* underlying compression stream */ + + size_t tx_total; /* amount of bytes sent to tx_func */ + + size_t tx_total_raw; /* amount of bytes received by zpq_write */ + size_t rx_total; /* amount of bytes read by rx_func */ + size_t rx_total_raw; /* amount of bytes returned by zpq_write */ + bool is_compressing; /* current compression state */ + + bool is_decompressing; /* current decompression state */ + size_t rx_msg_bytes_left; /* number of bytes left to process without + * + * changing the decompression state */ + size_t tx_msg_bytes_left; /* number of bytes left to process without + * changing the compression state */ + + ZpqBuffer rx_in; /* buffer for unprocessed data read by rx_func */ + ZpqBuffer tx_in; /* buffer for unprocessed data consumed by + * zpq_write */ + ZpqBuffer tx_out; /* buffer for processed data waiting for send + * via tx_func */ + + zpq_rx_func rx_func; + zpq_tx_func tx_func; + void *arg; +}; + +/* + * Check if should compress message of msg_type with msg_len. + * Return true if should, false if should not. + */ +static inline bool +zpq_should_compress(char msg_type, uint32 msg_len) +{ + return msg_len >= ZPQ_COMPRESS_THRESHOLD && (msg_type == 'd' || msg_type == 'D'); +} + +/* + * Check if message is a CompressedMessage. + * Return true if it is, otherwise false. + * */ +static inline bool +zpq_is_compressed_message(char msg_type) +{ + return msg_type == ZPQ_COMPRESSED_MSG_TYPE; +} + +ZpqStream * +zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size) +{ + ZpqStream *zpq = (ZpqStream *) malloc(sizeof(ZpqStream)); + + zpq->is_compressing = false; + zpq->is_decompressing = false; + zpq->rx_msg_bytes_left = 0; + zpq->tx_msg_bytes_left = 0; + zpq_buf_init(&zpq->tx_in); + + zpq->tx_total = 0; + zpq->tx_total_raw = 0; + zpq->rx_total = 0; + zpq->rx_total_raw = 0; + + zpq_buf_init(&zpq->rx_in); + zpq_buf_size_advance(&zpq->rx_in, rx_data_size); + Assert(rx_data_size < ZPQ_BUFFER_SIZE); + memcpy(zpq->rx_in.buf, rx_data, rx_data_size); + + zpq_buf_init(&zpq->tx_out); + + zpq->rx_func = rx_func; + zpq->tx_func = tx_func; + zpq->arg = arg; + + zpq->z_stream = zs_create(c_alg_impl, c_level, d_alg_impl); + if (zpq->z_stream == NULL) + { + free(zpq); + return NULL; + } + return zpq; +} + +/* Compress up to src_size bytes from *src into CompressedMessage and write it to the tx buffer. + * Returns ZS_OK on success, ZS_COMPRESS_ERROR if encountered a compression error. */ +static inline ssize_t +zpq_write_compressed_message(ZpqStream * zpq, char const *src, size_t src_size, size_t *src_processed) +{ + size_t compressed_len; + ssize_t rc; + uint32 size; + + /* check if have enough space */ + if (zpq_buf_left(&zpq->tx_out) <= 5) + { + /* too little space for CompressedMessage, abort */ + *src_processed = 0; + return ZS_OK; + } + + compressed_len = 0; + rc = zs_write(zpq->z_stream, src, src_size, src_processed, + zpq_buf_size(&zpq->tx_out) + 5, zpq_buf_left(&zpq->tx_out) - 5, &compressed_len); + + if (compressed_len > 0) + { + /* write CompressedMessage type */ + *zpq_buf_size(&zpq->tx_out) = ZPQ_COMPRESSED_MSG_TYPE; + size = pg_hton32(compressed_len + 4); + + memcpy(zpq_buf_size(&zpq->tx_out) + 1, &size, sizeof(uint32)); /* write msg length */ + compressed_len += 5; /* append header length to compressed data + * length */ + } + + zpq->tx_total_raw += *src_processed; + zpq->tx_total += compressed_len; + + zpq_buf_size_advance(&zpq->tx_out, compressed_len); + return rc; +} + +/* Copy the data directly from *src to the tx buffer */ +static void +zpq_write_uncompressed(ZpqStream * zpq, char const *src, size_t src_size, size_t *src_processed) +{ + src_size = Min(zpq_buf_left(&zpq->tx_out), src_size); + memcpy(zpq_buf_size(&zpq->tx_out), src, src_size); + + zpq->tx_total_raw += src_size; + zpq->tx_total += src_size; + zpq_buf_size_advance(&zpq->tx_out, src_size); + *src_processed = src_size; +} + +/* Determine if should compress the next message and + * change the current compression state */ +static ssize_t +zpq_toggle_compression(ZpqStream * zpq, char msg_type, uint32 msg_len) +{ + if (zpq_should_compress(msg_type, msg_len)) + { + zpq->is_compressing = true; + } + else if (zpq->is_compressing) + { + /* + * Won't compress the next message, should now finish the compression. + * Make sure there is no buffered data left in underlying compression + * stream + */ + while (zs_buffered_tx(zpq->z_stream)) + { + size_t flushed_len = 0; + ssize_t flush_rc = zpq_write_compressed_message(zpq, NULL, 0, &flushed_len); + + if (flush_rc != ZS_OK) + { + return flush_rc; + } + } + zpq->is_compressing = false; + } + zpq->tx_msg_bytes_left = msg_len + 1; + return 0; +} + +/* + * Internal write function. Reads the data from *src buffer, + * determines the postgres messages type and length. + * If message matches the compression criteria, it wraps the message into + * CompressedMessage. Otherwise, leaves the message unchanged. + * If *src data ends with incomplete message header, this function is not + * going to read this message header. + * Returns number of written raw bytes or error code. + * In the last case number of bytes written is stored in *processed. + */ +static ssize_t +zpq_write_internal(ZpqStream * zpq, void const *src, size_t src_size, size_t *processed) +{ + size_t src_pos = 0; + ssize_t rc; + + do + { + /* + * try to read ahead the next message types and increase + * tx_msg_bytes_left, if possible + */ + while (zpq->tx_msg_bytes_left > 0 && src_size - src_pos >= zpq->tx_msg_bytes_left + 5) + { + char msg_type = *((char *) src + src_pos + zpq->tx_msg_bytes_left); + uint32 msg_len; + + memcpy(&msg_len, (char *) src + src_pos + zpq->tx_msg_bytes_left + 1, 4); + msg_len = pg_ntoh32(msg_len); + if (zpq_should_compress(msg_type, msg_len) != zpq->is_compressing) + { + /* + * cannot proceed further, encountered compression toggle + * point + */ + break; + } + zpq->tx_msg_bytes_left += msg_len + 1; + } + + /* + * Write CompressedMessage if currently is compressing or have some + * buffered data left in underlying compression stream + */ + if (zs_buffered_tx(zpq->z_stream) || (zpq->is_compressing && zpq->tx_msg_bytes_left > 0)) + { + size_t buf_processed = 0; + size_t to_compress = Min(zpq->tx_msg_bytes_left, src_size - src_pos); + + rc = zpq_write_compressed_message(zpq, (char *) src + src_pos, to_compress, &buf_processed); + src_pos += buf_processed; + zpq->tx_msg_bytes_left -= buf_processed; + + if (rc != ZS_OK) + { + *processed = src_pos; + return rc; + } + } + + /* + * If not going to compress the data from *src, just write it + * uncompressed. + */ + else if (zpq->tx_msg_bytes_left > 0) + { /* determine next message type */ + size_t copy_len = Min(src_size - src_pos, zpq->tx_msg_bytes_left); + size_t copy_processed = 0; + + zpq_write_uncompressed(zpq, (char *) src + src_pos, copy_len, ©_processed); + src_pos += copy_processed; + zpq->tx_msg_bytes_left -= copy_processed; + } + + /* + * Reached the compression toggle point, fetch next message header to + * determine compression state. + */ + else + { + char msg_type; + uint32 msg_len; + + if (src_size - src_pos < 5) + { + /* + * must return here because we can't continue without full + * message header + */ + *processed = src_pos; + return ZPQ_INCOMPLETE_HEADER; + } + + msg_type = *((char *) src + src_pos); + memcpy(&msg_len, (char *) src + src_pos + 1, 4); + msg_len = pg_ntoh32(msg_len); + rc = zpq_toggle_compression(zpq, msg_type, msg_len); + if (rc) + { + return rc; + } + } + + /* + * repeat sending while there is some data in input or internal + * compression buffer + */ + } while (src_pos < src_size && zpq_buf_left(&zpq->tx_out) > 5); + + return src_pos; +} + +ssize_t +zpq_write(ZpqStream * zpq, void const *src, size_t src_size, size_t *src_processed) +{ + size_t src_pos = 0; + ssize_t rc; + + while (zpq_buf_left(&zpq->tx_out) > 5) + { + size_t copy_len = Min(zpq_buf_left(&zpq->tx_in), src_size - src_pos); + size_t processed; + + memcpy(zpq_buf_size(&zpq->tx_in), (char *) src + src_pos, copy_len); + zpq_buf_size_advance(&zpq->tx_in, copy_len); + src_pos += copy_len; + + if (zpq_buf_unread(&zpq->tx_in) == 0 && !zs_buffered_tx(zpq->z_stream)) { + break; + } + + processed = 0; + + rc = zpq_write_internal(zpq, zpq_buf_pos(&zpq->tx_in), zpq_buf_unread(&zpq->tx_in), &processed); + if (rc > 0) + { + zpq_buf_pos_advance(&zpq->tx_in, rc); + zpq_buf_reuse(&zpq->tx_in); + } + else + { + zpq_buf_pos_advance(&zpq->tx_in, processed); + zpq_buf_reuse(&zpq->tx_in); + if (rc == ZPQ_INCOMPLETE_HEADER) { + break; + } + *src_processed = src_pos; + return rc; + } + } + + /* + * call the tx_func if have any bytes to send + */ + while (zpq_buf_unread(&zpq->tx_out)) + { + rc = zpq->tx_func(zpq->arg, zpq_buf_pos(&zpq->tx_out), zpq_buf_unread(&zpq->tx_out)); + if (rc > 0) + { + zpq_buf_pos_advance(&zpq->tx_out, rc); + } + else + { + *src_processed = src_pos; + zpq_buf_reuse(&zpq->tx_out); + return rc; + } + } + + zpq_buf_reuse(&zpq->tx_out); + return src_pos; +} + + +/* Decompress bytes from RX buffer and write up to dst_len of uncompressed data to *dst. + * Returns: + * ZS_OK on success, + * ZS_STREAM_END if reached end of compressed chunk + * ZS_DECOMPRESS_ERROR if encountered a decompression error */ +static inline ssize_t +zpq_read_compressed_message(ZpqStream * zpq, char *dst, size_t dst_len, size_t *dst_processed) +{ + size_t rx_processed = 0; + ssize_t rc; + size_t read_len = Min(zpq->rx_msg_bytes_left, zpq_buf_unread(&zpq->rx_in)); + + rc = zs_read(zpq->z_stream, zpq_buf_pos(&zpq->rx_in), read_len, &rx_processed, + dst, dst_len, dst_processed); + + zpq_buf_pos_advance(&zpq->rx_in, rx_processed); + zpq->rx_total_raw += *dst_processed; + zpq->rx_msg_bytes_left -= rx_processed; + return rc; +} + +/* Copy up to dst_len bytes from rx buffer to *dst. + * Returns amount of bytes copied. */ +static inline size_t +zpq_read_uncompressed(ZpqStream * zpq, char *dst, size_t dst_len) +{ + size_t copy_len; + Assert(zpq_buf_unread(&zpq->rx_in) > 0); + copy_len = Min(zpq->rx_msg_bytes_left, Min(zpq_buf_unread(&zpq->rx_in), dst_len)); + + memcpy(dst, zpq_buf_pos(&zpq->rx_in), copy_len); + + zpq_buf_pos_advance(&zpq->rx_in, copy_len); + zpq->rx_total_raw += copy_len; + zpq->rx_msg_bytes_left -= copy_len; + return copy_len; +} + +/* Determine if should decompress the next message and + * change the current decompression state */ +static inline void +zpq_toggle_decompression(ZpqStream * zpq) +{ + uint32 msg_len; + char msg_type = *zpq_buf_pos(&zpq->rx_in); + + zpq->is_decompressing = zpq_is_compressed_message(msg_type); + + memcpy(&msg_len, zpq_buf_pos(&zpq->rx_in) + 1, 4); + zpq->rx_msg_bytes_left = pg_ntoh32(msg_len) + 1; + + if (zpq->is_decompressing) + { + /* compressed message header is no longer needed, just skip it */ + zpq_buf_pos_advance(&zpq->rx_in, 5); + zpq->rx_msg_bytes_left -= 5; + } +} + +ssize_t +zpq_read(ZpqStream * zpq, void *dst, size_t dst_size) +{ + size_t dst_pos = 0; + size_t dst_processed = 0; + ssize_t rc; + + /* Read until some data fetched */ + while (dst_pos == 0) + { + zpq_buf_reuse(&zpq->rx_in); + + if (!zpq_buffered_rx(zpq)) + { + rc = zpq->rx_func(zpq->arg, zpq_buf_size(&zpq->rx_in), zpq_buf_left(&zpq->rx_in)); + if (rc > 0) /* read fetches some data */ + { + zpq->rx_total += rc; + zpq_buf_size_advance(&zpq->rx_in, rc); + } + else /* read failed */ + { + return rc; + } + } + + /* + * try to read ahead the next message types and increase + * rx_msg_bytes_left, if possible + */ + while (zpq->rx_msg_bytes_left > 0 && (zpq_buf_unread(&zpq->rx_in) >= zpq->rx_msg_bytes_left + 5)) + { + char msg_type; + uint32 msg_len; + + msg_type = *(zpq_buf_pos(&zpq->rx_in) + zpq->rx_msg_bytes_left); + if (zpq->is_decompressing || zpq_is_compressed_message(msg_type)) + { + /* + * cannot proceed further, encountered compression toggle + * point + */ + break; + } + + memcpy(&msg_len, zpq_buf_pos(&zpq->rx_in) + zpq->rx_msg_bytes_left + 1, 4); + zpq->rx_msg_bytes_left += pg_ntoh32(msg_len) + 1; + } + + + if (zpq->rx_msg_bytes_left > 0 || zs_buffered_rx(zpq->z_stream)) + { + dst_processed = 0; + if (zpq->is_decompressing || zs_buffered_rx(zpq->z_stream)) + { + rc = zpq_read_compressed_message(zpq, dst, dst_size - dst_pos, &dst_processed); + dst_pos += dst_processed; + if (rc == ZS_STREAM_END) + { + continue; + } + if (rc != ZS_OK) + { + return rc; + } + } + else + dst_pos += zpq_read_uncompressed(zpq, dst, dst_size - dst_pos); + } + else if (zpq_buf_unread(&zpq->rx_in) >= 5) + zpq_toggle_decompression(zpq); + } + return dst_pos; +} + +bool +zpq_buffered_rx(ZpqStream * zpq) +{ + return zpq ? zpq_buf_unread(&zpq->rx_in) >= 5 || (zpq_buf_unread(&zpq->rx_in) > 0 && zpq->rx_msg_bytes_left > 0) || zs_buffered_rx(zpq->z_stream) : 0; +} + +bool +zpq_buffered_tx(ZpqStream * zpq) +{ + return zpq ? zpq_buf_unread(&zpq->tx_in) >= 5 || (zpq_buf_unread(&zpq->tx_in) > 0 && zpq->tx_msg_bytes_left > 0) || zpq_buf_unread(&zpq->tx_out) > 0 || + zs_buffered_tx(zpq->z_stream) : 0; +} + + + +void +zpq_free(ZpqStream * zpq) +{ + if (zpq) + { + if (zpq->z_stream) + { + zs_free(zpq->z_stream); + } + free(zpq); + } +} + +char const * +zpq_compress_error(ZpqStream * zpq) +{ + return zs_compress_error(zpq->z_stream); +} + +char const * +zpq_decompress_error(ZpqStream * zpq) +{ + return zs_decompress_error(zpq->z_stream); +} + +char const * +zpq_compress_algorithm_name(ZpqStream * zpq) +{ + return zs_compress_algorithm_name(zpq->z_stream); +} + +char const * +zpq_decompress_algorithm_name(ZpqStream * zpq) +{ + return zs_decompress_algorithm_name(zpq->z_stream); +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index b1ee078a1d..0d2db8d749 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5279,9 +5279,9 @@ proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'int4', - proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,text,numeric,text,bool,text,bool,int4,int8}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,query_id}', + proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,text,numeric,text,bool,text,bool,int4,int8,int8,int8,int8,int8}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,query_id,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}', prosrc => 'pg_stat_get_activity' }, { oid => '3318', descr => 'statistics: information about progress of backends running maintenance command', @@ -5581,6 +5581,14 @@ proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,wal_write,wal_sync,wal_write_time,wal_sync_time,stats_reset}', prosrc => 'pg_stat_get_wal' }, +{ oid => '9598', descr => 'statistics: information about network traffic', + proname => 'pg_stat_get_network_traffic', proisstrict => 'f', provolatile => 's', + proparallel => 'r', prorettype => 'record', proargtypes => 'int4', + proallargtypes => '{int4,int8,int8,int8,int8}', + proargmodes => '{i,o,o,o,o}', + proargnames => '{_beid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}', + prosrc => 'pg_stat_get_network_traffic' }, + { oid => '2306', descr => 'statistics: information about SLRU caches', proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', @@ -5712,6 +5720,10 @@ proname => 'pg_tablespace_location', provolatile => 's', prorettype => 'text', proargtypes => 'oid', prosrc => 'pg_tablespace_location' }, +{ oid => '9257', descr => 'connection compression algorithm', + proname => 'pg_compression_algorithm', provolatile => 's', prorettype => 'text', + proargtypes => '', prosrc => 'pg_compression_algorithm' }, + { oid => '1946', descr => 'convert bytea value into some ascii-only text string', proname => 'encode', prorettype => 'text', proargtypes => 'bytea text', diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h new file mode 100644 index 0000000000..d1c0ee8deb --- /dev/null +++ b/src/include/common/zpq_stream.h @@ -0,0 +1,86 @@ +/* + * zpq_stream.h + * Streaming compression for libpq + */ + +#include "z_stream.h" + +#ifndef ZPQ_STREAM_H +#define ZPQ_STREAM_H + +#include + + +#define ZPQ_DEFAULT_COMPRESSION_LEVEL (1) +#define ZPQ_INCOMPLETE_HEADER (-6) +struct ZpqStream; +typedef struct ZpqStream ZpqStream; + +typedef ssize_t (*zpq_tx_func) (void *arg, void const *data, size_t size); +typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size); + + +#endif + +/* + * Create compression stream with rx/tx function for reading/sending compressed data. + * c_alg_impl: index of chosen compression algorithm + * c_level: compression c_level + * d_alg_impl: index of chosen decompression algorithm + * tx_func: function for writing compressed data in underlying stream + * rx_func: function for reading compressed data from underlying stream + * arg: context passed to the function + * rx_data: received data (compressed data already fetched from input stream) + * rx_data_size: size of data fetched from input stream + */ +extern ZpqStream * zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size); + +/* + * Write up to "src_size" raw (decompressed) bytes. + * Returns number of written raw bytes or error code. + * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function. + * In the last case number of bytes written is stored in *src_processed. + */ +extern ssize_t zpq_write(ZpqStream * zpq, void const *src, size_t src_size, size_t *src_processed); + +/* + * Read up to "dst_size" raw (decompressed) bytes. + * Returns number of decompressed bytes or error code. + * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function. + */ +extern ssize_t zpq_read(ZpqStream * zpq, void *dst, size_t dst_size); + +/* + * Return true if non-flushed data left in internal rx decompression buffer. + */ +extern bool zpq_buffered_rx(ZpqStream * zpq); + +/* + * Return true if non-flushed data left in internal tx compression buffer. + */ +extern bool zpq_buffered_tx(ZpqStream * zpq); + +/* + * Free stream created by zs_create function. + */ +extern void zpq_free(ZpqStream * zpq); + +/* + * Get decompressor error message. + */ +extern char const *zpq_decompress_error(ZpqStream * zpq); + +/* + * Get compressor error message. + */ +extern char const *zpq_compress_error(ZpqStream * zpq); + +/* + * Get the name of chosen compression algorithm. + */ +extern char const *zpq_compress_algorithm_name(ZpqStream * zpq); + +/* + * Get the name of chosen decompression algorithm. + */ +extern char const *zpq_decompress_algorithm_name(ZpqStream * zpq); diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 02015efe13..bc3e47b51f 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -188,6 +188,9 @@ typedef struct Port int keepalives_count; int tcp_user_timeout; + char *compression_algorithms; /* Compression algorithms supported by + * client */ + /* * GSSAPI structures. */ diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 3ebbc8d665..dbd4957fc2 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -62,6 +62,7 @@ extern void StreamClose(pgsocket sock); extern void TouchSocketFiles(void); extern void RemoveSocketFiles(void); extern void pq_init(void); +extern int pq_configure(Port *port); extern int pq_getbytes(char *s, size_t len); extern void pq_startmsgread(void); extern void pq_endmsgread(void); diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index be9d970574..ec76348d0e 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -136,6 +136,7 @@ typedef ProtocolVersion MsgType; typedef uint32 PacketLen; extern bool Db_user_namespace; +extern bool libpq_compression; /* * In protocol 3.0 and later, the startup packet length is not fixed, but diff --git a/src/include/utils/backend_status.h b/src/include/utils/backend_status.h index 0cbcc9c943..b9fc6b77cb 100644 --- a/src/include/utils/backend_status.h +++ b/src/include/utils/backend_status.h @@ -144,6 +144,12 @@ typedef struct PgBackendStatus /* application name; MUST be null-terminated */ char *st_appname; + /* client-server traffic information */ + uint64 st_rx_raw_bytes; + uint64 st_tx_raw_bytes; + uint64 st_rx_compressed_bytes; + uint64 st_tx_compressed_bytes; + /* * Current command string; MUST be null-terminated. Note that this string * possibly is truncated in the middle of a multi-byte character. As @@ -301,6 +307,7 @@ extern void pgstat_report_query_id(uint64 query_id, bool force); extern void pgstat_report_tempfile(size_t filesize); extern void pgstat_report_appname(const char *appname); extern void pgstat_report_xact_timestamp(TimestampTz tstamp); +extern void pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes); extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser); extern const char *pgstat_get_crashed_backend_activity(int pid, char *buffer, int buflen); diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index a00701f2c5..bcd1791ddc 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -184,3 +184,6 @@ PQexitPipelineMode 181 PQpipelineSync 182 PQpipelineStatus 183 PQtraceSetFlags 184 +PQcompressor 185 +PQdecompressor 186 +PQreadPending 187 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index aa654dd6a8..c78f8bbd2a 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -24,6 +24,7 @@ #include "common/ip.h" #include "common/link-canary.h" #include "common/scram-common.h" +#include "common/zpq_stream.h" #include "common/string.h" #include "fe-auth.h" #include "libpq-fe.h" @@ -340,6 +341,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = { "Replication", "D", 5, offsetof(struct pg_conn, replication)}, + {"compression", "PGCOMPRESSION", NULL, NULL, + "Libpq-compression", "", 16, + offsetof(struct pg_conn, compression)}, + {"target_session_attrs", "PGTARGETSESSIONATTRS", DefaultTargetSessionAttrs, NULL, "Target-Session-Attrs", "", 15, /* sizeof("prefer-standby") = 15 */ @@ -448,6 +453,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock; void pqDropConnection(PGconn *conn, bool flushInput) { + /* Release compression streams */ + zpq_free(conn->zpqStream); + conn->zpqStream = NULL; + /* Drop any SSL state */ pqsecure_close(conn); @@ -3236,11 +3245,85 @@ keep_going: /* We will come back to here until there is */ conn->inCursor = conn->inStart; - /* Read type byte */ - if (pqGetc(&beresp, conn)) + while (1) { - /* We'll come back when there is more data */ - return PGRES_POLLING_READING; + /* Read type byte */ + if (pqGetc(&beresp, conn)) + { + /* We'll come back when there is more data */ + return PGRES_POLLING_READING; + } + + if (beresp == 'z') /* Switch on compression */ + { + int index; + char resp; + + /* Read message length word */ + if (pqGetInt(&msgLength, 4, conn)) + { + /* We'll come back when there is more data */ + return PGRES_POLLING_READING; + } + if (msgLength != 5) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "expected compression algorithm specification message length is 5 bytes, but %d is received\n"), + msgLength); + goto error_return; + } + pqGetc(&resp, conn); + index = resp; + if (index == (char) -1) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "server does not support requested compression algorithms %s\n"), + conn->compression); + goto error_return; + } + /* Use unigned comparison to handle negative values */ + if ((unsigned)index >= conn->n_compressors) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "server returns incorrect compression algorithm index: %d\n"), + index); + goto error_return; + } + Assert(!conn->zpqStream); + conn->zpqStream = zpq_create(conn->compressors[index].impl, + conn->compressors[index].level, + conn->compressors[index].impl, + (zpq_tx_func) pqsecure_write, (zpq_rx_func) pqsecure_read, + conn, + &conn->inBuffer[conn->inCursor], + conn->inEnd - conn->inCursor); + if (!conn->zpqStream) + { + char **supported_algorithms = zs_get_supported_algorithms(); + + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "failed to initialize compressor %s\n"), + supported_algorithms[conn->compressors[index].impl]); + free(supported_algorithms); + goto error_return; + } + /* reset buffer */ + conn->inStart = conn->inCursor = conn->inEnd = 0; + } + else if (conn->n_compressors != 0 && beresp == 'v') /* negotiate protocol + * version */ + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "server is not supporting libpq compression\n")); + goto error_return; + } + else + break; } /* @@ -4061,6 +4144,8 @@ freePGconn(PGconn *conn) free(conn->dbName); if (conn->replication) free(conn->replication); + if (conn->compression) + free(conn->compression); if (conn->pguser) free(conn->pguser); if (conn->pgpass) @@ -6585,6 +6670,22 @@ PQuser(const PGconn *conn) return conn->pguser; } +char * +PQcompressor(const PGconn *conn) +{ + if (!conn || !conn->zpqStream) + return NULL; + return (char*)zpq_compress_algorithm_name(conn->zpqStream); +} + +char * +PQdecompressor(const PGconn *conn) +{ + if (!conn || !conn->zpqStream) + return NULL; + return (char*)zpq_decompress_algorithm_name(conn->zpqStream); +} + char * PQpass(const PGconn *conn) { diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 03592bdce9..9c88841eaa 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1959,9 +1959,7 @@ PQgetResult(PGconn *conn) * EOF indication. We expect therefore that this won't result in any * undue delay in reporting a previous write failure.) */ - if (flushResult || - pqWait(true, false, conn) || - pqReadData(conn) < 0) + if (flushResult || pqWait(true, false, conn) || pqReadData(conn) < 0) { /* * conn->errorMessage has been set by pqWait or pqReadData. We @@ -3704,6 +3702,12 @@ pqPipelineFlush(PGconn *conn) return 0; } +int +PQreadPending(PGconn *conn) +{ + return pqReadPending(conn); +} + /* * PQfreemem - safely frees memory allocated diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index 082b583c15..a0e7ef79bb 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -53,12 +53,24 @@ #include "pg_config_paths.h" #include "port/pg_bswap.h" +#include + static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn); static int pqSendSome(PGconn *conn, int len); static int pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time); static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time); +/* + * Use zpq_read if compression is switched on + */ +#define pq_read_conn(conn) \ + (conn->zpqStream \ + ? zpq_read(conn->zpqStream, conn->inBuffer + conn->inEnd, \ + conn->inBufSize - conn->inEnd) \ + : pqsecure_read(conn, conn->inBuffer + conn->inEnd, \ + conn->inBufSize - conn->inEnd)) + /* * PQlibVersion: return the libpq version number */ @@ -617,10 +629,17 @@ pqReadData(PGconn *conn) /* OK, try to read some data */ retry3: - nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd, - conn->inBufSize - conn->inEnd); + nread = pq_read_conn(conn); if (nread < 0) { + if (nread == ZS_DECOMPRESS_ERROR) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zpqStream)); + return -1; + } + switch (SOCK_ERRNO) { case EINTR: @@ -712,10 +731,18 @@ retry3: * arrived. */ retry4: - nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd, - conn->inBufSize - conn->inEnd); + nread = pq_read_conn(conn); + if (nread < 0) { + if (nread == ZS_DECOMPRESS_ERROR) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zpqStream)); + return -1; + } + switch (SOCK_ERRNO) { case EINTR: @@ -826,12 +853,18 @@ pqSendSome(PGconn *conn, int len) } /* while there's still data to send */ - while (len > 0) + while (len > 0 || zpq_buffered_tx(conn->zpqStream)) { int sent; + size_t processed = 0; + /* + * Use zpq_write if compression is switched on + */ + sent = conn->zpqStream + ? zpq_write(conn->zpqStream, ptr, len, &processed) #ifndef WIN32 - sent = pqsecure_write(conn, ptr, len); + : pqsecure_write(conn, ptr, len); #else /* @@ -839,8 +872,11 @@ pqSendSome(PGconn *conn, int len) * failure-point appears to be different in different versions of * Windows, but 64k should always be safe. */ - sent = pqsecure_write(conn, ptr, Min(len, 65536)); + : pqsecure_write(conn, ptr, Min(len, 65536)); #endif + ptr += processed; + len -= processed; + remaining -= processed; if (sent < 0) { @@ -896,7 +932,7 @@ pqSendSome(PGconn *conn, int len) remaining -= sent; } - if (len > 0) + if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zpqStream)) { /* * We didn't send it all, wait till we can send more. @@ -989,6 +1025,8 @@ pqFlush(PGconn *conn) int pqWait(int forRead, int forWrite, PGconn *conn) { + if (forRead && conn->inCursor < conn->inEnd) + return 0; return pqWaitTimed(forRead, forWrite, conn, (time_t) -1); } @@ -1046,6 +1084,9 @@ pqWriteReady(PGconn *conn) * * If SSL is in use, the SSL buffer is checked prior to checking the socket * for read data directly. + * + * If ZPQ stream is in use, the ZPQ buffer is checked prior to checking + * the socket for read data directly. */ static int pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) @@ -1061,14 +1102,10 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) return -1; } -#ifdef USE_SSL - /* Check for SSL library buffering read bytes */ - if (forRead && conn->ssl_in_use && pgtls_read_pending(conn)) + if (forRead && (pqReadPending(conn) > 0)) { - /* short-circuit the select */ return 1; } -#endif /* We will retry as long as we get EINTR */ do @@ -1087,6 +1124,33 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) return result; } +/* + * Check if there is some data pending in ZPQ / SSL read buffers. + * Returns -1 on failure, 0 if no, 1 if yes. + */ +int +pqReadPending(PGconn *conn) +{ + if (!conn) + return -1; + + /* check for ZPQ stream buffered read bytes */ + if (zpq_buffered_rx(conn->zpqStream)) + { + /* short-circuit the select */ + return 1; + } + +#ifdef USE_SSL + /* Check for SSL library buffering read bytes */ + if (conn->ssl_in_use && pgtls_read_pending(conn)) + { + /* short-circuit the select */ + return 1; + } +#endif + return 0; +} /* * Check a file descriptor for read and/or write data, possibly waiting. diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index b45fb7e705..c20e120d06 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -2167,6 +2167,165 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen, return startpacket; } +/* + * Build comma-separated list of compression algorithms requested by client. + * It can be either explicitly specified by user in connection string, or + * include all algorithms supported by client library. + * This function returns true if the compression string is successfully parsed and + * stores a comma-separated list of algorithms in *client_compressors. + * If compression is disabled, then NULL is assigned to *client_compressors. + * Also it creates an array of compressor descriptors, each element of which corresponds to + * the corresponding algorithm name in *client_compressors list. This array is stored in PGconn + * and is used during handshake when a compression acknowledgment response is received from the server. + */ +static bool +build_compressors_list(PGconn *conn, char **client_compressors, bool build_descriptors) +{ + char **supported_algorithms = zs_get_supported_algorithms(); + char *value = conn->compression; + int n_supported_algorithms; + int total_len = 0; + int i; + + for (n_supported_algorithms = 0; supported_algorithms[n_supported_algorithms] != NULL; n_supported_algorithms++) + { + total_len += strlen(supported_algorithms[n_supported_algorithms]) + 1; + } + + if (pg_strcasecmp(value, "true") == 0 || + pg_strcasecmp(value, "yes") == 0 || + pg_strcasecmp(value, "on") == 0 || + pg_strcasecmp(value, "any") == 0 || + pg_strcasecmp(value, "1") == 0) + { + /* Compression is enabled: choose algorithm automatically */ + char *p; + + if (n_supported_algorithms == 0) + { + *client_compressors = NULL; /* no compressors are available */ + conn->compressors = NULL; + conn->n_compressors = 0; + return true; + } + *client_compressors = p = malloc(total_len); + if (build_descriptors) + conn->compressors = malloc(n_supported_algorithms * sizeof(pg_conn_compressor)); + for (i = 0; i < n_supported_algorithms; i++) + { + strcpy(p, supported_algorithms[i]); + p += strlen(p); + *p++ = ','; + if (build_descriptors) + { + conn->compressors[i].impl = i; + conn->compressors[i].level = ZPQ_DEFAULT_COMPRESSION_LEVEL; + } + } + p[-1] = '\0'; + return true; + } + else if (*value == 0 || + pg_strcasecmp(value, "false") == 0 || + pg_strcasecmp(value, "no") == 0 || + pg_strcasecmp(value, "off") == 0 || + pg_strcasecmp(value, "0") == 0) + { + /* Compression is disabled */ + *client_compressors = NULL; + conn->compressors = NULL; + conn->n_compressors = 0; + return true; + } + else + { + /* List of compression algorithms separated by commas */ + char *src, + *dst; + int n_suggested_algorithms = 0; + char *suggested_algorithms = strdup(value); + + src = suggested_algorithms; + *client_compressors = dst = strdup(value); + + if (build_descriptors) + conn->compressors = malloc(n_supported_algorithms * sizeof(pg_conn_compressor)); + + while (*src != '\0') + { + char *sep = strchr(src, ','); + char *col; + int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; + bool found; + + if (sep != NULL) + *sep = '\0'; + + strcpy(dst, src); + + col = strchr(src, ':'); + if (col != NULL) + { + *col = '\0'; + if (sscanf(col + 1, "%d", &compression_level) != 1 && !build_descriptors) + { + fprintf(stderr, + libpq_gettext("WARNING: invalid compression level %s in compression option '%s'\n"), + col + 1, value); + return false; + } + } + found = false; + for (i = 0; supported_algorithms[i] != NULL; i++) + { + if (pg_strcasecmp(src, supported_algorithms[i]) == 0) + { + if (build_descriptors) + { + conn->compressors[n_suggested_algorithms].impl = i; + conn->compressors[n_suggested_algorithms].level = compression_level; + } + n_suggested_algorithms += 1; + dst += strlen(dst); + *dst++ = ','; + found = true; + break; + } + } + if (!found) + { + fprintf(stderr, + libpq_gettext("WARNING: algorithm %s is not supported by client\n"), + src); + } + if (sep) + src = sep + 1; + else + break; + } + free(suggested_algorithms); + conn->n_compressors = n_suggested_algorithms; + if (n_suggested_algorithms == 0) + { + if (!build_descriptors) + fprintf(stderr, + libpq_gettext("WARNING: none of the specified algorithms are supported by client: %s\n"), + value); + else + { + free(conn->compressors); + conn->compressors = NULL; + conn->n_compressors = 0; + } + free(*client_compressors); + *client_compressors = NULL; + return false; + } + dst[-1] = '\0'; + return true; + } +} + /* * Build a startup packet given a filled-in PGconn structure. * @@ -2213,6 +2372,18 @@ build_startup_packet(const PGconn *conn, char *packet, ADD_STARTUP_OPTION("replication", conn->replication); if (conn->pgoptions && conn->pgoptions[0]) ADD_STARTUP_OPTION("options", conn->pgoptions); + if (conn->compression && conn->compression[0]) + { + char *client_compression_algorithms; + + if (build_compressors_list((PGconn *) conn, &client_compression_algorithms, packet == NULL)) + { + if (client_compression_algorithms) + { + ADD_STARTUP_OPTION("_pq_.compression", client_compression_algorithms); + } + } + } if (conn->send_appname) { /* Use appname if present, otherwise use fallback */ diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 227adde5a5..1f2dbeafd4 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -331,6 +331,8 @@ extern char *PQhostaddr(const PGconn *conn); extern char *PQport(const PGconn *conn); extern char *PQtty(const PGconn *conn); extern char *PQoptions(const PGconn *conn); +extern char *PQcompressor(const PGconn *conn); +extern char *PQdecompressor(const PGconn *conn); extern ConnStatusType PQstatus(const PGconn *conn); extern PGTransactionStatusType PQtransactionStatus(const PGconn *conn); extern const char *PQparameterStatus(const PGconn *conn, @@ -486,6 +488,9 @@ extern PGPing PQpingParams(const char *const *keywords, /* Force the write buffer to be written (or at least try) */ extern int PQflush(PGconn *conn); +extern int + PQreadPending(PGconn *conn); + /* * "Fast path" interface --- not really recommended for application * use diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index e81dc37906..2a8f564015 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -40,6 +40,7 @@ /* include stuff common to fe and be */ #include "getaddrinfo.h" #include "libpq/pqcomm.h" +#include "common/zpq_stream.h" /* include stuff found in fe only */ #include "pqexpbuffer.h" @@ -339,6 +340,16 @@ typedef struct pg_conn_host * found in password file. */ } pg_conn_host; + +/* + * Descriptors of compression algorithms chosen by client + */ +typedef struct pg_conn_compressor +{ + int impl; /* compression implementation index */ + int level; /* compression level */ +} pg_conn_compressor; + /* * PGconn stores all the state data associated with a single connection * to a backend. @@ -391,6 +402,14 @@ struct pg_conn * "sspi") */ char *ssl_min_protocol_version; /* minimum TLS protocol version */ char *ssl_max_protocol_version; /* maximum TLS protocol version */ + + char *compression; /* stream compression (boolean value, "any" or + * list of compression algorithms separated by + * comma) */ + pg_conn_compressor *compressors; /* descriptors of compression + * algorithms chosen by client */ + unsigned n_compressors; /* size of compressors array */ + char *target_session_attrs; /* desired session properties */ /* Optional file to write trace info to */ @@ -570,6 +589,9 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer; /* expansible string */ + + /* Compression stream */ + ZpqStream *zpqStream; }; /* PGcancel stores all data necessary to cancel a connection. A copy of this @@ -702,6 +724,7 @@ extern int pqWaitTimed(int forRead, int forWrite, PGconn *conn, time_t finish_time); extern int pqReadReady(PGconn *conn); extern int pqWriteReady(PGconn *conn); +extern int pqReadPending(PGconn *conn); /* === in fe-secure.c === */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6dff5439e0..f940dcca6f 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1765,7 +1765,7 @@ pg_stat_activity| SELECT s.datid, s.query_id, s.query, s.backend_type - FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id) + FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes) LEFT JOIN pg_database d ON ((s.datid = d.oid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_all_indexes| SELECT c.oid AS relid, @@ -1877,8 +1877,14 @@ pg_stat_gssapi| SELECT s.pid, s.gss_auth AS gss_authenticated, s.gss_princ AS principal, s.gss_enc AS encrypted - FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id) + FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes) WHERE (s.client_port IS NOT NULL); +pg_stat_network_traffic| SELECT s.pid, + s.rx_raw_bytes, + s.tx_raw_bytes, + s.rx_compressed_bytes, + s.tx_compressed_bytes + FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes); pg_stat_prefetch_recovery| SELECT s.stats_reset, s.prefetch, s.skip_hit, @@ -2058,7 +2064,7 @@ pg_stat_replication| SELECT s.pid, w.sync_priority, w.sync_state, w.reply_time - FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id) + FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes) JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_replication_slots| SELECT s.slot_name, @@ -2090,7 +2096,7 @@ pg_stat_ssl| SELECT s.pid, s.ssl_client_dn AS client_dn, s.ssl_client_serial AS client_serial, s.ssl_issuer_dn AS issuer_dn - FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id) + FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes) WHERE (s.client_port IS NOT NULL); pg_stat_subscription| SELECT su.oid AS subid, su.subname, diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 6cf27b4dec..785c927fe7 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -127,7 +127,7 @@ sub mkvcbuild keywords.c kwlookup.c link-canary.c md5_common.c pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c - wait_error.c wchar.c z_stream.c); + wait_error.c wchar.c z_stream.c zpq_stream.c); if ($solution->{options}->{openssl}) { -- 2.28.0