From 790f913391cb9ffd5440202962674addbfb79001 Mon Sep 17 00:00:00 2001
From: Filip
Date: Thu, 24 Oct 2024 12:15:10 +0200
Subject: [PATCH v2] Add support for temporary file compression

This commit adds support for temporary file compression; for now it is
used only for hash joins.

It also adds the GUC parameter temp_file_compression, which enables this
functionality. Currently only lz4 is supported; pglz and zstd support
could be added in the future.
---
 src/backend/access/gist/gistbuildbuffers.c    |   2 +-
 src/backend/backup/backup_manifest.c          |   2 +-
 src/backend/executor/nodeHashjoin.c           |   2 +-
 src/backend/storage/file/buffile.c            | 168 +++++++++++++++++-
 src/backend/utils/misc/guc_tables.c           |  23 +++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/backend/utils/sort/logtape.c              |   2 +-
 src/backend/utils/sort/tuplestore.c           |   2 +-
 src/include/storage/buffile.h                 |  13 +-
 9 files changed, 200 insertions(+), 15 deletions(-)

diff --git a/src/backend/access/gist/gistbuildbuffers.c b/src/backend/access/gist/gistbuildbuffers.c
index 4c2301da00..9b3b00142a 100644
--- a/src/backend/access/gist/gistbuildbuffers.c
+++ b/src/backend/access/gist/gistbuildbuffers.c
@@ -54,7 +54,7 @@ gistInitBuildBuffers(int pagesPerBuffer, int levelStep, int maxLevel)
 	 * Create a temporary file to hold buffer pages that are swapped out of
 	 * memory.
 	 */
-	gfbb->pfile = BufFileCreateTemp(false);
+	gfbb->pfile = BufFileCreateTemp(false, false);
 	gfbb->nFileBlocks = 0;
 
 	/* Initialize free page management. */
diff --git a/src/backend/backup/backup_manifest.c b/src/backend/backup/backup_manifest.c
index a2e2f86332..f8a3e1f0f4 100644
--- a/src/backend/backup/backup_manifest.c
+++ b/src/backend/backup/backup_manifest.c
@@ -65,7 +65,7 @@ InitializeBackupManifest(backup_manifest_info *manifest,
 		manifest->buffile = NULL;
 	else
 	{
-		manifest->buffile = BufFileCreateTemp(false);
+		manifest->buffile = BufFileCreateTemp(false, false);
 		manifest->manifest_ctx = pg_cryptohash_create(PG_SHA256);
 		if (pg_cryptohash_init(manifest->manifest_ctx) < 0)
 			elog(ERROR, "failed to initialize checksum of backup manifest: %s",
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 2f7170604d..1b5c6448ef 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1434,7 +1434,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 	{
 		MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
 
-		file = BufFileCreateTemp(false);
+		file = BufFileCreateTemp(false, true);
 		*fileptr = file;
 
 		MemoryContextSwitchTo(oldctx);
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index a27f51f622..6cb6dcc783 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -54,6 +54,16 @@
 #include "storage/fd.h"
 #include "utils/resowner.h"
 
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
+#define NO_LZ4_SUPPORT() \
+	ereport(ERROR, \
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
+			 errmsg("compression method lz4 not supported"), \
+			 errdetail("This functionality requires the server to be built with lz4 support.")))
+
 /*
  * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
 * The reason is that we'd like large BufFiles to be spread across multiple
@@ -62,6 +72,8 @@
 #define MAX_PHYSICAL_FILESIZE	0x40000000
 #define BUFFILE_SEG_SIZE		(MAX_PHYSICAL_FILESIZE / BLCKSZ)
 
+int			temp_file_compression = TEMP_NONE_COMPRESSION;
+
 /*
  * This data structure represents a buffered file that consists of one or
  * more physical files (each accessed through a virtual file descriptor
@@ -95,7 +107,7 @@ struct BufFile
 	off_t		curOffset;		/* offset part of current pos */
 	int			pos;			/* next read/write position in buffer */
 	int			nbytes;			/* total # of valid bytes in buffer */
-
+	bool		compress;		/* true if the file stores compressed data */
 	/*
 	 * XXX Should ideally use PGIOAlignedBlock, but might need a way to avoid
 	 * wasting per-file alignment padding when some users create many files.
@@ -127,6 +139,7 @@ makeBufFileCommon(int nfiles)
 	file->curOffset = 0;
 	file->pos = 0;
 	file->nbytes = 0;
+	file->compress = false;
 
 	return file;
 }
@@ -190,7 +203,7 @@ extendBufFile(BufFile *file)
  * transaction boundaries.
  */
 BufFile *
-BufFileCreateTemp(bool interXact)
+BufFileCreateTemp(bool interXact, bool compress)
 {
 	BufFile    *file;
 	File		pfile;
@@ -212,6 +225,15 @@ BufFileCreateTemp(bool interXact)
 	file = makeBufFile(pfile);
 	file->isInterXact = interXact;
 
+	if (temp_file_compression != TEMP_NONE_COMPRESSION)
+	{
+#ifdef USE_LZ4
+		file->compress = compress;
+#else
+		NO_LZ4_SUPPORT();
+#endif
+	}
+
 	return file;
 }
@@ -275,6 +297,7 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
 	file->files[0] = MakeNewFileSetSegment(file, 0);
 	file->readOnly = false;
 
+
 	return file;
 }
@@ -455,13 +478,72 @@ BufFileLoadBuffer(BufFile *file)
 		INSTR_TIME_SET_ZERO(io_start);
 
 	/*
-	 * Read whatever we can get, up to a full bufferload.
+	 * Load the data in the form in which it is stored in the temporary file.
 	 */
-	file->nbytes = FileRead(thisfile,
+	if (!file->compress)
+	{
+		/*
+		 * Read whatever we can get, up to a full bufferload.
+		 */
+		file->nbytes = FileRead(thisfile,
							file->buffer.data,
							sizeof(file->buffer),
							file->curOffset,
							WAIT_EVENT_BUFFILE_READ);
+	}
+	else
+	{
+		/*
+		 * Read and decompress data from the temporary file.  The first read
+		 * loads the size of the compressed block; the second read loads the
+		 * compressed data itself.
+		 */
+		int			nread;
+		int			nbytes;
+
+		nread = FileRead(thisfile,
+						 &nbytes,
+						 sizeof(nbytes),
+						 file->curOffset,
+						 WAIT_EVENT_BUFFILE_READ);
+		/* if not EOF, continue */
+		if (nread > 0)
+		{
+			/*
+			 * A long-lived buffer would make sense here, to limit the number
+			 * of memory allocations.
+			 */
+			char	   *buff;
+
+			/*
+			 * Read the compressed data.  curOffset differs from pos: we read
+			 * less data than we return to the caller, so curOffset must be
+			 * advanced here based on the compressed size.
+			 */
+			file->curOffset += sizeof(nbytes);
+
+			buff = palloc(nbytes);
+
+			nread = FileRead(thisfile,
+							 buff,
+							 nbytes,
+							 file->curOffset,
+							 WAIT_EVENT_BUFFILE_READ);
+
+#ifdef USE_LZ4
+			file->nbytes = LZ4_decompress_safe(buff,
+											   file->buffer.data, nbytes,
+											   sizeof(file->buffer));
+			file->curOffset += nread;
+#endif
+
+			if (file->nbytes < 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("compressed lz4 data is corrupt")));
+			pfree(buff);
+		}
+	}
+
 	if (file->nbytes < 0)
 	{
 		file->nbytes = 0;
@@ -494,9 +576,56 @@ static void
 BufFileDumpBuffer(BufFile *file)
 {
 	int			wpos = 0;
-	int			bytestowrite;
+	int			bytestowrite = 0;
 	File		thisfile;
 
+	/* Save the original nbytes value, because compression changes the size */
+	int			nbytesOriginal = file->nbytes;
+
+	bool		compression = false;
+
+	char	   *DataToWrite;
+
+	DataToWrite = file->buffer.data;
+
+	/*
+	 * Prepare the compressed data to write; the size of the compressed block
+	 * needs to be prepended to the compressed data.
+	 */
+	if (file->compress)
+	{
+		int			cBufferSize = 0;
+		char	   *cData;
+		int			cSize = 0;
+
+#ifdef USE_LZ4
+		cBufferSize = LZ4_compressBound(file->nbytes);
+#endif
+		/*
+		 * A long-lived buffer would make sense here, to limit the number of
+		 * memory allocations.
+		 */
+		compression = true;
+		cData = palloc(cBufferSize + sizeof(int));
+#ifdef USE_LZ4
+		/*
+		 * Using streaming compression would lead to a slight improvement in
+		 * the compression ratio.
+		 */
+		cSize = LZ4_compress_default(file->buffer.data,
+									 cData + sizeof(int), file->nbytes,
+									 cBufferSize);
+#endif
+
+		/*
+		 * Write the size of the compressed block in front of the compressed
+		 * data.  It is used to determine the amount of data to read during
+		 * decompression.
+		 */
+		memcpy(cData, &cSize, sizeof(int));
+		file->nbytes = cSize + sizeof(int);
+		DataToWrite = cData;
+	}
+
 	/*
 	 * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
 	 * crosses a component-file boundary; so we need a loop.
@@ -535,7 +664,7 @@ BufFileDumpBuffer(BufFile *file)
 			INSTR_TIME_SET_ZERO(io_start);
 
 		bytestowrite = FileWrite(thisfile,
-								 file->buffer.data + wpos,
+								 DataToWrite + wpos,
								 bytestowrite,
								 file->curOffset,
								 WAIT_EVENT_BUFFILE_WRITE);
@@ -564,7 +693,19 @@ BufFileDumpBuffer(BufFile *file)
 	 * logical file position, ie, original value + pos, in case that is less
 	 * (as could happen due to a small backwards seek in a dirty buffer!)
 	 */
-	file->curOffset -= (file->nbytes - file->pos);
+	if (!file->compress)
+		file->curOffset -= (file->nbytes - file->pos);
+	else if (nbytesOriginal - file->pos != 0)
+	{
+		/*
+		 * curOffset must be corrected also when compression is enabled:
+		 * nbytes was changed by the compression, but we have to use the
+		 * original value of nbytes here.
+		 */
+		file->curOffset -= bytestowrite;
+	}
+
 	if (file->curOffset < 0)	/* handle possible segment crossing */
 	{
 		file->curFile--;
@@ -577,6 +718,9 @@ BufFileDumpBuffer(BufFile *file)
 	 */
 	file->pos = 0;
 	file->nbytes = 0;
+
+	if (compression)
+		pfree(DataToWrite);
 }
 
 /*
@@ -602,8 +746,14 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
 	{
 		if (file->pos >= file->nbytes)
 		{
-			/* Try to load more data into buffer. */
-			file->curOffset += file->pos;
+			/*
+			 * Try to load more data into buffer.
+			 *
+			 * For compressed files, curOffset is advanced inside
+			 * BufFileLoadBuffer, because the stored size differs from the
+			 * loaded (decompressed) size.
+			 */
+			if (!file->compress)
+				file->curOffset += file->pos;
 			file->pos = 0;
 			file->nbytes = 0;
 			BufFileLoadBuffer(file);
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 686309db58..3821caf763 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -73,6 +73,7 @@
 #include "replication/syncrep.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
+#include "storage/buffile.h"
 #include "storage/large_object.h"
 #include "storage/pg_shmem.h"
 #include "storage/predicate.h"
@@ -454,6 +455,17 @@ static const struct config_enum_entry default_toast_compression_options[] = {
 #endif
 	{NULL, 0, false}
 };
+
+/*
+ * pglz and zstd support should be added as a future enhancement.
+ */
+static const struct config_enum_entry temp_file_compression_options[] = {
+	{"no", TEMP_NONE_COMPRESSION, false},
+#ifdef USE_LZ4
+	{"lz4", TEMP_LZ4_COMPRESSION, false},
+#endif
+	{NULL, 0, false}
+};
 
 static const struct config_enum_entry wal_compression_options[] = {
 	{"pglz", WAL_COMPRESSION_PGLZ, false},
@@ -4856,6 +4868,17 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"temp_file_compression", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("Sets the compression method for temporary files."),
+			NULL
+		},
+		&temp_file_compression,
+		TEMP_NONE_COMPRESSION,
+		temp_file_compression_options,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"default_transaction_isolation", PGC_USERSET, CLIENT_CONN_STATEMENT,
 			gettext_noop("Sets the transaction isolation level of each new transaction."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 667e0dc40a..e9c0b36352 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -177,6 +177,7 @@
 #max_notify_queue_pages = 1048576	# limits the number of SLRU pages allocated
 					# for NOTIFY / LISTEN queue
 
+#temp_file_compression = 'no'		# enables temporary file compression
 
 # - Kernel Resources -
 
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 44b30e86ad..af43b3ebb1 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -592,7 +592,7 @@ LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
 		lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
 	}
 	else
-		lts->pfile = BufFileCreateTemp(false);
+		lts->pfile = BufFileCreateTemp(false, false);
 
 	return lts;
 }
diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c
index a720d70200..a952f0f4f5 100644
--- a/src/backend/utils/sort/tuplestore.c
+++ b/src/backend/utils/sort/tuplestore.c
@@ -860,7 +860,7 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
 		 */
 		oldcxt = MemoryContextSwitchTo(state->context->parent);
 
-		state->myfile = BufFileCreateTemp(state->interXact);
+		state->myfile = BufFileCreateTemp(state->interXact, false);
 
 		MemoryContextSwitchTo(oldcxt);
 
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 5f6d7c8e3f..486b552e31 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -32,11 +32,22 @@
 
 typedef struct BufFile BufFile;
 
+typedef enum
+{
+	TEMP_NONE_COMPRESSION,
+#ifdef USE_LZ4
+	TEMP_LZ4_COMPRESSION
+#endif
+} TempCompression;
+
+extern PGDLLIMPORT int temp_file_compression;
+
+
 /*
  * prototypes for functions in buffile.c
  */
 
-extern BufFile *BufFileCreateTemp(bool interXact);
+extern BufFile *BufFileCreateTemp(bool interXact, bool compress);
 extern void BufFileClose(BufFile *file);
 extern pg_nodiscard size_t BufFileRead(BufFile *file, void *ptr, size_t size);
 extern void BufFileReadExact(BufFile *file, void *ptr, size_t size);
-- 
2.46.2
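
A note for reviewers, outside the patch itself: the compressed path frames every flushed buffer as a 4-byte size prefix followed by the LZ4 payload (BufFileDumpBuffer writes the prefix, BufFileLoadBuffer reads it first to learn how many bytes to read and decompress). The standalone sketch below reproduces just that round trip so the on-disk format is easy to see in isolation. It is illustrative only: it assumes liblz4 is installed, and the frame_block/unframe_block helpers are made-up names, not functions from the patch.

/*
 * Minimal sketch of the [int size][LZ4 payload] block framing used by the
 * compressed temporary-file path.  Illustrative only; build with:
 *     cc sketch.c -llz4
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <lz4.h>

/* Compress src into a freshly allocated [4-byte size][payload] frame. */
static char *
frame_block(const char *src, int srclen, int *framelen)
{
	int			bound = LZ4_compressBound(srclen);
	char	   *frame = malloc(sizeof(int) + bound);
	int			csize;

	csize = LZ4_compress_default(src, frame + sizeof(int), srclen, bound);
	if (csize <= 0)
	{
		free(frame);
		return NULL;
	}
	memcpy(frame, &csize, sizeof(int));	/* size prefix, as in BufFileDumpBuffer */
	*framelen = (int) sizeof(int) + csize;
	return frame;
}

/* Decode one frame back into dst (capacity dstcap); returns decompressed size. */
static int
unframe_block(const char *frame, char *dst, int dstcap)
{
	int			csize;

	memcpy(&csize, frame, sizeof(int));	/* read the size prefix first */
	return LZ4_decompress_safe(frame + sizeof(int), dst, csize, dstcap);
}

int
main(void)
{
	const char *text = "temporary file block, temporary file block, temporary file block";
	char		decoded[256];
	char	   *frame;
	int			framelen;
	int			rawlen;

	frame = frame_block(text, (int) strlen(text) + 1, &framelen);
	if (frame == NULL)
		return 1;
	rawlen = unframe_block(frame, decoded, sizeof(decoded));
	printf("framed %d bytes, decoded %d bytes: %s\n", framelen, rawlen, decoded);
	free(frame);
	return 0;
}

With the patch applied, the server-side path is exercised simply by running SET temp_file_compression = 'lz4' in a session before a query whose hash join spills batches to temporary files.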