diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index d708117a40..637847dfd0 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -411,51 +411,201 @@ PageRestoreTempPage(Page tempPage, Page oldPage)
 }
 
 /*
- * sorting support for PageRepairFragmentation and PageIndexMultiDelete
+ * Tuple compaction support for PageRepairFragmentation and
+ * PageIndexMultiDelete
  */
-typedef struct itemIdSortData
+typedef struct itemIdCompactData
 {
 	uint16		offsetindex;	/* linp array index */
 	int16		itemoff;		/* page offset of item data */
 	uint16		alignedlen;		/* MAXALIGN(item data len) */
-} itemIdSortData;
-typedef itemIdSortData *itemIdSort;
-
-static int
-itemoffcompare(const void *itemidp1, const void *itemidp2)
-{
-	/* Sort in decreasing itemoff order */
-	return ((itemIdSort) itemidp2)->itemoff -
-		((itemIdSort) itemidp1)->itemoff;
-}
+} itemIdCompactData;
+typedef itemIdCompactData *itemIdCompact;
 
 /*
  * After removing or marking some line pointers unused, move the tuples to
  * remove the gaps caused by the removed items.
+ *
+ * Callers may pass 'presorted' as true if the itemidbase array is sorted in
+ * descending order of itemoff.  This allows a slightly more optimal code
+ * path to be taken.
+ *
+ * Callers must ensure that nitems > 0.
  */
 static void
-compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
+compactify_tuples(itemIdCompact itemidbase, int nitems, Page page, bool presorted)
 {
 	PageHeader	phdr = (PageHeader) page;
 	Offset		upper;
 	int			i;
 
-	/* sort itemIdSortData array into decreasing itemoff order */
-	qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
-		  itemoffcompare);
+	/* Code within will not work correctly if nitems == 0 */
+	Assert(nitems > 0);
 
-	upper = phdr->pd_special;
-	for (i = 0; i < nitems; i++)
+	if (presorted)
 	{
-		itemIdSort	itemidptr = &itemidbase[i];
-		ItemId		lp;
+		Offset		copy_tail;
+		Offset		copy_head;
+		itemIdCompact itemidptr;
+
+		/*
+		 * The item offsets are already in the optimal order, i.e., lower
+		 * line pointers have higher offsets.  This allows us to move the
+		 * tuples up to the end of the heap without having to worry about
+		 * overwriting the memory of other tuples during the move operation.
+		 */
+#ifdef USE_ASSERT_CHECKING
+		{
+			/* Check that the caller set the presorted flag correctly */
+			Offset		lastoff = phdr->pd_special;
+
+			for (i = 0; i < nitems; i++)
+			{
+				itemidptr = &itemidbase[i];
+
+				Assert(lastoff > itemidptr->itemoff);
+
+				lastoff = itemidptr->itemoff;
+			}
+		}
+#endif							/* USE_ASSERT_CHECKING */
+
+		upper = phdr->pd_special;
 
-		lp = PageGetItemId(page, itemidptr->offsetindex + 1);
-		upper -= itemidptr->alignedlen;
+		/* Skip over initial tuples that are already in the correct place */
+		i = 0;
+		do
+		{
+			itemidptr = &itemidbase[i];
+			if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+				break;
+			upper -= itemidptr->alignedlen;
+			i++;
+		} while (i < nitems);
+
+		/*
+		 * Do the tuple compactification.  Collapse the memmove calls for
+		 * adjacent tuples into a single call.
+		 */
+		copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+		for (; i < nitems; i++)
+		{
+			ItemId		lp;
+
+			itemidptr = &itemidbase[i];
+			lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+			if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+			{
+				/*
+				 * memmove is likely to have pre-checks for when the
+				 * destination and source are the same address, but by
+				 * detecting the gaps ourselves we move each run of
+				 * adjacent tuples with a single call and make no call at
+				 * all for tuples that don't need to move.
+				 */
+				memmove((char *) page + upper,
+						page + copy_head,
+						copy_tail - copy_head);
+				copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+			}
+			upper -= itemidptr->alignedlen;
+			copy_head = itemidptr->itemoff;
+
+			lp->lp_off = upper;
+		}
+
+		/* Move the remaining chunk.  Could be 0 bytes. */
 		memmove((char *) page + upper,
-				(char *) page + itemidptr->itemoff,
-				itemidptr->alignedlen);
-		lp->lp_off = upper;
+				page + copy_head,
+				copy_tail - copy_head);
+	}
+	else
+	{
+		Offset		copy_tail;
+		Offset		copy_head;
+		itemIdCompact itemidptr = NULL;
+		PGAlignedBlock scratch;
+		char	   *scratchptr = scratch.data;
+
+		/*
+		 * The tuples in the itemidbase array may be in any order, so to
+		 * move them to the end of the page we must make a temp copy of
+		 * each tuple before copying it back into the page at its new
+		 * position.
+		 *
+		 * If a large proportion of the tuples have been pruned (>75%) then
+		 * we copy them into the temp buffer tuple-by-tuple, otherwise we
+		 * just do a single memcpy for the entire range of tuples that we
+		 * need to store in the temp buffer.
+		 */
+		if (nitems < PageGetMaxOffsetNumber(page) / 4)
+		{
+			for (i = 0; i < nitems; i++)
+			{
+				itemidptr = &itemidbase[i];
+				memcpy(scratchptr + itemidptr->itemoff,
+					   page + itemidptr->itemoff,
+					   itemidptr->alignedlen);
+			}
+
+			/* Set things up for the compactification code below */
+			i = 0;
+			itemidptr = &itemidbase[0];
+			upper = phdr->pd_special;
+		}
+		else
+		{
+			upper = phdr->pd_special;
+
+			/*
+			 * Detect which tuples at the end of the page don't need to be
+			 * moved.  It's quite common that many tuples won't need to be
+			 * touched, so we needn't copy those into our temp buffer.
+			 */
+			i = 0;
+			do
+			{
+				itemidptr = &itemidbase[i];
+				if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+					break;
+				upper -= itemidptr->alignedlen;
+				i++;
+			} while (i < nitems);
+
+			/* Copy all tuples that need to be moved into the temp buffer */
+			memcpy(scratchptr + phdr->pd_upper,
+				   page + phdr->pd_upper,
+				   upper - phdr->pd_upper);
+		}
+
+		/*
+		 * Do the tuple compactification.  Depending on which path we took
+		 * above to make the backup copy, we may have already skipped over
+		 * the tuples at the end of the page which don't need to be moved.
+		 */
+		copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+		for (; i < nitems; i++)
+		{
+			ItemId		lp;
+
+			itemidptr = &itemidbase[i];
+			lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+			/* Copy pending tuples when we detect a gap */
+			if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+			{
+				memcpy((char *) page + upper,
+					   scratchptr + copy_head,
+					   copy_tail - copy_head);
+
+				/* Reset where we've copied up to */
+				copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+			}
+			upper -= itemidptr->alignedlen;
+			copy_head = itemidptr->itemoff;
+
+			lp->lp_off = upper;
+		}
+
+		/* Copy the remaining chunk.  Could be 0 bytes. */
+		memcpy((char *) page + upper,
+			   scratchptr + copy_head,
+			   copy_tail - copy_head);
 	}
 
 	phdr->pd_upper = upper;
@@ -477,14 +627,16 @@ PageRepairFragmentation(Page page)
 	Offset		pd_lower = ((PageHeader) page)->pd_lower;
 	Offset		pd_upper = ((PageHeader) page)->pd_upper;
 	Offset		pd_special = ((PageHeader) page)->pd_special;
-	itemIdSortData itemidbase[MaxHeapTuplesPerPage];
-	itemIdSort	itemidptr;
+	Offset		last_offset;
+	itemIdCompactData itemidbase[MaxHeapTuplesPerPage];
+	itemIdCompact itemidptr;
 	ItemId		lp;
 	int			nline,
 				nstorage,
 				nunused;
 	int			i;
 	Size		totallen;
+	bool		presorted = true;	/* For now */
 
 	/*
 	 * It's worth the trouble to be more paranoid here than in most places,
@@ -509,6 +661,7 @@ PageRepairFragmentation(Page page)
 	nline = PageGetMaxOffsetNumber(page);
 	itemidptr = itemidbase;
 	nunused = totallen = 0;
+	last_offset = pd_special;
 	for (i = FirstOffsetNumber; i <= nline; i++)
 	{
 		lp = PageGetItemId(page, i);
@@ -518,6 +671,12 @@
 		{
 			itemidptr->offsetindex = i - 1;
 			itemidptr->itemoff = ItemIdGetOffset(lp);
+
+			if (last_offset > itemidptr->itemoff)
+				last_offset = itemidptr->itemoff;
+			else
+				presorted = false;
+
 			if (unlikely(itemidptr->itemoff < (int) pd_upper ||
 						 itemidptr->itemoff >= (int) pd_special))
 				ereport(ERROR,
@@ -552,7 +711,7 @@
 						errmsg("corrupted item lengths: total %u, available space %u",
 							   (unsigned int) totallen, pd_special - pd_lower)));
 
-		compactify_tuples(itemidbase, nstorage, page);
+		compactify_tuples(itemidbase, nstorage, page, presorted);
 	}
 
 	/* Set hint bit for PageAddItem */
@@ -831,9 +990,9 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 	Offset		pd_lower = phdr->pd_lower;
 	Offset		pd_upper = phdr->pd_upper;
 	Offset		pd_special = phdr->pd_special;
-	itemIdSortData itemidbase[MaxIndexTuplesPerPage];
+	itemIdCompactData itemidbase[MaxIndexTuplesPerPage];
 	ItemIdData	newitemids[MaxIndexTuplesPerPage];
-	itemIdSort	itemidptr;
+	itemIdCompact itemidptr;
 	ItemId		lp;
 	int			nline,
 				nused;
@@ -932,7 +1091,7 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
 	phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
 
 	/* and compactify the tuple data */
-	compactify_tuples(itemidbase, nused, page);
+	compactify_tuples(itemidbase, nused, page, false);
 }
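The notes below are for reviewers and are not part of the patch. This first sketch is a minimal standalone C program showing the chunk-collapsing technique the presorted path above relies on: skip the tuples already in their final position, then flush each contiguous run of adjacent tuples with a single memmove() whenever a gap is detected. All names here (ToyItem, toy_compactify, PAGE_SIZE) are invented for illustration, and the "page" is just a plain byte array, not a PostgreSQL page.

#include <assert.h>
#include <stdio.h>
#include <string.h>

#define PAGE_SIZE 64

typedef struct ToyItem
{
	int			itemoff;		/* current offset of item data in the page */
	int			len;			/* length of item data */
} ToyItem;

/*
 * Move the surviving items to the end of the page.  'items' must be sorted
 * in descending itemoff order, mirroring the presorted path in the patch.
 */
static int
toy_compactify(ToyItem *items, int nitems, char *page)
{
	ToyItem    *it;
	int			upper = PAGE_SIZE;
	int			copy_head;
	int			copy_tail;
	int			i = 0;

	assert(nitems > 0);

	/* Skip over items that are already in their final position */
	do
	{
		it = &items[i];
		if (upper != it->itemoff + it->len)
			break;
		upper -= it->len;
		i++;
	} while (i < nitems);

	/* Collapse moves for adjacent items into one memmove per run */
	copy_tail = copy_head = it->itemoff + it->len;
	for (; i < nitems; i++)
	{
		it = &items[i];
		if (copy_head != it->itemoff + it->len)
		{
			/* Gap found: flush the pending contiguous run in one call */
			memmove(page + upper, page + copy_head, copy_tail - copy_head);
			copy_tail = it->itemoff + it->len;
		}
		upper -= it->len;
		copy_head = it->itemoff;
		it->itemoff = upper;	/* record the item's new offset */
	}

	/* Flush the final run (may be zero bytes) */
	memmove(page + upper, page + copy_head, copy_tail - copy_head);
	return upper;
}

int
main(void)
{
	char		page[PAGE_SIZE];
	ToyItem		survivors[] = {{56, 8}, {40, 8}, {32, 8}};
	int			upper;

	/* Four 8-byte items at the top of the page; 'B' is then "deleted" */
	memset(page, '.', sizeof(page));
	memset(page + 56, 'A', 8);
	memset(page + 48, 'B', 8);	/* not in survivors[] */
	memset(page + 40, 'C', 8);
	memset(page + 32, 'D', 8);

	upper = toy_compactify(survivors, 3, page);
	printf("new upper = %d\n", upper);	/* 40, i.e. 64 - 3 * 8 */
	printf("%.24s\n", page + upper);	/* DDDDDDDDCCCCCCCCAAAAAAAA */
	return 0;
}

The unsorted path in the patch drives the same gap-detection loop; the difference is that each flush is a memcpy() out of the scratch buffer, which is safe because the source and destination can never overlap there.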
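A second reviewer note: PageRepairFragmentation decides whether it may pass presorted = true while it is already walking the line pointer array, so the check costs almost nothing. Rendered as a hypothetical standalone helper (toy_is_presorted is an invented name; the real code tracks last_offset inline):

#include <stdbool.h>

/*
 * Returns true if the offsets strictly decrease in line pointer order,
 * which is what lets compactify_tuples() take the memmove-only path.
 */
static bool
toy_is_presorted(const int *itemoffs, int nitems, int pd_special)
{
	int			last_offset = pd_special;
	int			i;

	for (i = 0; i < nitems; i++)
	{
		if (last_offset > itemoffs[i])
			last_offset = itemoffs[i];
		else
			return false;	/* out of order; take the scratch-buffer path */
	}
	return true;
}

A single out-of-order offset disqualifies the page, so false positives are impossible. PageIndexMultiDelete, by contrast, always passes false, since the surviving index tuple offsets carry no ordering guarantee after arbitrary deletions.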