diff --git a/contrib/test_decoding/expected/spill.out b/contrib/test_decoding/expected/spill.out index 10734bd..e1d150b6 100644 --- a/contrib/test_decoding/expected/spill.out +++ b/contrib/test_decoding/expected/spill.out @@ -247,6 +247,25 @@ GROUP BY 1 ORDER BY 1; 'serialize-nested-subbig-subbigabort-subbig-3 | 5000 | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort-subbig-3:5001' | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort-subbig-3:10000' (2 rows) +-- large number of spilling subxacts. Should not exceed maxAllocatedDescs +BEGIN; +DO $$ +BEGIN + FOR i IN 1..600 LOOP + BEGIN + INSERT INTO spill_test select generate_series(1, 5000) ; + EXCEPTION + when division_by_zero then perform 'dummy'; + END; + END LOOP; +END $$; +COMMIT; +SELECT 1 from pg_logical_slot_get_changes('regression_slot', NULL,NULL) LIMIT 1; + ?column? +---------- + 1 +(1 row) + DROP TABLE spill_test; SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/spill.sql b/contrib/test_decoding/sql/spill.sql index e638cac..715d1f9 100644 --- a/contrib/test_decoding/sql/spill.sql +++ b/contrib/test_decoding/sql/spill.sql @@ -174,6 +174,21 @@ SELECT (regexp_split_to_array(data, ':'))[4] COLLATE "C", COUNT(*), (array_agg(d FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT' GROUP BY 1 ORDER BY 1; +-- large number of spilling subxacts.
Should not exceed maxAllocatedDescs +BEGIN; +DO $$ +BEGIN + FOR i IN 1..600 LOOP + BEGIN + INSERT INTO spill_test select generate_series(1, 5000) ; + EXCEPTION + when division_by_zero then perform 'dummy'; + END; + END LOOP; +END $$; +COMMIT; +SELECT 1 from pg_logical_slot_get_changes('regression_slot', NULL,NULL) LIMIT 1; + DROP TABLE spill_test; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 8ce28ad..1e5a725 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -103,13 +103,21 @@ typedef struct ReorderBufferTupleCidEnt CommandId combocid; /* just for debugging */ } ReorderBufferTupleCidEnt; +/* Spill file handle: virtual file descriptor plus the offset the next read starts at */ +typedef struct TXNEntryFile +{ + File vfd; /* -1 when no file is open */ + off_t curOffset; /* offset for next write or read. Reset to 0 + * when vfd is opened.
*/ +} TXNEntryFile; + /* k-way in-order change iteration support structures */ typedef struct ReorderBufferIterTXNEntry { XLogRecPtr lsn; ReorderBufferChange *change; ReorderBufferTXN *txn; - int fd; + TXNEntryFile file; XLogSegNo segno; } ReorderBufferIterTXNEntry; @@ -194,7 +202,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change); static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno); + TXNEntryFile *file, XLogSegNo *segno); static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -988,7 +996,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) for (off = 0; off < state->nr_txns; off++) { - state->entries[off].fd = -1; + state->entries[off].file.vfd = -1; state->entries[off].segno = 0; } @@ -1013,7 +1021,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) { /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, txn); - ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd, + ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file, &state->entries[off].segno); } @@ -1043,7 +1051,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, cur_txn); ReorderBufferRestoreChanges(rb, cur_txn, - &state->entries[off].fd, + &state->entries[off].file, &state->entries[off].segno); } cur_change = dlist_head_element(ReorderBufferChange, node, @@ -1124,7 +1132,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) dlist_delete(&change->node); dlist_push_tail(&state->old_change, &change->node); - if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd, + if
(ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, &state->entries[off].segno)) { /* successfully restored changes from disk */ @@ -1163,8 +1171,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, for (off = 0; off < state->nr_txns; off++) { - if (state->entries[off].fd != -1) - CloseTransientFile(state->entries[off].fd); + if (state->entries[off].file.vfd != -1) + FileClose(state->entries[off].file.vfd); } /* free memory we might have "leaked" in the last *Next call */ @@ -2517,11 +2525,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, */ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno) + TXNEntryFile *file, XLogSegNo *segno) { Size restored = 0; XLogSegNo last_segno; dlist_mutable_iter cleanup_iter; + File *fd = &file->vfd; /* alias for file->vfd; the code below still uses *fd */ Assert(txn->first_lsn != InvalidXLogRecPtr); Assert(txn->final_lsn != InvalidXLogRecPtr); @@ -2562,7 +2571,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, *segno); - *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY); + + /* Read the new segment from offset 0; resetting is harmless if the open failed */ + file->curOffset = 0; + if (*fd < 0 && errno == ENOENT) { *fd = -1; @@ -2582,23 +2595,26 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, * end of this file.
*/ ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); + readBytes = FileRead(file->vfd, rb->outbuf, + sizeof(ReorderBufferDiskChange), + file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ); /* eof */ if (readBytes == 0) { - CloseTransientFile(*fd); + FileClose(*fd); *fd = -1; (*segno)++; continue; } - else if (readBytes < 0) + if (readBytes < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from reorderbuffer spill file: %m"))); - else if (readBytes != sizeof(ReorderBufferDiskChange)) + + file->curOffset += readBytes; /* advance past the ReorderBufferDiskChange bytes just read */ + + if (readBytes != sizeof(ReorderBufferDiskChange)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", @@ -2611,10 +2627,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, sizeof(ReorderBufferDiskChange) + ondisk->size); ondisk = (ReorderBufferDiskChange *) rb->outbuf; - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), - ondisk->size - sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); + readBytes = FileRead(file->vfd, + rb->outbuf + sizeof(ReorderBufferDiskChange), + ondisk->size - sizeof(ReorderBufferDiskChange), + file->curOffset, + WAIT_EVENT_REORDER_BUFFER_READ); if (readBytes < 0) ereport(ERROR, @@ -2627,6 +2644,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, readBytes, (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); + file->curOffset += readBytes; /* advance past the rest of the change just read */ + /* * ok, read a full change from disk, now restore it into proper * in-memory format