diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index 5948218c77..e47f50c04c 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -100,32 +100,10 @@ gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 
 typedef struct GistBDItem
 {
-    GistNSN     parentlsn;
-    BlockNumber blkno;
+    Buffer      buffer;
     struct GistBDItem *next;
 } GistBDItem;
 
-static void
-pushStackIfSplited(Page page, GistBDItem *stack)
-{
-    GISTPageOpaque opaque = GistPageGetOpaque(page);
-
-    if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) &&
-        (GistFollowRight(page) || stack->parentlsn < GistPageGetNSN(page)) &&
-        opaque->rightlink != InvalidBlockNumber /* sanity check */ )
-    {
-        /* split page detected, install right link to the stack */
-
-        GistBDItem *ptr = (GistBDItem *) palloc(sizeof(GistBDItem));
-
-        ptr->blkno = opaque->rightlink;
-        ptr->parentlsn = stack->parentlsn;
-        ptr->next = stack->next;
-        stack->next = ptr;
-    }
-}
-
-
 /*
  * Bulk deletion of all index entries pointing to a set of heap tuples and
  * check invalid tuples left after upgrade.
@@ -138,9 +116,12 @@ IndexBulkDeleteResult *
 gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
                IndexBulkDeleteCallback callback, void *callback_state)
 {
-    Relation    rel = info->index;
-    GistBDItem *stack,
-               *ptr;
+    Relation    rel = info->index;
+    BlockNumber npages;
+    bool        needLock;
+    BlockNumber blkno;
+    GistNSN     startNSN = GetInsertRecPtr();
+    GistBDItem *bufferStack = NULL;
 
     /* first time through? */
     if (stats == NULL)
@@ -149,125 +130,132 @@ gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
     stats->estimated_count = false;
     stats->num_index_tuples = 0;
 
-    stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
-    stack->blkno = GIST_ROOT_BLKNO;
+    /*
+     * Need lock unless it's local to this backend.
+     */
+    needLock = !RELATION_IS_LOCAL(rel);
 
-    while (stack)
+    /* determine the number of blocks to scan */
+    if (needLock)
+        LockRelationForExtension(rel, ExclusiveLock);
+    npages = RelationGetNumberOfBlocks(rel);
+    if (needLock)
+        UnlockRelationForExtension(rel, ExclusiveLock);
+
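+    /*
+     * Scan the index in physical (block number) order instead of walking it
+     * from the root.  Pages split after startNSN are handled below by
+     * revisiting their rightlinks when those point to already-scanned blocks.
+     */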
+    for (blkno = GIST_ROOT_BLKNO; blkno < npages; blkno++)
     {
-        Buffer      buffer;
-        Page        page;
-        OffsetNumber i,
-                    maxoff;
-        IndexTuple  idxtuple;
-        ItemId      iid;
+        BlockNumber nextBlock = blkno;
+        bool        needScan = true;
 
-        buffer = ReadBufferExtended(rel, MAIN_FORKNUM, stack->blkno,
-                                    RBM_NORMAL, info->strategy);
-        LockBuffer(buffer, GIST_SHARE);
-        gistcheckpage(rel, buffer);
-        page = (Page) BufferGetPage(buffer);
+        vacuum_delay_point();
 
-        if (GistPageIsLeaf(page))
+        while (needScan)
         {
-            OffsetNumber todelete[MaxOffsetNumber];
-            int         ntodelete = 0;
+            Buffer      buffer;
+            Page        page;
+            OffsetNumber i,
+                        maxoff;
+            IndexTuple  idxtuple;
+            ItemId      iid;
 
-            LockBuffer(buffer, GIST_UNLOCK);
-            LockBuffer(buffer, GIST_EXCLUSIVE);
+            needScan = false;
+            buffer = ReadBufferExtended(rel, MAIN_FORKNUM, nextBlock, RBM_NORMAL,
+                                        info->strategy);
+            /*
+             * We are not going to stay here for long or call recursive
+             * algorithms while holding the lock, especially for an internal
+             * page, so aggressively grab an exclusive lock.
+             */
+            LockBuffer(buffer, GIST_EXCLUSIVE);
             page = (Page) BufferGetPage(buffer);
-            if (stack->blkno == GIST_ROOT_BLKNO && !GistPageIsLeaf(page))
+
+            if (PageIsNew(page) || GistPageIsDeleted(page))
             {
-                /* only the root can become non-leaf during relock */
                 UnlockReleaseBuffer(buffer);
-                /* one more check */
+                /* TODO: Shouldn't we record the free page here? */
                 continue;
             }
 
-            /*
-             * check for split proceeded after look at parent, we should check
-             * it after relock
-             */
-            pushStackIfSplited(page, stack);
-
-            /*
-             * Remove deletable tuples from page
-             */
-            maxoff = PageGetMaxOffsetNumber(page);
-            for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+            if (GistPageIsLeaf(page))
             {
-                iid = PageGetItemId(page, i);
-                idxtuple = (IndexTuple) PageGetItem(page, iid);
-
-                if (callback(&(idxtuple->t_tid), callback_state))
-                    todelete[ntodelete++] = i;
-                else
-                    stats->num_index_tuples += 1;
-            }
-
-            stats->tuples_removed += ntodelete;
-
-            if (ntodelete)
-            {
-                START_CRIT_SECTION();
+                OffsetNumber todelete[MaxOffsetNumber];
+                int         ntodelete = 0;
+                GISTPageOpaque opaque = GistPageGetOpaque(page);
+
+                /*
+                 * If this page was split after the start of the VACUUM, we
+                 * have to revisit its rightlink if it points to a block we
+                 * have already scanned.  This revisit is recursive and should
+                 * not be deep, but we check for the possibility of stack
+                 * overflow anyway.
+                 */
+                if ((GistFollowRight(page) || startNSN < GistPageGetNSN(page)) &&
+                    (opaque->rightlink != InvalidBlockNumber) && (opaque->rightlink < blkno))
+                {
+                    nextBlock = opaque->rightlink;
+                    needScan = true;
+                }
+
+                /*
+                 * Remove deletable tuples from page
+                 */
+                maxoff = PageGetMaxOffsetNumber(page);
+
+                for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+                {
+                    iid = PageGetItemId(page, i);
+                    idxtuple = (IndexTuple) PageGetItem(page, iid);
 
-                MarkBufferDirty(buffer);
+                    if (callback(&(idxtuple->t_tid), callback_state))
+                        todelete[ntodelete++] = i;
+                    else
+                        stats->num_index_tuples += 1;
+                }
 
-                PageIndexMultiDelete(page, todelete, ntodelete);
-                GistMarkTuplesDeleted(page);
+                stats->tuples_removed += ntodelete;
 
-                if (RelationNeedsWAL(rel))
+                /* We have dead tuples on the page */
+                if (ntodelete)
                 {
-                    XLogRecPtr  recptr;
+                    START_CRIT_SECTION();
 
-                    recptr = gistXLogUpdate(buffer,
-                                            todelete, ntodelete,
-                                            NULL, 0, InvalidBuffer);
-                    PageSetLSN(page, recptr);
-                }
-                else
-                    PageSetLSN(page, gistGetFakeLSN(rel));
+                    MarkBufferDirty(buffer);
 
-                END_CRIT_SECTION();
-            }
+                    PageIndexMultiDelete(page, todelete, ntodelete);
+                    GistMarkTuplesDeleted(page);
 
-        }
-        else
-        {
-            /* check for split proceeded after look at parent */
-            pushStackIfSplited(page, stack);
+                    if (RelationNeedsWAL(rel))
+                    {
+                        XLogRecPtr  recptr;
 
-            maxoff = PageGetMaxOffsetNumber(page);
+                        recptr = gistXLogUpdate(buffer,
+                                                todelete, ntodelete,
+                                                NULL, 0, InvalidBuffer);
+                        PageSetLSN(page, recptr);
+                    }
+                    else
+                        PageSetLSN(page, gistGetFakeLSN(rel));
 
-            for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+                    END_CRIT_SECTION();
+                }
+            }
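+            /*
+             * If this page's rightlink still has to be revisited, keep the
+             * buffer pinned and locked on the stack until that rescan is
+             * done (presumably so the page chain stays stable under us);
+             * otherwise we are finished with this page.
+             */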
+            if (needScan)
+            {
+                GistBDItem *ptr = (GistBDItem *) palloc(sizeof(GistBDItem));
+                ptr->buffer = buffer;
+                ptr->next = bufferStack;
+                bufferStack = ptr;
+            }
+            else
             {
-                iid = PageGetItemId(page, i);
-                idxtuple = (IndexTuple) PageGetItem(page, iid);
-
-                ptr = (GistBDItem *) palloc(sizeof(GistBDItem));
-                ptr->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
-                ptr->parentlsn = BufferGetLSNAtomic(buffer);
-                ptr->next = stack->next;
-                stack->next = ptr;
-
-                if (GistTupleIsInvalid(idxtuple))
-                    ereport(LOG,
-                            (errmsg("index \"%s\" contains an inner tuple marked as invalid",
-                                    RelationGetRelationName(rel)),
-                             errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
-                             errhint("Please REINDEX it.")));
+                UnlockReleaseBuffer(buffer);
             }
         }
-
-        UnlockReleaseBuffer(buffer);
-
-        ptr = stack->next;
-        pfree(stack);
-        stack = ptr;
-
-        vacuum_delay_point();
+        while (bufferStack)
+        {
+            UnlockReleaseBuffer(bufferStack->buffer);
+            bufferStack = bufferStack->next;
+        }
     }
 
     return stats;