From ca3b34481f5d1404ff0fdb60c65ec0b7befd86fb Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Mon, 26 Feb 2024 23:48:31 +1300 Subject: [PATCH v4 3/5] Streaming Read API --- contrib/pg_prewarm/pg_prewarm.c | 40 +- src/backend/storage/Makefile | 2 +- src/backend/storage/aio/Makefile | 14 + src/backend/storage/aio/meson.build | 5 + src/backend/storage/aio/streaming_read.c | 612 ++++++++++++++++++++++ src/backend/storage/buffer/bufmgr.c | 641 ++++++++++++++++------- src/backend/storage/buffer/localbuf.c | 14 +- src/backend/storage/meson.build | 1 + src/include/storage/bufmgr.h | 45 ++ src/include/storage/streaming_read.h | 52 ++ src/tools/pgindent/typedefs.list | 3 + 11 files changed, 1218 insertions(+), 211 deletions(-) create mode 100644 src/backend/storage/aio/Makefile create mode 100644 src/backend/storage/aio/meson.build create mode 100644 src/backend/storage/aio/streaming_read.c create mode 100644 src/include/storage/streaming_read.h diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c index 8541e4d6e46..1cc84bcb0c2 100644 --- a/contrib/pg_prewarm/pg_prewarm.c +++ b/contrib/pg_prewarm/pg_prewarm.c @@ -20,6 +20,7 @@ #include "miscadmin.h" #include "storage/bufmgr.h" #include "storage/smgr.h" +#include "storage/streaming_read.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -38,6 +39,25 @@ typedef enum static PGIOAlignedBlock blockbuffer; +struct pg_prewarm_streaming_read_private +{ + BlockNumber blocknum; + int64 last_block; +}; + +static BlockNumber +pg_prewarm_streaming_read_next(PgStreamingRead *pgsr, + void *pgsr_private, + void *per_buffer_data) +{ + struct pg_prewarm_streaming_read_private *p = pgsr_private; + + if (p->blocknum <= p->last_block) + return p->blocknum++; + + return InvalidBlockNumber; +} + /* * pg_prewarm(regclass, mode text, fork text, * first_block int8, last_block int8) @@ -183,18 +203,36 @@ pg_prewarm(PG_FUNCTION_ARGS) } else if (ptype == PREWARM_BUFFER) { + struct pg_prewarm_streaming_read_private p; + PgStreamingRead *pgsr; + /* * In buffer mode, we actually pull the data into shared_buffers. */ + + /* Set up the private state for our streaming buffer read callback. */ + p.blocknum = first_block; + p.last_block = last_block; + + pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_FULL, + &p, + 0, + NULL, + BMR_REL(rel), + forkNumber, + pg_prewarm_streaming_read_next); + for (block = first_block; block <= last_block; ++block) { Buffer buf; CHECK_FOR_INTERRUPTS(); - buf = ReadBufferExtended(rel, forkNumber, block, RBM_NORMAL, NULL); + buf = pg_streaming_read_buffer_get_next(pgsr, NULL); ReleaseBuffer(buf); ++blocks_done; } + Assert(pg_streaming_read_buffer_get_next(pgsr, NULL) == InvalidBuffer); + pg_streaming_read_free(pgsr); } /* Close relation, release lock. */ diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile index 8376cdfca20..eec03f6f2b4 100644 --- a/src/backend/storage/Makefile +++ b/src/backend/storage/Makefile @@ -8,6 +8,6 @@ subdir = src/backend/storage top_builddir = ../../.. 
include $(top_builddir)/src/Makefile.global -SUBDIRS = buffer file freespace ipc large_object lmgr page smgr sync +SUBDIRS = aio buffer file freespace ipc large_object lmgr page smgr sync include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile new file mode 100644 index 00000000000..bcab44c802f --- /dev/null +++ b/src/backend/storage/aio/Makefile @@ -0,0 +1,14 @@ +# +# Makefile for storage/aio +# +# src/backend/storage/aio/Makefile +# + +subdir = src/backend/storage/aio +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + streaming_read.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build new file mode 100644 index 00000000000..39aef2a84a2 --- /dev/null +++ b/src/backend/storage/aio/meson.build @@ -0,0 +1,5 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +backend_sources += files( + 'streaming_read.c', +) diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c new file mode 100644 index 00000000000..71f2c4a70b6 --- /dev/null +++ b/src/backend/storage/aio/streaming_read.c @@ -0,0 +1,612 @@ +#include "postgres.h" + +#include "storage/streaming_read.h" +#include "utils/rel.h" + +/* + * Element type for PgStreamingRead's circular array of block ranges. + */ +typedef struct PgStreamingReadRange +{ + bool need_wait; + bool advice_issued; + BlockNumber blocknum; + int nblocks; + int per_buffer_data_index; + Buffer buffers[MAX_BUFFERS_PER_TRANSFER]; + ReadBuffersOperation operation; +} PgStreamingReadRange; + +/* + * Streaming read object. + */ +struct PgStreamingRead +{ + int max_ios; + int ios_in_progress; + int max_pinned_buffers; + int pinned_buffers; + int pinned_buffers_trigger; + int next_tail_buffer; + int ramp_up_pin_limit; + int ramp_up_pin_stall; + bool finished; + bool advice_enabled; + void *pgsr_private; + PgStreamingReadBufferCB callback; + + BufferAccessStrategy strategy; + BufferManagerRelation bmr; + ForkNumber forknum; + + /* Sometimes we need to buffer one block for flow control. */ + BlockNumber unget_blocknum; + void *unget_per_buffer_data; + + /* Next expected block, for detecting sequential access. */ + BlockNumber seq_blocknum; + + /* Space for optional per-buffer private data. */ + size_t per_buffer_data_size; + void *per_buffer_data; + + /* Circular buffer of ranges. */ + int size; + int head; + int tail; + PgStreamingReadRange ranges[FLEXIBLE_ARRAY_MEMBER]; +}; + +static PgStreamingRead * +pg_streaming_read_buffer_alloc_internal(int flags, + void *pgsr_private, + size_t per_buffer_data_size, + BufferAccessStrategy strategy) +{ + PgStreamingRead *pgsr; + int size; + int max_ios; + uint32 max_pinned_buffers; + + + /* + * Decide how many assumed I/Os we will allow to run concurrently. That + * is, advice to the kernel to tell it that we will soon read. This + * number also affects how far we look ahead for opportunities to start + * more I/Os. + */ + if (flags & PGSR_FLAG_MAINTENANCE) + max_ios = maintenance_io_concurrency; + else + max_ios = effective_io_concurrency; + + /* + * The desired level of I/O concurrency controls how far ahead we are + * willing to look ahead. We also clamp it to at least + * MAX_BUFFER_PER_TRANFER so that we can have a chance to build up a full + * sized read, even when max_ios is zero. 
+ */ + max_pinned_buffers = Max(max_ios * 4, MAX_BUFFERS_PER_TRANSFER); + + /* + * The *_io_concurrency GUCs might be set to 0, but we want to allow at + * least one, to keep our gating logic simple. + */ + max_ios = Max(max_ios, 1); + + /* + * Don't allow this backend to pin too many buffers. For now we'll apply + * the limit for the shared buffer pool and the local buffer pool, without + * worrying which it is. + */ + LimitAdditionalPins(&max_pinned_buffers); + LimitAdditionalLocalPins(&max_pinned_buffers); + Assert(max_pinned_buffers > 0); + + /* + * pgsr->ranges is a circular buffer. When it is empty, head == tail. + * When it is full, there is an empty element between head and tail. Head + * can also be empty (nblocks == 0), therefore we need two extra elements + * for non-occupied ranges, on top of max_pinned_buffers to allow for the + * maxmimum possible number of occupied ranges of the smallest possible + * size of one. + */ + size = max_pinned_buffers + 2; + + pgsr = (PgStreamingRead *) + palloc0(offsetof(PgStreamingRead, ranges) + + sizeof(pgsr->ranges[0]) * size); + + pgsr->max_ios = max_ios; + pgsr->per_buffer_data_size = per_buffer_data_size; + pgsr->max_pinned_buffers = max_pinned_buffers; + pgsr->pgsr_private = pgsr_private; + pgsr->strategy = strategy; + pgsr->size = size; + + pgsr->unget_blocknum = InvalidBlockNumber; + +#ifdef USE_PREFETCH + + /* + * This system supports prefetching advice. As long as direct I/O isn't + * enabled, and the caller hasn't promised sequential access, we can use + * it. + */ + if ((io_direct_flags & IO_DIRECT_DATA) == 0 && + (flags & PGSR_FLAG_SEQUENTIAL) == 0) + pgsr->advice_enabled = true; +#endif + + /* + * We start off building small ranges, but double that quickly, for the + * benefit of users that don't know how far ahead they'll read. This can + * be disabled by users that already know they'll read all the way. + */ + if (flags & PGSR_FLAG_FULL) + pgsr->ramp_up_pin_limit = INT_MAX; + else + pgsr->ramp_up_pin_limit = 1; + + /* + * We want to avoid creating ranges that are smaller than they could be + * just because we hit max_pinned_buffers. We only look ahead when the + * number of pinned buffers falls below this trigger number, or put + * another way, we stop looking ahead when we wouldn't be able to build a + * "full sized" range. + */ + pgsr->pinned_buffers_trigger = + Max(1, (int) max_pinned_buffers - MAX_BUFFERS_PER_TRANSFER); + + /* Space for the callback to store extra data along with each block. */ + if (per_buffer_data_size) + pgsr->per_buffer_data = palloc(per_buffer_data_size * max_pinned_buffers); + + return pgsr; +} + +/* + * Create a new streaming read object that can be used to perform the + * equivalent of a series of ReadBuffer() calls for one fork of one relation. + * Internally, it generates larger vectored reads where possible by looking + * ahead. + */ +PgStreamingRead * +pg_streaming_read_buffer_alloc(int flags, + void *pgsr_private, + size_t per_buffer_data_size, + BufferAccessStrategy strategy, + BufferManagerRelation bmr, + ForkNumber forknum, + PgStreamingReadBufferCB next_block_cb) +{ + PgStreamingRead *result; + + result = pg_streaming_read_buffer_alloc_internal(flags, + pgsr_private, + per_buffer_data_size, + strategy); + result->callback = next_block_cb; + result->bmr = bmr; + result->forknum = forknum; + + return result; +} + +/* + * Find the per-buffer data index for the Nth block of a range. 
+ */ +static int +get_per_buffer_data_index(PgStreamingRead *pgsr, PgStreamingReadRange *range, int n) +{ + int result; + + /* + * Find slot in the circular buffer of per-buffer data, without using the + * expensive % operator. + */ + result = range->per_buffer_data_index + n; + if (result >= pgsr->max_pinned_buffers) + result -= pgsr->max_pinned_buffers; + Assert(result == (range->per_buffer_data_index + n) % pgsr->max_pinned_buffers); + + return result; +} + +/* + * Return a pointer to the per-buffer data by index. + */ +static void * +get_per_buffer_data_by_index(PgStreamingRead *pgsr, int per_buffer_data_index) +{ + return (char *) pgsr->per_buffer_data + + pgsr->per_buffer_data_size * per_buffer_data_index; +} + +/* + * Return a pointer to the per-buffer data for the Nth block of a range. + */ +static void * +get_per_buffer_data(PgStreamingRead *pgsr, PgStreamingReadRange *range, int n) +{ + return get_per_buffer_data_by_index(pgsr, + get_per_buffer_data_index(pgsr, + range, + n)); +} + +/* + * Start reading the head range, and create a new head range. The new head + * range is returned. It may not be empty, if StartReadBuffers() couldn't + * start the entire range; in that case the returned range contains the + * remaining portion of the range. + */ +static PgStreamingReadRange * +pg_streaming_read_start_head_range(PgStreamingRead *pgsr) +{ + PgStreamingReadRange *head_range; + PgStreamingReadRange *new_head_range; + int nblocks_pinned; + int flags; + + /* Caller should make sure we never exceed max_ios. */ + Assert(pgsr->ios_in_progress < pgsr->max_ios); + + /* Should only call if the head range has some blocks to read. */ + head_range = &pgsr->ranges[pgsr->head]; + Assert(head_range->nblocks > 0); + + /* + * If advice hasn't been suppressed, and this system supports it, this + * isn't a strictly sequential pattern, then we'll issue advice. + */ + if (pgsr->advice_enabled && head_range->blocknum != pgsr->seq_blocknum) + flags = READ_BUFFERS_ISSUE_ADVICE; + else + flags = 0; + + + /* Start reading as many blocks as we can from the head range. */ + nblocks_pinned = head_range->nblocks; + head_range->need_wait = + StartReadBuffers(pgsr->bmr, + head_range->buffers, + pgsr->forknum, + head_range->blocknum, + &nblocks_pinned, + pgsr->strategy, + flags, + &head_range->operation); + + /* Did that start an I/O? */ + if (head_range->need_wait && (flags & READ_BUFFERS_ISSUE_ADVICE)) + { + head_range->advice_issued = true; + pgsr->ios_in_progress++; + Assert(pgsr->ios_in_progress <= pgsr->max_ios); + } + + /* + * StartReadBuffers() might have pinned fewer blocks than we asked it to, + * but always at least one. + */ + Assert(nblocks_pinned <= head_range->nblocks); + Assert(nblocks_pinned >= 1); + pgsr->pinned_buffers += nblocks_pinned; + + /* + * Remember where the next block would be after that, so we can detect + * sequential access next time. + */ + pgsr->seq_blocknum = head_range->blocknum + nblocks_pinned; + + /* + * Create a new head range. There must be space, because we have enough + * elements for every range to hold just one block, up to the pin limit. + */ + Assert(pgsr->size > pgsr->max_pinned_buffers); + Assert((pgsr->head + 1) % pgsr->size != pgsr->tail); + if (++pgsr->head == pgsr->size) + pgsr->head = 0; + new_head_range = &pgsr->ranges[pgsr->head]; + new_head_range->nblocks = 0; + new_head_range->advice_issued = false; + + /* + * If we didn't manage to start the whole read above, we split the range, + * moving the remainder into the new head range. 
+ */ + if (nblocks_pinned < head_range->nblocks) + { + int nblocks_remaining = head_range->nblocks - nblocks_pinned; + + head_range->nblocks = nblocks_pinned; + + new_head_range->blocknum = head_range->blocknum + nblocks_pinned; + new_head_range->nblocks = nblocks_remaining; + } + + /* The new range has per-buffer data starting after the previous range. */ + new_head_range->per_buffer_data_index = + get_per_buffer_data_index(pgsr, head_range, nblocks_pinned); + + return new_head_range; +} + +/* + * Ask the callback which block it would like us to read next, with a small + * buffer in front to allow pg_streaming_unget_block() to work. + */ +static BlockNumber +pg_streaming_get_block(PgStreamingRead *pgsr, void *per_buffer_data) +{ + BlockNumber result; + + if (unlikely(pgsr->unget_blocknum != InvalidBlockNumber)) + { + /* + * If we had to unget a block, now it is time to return that one + * again. + */ + result = pgsr->unget_blocknum; + pgsr->unget_blocknum = InvalidBlockNumber; + + /* + * The same per_buffer_data element must have been used, and still + * contains whatever data the callback wrote into it. So we just + * sanity-check that we were called with the value that + * pg_streaming_unget_block() pushed back. + */ + Assert(per_buffer_data == pgsr->unget_per_buffer_data); + } + else + { + /* Use the installed callback directly. */ + result = pgsr->callback(pgsr, pgsr->pgsr_private, per_buffer_data); + } + + return result; +} + +/* + * In order to deal with short reads in StartReadBuffers(), we sometimes need + * to defer handling of a block until later. This *must* be called with the + * last value returned by pg_streaming_get_block(). + */ +static void +pg_streaming_unget_block(PgStreamingRead *pgsr, BlockNumber blocknum, void *per_buffer_data) +{ + Assert(pgsr->unget_blocknum == InvalidBlockNumber); + pgsr->unget_blocknum = blocknum; + pgsr->unget_per_buffer_data = per_buffer_data; +} + +static void +pg_streaming_read_look_ahead(PgStreamingRead *pgsr) +{ + PgStreamingReadRange *range; + + /* + * If we're still ramping up, we may have to stall to wait for buffers to + * be consumed first before we do any more prefetching. + */ + if (pgsr->ramp_up_pin_stall > 0) + { + Assert(pgsr->pinned_buffers > 0); + return; + } + + /* + * If we're finished or can't start more I/O, then don't look ahead. + */ + if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios) + return; + + /* + * We'll also wait until the number of pinned buffers falls below our + * trigger level, so that we have the chance to create a full range. + */ + if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger) + return; + + do + { + BlockNumber blocknum; + void *per_buffer_data; + + /* Do we have a full-sized range? */ + range = &pgsr->ranges[pgsr->head]; + if (range->nblocks == lengthof(range->buffers)) + { + /* Start as much of it as we can. */ + range = pg_streaming_read_start_head_range(pgsr); + + /* If we're now at the I/O limit, stop here. */ + if (pgsr->ios_in_progress == pgsr->max_ios) + return; + + /* + * If we couldn't form a full range, then stop here to avoid + * creating small I/O. + */ + if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger) + return; + + /* + * That might have only been partially started, but always + * processes at least one so that'll do for now. + */ + Assert(range->nblocks < lengthof(range->buffers)); + } + + /* Find per-buffer data slot for the next block. 
*/ + per_buffer_data = get_per_buffer_data(pgsr, range, range->nblocks); + + /* Find out which block the callback wants to read next. */ + blocknum = pg_streaming_get_block(pgsr, per_buffer_data); + if (blocknum == InvalidBlockNumber) + { + /* End of stream. */ + pgsr->finished = true; + break; + } + + /* + * Is there a head range that we cannot extend, because the requested + * block is not consecutive? + */ + if (range->nblocks > 0 && + range->blocknum + range->nblocks != blocknum) + { + /* Yes. Start it, so we can begin building a new one. */ + range = pg_streaming_read_start_head_range(pgsr); + + /* + * It's possible that it was only partially started, and we have a + * new range with the remainder. Keep starting I/Os until we get + * it all out of the way, or we hit the I/O limit. + */ + while (range->nblocks > 0 && pgsr->ios_in_progress < pgsr->max_ios) + range = pg_streaming_read_start_head_range(pgsr); + + /* + * We have to 'unget' the block returned by the callback if we + * don't have enough I/O capacity left to start something. + */ + if (pgsr->ios_in_progress == pgsr->max_ios) + { + pg_streaming_unget_block(pgsr, blocknum, per_buffer_data); + return; + } + } + + /* If we have a new, empty range, initialize the start block. */ + if (range->nblocks == 0) + { + range->blocknum = blocknum; + } + + /* This block extends the range by one. */ + Assert(range->blocknum + range->nblocks == blocknum); + range->nblocks++; + + } while (pgsr->pinned_buffers + range->nblocks < pgsr->max_pinned_buffers && + pgsr->pinned_buffers + range->nblocks < pgsr->ramp_up_pin_limit); + + /* If we've hit the ramp-up limit, insert a stall. */ + if (pgsr->pinned_buffers + range->nblocks >= pgsr->ramp_up_pin_limit) + { + /* Can't get here if an earlier stall hasn't finished. */ + Assert(pgsr->ramp_up_pin_stall == 0); + /* Don't do any more prefetching until these buffers are consumed. */ + pgsr->ramp_up_pin_stall = pgsr->ramp_up_pin_limit; + /* Double it. It will soon be out of the way. */ + pgsr->ramp_up_pin_limit *= 2; + } + + /* Start as much as we can. */ + while (range->nblocks > 0) + { + range = pg_streaming_read_start_head_range(pgsr); + if (pgsr->ios_in_progress == pgsr->max_ios) + break; + } +} + +Buffer +pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_data) +{ + pg_streaming_read_look_ahead(pgsr); + + /* See if we have one buffer to return. */ + while (pgsr->tail != pgsr->head) + { + PgStreamingReadRange *tail_range; + + tail_range = &pgsr->ranges[pgsr->tail]; + + /* + * Do we need to perform an I/O before returning the buffers from this + * range? + */ + if (tail_range->need_wait) + { + WaitReadBuffers(&tail_range->operation); + tail_range->need_wait = false; + + /* + * We don't really know if the kernel generated a physical I/O + * when we issued advice, let alone when it finished, but it has + * certainly finished now because we've performed the read. + */ + if (tail_range->advice_issued) + { + Assert(pgsr->ios_in_progress > 0); + pgsr->ios_in_progress--; + } + } + + /* Are there more buffers available in this range? */ + if (pgsr->next_tail_buffer < tail_range->nblocks) + { + int buffer_index; + Buffer buffer; + + buffer_index = pgsr->next_tail_buffer++; + buffer = tail_range->buffers[buffer_index]; + + Assert(BufferIsValid(buffer)); + + /* We are giving away ownership of this pinned buffer. 
*/ + Assert(pgsr->pinned_buffers > 0); + pgsr->pinned_buffers--; + + if (pgsr->ramp_up_pin_stall > 0) + pgsr->ramp_up_pin_stall--; + + if (per_buffer_data) + *per_buffer_data = get_per_buffer_data(pgsr, tail_range, buffer_index); + + return buffer; + } + + /* Advance tail to next range, if there is one. */ + if (++pgsr->tail == pgsr->size) + pgsr->tail = 0; + pgsr->next_tail_buffer = 0; + + /* + * If tail crashed into head, and head is not empty, then it is time + * to start that range. + */ + if (pgsr->tail == pgsr->head && + pgsr->ranges[pgsr->head].nblocks > 0) + pg_streaming_read_start_head_range(pgsr); + } + + Assert(pgsr->pinned_buffers == 0); + + return InvalidBuffer; +} + +void +pg_streaming_read_free(PgStreamingRead *pgsr) +{ + Buffer buffer; + + /* Stop looking ahead. */ + pgsr->finished = true; + + /* Unpin anything that wasn't consumed. */ + while ((buffer = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer) + ReleaseBuffer(buffer); + + Assert(pgsr->pinned_buffers == 0); + Assert(pgsr->ios_in_progress == 0); + + /* Release memory. */ + if (pgsr->per_buffer_data) + pfree(pgsr->per_buffer_data); + + pfree(pgsr); +} diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index bdf89bbc4dc..3b1b0ad99df 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -19,6 +19,11 @@ * and pin it so that no one can destroy it while this process * is using it. * + * StartReadBuffers() -- as above, but for multiple contiguous blocks in + * two steps. + * + * WaitReadBuffers() -- second step of StartReadBuffers(). + * * ReleaseBuffer() -- unpin a buffer * * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty". @@ -472,10 +477,9 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref) ) -static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, +static Buffer ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum, BlockNumber blockNum, - ReadBufferMode mode, BufferAccessStrategy strategy, - bool *hit); + ReadBufferMode mode, BufferAccessStrategy strategy); static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr, ForkNumber fork, BufferAccessStrategy strategy, @@ -501,7 +505,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); -static bool StartBufferIO(BufferDesc *buf, bool forInput); +static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, bool forget_owner); static void AbortBufferIO(Buffer buffer); @@ -782,7 +786,6 @@ Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy) { - bool hit; Buffer buf; /* @@ -795,15 +798,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot access temporary tables of other sessions"))); - /* - * Read the buffer, and update pgstat counters to reflect a cache hit or - * miss. 
- */ - pgstat_count_buffer_read(reln); - buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence, - forkNum, blockNum, mode, strategy, &hit); - if (hit) - pgstat_count_buffer_hit(reln); + buf = ReadBuffer_common(BMR_REL(reln), + forkNum, blockNum, mode, strategy); + return buf; } @@ -823,13 +820,12 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent) { - bool hit; - SMgrRelation smgr = smgropen(rlocator, InvalidBackendId); - return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT : - RELPERSISTENCE_UNLOGGED, forkNum, blockNum, - mode, strategy, &hit); + return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT : + RELPERSISTENCE_UNLOGGED), + forkNum, blockNum, + mode, strategy); } /* @@ -995,35 +991,68 @@ ExtendBufferedRelTo(BufferManagerRelation bmr, */ if (buffer == InvalidBuffer) { - bool hit; - Assert(extended_by == 0); - buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence, - fork, extend_to - 1, mode, strategy, - &hit); + buffer = ReadBuffer_common(bmr, fork, extend_to - 1, mode, strategy); } return buffer; } +/* + * Zero a buffer and lock it, as part of the implementation of + * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK. The buffer must be already + * pinned. It does not have to be valid, but it is valid and locked on + * return. + */ +static void +ZeroBuffer(Buffer buffer, ReadBufferMode mode) +{ + BufferDesc *bufHdr; + uint32 buf_state; + + Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); + + if (BufferIsLocal(buffer)) + bufHdr = GetLocalBufferDescriptor(-buffer - 1); + else + { + bufHdr = GetBufferDescriptor(buffer - 1); + if (mode == RBM_ZERO_AND_LOCK) + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + else + LockBufferForCleanup(buffer); + } + + memset(BufferGetPage(buffer), 0, BLCKSZ); + + if (BufferIsLocal(buffer)) + { + buf_state = pg_atomic_read_u32(&bufHdr->state); + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + buf_state = LockBufHdr(bufHdr); + buf_state |= BM_VALID; + UnlockBufHdr(bufHdr, buf_state); + } +} + /* * ReadBuffer_common -- common logic for all ReadBuffer variants * * *hit is set to true if the request was satisfied from shared buffer cache. 
*/ static Buffer -ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, +ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, - BufferAccessStrategy strategy, bool *hit) + BufferAccessStrategy strategy) { - BufferDesc *bufHdr; - Block bufBlock; - bool found; - IOContext io_context; - IOObject io_object; - bool isLocalBuf = SmgrIsTemp(smgr); - - *hit = false; + ReadBuffersOperation operation; + Buffer buffer; + int nblocks; + int flags; /* * Backward compatibility path, most code should use ExtendBufferedRel() @@ -1042,181 +1071,404 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) flags |= EB_LOCK_FIRST; - return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence), - forkNum, strategy, flags); + return ExtendBufferedRel(bmr, forkNum, strategy, flags); } - TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend); + nblocks = 1; + if (mode == RBM_ZERO_ON_ERROR) + flags = READ_BUFFERS_ZERO_ON_ERROR; + else + flags = 0; + if (StartReadBuffers(bmr, + &buffer, + forkNum, + blockNum, + &nblocks, + strategy, + flags, + &operation)) + WaitReadBuffers(&operation); + Assert(nblocks == 1); /* single block can't be short */ + + if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK) + ZeroBuffer(buffer, mode); + + return buffer; +} + +static Buffer +PrepareReadBuffer(BufferManagerRelation bmr, + ForkNumber forkNum, + BlockNumber blockNum, + BufferAccessStrategy strategy, + bool *foundPtr) +{ + BufferDesc *bufHdr; + bool isLocalBuf; + IOContext io_context; + IOObject io_object; + + Assert(blockNum != P_NEW); + Assert(bmr.smgr); + + isLocalBuf = SmgrIsTemp(bmr.smgr); if (isLocalBuf) { - /* - * We do not use a BufferAccessStrategy for I/O of temporary tables. - * However, in some cases, the "strategy" may not be NULL, so we can't - * rely on IOContextForStrategy() to set the right IOContext for us. - * This may happen in cases like CREATE TEMPORARY TABLE AS... - */ io_context = IOCONTEXT_NORMAL; io_object = IOOBJECT_TEMP_RELATION; - bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found); - if (found) - pgBufferUsage.local_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.local_blks_read++; } else { - /* - * lookup the buffer. IO_IN_PROGRESS is set if the requested block is - * not currently in memory. - */ io_context = IOContextForStrategy(strategy); io_object = IOOBJECT_RELATION; - bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum, - strategy, &found, io_context); - if (found) - pgBufferUsage.shared_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.shared_blks_read++; } - /* At this point we do NOT hold any locks. 
*/ + TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend); - /* if it was already in the buffer pool, we're done */ - if (found) + ResourceOwnerEnlarge(CurrentResourceOwner); + if (isLocalBuf) + { + bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr); + if (*foundPtr) + pgBufferUsage.local_blks_hit++; + } + else + { + bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum, + strategy, foundPtr, io_context); + if (*foundPtr) + pgBufferUsage.shared_blks_hit++; + } + if (bmr.rel) + { + /* + * While pgBufferUsage's "read" counter isn't bumped unless we reach + * WaitReadBuffers() (so, not for hits, and not for buffers that are + * zeroed instead), the per-relation stats always count them. + */ + pgstat_count_buffer_read(bmr.rel); + if (*foundPtr) + pgstat_count_buffer_hit(bmr.rel); + } + if (*foundPtr) { - /* Just need to update stats before we exit */ - *hit = true; VacuumPageHit++; pgstat_count_io_op(io_object, io_context, IOOP_HIT); - if (VacuumCostActive) VacuumCostBalance += VacuumCostPageHit; TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - found); + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + true); + } - /* - * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked - * on return. - */ - if (!isLocalBuf) - { - if (mode == RBM_ZERO_AND_LOCK) - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), - LW_EXCLUSIVE); - else if (mode == RBM_ZERO_AND_CLEANUP_LOCK) - LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr)); - } + return BufferDescriptorGetBuffer(bufHdr); +} - return BufferDescriptorGetBuffer(bufHdr); +/* + * Begin reading a range of blocks beginning at blockNum and extending for + * *nblocks. On return, up to *nblocks pinned buffers holding those blocks + * are written into the buffers array, and *nblocks is updated to contain the + * actual number, which may be fewer than requested. + * + * If false is returned, no I/O is necessary and WaitReadBuffers() is not + * necessary. If true is returned, one I/O has been started, and + * WaitReadBuffers() must be called with the same operation object before the + * buffers are accessed. Along with the operation object, the caller-supplied + * array of buffers must remain valid until WaitReadBuffers() is called. + * + * Currently the I/O is only started with optional operating system advice, + * and the real I/O happens in WaitReadBuffers(). In future work, true I/O + * could be initiated here. + */ +bool +StartReadBuffers(BufferManagerRelation bmr, + Buffer *buffers, + ForkNumber forkNum, + BlockNumber blockNum, + int *nblocks, + BufferAccessStrategy strategy, + int flags, + ReadBuffersOperation *operation) +{ + int actual_nblocks = *nblocks; + + if (bmr.rel) + { + bmr.smgr = RelationGetSmgr(bmr.rel); + bmr.relpersistence = bmr.rel->rd_rel->relpersistence; } - /* - * if we have gotten to this point, we have allocated a buffer for the - * page but its contents are not yet valid. IO_IN_PROGRESS is set for it, - * if it's a shared buffer. 
- */ - Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ + operation->bmr = bmr; + operation->forknum = forkNum; + operation->blocknum = blockNum; + operation->buffers = buffers; + operation->nblocks = actual_nblocks; + operation->strategy = strategy; + operation->flags = flags; - bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); + operation->io_buffers_len = 0; - /* - * Read in the page, unless the caller intends to overwrite it and just - * wants us to allocate a buffer. - */ - if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) - MemSet((char *) bufBlock, 0, BLCKSZ); - else + for (int i = 0; i < actual_nblocks; ++i) { - instr_time io_start = pgstat_prepare_io_time(track_io_timing); + bool found; - smgrread(smgr, forkNum, blockNum, bufBlock); + buffers[i] = PrepareReadBuffer(bmr, + forkNum, + blockNum + i, + strategy, + &found); - pgstat_count_io_op_time(io_object, io_context, - IOOP_READ, io_start, 1); + if (found) + { + /* + * Terminate the read as soon as we get a hit. It could be a + * single buffer hit, or it could be a hit that follows a readable + * range. We don't want to create more than one readable range, + * so we stop here. + */ + actual_nblocks = operation->nblocks = *nblocks = i + 1; + } + else + { + /* Extend the readable range to cover this block. */ + operation->io_buffers_len++; + } + } - /* check for garbage data */ - if (!PageIsVerifiedExtended((Page) bufBlock, blockNum, - PIV_LOG_WARNING | PIV_REPORT_STAT)) + if (operation->io_buffers_len > 0) + { + if (flags & READ_BUFFERS_ISSUE_ADVICE) { - if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages) - { - ereport(WARNING, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s; zeroing out page", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); - MemSet((char *) bufBlock, 0, BLCKSZ); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); + /* + * In theory we should only do this if PrepareReadBuffers() had to + * allocate new buffers above. That way, if two calls to + * StartReadBuffers() were made for the same blocks before + * WaitReadBuffers(), only the first would issue the advice. + * That'd be a better simulation of true asynchronous I/O, which + * would only start the I/O once, but isn't done here for + * simplicity. Note also that the following call might actually + * issue two advice calls if we cross a segment boundary; in a + * true asynchronous version we might choose to process only one + * real I/O at a time in that case. + */ + smgrprefetch(bmr.smgr, forkNum, blockNum, operation->io_buffers_len); } + + /* Indicate that WaitReadBuffers() should be called. */ + return true; } + else + { + return false; + } +} - /* - * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer - * content lock before marking the page as valid, to make sure that no - * other backend sees the zeroed page before the caller has had a chance - * to initialize it. - * - * Since no-one else can be looking at the page contents yet, there is no - * difference between an exclusive lock and a cleanup-strength lock. (Note - * that we cannot use LockBuffer() or LockBufferForCleanup() here, because - * they assert that the buffer is already valid.) 
- */ - if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) && - !isLocalBuf) +static inline bool +WaitReadBuffersCanStartIO(Buffer buffer, bool nowait) +{ + if (BufferIsLocal(buffer)) { - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE); + BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1); + + return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0; } + else + return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); +} + +void +WaitReadBuffers(ReadBuffersOperation *operation) +{ + BufferManagerRelation bmr; + Buffer *buffers; + int nblocks; + BlockNumber blocknum; + ForkNumber forknum; + bool isLocalBuf; + IOContext io_context; + IOObject io_object; + + /* + * Currently operations are only allowed to include a read of some range, + * with an optional extra buffer that is already pinned at the end. So + * nblocks can be at most one more than io_buffers_len. + */ + Assert((operation->nblocks == operation->io_buffers_len) || + (operation->nblocks == operation->io_buffers_len + 1)); + /* Find the range of the physical read we need to perform. */ + nblocks = operation->io_buffers_len; + if (nblocks == 0) + return; /* nothing to do */ + + buffers = &operation->buffers[0]; + blocknum = operation->blocknum; + forknum = operation->forknum; + bmr = operation->bmr; + + isLocalBuf = SmgrIsTemp(bmr.smgr); if (isLocalBuf) { - /* Only need to adjust flags */ - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); - - buf_state |= BM_VALID; - pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; } else { - /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(bufHdr, false, BM_VALID, true); + io_context = IOContextForStrategy(operation->strategy); + io_object = IOOBJECT_RELATION; } - VacuumPageMiss++; - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss; + /* + * We count all these blocks as read by this backend. This is traditional + * behavior, but might turn out to be not true if we find that someone + * else has beaten us and completed the read of some of these blocks. In + * that case the system globally double-counts, but we traditionally don't + * count this as a "hit", and we don't have a separate counter for "miss, + * but another backend completed the read". + */ + if (isLocalBuf) + pgBufferUsage.local_blks_read += nblocks; + else + pgBufferUsage.shared_blks_read += nblocks; - TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - found); + for (int i = 0; i < nblocks; ++i) + { + int io_buffers_len; + Buffer io_buffers[MAX_BUFFERS_PER_TRANSFER]; + void *io_pages[MAX_BUFFERS_PER_TRANSFER]; + instr_time io_start; + BlockNumber io_first_block; - return BufferDescriptorGetBuffer(bufHdr); + /* + * Skip this block if someone else has already completed it. If an + * I/O is already in progress in another backend, this will wait for + * the outcome: either done, or something went wrong and we will + * retry. + */ + if (!WaitReadBuffersCanStartIO(buffers[i], false)) + { + /* + * Report this as a 'hit' for this backend, even though it must + * have started out as a miss in PrepareReadBuffer(). 
+ */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + true); + continue; + } + + /* We found a buffer that we need to read in. */ + io_buffers[0] = buffers[i]; + io_pages[0] = BufferGetBlock(buffers[i]); + io_first_block = blocknum + i; + io_buffers_len = 1; + + /* + * How many neighboring-on-disk blocks can we can scatter-read into + * other buffers at the same time? In this case we don't wait if we + * see an I/O already in progress. We already hold BM_IO_IN_PROGRESS + * for the head block, so we should get on with that I/O as soon as + * possible. We'll come back to this block again, above. + */ + while ((i + 1) < nblocks && + WaitReadBuffersCanStartIO(buffers[i + 1], true)) + { + /* Must be consecutive block numbers. */ + Assert(BufferGetBlockNumber(buffers[i + 1]) == + BufferGetBlockNumber(buffers[i]) + 1); + + io_buffers[io_buffers_len] = buffers[++i]; + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); + } + + io_start = pgstat_prepare_io_time(track_io_timing); + smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len); + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, + io_buffers_len); + + /* Verify each block we read, and terminate the I/O. */ + for (int j = 0; j < io_buffers_len; ++j) + { + BufferDesc *bufHdr; + Block bufBlock; + + if (isLocalBuf) + { + bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1); + bufBlock = LocalBufHdrGetBlock(bufHdr); + } + else + { + bufHdr = GetBufferDescriptor(io_buffers[j] - 1); + bufBlock = BufHdrGetBlock(bufHdr); + } + + /* check for garbage data */ + if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j, + PIV_LOG_WARNING | PIV_REPORT_STAT)) + { + if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s; zeroing out page", + io_first_block + j, + relpath(bmr.smgr->smgr_rlocator, forknum)))); + memset(bufBlock, 0, BLCKSZ); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s", + io_first_block + j, + relpath(bmr.smgr->smgr_rlocator, forknum)))); + } + + /* Terminate I/O and set BM_VALID. */ + if (isLocalBuf) + { + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + /* Set BM_VALID, terminate IO, and wake up any waiters */ + TerminateBufferIO(bufHdr, false, BM_VALID, true); + } + + /* Report I/Os as completing individually. */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + false); + } + + VacuumPageMiss += io_buffers_len; + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + } } /* - * BufferAlloc -- subroutine for ReadBuffer. Handles lookup of a shared - * buffer. If no buffer exists already, selects a replacement - * victim and evicts the old page, but does NOT read in new page. + * BufferAlloc -- subroutine for StartReadBuffers. Handles lookup of a shared + * buffer. If no buffer exists already, selects a replacement victim and + * evicts the old page, but does NOT read in new page. 
* * "strategy" can be a buffer replacement strategy object, or NULL for * the default strategy. The selected buffer's usage_count is advanced when @@ -1224,11 +1476,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * * The returned buffer is pinned and is already marked as holding the * desired page. If it already did have the desired page, *foundPtr is - * set true. Otherwise, *foundPtr is set false and the buffer is marked - * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it. - * - * *foundPtr is actually redundant with the buffer's BM_VALID flag, but - * we keep it for simplicity in ReadBuffer. + * set true. Otherwise, *foundPtr is set false. * * io_context is passed as an output parameter to avoid calling * IOContextForStrategy() when there is a shared buffers hit and no IO @@ -1287,19 +1535,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called StartReadBuffers() but not yet WaitReadBuffers(). */ - if (StartBufferIO(buf, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return buf; @@ -1364,19 +1603,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called StartReadBuffers() but not yet WaitReadBuffers(). */ - if (StartBufferIO(existing_buf_hdr, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return existing_buf_hdr; @@ -1408,15 +1638,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, LWLockRelease(newPartitionLock); /* - * Buffer contents are currently invalid. Try to obtain the right to - * start I/O. If StartBufferIO returns false, then someone else managed - * to read it before we did, so there's nothing left for BufferAlloc() to - * do. + * Buffer contents are currently invalid. */ - if (StartBufferIO(victim_buf_hdr, true)) - *foundPtr = false; - else - *foundPtr = true; + *foundPtr = false; return victim_buf_hdr; } @@ -1770,7 +1994,7 @@ again: * pessimistic, but outside of toy-sized shared_buffers it should allow * sufficient pins. 
*/ -static void +void LimitAdditionalPins(uint32 *additional_pins) { uint32 max_backends; @@ -2035,7 +2259,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, buf_state &= ~BM_VALID; UnlockBufHdr(existing_hdr, buf_state); - } while (!StartBufferIO(existing_hdr, true)); + } while (!StartBufferIO(existing_hdr, true, false)); } else { @@ -2058,7 +2282,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, LWLockRelease(partition_lock); /* XXX: could combine the locked operations in it with the above */ - StartBufferIO(victim_buf_hdr, true); + StartBufferIO(victim_buf_hdr, true, false); } } @@ -2373,7 +2597,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) else { /* - * If we previously pinned the buffer, it must surely be valid. + * If we previously pinned the buffer, it is likely to be valid, but + * it may not be if StartReadBuffers() was called and + * WaitReadBuffers() hasn't been called yet. We'll check by loading + * the flags without locking. This is racy, but it's OK to return + * false spuriously: when WaitReadBuffers() calls StartBufferIO(), + * it'll see that it's now valid. * * Note: We deliberately avoid a Valgrind client request here. * Individual access methods can optionally superimpose buffer page @@ -2382,7 +2611,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) * that the buffer page is legitimately non-accessible here. We * cannot meddle with that. */ - result = true; + result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0; } ref->refcount++; @@ -3450,7 +3679,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * someone else flushed the buffer before we could, so we need not do * anything. */ - if (!StartBufferIO(buf, false)) + if (!StartBufferIO(buf, false, false)) return; /* Setup error traceback support for ereport() */ @@ -5185,9 +5414,15 @@ WaitIO(BufferDesc *buf) * * Returns true if we successfully marked the buffer as I/O busy, * false if someone else already did the work. + * + * If nowait is true, then we don't wait for an I/O to be finished by another + * backend. In that case, false indicates either that the I/O was already + * finished, or is still in progress. This is useful for callers that want to + * find out if they can perform the I/O as part of a larger operation, without + * waiting for the answer or distinguishing the reasons why not. */ static bool -StartBufferIO(BufferDesc *buf, bool forInput) +StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) { uint32 buf_state; @@ -5200,6 +5435,8 @@ StartBufferIO(BufferDesc *buf, bool forInput) if (!(buf_state & BM_IO_IN_PROGRESS)) break; UnlockBufHdr(buf, buf_state); + if (nowait) + return false; WaitIO(buf); } diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 1f02fed250e..6956d4e5b49 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -109,10 +109,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, * LocalBufferAlloc - * Find or create a local buffer for the given page of the given relation. * - * API is similar to bufmgr.c's BufferAlloc, except that we do not need - * to do any locking since this is all local. Also, IO_IN_PROGRESS - * does not get set. Lastly, we support only default access strategy - * (hence, usage_count is always advanced). + * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do + * any locking since this is all local. 
We support only default access + * strategy (hence, usage_count is always advanced). */ BufferDesc * LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, @@ -288,7 +287,7 @@ GetLocalVictimBuffer(void) } /* see LimitAdditionalPins() */ -static void +void LimitAdditionalLocalPins(uint32 *additional_pins) { uint32 max_pins; @@ -298,9 +297,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins) /* * In contrast to LimitAdditionalPins() other backends don't play a role - * here. We can allow up to NLocBuffer pins in total. + * here. We can allow up to NLocBuffer pins in total, but it might not be + * initialized yet so read num_temp_buffers. */ - max_pins = (NLocBuffer - NLocalPinnedBuffers); + max_pins = (num_temp_buffers - NLocalPinnedBuffers); if (*additional_pins >= max_pins) *additional_pins = max_pins; diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build index 40345bdca27..739d13293fb 100644 --- a/src/backend/storage/meson.build +++ b/src/backend/storage/meson.build @@ -1,5 +1,6 @@ # Copyright (c) 2022-2024, PostgreSQL Global Development Group +subdir('aio') subdir('buffer') subdir('file') subdir('freespace') diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index d51d46d3353..b57f71f97e3 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -14,6 +14,7 @@ #ifndef BUFMGR_H #define BUFMGR_H +#include "port/pg_iovec.h" #include "storage/block.h" #include "storage/buf.h" #include "storage/bufpage.h" @@ -158,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount; #define BUFFER_LOCK_SHARE 1 #define BUFFER_LOCK_EXCLUSIVE 2 +/* + * Maximum number of buffers for multi-buffer I/O functions. This is set to + * allow 128kB transfers, unless BLCKSZ and IOV_MAX imply a a smaller maximum. + */ +#define MAX_BUFFERS_PER_TRANSFER Min(PG_IOV_MAX, (128 * 1024) / BLCKSZ) /* * prototypes for functions in bufmgr.c @@ -177,6 +183,42 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent); + +#define READ_BUFFERS_ZERO_ON_ERROR 0x01 +#define READ_BUFFERS_ISSUE_ADVICE 0x02 + +/* + * Private state used by StartReadBuffers() and WaitReadBuffers(). Declared + * in public header only to allow inclusion in other structs, but contents + * should not be accessed. + */ +struct ReadBuffersOperation +{ + /* Parameters passed in to StartReadBuffers(). */ + BufferManagerRelation bmr; + Buffer *buffers; + ForkNumber forknum; + BlockNumber blocknum; + int nblocks; + BufferAccessStrategy strategy; + int flags; + + /* Range of buffers, if we need to perform a read. 
*/ + int io_buffers_len; +}; + +typedef struct ReadBuffersOperation ReadBuffersOperation; + +extern bool StartReadBuffers(BufferManagerRelation bmr, + Buffer *buffers, + ForkNumber forknum, + BlockNumber blocknum, + int *nblocks, + BufferAccessStrategy strategy, + int flags, + ReadBuffersOperation *operation); +extern void WaitReadBuffers(ReadBuffersOperation *operation); + extern void ReleaseBuffer(Buffer buffer); extern void UnlockReleaseBuffer(Buffer buffer); extern bool BufferIsExclusiveLocked(Buffer buffer); @@ -250,6 +292,9 @@ extern bool HoldingBufferPinThatDelaysRecovery(void); extern bool BgBufferSync(struct WritebackContext *wb_context); +extern void LimitAdditionalPins(uint32 *additional_pins); +extern void LimitAdditionalLocalPins(uint32 *additional_pins); + /* in buf_init.c */ extern void InitBufferPool(void); extern Size BufferShmemSize(void); diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h new file mode 100644 index 00000000000..c4d3892bb26 --- /dev/null +++ b/src/include/storage/streaming_read.h @@ -0,0 +1,52 @@ +#ifndef STREAMING_READ_H +#define STREAMING_READ_H + +#include "storage/bufmgr.h" +#include "storage/fd.h" +#include "storage/smgr.h" + +/* Default tuning, reasonable for many users. */ +#define PGSR_FLAG_DEFAULT 0x00 + +/* + * I/O streams that are performing maintenance work on behalf of potentially + * many users. + */ +#define PGSR_FLAG_MAINTENANCE 0x01 + +/* + * We usually avoid issuing prefetch advice automatically when sequential + * access is detected, but this flag explicitly disables it, for cases that + * might not be correctly detected. Explicit advice is known to perform worse + * than letting the kernel (at least Linux) detect sequential access. + */ +#define PGSR_FLAG_SEQUENTIAL 0x02 + +/* + * We usually ramp up from smaller reads to larger ones, to support users who + * don't know if it's worth reading lots of buffers yet. This flag disables + * that, declaring ahead of time that we'll be reading all available buffers. + */ +#define PGSR_FLAG_FULL 0x04 + +struct PgStreamingRead; +typedef struct PgStreamingRead PgStreamingRead; + +/* Callback that returns the next block number to read. */ +typedef BlockNumber (*PgStreamingReadBufferCB) (PgStreamingRead *pgsr, + void *pgsr_private, + void *per_buffer_private); + +extern PgStreamingRead *pg_streaming_read_buffer_alloc(int flags, + void *pgsr_private, + size_t per_buffer_private_size, + BufferAccessStrategy strategy, + BufferManagerRelation bmr, + ForkNumber forknum, + PgStreamingReadBufferCB next_block_cb); + +extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr); +extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private); +extern void pg_streaming_read_free(PgStreamingRead *pgsr); + +#endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index fc8b15d0cf2..cfb58cf4836 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2097,6 +2097,8 @@ PgStat_TableCounts PgStat_TableStatus PgStat_TableXactStatus PgStat_WalStats +PgStreamingRead +PgStreamingReadRange PgXmlErrorContext PgXmlStrictness Pg_finfo_record @@ -2267,6 +2269,7 @@ ReInitializeDSMForeignScan_function ReScanForeignScan_function ReadBufPtrType ReadBufferMode +ReadBuffersOperation ReadBytePtrType ReadExtraTocPtrType ReadFunc -- 2.40.1
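
For reference, a minimal usage sketch of the API introduced by this patch, modeled directly on the pg_prewarm hunk above: the callback hands back block numbers one at a time, the stream internally combines consecutive blocks into larger vectored reads (issuing advice where that is enabled), and pinned buffers come back to the caller in order. The names stream_range_private, stream_range_next and stream_relation_range are illustrative only and do not appear in the patch.

#include "postgres.h"

#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/streaming_read.h"
#include "utils/rel.h"

/* Private state consulted by the block number callback. */
struct stream_range_private
{
	BlockNumber next_block;
	BlockNumber last_block;
};

/* Return the next block to read, or InvalidBlockNumber when done. */
static BlockNumber
stream_range_next(PgStreamingRead *pgsr, void *pgsr_private,
				  void *per_buffer_data)
{
	struct stream_range_private *p = pgsr_private;

	if (p->next_block <= p->last_block)
		return p->next_block++;

	return InvalidBlockNumber;
}

/* Pin each block in [first, last] of rel's main fork via the stream. */
static void
stream_relation_range(Relation rel, BlockNumber first, BlockNumber last)
{
	struct stream_range_private p;
	PgStreamingRead *pgsr;
	Buffer		buf;

	p.next_block = first;
	p.last_block = last;

	/* PGSR_FLAG_FULL: the whole range will be consumed, so skip ramp-up. */
	pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_FULL,
										  &p,
										  0,	/* no per-buffer data */
										  NULL, /* default strategy */
										  BMR_REL(rel),
										  MAIN_FORKNUM,
										  stream_range_next);

	while ((buf = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer)
	{
		CHECK_FOR_INTERRUPTS();
		ReleaseBuffer(buf);
	}

	/* Drains any remaining pins and releases the stream's memory. */
	pg_streaming_read_free(pgsr);
}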