From 0a7fafe704a11a80b332a9f16e466c2e05ec852d Mon Sep 17 00:00:00 2001 From: usernamedt Date: Tue, 30 Mar 2021 17:08:10 +0500 Subject: [PATCH 1/2] Add zlib and zstd streaming compression --- configure | 81 +++++ configure.ac | 20 + src/Makefile.global.in | 1 + src/backend/Makefile | 8 + src/common/Makefile | 3 +- src/common/z_stream.c | 666 ++++++++++++++++++++++++++++++++++ src/include/common/z_stream.h | 90 +++++ src/include/pg_config.h.in | 3 + src/interfaces/libpq/Makefile | 14 + src/tools/msvc/Mkvcbuild.pm | 2 +- src/tools/msvc/Solution.pm | 1 + 11 files changed, 887 insertions(+), 2 deletions(-) create mode 100644 src/common/z_stream.c create mode 100644 src/include/common/z_stream.h diff --git a/configure b/configure index 06ad9aeb71..fd11cbcb62 100755 --- a/configure +++ b/configure @@ -703,6 +703,7 @@ LZ4_LIBS LZ4_CFLAGS with_lz4 with_zlib +with_zstd with_system_tzdata with_libxslt XML2_LIBS @@ -868,6 +869,7 @@ with_libxslt with_system_tzdata with_zlib with_lz4 +with_zstd with_gnu_ld with_ssl with_openssl @@ -8542,6 +8544,85 @@ fi +# +# ZStd +# + + + +# Check whether --with-zstd was given. +if test "${with_zstd+set}" = set; then : + withval=$with_zstd; + case $withval in + yes) + ;; + no) + : + ;; + *) + as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5 + ;; + esac + +else + with_zstd=no + +fi + + + + +if test "$with_zstd" = yes ; then + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5 +$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; } +if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lzstd $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. */ +#ifdef __cplusplus +extern "C" +#endif +char ZSTD_compress (); +int +main () +{ +return ZSTD_compress (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib_zstd_ZSTD_compress=yes +else + ac_cv_lib_zstd_ZSTD_compress=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5 +$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; } +if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_LIBZSTD 1 +_ACEOF + + LIBS="-lzstd $LIBS" + +else + as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5 +fi + +fi + + # # Zlib diff --git a/configure.ac b/configure.ac index 92193f35fb..b06dedc892 100644 --- a/configure.ac +++ b/configure.ac @@ -1011,6 +1011,13 @@ if test "$with_lz4" = yes; then done fi +# +# Zstd +# +PGAC_ARG_BOOL(with, zstd, no, + [use zstd]) +AC_SUBST(with_zstd) + # # Assignments # @@ -1202,6 +1209,13 @@ failure. It is possible the compiler isn't looking in the proper directory. Use --without-zlib to disable zlib support.])]) fi +if test "$with_zstd" = yes; then + AC_CHECK_LIB(zstd, ZSTD_decompressStream, [], + [AC_MSG_ERROR([zstd library not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory.])]) +fi + if test "$enable_spinlocks" = yes; then AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.]) else @@ -1441,6 +1455,12 @@ fi if test "$with_lz4" = yes; then AC_CHECK_HEADERS(lz4.h, [], [AC_MSG_ERROR([lz4.h header file is required for LZ4])]) + +if test "$with_zstd" = yes; then + AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory. +Use --without-zstd to disable zstd support.])]) fi if test "$with_gssapi" = yes ; then diff --git a/src/Makefile.global.in b/src/Makefile.global.in index 74b3a6acd2..070c57d9f4 100644 --- a/src/Makefile.global.in +++ b/src/Makefile.global.in @@ -196,6 +196,7 @@ with_llvm = @with_llvm@ with_system_tzdata = @with_system_tzdata@ with_uuid = @with_uuid@ with_zlib = @with_zlib@ +with_zstd = @with_zstd@ enable_rpath = @enable_rpath@ enable_nls = @enable_nls@ enable_debug = @enable_debug@ diff --git a/src/backend/Makefile b/src/backend/Makefile index 0da848b1fd..6abcffc3e5 100644 --- a/src/backend/Makefile +++ b/src/backend/Makefile @@ -54,6 +54,14 @@ ifeq ($(with_systemd),yes) LIBS += -lsystemd endif +ifeq ($(with_zstd),yes) +LIBS += -lzstd +endif + +ifeq ($(with_zlib),yes) +LIBS += -lz +endif + ########################################################################## all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP) diff --git a/src/common/Makefile b/src/common/Makefile index 5422579a6a..e38ff2b051 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -78,7 +78,8 @@ OBJS_COMMON = \ unicode_norm.o \ username.o \ wait_error.o \ - wchar.o + wchar.o \ + z_stream.o ifeq ($(with_ssl),openssl) OBJS_COMMON += \ diff --git a/src/common/z_stream.c b/src/common/z_stream.c new file mode 100644 index 0000000000..c720578b7e --- /dev/null +++ b/src/common/z_stream.c @@ -0,0 +1,666 @@ +#include "c.h" +#include "pg_config.h" +#include "common/z_stream.h" + +/* + * Functions implementing streaming compression algorithm + */ +typedef struct +{ + /* + * Name of compression algorithm. + */ + char const *(*name) (void); + + /* + * Create new compression stream. level: compression level + */ + void *(*create_compressor) (int level); + + /* + * Create new decompression stream. + */ + void *(*create_decompressor) (); + + /* + * Decompress up to "src_size" compressed bytes from *src and write up to + * "dst_size" raw (decompressed) bytes to *dst. Number of decompressed + * bytes written to *dst is stored in *dst_processed. Number of compressed + * bytes read from *src is stored in *src_processed. + * + * Return codes: ZS_OK if no errors were encountered during decompression + * attempt. This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. + * + * ZS_DATA_PENDING means that there might be some data left within + * decompressor internal buffers. + * + * ZS_STREAM_END if encountered end of compressed data stream. + * + * ZS_DECOMPRESS_ERROR if encountered an error during decompression + * attempt. + */ + ssize_t (*decompress) (void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + + /* + * Compress up to "src_size" raw (non-compressed) bytes from *src and + * write up to "dst_size" compressed bytes to *dst. Number of compressed + * bytes written to *dst is stored in *dst_processed. Number of + * non-compressed bytes read from *src is stored in *src_processed. + * + * Return codes: ZS_OK if no errors were encountered during compression + * attempt. This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. + * + * ZS_DATA_PENDING means that there might be some data left within + * compressor internal buffers. + * + * ZS_COMPRESS_ERROR if encountered an error during compression attempt. + */ + ssize_t (*compress) (void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + + /* + * Free compression stream created by create_compressor function. + */ + void (*free_compressor) (void *cs); + + /* + * Free decompression stream created by create_decompressor function. + */ + void (*free_decompressor) (void *ds); + + /* + * Get compressor error message. + */ + char const *(*compress_error) (void *cs); + + /* + * Get decompressor error message. + */ + char const *(*decompress_error) (void *ds); + + ssize_t (*end_compression) (void *cs, void *dst, size_t dst_size, size_t *dst_processed); +} ZAlgorithm; + +struct ZStream +{ + ZAlgorithm const *c_algorithm; + void *c_stream; + + ZAlgorithm const *d_algorithm; + void *d_stream; + + bool rx_not_flushed; + bool tx_not_flushed; +}; + +#if HAVE_LIBZSTD + +#include +#include + +/* + * Maximum allowed back-reference distance, expressed as power of 2. + * This setting controls max compressor/decompressor window size. + * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536 + */ +#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */ + + +typedef struct ZPQ_ZSTD_CStream +{ + ZSTD_CStream *stream; + char const *error; /* error message */ +} ZPQ_ZSTD_CStream; + +typedef struct ZPQ_ZSTD_DStream +{ + ZSTD_DStream *stream; + char const *error; /* error message */ +} ZPQ_ZSTD_DStream; + +static void * +zstd_create_compressor(int level) +{ + ZPQ_ZSTD_CStream *c_stream = (ZPQ_ZSTD_CStream *) malloc(sizeof(ZPQ_ZSTD_CStream)); + + c_stream->stream = ZSTD_createCStream(); + ZSTD_initCStream(c_stream->stream, level); +#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 + ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT); +#endif + c_stream->error = NULL; + return c_stream; +} + +static void * +zstd_create_decompressor() +{ + ZPQ_ZSTD_DStream *d_stream = (ZPQ_ZSTD_DStream *) malloc(sizeof(ZPQ_ZSTD_DStream)); + + d_stream->stream = ZSTD_createDStream(); + ZSTD_initDStream(d_stream->stream); +#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 + ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); +#endif + d_stream->error = NULL; + return d_stream; +} + +static ssize_t +zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + ZSTD_inBuffer in; + ZSTD_outBuffer out; + size_t rc; + + in.src = src; + in.pos = 0; + in.size = src_size; + + out.dst = dst; + out.pos = 0; + out.size = dst_size; + + rc = ZSTD_decompressStream(ds->stream, &out, &in); + + *src_processed = in.pos; + *dst_processed = out.pos; + if (ZSTD_isError(rc)) + { + ds->error = ZSTD_getErrorName(rc); + return ZS_DECOMPRESS_ERROR; + } + + if (rc == 0) + { + return ZS_STREAM_END; + } + + if (out.pos == out.size) + { + /* + * if `output.pos == output.size`, there might be some data left + * within internal buffers + */ + return ZS_DATA_PENDING; + } + return ZS_OK; +} + +static ssize_t +zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + ZSTD_inBuffer in; + ZSTD_outBuffer out; + + in.src = src; + in.pos = 0; + in.size = src_size; + + out.dst = dst; + out.pos = 0; + out.size = dst_size; + + if (in.pos < src_size) /* Has something to compress in input buffer */ + { + size_t rc = ZSTD_compressStream(cs->stream, &out, &in); + + *dst_processed = out.pos; + *src_processed = in.pos; + if (ZSTD_isError(rc)) + { + cs->error = ZSTD_getErrorName(rc); + return ZS_COMPRESS_ERROR; + } + } + + if (in.pos == src_size) /* All data is compressed: flush internal zstd + * buffer */ + { + size_t tx_not_flushed = ZSTD_flushStream(cs->stream, &out); + + *dst_processed = out.pos; + if (tx_not_flushed > 0) + { + return ZS_DATA_PENDING; + } + } + + return ZS_OK; +} + +static ssize_t +zstd_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + ZSTD_outBuffer output; + + output.dst = dst; + output.pos = 0; + output.size = dst_size; + + size_t tx_not_flushed; + + do + { + tx_not_flushed = ZSTD_endStream(cs->stream, &output); + } while ((tx_not_flushed > 0) && (output.pos < output.size)); + + *dst_processed = output.pos; + + if (tx_not_flushed > 0) + { + return ZS_DATA_PENDING; + } + return ZS_OK; +} + +static void +zstd_free_compressor(void *c_stream) +{ + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + + if (cs != NULL) + { + ZSTD_freeCStream(cs->stream); + free(cs); + } +} + +static void +zstd_free_decompressor(void *d_stream) +{ + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + + if (ds != NULL) + { + ZSTD_freeDStream(ds->stream); + free(ds); + } +} + +static char const * +zstd_compress_error(void *c_stream) +{ + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + + return cs->error; +} + +static char const * +zstd_decompress_error(void *d_stream) +{ + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + + return ds->error; +} + +static char const * +zstd_name(void) +{ + return "zstd"; +} + +#endif + +#if HAVE_LIBZ + +#include +#include + + +static void * +zlib_create_compressor(int level) +{ + int rc; + z_stream *c_stream = (z_stream *) malloc(sizeof(z_stream)); + + memset(c_stream, 0, sizeof(*c_stream)); + rc = deflateInit(c_stream, level); + if (rc != Z_OK) + { + free(c_stream); + return NULL; + } + return c_stream; +} + +static void * +zlib_create_decompressor() +{ + int rc; + z_stream *d_stream = (z_stream *) malloc(sizeof(z_stream)); + + memset(d_stream, 0, sizeof(*d_stream)); + rc = inflateInit(d_stream); + if (rc != Z_OK) + { + free(d_stream); + return NULL; + } + return d_stream; +} + +static ssize_t +zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + z_stream *ds = (z_stream *) d_stream; + int rc; + + ds->next_in = (Bytef *) src; + ds->avail_in = src_size; + ds->next_out = (Bytef *) dst; + ds->avail_out = dst_size; + + rc = inflate(ds, Z_SYNC_FLUSH); + *src_processed = src_size - ds->avail_in; + *dst_processed = dst_size - ds->avail_out; + + if (rc == Z_STREAM_END) + { + return ZS_STREAM_END; + } + if (rc != Z_OK && rc != Z_BUF_ERROR) + { + return ZS_DECOMPRESS_ERROR; + } + + return ZS_OK; +} + +static ssize_t +zlib_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + z_stream *cs = (z_stream *) c_stream; + int rc; + unsigned deflate_pending = 0; + + + cs->next_out = (Bytef *) dst; + cs->avail_out = dst_size; + cs->next_in = (Bytef *) src; + cs->avail_in = src_size; + + rc = deflate(cs, Z_SYNC_FLUSH); + Assert(rc == Z_OK); + *dst_processed = dst_size - cs->avail_out; + *src_processed = src_size - cs->avail_in; + + deflatePending(cs, &deflate_pending, Z_NULL); /* check if any data left + * in deflate buffer */ + if (deflate_pending > 0) + { + return ZS_DATA_PENDING; + } + return ZS_OK; +} + + +static ssize_t +zlib_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed) +{ + z_stream *cs = (z_stream *) c_stream; + int rc; + + cs->next_out = (Bytef *) dst; + cs->avail_out = dst_size; + cs->next_in = NULL; + cs->avail_in = 0; + + rc = deflate(cs, Z_STREAM_END); + Assert(rc == Z_OK || rc == Z_STREAM_END); + *dst_processed = dst_size - cs->avail_out; + if (rc == Z_STREAM_END) + { + return ZS_OK; + } + + return ZS_DATA_PENDING; +} + +static void +zlib_free_compressor(void *c_stream) +{ + z_stream *cs = (z_stream *) c_stream; + + if (cs != NULL) + { + deflateEnd(cs); + free(cs); + } +} + +static void +zlib_free_decompressor(void *d_stream) +{ + z_stream *ds = (z_stream *) d_stream; + + if (ds != NULL) + { + inflateEnd(ds); + free(ds); + } +} + +static char const * +zlib_error(void *stream) +{ + z_stream *zs = (z_stream *) stream; + + return zs->msg; +} + +static char const * +zlib_name(void) +{ + return "zlib"; +} + +#endif + +static char const * +no_compression_name(void) +{ + return NULL; +} + +/* + * Array with all supported compression algorithms. + */ +static ZAlgorithm const zpq_algorithms[] = +{ +#if HAVE_LIBZSTD + {zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error, zstd_end}, +#endif +#if HAVE_LIBZ + {zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error, zlib_end}, +#endif + {no_compression_name} +}; + +static ssize_t +zpq_init_compressor(ZStream * zs, int c_alg_impl, int c_level) +{ + zs->c_algorithm = &zpq_algorithms[c_alg_impl]; + zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level); + if (zs->c_stream == NULL) + { + return -1; + } + return 0; +} + +static ssize_t +zpq_init_decompressor(ZStream * zs, int d_alg_impl) +{ + zs->d_algorithm = &zpq_algorithms[d_alg_impl]; + zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor(); + if (zs->d_stream == NULL) + { + return -1; + } + return 0; +} + +/* + * Index of used compression algorithm in zpq_algorithms array. + */ +ZStream * +zs_create(int c_alg_impl, int c_level, int d_alg_impl) +{ + ZStream *zs = (ZStream *) malloc(sizeof(ZStream)); + + zs->tx_not_flushed = false; + zs->rx_not_flushed = false; + + if (zpq_init_compressor(zs, c_alg_impl, c_level) || zpq_init_decompressor(zs, d_alg_impl)) + { + free(zs); + return NULL; + } + + return zs; +} + +ssize_t +zs_read(ZStream * zs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + *src_processed = 0; + *dst_processed = 0; + + ssize_t rc = zs->d_algorithm->decompress(zs->d_stream, + src, src_size, src_processed, + dst, dst_size, dst_processed); + + zs->rx_not_flushed = false; + if (rc == ZS_DATA_PENDING) + { + zs->rx_not_flushed = true; + return ZS_OK; + } + + if (rc != ZS_OK && rc != ZS_STREAM_END) + { + return ZS_DECOMPRESS_ERROR; + } + + return rc; +} + +ssize_t +zs_write(ZStream * zs, void const *buf, size_t size, size_t *processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + *processed = 0; + *dst_processed = 0; + + ssize_t rc = zs->c_algorithm->compress(zs->c_stream, + buf, size, processed, + dst, dst_size, dst_processed); + + zs->tx_not_flushed = false; + if (rc == ZS_DATA_PENDING) + { + zs->tx_not_flushed = true; + return ZS_OK; + } + if (rc != ZS_OK) + { + return ZS_COMPRESS_ERROR; + } + + return rc; +} + +void +zs_free(ZStream * zs) +{ + if (zs) + { + if (zs->c_stream) + { + zs->c_algorithm->free_compressor(zs->c_stream); + } + if (zs->d_stream) + { + zs->d_algorithm->free_decompressor(zs->d_stream); + } + free(zs); + } +} + +ssize_t +zs_end(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed) +{ + *dst_processed = 0; + + ssize_t rc = zs->c_algorithm->end_compression(zs->c_stream, dst, dst_size, dst_processed); + + zs->tx_not_flushed = false; + if (rc == ZS_DATA_PENDING) + { + zs->tx_not_flushed = true; + return ZS_OK; + } + if (rc != ZS_OK) + { + return ZS_COMPRESS_ERROR; + } + + return rc; +} + +char const * +zs_compress_error(ZStream * zs) +{ + return zs->c_algorithm->compress_error(zs->c_stream); +} + +char const * +zs_decompress_error(ZStream * zs) +{ + return zs->d_algorithm->decompress_error(zs->d_stream); +} + +bool +zs_buffered_rx(ZStream * zs) +{ + return zs ? zs->rx_not_flushed : 0; +} + +bool +zs_buffered_tx(ZStream * zs) +{ + return zs ? zs->tx_not_flushed : 0; +} + +/* + * Get list of the supported algorithms. + */ +char ** +zs_get_supported_algorithms(void) +{ + size_t n_algorithms = sizeof(zpq_algorithms) / sizeof(*zpq_algorithms); + char **algorithm_names = malloc(n_algorithms * sizeof(char *)); + + for (size_t i = 0; i < n_algorithms; i++) + { + algorithm_names[i] = (char *) zpq_algorithms[i].name(); + } + + return algorithm_names; +} + +char const * +zs_compress_algorithm_name(ZStream * zs) +{ + return zs ? zs->c_algorithm->name() : NULL; +} + +char const * +zs_decompress_algorithm_name(ZStream * zs) +{ + return zs ? zs->d_algorithm->name() : NULL; +} diff --git a/src/include/common/z_stream.h b/src/include/common/z_stream.h new file mode 100644 index 0000000000..a19ea90ca5 --- /dev/null +++ b/src/include/common/z_stream.h @@ -0,0 +1,90 @@ +/* + * z_stream.h + * Streaming compression + */ + + +#ifndef Z_STREAM_H +#define Z_STREAM_H + +#include + +#define ZS_OK (0) +#define ZS_IO_ERROR (-1) +#define ZS_DECOMPRESS_ERROR (-2) +#define ZS_COMPRESS_ERROR (-3) +#define ZS_STREAM_END (-4) +#define ZS_DATA_PENDING (-5) + +struct ZStream; +typedef struct ZStream ZStream; + +#endif + +/* + * Create compression stream with rx/tx function for reading/sending compressed data. + * c_alg_impl: index of chosen compression algorithm + * c_level: compression c_level + * d_alg_impl: index of chosen decompression algorithm + */ +extern ZStream * zs_create(int c_alg_impl, int c_level, int d_alg_impl); + +/* + * Read up to "size" raw (decompressed) bytes. + * Returns number of decompressed bytes or error code. + * Error code is either ZS_DECOMPRESS_ERROR or error code returned by the rx function. + */ +extern ssize_t zs_read(ZStream * zs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + +/* + * Write up to "size" raw (decompressed) bytes. + * Returns number of written raw bytes or error code. + * Error code is either ZS_COMPRESS_ERROR or error code returned by the tx function. + * In the last case number of bytes written is stored in *processed. + */ +extern ssize_t zs_write(ZStream * zs, void const *buf, size_t size, size_t *processed, void *dst, size_t dst_size, size_t *dst_processed); + +/* + * Get decompressor error message. + */ +extern char const *zs_decompress_error(ZStream * zs); + +/* + * Get compressor error message. + */ +extern char const *zs_compress_error(ZStream * zs); + +/* + * Return true if non-flushed data might left in internal rx decompression buffer. + */ +extern bool zs_buffered_rx(ZStream * zs); + +/* + * Return true if non-flushed data might left in internal tx compression buffer. + */ +extern bool zs_buffered_tx(ZStream * zs); + +/* + * End the compression stream. + */ +extern ssize_t zs_end(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed); + +/* + * Free stream created by zs_create function. + */ +extern void zs_free(ZStream * zs); + +/* + * Get the name of chosen compression algorithm. + */ +extern char const *zs_compress_algorithm_name(ZStream * zs); + +/* + * Get the name of chosen decompression algorithm. + */ +extern char const *zs_decompress_algorithm_name(ZStream * zs); + +/* + Returns zero terminated array with compression algorithms names +*/ +extern char **zs_get_supported_algorithms(void); diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index 5e2255a2f5..46be32dbfd 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -352,6 +352,9 @@ /* Define to 1 if you have the `link' function. */ #undef HAVE_LINK +/* Define to 1 if you have the `zstd' library (-lzstd). */ +#undef HAVE_LIBZSTD + /* Define to 1 if the system has the type `locale_t'. */ #undef HAVE_LOCALE_T diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile index 2aca882a2b..34e66376cc 100644 --- a/src/interfaces/libpq/Makefile +++ b/src/interfaces/libpq/Makefile @@ -29,6 +29,20 @@ endif # The MSVC build system scrapes OBJS from this file. If you change any of # the conditional additions of files to OBJS, update Mkvcbuild.pm to match. +ifeq ($(with_zstd),yes) +LIBS += -lzstd +SHLIB_LINK += -lzstd +endif + +ifeq ($(with_zlib),yes) +LIBS += -lz +SHLIB_LINK += -lz +endif + +# We can't use Makefile variables here because the MSVC build system scrapes +# OBJS from this file. + + OBJS = \ $(WIN32RES) \ fe-auth-scram.o \ diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index bc65185130..b9f6e9cec5 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -127,7 +127,7 @@ sub mkvcbuild keywords.c kwlookup.c link-canary.c md5_common.c pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c - wait_error.c wchar.c); + wait_error.c wchar.c z_stream.c); if ($solution->{options}->{openssl}) { diff --git a/src/tools/msvc/Solution.pm b/src/tools/msvc/Solution.pm index 710f26f8ab..01b3483b2d 100644 --- a/src/tools/msvc/Solution.pm +++ b/src/tools/msvc/Solution.pm @@ -308,6 +308,7 @@ sub GenerateFiles HAVE_LIBXML2 => undef, HAVE_LIBXSLT => undef, HAVE_LIBZ => $self->{options}->{zlib} ? 1 : undef, + HAVE_LIBZSTD => $self->{options}->{zstd} ? 1 : undef, HAVE_LINK => undef, HAVE_LOCALE_T => 1, HAVE_LONG_INT_64 => undef, -- 2.24.3 (Apple Git-128)