diff --git a/contrib/pg_upgrade/pg_upgrade.c b/contrib/pg_upgrade/pg_upgrade.c index 3b8241b..f0a023f 100644 --- a/contrib/pg_upgrade/pg_upgrade.c +++ b/contrib/pg_upgrade/pg_upgrade.c @@ -423,8 +423,10 @@ copy_clog_xlog_xid(void) /* set the next transaction id and epoch of the new cluster */ prep_status("Setting next transaction ID and epoch for new cluster"); exec_prog(UTILITY_LOG_FILE, NULL, true, - "\"%s/pg_resetxlog\" -f -x %u \"%s\"", - new_cluster.bindir, old_cluster.controldata.chkpnt_nxtxid, + "\"%s/pg_resetxlog\" -f -x %u -c %u \"%s\"", + new_cluster.bindir, + old_cluster.controldata.chkpnt_nxtxid, + old_cluster.controldata.chkpnt_nxtxid, new_cluster.pgdata); exec_prog(UTILITY_LOG_FILE, NULL, true, "\"%s/pg_resetxlog\" -f -e %u \"%s\"", diff --git a/contrib/pg_xlogdump/rmgrdesc.c b/contrib/pg_xlogdump/rmgrdesc.c index bfb3573..c0a0409 100644 --- a/contrib/pg_xlogdump/rmgrdesc.c +++ b/contrib/pg_xlogdump/rmgrdesc.c @@ -9,6 +9,7 @@ #include "postgres.h" #include "access/clog.h" +#include "access/committs.h" #include "access/gin.h" #include "access/gist_private.h" #include "access/hash.h" diff --git a/contrib/test_committs/.gitignore b/contrib/test_committs/.gitignore new file mode 100644 index 0000000..1f95503 --- /dev/null +++ b/contrib/test_committs/.gitignore @@ -0,0 +1,5 @@ +# Generated subdirectories +/log/ +/isolation_output/ +/regression_output/ +/tmp_check/ diff --git a/contrib/test_committs/Makefile b/contrib/test_committs/Makefile new file mode 100644 index 0000000..2240749 --- /dev/null +++ b/contrib/test_committs/Makefile @@ -0,0 +1,45 @@ +# Note: because we don't tell the Makefile there are any regression tests, +# we have to clean those result files explicitly +EXTRA_CLEAN = $(pg_regress_clean_files) ./regression_output ./isolation_output + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/test_committs +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif + +# We can't support installcheck because normally installcheck users don't have +# the required track_commit_timestamp on +installcheck:; + +check: regresscheck + +submake-regress: + $(MAKE) -C $(top_builddir)/src/test/regress all + +submake-test_committs: + $(MAKE) -C $(top_builddir)/contrib/test_committs + +REGRESSCHECKS=committs_on + +regresscheck: all | submake-regress submake-test_committs + $(MKDIR_P) regression_output + $(pg_regress_check) \ + --temp-config $(top_srcdir)/contrib/test_committs/committs.conf \ + --temp-install=./tmp_check \ + --extra-install=contrib/test_committs \ + --outputdir=./regression_output \ + $(REGRESSCHECKS) + +regresscheck-install-force: | submake-regress submake-test_committs + $(pg_regress_installcheck) \ + --extra-install=contrib/test_committs \ + $(REGRESSCHECKS) + +PHONY: submake-test_committs submake-regress check \ + regresscheck regresscheck-install-force \ No newline at end of file diff --git a/contrib/test_committs/committs.conf b/contrib/test_committs/committs.conf new file mode 100644 index 0000000..d221a60 --- /dev/null +++ b/contrib/test_committs/committs.conf @@ -0,0 +1 @@ +track_commit_timestamp = on \ No newline at end of file diff --git a/contrib/test_committs/expected/committs_on.out b/contrib/test_committs/expected/committs_on.out new file mode 100644 index 0000000..9920343 --- /dev/null +++ b/contrib/test_committs/expected/committs_on.out @@ -0,0 +1,21 @@ +-- +-- Commit Timestamp (on) +-- +CREATE TABLE committs_test(id serial, ts timestamptz default now()); +INSERT INTO committs_test DEFAULT VALUES; +INSERT INTO committs_test DEFAULT VALUES; +INSERT INTO committs_test DEFAULT VALUES; +SELECT id, pg_get_transaction_extradata(xmin), + pg_get_transaction_committime(xmin) >= ts, + pg_get_transaction_committime(xmin) < now(), + pg_get_transaction_committime(xmin) - ts < '60s' -- 60s should give a lot of reserve +FROM committs_test +ORDER BY id; + id | pg_get_transaction_extradata | ?column? | ?column? | ?column? +----+------------------------------+----------+----------+---------- + 1 | 0 | t | t | t + 2 | 0 | t | t | t + 3 | 0 | t | t | t +(3 rows) + +DROP TABLE committs_test; diff --git a/contrib/test_committs/sql/committs_on.sql b/contrib/test_committs/sql/committs_on.sql new file mode 100644 index 0000000..aec6438 --- /dev/null +++ b/contrib/test_committs/sql/committs_on.sql @@ -0,0 +1,18 @@ +-- +-- Commit Timestamp (on) +-- + +CREATE TABLE committs_test(id serial, ts timestamptz default now()); + +INSERT INTO committs_test DEFAULT VALUES; +INSERT INTO committs_test DEFAULT VALUES; +INSERT INTO committs_test DEFAULT VALUES; + +SELECT id, pg_get_transaction_extradata(xmin), + pg_get_transaction_committime(xmin) >= ts, + pg_get_transaction_committime(xmin) < now(), + pg_get_transaction_committime(xmin) - ts < '60s' -- 60s should give a lot of reserve +FROM committs_test +ORDER BY id; + +DROP TABLE committs_test; diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 9494439..ef4c41e 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2669,6 +2669,21 @@ include_dir 'conf.d' + + track_commit_timestamp (bool) + + track_commit_timestamp configuration parameter + + + + Record commit time of transactions. This parameter + can only be set in + the postgresql.conf file or on the server command line. + The default value is off. + + + + diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 3a7cfa9..fa69c94 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -15908,6 +15908,48 @@ SELECT collation for ('foo' COLLATE "de_DE"); For example 10:20:10,14,15 means xmin=10, xmax=20, xip_list=10, 14, 15. + + + The functions shown in + provide information about transactions that have been already committed. + These functions mainly provide information about when the transactions + were committed. They only provide useful data when + configuration option is enabled + and only for transactions that were committed after it was enabled. + + + + Committed transaction information + + + Name Return Type Description + + + + + pg_get_transaction_committime(xid) + timestamp with time zone + get commit time of transaction + + + pg_get_transaction_extradata(xid) + integer + get additional data from transaction commit timestamp record + + + pg_get_transaction_committime_data(xid) + committime timestamp with time zone, extradata integer + get commit time and additional data from transaction commit timestamp + + + pg_get_latest_transaction_committime_data() + xid xid, committime timestamp with time zone, extradata integer + get transaction Id, commit timestamp and additional data of latest transaction commit + + + +
+ diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index 7d092d2..20c88a8 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -8,7 +8,8 @@ subdir = src/backend/access/rmgrdesc top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = clogdesc.o dbasedesc.o gindesc.o gistdesc.o hashdesc.o heapdesc.o \ +OBJS = clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o hashdesc.o \ + heapdesc.o \ mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \ standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o diff --git a/src/backend/access/rmgrdesc/committsdesc.c b/src/backend/access/rmgrdesc/committsdesc.c new file mode 100644 index 0000000..2bf7fed --- /dev/null +++ b/src/backend/access/rmgrdesc/committsdesc.c @@ -0,0 +1,75 @@ +/*------------------------------------------------------------------------- + * + * committsdesc.c + * rmgr descriptor routines for access/transam/committs.c + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/committsdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/committs.h" +#include "utils/timestamp.h" + + +void +committs_desc(StringInfo buf, XLogRecord *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + if (info == COMMITTS_ZEROPAGE) + { + int pageno; + + memcpy(&pageno, rec, sizeof(int)); + appendStringInfo(buf, "zeropage: %d", pageno); + } + else if (info == COMMITTS_TRUNCATE) + { + int pageno; + + memcpy(&pageno, rec, sizeof(int)); + appendStringInfo(buf, "truncate before: %d", pageno); + } + else if (info == COMMITTS_SETTS) + { + xl_committs_set *xlrec = (xl_committs_set *) rec; + int i; + + appendStringInfo(buf, "set committs %s for: %u", + timestamptz_to_str(xlrec->timestamp), + xlrec->mainxid); + for (i = 0; i < xlrec->nsubxids; i++) + appendStringInfo(buf, ", %u", xlrec->subxids[i]); + } + else + appendStringInfo(buf, "UNKNOWN"); +} + +const char * +committs_identify(uint8 info) +{ + const char *id = NULL; + + switch (info) + { + case COMMITTS_ZEROPAGE: + id = "ZEROPAGE"; + break; + case COMMITTS_TRUNCATE: + id = "TRUNCATE"; + break; + case COMMITTS_SETTS: + id = "SETTS"; + break; + } + + return id; +} diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index e0957ff..1333244 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -45,7 +45,7 @@ xlog_desc(StringInfo buf, XLogRecord *record) appendStringInfo(buf, "redo %X/%X; " "tli %u; prev tli %u; fpw %s; xid %u/%u; oid %u; multi %u; offset %u; " "oldest xid %u in DB %u; oldest multi %u in DB %u; " - "oldest running xid %u; %s", + "oldest CommitTs xid: %u; oldest running xid %u; %s", (uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo, checkpoint->ThisTimeLineID, checkpoint->PrevTimeLineID, @@ -58,6 +58,7 @@ xlog_desc(StringInfo buf, XLogRecord *record) checkpoint->oldestXidDB, checkpoint->oldestMulti, checkpoint->oldestMultiDB, + checkpoint->oldestCommitTs, checkpoint->oldestActiveXid, (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online"); } diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index eb6cfc5..ace913e 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \ timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \ - xlogreader.o xlogutils.o + xlogreader.o xlogutils.o committs.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index 27ca4c6..3300f84 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -152,8 +152,7 @@ TransactionIdSetTreeStatus(TransactionId xid, int nsubxids, status == TRANSACTION_STATUS_ABORTED); /* - * See how many subxids, if any, are on the same page as the parent, if - * any. + * See how many subxids, if any, are on the same page as the parent. */ for (i = 0; i < nsubxids; i++) { diff --git a/src/backend/access/transam/committs.c b/src/backend/access/transam/committs.c new file mode 100644 index 0000000..e7298a5 --- /dev/null +++ b/src/backend/access/transam/committs.c @@ -0,0 +1,846 @@ +/*------------------------------------------------------------------------- + * + * committs.c + * PostgreSQL commit timestamp manager + * + * This module is a pg_clog-like system that stores the commit timestamp + * for each transaction. + * + * XLOG interactions: this module generates an XLOG record whenever a new + * CommitTs page is initialized to zeroes. Also, one XLOG record is + * generated for setting of values when the caller requests it; this allows + * us to support values coming from places other than transaction commit. + * Other writes of CommitTS come from recording of transaction commit in + * xact.c, which generates its own XLOG records for these events and will + * re-perform the status update on redo; so we need make no additional XLOG + * entry here. + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/transam/committs.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/committs.h" +#include "access/htup_details.h" +#include "access/slru.h" +#include "access/transam.h" +#include "catalog/pg_type.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "pg_trace.h" +#include "utils/builtins.h" +#include "utils/snapmgr.h" +#include "utils/timestamp.h" + +/* + * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used + * everywhere else in Postgres. + * + * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, + * CommitTs page numbering also wraps around at + * 0xFFFFFFFF/COMMITTS_XACTS_PER_PAGE, and CommitTs segment numbering at + * 0xFFFFFFFF/COMMITTS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no + * explicit notice of that fact in this module, except when comparing segment + * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes). + */ + +/* We need 8+4 bytes per xact */ +typedef struct CommitTimestampEntry +{ + TimestampTz time; + CommitExtraData extra; +} CommitTimestampEntry; + +#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, extra) + \ + sizeof(CommitExtraData)) + +#define COMMITTS_XACTS_PER_PAGE \ + (BLCKSZ / SizeOfCommitTimestampEntry) + +#define TransactionIdToCTsPage(xid) \ + ((xid) / (TransactionId) COMMITTS_XACTS_PER_PAGE) +#define TransactionIdToCTsEntry(xid) \ + ((xid) % (TransactionId) COMMITTS_XACTS_PER_PAGE) + +/* + * Link to shared-memory data structures for CLOG control + */ +static SlruCtlData CommitTsCtlData; + +#define CommitTsCtl (&CommitTsCtlData) + +/* + * We keep a cache of the last value set in shared memory. This is protected + * by CommitTsLock. + */ +typedef struct CommitTimestampShared +{ + TransactionId xidLastCommit; + CommitTimestampEntry dataLastCommit; +} CommitTimestampShared; + +CommitTimestampShared *commitTsShared; + + +/* GUC variables */ +bool commit_ts_enabled; + +static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, + TransactionId *subxids, TimestampTz committs, + CommitExtraData extra, int pageno); +static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz committs, + CommitExtraData extra, int slotno); +static int ZeroCommitTsPage(int pageno, bool writeXlog); +static bool CommitTsPagePrecedes(int page1, int page2); +static void WriteZeroPageXlogRec(int pageno); +static void WriteTruncateXlogRec(int pageno); +static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, + TransactionId *subxids, TimestampTz timestamp, + CommitExtraData data); + + +/* + * TransactionTreeSetCommitTimestamp + * + * Record the final commit timestamp of transaction entries in the commit log + * for a transaction and its subtransaction tree, as efficiently as possible. + * + * xid is the top level transaction id. + * + * subxids is an array of xids of length nsubxids, representing subtransactions + * in the tree of xid. In various cases nsubxids may be zero. + * + * The do_xlog parameter tells us whether to include a XLog record of this + * or not. Normal path through RecordTransactionCommit() will be related + * to a transaction commit XLog record, and so should pass "false" here. + * Other callers probably want to pass true, so that the given values persist + * in case of crashes. + */ +void +TransactionTreeSetCommitTimestamp(TransactionId xid, int nsubxids, + TransactionId *subxids, TimestampTz timestamp, + CommitExtraData extra, bool do_xlog) +{ + int i; + TransactionId headxid; + + Assert(xid != InvalidTransactionId); + + if (!commit_ts_enabled) + return; + + /* + * Comply with the WAL-before-data rule: if caller specified it wants + * this value to be recorded in WAL, do so before touching the data. + */ + if (do_xlog) + WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, extra); + + /* + * We split the xids to set the timestamp to in groups belonging to the + * same SLRU page; the first element in each such set is its head. The + * first group has the main XID as the head; subsequent sets use the + * first subxid not on the previous page as head. This way, we only have + * to lock/modify each SLRU page once. + */ + for (i = 0, headxid = xid;;) + { + int pageno = TransactionIdToCTsPage(headxid); + int j; + + for (j = i; j < nsubxids; j++) + { + if (TransactionIdToCTsPage(subxids[j]) != pageno) + break; + } + /* subxids[i..j] are on the same page as the head */ + + SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, extra, + pageno); + + /* if we wrote out all subxids, we're done. */ + if (j + 1 >= nsubxids) + break; + + /* + * Set the new head and skip over it, as well as over the subxids + * we just wrote. + */ + headxid = subxids[j]; + i += j - i + 1; + } + + /* + * Update the cached value in shared memory + */ + LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); + commitTsShared->xidLastCommit = xid; + commitTsShared->dataLastCommit.time = timestamp; + commitTsShared->dataLastCommit.extra = extra; + LWLockRelease(CommitTsLock); +} + +/* + * Record the commit timestamp of transaction entries in the commit log for all + * entries on a single page. Atomic only on this page. + */ +static void +SetXidCommitTsInPage(TransactionId xid, int nsubxids, + TransactionId *subxids, TimestampTz committs, + CommitExtraData extra, int pageno) +{ + int slotno; + int i; + + LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); + + slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid); + + TransactionIdSetCommitTs(xid, committs, extra, slotno); + for (i = 0; i < nsubxids; i++) + TransactionIdSetCommitTs(subxids[i], committs, extra, slotno); + + CommitTsCtl->shared->page_dirty[slotno] = true; + + LWLockRelease(CommitTsControlLock); +} + +/* + * Sets the commit timestamp of a single transaction. + * + * Must be called with CommitTsControlLock held + */ +static void +TransactionIdSetCommitTs(TransactionId xid, TimestampTz committs, + CommitExtraData extra, int slotno) +{ + int entryno = TransactionIdToCTsEntry(xid); + CommitTimestampEntry *entry; + + entry = (CommitTimestampEntry *) + (CommitTsCtl->shared->page_buffer[slotno] + + SizeOfCommitTimestampEntry * entryno); + + entry->time = committs; + entry->extra = extra; +} + +/* + * Interrogate the commit timestamp of a transaction. + */ +void +TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, + CommitExtraData *data) +{ + int pageno = TransactionIdToCTsPage(xid); + int entryno = TransactionIdToCTsEntry(xid); + int slotno; + CommitTimestampEntry *entry; + TransactionId oldestCommitTs; + + /* Return empty if module not enabled */ + if (!commit_ts_enabled) + { + if (ts) + *ts = InvalidTransactionId; + if (data) + *data = (CommitExtraData) 0; + return; + } + + /* Also return empty if the requested value is older than what we have */ + LWLockAcquire(CommitTsControlLock, LW_SHARED); + oldestCommitTs = ShmemVariableCache->oldestCommitTs; + LWLockRelease(CommitTsControlLock); + + if (!TransactionIdIsValid(oldestCommitTs) || + TransactionIdPrecedes(xid, oldestCommitTs)) + { + if (ts) + *ts = InvalidTransactionId; + if (data) + *data = (CommitExtraData) 0; + return; + } + + /* + * Use an unlocked atomic read on our cached value in shared memory; + * if it's a hit, acquire a lock and read the data, after verifying + * that it's still what we initially read. Otherwise, fall through + * to read from SLRU. + */ + if (commitTsShared->xidLastCommit == xid) + { + LWLockAcquire(CommitTsLock, LW_SHARED); + if (commitTsShared->xidLastCommit == xid) + { + if (ts) + *ts = commitTsShared->dataLastCommit.time; + if (data) + *data = commitTsShared->dataLastCommit.extra; + LWLockRelease(CommitTsLock); + return; + } + LWLockRelease(CommitTsLock); + } + + /* lock is acquired by SimpleLruReadPage_ReadOnly */ + slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid); + entry = (CommitTimestampEntry *) + (CommitTsCtl->shared->page_buffer[slotno] + + SizeOfCommitTimestampEntry * entryno); + + if (ts) + *ts = entry->time; + + if (data) + *data = entry->extra; + + LWLockRelease(CommitTsControlLock); +} + +/* + * Return the Xid of the latest committed transaction. (As far as this module + * is concerned, anyway; it's up to the caller to ensure the value is useful + * for its purposes.) + * + * ts and extra are filled with the corresponding data; they can be passed + * as NULL if not wanted. + */ +TransactionId +GetLatestCommitTimestampData(TimestampTz *ts, CommitExtraData *extra) +{ + TransactionId xid; + + /* Return empty if module not enabled */ + if (!commit_ts_enabled) + { + if (ts) + *ts = InvalidTransactionId; + if (extra) + *extra = (CommitExtraData) 0; + return InvalidTransactionId; + } + + LWLockAcquire(CommitTsLock, LW_SHARED); + xid = commitTsShared->xidLastCommit; + if (ts) + *ts = commitTsShared->dataLastCommit.time; + if (extra) + *extra = commitTsShared->dataLastCommit.extra; + LWLockRelease(CommitTsLock); + + return xid; +} + +/* + * SQL-callable wrapper to obtain commit time of a transaction + */ +PG_FUNCTION_INFO_V1(pg_get_transaction_committime); +Datum +pg_get_transaction_committime(PG_FUNCTION_ARGS) +{ + TransactionId xid = PG_GETARG_UINT32(0); + TimestampTz committs; + + TransactionIdGetCommitTsData(xid, &committs, NULL); + + PG_RETURN_TIMESTAMPTZ(committs); +} + +PG_FUNCTION_INFO_V1(pg_get_transaction_extradata); +Datum +pg_get_transaction_extradata(PG_FUNCTION_ARGS) +{ + TransactionId xid = PG_GETARG_UINT32(0); + CommitExtraData data; + + TransactionIdGetCommitTsData(xid, NULL, &data); + + PG_RETURN_INT32(data); +} + +PG_FUNCTION_INFO_V1(pg_get_transaction_committime_data); +Datum +pg_get_transaction_committime_data(PG_FUNCTION_ARGS) +{ + TransactionId xid = PG_GETARG_UINT32(0); + TimestampTz committs; + CommitExtraData data; + Datum values[2]; + bool nulls[2]; + TupleDesc tupdesc; + HeapTuple htup; + + /* + * Construct a tuple descriptor for the result row. This must match this + * function's pg_proc entry! + */ + tupdesc = CreateTemplateTupleDesc(2, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "extra", + INT4OID, -1, 0); + tupdesc = BlessTupleDesc(tupdesc); + + /* and construct a tuple with our data */ + TransactionIdGetCommitTsData(xid, &committs, &data); + + values[0] = TimestampTzGetDatum(committs); + nulls[0] = false; + + values[1] = Int32GetDatum(data); + nulls[1] = false; + + htup = heap_form_tuple(tupdesc, values, nulls); + + PG_RETURN_DATUM(HeapTupleGetDatum(htup)); +} + +PG_FUNCTION_INFO_V1(pg_get_latest_transaction_committime_data); +Datum +pg_get_latest_transaction_committime_data(PG_FUNCTION_ARGS) +{ + TransactionId xid; + TimestampTz committs; + CommitExtraData data; + Datum values[3]; + bool nulls[3]; + TupleDesc tupdesc; + HeapTuple htup; + + /* + * Construct a tuple descriptor for the result row. This must match this + * function's pg_proc entry! + */ + tupdesc = CreateTemplateTupleDesc(3, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra", + INT4OID, -1, 0); + tupdesc = BlessTupleDesc(tupdesc); + + /* and construct a tuple with our data */ + xid = GetLatestCommitTimestampData(&committs, &data); + + values[0] = TransactionIdGetDatum(xid); + nulls[0] = false; + + values[1] = TimestampTzGetDatum(committs); + nulls[1] = false; + + values[2] = Int32GetDatum(data); + nulls[2] = false; + + htup = heap_form_tuple(tupdesc, values, nulls); + + PG_RETURN_DATUM(HeapTupleGetDatum(htup)); +} + +/* + * Number of shared CommitTS buffers. + * + * We use a very similar logic as for the number of CLOG buffers; see comments + * in CLOGShmemBuffers. + */ +Size +CommitTsShmemBuffers(void) +{ + return Min(16, Max(4, NBuffers / 1024)); +} + +/* + * Initialization of shared memory for CommitTs + */ +Size +CommitTsShmemSize(void) +{ + return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) + + sizeof(CommitTimestampShared); +} + +void +CommitTsShmemInit(void) +{ + bool found; + + CommitTsCtl->PagePrecedes = CommitTsPagePrecedes; + SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0, + CommitTsControlLock, "pg_committs"); + + commitTsShared = ShmemInitStruct("CommitTs shared", + sizeof(CommitTimestampShared), + &found); + + if (!IsUnderPostmaster) + { + Assert(!found); + + commitTsShared->xidLastCommit = InvalidTransactionId; + commitTsShared->dataLastCommit.time = 0; + commitTsShared->dataLastCommit.extra = 0; + } + else + Assert(found); +} + +/* + * This function must be called ONCE on system install. + * + * (The CommitTs directory is assumed to have been created by initdb, and + * CommitTsShmemInit must have been called already.) + */ +void +BootStrapCommitTs(void) +{ + /* + * Nothing to do here at present, unlike most other SLRU modules; segments + * are created when the server is started with this module enabled. + * See StartupCommitTs. + */ +} + +/* + * Initialize (or reinitialize) a page of CommitTs to zeroes. + * If writeXlog is TRUE, also emit an XLOG record saying we did this. + * + * The page is not actually written, just set up in shared memory. + * The slot number of the new page is returned. + * + * Control lock must be held at entry, and will be held at exit. + */ +static int +ZeroCommitTsPage(int pageno, bool writeXlog) +{ + int slotno; + + slotno = SimpleLruZeroPage(CommitTsCtl, pageno); + + if (writeXlog) + WriteZeroPageXlogRec(pageno); + + return slotno; +} + +/* + * This must be called ONCE during postmaster or standalone-backend startup, + */ +void +StartupCommitTs(void) +{ + TransactionId xid = ShmemVariableCache->nextXid; + int pageno = TransactionIdToCTsPage(xid); + + LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); + + /* + * Initialize our idea of the latest page number. + */ + CommitTsCtl->shared->latest_page_number = pageno; + + LWLockRelease(CommitTsControlLock); +} + +/* + * This must be called ONCE during postmaster or standalone-backend startup, + * when commit timestamp is enabled. + * Must be called after recovery has finished. + * + * This is in charge of creating the currently active segment, if it's not + * already there. The reason for this is that the server might have been + * running with this module disabled for a while and thus might have skipped + * the normal creation point. + */ +void +InitCommitTs(void) +{ + TransactionId xid = ShmemVariableCache->nextXid; + int pageno = TransactionIdToCTsPage(xid); + + LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); + + /* + * Re-Initialize our idea of the latest page number. + */ + CommitTsCtl->shared->latest_page_number = pageno; + + /* + * If this module is not currently enabled, make sure we don't hand back + * possibly-invalid data; also remove segments of old data. + */ + if (!commit_ts_enabled) + { + ShmemVariableCache->oldestCommitTs = InvalidTransactionId; + LWLockRelease(CommitTsControlLock); + + TruncateCommitTs(ReadNewTransactionId()); + + return; + } + + /* + * If CommitTs is enabled, but it wasn't in the previous server run, we + * need to set the oldest value to the next Xid; that way, we will not try + * to read data that might not have been set. + * + * XXX does this have a problem if a server is started with commitTs + * enabled, then started with commitTs disabled, then restarted with it + * enabled again? It doesn't look like it does, because there should be a + * checkpoint that sets the value to InvalidTransactionId at end of + * recovery; and so any chance of injecting new transactions without + * CommitTs values would occur after the oldestCommitTs has been set to + * Invalid temporarily. + */ + if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId) + ShmemVariableCache->oldestCommitTs = ReadNewTransactionId(); + + /* Finally, create the current segment file, if necessary */ + if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno)) + { + int slotno; + + slotno = ZeroCommitTsPage(pageno, false); + SimpleLruWritePage(CommitTsCtl, slotno); + Assert(!CommitTsCtl->shared->page_dirty[slotno]); + } + + LWLockRelease(CommitTsControlLock); +} + +/* + * This must be called ONCE during postmaster or standalone-backend shutdown + */ +void +ShutdownCommitTs(void) +{ + /* Flush dirty CommitTs pages to disk */ + SimpleLruFlush(CommitTsCtl, false); +} + +/* + * Perform a checkpoint --- either during shutdown, or on-the-fly + */ +void +CheckPointCommitTs(void) +{ + /* Flush dirty CommitTs pages to disk */ + SimpleLruFlush(CommitTsCtl, true); +} + +/* + * Make sure that CommitTs has room for a newly-allocated XID. + * + * NB: this is called while holding XidGenLock. We want it to be very fast + * most of the time; even when it's not so fast, no actual I/O need happen + * unless we're forced to write out a dirty CommitTs or xlog page to make room + * in shared memory. + */ +void +ExtendCommitTs(TransactionId newestXact) +{ + int pageno; + + /* nothing to do if module not enabled */ + if (!commit_ts_enabled) + return; + + /* + * No work except at first XID of a page. But beware: just after + * wraparound, the first XID of page zero is FirstNormalTransactionId. + */ + if (TransactionIdToCTsEntry(newestXact) != 0 && + !TransactionIdEquals(newestXact, FirstNormalTransactionId)) + return; + + pageno = TransactionIdToCTsPage(newestXact); + + LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); + + /* Zero the page and make an XLOG entry about it */ + ZeroCommitTsPage(pageno, !InRecovery); + + LWLockRelease(CommitTsControlLock); +} + +/* + * Remove all CommitTs segments before the one holding the passed + * transaction ID + * + * Note that we don't need to flush XLOG here. + */ +void +TruncateCommitTs(TransactionId oldestXact) +{ + int cutoffPage; + + /* + * The cutoff point is the start of the segment containing oldestXact. We + * pass the *page* containing oldestXact to SimpleLruTruncate. + */ + cutoffPage = TransactionIdToCTsPage(oldestXact); + + /* Check to see if there's any files that could be removed */ + if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence, &cutoffPage)) + return; /* nothing to remove */ + + /* Write XLOG record */ + WriteTruncateXlogRec(cutoffPage); + + /* Now we can remove the old CommitTs segment(s) */ + SimpleLruTruncate(CommitTsCtl, cutoffPage); +} + +/* + * Set the earliest value for which commit TS can be consulted. + */ +void +SetCommitTsLimit(TransactionId oldestXact) +{ + /* + * Be careful not to overwrite values that are either further into the + * "future" or signal a disabled committs. + */ + LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); + if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId && + TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact)) + ShmemVariableCache->oldestCommitTs = oldestXact; + LWLockRelease(CommitTsControlLock); +} + +/* + * Decide which of two CLOG page numbers is "older" for truncation purposes. + * + * We need to use comparison of TransactionIds here in order to do the right + * thing with wraparound XID arithmetic. However, if we are asked about + * page number zero, we don't want to hand InvalidTransactionId to + * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So, + * offset both xids by FirstNormalTransactionId to avoid that. + */ +static bool +CommitTsPagePrecedes(int page1, int page2) +{ + TransactionId xid1; + TransactionId xid2; + + xid1 = ((TransactionId) page1) * COMMITTS_XACTS_PER_PAGE; + xid1 += FirstNormalTransactionId; + xid2 = ((TransactionId) page2) * COMMITTS_XACTS_PER_PAGE; + xid2 += FirstNormalTransactionId; + + return TransactionIdPrecedes(xid1, xid2); +} + + +/* + * Write a ZEROPAGE xlog record + */ +static void +WriteZeroPageXlogRec(int pageno) +{ + XLogRecData rdata; + + rdata.data = (char *) (&pageno); + rdata.len = sizeof(int); + rdata.buffer = InvalidBuffer; + rdata.next = NULL; + (void) XLogInsert(RM_COMMITTS_ID, COMMITTS_ZEROPAGE, &rdata); +} + +/* + * Write a TRUNCATE xlog record + */ +static void +WriteTruncateXlogRec(int pageno) +{ + XLogRecData rdata; + + rdata.data = (char *) (&pageno); + rdata.len = sizeof(int); + rdata.buffer = InvalidBuffer; + rdata.next = NULL; + XLogInsert(RM_COMMITTS_ID, COMMITTS_TRUNCATE, &rdata); +} + +/* + * Write a SETTS xlog record + */ +static void +WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, + TransactionId *subxids, TimestampTz timestamp, + CommitExtraData data) +{ + XLogRecData rdata; + xl_committs_set record; + + record.timestamp = timestamp; + record.data = data; + record.mainxid = mainxid; + record.nsubxids = nsubxids; + memcpy(record.subxids, subxids, sizeof(TransactionId) * nsubxids); + + rdata.data = (char *) &record; + rdata.len = offsetof(xl_committs_set, subxids) + + nsubxids * sizeof(TransactionId); + rdata.buffer = InvalidBuffer; + rdata.next = NULL; + XLogInsert(RM_COMMITTS_ID, COMMITTS_SETTS, &rdata); +} + + +/* + * CommitTS resource manager's routines + */ +void +committs_redo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + /* Backup blocks are not used in committs records */ + Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + + if (info == COMMITTS_ZEROPAGE) + { + int pageno; + int slotno; + + memcpy(&pageno, XLogRecGetData(record), sizeof(int)); + + LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); + + slotno = ZeroCommitTsPage(pageno, false); + SimpleLruWritePage(CommitTsCtl, slotno); + Assert(!CommitTsCtl->shared->page_dirty[slotno]); + + LWLockRelease(CommitTsControlLock); + } + else if (info == COMMITTS_TRUNCATE) + { + int pageno; + + memcpy(&pageno, XLogRecGetData(record), sizeof(int)); + + /* + * During XLOG replay, latest_page_number isn't set up yet; insert a + * suitable value to bypass the sanity test in SimpleLruTruncate. + */ + CommitTsCtl->shared->latest_page_number = pageno; + + SimpleLruTruncate(CommitTsCtl, pageno); + } + else if (info == COMMITTS_SETTS) + { + xl_committs_set *setts = (xl_committs_set *) XLogRecGetData(record); + + TransactionTreeSetCommitTimestamp(setts->mainxid, setts->nsubxids, + setts->subxids, setts->timestamp, + setts->data, false); + } + else + elog(PANIC, "committs_redo: unknown op code %u", info); +} diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 2645a7a..53116f6 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -8,6 +8,7 @@ #include "postgres.h" #include "access/clog.h" +#include "access/committs.h" #include "access/gin.h" #include "access/gist_private.h" #include "access/hash.h" diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 7013fb8..c70bebe 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -14,6 +14,7 @@ #include "postgres.h" #include "access/clog.h" +#include "access/committs.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/xact.h" @@ -157,9 +158,10 @@ GetNewTransactionId(bool isSubXact) * XID before we zero the page. Fortunately, a page of the commit log * holds 32K or more transactions, so we don't have to do this very often. * - * Extend pg_subtrans too. + * Extend pg_subtrans and pg_committs too. */ ExtendCLOG(xid); + ExtendCommitTs(xid); ExtendSUBTRANS(xid); /* diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 5b5d31b..ca5d28f 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -20,6 +20,7 @@ #include #include +#include "access/committs.h" #include "access/multixact.h" #include "access/subtrans.h" #include "access/transam.h" @@ -1166,6 +1167,17 @@ RecordTransactionCommit(void) } /* + * We don't need to log the commit timestamp separately since the commit + * record logged above has all the necessary action to set the timestamp + * again. + */ + if (markXidCommitted) + { + TransactionTreeSetCommitTimestamp(xid, nchildren, children, + xactStopTimestamp, 0, false); + } + + /* * Check if we want to commit asynchronously. We can allow the XLOG flush * to happen asynchronously if synchronous_commit=off, or if the current * transaction has not performed any WAL-logged operation. The latter @@ -4683,6 +4695,7 @@ xactGetCommittedChildren(TransactionId **ptr) */ static void xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, + TimestampTz commit_time, TransactionId *sub_xids, int nsubxacts, SharedInvalidationMessage *inval_msgs, int nmsgs, RelFileNode *xnodes, int nrels, @@ -4710,6 +4723,10 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, LWLockRelease(XidGenLock); } + /* Set the transaction commit time */ + TransactionTreeSetCommitTimestamp(xid, nsubxacts, sub_xids, + commit_time, 0, false); + if (standbyState == STANDBY_DISABLED) { /* @@ -4829,7 +4846,8 @@ xact_redo_commit(xl_xact_commit *xlrec, /* invalidation messages array follows subxids */ inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); - xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts, + xact_redo_commit_internal(xid, lsn, xlrec->xact_time, + subxacts, xlrec->nsubxacts, inval_msgs, xlrec->nmsgs, xlrec->xnodes, xlrec->nrels, xlrec->dbId, @@ -4844,7 +4862,8 @@ static void xact_redo_commit_compact(xl_xact_commit_compact *xlrec, TransactionId xid, XLogRecPtr lsn) { - xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts, + xact_redo_commit_internal(xid, lsn, xlrec->xact_time, + xlrec->subxacts, xlrec->nsubxacts, NULL, 0, /* inval msgs */ NULL, 0, /* relfilenodes */ InvalidOid, /* dbId */ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 235b442..2901d26 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -22,6 +22,7 @@ #include #include "access/clog.h" +#include "access/committs.h" #include "access/multixact.h" #include "access/rewriteheap.h" #include "access/subtrans.h" @@ -4945,6 +4946,7 @@ BootStrapXLOG(void) checkPoint.oldestXidDB = TemplateDbOid; checkPoint.oldestMulti = FirstMultiXactId; checkPoint.oldestMultiDB = TemplateDbOid; + checkPoint.oldestCommitTs = InvalidTransactionId; checkPoint.time = (pg_time_t) time(NULL); checkPoint.oldestActiveXid = InvalidTransactionId; @@ -4954,6 +4956,7 @@ BootStrapXLOG(void) MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB); + SetCommitTsLimit(InvalidTransactionId); /* Set up the XLOG page header */ page->xlp_magic = XLOG_PAGE_MAGIC; @@ -5035,6 +5038,7 @@ BootStrapXLOG(void) /* Bootstrap the commit log, too */ BootStrapCLOG(); + BootStrapCommitTs(); BootStrapSUBTRANS(); BootStrapMultiXact(); @@ -6281,6 +6285,9 @@ StartupXLOG(void) ereport(DEBUG1, (errmsg("oldest MultiXactId: %u, in database %u", checkPoint.oldestMulti, checkPoint.oldestMultiDB))); + ereport(DEBUG1, + (errmsg("oldest CommitTs Xid: %u", + checkPoint.oldestCommitTs))); if (!TransactionIdIsNormal(checkPoint.nextXid)) ereport(PANIC, (errmsg("invalid next transaction ID"))); @@ -6292,6 +6299,7 @@ StartupXLOG(void) MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB); + SetCommitTsLimit(checkPoint.oldestCommitTs); MultiXactSetSafeTruncate(checkPoint.oldestMulti); XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch; XLogCtl->ckptXid = checkPoint.nextXid; @@ -6513,11 +6521,12 @@ StartupXLOG(void) ProcArrayInitRecovery(ShmemVariableCache->nextXid); /* - * Startup commit log and subtrans only. MultiXact has already - * been started up and other SLRUs are not maintained during - * recovery and need not be started yet. + * Startup commit log, commit timestamp and subtrans + * only. MultiXact has already been started up and other SLRUs are + * not maintained during recovery and need not be started yet. */ StartupCLOG(); + StartupCommitTs(); StartupSUBTRANS(oldestActiveXID); /* @@ -7164,12 +7173,13 @@ StartupXLOG(void) LWLockRelease(ProcArrayLock); /* - * Start up the commit log and subtrans, if not already done for hot - * standby. + * Start up the commit log, commit timestamp and subtrans, if not already + * done for hot standby. */ if (standbyState == STANDBY_DISABLED) { StartupCLOG(); + StartupCommitTs(); StartupSUBTRANS(oldestActiveXID); } @@ -7205,6 +7215,12 @@ StartupXLOG(void) XLogReportParameters(); /* + * Local WAL inserts enables, so it's time to finish initialization + * of commit timestamp. + */ + InitCommitTs(); + + /* * All done. Allow backends to write WAL. (Although the bool flag is * probably atomic in itself, we use the info_lck here to ensure that * there are no race conditions concerning visibility of other recent @@ -7750,6 +7766,7 @@ ShutdownXLOG(int code, Datum arg) CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); } ShutdownCLOG(); + ShutdownCommitTs(); ShutdownSUBTRANS(); ShutdownMultiXact(); @@ -8101,6 +8118,10 @@ CreateCheckPoint(int flags) checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB; LWLockRelease(XidGenLock); + LWLockAcquire(CommitTsControlLock, LW_SHARED); + checkPoint.oldestCommitTs = ShmemVariableCache->oldestCommitTs; + LWLockRelease(CommitTsControlLock); + /* Increase XID epoch if we've wrapped around since last checkpoint */ checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch; if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid) @@ -8386,6 +8407,7 @@ static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags) { CheckPointCLOG(); + CheckPointCommitTs(); CheckPointSUBTRANS(); CheckPointMultiXact(); CheckPointPredicate(); diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index e5fefa3..f5e7ddc 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -23,6 +23,7 @@ #include #include "access/clog.h" +#include "access/committs.h" #include "access/genam.h" #include "access/heapam.h" #include "access/htup_details.h" @@ -1055,6 +1056,7 @@ vac_truncate_clog(TransactionId frozenXID, * multixacts; that will be done by the next checkpoint. */ TruncateCLOG(frozenXID); + TruncateCommitTs(frozenXID); /* * Update the wrap limit for GetNewTransactionId and creation of new @@ -1064,6 +1066,7 @@ vac_truncate_clog(TransactionId frozenXID, */ SetTransactionIdLimit(frozenXID, oldestxid_datoid); SetMultiXactIdLimit(minMulti, minmulti_datoid); + SetCommitTsLimit(frozenXID); } diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 9f1b20e..f9b49c4 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -132,6 +132,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record) case RM_GIST_ID: case RM_SEQ_ID: case RM_SPGIST_ID: + case RM_COMMITTS_ID: break; case RM_NEXT_ID: elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) buf.record.xl_rmid); diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 1d04c55..9025601 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -15,6 +15,7 @@ #include "postgres.h" #include "access/clog.h" +#include "access/committs.h" #include "access/heapam.h" #include "access/multixact.h" #include "access/nbtree.h" @@ -117,6 +118,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, ProcGlobalShmemSize()); size = add_size(size, XLOGShmemSize()); size = add_size(size, CLOGShmemSize()); + size = add_size(size, CommitTsShmemSize()); size = add_size(size, SUBTRANSShmemSize()); size = add_size(size, TwoPhaseShmemSize()); size = add_size(size, BackgroundWorkerShmemSize()); @@ -198,6 +200,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) */ XLOGShmemInit(); CLOGShmemInit(); + CommitTsShmemInit(); SUBTRANSShmemInit(); MultiXactShmemInit(); InitBufferPool(); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index ea82882..fb0e20d 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -46,6 +46,7 @@ #include #include "access/clog.h" +#include "access/committs.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/xact.h" diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 9fe6855..6794eed 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -29,6 +29,7 @@ #include "postgres.h" #include "access/clog.h" +#include "access/committs.h" #include "access/multixact.h" #include "access/subtrans.h" #include "commands/async.h" @@ -259,6 +260,9 @@ NumLWLocks(void) /* clog.c needs one per CLOG buffer */ numLocks += CLOGShmemBuffers(); + /* committs.c needs one per CommitTs buffer */ + numLocks += CommitTsShmemBuffers(); + /* subtrans.c needs one per SubTrans buffer */ numLocks += NUM_SUBTRANS_BUFFERS; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 8111b93..94081a2 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -26,6 +26,7 @@ #include #endif +#include "access/committs.h" #include "access/gin.h" #include "access/transam.h" #include "access/twophase.h" @@ -836,6 +837,15 @@ static struct config_bool ConfigureNamesBool[] = check_bonjour, NULL, NULL }, { + {"track_commit_timestamp", PGC_POSTMASTER, REPLICATION, + gettext_noop("Collects transaction commit time."), + NULL + }, + &commit_ts_enabled, + false, + NULL, NULL, NULL + }, + { {"ssl", PGC_POSTMASTER, CONN_AUTH_SECURITY, gettext_noop("Enables SSL connections."), NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index dac6776..5e3e776 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -227,6 +227,7 @@ #wal_sender_timeout = 60s # in milliseconds; 0 disables #max_replication_slots = 0 # max number of replication slots +#track_commit_timestamp = off # collect timestamp of transaction commit # (change requires restart) # - Master Server - diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index c8ff2cb..3935bab 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -185,6 +185,7 @@ static const char *subdirs[] = { "pg_xlog", "pg_xlog/archive_status", "pg_clog", + "pg_committs", "pg_dynshmem", "pg_notify", "pg_serial", diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index 118e653..8dc3e00 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -240,6 +240,8 @@ main(int argc, char *argv[]) ControlFile.checkPointCopy.oldestMulti); printf(_("Latest checkpoint's oldestMulti's DB: %u\n"), ControlFile.checkPointCopy.oldestMultiDB); + printf(_("Latest checkpoint's oldestCommitTs: %u\n"), + ControlFile.checkPointCopy.oldestCommitTs); printf(_("Time of latest checkpoint: %s\n"), ckpttime_str); printf(_("Fake LSN counter for unlogged rels: %X/%X\n"), diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c index 028a1f0..8744d04 100644 --- a/src/bin/pg_resetxlog/pg_resetxlog.c +++ b/src/bin/pg_resetxlog/pg_resetxlog.c @@ -62,6 +62,7 @@ static bool guessed = false; /* T if we had to guess at any values */ static const char *progname; static uint32 set_xid_epoch = (uint32) -1; static TransactionId set_xid = 0; +static TransactionId set_committs = 0; static Oid set_oid = 0; static MultiXactId set_mxid = 0; static MultiXactOffset set_mxoff = (MultiXactOffset) -1; @@ -111,7 +112,7 @@ main(int argc, char *argv[]) } - while ((c = getopt(argc, argv, "D:fl:m:no:O:x:e:")) != -1) + while ((c = getopt(argc, argv, "D:fl:m:no:O:x:e:c:")) != -1) { switch (c) { @@ -157,6 +158,21 @@ main(int argc, char *argv[]) } break; + case 'c': + set_committs = strtoul(optarg, &endptr, 0); + if (endptr == optarg || *endptr != '\0') + { + fprintf(stderr, _("%s: invalid argument for option -c\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); + exit(1); + } + if (set_committs == 0) + { + fprintf(stderr, _("%s: transaction ID (-c) must not be 0\n"), progname); + exit(1); + } + break; + case 'o': set_oid = strtoul(optarg, &endptr, 0); if (endptr == optarg || *endptr != '\0') @@ -333,6 +349,9 @@ main(int argc, char *argv[]) ControlFile.checkPointCopy.oldestXidDB = InvalidOid; } + if (set_committs != 0) + ControlFile.checkPointCopy.oldestCommitTs = set_committs; + if (set_oid != 0) ControlFile.checkPointCopy.nextOid = set_oid; @@ -609,6 +628,8 @@ PrintControlValues(bool guessed) ControlFile.checkPointCopy.oldestMulti); printf(_("Latest checkpoint's oldestMulti's DB: %u\n"), ControlFile.checkPointCopy.oldestMultiDB); + printf(_("Latest checkpoint's oldestCommitTs: %u\n"), + ControlFile.checkPointCopy.oldestCommitTs); printf(_("Maximum data alignment: %u\n"), ControlFile.maxAlign); /* we don't print floatFormat since can't say much useful about it */ @@ -690,6 +711,12 @@ PrintNewControlValues() printf(_("NextXID epoch: %u\n"), ControlFile.checkPointCopy.nextXidEpoch); } + + if (set_committs != 0) + { + printf(_("oldestCommitTs: %u\n"), + ControlFile.checkPointCopy.oldestCommitTs); + } } @@ -1092,6 +1119,7 @@ usage(void) printf(_(" -O OFFSET set next multitransaction offset\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -x XID set next transaction ID\n")); + printf(_(" -c XID set the oldest retrievable commit timestamp\n")); printf(_(" -?, --help show this help, then exit\n")); printf(_("\nReport bugs to .\n")); } diff --git a/src/include/access/committs.h b/src/include/access/committs.h new file mode 100644 index 0000000..0f96185 --- /dev/null +++ b/src/include/access/committs.h @@ -0,0 +1,63 @@ +/* + * committs.h + * + * PostgreSQL commit timestamp manager + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/committs.h + */ +#ifndef COMMITTS_H +#define COMMITTS_H + +#include "access/xlog.h" +#include "datatype/timestamp.h" + + +extern PGDLLIMPORT bool commit_ts_enabled; + +typedef uint32 CommitExtraData; + +extern void TransactionTreeSetCommitTimestamp(TransactionId xid, int nsubxids, + TransactionId *subxids, + TimestampTz timestamp, + CommitExtraData data, + bool do_xlog); +extern void TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, + CommitExtraData *data); +extern TransactionId GetLatestCommitTimestampData(TimestampTz *ts, + CommitExtraData *extra); + +extern Size CommitTsShmemBuffers(void); +extern Size CommitTsShmemSize(void); +extern void CommitTsShmemInit(void); +extern void BootStrapCommitTs(void); +extern void StartupCommitTs(void); +extern void InitCommitTs(void); +extern void ShutdownCommitTs(void); +extern void CheckPointCommitTs(void); +extern void ExtendCommitTs(TransactionId newestXact); +extern void TruncateCommitTs(TransactionId oldestXact); +extern void SetCommitTsLimit(TransactionId oldestXact); + +/* XLOG stuff */ +#define COMMITTS_ZEROPAGE 0x00 +#define COMMITTS_TRUNCATE 0x10 +#define COMMITTS_SETTS 0x20 + +typedef struct xl_committs_set +{ + TimestampTz timestamp; + CommitExtraData data; + TransactionId mainxid; + int nsubxids; + TransactionId subxids[FLEXIBLE_ARRAY_MEMBER]; +} xl_committs_set; + + +extern void committs_redo(XLogRecPtr lsn, XLogRecord *record); +extern void committs_desc(StringInfo buf, XLogRecord *record); +extern const char *committs_identify(uint8 info); + +#endif /* COMMITTS_H */ diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 77d4574..c648a6a 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -24,7 +24,7 @@ * Changes to this list possibly need a XLOG_PAGE_MAGIC bump. */ -/* symbol name, textual name, redo, desc, startup, cleanup */ +/* symbol name, textual name, redo, desc, identify, startup, cleanup */ PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL) PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL) PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL) @@ -42,3 +42,4 @@ PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gi PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup) PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL) PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup) +PG_RMGR(RM_COMMITTS_ID, "CommitTs", committs_redo, committs_desc, committs_identify, NULL, NULL) diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 32d1b29..b59fd98 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -124,6 +124,11 @@ typedef struct VariableCacheData Oid oldestXidDB; /* database with minimum datfrozenxid */ /* + * These fields are protected by CommitTsControlLock + */ + TransactionId oldestCommitTs; + + /* * These fields are protected by ProcArrayLock. */ TransactionId latestCompletedXid; /* newest XID that has committed or diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index ba79d25..9e048ea 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -46,6 +46,7 @@ typedef struct CheckPoint MultiXactId oldestMulti; /* cluster-wide minimum datminmxid */ Oid oldestMultiDB; /* database with minimum datminmxid */ pg_time_t time; /* time stamp of checkpoint */ + TransactionId oldestCommitTs; /* oldest Xid with valid commit timestamp */ /* * Oldest XID still running. This is only needed to initialize hot standby diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 4736532..36dd72f 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -2988,6 +2988,18 @@ DESCR("view two-phase transactions"); DATA(insert OID = 3819 ( pg_get_multixact_members PGNSP PGUID 12 1 1000 0 0 f f f f t t v 1 0 2249 "28" "{28,28,25}" "{i,o,o}" "{multixid,xid,mode}" _null_ pg_get_multixact_members _null_ _null_ _null_ )); DESCR("view members of a multixactid"); +DATA(insert OID = 3787 ( pg_get_transaction_committime PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 1184 "28" _null_ _null_ _null_ _null_ pg_get_transaction_committime _null_ _null_ _null_ )); +DESCR("get commit time of transaction"); + +DATA(insert OID = 3788 ( pg_get_transaction_extradata PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 23 "28" _null_ _null_ _null_ _null_ pg_get_transaction_extradata _null_ _null_ _null_ )); +DESCR("get additional data from transaction commit timestamp record"); + +DATA(insert OID = 3789 ( pg_get_transaction_committime_data PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 2249 "28" "{28,1184,23}" "{i,o,o}" "{xid,committime,extradata}" _null_ pg_get_transaction_committime_data _null_ _null_ _null_ )); +DESCR("get commit time and additional data from transaction commit timestamp record"); + +DATA(insert OID = 3790 ( pg_get_latest_transaction_committime_data PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 2249 "" "{28,1184,23}" "{o,o,o}" "{xid,committime,extradata}" _null_ pg_get_latest_transaction_committime_data _null_ _null_ _null_ )); +DESCR("get transaction Id, commit timestamp and additional data of latest transaction commit"); + DATA(insert OID = 3537 ( pg_describe_object PGNSP PGUID 12 1 0 0 0 f f f f t f s 3 0 25 "26 26 23" _null_ _null_ _null_ _null_ pg_describe_object _null_ _null_ _null_ )); DESCR("get identification of SQL object"); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 02c8f1a..20d79a4 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -127,7 +127,10 @@ extern PGDLLIMPORT LWLockPadded *MainLWLockArray; #define AutoFileLock (&MainLWLockArray[35].lock) #define ReplicationSlotAllocationLock (&MainLWLockArray[36].lock) #define ReplicationSlotControlLock (&MainLWLockArray[37].lock) -#define NUM_INDIVIDUAL_LWLOCKS 38 +#define CommitTsControlLock (&MainLWLockArray[38].lock) +#define CommitTsLock (&MainLWLockArray[39].lock) + +#define NUM_INDIVIDUAL_LWLOCKS 40 /* * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index fb1b4a4..5be1631 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -1180,6 +1180,12 @@ extern Datum pg_prepared_xact(PG_FUNCTION_ARGS); /* access/transam/multixact.c */ extern Datum pg_get_multixact_members(PG_FUNCTION_ARGS); +/* access/transam/committs.c */ +extern Datum pg_get_transaction_committime(PG_FUNCTION_ARGS); +extern Datum pg_get_transaction_extradata(PG_FUNCTION_ARGS); +extern Datum pg_get_transaction_committime_data(PG_FUNCTION_ARGS); +extern Datum pg_get_latest_transaction_committime_data(PG_FUNCTION_ARGS); + /* catalogs/dependency.c */ extern Datum pg_describe_object(PG_FUNCTION_ARGS); extern Datum pg_identify_object(PG_FUNCTION_ARGS); diff --git a/src/test/regress/expected/committs_off.out b/src/test/regress/expected/committs_off.out new file mode 100644 index 0000000..0a94f9d --- /dev/null +++ b/src/test/regress/expected/committs_off.out @@ -0,0 +1,21 @@ +-- +-- Commit Timestamp (off) +-- +CREATE TABLE committs_test(id serial, ts timestamptz default now()); +INSERT INTO committs_test DEFAULT VALUES; +INSERT INTO committs_test DEFAULT VALUES; +INSERT INTO committs_test DEFAULT VALUES; +SELECT id, pg_get_transaction_extradata(xmin), + pg_get_transaction_committime(xmin) >= ts, + pg_get_transaction_committime(xmin) < now(), + pg_get_transaction_committime(xmin) - ts < '60s' -- 60s should give a lot of reserve +FROM committs_test +ORDER BY id; + id | pg_get_transaction_extradata | ?column? | ?column? | ?column? +----+------------------------------+----------+----------+---------- + 1 | 0 | f | t | t + 2 | 0 | f | t | t + 3 | 0 | f | t | t +(3 rows) + +DROP TABLE committs_test; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 9902dbe..abc6800 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -88,7 +88,7 @@ test: privileges security_label collate matview lock replica_identity rowsecurit # ---------- # Another group of parallel tests # ---------- -test: alter_generic misc psql async +test: alter_generic misc psql async committs_off # rules cannot run concurrently with any test that creates a view test: rules diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 2902a05..d190ad2 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -147,3 +147,4 @@ test: largeobject test: with test: xml test: stats +test: committs_off diff --git a/src/test/regress/sql/committs_off.sql b/src/test/regress/sql/committs_off.sql new file mode 100644 index 0000000..0f97666 --- /dev/null +++ b/src/test/regress/sql/committs_off.sql @@ -0,0 +1,18 @@ +-- +-- Commit Timestamp (off) +-- + +CREATE TABLE committs_test(id serial, ts timestamptz default now()); + +INSERT INTO committs_test DEFAULT VALUES; +INSERT INTO committs_test DEFAULT VALUES; +INSERT INTO committs_test DEFAULT VALUES; + +SELECT id, pg_get_transaction_extradata(xmin), + pg_get_transaction_committime(xmin) >= ts, + pg_get_transaction_committime(xmin) < now(), + pg_get_transaction_committime(xmin) - ts < '60s' -- 60s should give a lot of reserve +FROM committs_test +ORDER BY id; + +DROP TABLE committs_test;