diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index fcb087ed15dfbb4d567ee0c742e1db8ec1353a2d..6fcd9093268e7bc5aaf6102fa9d6adcdcced1d6d 100644 *** a/src/backend/commands/async.c --- b/src/backend/commands/async.c *************** static ListCell * *** 1285,1290 **** --- 1285,1291 ---- asyncQueueAddEntries(ListCell *nextNotify) { AsyncQueueEntry qe; + QueuePosition queue_head; int pageno; int offset; int slotno; *************** asyncQueueAddEntries(ListCell *nextNotif *** 1292,1299 **** /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */ LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); /* Fetch the current page */ ! pageno = QUEUE_POS_PAGE(QUEUE_HEAD); slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId); /* Note we mark the page dirty before writing in it */ AsyncCtl->shared->page_dirty[slotno] = true; --- 1293,1313 ---- /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */ LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); + /* + * We work with a local copy of QUEUE_HEAD, which we write back to shared + * memory upon exiting. The reason for this is that if we have to advance + * to a new page, SimpleLruZeroPage might fail (out of disk space, for + * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise, + * subsequent insertions would try to put entries into a page that slru.c + * thinks doesn't exist yet.) So, use a local position variable. Note + * that if we do fail, any already-inserted queue entries are forgotten; + * this is okay, since they'd be useless anyway after our transaction + * rolls back. + */ + queue_head = QUEUE_HEAD; + /* Fetch the current page */ ! pageno = QUEUE_POS_PAGE(queue_head); slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId); /* Note we mark the page dirty before writing in it */ AsyncCtl->shared->page_dirty[slotno] = true; *************** asyncQueueAddEntries(ListCell *nextNotif *** 1305,1311 **** /* Construct a valid queue entry in local variable qe */ asyncQueueNotificationToEntry(n, &qe); ! offset = QUEUE_POS_OFFSET(QUEUE_HEAD); /* Check whether the entry really fits on the current page */ if (offset + qe.length <= QUEUE_PAGESIZE) --- 1319,1325 ---- /* Construct a valid queue entry in local variable qe */ asyncQueueNotificationToEntry(n, &qe); ! offset = QUEUE_POS_OFFSET(queue_head); /* Check whether the entry really fits on the current page */ if (offset + qe.length <= QUEUE_PAGESIZE) *************** asyncQueueAddEntries(ListCell *nextNotif *** 1331,1338 **** &qe, qe.length); ! /* Advance QUEUE_HEAD appropriately, and note if page is full */ ! if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length)) { /* * Page is full, so we're done here, but first fill the next page --- 1345,1352 ---- &qe, qe.length); ! /* Advance queue_head appropriately, and detect if page is full */ ! if (asyncQueueAdvance(&(queue_head), qe.length)) { /* * Page is full, so we're done here, but first fill the next page *************** asyncQueueAddEntries(ListCell *nextNotif *** 1342,1353 **** * asyncQueueIsFull() ensured that there is room to create this * page without overrunning the queue. */ ! slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD)); /* And exit the loop */ break; } } LWLockRelease(AsyncCtlLock); return nextNotify; --- 1356,1370 ---- * asyncQueueIsFull() ensured that there is room to create this * page without overrunning the queue. */ ! slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head)); /* And exit the loop */ break; } } + /* Success, so update the global QUEUE_HEAD */ + QUEUE_HEAD = queue_head; + LWLockRelease(AsyncCtlLock); return nextNotify;