From 9dc58140b5066b0f65e202c7722d264d846181d8 Mon Sep 17 00:00:00 2001 From: usernamedt Date: Thu, 29 Jul 2021 18:12:50 +0500 Subject: [PATCH 2/2] Implement libpq compression --- configure | 293 ++++-- configure.ac | 19 +- .../postgres_fdw/expected/postgres_fdw.out | 2 +- doc/src/sgml/config.sgml | 21 + doc/src/sgml/libpq.sgml | 37 + doc/src/sgml/protocol.sgml | 174 +++ src/backend/catalog/system_views.sql | 9 + src/backend/libpq/pqcomm.c | 277 ++++- src/backend/postmaster/postmaster.c | 10 + src/backend/utils/activity/backend_status.c | 30 + src/backend/utils/adt/pgstatfuncs.c | 50 +- src/backend/utils/misc/guc.c | 30 + src/bin/pgbench/pgbench.c | 17 +- src/bin/psql/command.c | 23 + src/common/Makefile | 3 +- src/common/z_stream.c | 201 ++-- src/common/zpq_stream.c | 996 ++++++++++++++++++ src/include/catalog/pg_proc.dat | 18 +- src/include/common/z_stream.h | 29 +- src/include/common/zpq_stream.h | 117 ++ src/include/libpq/libpq-be.h | 3 + src/include/libpq/libpq.h | 1 + src/include/libpq/pqcomm.h | 3 + src/include/pg_config.h.in | 6 +- src/include/utils/backend_status.h | 7 + src/interfaces/libpq/exports.txt | 3 + src/interfaces/libpq/fe-connect.c | 92 +- src/interfaces/libpq/fe-exec.c | 10 +- src/interfaces/libpq/fe-misc.c | 90 +- src/interfaces/libpq/fe-protocol3.c | 56 + src/interfaces/libpq/libpq-fe.h | 5 + src/interfaces/libpq/libpq-int.h | 14 + src/test/regress/expected/rules.out | 14 +- src/tools/msvc/Mkvcbuild.pm | 2 +- 34 files changed, 2406 insertions(+), 256 deletions(-) create mode 100644 src/common/zpq_stream.c create mode 100644 src/include/common/zpq_stream.h diff --git a/configure b/configure index bc09a55f09..12668ea969 100755 --- a/configure +++ b/configure @@ -699,11 +699,13 @@ with_gnu_ld LD LDFLAGS_SL LDFLAGS_EX +ZSTD_LIBS +ZSTD_CFLAGS +with_zstd LZ4_LIBS LZ4_CFLAGS with_lz4 with_zlib -with_zstd with_system_tzdata with_libxslt XML2_LIBS @@ -899,6 +901,8 @@ XML2_CFLAGS XML2_LIBS LZ4_CFLAGS LZ4_LIBS +ZSTD_CFLAGS +ZSTD_LIBS LDFLAGS_EX LDFLAGS_SL PERL @@ -1578,6 +1582,7 @@ Optional Packages: use system time zone data in DIR --without-zlib do not use Zlib --with-lz4 build with LZ4 support + --with-zstd build with ZSTD support --with-gnu-ld assume the C compiler uses GNU ld [default=no] --with-ssl=LIB use LIB for SSL/TLS support (openssl) --with-openssl obsolete spelling of --with-ssl=openssl @@ -1607,6 +1612,8 @@ Some influential environment variables: XML2_LIBS linker flags for XML2, overriding pkg-config LZ4_CFLAGS C compiler flags for LZ4, overriding pkg-config LZ4_LIBS linker flags for LZ4, overriding pkg-config + ZSTD_CFLAGS C compiler flags for ZSTD, overriding pkg-config + ZSTD_LIBS linker flags for ZSTD, overriding pkg-config LDFLAGS_EX extra linker flags for linking executables only LDFLAGS_SL extra linker flags for linking shared libraries only PERL Perl program @@ -8636,85 +8643,6 @@ fi -# -# ZStd -# - - - -# Check whether --with-zstd was given. -if test "${with_zstd+set}" = set; then : - withval=$with_zstd; - case $withval in - yes) - ;; - no) - : - ;; - *) - as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5 - ;; - esac - -else - with_zstd=no - -fi - - - - -if test "$with_zstd" = yes ; then - { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5 -$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; } -if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then : - $as_echo_n "(cached) " >&6 -else - ac_check_lib_save_LIBS=$LIBS -LIBS="-lzstd $LIBS" -cat confdefs.h - <<_ACEOF >conftest.$ac_ext -/* end confdefs.h. */ - -/* Override any GCC internal prototype to avoid an error. - Use char because int might match the return type of a GCC - builtin and then its argument prototype would still apply. */ -#ifdef __cplusplus -extern "C" -#endif -char ZSTD_compress (); -int -main () -{ -return ZSTD_compress (); - ; - return 0; -} -_ACEOF -if ac_fn_c_try_link "$LINENO"; then : - ac_cv_lib_zstd_ZSTD_compress=yes -else - ac_cv_lib_zstd_ZSTD_compress=no -fi -rm -f core conftest.err conftest.$ac_objext \ - conftest$ac_exeext conftest.$ac_ext -LIBS=$ac_check_lib_save_LIBS -fi -{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5 -$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; } -if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then : - cat >>confdefs.h <<_ACEOF -#define HAVE_LIBZSTD 1 -_ACEOF - - LIBS="-lzstd $LIBS" - -else - as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5 -fi - -fi - - # # Zlib @@ -8886,6 +8814,145 @@ fi done fi +# +# Zstd +# +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking whether to build with ZSTD support" >&5 +$as_echo_n "checking whether to build with ZSTD support... " >&6; } + + + +# Check whether --with-zstd was given. +if test "${with_zstd+set}" = set; then : + withval=$with_zstd; + case $withval in + yes) + +$as_echo "#define HAVE_LIBZSTD 1" >>confdefs.h + + ;; + no) + : + ;; + *) + as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5 + ;; + esac + +else + with_zstd=no + +fi + + +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $with_zstd" >&5 +$as_echo "$with_zstd" >&6; } + + +if test "$with_zstd" = yes; then + +pkg_failed=no +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for libzstd >= 1.0.0" >&5 +$as_echo_n "checking for libzstd >= 1.0.0... " >&6; } + +if test -n "$ZSTD_CFLAGS"; then + pkg_cv_ZSTD_CFLAGS="$ZSTD_CFLAGS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libzstd >= 1.0.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libzstd >= 1.0.0") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_ZSTD_CFLAGS=`$PKG_CONFIG --cflags "libzstd >= 1.0.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi +if test -n "$ZSTD_LIBS"; then + pkg_cv_ZSTD_LIBS="$ZSTD_LIBS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libzstd >= 1.0.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libzstd >= 1.0.0") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_ZSTD_LIBS=`$PKG_CONFIG --libs "libzstd >= 1.0.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi + + + +if test $pkg_failed = yes; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + +if $PKG_CONFIG --atleast-pkgconfig-version 0.20; then + _pkg_short_errors_supported=yes +else + _pkg_short_errors_supported=no +fi + if test $_pkg_short_errors_supported = yes; then + ZSTD_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "libzstd >= 1.0.0" 2>&1` + else + ZSTD_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "libzstd >= 1.0.0" 2>&1` + fi + # Put the nasty error message in config.log where it belongs + echo "$ZSTD_PKG_ERRORS" >&5 + + as_fn_error $? "Package requirements (libzstd >= 1.0.0) were not met: + +$ZSTD_PKG_ERRORS + +Consider adjusting the PKG_CONFIG_PATH environment variable if you +installed software in a non-standard prefix. + +Alternatively, you may set the environment variables ZSTD_CFLAGS +and ZSTD_LIBS to avoid the need to call pkg-config. +See the pkg-config man page for more details." "$LINENO" 5 +elif test $pkg_failed = untried; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + { { $as_echo "$as_me:${as_lineno-$LINENO}: error: in \`$ac_pwd':" >&5 +$as_echo "$as_me: error: in \`$ac_pwd':" >&2;} +as_fn_error $? "The pkg-config script could not be found or is too old. Make sure it +is in your PATH or set the PKG_CONFIG environment variable to the full +path to pkg-config. + +Alternatively, you may set the environment variables ZSTD_CFLAGS +and ZSTD_LIBS to avoid the need to call pkg-config. +See the pkg-config man page for more details. + +To get pkg-config, see . +See \`config.log' for more details" "$LINENO" 5; } +else + ZSTD_CFLAGS=$pkg_cv_ZSTD_CFLAGS + ZSTD_LIBS=$pkg_cv_ZSTD_LIBS + { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5 +$as_echo "yes" >&6; } + +fi + for pgac_option in $ZSTD_CFLAGS; do + case $pgac_option in + -I*|-D*) CPPFLAGS="$CPPFLAGS $pgac_option";; + esac + done + for pgac_option in $ZSTD_LIBS; do + case $pgac_option in + -L*) LDFLAGS="$LDFLAGS $pgac_option";; + esac + done +fi + # # Assignments # @@ -12436,6 +12503,58 @@ fi fi +if test "$with_zstd" = yes; then + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_decompressStream in -lzstd" >&5 +$as_echo_n "checking for ZSTD_decompressStream in -lzstd... " >&6; } +if ${ac_cv_lib_zstd_ZSTD_decompressStream+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lzstd $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. */ +#ifdef __cplusplus +extern "C" +#endif +char ZSTD_decompressStream (); +int +main () +{ +return ZSTD_decompressStream (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib_zstd_ZSTD_decompressStream=yes +else + ac_cv_lib_zstd_ZSTD_decompressStream=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_decompressStream" >&5 +$as_echo "$ac_cv_lib_zstd_ZSTD_decompressStream" >&6; } +if test "x$ac_cv_lib_zstd_ZSTD_decompressStream" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_LIBZSTD 1 +_ACEOF + + LIBS="-lzstd $LIBS" + +else + as_fn_error $? "zstd library not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory." "$LINENO" 5 +fi + +fi + if test "$enable_spinlocks" = yes; then $as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h @@ -13767,6 +13886,20 @@ fi done +fi + +if test "$with_zstd" = yes; then + ac_fn_c_check_header_mongrel "$LINENO" "zstd.h" "ac_cv_header_zstd_h" "$ac_includes_default" +if test "x$ac_cv_header_zstd_h" = xyes; then : + +else + as_fn_error $? "zstd header not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory. +Use --without-zstd to disable zstd support." "$LINENO" 5 +fi + + fi if test "$with_gssapi" = yes ; then diff --git a/configure.ac b/configure.ac index da445e87c7..bc89b53b21 100644 --- a/configure.ac +++ b/configure.ac @@ -1017,10 +1017,25 @@ fi # # Zstd # -PGAC_ARG_BOOL(with, zstd, no, - [use zstd]) +AC_MSG_CHECKING([whether to build with ZSTD support]) +PGAC_ARG_BOOL(with, zstd, no, [build with ZSTD support], [AC_DEFINE([HAVE_LIBZSTD], 1, [Define to 1 to build with ZSTD support. (--with-zstd)])]) +AC_MSG_RESULT([$with_zstd]) AC_SUBST(with_zstd) +if test "$with_zstd" = yes; then + PKG_CHECK_MODULES(ZSTD, [libzstd >= 1.0.0]) + for pgac_option in $ZSTD_CFLAGS; do + case $pgac_option in + -I*|-D*) CPPFLAGS="$CPPFLAGS $pgac_option";; + esac + done + for pgac_option in $ZSTD_LIBS; do + case $pgac_option in + -L*) LDFLAGS="$LDFLAGS $pgac_option";; + esac + done +fi + # # Assignments # diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index ed25e7a743..f1ec8b941d 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9258,7 +9258,7 @@ DO $d$ END; $d$; ERROR: invalid option "password" -HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, keep_connections +HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, compression, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, keep_connections CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')" PL/pgSQL function inline_code_block line 3 at EXECUTE -- If we add a password for our user mapping instead, we should get a different diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 43772c2a98..8fedd43e16 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1035,6 +1035,27 @@ include_dir 'conf.d' + + libpq_compression (string) + + libpq_compression configuration parameter + + + + + This parameter controls the available client-server traffic compression methods. + It allows rejecting compression requests even if it is supported by the server (for example, due to security, or CPU consumption). + The default is on, which means that all supported compression methods are allowed. + For more precise control, a list of the allowed compression methods can be specified. + For example, to allow only zstd and zlib, set the setting to zstd,zlib. + Also, maximal allowed compression level can be specified for each method, e.g. zstd:1,zlib:2 setting will set the + maximal compression level for zstd to 1 and zlib to 2. + If a client requests the compression with a higher compression level, it will be set to the maximal allowed one. + Default (and recommended) maximal compression level for each algorithm is 1. + + + + diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 2e4f615a65..b92e379b33 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -1260,6 +1260,43 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname + + compression + + + Request compression of libpq traffic. The client sends a request with + a list of compression algorithms. Compression can be requested by a client + by including the "compression" option in its connection string. + This can either be a boolean value to enable or disable compression + ("true"/"false", + "on"/"off", + "yes"/"no", + "1"/"0"), + "any", + or an explicit list of comma-separated compression algorithms + which can optionally include compression level ("zlib,zstd:5"). + If compression is enabled but an algorithm is not explicitly specified, + the client library sends its full list of supported algorithms. + The server intersects the received compression algorithms with the allowed ones (controlled via the libpq_compression server config setting). + If the intersection is not empty, the server responds with CompressionAck containing the final list of the compression algorithms that can be used for the compression of libpq messages between the client and server. + If the intersection is empty (server does not accept any of the requested algorithms), then it replies with CompressionAck containing the empty list and it is up to the client whether to continue without compression or to report an error. + + + After sending the CompressionAck message, the server can send the SetCompressionMethod message to set the current compression algorithm for server-to-client traffic compression. + After receiving the CompressionAck message, the client can send the SetCompressionMethod message to set the current compression algorithm for client-to-server traffic compression. + Compressed data is transmitted via the CompressedData messages. + + + Support for compression algorithms must be enabled when the server is compiled. + Currently, two libraries are supported: zlib (default) and zstd (if Postgres was + configured with --with-zstd option). In both cases, streaming mode is used. + By default, compression is not requested by the client. + Please note that using compression together with SSL may expose extra vulnerabilities: + CRIME + + + + client_encoding diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index e8cb78ff1f..353db387d7 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -92,6 +92,15 @@ such as COPY. + + It is possible to compress protocol data to reduce traffic and speed-up client-server interaction. + Compression is especially useful for importing/exporting data to/from the database using the COPY command + and for replication (both physical and logical). Compression can also reduce the server's response time + for queries returning a large amount of data (for example, JSON, BLOBs, text, ...). + Currently, two libraries are supported: zlib (default) and zstd (if Postgres was + configured with --with-zstd option). + + Messaging Overview @@ -262,6 +271,21 @@ + + CompressionAck + + + The server accepts the client's compression request. + Compression is requested when a client connection includes the "compression" option, which contains a list of the requested compression algorithms. + The server intersects the requested compression algorithms with the allowed ones (controlled via the libpq_compression server config setting). + If the intersection is not empty, the server responds with CompressionAck containing the final list of the compression algorithms that can be used for the compression of libpq messages between the client and server. + If the intersection is empty (server does not accept any of the requested algorithms), then it replies with CompressionAck containing the empty list and it is up to the client whether to continue without compression or to report an error. + After sending the CompressionAck message, the server can send the SetCompressionMethod message to set the current compression algorithm for server-to-client traffic compression. + After receiving the CompressionAck message, the client can send the SetCompressionMethod message to set the current compression algorithm for client-to-server traffic compression. + + + + AuthenticationOk @@ -3445,6 +3469,141 @@ following: + + +CompressionAck (B) + + + + + + + + Byte1('z') + + + + Acknowledge use of compression for protocol data. + + + + + + Int32 + + + + Length of message contents in bytes, including self. + + + + + + Byten + + + + List of the negotiated compression algorithms. + + + + + + + + + + + + +SetCompressionMethod (F & B) + + + + + + + + Byte1('k') + + + + Switch the current compression algorithm. Following CompressedData messages will be compressed via the compressed algorithm specified in this message. + + + + + + Int32(5) + + + + Length of message contents in bytes, including self. + + + + + + Int8 + + + + Index of the new compression algorithm. + + + + + + + + + + + +CompressedData (F & B) + + + + + + + + + Byte1('m') + + + + Identifies the message as compressed data. + + + + + + Int32 + + + + Length of message contents in bytes, including self. + + + + + + Byten + + + + Compressed message data. + + + + + + + + + + AuthenticationSASLContinue (B) @@ -6010,6 +6169,21 @@ StartupMessage (F) + + + _pq_.compression + + + + Request compression of libpq traffic. The value is a list of compression algorithms requested by the client with an optional + specification of compression level: "zlib,zstd:5". + If the server does not support compression, the backend will ignore the _pq_.compression + parameter and will not send the CompressionAck message to the frontend. + By default, compression is disabled. Please note that using compression together with SSL may expose extra vulnerabilities: + CRIME. + + + In addition to the above, other parameters may be listed. diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 55f6e3711d..a8120fac7a 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -840,6 +840,15 @@ CREATE VIEW pg_stat_activity AS LEFT JOIN pg_database AS D ON (S.datid = D.oid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); +CREATE VIEW pg_stat_network_traffic AS + SELECT + S.pid, + S.rx_raw_bytes, + S.tx_raw_bytes, + S.rx_compressed_bytes, + S.tx_compressed_bytes + FROM pg_stat_get_activity(NULL) AS S; + CREATE VIEW pg_stat_replication AS SELECT S.pid, diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 89a5f901aa..bc726421c4 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -60,6 +60,7 @@ #include #include #include +#include #include #include #include @@ -77,11 +78,14 @@ #include "common/ip.h" #include "libpq/libpq.h" +#include "libpq/pqformat.h" #include "miscadmin.h" #include "port/pg_bswap.h" #include "storage/ipc.h" #include "utils/guc.h" #include "utils/memutils.h" +#include "utils/builtins.h" +#include "common/zpq_stream.h" /* * Cope with the various platform-specific ways to spell TCP keepalive socket @@ -108,6 +112,9 @@ int Unix_socket_permissions; char *Unix_socket_group; +/* GUC variable containing the allowed compression algorithms list (separated by comma) */ +char *libpq_compress_algorithms; + /* Where the Unix socket files are (list of palloc'd strings) */ static List *sock_paths = NIL; @@ -130,6 +137,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE]; static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ static int PqRecvLength; /* End of data available in PqRecvBuffer */ +static ZpqStream * PqStream; + + /* * Message status */ @@ -167,6 +177,145 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods; WaitEventSet *FeBeWaitSet; +static ssize_t +write_compressed(void *arg, void const *data, size_t size) +{ + ssize_t rc = secure_write((Port *) arg, (void *) data, size); + + if (rc > 0) + pgstat_report_network_traffic(0, 0, 0, rc); + return rc; +} + +static ssize_t +read_compressed(void *arg, void *data, size_t size) +{ + ssize_t rc = secure_read((Port *) arg, data, size); + + if (rc > 0) + pgstat_report_network_traffic(0, 0, rc, 0); + return rc; +} + +/* + * Send the chosen compression algorithms to the client + */ +static void +SendCompressionACK(char *compressors_str) +{ + StringInfoData buf; + + pq_beginmessage(&buf, 'z'); + pq_sendstring(&buf, compressors_str); + pq_endmessage(&buf); + pq_flush(); +} + +/* -------------------------------- + * pq_configure - configure connection using port settings + * + * Right now only compression is toggled in the configure. + * Function returns 0 in case of success, non-null in case of error + * -------------------------------- + */ +int +pq_configure(Port *port) +{ + zpq_compressor *server_compressors; + zpq_compressor *client_compressors; + zpq_compressor *res_compressors; + size_t n_client_compressors; + size_t n_server_compressors; + size_t n_res_compressors = 0; + char *res_compressors_str; + char *client_compression_algorithms = port->compression_algorithms; + + if (!client_compression_algorithms || !libpq_compress_algorithms) + { + return 0; + } + + if (!zpq_parse_compression_setting(libpq_compress_algorithms, &server_compressors, &n_server_compressors)) + { + ereport(LOG, (errmsg("failed to parse configured compression setting: %s", libpq_compress_algorithms))); + return 0; + } + + if (n_server_compressors == 0) + { + /* + * No enabled server compressors available, abort the compression + * initialization. + */ + /* Send the compression acknowledgment with empty compressors list. */ + char *empty_response = ""; + + SendCompressionACK(empty_response); + return 0; + } + + if (!zpq_deserialize_compressors(client_compression_algorithms, &client_compressors, &n_client_compressors)) + { + ereport(LOG, (errmsg("failed to parse received client compression methods: %s", client_compression_algorithms))); + return 0; + } + + if (n_client_compressors == 0) + { + /* + * client did not provide any compression algorithms that server is + * compiled to support + */ + ereport(LOG, (errmsg("server doesn't support any of the compression methods requested by the client: %s", client_compression_algorithms))); + return 0; + } + + res_compressors = malloc(Max(n_client_compressors, n_server_compressors) * sizeof(zpq_compressor)); + + /* + * Intersect client and server compressors to determine the final list of + * the supported compressors. O(N^2) is negligible because of a small + * number of the compression methods. + */ + for (size_t i = 0; i < n_client_compressors; i++) + { + for (size_t j = 0; j < n_server_compressors; j++) + { + if (client_compressors[i].impl == server_compressors[j].impl) + { + res_compressors[n_res_compressors].impl = client_compressors[i].impl; + /* prefer the lower compression level */ + res_compressors[n_res_compressors++].level = Min(client_compressors[i].level, server_compressors[j].level); + break; + } + } + } + free(client_compressors); + free(server_compressors); + + if (n_res_compressors == 0) + { + char *empty_response = ""; + + ereport(LOG, (errmsg("did not find any matches between the client and server compression methods, not enabling compression"))); + free(res_compressors); + /* send the compression ack with empty compressors list */ + SendCompressionACK(empty_response); + return 0; + } + + res_compressors_str = zpq_serialize_compressors(res_compressors, n_res_compressors); + SendCompressionACK(res_compressors_str); + + /* Init compression */ + PqStream = zpq_create(res_compressors, n_res_compressors, write_compressed, read_compressed, MyProcPort, NULL, 0); + if (!PqStream) + { + ereport(LOG, (errmsg("failed to initialize the compression stream"))); + return -1; + } + return 0; +} /* -------------------------------- * pq_init - initialize libpq at backend startup @@ -270,6 +419,9 @@ socket_close(int code, Datum arg) } #endif /* ENABLE_GSS */ + /* Release compression streams */ + zpq_free(PqStream); + /* * Cleanly shut down SSL layer. Nowhere else does a postmaster child * call this, so this is safe when interrupting BackendInitialize(). @@ -926,12 +1078,15 @@ socket_set_nonblocking(bool nonblocking) /* -------------------------------- * pq_recvbuf - load some bytes into the input buffer * - * returns 0 if OK, EOF if trouble + * nowait parameter toggles non-blocking mode. + * returns number of read bytes, EOF if trouble * -------------------------------- */ static int -pq_recvbuf(void) +pq_recvbuf(bool nowait) { + int r; + if (PqRecvPointer > 0) { if (PqRecvLength > PqRecvPointer) @@ -947,21 +1102,40 @@ pq_recvbuf(void) } /* Ensure that we're in blocking mode */ - socket_set_nonblocking(false); + socket_set_nonblocking(nowait); /* Can fill buffer from PqRecvLength and upwards */ for (;;) { - int r; - - r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, - PQ_RECV_BUFFER_SIZE - PqRecvLength); + /* + * If streaming compression is enabled then use correspondent + * compression read function. + */ + r = PqStream + ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength, + PQ_RECV_BUFFER_SIZE - PqRecvLength) + : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, + PQ_RECV_BUFFER_SIZE - PqRecvLength); if (r < 0) { + if (r == ZS_DECOMPRESS_ERROR) + { + char const *msg = zpq_decompress_error(PqStream); + + if (msg == NULL) + msg = "end of stream"; + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("failed to decompress data: %s", msg))); + return EOF; + } if (errno == EINTR) continue; /* Ok if interrupted */ + if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK)) + return 0; + /* * Careful: an ereport() that tries to write to the client would * cause recursion to here, leading to stack overflow and core @@ -982,7 +1156,8 @@ pq_recvbuf(void) } /* r contains number of bytes read, so just incr length */ PqRecvLength += r; - return 0; + pgstat_report_network_traffic(r, 0, 0, 0); + return r; } } @@ -997,7 +1172,8 @@ pq_getbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer++]; @@ -1016,7 +1192,8 @@ pq_peekbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer]; @@ -1033,48 +1210,15 @@ pq_peekbyte(void) int pq_getbyte_if_available(unsigned char *c) { - int r; + int r = 0; Assert(PqCommReadingMsg); - if (PqRecvPointer < PqRecvLength) + if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0) { *c = PqRecvBuffer[PqRecvPointer++]; return 1; } - - /* Put the socket into non-blocking mode */ - socket_set_nonblocking(true); - - r = secure_read(MyProcPort, c, 1); - if (r < 0) - { - /* - * Ok if no data available without blocking or interrupted (though - * EINTR really shouldn't happen with a non-blocking socket). Report - * other errors. - */ - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) - r = 0; - else - { - /* - * Careful: an ereport() that tries to write to the client would - * cause recursion to here, leading to stack overflow and core - * dump! This message must go *only* to the postmaster log. - */ - ereport(COMMERROR, - (errcode_for_socket_access(), - errmsg("could not receive data from client: %m"))); - r = EOF; - } - } - else if (r == 0) - { - /* EOF detected */ - r = EOF; - } - return r; } @@ -1095,7 +1239,8 @@ pq_getbytes(char *s, size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1129,7 +1274,8 @@ pq_discardbytes(size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1141,7 +1287,6 @@ pq_discardbytes(size_t len) return 0; } - /* -------------------------------- * pq_startmsgread - begin reading a message from the client. * @@ -1345,13 +1490,24 @@ internal_flush(void) char *bufptr = PqSendBuffer + PqSendStart; char *bufend = PqSendBuffer + PqSendPointer; - while (bufptr < bufend) + while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0) + + /* + * has more data to flush or unsent data in internal compression + * buffer + */ { int r; + size_t processed = 0; + size_t available = bufend - bufptr; - r = secure_write(MyProcPort, bufptr, bufend - bufptr); + r = PqStream + ? zpq_write(PqStream, bufptr, available, &processed) + : secure_write(MyProcPort, bufptr, available); + bufptr += processed; + PqSendStart += processed; - if (r <= 0) + if (r < 0 || (r == 0 && available)) { if (errno == EINTR) continue; /* Ok if we were interrupted */ @@ -1394,12 +1550,12 @@ internal_flush(void) InterruptPending = 1; return EOF; } + pgstat_report_network_traffic(0, r, 0, 0); last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; PqSendStart += r; } - PqSendStart = PqSendPointer = 0; return 0; } @@ -1416,7 +1572,7 @@ socket_flush_if_writable(void) int res; /* Quick exit if nothing to do */ - if (PqSendPointer == PqSendStart) + if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0)) return 0; /* No-op if reentrant call */ @@ -1439,7 +1595,7 @@ socket_flush_if_writable(void) static bool socket_is_send_pending(void) { - return (PqSendStart < PqSendPointer); + return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 0)); } /* -------------------------------- @@ -1962,3 +2118,16 @@ pq_check_connection(void) return true; } + +PG_FUNCTION_INFO_V1(pg_compression_algorithm); + +Datum +pg_compression_algorithm(PG_FUNCTION_ARGS) +{ + char const *algorithm_name = PqStream ? zpq_compress_algorithm_name(PqStream) : NULL; + + if (algorithm_name) + PG_RETURN_TEXT_P(cstring_to_text(algorithm_name)); + else + PG_RETURN_NULL(); +} diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 5a050898fe..2ba1cdda86 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -2152,6 +2152,8 @@ retry1: port->database_name = pstrdup(valptr); else if (strcmp(nameptr, "user") == 0) port->user_name = pstrdup(valptr); + else if (strcmp(nameptr, "_pq_.compression") == 0) + port->compression_algorithms = pstrdup(valptr); else if (strcmp(nameptr, "options") == 0) port->cmdline_options = pstrdup(valptr); else if (strcmp(nameptr, "replication") == 0) @@ -4462,6 +4464,14 @@ BackendInitialize(Port *port) if (status != STATUS_OK) proc_exit(0); + if (pq_configure(port)) + { + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("failed to send compression message: %m"))); + proc_exit(0); + } + /* * Now that we have the user and database name, we can set the process * title for ps. It's good to do this as early as possible in startup. diff --git a/src/backend/utils/activity/backend_status.c b/src/backend/utils/activity/backend_status.c index 2901f9f5a9..cf694d127d 100644 --- a/src/backend/utils/activity/backend_status.c +++ b/src/backend/utils/activity/backend_status.c @@ -338,6 +338,9 @@ pgstat_bestart(void) lbeentry.st_xact_start_timestamp = 0; lbeentry.st_databaseid = MyDatabaseId; + lbeentry.st_tx_raw_bytes = lbeentry.st_rx_raw_bytes = + lbeentry.st_tx_compressed_bytes = lbeentry.st_rx_compressed_bytes = 0; + /* We have userid for client-backends, wal-sender and bgworker processes */ if (lbeentry.st_backendType == B_BACKEND || lbeentry.st_backendType == B_WAL_SENDER @@ -713,6 +716,33 @@ pgstat_report_xact_timestamp(TimestampTz tstamp) PGSTAT_END_WRITE_ACTIVITY(beentry); } +/* + * Report current transaction start timestamp as the specified value. + * Zero means there is no active transaction. + */ +void +pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes) +{ + volatile PgBackendStatus *beentry = MyBEEntry; + + if (!pgstat_track_activities || !beentry) + return; + + /* + * Update my status entry, following the protocol of bumping + * st_changecount before and after. We use a volatile pointer here to + * ensure the compiler doesn't try to get cute. + */ + PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); + + beentry->st_rx_raw_bytes += rx_raw_bytes; + beentry->st_tx_raw_bytes += tx_raw_bytes; + beentry->st_rx_compressed_bytes += rx_compressed_bytes; + beentry->st_tx_compressed_bytes += tx_compressed_bytes; + + PGSTAT_END_WRITE_ACTIVITY(beentry); +} + /* ---------- * pgstat_read_current_status() - * diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index f0e09eae4d..7b745721bd 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -570,7 +570,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) Datum pg_stat_get_activity(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_ACTIVITY_COLS 30 +#define PG_STAT_GET_ACTIVITY_COLS 34 int num_backends = pgstat_fetch_stat_numbackends(); int curr_backend; int pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0); @@ -919,6 +919,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS) nulls[29] = true; else values[29] = UInt64GetDatum(beentry->st_query_id); + values[30] = beentry->st_rx_raw_bytes; + values[31] = beentry->st_tx_raw_bytes; + values[32] = beentry->st_rx_compressed_bytes; + values[33] = beentry->st_tx_compressed_bytes; } else { @@ -947,6 +951,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS) nulls[27] = true; nulls[28] = true; nulls[29] = true; + nulls[30] = true; + nulls[31] = true; + nulls[32] = true; + nulls[33] = true; } tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -1146,6 +1154,46 @@ pg_stat_get_backend_start(PG_FUNCTION_ARGS) PG_RETURN_TIMESTAMPTZ(result); } +Datum +pg_stat_get_network_traffic(PG_FUNCTION_ARGS) +{ +#define PG_STAT_NETWORK_TRAFFIC_COLS 4 + TupleDesc tupdesc; + Datum values[PG_STAT_NETWORK_TRAFFIC_COLS]; + bool nulls[PG_STAT_NETWORK_TRAFFIC_COLS]; + int32 beid = PG_GETARG_INT32(0); + PgBackendStatus *beentry; + + if ((beentry = pgstat_fetch_stat_beentry(beid)) == NULL) + PG_RETURN_NULL(); + else if (!HAS_PGSTAT_PERMISSIONS(beentry->st_userid)) + PG_RETURN_NULL(); + + /* Initialise values and NULL flags arrays */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(PG_STAT_NETWORK_TRAFFIC_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "rx_raw_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "tx_raw_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "rx_compressed_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "tx_compressed_bytes", + INT8OID, -1, 0); + BlessTupleDesc(tupdesc); + + /* Fill values and NULLs */ + values[0] = beentry->st_rx_raw_bytes; + values[1] = beentry->st_tx_raw_bytes; + values[2] = beentry->st_rx_compressed_bytes; + values[3] = beentry->st_tx_compressed_bytes; + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} Datum pg_stat_get_backend_client_addr(PG_FUNCTION_ARGS) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index a2e0f8de7e..38e3a7c0ae 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -51,6 +51,7 @@ #include "commands/vacuum.h" #include "commands/variable.h" #include "common/string.h" +#include "common/zpq_stream.h" #include "funcapi.h" #include "jit/jit.h" #include "libpq/auth.h" @@ -190,6 +191,7 @@ static bool check_stage_log_stats(bool *newval, void **extra, GucSource source); static bool check_log_stats(bool *newval, void **extra, GucSource source); static bool check_canonical_path(char **newval, void **extra, GucSource source); static bool check_timezone_abbreviations(char **newval, void **extra, GucSource source); +static bool check_libpq_compression(char **newval, void **extra, GucSource source); static void assign_timezone_abbreviations(const char *newval, void *extra); static void pg_timezone_abbrev_initialize(void); static const char *show_archive_command(void); @@ -4539,6 +4541,16 @@ static struct config_string ConfigureNamesString[] = NULL, NULL, NULL }, + { + {"libpq_compression", PGC_SIGHUP, CLIENT_CONN_OTHER, + gettext_noop("Sets the list of allowed libpq compression algorithms."), + NULL + }, + &libpq_compress_algorithms, + "on", + check_libpq_compression, NULL, NULL + }, + { {"application_name", PGC_USERSET, LOGGING_WHAT, gettext_noop("Sets the application name to be reported in statistics and logs."), @@ -4970,6 +4982,8 @@ static struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL @@ -11796,6 +11810,22 @@ check_ssl(bool *newval, void **extra, GucSource source) return true; } +static bool +check_libpq_compression(char **newval, void **extra, GucSource source) +{ + zpq_compressor *compressors; + size_t n_compressors; + + if (!zpq_parse_compression_setting(*newval, &compressors, &n_compressors)) + { + GUC_check_errdetail("Cannot parse the libpq_compression setting."); + return false; + } + + free(compressors); + return true; +} + static bool check_stage_log_stats(bool *newval, void **extra, GucSource source) { diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 364b5a2e47..3038a71850 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -6671,6 +6671,9 @@ threadRun(void *arg) int nsocks; /* number of sockets to be waited for */ pg_time_usec_t min_usec; pg_time_usec_t now = 0; /* set this only if needed */ + bool buffered_rx = false; /* true if some of the clients has + * data left in SSL/ZPQ read + * buffers */ /* * identify which client sockets should be checked for input, and @@ -6705,6 +6708,9 @@ threadRun(void *arg) */ int sock = PQsocket(st->con); + /* check if conn has buffered SSL / ZPQ read data */ + buffered_rx = buffered_rx || PQreadPending(st->con); + if (sock < 0) { pg_log_error("invalid socket: %s", PQerrorMessage(st->con)); @@ -6748,7 +6754,7 @@ threadRun(void *arg) { if (nsocks > 0) { - rc = wait_on_socket_set(sockets, min_usec); + rc = buffered_rx ? 1 : wait_on_socket_set(sockets, min_usec); } else /* nothing active, simple sleep */ { @@ -6757,7 +6763,7 @@ threadRun(void *arg) } else /* no explicit delay, wait without timeout */ { - rc = wait_on_socket_set(sockets, 0); + rc = buffered_rx ? 1 : wait_on_socket_set(sockets, 0); } if (rc < 0) @@ -6796,8 +6802,11 @@ threadRun(void *arg) pg_log_error("invalid socket: %s", PQerrorMessage(st->con)); goto done; } - - if (!socket_has_input(sockets, sock, nsocks++)) + if (PQreadPending(st->con)) + { + nsocks++; + } + else if (!socket_has_input(sockets, sock, nsocks++)) continue; } else if (st->state == CSTATE_FINISHED || diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c index d704c4220c..f704b48224 100644 --- a/src/bin/psql/command.c +++ b/src/bin/psql/command.c @@ -169,6 +169,7 @@ static void print_with_linenumbers(FILE *output, char *lines, const char *header_keyword); static void minimal_error_message(PGresult *res); +static void printCompressionInfo(void); static void printSSLInfo(void); static void printGSSInfo(void); static bool printPsetInfo(const char *param, printQueryOpt *popt); @@ -629,6 +630,7 @@ exec_command_conninfo(PsqlScanState scan_state, bool active_branch) printf(_("You are connected to database \"%s\" as user \"%s\" on host \"%s\" at port \"%s\".\n"), db, PQuser(pset.db), host, PQport(pset.db)); } + printCompressionInfo(); printSSLInfo(); printGSSInfo(); } @@ -3582,6 +3584,27 @@ connection_warnings(bool in_startup) } } +/* + * printCompressionInfo + * + * Print information about used compressor/decompressor + */ +static void +printCompressionInfo(void) +{ + char *compressor = PQcompressor(pset.db); + char *decompressor = PQdecompressor(pset.db); + + if (compressor != NULL) + { + printf(_("Compressor %s\n"), compressor); + } + + if (decompressor != NULL) + { + printf(_("Decompressor %s\n"), decompressor); + } +} /* * printSSLInfo diff --git a/src/common/Makefile b/src/common/Makefile index abc5a72325..c2720d4191 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -79,7 +79,8 @@ OBJS_COMMON = \ username.o \ wait_error.o \ wchar.o \ - z_stream.o + z_stream.o \ + zpq_stream.o ifeq ($(with_ssl),openssl) OBJS_COMMON += \ diff --git a/src/common/z_stream.c b/src/common/z_stream.c index 0f00b0b7fe..0392e745d7 100644 --- a/src/common/z_stream.c +++ b/src/common/z_stream.c @@ -84,14 +84,9 @@ typedef struct struct ZStream { - ZAlgorithm const *c_algorithm; - void *c_stream; - - ZAlgorithm const *d_algorithm; - void *d_stream; - - bool rx_not_flushed; - bool tx_not_flushed; + ZAlgorithm const *algorithm; + void *stream; + bool not_flushed; }; #if HAVE_LIBZSTD @@ -107,25 +102,32 @@ struct ZStream #define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */ -typedef struct ZPQ_ZSTD_CStream +typedef struct ZS_ZSTD_CStream { ZSTD_CStream *stream; char const *error; /* error message */ -} ZPQ_ZSTD_CStream; +} ZS_ZSTD_CStream; -typedef struct ZPQ_ZSTD_DStream +typedef struct ZS_ZSTD_DStream { ZSTD_DStream *stream; char const *error; /* error message */ -} ZPQ_ZSTD_DStream; +} ZS_ZSTD_DStream; static void * zstd_create_compressor(int level) { - ZPQ_ZSTD_CStream *c_stream = (ZPQ_ZSTD_CStream *) malloc(sizeof(ZPQ_ZSTD_CStream)); + size_t rc; + ZS_ZSTD_CStream *c_stream = (ZS_ZSTD_CStream *) malloc(sizeof(ZS_ZSTD_CStream)); c_stream->stream = ZSTD_createCStream(); - ZSTD_initCStream(c_stream->stream, level); + rc = ZSTD_initCStream(c_stream->stream, level); + if (ZSTD_isError(rc)) + { + ZSTD_freeCStream(c_stream->stream); + free(c_stream); + return NULL; + } #if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT); #endif @@ -136,10 +138,17 @@ zstd_create_compressor(int level) static void * zstd_create_decompressor() { - ZPQ_ZSTD_DStream *d_stream = (ZPQ_ZSTD_DStream *) malloc(sizeof(ZPQ_ZSTD_DStream)); + size_t rc; + ZS_ZSTD_DStream *d_stream = (ZS_ZSTD_DStream *) malloc(sizeof(ZS_ZSTD_DStream)); d_stream->stream = ZSTD_createDStream(); - ZSTD_initDStream(d_stream->stream); + rc = ZSTD_initDStream(d_stream->stream); + if (ZSTD_isError(rc)) + { + ZSTD_freeDStream(d_stream->stream); + free(d_stream); + return NULL; + } #if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); #endif @@ -150,7 +159,7 @@ zstd_create_decompressor() static ssize_t zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream; ZSTD_inBuffer in; ZSTD_outBuffer out; size_t rc; @@ -192,7 +201,7 @@ zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_pr static ssize_t zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream; ZSTD_inBuffer in; ZSTD_outBuffer out; @@ -236,7 +245,7 @@ static ssize_t zstd_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed) { size_t tx_not_flushed; - ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream; ZSTD_outBuffer output; output.dst = dst; @@ -260,7 +269,7 @@ zstd_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed) static void zstd_free_compressor(void *c_stream) { - ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream; if (cs != NULL) { @@ -272,7 +281,7 @@ zstd_free_compressor(void *c_stream) static void zstd_free_decompressor(void *d_stream) { - ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream; if (ds != NULL) { @@ -284,7 +293,7 @@ zstd_free_decompressor(void *d_stream) static char const * zstd_compress_error(void *c_stream) { - ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream; return cs->error; } @@ -292,7 +301,7 @@ zstd_compress_error(void *c_stream) static char const * zstd_decompress_error(void *d_stream) { - ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream; return ds->error; } @@ -469,7 +478,7 @@ no_compression_name(void) /* * Array with all supported compression algorithms. */ -static ZAlgorithm const zpq_algorithms[] = +static ZAlgorithm const zs_algorithms[] = { #if HAVE_LIBZSTD {zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error, zstd_end}, @@ -480,12 +489,22 @@ static ZAlgorithm const zpq_algorithms[] = {no_compression_name} }; +inline bool +zs_is_valid_impl_id(unsigned int id) +{ + return id >= 0 && id < (sizeof(zs_algorithms) / sizeof(*zs_algorithms)); +} + static ssize_t -zpq_init_compressor(ZStream * zs, int c_alg_impl, int c_level) +zs_init_compressor(ZStream * zs, unsigned int c_alg_impl, int c_level) { - zs->c_algorithm = &zpq_algorithms[c_alg_impl]; - zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level); - if (zs->c_stream == NULL) + if (!zs_is_valid_impl_id(c_alg_impl)) + { + return -1; + } + zs->algorithm = &zs_algorithms[c_alg_impl]; + zs->stream = zs->algorithm->create_compressor(c_level); + if (zs->stream == NULL) { return -1; } @@ -493,11 +512,15 @@ zpq_init_compressor(ZStream * zs, int c_alg_impl, int c_level) } static ssize_t -zpq_init_decompressor(ZStream * zs, int d_alg_impl) +zs_init_decompressor(ZStream * zs, unsigned int d_alg_impl) { - zs->d_algorithm = &zpq_algorithms[d_alg_impl]; - zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor(); - if (zs->d_stream == NULL) + if (!zs_is_valid_impl_id(d_alg_impl)) + { + return -1; + } + zs->algorithm = &zs_algorithms[d_alg_impl]; + zs->stream = zs->algorithm->create_decompressor(); + if (zs->stream == NULL) { return -1; } @@ -505,17 +528,32 @@ zpq_init_decompressor(ZStream * zs, int d_alg_impl) } /* - * Index of used compression algorithm in zpq_algorithms array. + * Index of used compression algorithm in zs_algorithms array. */ ZStream * -zs_create(int c_alg_impl, int c_level, int d_alg_impl) +zs_create_compressor(unsigned int c_alg_impl, int c_level) { ZStream *zs = (ZStream *) malloc(sizeof(ZStream)); - zs->tx_not_flushed = false; - zs->rx_not_flushed = false; + zs->not_flushed = false; - if (zpq_init_compressor(zs, c_alg_impl, c_level) || zpq_init_decompressor(zs, d_alg_impl)) + if (zs_init_compressor(zs, c_alg_impl, c_level)) + { + free(zs); + return NULL; + } + + return zs; +} + +ZStream * +zs_create_decompressor(unsigned int d_alg_impl) +{ + ZStream *zs = (ZStream *) malloc(sizeof(ZStream)); + + zs->not_flushed = false; + + if (zs_init_decompressor(zs, d_alg_impl)) { free(zs); return NULL; @@ -528,17 +566,18 @@ ssize_t zs_read(ZStream * zs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { ssize_t rc; + *src_processed = 0; *dst_processed = 0; - rc = zs->d_algorithm->decompress(zs->d_stream, - src, src_size, src_processed, - dst, dst_size, dst_processed); + rc = zs->algorithm->decompress(zs->stream, + src, src_size, src_processed, + dst, dst_size, dst_processed); - zs->rx_not_flushed = false; + zs->not_flushed = false; if (rc == ZS_DATA_PENDING) { - zs->rx_not_flushed = true; + zs->not_flushed = true; return ZS_OK; } @@ -554,17 +593,18 @@ ssize_t zs_write(ZStream * zs, void const *buf, size_t size, size_t *processed, void *dst, size_t dst_size, size_t *dst_processed) { ssize_t rc; + *processed = 0; *dst_processed = 0; - rc = zs->c_algorithm->compress(zs->c_stream, - buf, size, processed, - dst, dst_size, dst_processed); + rc = zs->algorithm->compress(zs->stream, + buf, size, processed, + dst, dst_size, dst_processed); - zs->tx_not_flushed = false; + zs->not_flushed = false; if (rc == ZS_DATA_PENDING) { - zs->tx_not_flushed = true; + zs->not_flushed = true; return ZS_OK; } if (rc != ZS_OK) @@ -576,34 +616,50 @@ zs_write(ZStream * zs, void const *buf, size_t size, size_t *processed, void *ds } void -zs_free(ZStream * zs) +zs_compressor_free(ZStream * zs) { - if (zs) + if (zs == NULL) { - if (zs->c_stream) - { - zs->c_algorithm->free_compressor(zs->c_stream); - } - if (zs->d_stream) - { - zs->d_algorithm->free_decompressor(zs->d_stream); - } - free(zs); + return; + } + + if (zs->stream) + { + zs->algorithm->free_compressor(zs->stream); } + + free(zs); +} + +void +zs_decompressor_free(ZStream * zs) +{ + if (zs == NULL) + { + return; + } + + if (zs->stream) + { + zs->algorithm->free_decompressor(zs->stream); + } + + free(zs); } ssize_t -zs_end(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed) +zs_end_compression(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed) { ssize_t rc; + *dst_processed = 0; - rc = zs->c_algorithm->end_compression(zs->c_stream, dst, dst_size, dst_processed); + rc = zs->algorithm->end_compression(zs->stream, dst, dst_size, dst_processed); - zs->tx_not_flushed = false; + zs->not_flushed = false; if (rc == ZS_DATA_PENDING) { - zs->tx_not_flushed = true; + zs->not_flushed = true; return ZS_OK; } if (rc != ZS_OK) @@ -617,26 +673,21 @@ zs_end(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed) char const * zs_compress_error(ZStream * zs) { - return zs->c_algorithm->compress_error(zs->c_stream); + return zs->algorithm->compress_error(zs->stream); } char const * zs_decompress_error(ZStream * zs) { - return zs->d_algorithm->decompress_error(zs->d_stream); + return zs->algorithm->decompress_error(zs->stream); } bool -zs_buffered_rx(ZStream * zs) +zs_buffered(ZStream * zs) { - return zs ? zs->rx_not_flushed : 0; + return zs ? zs->not_flushed : 0; } -bool -zs_buffered_tx(ZStream * zs) -{ - return zs ? zs->tx_not_flushed : 0; -} /* * Get list of the supported algorithms. @@ -644,12 +695,12 @@ zs_buffered_tx(ZStream * zs) char ** zs_get_supported_algorithms(void) { - size_t n_algorithms = sizeof(zpq_algorithms) / sizeof(*zpq_algorithms); + size_t n_algorithms = sizeof(zs_algorithms) / sizeof(*zs_algorithms); char **algorithm_names = malloc(n_algorithms * sizeof(char *)); for (size_t i = 0; i < n_algorithms; i++) { - algorithm_names[i] = (char *) zpq_algorithms[i].name(); + algorithm_names[i] = (char *) zs_algorithms[i].name(); } return algorithm_names; @@ -658,11 +709,11 @@ zs_get_supported_algorithms(void) char const * zs_compress_algorithm_name(ZStream * zs) { - return zs ? zs->c_algorithm->name() : NULL; + return zs ? zs->algorithm->name() : NULL; } char const * zs_decompress_algorithm_name(ZStream * zs) { - return zs ? zs->d_algorithm->name() : NULL; + return zs ? zs->algorithm->name() : NULL; } diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c new file mode 100644 index 0000000000..247ac67322 --- /dev/null +++ b/src/common/zpq_stream.c @@ -0,0 +1,996 @@ +#ifndef FRONTEND +#include "postgres.h" +#else +#include "postgres_fe.h" +#endif +#include +#include + +#include "common/zpq_stream.h" +#include "pg_config.h" +#include "port/pg_bswap.h" + +/* log warnings on backend */ +#ifndef FRONTEND +#define pg_log_warning(...) elog(WARNING, __VA_ARGS__) +#else +#define pg_log_warning(...) (void)0 +#endif + +/* ZpqBuffer size, in bytes */ +#define ZPQ_BUFFER_SIZE 819200 +/* CompressedData msg_type */ +#define ZPQ_COMPRESSED_MSG_TYPE 'm' +/* SetCompressionMethod msg_type */ +#define ZPQ_SET_COMPRESSION_MSG_TYPE 'k' + +#define ZPQ_COMPRESS_THRESHOLD 60 + +typedef struct ZpqBuffer ZpqBuffer; + + +/* ZpqBuffer used as RX/TX buffer in ZpqStream */ +struct ZpqBuffer +{ + char buf[ZPQ_BUFFER_SIZE]; + size_t size; /* current size of buf */ + size_t pos; /* current position in buf, in range [0, size] */ +}; + +static inline void +zpq_buf_init(ZpqBuffer * zb) +{ + zb->size = 0; + zb->pos = 0; +} + +static inline size_t +zpq_buf_left(ZpqBuffer * zb) +{ + Assert(zb->buf); + return ZPQ_BUFFER_SIZE - zb->size; +} + +static inline size_t +zpq_buf_unread(ZpqBuffer * zb) +{ + return zb->size - zb->pos; +} + +static inline char * +zpq_buf_size(ZpqBuffer * zb) +{ + return (char *) (zb->buf) + zb->size; +} + +static inline char * +zpq_buf_pos(ZpqBuffer * zb) +{ + return (char *) (zb->buf) + zb->pos; +} + +static inline void +zpq_buf_size_advance(ZpqBuffer * zb, size_t value) +{ + zb->size += value; +} + +static inline void +zpq_buf_pos_advance(ZpqBuffer * zb, size_t value) +{ + zb->pos += value; +} + +static inline void +zpq_buf_reuse(ZpqBuffer * zb) +{ + size_t unread = zpq_buf_unread(zb); + + if (unread > 5) /* can read message header, don't do anything */ + return; + if (unread == 0) + { + zb->size = 0; + zb->pos = 0; + return; + } + memmove(zb->buf, zb->buf + zb->pos, unread); + zb->size = unread; + zb->pos = 0; +} + +struct ZpqStream +{ + ZStream *c_stream; /* underlying compression stream */ + ZStream *d_stream; /* underlying decompression stream */ + + size_t tx_total; /* amount of bytes sent to tx_func */ + + size_t tx_total_raw; /* amount of bytes received by zpq_write */ + size_t rx_total; /* amount of bytes read by rx_func */ + size_t rx_total_raw; /* amount of bytes returned by zpq_write */ + bool is_compressing; /* current compression state */ + + bool is_decompressing; /* current decompression state */ + size_t rx_msg_bytes_left; /* number of bytes left to process without + * changing the decompression state */ + size_t tx_msg_bytes_left; /* number of bytes left to process without + * changing the compression state */ + + ZpqBuffer rx_in; /* buffer for unprocessed data read by rx_func */ + ZpqBuffer tx_in; /* buffer for unprocessed data consumed by + * zpq_write */ + ZpqBuffer tx_out; /* buffer for processed data waiting for send + * via tx_func */ + + zpq_rx_func rx_func; + zpq_tx_func tx_func; + void *arg; + + zpq_compressor *compressors; /* compressors array holds the available + * compressors to use for + * compression/decompression */ + size_t n_compressors; /* size of the compressors array */ + int compress_alg_idx; /* index of the active compression + * algorithm */ + int decompress_alg_idx; /* index of the active decompression + * algorithm */ + int compressor_by_msg_type[256]; /* map to choose a compressor + * by the protocol message + * type */ + + bool reading_set_compression; /* utility marker indicating + * partial SetCompressionMethod + * read */ +}; + +/* + * Message compression map defines the logic for choosing the compressor + * based on the protocol message type. Currently, it is a basic prototype do demonstrate + * the capabilities of the on-the-fly compression switch. + */ +static inline void +zpq_build_msg_compression_map(ZpqStream * zpq) +{ + int i; + char **supported_algorithms = zs_get_supported_algorithms(); + + for (i = 0; i < 256; i++) + { + zpq->compressor_by_msg_type[i] = -1; + } + + for (i = 0; i < zpq->n_compressors; i++) + { + if (strcasecmp(supported_algorithms[zpq->compressors[i].impl], "zstd") == 0) + { + /* compress CopyData and DataRow messages with ZSTD */ + zpq->compressor_by_msg_type['d'] = i; + zpq->compressor_by_msg_type['D'] = i; + } + else if (strcasecmp(supported_algorithms[zpq->compressors[i].impl], "zlib") == 0) + { + /* compress Query messages with ZLIB (just an example) */ + zpq->compressor_by_msg_type['Q'] = i; + } + } +} + +/* + * Choose the index of compressor to use for the message of msg_type with msg_len. + * Return values: + * - the non-negative index of zpq->compressors array + * - -1, if message should not be compressed + */ +static inline int +zpq_choose_compressor(ZpqStream * zpq, char msg_type, uint32 msg_len) +{ + if (msg_len >= ZPQ_COMPRESS_THRESHOLD) + { + return zpq->compressor_by_msg_type[(unsigned char) msg_type]; + } + return -1; +} + +/* + * Check if should compress message of msg_type with msg_len. + * Return true if should, false if should not. + */ +static inline bool +zpq_should_compress(ZpqStream * zpq, char msg_type, uint32 msg_len) +{ + return zpq_choose_compressor(zpq, msg_type, msg_len) == -1; +} + +/* + * Check if message is a CompressedData. + * Return true if it is, otherwise false. + * */ +static inline bool +zpq_is_compressed_msg(char msg_type) +{ + return msg_type == ZPQ_COMPRESSED_MSG_TYPE; +} + +/* + * Check if message is a SetCompressionMethod. + * Return true if it is, otherwise false. + * */ +static inline bool +zpq_is_set_compression_msg(char msg_type) +{ + return msg_type == ZPQ_SET_COMPRESSION_MSG_TYPE; +} + +ZpqStream * +zpq_create(zpq_compressor * compressors, size_t n_compressors, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size) +{ + ZpqStream *zpq; + + /* zpqStream needs at least one compressor */ + if (n_compressors == 0 || compressors == NULL) + { + return NULL; + } + zpq = (ZpqStream *) malloc(sizeof(ZpqStream)); + + zpq->compressors = compressors; + zpq->n_compressors = n_compressors; + zpq->compress_alg_idx = -1; + zpq->decompress_alg_idx = -1; + + zpq->is_compressing = false; + zpq->is_decompressing = false; + zpq->rx_msg_bytes_left = 0; + zpq->tx_msg_bytes_left = 0; + zpq_buf_init(&zpq->tx_in); + + zpq->tx_total = 0; + zpq->tx_total_raw = 0; + zpq->rx_total = 0; + zpq->rx_total_raw = 0; + + zpq_buf_init(&zpq->rx_in); + zpq_buf_size_advance(&zpq->rx_in, rx_data_size); + Assert(rx_data_size < ZPQ_BUFFER_SIZE); + memcpy(zpq->rx_in.buf, rx_data, rx_data_size); + + zpq_buf_init(&zpq->tx_out); + + zpq->rx_func = rx_func; + zpq->tx_func = tx_func; + zpq->arg = arg; + zpq->reading_set_compression = false; + + zpq_build_msg_compression_map(zpq); + + return zpq; +} + +/* Compress up to src_size bytes from *src into CompressedData and write it to the tx buffer. + * Returns ZS_OK on success, ZS_COMPRESS_ERROR if encountered a compression error. */ +static inline ssize_t +zpq_write_compressed_message(ZpqStream * zpq, char const *src, size_t src_size, size_t *src_processed) +{ + size_t compressed_len; + ssize_t rc; + uint32 size; + + /* check if have enough space */ + if (zpq_buf_left(&zpq->tx_out) <= 5) + { + /* too little space for CompressedData, abort */ + *src_processed = 0; + return ZS_OK; + } + + compressed_len = 0; + rc = zs_write(zpq->c_stream, src, src_size, src_processed, + zpq_buf_size(&zpq->tx_out) + 5, zpq_buf_left(&zpq->tx_out) - 5, &compressed_len); + + if (compressed_len > 0) + { + /* write CompressedData type */ + *zpq_buf_size(&zpq->tx_out) = ZPQ_COMPRESSED_MSG_TYPE; + size = pg_hton32(compressed_len + 4); + + memcpy(zpq_buf_size(&zpq->tx_out) + 1, &size, sizeof(uint32)); /* write msg length */ + compressed_len += 5; /* append header length to compressed data + * length */ + } + + zpq->tx_total_raw += *src_processed; + zpq->tx_total += compressed_len; + zpq_buf_size_advance(&zpq->tx_out, compressed_len); + return rc; +} + +/* Copy the data directly from *src to the tx buffer */ +static void +zpq_write_uncompressed(ZpqStream * zpq, char const *src, size_t src_size, size_t *src_processed) +{ + src_size = Min(zpq_buf_left(&zpq->tx_out), src_size); + memcpy(zpq_buf_size(&zpq->tx_out), src, src_size); + + zpq->tx_total_raw += src_size; + zpq->tx_total += src_size; + zpq_buf_size_advance(&zpq->tx_out, src_size); + *src_processed = src_size; +} + +static ssize_t +zpq_write_set_compression_msg(ZpqStream * zpq, int new_compress_idx) +{ + uint32 len; + uint8 idx; + + /* + * check if have enough space: msg_type(1 byte) + msg_len(4 bytes) + + * compress_alg_idx(1 byte) + */ + if (zpq_buf_left(&zpq->tx_out) < 6) + { + return -1; + } + + /* write CompressedData type */ + *zpq_buf_size(&zpq->tx_out) = ZPQ_SET_COMPRESSION_MSG_TYPE; + len = pg_hton32(5); + memcpy(zpq_buf_size(&zpq->tx_out) + 1, &len, sizeof(uint32)); /* write msg length */ + + /* currently we expect idx to be in range [0, 255] */ + Assert(new_compress_idx >= 0 && new_compress_idx <= UINT8_MAX); + idx = (uint8) new_compress_idx; + memcpy(zpq_buf_size(&zpq->tx_out) + 5, &idx, sizeof(uint8)); /* write + * new_compress_idx */ + + zpq->tx_total_raw += 6; + zpq->tx_total += 6; + zpq_buf_size_advance(&zpq->tx_out, 6); + return 0; +} + +/* Determine if should compress the next message and change the current compression state */ +static ssize_t +zpq_toggle_compression(ZpqStream * zpq, char msg_type, uint32 msg_len) +{ + int new_compress_idx = zpq_choose_compressor(zpq, msg_type, msg_len); + bool should_compress = new_compress_idx != -1; + + /* + * negative new_compress_idx indicates that we should not compress this + * message + */ + if (should_compress) + { + /* + * if the new compressor does not match the current one, process the + * switch + */ + if (zpq->compress_alg_idx != new_compress_idx) + { + if (zpq_write_set_compression_msg(zpq, new_compress_idx)) + { + /* + * come back later when we can write the entire + * SetCompressionMethod message + */ + return 0; + } + + zs_compressor_free(zpq->c_stream); + zpq->c_stream = zs_create_compressor(zpq->compressors[new_compress_idx].impl, zpq->compressors[new_compress_idx].level); + if (zpq->c_stream == NULL) + { + return ZPQ_FATAL_ERROR; + } + zpq->compress_alg_idx = new_compress_idx; + } + } + + zpq->is_compressing = should_compress; + zpq->tx_msg_bytes_left = msg_len + 1; + return 0; +} + +/* + * Internal write function. Reads the data from *src buffer, + * determines the postgres messages type and length. + * If message matches the compression criteria, it wraps the message into + * CompressedData. Otherwise, leaves the message unchanged. + * If *src data ends with incomplete message header, this function is not + * going to read this message header. + * Returns number of written raw bytes or error code. + * In the last case number of bytes written is stored in *processed. + */ +static ssize_t +zpq_write_internal(ZpqStream * zpq, void const *src, size_t src_size, size_t *processed) +{ + size_t src_pos = 0; + ssize_t rc; + + do + { + /* + * try to read ahead the next message types and increase + * tx_msg_bytes_left, if possible + */ + while (zpq->tx_msg_bytes_left > 0 && src_size - src_pos >= zpq->tx_msg_bytes_left + 5) + { + char msg_type = *((char *) src + src_pos + zpq->tx_msg_bytes_left); + uint32 msg_len; + + memcpy(&msg_len, (char *) src + src_pos + zpq->tx_msg_bytes_left + 1, 4); + msg_len = pg_ntoh32(msg_len); + if (zpq_should_compress(zpq, msg_type, msg_len) != zpq->is_compressing) + { + /* + * cannot proceed further, encountered compression toggle + * point + */ + break; + } + zpq->tx_msg_bytes_left += msg_len + 1; + } + + /* + * Write CompressedData if currently is compressing or have some + * buffered data left in underlying compression stream + */ + if (zs_buffered(zpq->c_stream) || (zpq->is_compressing && zpq->tx_msg_bytes_left > 0)) + { + size_t buf_processed = 0; + size_t to_compress = Min(zpq->tx_msg_bytes_left, src_size - src_pos); + + rc = zpq_write_compressed_message(zpq, (char *) src + src_pos, to_compress, &buf_processed); + src_pos += buf_processed; + zpq->tx_msg_bytes_left -= buf_processed; + + if (rc != ZS_OK) + { + *processed = src_pos; + return rc; + } + } + + /* + * If not going to compress the data from *src, just write it + * uncompressed. + */ + else if (zpq->tx_msg_bytes_left > 0) + { /* determine next message type */ + size_t copy_len = Min(src_size - src_pos, zpq->tx_msg_bytes_left); + size_t copy_processed = 0; + + zpq_write_uncompressed(zpq, (char *) src + src_pos, copy_len, ©_processed); + src_pos += copy_processed; + zpq->tx_msg_bytes_left -= copy_processed; + } + + /* + * Reached the compression toggle point, fetch next message header to + * determine compression state. + */ + else + { + char msg_type; + uint32 msg_len; + + if (src_size - src_pos < 5) + { + /* + * must return here because we can't continue without full + * message header + */ + *processed = src_pos; + return ZPQ_INCOMPLETE_HEADER; + } + + msg_type = *((char *) src + src_pos); + memcpy(&msg_len, (char *) src + src_pos + 1, 4); + msg_len = pg_ntoh32(msg_len); + rc = zpq_toggle_compression(zpq, msg_type, msg_len); + if (rc) + { + return rc; + } + } + + /* + * repeat sending while there is some data in input or internal + * compression buffer + */ + } while (src_pos < src_size && zpq_buf_left(&zpq->tx_out) > 6); + + return src_pos; +} + +ssize_t +zpq_write(ZpqStream * zpq, void const *src, size_t src_size, size_t *src_processed) +{ + size_t src_pos = 0; + ssize_t rc; + + /* try to process as much data as possible before calling the tx_func */ + while (zpq_buf_left(&zpq->tx_out) > 6) + { + size_t copy_len = Min(zpq_buf_left(&zpq->tx_in), src_size - src_pos); + size_t processed; + + memcpy(zpq_buf_size(&zpq->tx_in), (char *) src + src_pos, copy_len); + zpq_buf_size_advance(&zpq->tx_in, copy_len); + src_pos += copy_len; + + if (zpq_buf_unread(&zpq->tx_in) == 0 && !zs_buffered(zpq->c_stream)) + { + break; + } + + processed = 0; + + rc = zpq_write_internal(zpq, zpq_buf_pos(&zpq->tx_in), zpq_buf_unread(&zpq->tx_in), &processed); + if (rc > 0) + { + zpq_buf_pos_advance(&zpq->tx_in, rc); + zpq_buf_reuse(&zpq->tx_in); + } + else + { + zpq_buf_pos_advance(&zpq->tx_in, processed); + zpq_buf_reuse(&zpq->tx_in); + if (rc == ZPQ_INCOMPLETE_HEADER) + { + break; + } + *src_processed = src_pos; + return rc; + } + } + + /* + * call the tx_func if have any bytes to send + */ + while (zpq_buf_unread(&zpq->tx_out)) + { + rc = zpq->tx_func(zpq->arg, zpq_buf_pos(&zpq->tx_out), zpq_buf_unread(&zpq->tx_out)); + if (rc > 0) + { + zpq_buf_pos_advance(&zpq->tx_out, rc); + } + else + { + *src_processed = src_pos; + zpq_buf_reuse(&zpq->tx_out); + return rc; + } + } + + zpq_buf_reuse(&zpq->tx_out); + return src_pos; +} + +/* Decompress bytes from RX buffer and write up to dst_len of uncompressed data to *dst. + * Returns: + * ZS_OK on success, + * ZS_STREAM_END if reached end of compressed chunk + * ZS_DECOMPRESS_ERROR if encountered a decompression error */ +static inline ssize_t +zpq_read_compressed_message(ZpqStream * zpq, char *dst, size_t dst_len, size_t *dst_processed) +{ + size_t rx_processed = 0; + ssize_t rc; + size_t read_len = Min(zpq->rx_msg_bytes_left, zpq_buf_unread(&zpq->rx_in)); + + rc = zs_read(zpq->d_stream, zpq_buf_pos(&zpq->rx_in), read_len, &rx_processed, + dst, dst_len, dst_processed); + + zpq_buf_pos_advance(&zpq->rx_in, rx_processed); + zpq->rx_total_raw += *dst_processed; + zpq->rx_msg_bytes_left -= rx_processed; + return rc; +} + +/* Copy up to dst_len bytes from rx buffer to *dst. + * Returns amount of bytes copied. */ +static inline size_t +zpq_read_uncompressed(ZpqStream * zpq, char *dst, size_t dst_len) +{ + size_t copy_len; + + Assert(zpq_buf_unread(&zpq->rx_in) > 0); + copy_len = Min(zpq->rx_msg_bytes_left, Min(zpq_buf_unread(&zpq->rx_in), dst_len)); + + memcpy(dst, zpq_buf_pos(&zpq->rx_in), copy_len); + + zpq_buf_pos_advance(&zpq->rx_in, copy_len); + zpq->rx_total_raw += copy_len; + zpq->rx_msg_bytes_left -= copy_len; + return copy_len; +} + +/* Determine if should decompress the next message and + * change the current decompression state */ +static inline void +zpq_toggle_decompression(ZpqStream * zpq) +{ + uint32 msg_len; + char msg_type = *zpq_buf_pos(&zpq->rx_in); + + memcpy(&msg_len, zpq_buf_pos(&zpq->rx_in) + 1, 4); + msg_len = pg_ntoh32(msg_len); + + if (zpq_is_set_compression_msg(msg_type)) + { + Assert(msg_len == 5); + zpq->reading_set_compression = true; + /* set compression message header is no longer needed, just skip it */ + zpq_buf_pos_advance(&zpq->rx_in, 5); + } + else + { + zpq->is_decompressing = zpq_is_compressed_msg(msg_type); + zpq->rx_msg_bytes_left = msg_len + 1; + + if (zpq->is_decompressing) + { + /* compressed message header is no longer needed, just skip it */ + zpq_buf_pos_advance(&zpq->rx_in, 5); + zpq->rx_msg_bytes_left -= 5; + } + } +} + +static inline ssize_t +zpq_process_switch(ZpqStream * zpq) +{ + uint8 algorithm_idx; + + if (zpq_buf_unread(&zpq->rx_in) < 1) + { + return 0; + } + + algorithm_idx = *zpq_buf_pos(&zpq->rx_in); + + zpq_buf_pos_advance(&zpq->rx_in, 1); + zpq->reading_set_compression = false; + + if (algorithm_idx != zpq->decompress_alg_idx) + { + zs_decompressor_free(zpq->d_stream); + zpq->d_stream = zs_create_decompressor(zpq->compressors[algorithm_idx].impl); + if (zpq->d_stream == NULL) + { + return ZPQ_FATAL_ERROR; + } + zpq->decompress_alg_idx = algorithm_idx; + } + + return 0; +} + +ssize_t +zpq_read(ZpqStream * zpq, void *dst, size_t dst_size) +{ + size_t dst_pos = 0; + size_t dst_processed = 0; + ssize_t rc; + + /* Read until some data fetched */ + while (dst_pos == 0) + { + zpq_buf_reuse(&zpq->rx_in); + + if (!zpq_buffered_rx(zpq)) + { + rc = zpq->rx_func(zpq->arg, zpq_buf_size(&zpq->rx_in), zpq_buf_left(&zpq->rx_in)); + if (rc > 0) /* read fetches some data */ + { + zpq->rx_total += rc; + zpq_buf_size_advance(&zpq->rx_in, rc); + } + else /* read failed */ + { + return rc; + } + } + + /* + * try to read ahead the next message types and increase + * rx_msg_bytes_left, if possible + */ + while (zpq->rx_msg_bytes_left > 0 && (zpq_buf_unread(&zpq->rx_in) >= zpq->rx_msg_bytes_left + 5)) + { + char msg_type; + uint32 msg_len; + + msg_type = *(zpq_buf_pos(&zpq->rx_in) + zpq->rx_msg_bytes_left); + if (zpq->is_decompressing || zpq_is_compressed_msg(msg_type) || zpq_is_set_compression_msg(msg_type)) + { + /* + * cannot proceed further, encountered compression toggle + * point + */ + break; + } + + memcpy(&msg_len, zpq_buf_pos(&zpq->rx_in) + zpq->rx_msg_bytes_left + 1, 4); + zpq->rx_msg_bytes_left += pg_ntoh32(msg_len) + 1; + } + + + if (zpq->rx_msg_bytes_left > 0 || zs_buffered(zpq->d_stream)) + { + dst_processed = 0; + if (zpq->is_decompressing || zs_buffered(zpq->d_stream)) + { + rc = zpq_read_compressed_message(zpq, dst, dst_size - dst_pos, &dst_processed); + dst_pos += dst_processed; + if (rc == ZS_STREAM_END) + { + continue; + } + if (rc != ZS_OK) + { + return rc; + } + } + else + dst_pos += zpq_read_uncompressed(zpq, dst, dst_size - dst_pos); + } + else if (zpq->reading_set_compression) + { + zpq_process_switch(zpq); + } + else if (zpq_buf_unread(&zpq->rx_in) >= 5) + zpq_toggle_decompression(zpq); + } + return dst_pos; +} + +bool +zpq_buffered_rx(ZpqStream * zpq) +{ + return zpq ? zpq_buf_unread(&zpq->rx_in) >= 5 || (zpq_buf_unread(&zpq->rx_in) > 0 && zpq->rx_msg_bytes_left > 0) || + zs_buffered(zpq->d_stream) : 0; +} + +bool +zpq_buffered_tx(ZpqStream * zpq) +{ + return zpq ? zpq_buf_unread(&zpq->tx_in) >= 5 || (zpq_buf_unread(&zpq->tx_in) > 0 && zpq->tx_msg_bytes_left > 0) || zpq_buf_unread(&zpq->tx_out) > 0 || + zs_buffered(zpq->c_stream) : 0; +} + +void +zpq_free(ZpqStream * zpq) +{ + if (zpq) + { + if (zpq->c_stream) + { + zs_compressor_free(zpq->c_stream); + } + if (zpq->d_stream) + { + zs_decompressor_free(zpq->d_stream); + } + free(zpq); + } +} + +char const * +zpq_compress_error(ZpqStream * zpq) +{ + return zs_compress_error(zpq->c_stream); +} + +char const * +zpq_decompress_error(ZpqStream * zpq) +{ + return zs_decompress_error(zpq->d_stream); +} + +char const * +zpq_compress_algorithm_name(ZpqStream * zpq) +{ + return zs_compress_algorithm_name(zpq->c_stream); +} + +char const * +zpq_decompress_algorithm_name(ZpqStream * zpq) +{ + return zs_decompress_algorithm_name(zpq->d_stream); +} + +bool +zpq_parse_compression_setting(char *val, zpq_compressor * *compressors, size_t *n_compressors) +{ + int i; + char **supported_algorithms = zs_get_supported_algorithms(); + size_t n_supported_algorithms = 0; + char *protocol_extension = strchr(val, ';'); + + *compressors = NULL; + *n_compressors = 0; + + /* No protocol extensions are currently supported */ + if (protocol_extension) + *protocol_extension = '\0'; + + while (supported_algorithms[n_supported_algorithms] != NULL) + { + n_supported_algorithms += 1; + } + + if (pg_strcasecmp(val, "true") == 0 || + pg_strcasecmp(val, "yes") == 0 || + pg_strcasecmp(val, "on") == 0 || + pg_strcasecmp(val, "any") == 0 || + pg_strcasecmp(val, "1") == 0) + { + /* return all available compressors */ + *n_compressors = n_supported_algorithms; + + if (n_supported_algorithms) + { + *compressors = malloc(n_supported_algorithms * sizeof(zpq_compressor)); + for (i = 0; i < n_supported_algorithms; i++) + { + (*compressors)[i].impl = i; + (*compressors)[i].level = ZPQ_DEFAULT_COMPRESSION_LEVEL; + } + } + return true; + } + + if (*val == 0 || + pg_strcasecmp(val, "false") == 0 || + pg_strcasecmp(val, "no") == 0 || + pg_strcasecmp(val, "off") == 0 || + pg_strcasecmp(val, "0") == 0) + { + /* Compression is disabled */ + return true; + } + + + return zpq_deserialize_compressors(val, compressors, n_compressors); +} + +bool +zpq_deserialize_compressors(char const *c_string, zpq_compressor * *compressors, size_t *n_compressors) +{ + int selected_alg_mask = 0; /* bitmask of already selected + * algorithms to avoid duplicates in + * compressors */ + char **supported_algorithms = zs_get_supported_algorithms(); + size_t n_supported_algorithms = 0; + char *c_string_dup = strdup(c_string); /* following parsing can + * modify the string */ + char *p = c_string_dup; + + *n_compressors = 0; + + while (supported_algorithms[n_supported_algorithms] != NULL) + { + n_supported_algorithms += 1; + } + + *compressors = malloc(n_supported_algorithms * sizeof(zpq_compressor)); + + while (*p != '\0') + { + char *sep = strchr(p, ','); + char *col; + int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; + bool found; + + if (sep != NULL) + *sep = '\0'; + + col = strchr(p, ':'); + if (col != NULL) + { + *col = '\0'; + if (sscanf(col + 1, "%d", &compression_level) != 1) + { + pg_log_warning("invalid compression level %s in compression option '%s'", col + 1, p); + free(*compressors); + free(c_string_dup); + *compressors = NULL; + *n_compressors = 0; + return false; + } + } + found = false; + for (int i = 0; supported_algorithms[i] != NULL; i++) + { + if (pg_strcasecmp(p, supported_algorithms[i]) == 0) + { + if (selected_alg_mask & (1 << i)) + { + /* duplicates are not allowed */ + pg_log_warning("duplicate algorithm %s in compressors string %s", p, c_string); + free(*compressors); + free(c_string_dup); + *compressors = NULL; + *n_compressors = 0; + return false; + } + + (*compressors)[*n_compressors].impl = i; + (*compressors)[*n_compressors].level = compression_level; + + selected_alg_mask |= 1 << i; + *n_compressors += 1; + found = true; + break; + } + } + if (!found) + { + pg_log_warning("algorithm %s is not supported", p); + } + if (sep) + p = sep + 1; + else + break; + } + + if (*n_compressors == 0) + { + free(*compressors); + *compressors = NULL; + } + free(c_string_dup); + return true; +} + +char * +zpq_serialize_compressors(zpq_compressor const *compressors, size_t n_compressors) +{ + char *res; + char *p; + size_t i; + size_t total_len = 0; + char **supported_algorithms = zs_get_supported_algorithms(); + + if (n_compressors == 0) + { + return NULL; + } + + for (i = 0; i < n_compressors; i++) + { + size_t level_len; + + if (!zs_is_valid_impl_id(compressors[i].impl)) + { + pg_log_warning("algorithm impl_id %d is incorrect", compressors[i].impl); + return NULL; + } + + /* determine the length of the compression level string */ + level_len = compressors[i].level == 0 ? 1 : (int) floor(log10(abs(compressors[i].level))) + 1; + if (compressors[i].level < 0) + { + level_len += 1; /* add the leading "-" */ + } + + /* + * single entry looks like "alg_name:compression_level," so +2 is for + * ":" and "," symbols + */ + total_len += strlen(supported_algorithms[compressors[i].impl]) + level_len + 2; + } + + res = p = malloc(total_len); + + for (i = 0; i < n_compressors; i++) + { + p += sprintf(p, "%s:%d,", supported_algorithms[compressors[i].impl], compressors[i].level); + } + p[-1] = '\0'; + return res; +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index fde251fa4f..b540b58a0c 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5280,9 +5280,9 @@ proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'int4', - proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,text,numeric,text,bool,text,bool,int4,int8}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,query_id}', + proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,text,numeric,text,bool,text,bool,int4,int8,int8,int8,int8,int8}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,query_id,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}', prosrc => 'pg_stat_get_activity' }, { oid => '3318', descr => 'statistics: information about progress of backends running maintenance command', @@ -5582,6 +5582,14 @@ proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,wal_write,wal_sync,wal_write_time,wal_sync_time,stats_reset}', prosrc => 'pg_stat_get_wal' }, +{ oid => '9598', descr => 'statistics: information about network traffic', + proname => 'pg_stat_get_network_traffic', proisstrict => 'f', provolatile => 's', + proparallel => 'r', prorettype => 'record', proargtypes => 'int4', + proallargtypes => '{int4,int8,int8,int8,int8}', + proargmodes => '{i,o,o,o,o}', + proargnames => '{_beid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}', + prosrc => 'pg_stat_get_network_traffic' }, + { oid => '2306', descr => 'statistics: information about SLRU caches', proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', @@ -5713,6 +5721,10 @@ proname => 'pg_tablespace_location', provolatile => 's', prorettype => 'text', proargtypes => 'oid', prosrc => 'pg_tablespace_location' }, +{ oid => '9257', descr => 'connection compression algorithm', + proname => 'pg_compression_algorithm', provolatile => 's', prorettype => 'text', + proargtypes => '', prosrc => 'pg_compression_algorithm' }, + { oid => '1946', descr => 'convert bytea value into some ascii-only text string', proname => 'encode', prorettype => 'text', proargtypes => 'bytea text', diff --git a/src/include/common/z_stream.h b/src/include/common/z_stream.h index a19ea90ca5..790968a4a5 100644 --- a/src/include/common/z_stream.h +++ b/src/include/common/z_stream.h @@ -22,12 +22,17 @@ typedef struct ZStream ZStream; #endif /* - * Create compression stream with rx/tx function for reading/sending compressed data. + * Create compression stream for sending compressed data. * c_alg_impl: index of chosen compression algorithm * c_level: compression c_level + */ +extern ZStream * zs_create_compressor(unsigned int c_alg_impl, int c_level); + +/* + * Create decompression stream for reading compressed data. * d_alg_impl: index of chosen decompression algorithm */ -extern ZStream * zs_create(int c_alg_impl, int c_level, int d_alg_impl); +extern ZStream * zs_create_decompressor(unsigned int d_alg_impl); /* * Read up to "size" raw (decompressed) bytes. @@ -57,22 +62,27 @@ extern char const *zs_compress_error(ZStream * zs); /* * Return true if non-flushed data might left in internal rx decompression buffer. */ -extern bool zs_buffered_rx(ZStream * zs); +extern bool zs_buffered(ZStream * zs); /* * Return true if non-flushed data might left in internal tx compression buffer. */ -extern bool zs_buffered_tx(ZStream * zs); +extern bool zs_buffered(ZStream * zs); /* * End the compression stream. */ -extern ssize_t zs_end(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed); +extern ssize_t zs_end_compression(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed); /* - * Free stream created by zs_create function. + * Free stream created by zs_create_compressor function. */ -extern void zs_free(ZStream * zs); +extern void zs_compressor_free(ZStream * zs); + +/* + * Free stream created by zs_create_decompressor function. + */ +extern void zs_decompressor_free(ZStream * zs); /* * Get the name of chosen compression algorithm. @@ -88,3 +98,8 @@ extern char const *zs_decompress_algorithm_name(ZStream * zs); Returns zero terminated array with compression algorithms names */ extern char **zs_get_supported_algorithms(void); + +/* + Returns true if provided id is a valid compression algorithm id, otherwise returns false +*/ +extern bool zs_is_valid_impl_id(unsigned int id); diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h new file mode 100644 index 0000000000..fe9eaa0f95 --- /dev/null +++ b/src/include/common/zpq_stream.h @@ -0,0 +1,117 @@ +/* + * zpq_stream.h + * Streaming compression for libpq + */ +#include "z_stream.h" + +#ifndef ZPQ_STREAM_H +#define ZPQ_STREAM_H + +#define ZPQ_DEFAULT_COMPRESSION_LEVEL (1) +#define ZPQ_INCOMPLETE_HEADER (-6) +#define ZPQ_FATAL_ERROR (-7) +struct ZpqStream; +typedef struct ZpqStream ZpqStream; + +typedef ssize_t (*zpq_tx_func) (void *arg, void const *data, size_t size); +typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size); + +/* + * Descriptor of compression algorithm chosen by client + */ +typedef struct zpq_compressor +{ + unsigned int impl; /* compression algorithm index */ + int level; /* compression level */ +} zpq_compressor; + +#endif + +/* + * Create compression stream with rx/tx function for reading/sending compressed data. + * tx_func: function for writing compressed data in underlying stream + * rx_func: function for reading compressed data from underlying stream + * arg: context passed to the function + * rx_data: received data (compressed data already fetched from input stream) + * rx_data_size: size of data fetched from input stream + */ +extern ZpqStream * zpq_create(zpq_compressor * compressors, size_t n_compressors, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size); + +/* + * Write up to "src_size" raw (decompressed) bytes. + * Returns number of written raw bytes or error code. + * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function. + * In the last case number of bytes written is stored in *src_processed. + */ +extern ssize_t zpq_write(ZpqStream * zpq, void const *src, size_t src_size, size_t *src_processed); + +/* + * Read up to "dst_size" raw (decompressed) bytes. + * Returns number of decompressed bytes or error code. + * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function. + */ +extern ssize_t zpq_read(ZpqStream * zpq, void *dst, size_t dst_size); + +/* + * Return true if non-flushed data left in internal rx decompression buffer. + */ +extern bool zpq_buffered_rx(ZpqStream * zpq); + +/* + * Return true if non-flushed data left in internal tx compression buffer. + */ +extern bool zpq_buffered_tx(ZpqStream * zpq); + +/* + * Free stream created by zs_create function. + */ +extern void zpq_free(ZpqStream * zpq); + +/* + * Get decompressor error message. + */ +extern char const *zpq_decompress_error(ZpqStream * zpq); + +/* + * Get compressor error message. + */ +extern char const *zpq_compress_error(ZpqStream * zpq); + +/* + * Get the name of the current compression algorithm. + */ +extern char const *zpq_compress_algorithm_name(ZpqStream * zpq); + +/* + * Get the name of the current decompression algorithm. + */ +extern char const *zpq_decompress_algorithm_name(ZpqStream * zpq); + +/* + * Parse comma-separated list of compression algorithms. It can be either explicitly specified, or include all algorithms supported by client library. + * Returns: + * - true if the compression string is successfully parsed + * - false otherwise + * It also populates the compressors array with the recognized compressors. Size of the array is stored in n_compressors. + * If no supported compressors recognized or if compression is disabled, then NULL is assigned to *compressors and n_compressors is set to 0. + */ +extern bool + zpq_parse_compression_setting(char *val, zpq_compressor * *compressors, size_t *n_compressors); + +/* Serialize the compressors array to string so it can be transmitted to the other side during the compression startup. + * For example, for array of two compressors (zstd, level 1), (zlib, level 2) resulting string would look like "zstd:1,zlib:2". + * Returns the resulting string. + */ +extern char + *zpq_serialize_compressors(zpq_compressor const *compressors, size_t n_compressors); + +/* Deserialize the compressors string received during the compression setup to a compressors array. + * For example, for string "zstd:1,zlib:2" compressors would be populated with 2 elements: (zstd, level 1), (zlib, level 2). + * Returns: + * - true if the compressors string is successfully parsed + * - false otherwise + * It also populates the compressors array with the recognized compressors. Size of the array is stored in n_compressors. + * If no supported compressors recognized or string is empty, then NULL is assigned to *compressors and n_compressors is set to 0. + */ +bool + zpq_deserialize_compressors(char const *c_string, zpq_compressor * *compressors, size_t *n_compressors); diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 02015efe13..bc3e47b51f 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -188,6 +188,9 @@ typedef struct Port int keepalives_count; int tcp_user_timeout; + char *compression_algorithms; /* Compression algorithms supported by + * client */ + /* * GSSAPI structures. */ diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 6c51b2f20f..92f3c81653 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -71,6 +71,7 @@ extern void StreamClose(pgsocket sock); extern void TouchSocketFiles(void); extern void RemoveSocketFiles(void); extern void pq_init(void); +extern int pq_configure(Port *port); extern int pq_getbytes(char *s, size_t len); extern void pq_startmsgread(void); extern void pq_endmsgread(void); diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index be9d970574..4beb9c5085 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -137,6 +137,9 @@ typedef uint32 PacketLen; extern bool Db_user_namespace; +/* List of allowed compression algorithms */ +extern char *libpq_compress_algorithms; + /* * In protocol 3.0 and later, the startup packet length is not fixed, but * we set an arbitrary limit on it anyway. This is just to prevent simple diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index edf5597185..843c3fb8ac 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -360,12 +360,12 @@ /* Define to 1 if you have the `z' library (-lz). */ #undef HAVE_LIBZ -/* Define to 1 if you have the `link' function. */ -#undef HAVE_LINK - /* Define to 1 if you have the `zstd' library (-lzstd). */ #undef HAVE_LIBZSTD +/* Define to 1 if you have the `link' function. */ +#undef HAVE_LINK + /* Define to 1 if the system has the type `locale_t'. */ #undef HAVE_LOCALE_T diff --git a/src/include/utils/backend_status.h b/src/include/utils/backend_status.h index 8042b817df..6bbf7d530e 100644 --- a/src/include/utils/backend_status.h +++ b/src/include/utils/backend_status.h @@ -144,6 +144,12 @@ typedef struct PgBackendStatus /* application name; MUST be null-terminated */ char *st_appname; + /* client-server traffic information */ + uint64 st_rx_raw_bytes; + uint64 st_tx_raw_bytes; + uint64 st_rx_compressed_bytes; + uint64 st_tx_compressed_bytes; + /* * Current command string; MUST be null-terminated. Note that this string * possibly is truncated in the middle of a multi-byte character. As @@ -301,6 +307,7 @@ extern void pgstat_report_query_id(uint64 query_id, bool force); extern void pgstat_report_tempfile(size_t filesize); extern void pgstat_report_appname(const char *appname); extern void pgstat_report_xact_timestamp(TimestampTz tstamp); +extern void pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes); extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser); extern const char *pgstat_get_crashed_backend_activity(int pid, char *buffer, int buflen); diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index e8bcc88370..33ce6f2513 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -186,3 +186,6 @@ PQpipelineStatus 183 PQsetTraceFlags 184 PQmblenBounded 185 PQsendFlushRequest 186 +PQcompressor 187 +PQdecompressor 188 +PQreadPending 189 \ No newline at end of file diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index e950b41374..b449776a2d 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -24,6 +24,7 @@ #include "common/ip.h" #include "common/link-canary.h" #include "common/scram-common.h" +#include "common/zpq_stream.h" #include "common/string.h" #include "fe-auth.h" #include "libpq-fe.h" @@ -339,6 +340,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = { "Replication", "D", 5, offsetof(struct pg_conn, replication)}, + {"compression", "PGCOMPRESSION", NULL, NULL, + "Libpq-compression", "", 16, + offsetof(struct pg_conn, compression)}, + {"target_session_attrs", "PGTARGETSESSIONATTRS", DefaultTargetSessionAttrs, NULL, "Target-Session-Attrs", "", 15, /* sizeof("prefer-standby") = 15 */ @@ -447,6 +452,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock; void pqDropConnection(PGconn *conn, bool flushInput) { + /* Release compression streams */ + zpq_free(conn->zpqStream); + conn->zpqStream = NULL; + /* Drop any SSL state */ pqsecure_close(conn); @@ -3245,11 +3254,14 @@ keep_going: /* We will come back to here until there is } /* - * Validate message type: we expect only an authentication - * request or an error here. Anything else probably means - * it's not Postgres on the other end at all. + * Validate message type. We expect only: + * - authentication request ('R') + * - error ('E') + * - protocol compression acknowledgment ('z') + * - NegotiateProtocolVersion in cases when server does not support protocol compression + * Anything else probably means it's not Postgres on the other end at all. */ - if (!(beresp == 'R' || beresp == 'E')) + if (!(beresp == 'R' || beresp == 'E' || beresp == 'z' || beresp == 'v')) { appendPQExpBuffer(&conn->errorMessage, libpq_gettext("expected authentication request from server, but received %c\n"), @@ -3327,6 +3339,60 @@ keep_going: /* We will come back to here until there is return PGRES_POLLING_READING; } + if (beresp == 'z') /* Switch on compression */ + { + zpq_compressor *compressors; + size_t n_compressors; + char *resp = malloc(msgLength); + + pqGetnchar(resp, msgLength, conn); + + if (!zpq_deserialize_compressors(resp, &compressors, &n_compressors)) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("server returned unrecognized compression setting: %s\n"), + resp); + free(resp); + goto error_return; + } + free(resp); + + if (n_compressors == 0) + { + /* + * If there are no compressors returned, it means that + * the server rejected all sent compression + * algorithms. Mark the data as consumed and do not + * enable the compression. + */ + conn->inStart = conn->inCursor; + goto keep_going; + } + + Assert(!conn->zpqStream); + conn->zpqStream = zpq_create(compressors, n_compressors, + (zpq_tx_func) pqsecure_write, (zpq_rx_func) pqsecure_read, + conn, + &conn->inBuffer[conn->inCursor], + conn->inEnd - conn->inCursor); + if (!conn->zpqStream) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("failed to initialize compression\n")); + free(compressors); + goto error_return; + } + /* reset buffer */ + conn->inStart = conn->inCursor = conn->inEnd = 0; + } + + if (conn->n_compressors != 0 && beresp == 'v') /* NegotiateProtocolVersion */ + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("server does not support protocol compression\n")); + goto error_return; + } + /* Handle errors. */ if (beresp == 'E') { @@ -4064,6 +4130,8 @@ freePGconn(PGconn *conn) free(conn->dbName); if (conn->replication) free(conn->replication); + if (conn->compression) + free(conn->compression); if (conn->pguser) free(conn->pguser); if (conn->pgpass) @@ -6595,6 +6663,22 @@ PQuser(const PGconn *conn) return conn->pguser; } +char * +PQcompressor(const PGconn *conn) +{ + if (!conn || !conn->zpqStream) + return NULL; + return (char *) zpq_compress_algorithm_name(conn->zpqStream); +} + +char * +PQdecompressor(const PGconn *conn) +{ + if (!conn || !conn->zpqStream) + return NULL; + return (char *) zpq_decompress_algorithm_name(conn->zpqStream); +} + char * PQpass(const PGconn *conn) { diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index aca81890bb..ee36372c10 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1996,9 +1996,7 @@ PQgetResult(PGconn *conn) * EOF indication. We expect therefore that this won't result in any * undue delay in reporting a previous write failure.) */ - if (flushResult || - pqWait(true, false, conn) || - pqReadData(conn) < 0) + if (flushResult || pqWait(true, false, conn) || pqReadData(conn) < 0) { /* * conn->errorMessage has been set by pqWait or pqReadData. We @@ -3771,6 +3769,12 @@ pqPipelineFlush(PGconn *conn) return 0; } +int +PQreadPending(PGconn *conn) +{ + return pqReadPending(conn); +} + /* * PQfreemem - safely frees memory allocated diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index 9a2a970293..8b7ab3367e 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -53,12 +53,24 @@ #include "pg_config_paths.h" #include "port/pg_bswap.h" +#include + static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn); static int pqSendSome(PGconn *conn, int len); static int pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time); static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time); +/* + * Use zpq_read if compression is switched on + */ +#define pq_read_conn(conn) \ + (conn->zpqStream \ + ? zpq_read(conn->zpqStream, conn->inBuffer + conn->inEnd, \ + conn->inBufSize - conn->inEnd) \ + : pqsecure_read(conn, conn->inBuffer + conn->inEnd, \ + conn->inBufSize - conn->inEnd)) + /* * PQlibVersion: return the libpq version number */ @@ -617,10 +629,17 @@ pqReadData(PGconn *conn) /* OK, try to read some data */ retry3: - nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd, - conn->inBufSize - conn->inEnd); + nread = pq_read_conn(conn); if (nread < 0) { + if (nread == ZS_DECOMPRESS_ERROR) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zpqStream)); + return -1; + } + switch (SOCK_ERRNO) { case EINTR: @@ -712,10 +731,18 @@ retry3: * arrived. */ retry4: - nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd, - conn->inBufSize - conn->inEnd); + nread = pq_read_conn(conn); + if (nread < 0) { + if (nread == ZS_DECOMPRESS_ERROR) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zpqStream)); + return -1; + } + switch (SOCK_ERRNO) { case EINTR: @@ -826,12 +853,18 @@ pqSendSome(PGconn *conn, int len) } /* while there's still data to send */ - while (len > 0) + while (len > 0 || zpq_buffered_tx(conn->zpqStream)) { int sent; + size_t processed = 0; + /* + * Use zpq_write if compression is switched on + */ + sent = conn->zpqStream + ? zpq_write(conn->zpqStream, ptr, len, &processed) #ifndef WIN32 - sent = pqsecure_write(conn, ptr, len); + : pqsecure_write(conn, ptr, len); #else /* @@ -839,8 +872,11 @@ pqSendSome(PGconn *conn, int len) * failure-point appears to be different in different versions of * Windows, but 64k should always be safe. */ - sent = pqsecure_write(conn, ptr, Min(len, 65536)); +: pqsecure_write(conn, ptr, Min(len, 65536)); #endif + ptr += processed; + len -= processed; + remaining -= processed; if (sent < 0) { @@ -896,7 +932,7 @@ pqSendSome(PGconn *conn, int len) remaining -= sent; } - if (len > 0) + if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zpqStream)) { /* * We didn't send it all, wait till we can send more. @@ -989,6 +1025,8 @@ pqFlush(PGconn *conn) int pqWait(int forRead, int forWrite, PGconn *conn) { + if (forRead && conn->inCursor < conn->inEnd) + return 0; return pqWaitTimed(forRead, forWrite, conn, (time_t) -1); } @@ -1046,6 +1084,9 @@ pqWriteReady(PGconn *conn) * * If SSL is in use, the SSL buffer is checked prior to checking the socket * for read data directly. + * + * If ZPQ stream is in use, the ZPQ buffer is checked prior to checking + * the socket for read data directly. */ static int pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) @@ -1061,14 +1102,10 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) return -1; } -#ifdef USE_SSL - /* Check for SSL library buffering read bytes */ - if (forRead && conn->ssl_in_use && pgtls_read_pending(conn)) + if (forRead && (pqReadPending(conn) > 0)) { - /* short-circuit the select */ return 1; } -#endif /* We will retry as long as we get EINTR */ do @@ -1088,6 +1125,33 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) return result; } +/* + * Check if there is some data pending in ZPQ / SSL read buffers. + * Returns -1 on failure, 0 if no, 1 if yes. + */ +int +pqReadPending(PGconn *conn) +{ + if (!conn) + return -1; + + /* check for ZPQ stream buffered read bytes */ + if (zpq_buffered_rx(conn->zpqStream)) + { + /* short-circuit the select */ + return 1; + } + +#ifdef USE_SSL + /* Check for SSL library buffering read bytes */ + if (conn->ssl_in_use && pgtls_read_pending(conn)) + { + /* short-circuit the select */ + return 1; + } +#endif + return 0; +} /* * Check a file descriptor for read and/or write data, possibly waiting. diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 2e83305348..6e270adb34 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -2167,6 +2167,50 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen, return startpacket; } +/* + * Build comma-separated list of compression algorithms requested by client. + * It can be either explicitly specified by user in connection string, or + * include all algorithms supported by client library. + * This function returns true if the compression string is successfully parsed and + * stores a comma-separated list of algorithms in *client_compressors. + * If compression is disabled, then NULL is assigned to *client_compressors. + * Also it creates an array of compressor descriptors, each element of which corresponds to + * the corresponding algorithm name in *client_compressors list. This array is stored in PGconn + * and is used during handshake when a compression acknowledgment response is received from the server. + */ +static bool +build_compressors_list(PGconn *conn, char **client_compressors, bool build_descriptors) +{ + zpq_compressor *compressors; + size_t n_compressors; + + if (!zpq_parse_compression_setting(conn->compression, &compressors, &n_compressors)) + { + return false; + } + + *client_compressors = NULL; + if (build_descriptors) + { + conn->compressors = compressors; + conn->n_compressors = n_compressors; + } + + if (n_compressors == 0) + { + /* no compressors available, return */ + return true; + } + + *client_compressors = zpq_serialize_compressors(compressors, n_compressors); + + if (!build_descriptors) + { + free(compressors); + } + return true; +} + /* * Build a startup packet given a filled-in PGconn structure. * @@ -2213,6 +2257,18 @@ build_startup_packet(const PGconn *conn, char *packet, ADD_STARTUP_OPTION("replication", conn->replication); if (conn->pgoptions && conn->pgoptions[0]) ADD_STARTUP_OPTION("options", conn->pgoptions); + if (conn->compression && conn->compression[0]) + { + char *client_compression_algorithms; + + if (build_compressors_list((PGconn *) conn, &client_compression_algorithms, packet == NULL)) + { + if (client_compression_algorithms) + { + ADD_STARTUP_OPTION("_pq_.compression", client_compression_algorithms); + } + } + } if (conn->send_appname) { /* Use appname if present, otherwise use fallback */ diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index a6fd69aceb..1e951320d8 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -340,6 +340,8 @@ extern char *PQhostaddr(const PGconn *conn); extern char *PQport(const PGconn *conn); extern char *PQtty(const PGconn *conn); extern char *PQoptions(const PGconn *conn); +extern char *PQcompressor(const PGconn *conn); +extern char *PQdecompressor(const PGconn *conn); extern ConnStatusType PQstatus(const PGconn *conn); extern PGTransactionStatusType PQtransactionStatus(const PGconn *conn); extern const char *PQparameterStatus(const PGconn *conn, @@ -498,6 +500,9 @@ extern PGPing PQpingParams(const char *const *keywords, /* Force the write buffer to be written (or at least try) */ extern int PQflush(PGconn *conn); +extern int + PQreadPending(PGconn *conn); + /* * "Fast path" interface --- not really recommended for application * use diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index e9f214b61b..ae63d7568e 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -40,6 +40,7 @@ /* include stuff common to fe and be */ #include "getaddrinfo.h" #include "libpq/pqcomm.h" +#include "common/zpq_stream.h" /* include stuff found in fe only */ #include "fe-auth-sasl.h" #include "pqexpbuffer.h" @@ -340,6 +341,7 @@ typedef struct pg_conn_host * found in password file. */ } pg_conn_host; + /* * PGconn stores all the state data associated with a single connection * to a backend. @@ -392,6 +394,14 @@ struct pg_conn * "sspi") */ char *ssl_min_protocol_version; /* minimum TLS protocol version */ char *ssl_max_protocol_version; /* maximum TLS protocol version */ + + char *compression; /* stream compression (boolean value, "any" or + * list of compression algorithms separated by + * comma) */ + zpq_compressor *compressors; /* descriptors of compression algorithms + * chosen by client */ + unsigned n_compressors; /* size of compressors array */ + char *target_session_attrs; /* desired session properties */ /* Optional file to write trace info to */ @@ -572,6 +582,9 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer; /* expansible string */ + + /* Compression stream */ + ZpqStream *zpqStream; }; /* PGcancel stores all data necessary to cancel a connection. A copy of this @@ -697,6 +710,7 @@ extern int pqWaitTimed(int forRead, int forWrite, PGconn *conn, time_t finish_time); extern int pqReadReady(PGconn *conn); extern int pqWriteReady(PGconn *conn); +extern int pqReadPending(PGconn *conn); /* === in fe-secure.c === */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e5ab11275d..392aa333f0 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1765,7 +1765,7 @@ pg_stat_activity| SELECT s.datid, s.query_id, s.query, s.backend_type - FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id) + FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes) LEFT JOIN pg_database d ON ((s.datid = d.oid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_all_indexes| SELECT c.oid AS relid, @@ -1877,8 +1877,14 @@ pg_stat_gssapi| SELECT s.pid, s.gss_auth AS gss_authenticated, s.gss_princ AS principal, s.gss_enc AS encrypted - FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id) + FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes) WHERE (s.client_port IS NOT NULL); +pg_stat_network_traffic| SELECT s.pid, + s.rx_raw_bytes, + s.tx_raw_bytes, + s.rx_compressed_bytes, + s.tx_compressed_bytes + FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes); pg_stat_progress_analyze| SELECT s.pid, s.datid, d.datname, @@ -2047,7 +2053,7 @@ pg_stat_replication| SELECT s.pid, w.sync_priority, w.sync_state, w.reply_time - FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id) + FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes) JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_replication_slots| SELECT s.slot_name, @@ -2081,7 +2087,7 @@ pg_stat_ssl| SELECT s.pid, s.ssl_client_dn AS client_dn, s.ssl_client_serial AS client_serial, s.ssl_issuer_dn AS issuer_dn - FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id) + FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes) WHERE (s.client_port IS NOT NULL); pg_stat_subscription| SELECT su.oid AS subid, su.subname, diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 6276951234..3de10b909f 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -133,7 +133,7 @@ sub mkvcbuild keywords.c kwlookup.c link-canary.c md5_common.c pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c - wait_error.c wchar.c z_stream.c); + wait_error.c wchar.c z_stream.c zpq_stream.c); if ($solution->{options}->{openssl}) { -- 2.28.0