diff --git a/configure b/configure index ace4ed5..deba608 100755 --- a/configure +++ b/configure @@ -700,6 +700,7 @@ LD LDFLAGS_SL LDFLAGS_EX with_zlib +with_zstd with_system_tzdata with_libxslt XML2_LIBS @@ -867,6 +868,7 @@ with_libxml with_libxslt with_system_tzdata with_zlib +with_zstd with_gnu_ld enable_largefile ' @@ -8571,6 +8573,85 @@ fi +# +# ZStd +# + + + +# Check whether --with-zstd was given. +if test "${with_zstd+set}" = set; then : + withval=$with_zstd; + case $withval in + yes) + ;; + no) + : + ;; + *) + as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5 + ;; + esac + +else + with_zstd=no + +fi + + + + +if test "$with_zstd" = yes ; then + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5 +$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; } +if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lzstd $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. */ +#ifdef __cplusplus +extern "C" +#endif +char ZSTD_compress (); +int +main () +{ +return ZSTD_compress (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib_zstd_ZSTD_compress=yes +else + ac_cv_lib_zstd_ZSTD_compress=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5 +$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; } +if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_LIBZSTD 1 +_ACEOF + + LIBS="-lzstd $LIBS" + +else + as_fn_error $? 
"library 'zstd' is required for ZSTD support" "$LINENO" 5 +fi + +fi + + # # Zlib diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index de60281..4a1f49c 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -1225,6 +1225,20 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname + + compression + + + Request compression of libpq traffic. The client sends the server a list of the compression algorithms supported by the client library. + If the server supports one of these algorithms, it acknowledges the use of that algorithm, and all subsequent libpq messages sent from client to server and + vice versa are compressed. If the server does not support any of the suggested algorithms, it replies with an 'n' (no compression) + message, and it is up to the client whether to continue without compression or to report an error. + Supported compression algorithms are chosen at configure time. Right now two libraries are supported: zlib (default) and zstd (if Postgres was + configured with --with-zstd option). In both cases streaming mode is used. + + + + client_encoding diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 3a64db6..24f2c70 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -92,6 +92,15 @@ such as COPY. + + It is possible to compress protocol data to reduce traffic and speed up client-server interaction. + Compression is especially useful for importing/exporting data to/from the database using the COPY command + and for replication (both physical and logical). Compression can also reduce server response time + for queries returning large amounts of data (for example JSON, BLOBs, or text). + Right now two libraries are supported: zlib (default) and zstd (if Postgres was + configured with --with-zstd option). In both cases streaming mode is used. + + Messaging Overview @@ -263,6 +272,21 @@ + CompressionAck + + + Server acknowledges using compression for client-server communication protocol. 
+ Compression can be requested by the client by including the "compression" option in the connection string. + The client sends the server a list of the compression algorithms supported by the client library + (each compression algorithm is identified by one letter: 'f' - Facebook zstd, 'z' - zlib,...). + If the server supports one of these algorithms, it acknowledges the use of that algorithm, and all subsequent libpq messages sent from client to server and + vice versa are compressed. If the server does not support any of the suggested algorithms, it replies with the 'n' (no compression) + algorithm identifier, and it is up to the client whether to continue without compression or to report an error. + + + + + AuthenticationOk @@ -3396,6 +3420,56 @@ following: + + +CompressionAck (B) + + + + + + + + Byte1('z') + + + + Acknowledge use of compression for protocol data. The client sends the server a list of the compression algorithms supported by the client library. + If the server supports one of these algorithms, it responds with CompressionAck carrying the identifier (letter) of the first such algorithm. + If the server does not support any of the suggested algorithms, it replies with the 'n' (no compression) algorithm. + It is up to the client whether to continue without compression or to report an error. + After receiving this message with an algorithm identifier other than 'n', both server and client switch to compression mode + and exchange compressed messages. + + + + + + Int32 + + + + Length of message contents in bytes, including self. + + + + + + Byte1 + + + + Compression algorithm in use. Right now the following streaming compression algorithms are supported: 'f' - Facebook zstd, 'z' - zlib, 'n' - no compression. + + + + + + + + + + @@ -5961,6 +6035,19 @@ StartupMessage (F) + + + compression + + + + Request compression of libpq traffic. 
The value is a list of the compression algorithms supported by the client: + 'f' - Facebook zstd, 'z' - zlib, 'n' - no compression. + By default compression is disabled. 
Please note that using compression together with SSL may add extra vulnerabilities: + CRIME. + + + In addition to the above, other parameters may be listed. diff --git a/src/Makefile.global.in b/src/Makefile.global.in index 7ca1e9a..9e11599 100644 --- a/src/Makefile.global.in +++ b/src/Makefile.global.in @@ -196,6 +196,7 @@ with_llvm = @with_llvm@ with_system_tzdata = @with_system_tzdata@ with_uuid = @with_uuid@ with_zlib = @with_zlib@ +with_zstd = @with_zstd@ enable_rpath = @enable_rpath@ enable_nls = @enable_nls@ enable_debug = @enable_debug@ diff --git a/src/backend/Makefile b/src/backend/Makefile index 9706a95..f32a780 100644 --- a/src/backend/Makefile +++ b/src/backend/Makefile @@ -54,6 +54,14 @@ ifeq ($(with_systemd),yes) LIBS += -lsystemd endif +ifeq ($(with_zstd),yes) +LIBS += -lzstd +endif + +ifeq ($(with_zlib),yes) +LIBS += -lz +endif + ########################################################################## all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP) diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index ac986c0..cbf453d 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -93,6 +93,7 @@ #include "storage/ipc.h" #include "utils/guc.h" #include "utils/memutils.h" +#include "common/zpq_stream.h" /* * Cope with the various platform-specific ways to spell TCP keepalive socket @@ -141,6 +142,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE]; static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ static int PqRecvLength; /* End of data available in PqRecvBuffer */ +static ZpqStream* PqStream; + + /* * Message status */ @@ -183,6 +187,70 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods; WaitEventSet *FeBeWaitSet; +/* -------------------------------- + * pq_configure - configure connection using port settings + * + * Right now the only thing configured here is protocol compression. 
+ * Function returns 0 in case of success, non-zero in case of error + * -------------------------------- + */ +int +pq_configure(Port* port) +{ + char* client_compression_algorithms = port->compression_algorithms; + /* + * If the client requests compression, it sends the list of compression algorithms it supports. + * Each compression algorithm is identified by one letter ('f' - Facebook zstd, 'z' - zlib) + */ + if (client_compression_algorithms) + { + char server_compression_algorithms[ZPQ_MAX_ALGORITHMS]; + char compression_algorithm = ZPQ_NO_COMPRESSION; + char compression[6] = {'z',0,0,0,5,0}; /* message length = 5 */ + ZpqStream* stream = NULL; + int rc; + + /* Get list of compression algorithms, supported by server */ + zpq_get_supported_algorithms(server_compression_algorithms); + + /* Intersect lists: pick the first client algorithm the server also supports */ + while (*client_compression_algorithms != '\0') + { + if (strchr(server_compression_algorithms, *client_compression_algorithms)) + { + compression_algorithm = *client_compression_algorithms; + break; + } + client_compression_algorithms += 1; + } + + /* + * Create the stream before acknowledging: if it cannot be set up we must + * answer 'n', otherwise the client would start compressing while we do not. 
+ */ + if (zpq_set_algorithm(compression_algorithm)) + { + stream = zpq_create((zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort); + if (stream == NULL) + compression_algorithm = ZPQ_NO_COMPRESSION; + } + + compression[5] = compression_algorithm; + /* Send 'z' message to the client with the selected compression algorithm ('n' if no match is found) */ + socket_set_nonblocking(false); + while ((rc = secure_write(MyProcPort, compression, sizeof(compression))) < 0 + && errno == EINTR); + if ((size_t)rc != sizeof(compression)) + { + zpq_free(stream); + return -1; + } + + /* The acknowledgement itself goes out uncompressed; switch over only now */ + PqStream = stream; + } + return 0; +} /* -------------------------------- * pq_init - initialize libpq at backend startup @@ -280,6 +348,9 @@ socket_close(int code, Datum arg) free(MyProcPort->gss); #endif /* ENABLE_GSS || ENABLE_SSPI */ + /* Release compression streams */ + zpq_free(PqStream); + /* * Cleanly shut down SSL layer. Nowhere else does a postmaster child * call this, so this is safe when interrupting BackendInitialize(). 
@@ -919,12 +976,15 @@ socket_set_nonblocking(bool nonblocking) /* -------------------------------- * pq_recvbuf - load some bytes into the input buffer * - * returns 0 if OK, EOF if trouble + * nowait parameter toggles non-blocking mode. + * returns number of read bytes (0 if nowait and nothing is available), EOF if trouble * -------------------------------- */ static int -pq_recvbuf(void) +pq_recvbuf(bool nowait) { + int r; + if (PqRecvPointer > 0) { if (PqRecvLength > PqRecvPointer) @@ -940,21 +1000,36 @@ pq_recvbuf(void) } - /* Ensure that we're in blocking mode */ - socket_set_nonblocking(false); + /* Put the socket into the mode requested by the caller */ + socket_set_nonblocking(nowait); /* Can fill buffer from PqRecvLength and upwards */ for (;;) { - int r; - - r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, - PQ_RECV_BUFFER_SIZE - PqRecvLength); + /* If streaming compression is enabled then use the corresponding decompressing read function. 
*/ + r = PqStream + ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength, + PQ_RECV_BUFFER_SIZE - PqRecvLength) + : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, + PQ_RECV_BUFFER_SIZE - PqRecvLength); if (r < 0) { + if (r == ZPQ_DECOMPRESS_ERROR) + { + char const* msg = zpq_error(PqStream); + if (msg == NULL) + msg = "end of stream"; + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("failed to decompress data: %s", msg))); + return EOF; + } if (errno == EINTR) continue; /* Ok if interrupted */ + if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK)) + return 0; + /* * Careful: an ereport() that tries to write to the client would * cause recursion to here, leading to stack overflow and core @@ -975,7 +1050,7 @@ pq_recvbuf(void) } /* r contains number of bytes read, so just incr length */ PqRecvLength += r; - return 0; + return r; } } @@ -990,7 +1065,7 @@ pq_getbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv 
some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer++]; @@ -1009,7 +1086,7 @@ pq_peekbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer]; @@ -1030,44 +1107,11 @@ pq_getbyte_if_available(unsigned char *c) Assert(PqCommReadingMsg); - if (PqRecvPointer < PqRecvLength) + if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0) { *c = PqRecvBuffer[PqRecvPointer++]; return 1; } - - /* Put the socket into non-blocking mode */ - socket_set_nonblocking(true); - - r = secure_read(MyProcPort, c, 1); - if (r < 0) - { - /* - * Ok if no data available without blocking or interrupted (though - * EINTR really shouldn't happen with a non-blocking socket). Report - * other errors. - */ - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) - r = 0; - else - { - /* - * Careful: an ereport() that tries to write to the client would - * cause recursion to here, leading to stack overflow and core - * dump! This message must go *only* to the postmaster log. 
- */ - ereport(COMMERROR, - (errcode_for_socket_access(), - errmsg("could not receive data from client: %m"))); - r = EOF; - } - } - else if (r == 0) - { - /* EOF detected */ - r = EOF; - } - return r; } @@ -1088,7 +1132,7 @@ pq_getbytes(char *s, size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1122,7 +1166,7 @@ pq_discardbytes(size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1163,7 +1207,7 @@ pq_getstring(StringInfo s) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ return EOF; /* Failed to recv data */ } @@ -1413,13 +1457,18 @@ internal_flush(void) char *bufptr = PqSendBuffer + PqSendStart; char *bufend = PqSendBuffer + PqSendPointer; - while (bufptr < bufend) + while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data to flush or unsent data in internal compression buffer */ { - int r; - - r = secure_write(MyProcPort, bufptr, bufend - bufptr); - - if (r <= 0) + int r; + size_t processed = 0; + size_t available = bufend - bufptr; + r = PqStream + ? 
zpq_write(PqStream, bufptr, available, &processed) + : secure_write(MyProcPort, bufptr, available); + bufptr += processed; + PqSendStart += processed; + + if (r < 0 || (r == 0 && available)) { if (errno == EINTR) continue; /* Ok if we were interrupted */ @@ -1467,7 +1516,6 @@ internal_flush(void) bufptr += r; PqSendStart += r; } - PqSendStart = PqSendPointer = 0; return 0; } diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 959e3b8..60ff1e7 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -2151,6 +2151,8 @@ retry1: port->database_name = pstrdup(valptr); else if (strcmp(nameptr, "user") == 0) port->user_name = pstrdup(valptr); + else if (strcmp(nameptr, "compression") == 0) + port->compression_algorithms = pstrdup(valptr); else if (strcmp(nameptr, "options") == 0) port->cmdline_options = pstrdup(valptr); else if (strcmp(nameptr, "replication") == 0) @@ -4458,6 +4460,14 @@ BackendInitialize(Port *port) if (status != STATUS_OK) proc_exit(0); + if (pq_configure(port)) + { + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("failed to send compression message: %m"))); + proc_exit(0); + } + /* * Now that we have the user and database name, we can set the process * title for ps. It's good to do this as early as possible in startup. 
diff --git a/src/common/Makefile b/src/common/Makefile index 25c55bd..bc6cba8 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -77,7 +77,8 @@ OBJS_COMMON = \ unicode_norm.o \ username.o \ wait_error.o \ - wchar.o + wchar.o \ + zpq_stream.o ifeq ($(with_openssl),yes) OBJS_COMMON += \ diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c new file mode 100644 index 0000000..d5f83b3 --- /dev/null +++ b/src/common/zpq_stream.c @@ -0,0 +1,539 @@ +#ifndef FRONTEND +#include "postgres.h" +#else +#include "postgres_fe.h" +#endif + +#include "common/zpq_stream.h" + +/* + * Functions implementing streaming compression algorithm + */ +typedef struct +{ + /* + * Returns letter identifying compression algorithm. + */ + char (*name)(void); + + /* + * Create compression stream using rx/tx functions for fetching/sending compressed data + */ + ZpqStream* (*create)(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg); + + /* + * Read up to "size" raw (decompressed) bytes. + * Returns number of decompressed bytes or error code. + * Error code is either ZPQ_DECOMPRESS_ERROR or the error code returned by the rx function. + */ + ssize_t (*read)(ZpqStream *zs, void *buf, size_t size); + + /* + * Write up to "size" raw (decompressed) bytes. + * Returns number of written raw bytes or error code returned by tx function. + * In the last case amount of written raw bytes is stored in *processed. + */ + ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t *processed); + + /* + * Free stream created by create function. + */ + void (*free)(ZpqStream *zs); + + /* + * Get error message. + */ + char const* (*error)(ZpqStream *zs); + + /* + * Returns amount of data in internal compression buffer. 
+ */ + size_t (*buffered)(ZpqStream *zs); +} ZpqAlgorithm; + +/* + * Common header of every stream object. It records the algorithm that created + * the stream, so that streams using different algorithms can coexist in one + * process (e.g. several libpq connections). Must be the first struct member. + */ +struct ZpqStream +{ + ZpqAlgorithm const* algorithm; +}; + + +#if HAVE_LIBZSTD + +#include <stdlib.h> +#include <zstd.h> + +#define ZSTD_BUFFER_SIZE (8*1024) +#define ZSTD_COMPRESSION_LEVEL 1 + +typedef struct ZstdStream +{ + ZpqStream base; /* must be first */ + ZSTD_CStream* tx_stream; + ZSTD_DStream* rx_stream; + ZSTD_outBuffer tx; + ZSTD_inBuffer rx; + size_t tx_not_flushed; /* Amount of data in internal zstd buffer */ + size_t tx_buffered; /* Compressed data produced by zstd_write but not yet sent */ + zpq_tx_func tx_func; + zpq_rx_func rx_func; + void* arg; + char const* rx_error; /* Decompress error message */ + size_t tx_total; + size_t tx_total_raw; + size_t rx_total; + size_t rx_total_raw; + char tx_buf[ZSTD_BUFFER_SIZE]; + char rx_buf[ZSTD_BUFFER_SIZE]; +} ZstdStream; + +/* Allocate and initialize a zstd stream; returns NULL if memory cannot be obtained. 
*/ +static ZpqStream* +zstd_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg) +{ + ZstdStream* zs = (ZstdStream*)malloc(sizeof(ZstdStream)); + if (zs == NULL) + return NULL; + zs->tx_stream = ZSTD_createCStream(); + zs->rx_stream = ZSTD_createDStream(); + if (zs->tx_stream == NULL || zs->rx_stream == NULL) + { + /* ZSTD_freeCStream/ZSTD_freeDStream accept NULL */ + ZSTD_freeCStream(zs->tx_stream); + ZSTD_freeDStream(zs->rx_stream); + free(zs); + return NULL; + } + ZSTD_initCStream(zs->tx_stream, ZSTD_COMPRESSION_LEVEL); + ZSTD_initDStream(zs->rx_stream); + zs->tx.dst = zs->tx_buf; + zs->tx.pos = 0; + zs->tx.size = ZSTD_BUFFER_SIZE; + zs->rx.src = zs->rx_buf; + zs->rx.pos = 0; + zs->rx.size = 0; + zs->rx_func = rx_func; + zs->tx_func = tx_func; + zs->tx_buffered = 0; + zs->tx_not_flushed = 0; + zs->rx_error = NULL; + zs->arg = arg; + zs->tx_total = zs->tx_total_raw = 0; + zs->rx_total = zs->rx_total_raw = 0; + return (ZpqStream*)zs; +} + +/* Decompress up to "size" bytes into buf, refilling rx_buf through rx_func when it runs dry. */ +static ssize_t +zstd_read(ZpqStream *zstream, void *buf, size_t size) +{ + ZstdStream* zs = (ZstdStream*)zstream; + ssize_t rc; + ZSTD_outBuffer out; + out.dst = buf; + out.pos = 0; + out.size = size; + + while (1) + { + rc = ZSTD_decompressStream(zs->rx_stream, &out, &zs->rx); + if (ZSTD_isError(rc)) + { + zs->rx_error = ZSTD_getErrorName(rc); + return ZPQ_DECOMPRESS_ERROR; + } + /* Return the result as soon as any decompressed bytes are available */ + if (out.pos != 0) + { + zs->rx_total_raw += out.pos; + return out.pos; + } + if 
(zs->rx.pos == zs->rx.size) + { + zs->rx.pos = zs->rx.size = 0; /* Reset rx buffer */ + } + rc = zs->rx_func(zs->arg, (char*)zs->rx.src + zs->rx.size, ZSTD_BUFFER_SIZE - zs->rx.size); + if (rc > 0) /* read fetches some data */ + { + zs->rx.size += rc; + zs->rx_total += rc; + } + else /* read failed */ + { + zs->rx_total_raw += out.pos; + return rc; + } + } +} + +/* Compress and send "size" raw bytes; also drains data left over from earlier partial writes. */ +static ssize_t +zstd_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed) +{ + ZstdStream* zs = (ZstdStream*)zstream; + ssize_t rc; + ZSTD_inBuffer in_buf; + in_buf.src = buf; + in_buf.pos = 0; + in_buf.size = size; + + do + { + if (zs->tx.pos == 0) /* Compress buffer is empty */ + { + zs->tx.dst = zs->tx_buf; /* Reset pointer to the beginning of buffer */ + + if (in_buf.pos < size) /* Has something to compress in input buffer */ + ZSTD_compressStream(zs->tx_stream, &zs->tx, &in_buf); + + if (in_buf.pos == size) /* All data is compressed: flush internal zstd buffer */ + { + zs->tx_not_flushed = ZSTD_flushStream(zs->tx_stream, &zs->tx); + } + } + rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos); + if (rc > 0) + { + zs->tx.pos -= rc; + zs->tx.dst = (char*)zs->tx.dst + rc; + zs->tx_total += rc; + } + else + { + *processed = in_buf.pos; + zs->tx_buffered = zs->tx.pos; + zs->tx_total_raw += in_buf.pos; + return rc; + } + } while (zs->tx.pos == 0 && (in_buf.pos < size || zs->tx_not_flushed)); /* repeat sending data until first partial write */ + + zs->tx_total_raw += in_buf.pos; + zs->tx_buffered = zs->tx.pos; + return in_buf.pos; +} + +static void +zstd_free(ZpqStream *zstream) +{ + ZstdStream* zs = (ZstdStream*)zstream; + if (zs != NULL) + { + ZSTD_freeCStream(zs->tx_stream); + ZSTD_freeDStream(zs->rx_stream); + free(zs); + } +} + +static char const* +zstd_error(ZpqStream *zstream) +{ + ZstdStream* zs = (ZstdStream*)zstream; + return zs->rx_error; +} + +static size_t +zstd_buffered(ZpqStream *zstream) +{ + ZstdStream* zs = (ZstdStream*)zstream; + return zs != NULL ? 
zs->tx_buffered + zs->tx_not_flushed : 0; +} + +static char +zstd_name(void) +{ + return 'f'; +} + +#endif + +#if HAVE_LIBZ + +#include <stdlib.h> +#include <zlib.h> + +#define ZLIB_BUFFER_SIZE 8192 +#define ZLIB_COMPRESSION_LEVEL 1 + +typedef struct ZlibStream +{ + ZpqStream base; /* must be first */ + z_stream tx; + z_stream rx; + + zpq_tx_func tx_func; + zpq_rx_func rx_func; + void* arg; + + size_t tx_buffered; + + Bytef tx_buf[ZLIB_BUFFER_SIZE]; + Bytef rx_buf[ZLIB_BUFFER_SIZE]; +} ZlibStream; + +/* Allocate and initialize a zlib stream; returns NULL if allocation or zlib setup fails. */ +static ZpqStream* +zlib_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg) +{ + int rc; + ZlibStream* zs = (ZlibStream*)malloc(sizeof(ZlibStream)); + if (zs == NULL) + return NULL; + memset(&zs->tx, 0, sizeof(zs->tx)); + zs->tx.next_out = zs->tx_buf; + zs->tx.avail_out = ZLIB_BUFFER_SIZE; + zs->tx_buffered = 0; + rc = deflateInit(&zs->tx, ZLIB_COMPRESSION_LEVEL); + if (rc != Z_OK) + { + free(zs); + return NULL; + } + Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == ZLIB_BUFFER_SIZE); + + memset(&zs->rx, 0, sizeof(zs->rx)); + zs->rx.next_in = zs->rx_buf; + zs->rx.avail_in = ZLIB_BUFFER_SIZE; + rc = inflateInit(&zs->rx); + if (rc != Z_OK) + { + deflateEnd(&zs->tx); /* release the already initialized deflate state */ + free(zs); + return NULL; + } + Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == ZLIB_BUFFER_SIZE); + zs->rx.avail_in = 0; + + zs->rx_func = rx_func; + zs->tx_func = tx_func; + zs->arg = arg; + + return (ZpqStream*)zs; +} + +/* Inflate up to "size" bytes into buf, refilling rx_buf through rx_func when it runs dry. 
*/ +static ssize_t +zlib_read(ZpqStream *zstream, void *buf, size_t size) +{ + ZlibStream* zs = (ZlibStream*)zstream; + int rc; + zs->rx.next_out = (Bytef *)buf; + zs->rx.avail_out = size; + + while (1) + { + if (zs->rx.avail_in != 0) /* If there is some data in receiver buffer, then decompress it */ + { + rc = inflate(&zs->rx, Z_SYNC_FLUSH); + if (rc != Z_OK && rc != Z_BUF_ERROR) + { + return ZPQ_DECOMPRESS_ERROR; + } + if (zs->rx.avail_out != size) + { + return size - zs->rx.avail_out; + } + if (zs->rx.avail_in == 0) + { + zs->rx.next_in = zs->rx_buf; + } + } + else + { + zs->rx.next_in = zs->rx_buf; + } + rc = zs->rx_func(zs->arg, zs->rx.next_in + zs->rx.avail_in, 
zs->rx_buf + ZLIB_BUFFER_SIZE - zs->rx.next_in - zs->rx.avail_in); + if (rc > 0) + { + zs->rx.avail_in += rc; + } + else + { + return rc; + } + } +} + +/* Deflate and send "size" raw bytes; also drains data left over from earlier partial writes. */ +static ssize_t +zlib_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed) +{ + ZlibStream* zs = (ZlibStream*)zstream; + int rc; + zs->tx.next_in = (Bytef *)buf; + zs->tx.avail_in = size; + do + { + if (zs->tx.avail_out == ZLIB_BUFFER_SIZE) /* Compress buffer is empty */ + { + zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */ + + if (zs->tx.avail_in != 0) /* Has something in input buffer */ + { + rc = deflate(&zs->tx, Z_SYNC_FLUSH); + Assert(rc == Z_OK); + zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */ + } + } + rc = zs->tx_func(zs->arg, zs->tx.next_out, ZLIB_BUFFER_SIZE - zs->tx.avail_out); + if (rc > 0) + { + zs->tx.next_out += rc; + zs->tx.avail_out += rc; + } + else + { + *processed = size - zs->tx.avail_in; + zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out; + return rc; + } + } while (zs->tx.avail_out == ZLIB_BUFFER_SIZE && zs->tx.avail_in != 0); /* repeat sending data until first partial write */ + + zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out; + + return size - zs->tx.avail_in; +} + +static void +zlib_free(ZpqStream *zstream) +{ + ZlibStream* zs = (ZlibStream*)zstream; + if (zs != NULL) + { + inflateEnd(&zs->rx); + deflateEnd(&zs->tx); + free(zs); + } +} + +static char const* +zlib_error(ZpqStream *zstream) +{ + ZlibStream* zs = (ZlibStream*)zstream; + return zs->rx.msg; +} + +static size_t +zlib_buffered(ZpqStream *zstream) +{ + ZlibStream* zs = (ZlibStream*)zstream; + return zs != NULL ? zs->tx_buffered : 0; +} + +static char +zlib_name(void) +{ + return 'z'; +} + +#endif + +/* + * Array with all supported compression algorithms. 
+ */ +static ZpqAlgorithm const zpq_algorithms[] = +{ +#if HAVE_LIBZSTD + {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered}, +#endif +#if HAVE_LIBZ + {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered}, +#endif + {NULL} +}; + +/* + * Index in zpq_algorithms of the algorithm chosen by zpq_set_algorithm(); + * consulted only by zpq_create(), existing streams carry their own pointer. + */ +static int zpq_algorithm_impl; + + +ZpqStream* +zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg) +{ + ZpqStream* zs = zpq_algorithms[zpq_algorithm_impl].create(tx_func, rx_func, arg); + if (zs != NULL) + zs->algorithm = &zpq_algorithms[zpq_algorithm_impl]; /* remember the owner for later dispatch */ + return zs; +} + +ssize_t +zpq_read(ZpqStream *zs, void *buf, size_t size) +{ + return zs->algorithm->read(zs, buf, size); +} + +ssize_t +zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t* processed) +{ + return zs->algorithm->write(zs, buf, size, processed); +} + +void +zpq_free(ZpqStream *zs) +{ + if (zs) + zs->algorithm->free(zs); +} + +char const* +zpq_error(ZpqStream *zs) +{ + char const* msg = zs->algorithm->error(zs); + return msg != NULL ? msg : "unknown decompression error"; /* never NULL: callers format it with %s */ +} + + +size_t +zpq_buffered(ZpqStream *zs) +{ + return zs ? zs->algorithm->buffered(zs) : 0; +} + +/* + * Get list of the supported algorithms. + * Each algorithm is identified by one letter: 'f' - Facebook zstd, 'z' - zlib. + * Algorithm identifiers are appended to the provided buffer and terminated by '\0'. + */ +void +zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS]) +{ + int i; + for (i = 0; zpq_algorithms[i].name != NULL; i++) + { + Assert(i < ZPQ_MAX_ALGORITHMS); + algorithms[i] = zpq_algorithms[i].name(); + } + Assert(i < ZPQ_MAX_ALGORITHMS); + algorithms[i] = '\0'; +} + +/* + * Choose current algorithm implementation. 
+ * Returns true if algorithm identifier is located in the list of the supported algorithms, + * false otherwise + * The choice is process-global and applies to the next zpq_create() call, so + * callers must not interleave set/create pairs from different threads. + */ +bool +zpq_set_algorithm(char name) +{ + int i; + if (name != ZPQ_NO_COMPRESSION) + { + for (i = 0; zpq_algorithms[i].name != NULL; i++) + { + if (zpq_algorithms[i].name() == name) + { + zpq_algorithm_impl = i; + return true; + } + } + } + return false; +} + diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h new file mode 100644 index 0000000..4c52e1b --- /dev/null +++ b/src/include/common/zpq_stream.h @@ -0,0 +1,32 @@ +/* + * zpq_stream.h + * Streaming compression for libpq + */ + +#ifndef ZPQ_STREAM_H +#define ZPQ_STREAM_H + +#include <stdlib.h> + +#define ZPQ_IO_ERROR (-1) +#define ZPQ_DECOMPRESS_ERROR (-2) +#define ZPQ_MAX_ALGORITHMS (8) +#define ZPQ_NO_COMPRESSION 'n' + +struct ZpqStream; +typedef struct ZpqStream ZpqStream; + +typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size); +typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size); + +ZpqStream* zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg); +ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size); +ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed); +char const* zpq_error(ZpqStream* zs); +size_t zpq_buffered(ZpqStream* zs); +void zpq_free(ZpqStream* zs); + +void zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS]); +bool zpq_set_algorithm(char name); + +#endif diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 0a23281..5289f9f 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -170,6 +170,8 @@ typedef struct Port int keepalives_count; int tcp_user_timeout; + char* compression_algorithms; /* Compression algorithms supported by client */ + /* * GSSAPI structures. 
*/ diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index b115247..224ff3d 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -63,6 +63,7 @@ extern void StreamClose(pgsocket sock); extern void TouchSocketFiles(void); extern void RemoveSocketFiles(void); extern void pq_init(void); +extern int pq_configure(Port* port); extern int pq_getbytes(char *s, size_t len); extern int pq_getstring(StringInfo s); extern void pq_startmsgread(void); diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index fb270df..b829fed 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -349,6 +349,9 @@ /* Define to 1 if you have the `link' function. */ #undef HAVE_LINK +/* Define to 1 if you have the `zstd' library (-lzstd). */ +#undef HAVE_LIBZSTD + /* Define to 1 if the system has the type `locale_t'. */ #undef HAVE_LOCALE_T diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile index 4ac5f4b..be8cb34 100644 --- a/src/interfaces/libpq/Makefile +++ b/src/interfaces/libpq/Makefile @@ -29,6 +29,20 @@ endif # The MSVC build system scrapes OBJS from this file. If you change any of # the conditional additions of files to OBJS, update Mkvcbuild.pm to match. +ifeq ($(with_zstd),yes) +LIBS += -lzstd +SHLIB_LINK += -lzstd +endif + +ifeq ($(with_zlib),yes) +LIBS += -lz +SHLIB_LINK += -lz +endif + +# We can't use Makefile variables here because the MSVC build system scrapes +# OBJS from this file. 
+ + OBJS = \ $(WIN32RES) \ fe-auth-scram.o \ diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index b0ca37c..525341f 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -24,6 +24,7 @@ #include "common/ip.h" #include "common/link-canary.h" #include "common/scram-common.h" +#include "common/zpq_stream.h" #include "common/string.h" #include "fe-auth.h" #include "libpq-fe.h" @@ -350,6 +351,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = { "Replication", "D", 5, offsetof(struct pg_conn, replication)}, + {"compression", "PGCOMPRESSION", NULL, NULL, + "Libpq-compression", "", 1, + offsetof(struct pg_conn, compression)}, + {"target_session_attrs", "PGTARGETSESSIONATTRS", DefaultTargetSessionAttrs, NULL, "Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */ @@ -458,6 +463,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock; void pqDropConnection(PGconn *conn, bool flushInput) { + /* Release compression streams */ + zpq_free(conn->zstream); + conn->zstream = NULL; + /* Drop any SSL state */ pqsecure_close(conn); @@ -3216,11 +3225,62 @@ keep_going: /* We will come back to here until there is */ conn->inCursor = conn->inStart; - /* Read type byte */ - if (pqGetc(&beresp, conn)) + while (1) { - /* We'll come back when there is more data */ - return PGRES_POLLING_READING; + /* Read type byte */ + if (pqGetc(&beresp, conn)) + { + /* We'll come back when there is more data */ + return PGRES_POLLING_READING; + } + + if (beresp == 'z') /* Switch on compression */ + { + char algorithm; + /* Read message length word */ + if (pqGetInt(&msgLength, 4, conn)) + { + /* We'll come back when there is more data */ + return PGRES_POLLING_READING; + } + if (msgLength != 5) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "expected compression algorithm specification message length is 5 bytes, but %d is recevied\n"), + msgLength); + goto error_return; + } + /* Read the algorithm identifier; it may not have arrived yet */ + if (pqGetc(&algorithm, 
conn)) + { + /* We'll come back when there is more data */ + return PGRES_POLLING_READING; + } + if (!zpq_set_algorithm(algorithm)) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "server is not supported requested compression algorithm\n")); + goto error_return; + } + /* + * NOTE(review): bytes already buffered in conn->inBuffer past this + * message were read uncompressed; if the server starts compressing right + * after the ack they would be misparsed. Confirm the server's ordering. + */ + /* mark byte consumed */ + conn->inStart = conn->inCursor; + Assert(!conn->zstream); + conn->zstream = zpq_create((zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn); + if (conn->zstream == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("could not initialize compression stream\n")); + goto error_return; + } + } else + break; } /* @@ -4020,6 +4080,8 @@ freePGconn(PGconn *conn) free(conn->dbName); if (conn->replication) free(conn->replication); + if (conn->compression) + free(conn->compression); if (conn->pguser) free(conn->pguser); if (conn->pgpass) diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index 4ffc7f3..56e8468 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -53,12 +53,21 @@ #include "pg_config_paths.h" #include "port/pg_bswap.h" +#include "common/zpq_stream.h" + static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn); static int pqSendSome(PGconn *conn, int len); static int pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time); static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time); +#define pq_read_conn(conn) \ + ((conn)->zstream \ + ? 
zpq_read((conn)->zstream, (conn)->inBuffer + (conn)->inEnd, \ + (conn)->inBufSize - (conn)->inEnd) \ + : pqsecure_read((conn), (conn)->inBuffer + (conn)->inEnd, \ + (conn)->inBufSize - (conn)->inEnd)) + /* * PQlibVersion: return the libpq version number */ @@ -664,10 +673,17 @@ pqReadData(PGconn *conn) /* OK, try to read some data */ retry3: - nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd, - conn->inBufSize - conn->inEnd); + nread = pq_read_conn(conn); if (nread < 0) { + if (nread == ZPQ_DECOMPRESS_ERROR) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("decompress error: %s\n"), + zpq_error(conn->zstream)); + return -1; + } + switch (SOCK_ERRNO) { case EINTR: @@ -759,10 +775,18 @@ retry3: * arrived. 
*/ retry4: - nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd, - conn->inBufSize - conn->inEnd); + nread = pq_read_conn(conn); + if (nread < 0) { + if (nread == ZPQ_DECOMPRESS_ERROR) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("decompress error: %s\n"), + zpq_error(conn->zstream)); + return -1; + } + switch (SOCK_ERRNO) { case EINTR: @@ -875,12 +899,14 @@ pqSendSome(PGconn *conn, int len) } /* while there's still data to send */ - while (len > 0) + while (len > 0 || zpq_buffered(conn->zstream)) { int sent; - + size_t processed = 0; + sent = conn->zstream + ? zpq_write(conn->zstream, ptr, len, &processed) #ifndef WIN32 - sent = pqsecure_write(conn, ptr, len); + : pqsecure_write(conn, ptr, len); #else /* @@ -888,8 +914,11 @@ pqSendSome(PGconn *conn, int len) * failure-point appears to be different in different versions of * Windows, but 64k should always be safe. */ - sent = pqsecure_write(conn, ptr, Min(len, 65536)); + : pqsecure_write(conn, ptr, Min(len, 65536)); #endif + ptr += processed; + len -= processed; + remaining -= processed; if (sent < 0) { @@ -943,7 +972,7 @@ pqSendSome(PGconn *conn, int len) remaining -= sent; } - if (len > 0) + if (len > 0 || sent < 0 || zpq_buffered(conn->zstream)) { /* * We didn't send it all, wait till we can send more. diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 1696525..ef61348 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -2135,6 +2135,83 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen, } /* + * Parse boolean value. This code is copied from backend/utils/adt/bool.c + * because it is not available in the frontend.
+ */ +static bool +parse_bool(const char *value, bool *result) +{ + switch (*value) + { + case 't': + case 'T': + if (pg_strcasecmp(value, "true") == 0) + { + *result = true; + return true; + } + break; + case 'f': + case 'F': + if (pg_strcasecmp(value, "false") == 0) + { + *result = false; + return true; + } + break; + case 'y': + case 'Y': + if (pg_strcasecmp(value, "yes") == 0) + { + *result = true; + return true; + } + break; + case 'n': + case 'N': + if (pg_strcasecmp(value, "no") == 0) + { + *result = false; + return true; + } + break; + case 'o': + case 'O': + /* 'o' alone is ambiguous: accept only a full "on" or "off" */ + if (pg_strcasecmp(value, "on") == 0) + { + *result = true; + return true; + } + else if (pg_strcasecmp(value, "off") == 0) + { + *result = false; + return true; + } + break; + case '1': + if (value[1] == '\0') + { + *result = true; + return true; + } + break; + case '0': + if (value[1] == '\0') + { + *result = false; + return true; + } + break; + default: + break; + } + + *result = false; /* not a boolean: give *result a defined value (also suppresses compiler warning) */ + return false; +} + +/* + * Build a startup packet given a filled-in PGconn structure. * * We need to figure out how much space is needed, then fill it in.
@@ -2180,6 +2257,20 @@ build_startup_packet(const PGconn *conn, char *packet, ADD_STARTUP_OPTION("replication", conn->replication); if (conn->pgoptions && conn->pgoptions[0]) ADD_STARTUP_OPTION("options", conn->pgoptions); + if (conn->compression && conn->compression[0]) + { + bool enabled; + /* + * If compression is enabled, send the server the list of compression + * algorithms supported by the client (nothing is sent when it is disabled) + */ + if (parse_bool(conn->compression, &enabled) && enabled) + { + char compression_algorithms[ZPQ_MAX_ALGORITHMS]; + zpq_get_supported_algorithms(compression_algorithms); + ADD_STARTUP_OPTION("compression", compression_algorithms); + } + } if (conn->send_appname) { /* Use appname if present, otherwise use fallback */ diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 1de91ae..69ad1e9 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -40,6 +40,7 @@ /* include stuff common to fe and be */ #include "getaddrinfo.h" #include "libpq/pqcomm.h" +#include "common/zpq_stream.h" /* include stuff found in fe only */ #include "pqexpbuffer.h" @@ -369,6 +370,7 @@ struct pg_conn * "sspi") */ char *ssl_min_protocol_version; /* minimum TLS protocol version */ char *ssl_max_protocol_version; /* maximum TLS protocol version */ + char *compression; /* stream compression (boolean string, e.g. on/off) */ /* Type of connection to make. Possible values: any, read-write. */ char *target_session_attrs; @@ -527,6 +529,9 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer; /* expansible string */ + + /* Compression stream */ + ZpqStream *zstream; }; /* PGcancel stores all data necessary to cancel a connection.
A copy of this diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 90594bd..daac58c 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -123,7 +123,7 @@ sub mkvcbuild config_info.c controldata_utils.c d2s.c encnames.c exec.c f2s.c file_perm.c file_utils.c hashfn.c ip.c jsonapi.c keywords.c kwlookup.c link-canary.c md5.c - pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c + pg_get_line.c zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c wait_error.c wchar.c);