diff --git a/contrib/test_decoding/expected/spill.out b/contrib/test_decoding/expected/spill.out index 10734bd..e1d150b6 100644 --- a/contrib/test_decoding/expected/spill.out +++ b/contrib/test_decoding/expected/spill.out @@ -247,6 +247,25 @@ GROUP BY 1 ORDER BY 1; 'serialize-nested-subbig-subbigabort-subbig-3 | 5000 | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort-subbig-3:5001' | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort-subbig-3:10000' (2 rows) +-- large number of spilling subxacts. Should not exceed maxAllocatedDescs +BEGIN; +DO $$ +BEGIN + FOR i IN 1..600 LOOP + BEGIN + INSERT INTO spill_test select generate_series(1, 5000) ; + EXCEPTION + when division_by_zero then perform 'dummy'; + END; + END LOOP; +END $$; +COMMIT; +SELECT 1 from pg_logical_slot_get_changes('regression_slot', NULL,NULL) LIMIT 1; + ?column? +---------- + 1 +(1 row) + DROP TABLE spill_test; SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/spill.sql b/contrib/test_decoding/sql/spill.sql index e638cac..715d1f9 100644 --- a/contrib/test_decoding/sql/spill.sql +++ b/contrib/test_decoding/sql/spill.sql @@ -174,6 +174,21 @@ SELECT (regexp_split_to_array(data, ':'))[4] COLLATE "C", COUNT(*), (array_agg(d FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT' GROUP BY 1 ORDER BY 1; +-- large number of spilling subxacts. Should not exceed maxAllocatedDescs +BEGIN; +DO $$ +BEGIN + FOR i IN 1..600 LOOP + BEGIN + INSERT INTO spill_test select generate_series(1, 5000) ; + EXCEPTION + when division_by_zero then perform 'dummy'; + END; + END LOOP; +END $$; +COMMIT; +SELECT 1 from pg_logical_slot_get_changes('regression_slot', NULL,NULL) LIMIT 1; + DROP TABLE spill_test; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 62e5424..22dc29f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -103,13 +103,21 @@ typedef struct ReorderBufferTupleCidEnt CommandId combocid; /* just for debugging */ } ReorderBufferTupleCidEnt; +/* Virtual file descriptor with file offset tracking */ +typedef struct TXNEntryFile +{ + File vfd; /* -1 when the file is closed */ + off_t curOffset; /* offset for next write or read. Reset to 0 + * when vfd is opened. */ +} TXNEntryFile; + /* k-way in-order change iteration support structures */ typedef struct ReorderBufferIterTXNEntry { XLogRecPtr lsn; ReorderBufferChange *change; ReorderBufferTXN *txn; - int fd; + TXNEntryFile file; XLogSegNo segno; } ReorderBufferIterTXNEntry; @@ -178,7 +186,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb); * subtransactions * --------------------------------------- */ -static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state); static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state); @@ -194,7 +203,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change); static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno); + TXNEntryFile *file, XLogSegNo *segno); static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -945,15 +954,24 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg) /* * Allocate & initialize an iterator which iterates in lsn order over a * transaction and all its subtransactions. + * + * Note: The iterator state is returned through iter_state parameter rather + * than the function's return value. This is because the state gets cleaned up + * in a PG_CATCH block, so we want to make sure the caller gets back the state + * even if this function throws an exception, so that the state resources can + * be cleaned up. */ -static ReorderBufferIterTXNState * -ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) +static void +ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state) { Size nr_txns = 0; ReorderBufferIterTXNState *state; dlist_iter cur_txn_i; int32 off; + *iter_state = NULL; + /* * Calculate the size of our heap: one element for every transaction that * contains changes. (Besides the transactions already in the reorder @@ -988,10 +1006,13 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) for (off = 0; off < state->nr_txns; off++) { - state->entries[off].fd = -1; + state->entries[off].file.vfd = -1; state->entries[off].segno = 0; } + /* Now that the state fields are initialized, it is safe to return it. */ + *iter_state = state; + /* allocate heap */ state->heap = binaryheap_allocate(state->nr_txns, ReorderBufferIterCompare, @@ -1013,7 +1034,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) { /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, txn); - ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd, + ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file, &state->entries[off].segno); } @@ -1043,7 +1064,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, cur_txn); ReorderBufferRestoreChanges(rb, cur_txn, - &state->entries[off].fd, + &state->entries[off].file, &state->entries[off].segno); } cur_change = dlist_head_element(ReorderBufferChange, node, @@ -1059,8 +1080,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) /* assemble a valid binary heap */ binaryheap_build(state->heap); - - return state; } /* @@ -1124,7 +1143,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) dlist_delete(&change->node); dlist_push_tail(&state->old_change, &change->node); - if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd, + if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, &state->entries[off].segno)) { /* successfully restored changes from disk */ @@ -1163,8 +1182,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, for (off = 0; off < state->nr_txns; off++) { - if (state->entries[off].fd != -1) - CloseTransientFile(state->entries[off].fd); + if (state->entries[off].file.vfd != -1) + FileClose(state->entries[off].file.vfd); } /* free memory we might have "leaked" in the last *Next call */ @@ -1178,7 +1197,9 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, Assert(dlist_is_empty(&state->old_change)); } - binaryheap_free(state->heap); + if (state->heap) + binaryheap_free(state->heap); + pfree(state); } @@ -1500,7 +1521,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, rb->begin(rb, txn); - iterstate = ReorderBufferIterTXNInit(rb, txn); + ReorderBufferIterTXNInit(rb, txn, &iterstate); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) { Relation relation = NULL; @@ -2517,11 +2538,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, */ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno) + TXNEntryFile *file, XLogSegNo *segno) { Size restored = 0; XLogSegNo last_segno; dlist_mutable_iter cleanup_iter; + File *fd = &file->vfd; Assert(txn->first_lsn != InvalidXLogRecPtr); Assert(txn->final_lsn != InvalidXLogRecPtr); @@ -2562,7 +2584,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, *segno); - *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY); + + /* No harm in resetting the offset even in case of failure */ + file->curOffset = 0; + if (*fd < 0 && errno == ENOENT) { *fd = -1; @@ -2582,23 +2608,26 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, * end of this file. */ ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); + readBytes = FileRead(file->vfd, rb->outbuf, + sizeof(ReorderBufferDiskChange), + file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ); /* eof */ if (readBytes == 0) { - CloseTransientFile(*fd); + FileClose(*fd); *fd = -1; (*segno)++; continue; } - else if (readBytes < 0) + if (readBytes < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from reorderbuffer spill file: %m"))); - else if (readBytes != sizeof(ReorderBufferDiskChange)) + + file->curOffset += readBytes; + + if (readBytes != sizeof(ReorderBufferDiskChange)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", @@ -2611,10 +2640,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, sizeof(ReorderBufferDiskChange) + ondisk->size); ondisk = (ReorderBufferDiskChange *) rb->outbuf; - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), - ondisk->size - sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); + readBytes = FileRead(file->vfd, + rb->outbuf + sizeof(ReorderBufferDiskChange), + ondisk->size - sizeof(ReorderBufferDiskChange), + file->curOffset, + WAIT_EVENT_REORDER_BUFFER_READ); if (readBytes < 0) ereport(ERROR, @@ -2627,6 +2657,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, readBytes, (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); + file->curOffset += readBytes; + /* * ok, read a full change from disk, now restore it into proper * in-memory format